generated from container/tmpl
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
585 lines
25 KiB
585 lines
25 KiB
import os
|
|
import datetime
|
|
import configparser
|
|
import math
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
from clickhouse_driver import Client
|
|
from docx import Document
|
|
from docx.shared import Inches, Cm, Pt, RGBColor
|
|
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING
|
|
from docx.enum.table import WD_TABLE_ALIGNMENT, WD_CELL_VERTICAL_ALIGNMENT
|
|
from docx.oxml.ns import qn, nsdecls
|
|
from docx.oxml import parse_xml
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.ticker as ticker
|
|
import numpy as np
|
|
from app import db_helper, image_helper
|
|
import qiniu
|
|
import logging
|
|
|
|
REPORT_DIR = 'output'
|
|
LOG_DIR = 'log'
|
|
|
|
|
|
# 小于十万的整数转换
|
|
def number_to_chinese(number):
|
|
chinese_digits = ['零', '一', '二', '三', '四', '五', '六', '七', '八', '九']
|
|
chinese_units = ['', '十', '百', '千', '万']
|
|
|
|
if number == 0:
|
|
return chinese_digits[0]
|
|
|
|
result = ''
|
|
unit_index = 0
|
|
last_digit_is_zero = False # 记录上一位数字是否为零
|
|
while number > 0:
|
|
digit = number % 10
|
|
if digit != 0:
|
|
if last_digit_is_zero:
|
|
result = chinese_digits[0] + result # 添加零
|
|
result = chinese_digits[digit] + chinese_units[unit_index] + result
|
|
last_digit_is_zero = False
|
|
else:
|
|
# 如果当前位是零,并且结果不为空,则记录上一位为零
|
|
if result != '':
|
|
last_digit_is_zero = True
|
|
|
|
unit_index += 1
|
|
number //= 10
|
|
|
|
# 如果结果以'一十'开头,则去掉'一',只保留'十'
|
|
if result[:2] == chinese_digits[1] + chinese_units[1]:
|
|
result = result[1:]
|
|
|
|
return result
|
|
|
|
|
|
def round_to_integer(v):
|
|
if math.isnan(v):
|
|
return v
|
|
integer_in_decimal = Decimal(v).quantize(Decimal('0'), rounding=ROUND_HALF_UP) # 取整,如:2.5 -> 3 (not 2), 3.5 -> 4
|
|
return int(integer_in_decimal)
|
|
|
|
|
|
def get_report_year_month():
|
|
# 获取上月的最后一天
|
|
last_day = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
|
|
return last_day.year, last_day.month
|
|
|
|
|
|
def get_report_start_end():
|
|
# 获取当前日期
|
|
now = datetime.datetime.now()
|
|
# 获取本月第一天
|
|
this_month_start = datetime.datetime(now.year, now.month, 1)
|
|
# 获取上个月最后一天
|
|
last_month_end = this_month_start - datetime.timedelta(days=1)
|
|
# 获取上个月第一天
|
|
last_month_start = datetime.datetime(last_month_end.year, last_month_end.month, 1)
|
|
return last_month_start, this_month_start
|
|
|
|
|
|
def set_paragraph_space(p, line_spacing=1.5):
|
|
# 设置间距
|
|
p.paragraph_format.space_before = Pt(6) # 段前
|
|
p.paragraph_format.space_after = Pt(6) # 段后
|
|
p.paragraph_format.line_spacing = line_spacing # 设置段落行距:默认1.5倍行距
|
|
|
|
|
|
def set_paragraph_format(p):
|
|
set_paragraph_space(p)
|
|
# 设置首行缩进2个字符
|
|
p.paragraph_format.first_line_indent = Inches(0.3)
|
|
|
|
|
|
def set_heading_format(h, r, head_level=1, line_spacing=1.5):
|
|
set_paragraph_space(h, line_spacing)
|
|
r.font.name = 'Times New Roman'
|
|
r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体')
|
|
r.font.size = Pt(16) if head_level == 1 else Pt(15) # 三号 -> Pt(16), 小三 -> Pt(15)
|
|
r.font.color.rgb = RGBColor(0, 0, 0)
|
|
r.bold = True
|
|
|
|
|
|
def set_title_format(h, r):
|
|
h.paragraph_format.space_before = Pt(6) # 段前0.5行
|
|
h.paragraph_format.space_after = Pt(6) # 段后0.5行
|
|
h.paragraph_format.line_spacing = 1.5 # 1.5倍行距
|
|
h.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
|
|
r.font.name = u'Times New Roman'
|
|
r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体')
|
|
r.font.size = Pt(18) # 小二
|
|
r.font.color.rgb = RGBColor(0, 0, 0)
|
|
r.bold = True
|
|
|
|
|
|
def set_global_normal_style(doc):
|
|
style = doc.styles['Normal']
|
|
style.font.name = 'Times New Roman' # 英文、数字字体
|
|
style._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') # 中文字体
|
|
style.font.size = Pt(14) # 四号
|
|
style.paragraph_format.space_before = 0 # 段前
|
|
style.paragraph_format.space_after = 0 # 段后
|
|
style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.SINGLE # 单倍行距
|
|
# 设置页边距
|
|
section = doc.sections[0]
|
|
section.top_margin = Cm(1.27)
|
|
section.right_margin = Cm(0.5)
|
|
section.bottom_margin = Cm(1.27)
|
|
section.left_margin = Cm(0.5)
|
|
|
|
|
|
# 设置单元格底纹
|
|
def set_table_cell_shading_color(table_cell, color):
|
|
shading_elm = parse_xml(r'<w:shd {} w:fill="{bgColor}"/>'.format(nsdecls('w'), bgColor=color))
|
|
table_cell._tc.get_or_add_tcPr().append(shading_elm)
|
|
|
|
|
|
# 设置表格行底纹
|
|
def set_table_row_shading_color(table_row, color):
|
|
shading_dict = {}
|
|
for i, cell in enumerate(table_row.cells):
|
|
shading_dict['shading_elm_' + str(i)] = parse_xml(r'<w:shd {} w:fill="{bgColor}"/>'.format(nsdecls('w'), bgColor=color))
|
|
cell._tc.get_or_add_tcPr().append(shading_dict['shading_elm_' + str(i)])
|
|
|
|
|
|
def add_heading(doc, head_level, head_content, line_spacing=1.5):
|
|
h = doc.add_heading(level=head_level)
|
|
r = h.add_run(head_content)
|
|
set_heading_format(h, r, head_level, line_spacing)
|
|
|
|
|
|
def create_twinx_chart(department, title, filename):
|
|
plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体
|
|
plt.rcParams["axes.unicode_minus"] = False # 正常显示负号
|
|
|
|
x = []
|
|
y1 = []
|
|
y2 = []
|
|
for user in department['d_users']:
|
|
x.append(user[0]) # 用户名
|
|
y1.append(user[2]) # 用户处理流程耗时
|
|
y2.append(user[1]) # 用户处理流程条数
|
|
|
|
x_range = np.arange(len(x))
|
|
|
|
bgcolor = '#d9d9d9'
|
|
|
|
fig, ax1 = plt.subplots(figsize=(10, 6), layout='constrained', facecolor=bgcolor)
|
|
ax1.set_ylabel('平均处理流程时间(小时)')
|
|
y1_bar = ax1.bar(x_range, y1, width=0.25, label='流程处理时间', color=(91 / 255, 155 / 255, 213 / 255))
|
|
|
|
ax1.set_xticks(x_range, x)
|
|
# 设置x轴标签旋转角度
|
|
plt.xticks(rotation=-30, ha='left')
|
|
|
|
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
|
|
|
|
y2_line, = ax2.plot(x_range, y2, label='处理流程条数', color='#ff0000', marker='o', linestyle='dashed', linewidth=2)
|
|
ax2.set_ylabel('处理流程条数(条)')
|
|
ax2.set_ylim(0, max(y2) + 1)
|
|
ax2.yaxis.set_major_locator(ticker.MaxNLocator(integer=True)) # 设置整数刻度
|
|
|
|
annotate_kwargs = {'color': '#ffffff', 'arrowprops': dict(color='#a5a5a5', arrowstyle='-')}
|
|
|
|
# y2_line:添加数据标签
|
|
# UserWarning: constrained_layout not applied because axes sizes collapsed to zero.
|
|
for x, y in zip(x_range, y2):
|
|
xtext_offset = 0.01 if len(x_range) == 1 else 0.1
|
|
ax2.annotate(f"{y}", xy=(x, y), xytext=(x + xtext_offset, y + 0.1), backgroundcolor='#000000', **annotate_kwargs)
|
|
|
|
# 绘制均值线:相对于左侧坐标轴"平均处理流程时间(小时)"绘制
|
|
hline_kwargs = {'linestyle': 'solid', 'linewidth': 2}
|
|
axhline1 = ax1.axhline(department['d_elapse'], label='部门平均时间', color='#ed7d31', **hline_kwargs)
|
|
axhline2 = ax1.axhline(procinst_duration_by_company, label='公司平均时间', color='#a5a5a5', **hline_kwargs)
|
|
|
|
# 获取 y1 轴两个刻度之间的距离
|
|
y1_ticks = ax1.get_yticks()
|
|
y1_ticks_distance = y1_ticks[1] - y1_ticks[0] if len(y1_ticks) > 1 else y1_ticks[0]
|
|
|
|
# y1_bar:添加数据标签
|
|
bar_label_kwargs = {'color': '#ffffff', 'backgroundcolor': '#8ba7de'}
|
|
y1_threshold = y1_ticks_distance / 2
|
|
y1_bar_large = [v if v > y1_threshold else '' for v in y1]
|
|
y1_bar_small = [v if v <= y1_threshold else '' for v in y1]
|
|
ax1.bar_label(y1_bar, y1_bar_large, padding=-20, **bar_label_kwargs)
|
|
ax1.bar_label(y1_bar, y1_bar_small, padding=3, **bar_label_kwargs)
|
|
|
|
# 均值线:添加数据标签
|
|
x_pos = 0 if len(x_range) == 1 else 1
|
|
xtext_benchmark = 0 if len(x_range) == 1 else 1
|
|
xtext_offset = 0.03 if len(x_range) == 1 else 0.3
|
|
ytext_offset = y1_ticks_distance / 3
|
|
ax1.annotate(f"{department['d_elapse']}", xy=(x_pos, department['d_elapse']),
|
|
xytext=(xtext_benchmark - xtext_offset, department['d_elapse'] + ytext_offset),
|
|
backgroundcolor='#f5b482', **annotate_kwargs)
|
|
ax1.annotate(f"{procinst_duration_by_company}", xy=(x_pos, procinst_duration_by_company),
|
|
xytext=(xtext_benchmark + xtext_offset, procinst_duration_by_company + ytext_offset),
|
|
backgroundcolor='#ef95a0', **annotate_kwargs)
|
|
|
|
plt.title(title, fontsize=14, pad=30) # pad: 标题与坐标轴顶部的偏移量
|
|
fig.legend(handles=[y1_bar, axhline1, axhline2, y2_line], loc='outside lower center', ncols=4, facecolor=bgcolor, edgecolor=bgcolor)
|
|
|
|
ax1.set_xlabel(department['d_name'], fontsize=12, labelpad=8) # 在图形下方显示部门名称
|
|
ax1.set_facecolor(bgcolor) # 设置绘图区域的背景色
|
|
# 去掉绘图区域的边框线和坐标轴刻度线
|
|
axes = [ax1, ax2]
|
|
for ax in axes:
|
|
ax.spines['top'].set_visible(False)
|
|
ax.spines['bottom'].set_visible(False)
|
|
ax.spines['left'].set_visible(False)
|
|
ax.spines['right'].set_visible(False)
|
|
ax.tick_params(axis='both', which='both', length=0)
|
|
|
|
# 设置绘图区与图片边缘的距离
|
|
# plt.subplots_adjust(left=0.08, right=0.94, top=0.9, bottom=0.24)
|
|
|
|
# 保存图形
|
|
plt.savefig('{}/{}'.format(REPORT_DIR, filename))
|
|
plt.close()
|
|
|
|
|
|
def add_section(doc, department_data):
|
|
d_name = department_data['d_name']
|
|
d_elapse = department_data['d_elapse']
|
|
d_elapse_rank = department_data['d_elapse_rank']
|
|
d_count = department_data['d_count']
|
|
d_count_rank = department_data['d_count_rank']
|
|
d_users = department_data['d_users']
|
|
|
|
d_users_gt_company_avg_duration = [u[0] for u in d_users if u[2] > procinst_duration_by_company]
|
|
|
|
if d_elapse_rank > 1:
|
|
doc.add_page_break() # 插入分页符
|
|
|
|
add_heading(doc, 2, '({}){}'.format(number_to_chinese(d_elapse_rank), d_name), line_spacing=2)
|
|
|
|
p = doc.add_paragraph(f'部门人均处理流程平均耗时时长排名:{d_elapse_rank};', style='List Bullet')
|
|
set_paragraph_space(p, line_spacing=2)
|
|
|
|
operator = '>' if d_elapse > procinst_duration_by_company else '<' if d_elapse < procinst_duration_by_company else '=='
|
|
p = doc.add_paragraph(f'部门处理流程平均耗时时长:{d_elapse}小时{operator}公司平均{procinst_duration_by_company}小时;',
|
|
style='List Bullet')
|
|
set_paragraph_space(p, line_spacing=2)
|
|
|
|
users_count = len(d_users_gt_company_avg_duration)
|
|
p = doc.add_paragraph(style='List Bullet')
|
|
p.add_run(f'部门处理流程平均耗时大于公司平均水平的有{users_count}人')
|
|
p.add_run(f":{'、'.join(d_users_gt_company_avg_duration)};" if users_count > 0 else ";")
|
|
set_paragraph_space(p, line_spacing=2)
|
|
|
|
p = doc.add_paragraph(f'人均处理流程条数排名:{d_count_rank};', style='List Bullet')
|
|
set_paragraph_space(p, line_spacing=2)
|
|
|
|
operator = '>' if d_count > procinst_count_by_company else '<' if d_count < procinst_count_by_company else '=='
|
|
p = doc.add_paragraph(f'人均处理流程条数:{d_count}条{operator}公司平均{procinst_count_by_company}条;', style='List Bullet')
|
|
set_paragraph_space(p, line_spacing=2)
|
|
|
|
pic_name = f"{d_name}-平均处理流程时间及流程处理条数.png"
|
|
create_twinx_chart(department_data, "部门成员平均处理流程时间及流程处理条数", pic_name)
|
|
|
|
# 插入图形到 Word 文档中
|
|
p = doc.add_paragraph()
|
|
p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
|
|
r = p.add_run()
|
|
r.add_picture('{}/{}'.format(REPORT_DIR, pic_name), width=Cm(20.27))
|
|
|
|
|
|
def add_chapter_2(doc):
|
|
doc.add_page_break() # 插入分页符
|
|
add_heading(doc, 1, '二、各部门成员详情', line_spacing=2)
|
|
|
|
# 部门流程处理条数排行
|
|
department_rank_by_count = [item[0] for item in procinst_by_department] # procinst_by_department 默认按照流程条数倒序排列
|
|
|
|
# 部门流程处理耗时排行
|
|
department_sorted_by_elapse = sorted(procinst_by_department, key=lambda d: d[2], reverse=True)
|
|
department_rank_by_elapse = [item[0] for item in department_sorted_by_elapse]
|
|
|
|
for item in department_sorted_by_elapse:
|
|
department = item[0]
|
|
d_count = item[1]
|
|
d_elapse = item[2]
|
|
d_count_rank = department_rank_by_count.index(department) + 1
|
|
d_elapse_rank = department_rank_by_elapse.index(department) + 1
|
|
|
|
add_section(doc, {'d_name': department,
|
|
'd_elapse': d_elapse,
|
|
'd_elapse_rank': d_elapse_rank,
|
|
'd_count': d_count,
|
|
'd_count_rank': d_count_rank,
|
|
'd_users': item[3]})
|
|
|
|
|
|
def insert_table(doc, table_headers, col_width_dict, table_data, table_style):
|
|
# 在文档中添加一个空表格
|
|
table = doc.add_table(rows=0, cols=len(table_headers), style='Table Grid')
|
|
|
|
# 添加表头
|
|
heading_cells = table.add_row().cells
|
|
for i, header in enumerate(table_headers):
|
|
heading_cells[i].text = header
|
|
heading_cells[i].paragraphs[0].runs[0].bold = True # 字体加粗
|
|
|
|
# 循环添加数据
|
|
avg_row_added = False
|
|
for i, row_data in enumerate(table_data):
|
|
if table_style in ['style2', 'style3']:
|
|
avg_value = {'style2': procinst_duration_by_company, 'style3': procinst_count_by_company}
|
|
if not avg_row_added and row_data[1] < avg_value[table_style]:
|
|
# 添加“平均”值行
|
|
row = table.add_row()
|
|
row_cells = row.cells
|
|
row_cells[0].merge(row_cells[1]).text = '平均' # 合并单元格
|
|
row_cells[2].text = str(avg_value[table_style])
|
|
set_table_row_shading_color(row, 'ff0000')
|
|
row_cells[0].paragraphs[0].runs[0].bold = True # 字体加粗
|
|
row_cells[2].paragraphs[0].runs[0].bold = True
|
|
avg_row_added = True
|
|
|
|
row = table.add_row()
|
|
row_cells = row.cells
|
|
|
|
row_cells[0].text = str(i + 1)
|
|
for j, cell_value in enumerate(row_data):
|
|
col_index = j + 1
|
|
row_cells[col_index].text = str(cell_value)
|
|
|
|
if table_style == 'style1':
|
|
if row_data[1] >= procinst_count_by_company:
|
|
set_table_cell_shading_color(row_cells[2], '#fedb61')
|
|
if row_data[2] >= procinst_duration_by_company:
|
|
set_table_cell_shading_color(row_cells[3], '#ff0000')
|
|
elif table_style == 'style2' and row_data[1] >= procinst_duration_by_company:
|
|
set_table_row_shading_color(row, 'fedb61')
|
|
|
|
if table_style == 'style1':
|
|
# 统计值:平均值
|
|
row_cells = table.add_row().cells
|
|
avg_data = [len(table_data) + 1, '公司平均', procinst_count_by_company, procinst_duration_by_company]
|
|
for i, cell_value in enumerate(avg_data):
|
|
row_cells[i].text = str(cell_value)
|
|
row_cells[i].paragraphs[0].runs[0].bold = True # 字体加粗
|
|
|
|
# 设置表格垂直对齐方式
|
|
table.alignment = WD_TABLE_ALIGNMENT.CENTER
|
|
|
|
# 设置单元格样式
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
cell.vertical_alignment = WD_CELL_VERTICAL_ALIGNMENT.CENTER # 垂直居中
|
|
cell.paragraphs[0].alignment = WD_PARAGRAPH_ALIGNMENT.CENTER # 水平居中
|
|
for col_index, col_width in col_width_dict.items(): # 设置列宽
|
|
row.cells[col_index].width = Cm(col_width)
|
|
row.height = Cm(0.7) # 设置行高
|
|
|
|
|
|
def add_chapter_1(doc):
|
|
try:
|
|
qs = db_helper.querystring_user_count(start_time, end_time)
|
|
user_count = client.execute(qs)[0][0]
|
|
except Exception as error:
|
|
print('数据库查询错误:', error)
|
|
logging.error('数据库查询错误:{}'.format(error))
|
|
raise RuntimeError('数据库查询错误:', error)
|
|
|
|
department_count_gt_avg_count = len(list(filter(lambda d: d[1] > procinst_count_by_company, procinst_by_department)))
|
|
department_count_gt_avg_duration = len(list(filter(lambda d: d[2] > procinst_duration_by_company, procinst_by_department)))
|
|
user_count_gt_avg_count = len(list(filter(lambda d: d[1] > procinst_count_by_company, procinst_by_user)))
|
|
user_count_gt_avg_duration = len(list(filter(lambda d: d[2] > procinst_duration_by_company, procinst_by_user)))
|
|
|
|
add_heading(doc, 1, '一、公司各部门横向比较情况')
|
|
add_heading(doc, 2, '(一)一览表')
|
|
p = doc.add_paragraph()
|
|
p.add_run('{}年{}月,公司共计{}人使用项企平台,'.format(last_year, last_month, user_count))
|
|
p.add_run('个人处理流程平均耗时{}小时/流程,平均单一流程处理耗时高于公司平均数的有{}个部门,{}人。'.format(
|
|
procinst_duration_by_company, department_count_gt_avg_duration, user_count_gt_avg_duration))
|
|
p.add_run('个人处理流程平均条数为{}条,个人处理流程平均条数高于公司平均数的有{}个部门,{}人。'.format(
|
|
procinst_count_by_company, department_count_gt_avg_count, user_count_gt_avg_count))
|
|
set_paragraph_format(p)
|
|
|
|
table_data = list(map(lambda d: (d[0], d[1], d[2]), procinst_by_department))
|
|
insert_table(doc, ['序号', '部门', '人均处理流程条数', '平均单条流程处理时间'], {0: 1.8, 1: 5.8, 2: 5.9, 3: 6.8}, table_data, 'style1')
|
|
|
|
add_heading(doc, 2, '(二)部门平均单一流程处理耗时排行')
|
|
table_data = list(map(lambda d: (d[0], d[2]), procinst_by_department))
|
|
table_data.sort(key=lambda d: d[1], reverse=True)
|
|
insert_table(doc, ['排名', '部门', '平均单条流程处理时间'], {0: 1.5, 1: 5.8, 2: 6.2}, table_data, 'style2')
|
|
|
|
add_heading(doc, 2, '(三)部门人均处理流程平均条数排行')
|
|
table_data = list(map(lambda d: (d[0], d[1]), procinst_by_department))
|
|
insert_table(doc, ['排名', '部门', '人均处理流程条数'], {0: 1.5, 1: 5.8, 2: 6.2}, table_data, 'style3')
|
|
|
|
|
|
def upload_to_qiniu(filename):
|
|
logging.info('上传报表至七牛云...')
|
|
|
|
# 获取 qiniu 配置参数
|
|
qn_cfg = config['qiniu']
|
|
access_key = os.environ.get('QINIU_AK', qn_cfg['access-key'])
|
|
secret_key = os.environ.get('QINIU_SK', qn_cfg['secret-key'])
|
|
bucket_name = os.environ.get('QINIU_BUCKET', qn_cfg['bucket'])
|
|
domain = os.environ.get('QINIU_DOMAIN', qn_cfg['domain'])
|
|
|
|
q = qiniu.Auth(access_key, secret_key)
|
|
bucket = qiniu.BucketManager(q)
|
|
|
|
key = 'pep-stats-report/{}'.format(filename)
|
|
localfile = '{}/{}'.format(REPORT_DIR, filename)
|
|
|
|
ret, info = bucket.stat(bucket_name, key)
|
|
if info.status_code == 200:
|
|
print('文件已存在,删除文件...')
|
|
ret, info = bucket.delete(bucket_name, key)
|
|
if info.status_code == 200:
|
|
print('删除成功')
|
|
else:
|
|
print('删除失败')
|
|
|
|
print('上传文件...')
|
|
token = q.upload_token(bucket_name, key)
|
|
ret, info = qiniu.put_file(token, key, localfile)
|
|
if info.status_code == 200:
|
|
print('上传成功')
|
|
logging.info('上传成功')
|
|
# 获取文件访问 URL
|
|
url = '{}/{}?attname='.format(domain, ret['key'])
|
|
print('文件访问 URL:', url)
|
|
logging.info('文件访问 URL:{}'.format(url))
|
|
else:
|
|
print('上传失败,错误信息:', info.error)
|
|
logging.error('上传失败,错误信息:{}'.format(error))
|
|
|
|
|
|
def generate_word_report():
|
|
# 创建一个新的Word文档
|
|
doc = Document()
|
|
set_global_normal_style(doc)
|
|
|
|
# 添加标题
|
|
h = doc.add_heading(level=0)
|
|
r = h.add_run('{}月项企流程效能分析结果公示'.format(last_month))
|
|
set_title_format(h, r)
|
|
|
|
# 添加段落
|
|
p_first = doc.add_paragraph(
|
|
'为了提升公司整体工作效率,确保跨部门工作高效推进,充分发挥我司自研平台的优势,现基于项企系统数据对我司项企平台的使用数据进行分析,'
|
|
'进而通过量化数据发现我司办公流程流转中的问题,'
|
|
'现将{}年{}月公司员工处理流程平均响应时长(响应时长为从流程流转到当前节点到当前节点处理完成的耗时,单位:小时)'
|
|
'以及处理流程条数情况结果进行公示如下:'.format(last_year, last_month))
|
|
set_paragraph_format(p_first)
|
|
|
|
# 一、公司各部门横向比较情况
|
|
add_chapter_1(doc)
|
|
|
|
# 二、各部门成员详情
|
|
add_chapter_2(doc)
|
|
|
|
doc.add_paragraph() # 添加换行符
|
|
|
|
p_last = doc.add_paragraph(
|
|
'针对以上数据,请各部门负责人认真对待并针对部门情况进行分析。'
|
|
'现项企业流程效能分析是公司管理工具逐步完善尝试和摸索的阶段,评估维度、呈现形式均在调整优化中,'
|
|
'欢迎大家对效能评估维提出建设意见,共同促进项企流程效能分析不断完善,进而提升公司整体效率!')
|
|
set_paragraph_format(p_last)
|
|
|
|
# 保存文档
|
|
report_file_name = '项企流程效能分析结果公示-{}年{}月.docx'.format(last_year, last_month)
|
|
report_file = f'{REPORT_DIR}/{report_file_name}'
|
|
doc.save(report_file)
|
|
logging.info('本地报表创建成功:{}'.format(report_file))
|
|
|
|
# 将文档转换为图片
|
|
png_file = image_helper.word_to_long_image(report_file)
|
|
|
|
# 上传七牛云
|
|
upload_to_qiniu(report_file_name)
|
|
upload_to_qiniu(os.path.basename(png_file))
|
|
|
|
|
|
def create_clickhouse_client():
|
|
# 获取 clickhouse 配置参数
|
|
ch_cfg = config['clickhouse']
|
|
host = os.environ.get('CLICKHOUSE_HOST', ch_cfg['host'])
|
|
port = int(os.environ.get('CLICKHOUSE_PORT', ch_cfg.getint('port')))
|
|
username = os.environ.get('CLICKHOUSE_USERNAME', ch_cfg['username'])
|
|
password = os.environ.get('CLICKHOUSE_PASSWORD', ch_cfg['password'])
|
|
database = os.environ.get('CLICKHOUSE_DATABASE', ch_cfg['database'])
|
|
# 创建 ClickHouse 客户端对象
|
|
return Client(host=host, port=port, user=username, password=password, database=database)
|
|
|
|
|
|
def try_create_dir(dir_name):
|
|
# 判断目录是否存在,如果不存在,则创建目录
|
|
if not os.path.exists(dir_name):
|
|
print(f"目录'{dir_name}'不存在,即将创建...")
|
|
os.makedirs(dir_name)
|
|
print(f"目录'{dir_name}'创建成功!")
|
|
|
|
|
|
def generate_procinst_by_department(procinst_by_senior, procinst_by_department_user):
|
|
departments = {} # 存储每个部门的信息
|
|
|
|
procinst_by_senior_user = [('总监以上团队长', item[0], item[1], item[2]) for item in procinst_by_senior]
|
|
all_departments = procinst_by_department_user + procinst_by_senior_user
|
|
|
|
for item in all_departments:
|
|
department = item[0]
|
|
user_name = item[1]
|
|
procinst_count_by_user = item[2]
|
|
procinst_duration_by_user = item[3]
|
|
|
|
if department not in departments:
|
|
departments[department] = {
|
|
'd_users': [],
|
|
'd_count': [],
|
|
'd_duration': []
|
|
}
|
|
|
|
departments[department]['d_users'].append((user_name, procinst_count_by_user, procinst_duration_by_user))
|
|
departments[department]['d_count'].append(procinst_count_by_user)
|
|
departments[department]['d_duration'].append(procinst_duration_by_user)
|
|
|
|
rslt = []
|
|
|
|
for department, info in departments.items():
|
|
d_count_average = round_to_integer(np.mean(info['d_count']))
|
|
d_duration_average = round(np.mean(info['d_duration']), 2)
|
|
rslt.append((department, d_count_average, d_duration_average, info['d_users']))
|
|
|
|
rslt.sort(key=lambda d: d[1], reverse=True) # 按照部门处理流程条数倒序排列
|
|
|
|
return rslt
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
try_create_dir(REPORT_DIR)
|
|
try_create_dir(LOG_DIR)
|
|
logging.basicConfig(filename='{}/runtime.log'.format(LOG_DIR), level=logging.INFO,
|
|
format='%(asctime)s.%(msecs)03d - %(levelname)s: %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S')
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read('config.ini')
|
|
|
|
client = create_clickhouse_client()
|
|
|
|
last_year, last_month = get_report_year_month()
|
|
start_time, end_time = get_report_start_end()
|
|
|
|
qs1 = db_helper.querystring_procinst_by_user(start_time, end_time)
|
|
procinst_by_user = client.execute(qs1)
|
|
procinst_count_by_company = round_to_integer(np.mean([item[1] for item in procinst_by_user]))
|
|
procinst_duration_by_company = round(np.mean([item[2] for item in procinst_by_user]), 2)
|
|
|
|
qs2 = db_helper.querystring_procinst_by_user(start_time, end_time, senior=True)
|
|
qs3 = db_helper.querystring_procinst_by_department_user(start_time, end_time)
|
|
procinst_by_department = generate_procinst_by_department(client.execute(qs2), client.execute(qs3))
|
|
|
|
generate_word_report() # 生成报表文件
|
|
except Exception as error:
|
|
print('程序运行出错:', error)
|
|
logging.error('程序运行出错:{}'.format(error))
|
|
|
|
# input("请按任意键退出...")
|
|
|