import os import datetime import configparser import math from decimal import Decimal, ROUND_HALF_UP from clickhouse_driver import Client from docx import Document from docx.shared import Inches, Cm, Pt, RGBColor from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING from docx.enum.table import WD_TABLE_ALIGNMENT, WD_CELL_VERTICAL_ALIGNMENT from docx.oxml.ns import qn, nsdecls from docx.oxml import parse_xml import matplotlib.pyplot as plt import matplotlib.ticker as ticker import numpy as np from app import db_helper, image_helper import qiniu import logging REPORT_DIR = 'output' LOG_DIR = 'log' # 小于十万的整数转换 def number_to_chinese(number): chinese_digits = ['零', '一', '二', '三', '四', '五', '六', '七', '八', '九'] chinese_units = ['', '十', '百', '千', '万'] if number == 0: return chinese_digits[0] result = '' unit_index = 0 last_digit_is_zero = False # 记录上一位数字是否为零 while number > 0: digit = number % 10 if digit != 0: if last_digit_is_zero: result = chinese_digits[0] + result # 添加零 result = chinese_digits[digit] + chinese_units[unit_index] + result last_digit_is_zero = False else: # 如果当前位是零,并且结果不为空,则记录上一位为零 if result != '': last_digit_is_zero = True unit_index += 1 number //= 10 # 如果结果以'一十'开头,则去掉'一',只保留'十' if result[:2] == chinese_digits[1] + chinese_units[1]: result = result[1:] return result def round_to_integer(v): if math.isnan(v): return v integer_in_decimal = Decimal(v).quantize(Decimal('0'), rounding=ROUND_HALF_UP) # 取整,如:2.5 -> 3 (not 2), 3.5 -> 4 return int(integer_in_decimal) def get_report_year_month(): # 获取上月的最后一天 last_day = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1) return last_day.year, last_day.month def get_report_start_end(): # 获取当前日期 now = datetime.datetime.now() # 获取本月第一天 this_month_start = datetime.datetime(now.year, now.month, 1) # 获取上个月最后一天 last_month_end = this_month_start - datetime.timedelta(days=1) # 获取上个月第一天 last_month_start = datetime.datetime(last_month_end.year, last_month_end.month, 1) return last_month_start, this_month_start def set_paragraph_space(p): # 设置间距 p.paragraph_format.space_before = Pt(6) # 段前 p.paragraph_format.space_after = Pt(6) # 段后 p.paragraph_format.line_spacing = 1.5 # 设置段落行距:1.5倍行距 def set_paragraph_format(p): set_paragraph_space(p) # 设置首行缩进2个字符 p.paragraph_format.first_line_indent = Inches(0.3) def set_heading_format(h, r, head_level=1): set_paragraph_space(h) r.font.name = 'Times New Roman' r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') r.font.size = Pt(16) if head_level == 1 else Pt(15) # 三号 -> Pt(16), 小三 -> Pt(15) r.font.color.rgb = RGBColor(0, 0, 0) r.bold = True def set_title_format(h, r): h.paragraph_format.space_before = Pt(6) # 段前0.5行 h.paragraph_format.space_after = Pt(6) # 段后0.5行 h.paragraph_format.line_spacing = 1.5 # 1.5倍行距 h.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER r.font.name = u'Times New Roman' r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') r.font.size = Pt(18) # 小二 r.font.color.rgb = RGBColor(0, 0, 0) r.bold = True def set_global_normal_style(doc): style = doc.styles['Normal'] style.font.name = 'Times New Roman' # 英文、数字字体 style._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') # 中文字体 style.font.size = Pt(14) # 四号 style.paragraph_format.space_before = Pt(6) # 段前0.5行 style.paragraph_format.space_after = Pt(6) # 段后0.5行 style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.ONE_POINT_FIVE # 1.5倍行距 # 设置单元格底纹 def set_table_cell_shading_color(table_cell, color): shading_elm = parse_xml(r''.format(nsdecls('w'), bgColor=color)) table_cell._tc.get_or_add_tcPr().append(shading_elm) # 设置表格行底纹 def set_table_row_shading_color(table_row, color): shading_dict = {} for i, cell in enumerate(table_row.cells): shading_dict['shading_elm_' + str(i)] = parse_xml(r''.format(nsdecls('w'), bgColor=color)) cell._tc.get_or_add_tcPr().append(shading_dict['shading_elm_' + str(i)]) def add_heading(doc, head_level, head_content): h = doc.add_heading(level=head_level) r = h.add_run(head_content) set_heading_format(h, r, head_level) def create_twinx_chart(department, title, filename): plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体 plt.rcParams["axes.unicode_minus"] = False # 正常显示负号 x = [] y1 = [] y2 = [] for user in department['d_users']: x.append(user[0]) # 用户名 y1.append(user[2]) # 用户处理流程耗时 y2.append(user[1]) # 用户处理流程条数 x_range = np.arange(len(x)) bgcolor = '#d9d9d9' fig, ax1 = plt.subplots(figsize=(8, 5), layout='constrained', facecolor=bgcolor) ax1.set_ylabel('平均处理流程时间(小时)') y1_bar = ax1.bar(x_range, y1, width=0.25, label='流程处理时间', color=(91 / 255, 155 / 255, 213 / 255)) ax1.set_xticks(x_range, x) # 设置x轴标签旋转角度 plt.xticks(rotation=-30, ha='left') ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis y2_line, = ax2.plot(x_range, y2, label='处理流程条数', color='#ff0000', marker='o', linestyle='dashed', linewidth=2) ax2.set_ylabel('处理流程条数(条)') ax2.set_ylim(0, max(y2) + 1) ax2.yaxis.set_major_locator(ticker.MaxNLocator(integer=True)) # 设置整数刻度 annotate_kwargs = {'color': '#ffffff', 'arrowprops': dict(color='#a5a5a5', arrowstyle='-')} # y2_line:添加数据标签 # UserWarning: constrained_layout not applied because axes sizes collapsed to zero. for x, y in zip(x_range, y2): xtext_offset = 0.01 if len(x_range) == 1 else 0.1 ax2.annotate(f"{y}", xy=(x, y), xytext=(x + xtext_offset, y + 0.1), backgroundcolor='#000000', **annotate_kwargs) # 绘制均值线:相对于左侧坐标轴"平均处理流程时间(小时)"绘制 hline_kwargs = {'linestyle': 'solid', 'linewidth': 2} axhline1 = ax1.axhline(department['d_elapse'], label='部门平均时间', color='#ed7d31', **hline_kwargs) axhline2 = ax1.axhline(procinst_duration_by_company, label='公司平均时间', color='#a5a5a5', **hline_kwargs) # 获取 y1 轴两个刻度之间的距离 y1_ticks = ax1.get_yticks() y1_ticks_distance = y1_ticks[1] - y1_ticks[0] if len(y1_ticks) > 1 else y1_ticks[0] # y1_bar:添加数据标签 bar_label_kwargs = {'color': '#ffffff', 'backgroundcolor': '#8ba7de'} y1_threshold = y1_ticks_distance / 2 y1_bar_large = [v if v > y1_threshold else '' for v in y1] y1_bar_small = [v if v <= y1_threshold else '' for v in y1] ax1.bar_label(y1_bar, y1_bar_large, padding=-20, **bar_label_kwargs) ax1.bar_label(y1_bar, y1_bar_small, padding=3, **bar_label_kwargs) # 均值线:添加数据标签 x_pos = 0 if len(x_range) == 1 else 1 xtext_benchmark = 0 if len(x_range) == 1 else 1 xtext_offset = 0.03 if len(x_range) == 1 else 0.3 ytext_offset = y1_ticks_distance / 3 ax1.annotate(f"{department['d_elapse']}", xy=(x_pos, department['d_elapse']), xytext=(xtext_benchmark - xtext_offset, department['d_elapse'] + ytext_offset), backgroundcolor='#f5b482', **annotate_kwargs) ax1.annotate(f"{procinst_duration_by_company}", xy=(x_pos, procinst_duration_by_company), xytext=(xtext_benchmark + xtext_offset, procinst_duration_by_company + ytext_offset), backgroundcolor='#ef95a0', **annotate_kwargs) plt.title(title, fontsize=14, pad=30) # pad: 标题与坐标轴顶部的偏移量 fig.legend(handles=[y1_bar, axhline1, axhline2, y2_line], loc='outside lower center', ncols=4, facecolor=bgcolor, edgecolor=bgcolor) ax1.set_xlabel(department['d_name'], fontsize=12, labelpad=8) # 在图形下方显示部门名称 ax1.set_facecolor(bgcolor) # 设置绘图区域的背景色 # 去掉绘图区域的边框线和坐标轴刻度线 axes = [ax1, ax2] for ax in axes: ax.spines['top'].set_visible(False) ax.spines['bottom'].set_visible(False) ax.spines['left'].set_visible(False) ax.spines['right'].set_visible(False) ax.tick_params(axis='both', which='both', length=0) # 设置绘图区与图片边缘的距离 # plt.subplots_adjust(left=0.08, right=0.94, top=0.9, bottom=0.24) # 保存图形 plt.savefig('{}/{}'.format(REPORT_DIR, filename)) plt.close() def add_section(doc, department_data): d_name = department_data['d_name'] d_elapse = department_data['d_elapse'] d_elapse_rank = department_data['d_elapse_rank'] d_count = department_data['d_count'] d_count_rank = department_data['d_count_rank'] d_users = department_data['d_users'] d_users_gt_company_avg_duration = [u[0] for u in d_users if u[2] > procinst_duration_by_company] add_heading(doc, 2, '({}){}'.format(number_to_chinese(d_elapse_rank), d_name)) p = doc.add_paragraph(f'部门人均处理流程平均耗时时长排名:{d_elapse_rank};', style='List Bullet') set_paragraph_space(p) operator = '>' if d_elapse > procinst_duration_by_company else '<' if d_elapse < procinst_duration_by_company else '==' p = doc.add_paragraph(f'部门处理流程平均耗时时长:{d_elapse}小时{operator}公司平均{procinst_duration_by_company}小时;', style='List Bullet') set_paragraph_space(p) users_count = len(d_users_gt_company_avg_duration) p = doc.add_paragraph(style='List Bullet') p.add_run(f'部门处理流程平均耗时大于公司平均水平的有{users_count}人') p.add_run(f":{'、'.join(d_users_gt_company_avg_duration)};" if users_count > 0 else ";") set_paragraph_space(p) p = doc.add_paragraph(f'人均处理流程条数排名:{d_count_rank};', style='List Bullet') set_paragraph_space(p) operator = '>' if d_count > procinst_count_by_company else '<' if d_count < procinst_count_by_company else '==' p = doc.add_paragraph(f'人均处理流程条数:{d_count}条{operator}公司平均{procinst_count_by_company}条;', style='List Bullet') set_paragraph_space(p) pic_name = f"{d_name}-平均处理流程时间及流程处理条数.png" create_twinx_chart(department_data, "部门成员平均处理流程时间及流程处理条数", pic_name) # 插入图形到 Word 文档中 p = doc.add_paragraph() p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER r = p.add_run() r.add_picture('{}/{}'.format(REPORT_DIR, pic_name), width=Inches(6)) def add_chapter_2(doc): add_heading(doc, 1, '二、各部门成员详情') # 部门流程处理条数排行 department_rank_by_count = [item[0] for item in procinst_by_department] # procinst_by_department 默认按照流程条数倒序排列 # 部门流程处理耗时排行 department_sorted_by_elapse = sorted(procinst_by_department, key=lambda d: d[2], reverse=True) department_rank_by_elapse = [item[0] for item in department_sorted_by_elapse] for item in department_sorted_by_elapse: department = item[0] d_count = item[1] d_elapse = item[2] d_count_rank = department_rank_by_count.index(department) + 1 d_elapse_rank = department_rank_by_elapse.index(department) + 1 add_section(doc, {'d_name': department, 'd_elapse': d_elapse, 'd_elapse_rank': d_elapse_rank, 'd_count': d_count, 'd_count_rank': d_count_rank, 'd_users': item[3]}) def insert_table(doc, table_headers, col_width_dict, table_data, table_style): # 在文档中添加一个空表格 table = doc.add_table(rows=0, cols=len(table_headers), style='Table Grid') # 添加表头 heading_cells = table.add_row().cells for i, header in enumerate(table_headers): heading_cells[i].text = header heading_cells[i].paragraphs[0].runs[0].bold = True # 字体加粗 # 循环添加数据 avg_row_added = False for i, row_data in enumerate(table_data): if table_style in ['style2', 'style3']: avg_value = {'style2': procinst_duration_by_company, 'style3': procinst_count_by_company} if not avg_row_added and row_data[1] < avg_value[table_style]: # 添加“平均”值行 row = table.add_row() row_cells = row.cells row_cells[0].merge(row_cells[1]).text = '平均' # 合并单元格 row_cells[2].text = str(avg_value[table_style]) set_table_row_shading_color(row, 'ff0000') row_cells[0].paragraphs[0].runs[0].bold = True # 字体加粗 row_cells[2].paragraphs[0].runs[0].bold = True avg_row_added = True row = table.add_row() row_cells = row.cells row_cells[0].text = str(i + 1) for j, cell_value in enumerate(row_data): col_index = j + 1 row_cells[col_index].text = str(cell_value) if table_style == 'style1': if row_data[1] >= procinst_count_by_company: set_table_cell_shading_color(row_cells[2], '#fedb61') if row_data[2] >= procinst_duration_by_company: set_table_cell_shading_color(row_cells[3], '#ff0000') elif table_style == 'style2' and row_data[1] >= procinst_duration_by_company: set_table_row_shading_color(row, 'fedb61') if table_style == 'style1': # 统计值:平均值 row_cells = table.add_row().cells avg_data = [len(table_data) + 1, '公司平均', procinst_count_by_company, procinst_duration_by_company] for i, cell_value in enumerate(avg_data): row_cells[i].text = str(cell_value) row_cells[i].paragraphs[0].runs[0].bold = True # 字体加粗 # 设置表格垂直对齐方式 table.alignment = WD_TABLE_ALIGNMENT.CENTER # 设置单元格样式 for row in table.rows: for cell in row.cells: cell.vertical_alignment = WD_CELL_VERTICAL_ALIGNMENT.CENTER # 垂直居中 cell.paragraphs[0].alignment = WD_PARAGRAPH_ALIGNMENT.CENTER # 水平居中 for col_index, col_width in col_width_dict.items(): # 设置列宽 row.cells[col_index].width = Cm(col_width) row.height = Cm(0.7) # 设置行高 def add_chapter_1(doc): try: qs = db_helper.querystring_user_count(start_time, end_time) user_count = client.execute(qs)[0][0] except Exception as error: print('数据库查询错误:', error) logging.error('数据库查询错误:{}'.format(error)) raise RuntimeError('数据库查询错误:', error) department_count_gt_avg_count = len(list(filter(lambda d: d[1] > procinst_count_by_company, procinst_by_department))) department_count_gt_avg_duration = len(list(filter(lambda d: d[2] > procinst_duration_by_company, procinst_by_department))) user_count_gt_avg_count = len(list(filter(lambda d: d[1] > procinst_count_by_company, procinst_by_user))) user_count_gt_avg_duration = len(list(filter(lambda d: d[2] > procinst_duration_by_company, procinst_by_user))) add_heading(doc, 1, '一、公司各部门横向比较情况') add_heading(doc, 2, '(一)一览表') p = doc.add_paragraph() p.add_run('{}年{}月,公司共计{}人使用项企平台,'.format(last_year, last_month, user_count)) p.add_run('个人处理流程平均耗时{}小时/流程,平均单一流程处理耗时高于公司平均数的有{}个部门,{}人。'.format( procinst_duration_by_company, department_count_gt_avg_duration, user_count_gt_avg_duration)) p.add_run('个人处理流程平均条数为{}条,个人处理流程平均条数高于公司平均数的有{}个部门,{}人。'.format( procinst_count_by_company, department_count_gt_avg_count, user_count_gt_avg_count)) set_paragraph_format(p) table_data = list(map(lambda d: (d[0], d[1], d[2]), procinst_by_department)) insert_table(doc, ['序号', '部门', '人均处理流程条数', '平均单条流程处理时间'], {0: 1.5, 1: 5.8, 2: 3.6, 3: 4.3}, table_data, 'style1') add_heading(doc, 2, '(二)部门平均单一流程处理耗时排行') table_data = list(map(lambda d: (d[0], d[2]), procinst_by_department)) table_data.sort(key=lambda d: d[1], reverse=True) insert_table(doc, ['排名', '部门', '平均单条流程处理时间'], {0: 1.5, 1: 5.8, 2: 4.3}, table_data, 'style2') add_heading(doc, 2, '(三)部门人均处理流程平均条数排行') table_data = list(map(lambda d: (d[0], d[1]), procinst_by_department)) insert_table(doc, ['排名', '部门', '人均处理流程条数'], {0: 1.5, 1: 5.8, 2: 4.3}, table_data, 'style3') def upload_to_qiniu(filename): logging.info('上传报表至七牛云...') # 获取 qiniu 配置参数 qn_cfg = config['qiniu'] access_key = os.environ.get('QINIU_AK', qn_cfg['access-key']) secret_key = os.environ.get('QINIU_SK', qn_cfg['secret-key']) bucket_name = os.environ.get('QINIU_BUCKET', qn_cfg['bucket']) domain = os.environ.get('QINIU_DOMAIN', qn_cfg['domain']) q = qiniu.Auth(access_key, secret_key) bucket = qiniu.BucketManager(q) key = 'pep-stats-report/{}'.format(filename) localfile = '{}/{}'.format(REPORT_DIR, filename) ret, info = bucket.stat(bucket_name, key) if info.status_code == 200: print('文件已存在,删除文件...') ret, info = bucket.delete(bucket_name, key) if info.status_code == 200: print('删除成功') else: print('删除失败') print('上传文件...') token = q.upload_token(bucket_name, key) ret, info = qiniu.put_file(token, key, localfile) if info.status_code == 200: print('上传成功') logging.info('上传成功') # 获取文件访问 URL url = '{}/{}?attname='.format(domain, ret['key']) print('文件访问 URL:', url) logging.info('文件访问 URL:{}'.format(url)) else: print('上传失败,错误信息:', info.error) logging.error('上传失败,错误信息:{}'.format(error)) def generate_word_report(): # 创建一个新的Word文档 doc = Document() set_global_normal_style(doc) # 添加标题 h = doc.add_heading(level=0) r = h.add_run('{}月项企流程效能分析结果公示'.format(last_month)) set_title_format(h, r) # 添加段落 p_first = doc.add_paragraph( '为了提升公司整体工作效率,确保跨部门工作高效推进,充分发挥我司自研平台的优势,现基于项企系统数据对我司项企平台的使用数据进行分析,' '进而通过量化数据发现我司办公流程流转中的问题,' '现将{}年{}月公司员工处理流程平均响应时长(响应时长为从流程流转到当前节点到当前节点处理完成的耗时,单位:小时)' '以及处理流程条数情况结果进行公示如下:'.format(last_year, last_month)) set_paragraph_format(p_first) # 一、公司各部门横向比较情况 add_chapter_1(doc) # 二、各部门成员详情 add_chapter_2(doc) p_last = doc.add_paragraph( '针对以上数据,请各部门负责人认真对待并针对部门情况进行分析。' '现项企业流程效能分析是公司管理工具逐步完善尝试和摸索的阶段,评估维度、呈现形式均在调整优化中,' '欢迎大家对效能评估维提出建设意见,共同促进项企流程效能分析不断完善,进而提升公司整体效率!') set_paragraph_format(p_last) # 保存文档 report_file_name = '项企流程效能分析结果公示-{}年{}月.docx'.format(last_year, last_month) report_file = f'{REPORT_DIR}/{report_file_name}' doc.save(report_file) logging.info('本地报表创建成功:{}'.format(report_file)) # 将文档转换为图片 png_file = image_helper.word_to_long_image(report_file) # 上传七牛云 upload_to_qiniu(report_file_name) upload_to_qiniu(os.path.basename(png_file)) def create_clickhouse_client(): # 获取 clickhouse 配置参数 ch_cfg = config['clickhouse'] host = os.environ.get('CLICKHOUSE_HOST', ch_cfg['host']) port = int(os.environ.get('CLICKHOUSE_PORT', ch_cfg.getint('port'))) username = os.environ.get('CLICKHOUSE_USERNAME', ch_cfg['username']) password = os.environ.get('CLICKHOUSE_PASSWORD', ch_cfg['password']) database = os.environ.get('CLICKHOUSE_DATABASE', ch_cfg['database']) # 创建 ClickHouse 客户端对象 return Client(host=host, port=port, user=username, password=password, database=database) def try_create_dir(dir_name): # 判断目录是否存在,如果不存在,则创建目录 if not os.path.exists(dir_name): print(f"目录'{dir_name}'不存在,即将创建...") os.makedirs(dir_name) print(f"目录'{dir_name}'创建成功!") def generate_procinst_by_department(procinst_by_senior, procinst_by_department_user): departments = {} # 存储每个部门的信息 procinst_by_senior_user = [('高管', item[0], item[1], item[2]) for item in procinst_by_senior] all_departments = procinst_by_department_user + procinst_by_senior_user for item in all_departments: department = item[0] user_name = item[1] procinst_count_by_user = item[2] procinst_duration_by_user = item[3] if department not in departments: departments[department] = { 'd_users': [], 'd_count': [], 'd_duration': [] } departments[department]['d_users'].append((user_name, procinst_count_by_user, procinst_duration_by_user)) departments[department]['d_count'].append(procinst_count_by_user) departments[department]['d_duration'].append(procinst_duration_by_user) rslt = [] for department, info in departments.items(): d_count_average = round_to_integer(np.mean(info['d_count'])) d_duration_average = round(np.mean(info['d_duration']), 2) rslt.append((department, d_count_average, d_duration_average, info['d_users'])) rslt.sort(key=lambda d: d[1], reverse=True) # 按照部门处理流程条数倒序排列 return rslt if __name__ == '__main__': try: try_create_dir(REPORT_DIR) try_create_dir(LOG_DIR) logging.basicConfig(filename='{}/runtime.log'.format(LOG_DIR), level=logging.INFO, format='%(asctime)s.%(msecs)03d - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') config = configparser.ConfigParser() config.read('config.ini') client = create_clickhouse_client() last_year, last_month = get_report_year_month() start_time, end_time = get_report_start_end() qs1 = db_helper.querystring_procinst_by_user(start_time, end_time) procinst_by_user = client.execute(qs1) procinst_count_by_company = round_to_integer(np.mean([item[1] for item in procinst_by_user])) procinst_duration_by_company = round(np.mean([item[2] for item in procinst_by_user]), 2) qs2 = db_helper.querystring_procinst_by_user(start_time, end_time, senior=True) qs3 = db_helper.querystring_procinst_by_department_user(start_time, end_time) procinst_by_department = generate_procinst_by_department(client.execute(qs2), client.execute(qs3)) generate_word_report() # 生成报表文件 except Exception as error: print('程序运行出错:', error) logging.error('程序运行出错:{}'.format(error)) # input("请按任意键退出...")