import os import datetime import configparser from clickhouse_driver import Client from docx import Document from docx.shared import Inches, Pt, RGBColor from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING from docx.oxml.ns import qn import matplotlib.pyplot as plt import numpy as np from app import db_helper, image_helper import qiniu import logging REPORT_DIR = 'output' LOG_DIR = 'log' def get_report_year_month(): # 获取上月的最后一天 last_day = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1) return last_day.year, last_day.month def get_report_start_end(): # 获取当前日期 now = datetime.datetime.now() # 获取本月第一天 this_month_start = datetime.datetime(now.year, now.month, 1) # 获取上个月最后一天 last_month_end = this_month_start - datetime.timedelta(days=1) # 获取上个月第一天 last_month_start = datetime.datetime(last_month_end.year, last_month_end.month, 1) return last_month_start, this_month_start def set_paragraph_space(p): # 设置间距 p.paragraph_format.space_before = 0 # 段前 p.paragraph_format.space_after = 0 # 段后 p.paragraph_format.line_spacing = Pt(20) # 设置段落行距为20磅 def set_paragraph_format(p): set_paragraph_space(p) # 设置首行缩进2个字符 p.paragraph_format.first_line_indent = Inches(0.3) def set_heading_format(h, r): set_paragraph_space(h) r.font.name = 'Times New Roman' r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') r.font.size = Pt(10.5) # 五号 r.font.color.rgb = RGBColor(0, 0, 0) r.bold = True def set_title_format(h, r): h.paragraph_format.space_before = Pt(12) # 段前1行 h.paragraph_format.space_after = Pt(12) # 段后1行 h.paragraph_format.line_spacing = Pt(20) # 设置段落行距为20磅 h.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER r.font.name = u'Times New Roman' r._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') r.font.size = Pt(10.5) # 五号 r.font.color.rgb = RGBColor(0, 0, 0) r.bold = True def set_global_normal_style(doc): style = doc.styles['Normal'] style.font.name = 'Times New Roman' # 英文、数字字体 style._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体') # 中文字体 style.font.size = Pt(10.5) # 五号 style.paragraph_format.space_before = 0 # 段前 style.paragraph_format.space_after = 0 # 段后 style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.SINGLE # 单倍行距 def create_bar_chart(doc, x, y, mean, title, filename): plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体 plt.rcParams["axes.unicode_minus"] = False # 正常显示负号 if title == '部门流程处理平均耗时': plt.figure(figsize=(12, 6), layout='tight') plt.margins(x=0.01) else: plt.figure(figsize=(6.4, 3.4), layout='tight') # 绘制柱状图 plt.bar(x, y, color=(31 / 255, 168 / 255, 201 / 255)) # 绘制均值线 plt.axhline(mean, linestyle='dashed', color='#FF8C00') # 添加标题 plt.title(title) # 设置x轴标签旋转角度 plt.xticks(rotation=-30, ha='left') # 添加图例 plt.legend(['均值:{}'.format(mean)], loc='upper right') # 保存图形 plt.savefig('{}/{}'.format(REPORT_DIR, filename)) plt.close() # 插入图形到 Word 文档中 p = doc.add_paragraph() p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER r = p.add_run() r.add_picture('{}/{}'.format(REPORT_DIR, filename), width=Inches(6)) def create_bar_twinx_chart(doc, x, y1, y2, mean, title, filename): plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体 plt.rcParams["axes.unicode_minus"] = False # 正常显示负号 x_range = np.arange(len(x)) width = 0.25 # the width of the bars fig, ax1 = plt.subplots() ax1.set_ylabel('耗时') rects = ax1.bar(x_range, y1, width, color=(31 / 255, 168 / 255, 201 / 255), label='耗时') ax1.bar_label(rects, padding=3) ax1.set_xticks(x_range + width / 2, x) # 设置x轴标签旋转角度 plt.xticks(rotation=-30, ha='left') # 绘制均值线 plt.axhline(mean, linestyle='dashed', color='#FF8C00', label=f'均值:{mean}') ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis ax2.set_ylabel('流程数') rects = ax2.bar(x_range + width, y2, width, color='#5AC189', label='流程数') ax2.bar_label(rects, padding=3) plt.title(title) fig.legend(loc='upper right', ncols=1, bbox_to_anchor=(1, 1), bbox_transform=ax1.transAxes) fig.tight_layout() # otherwise the right y-label is slightly clipped # 保存图形 plt.savefig('{}/{}'.format(REPORT_DIR, filename)) plt.close() # 插入图形到 Word 文档中 p = doc.add_paragraph() p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER r = p.add_run() r.add_picture('{}/{}'.format(REPORT_DIR, filename), width=Inches(6)) def add_section(doc, rank, department, d_elapse): h = doc.add_heading(level=2) r = h.add_run('{}. {}'.format(rank, department)) set_heading_format(h, r) try: qs = db_helper.querystring_user_gt_senior_avg( start_time, end_time, d_elapse ) if department == '高管' else db_helper.querystring_user_gt_department_avg( start_time, end_time, department, d_elapse) user_gt_department_avg = client.execute(qs) qs = db_helper.querystring_procinst_duration_by_user_senior( start_time, end_time ) if department == '高管' else db_helper.querystring_procinst_duration_by_user( start_time, end_time, department) procinst_duration_by_user = client.execute(qs) qs = db_helper.querystring_procinst_count_by_senior( start_time, end_time ) if department == '高管' else db_helper.querystring_procinst_count_by_user( start_time, end_time, department) procinst_count_by_user = client.execute(qs) except Exception as error: print('数据库查询错误:', error) logging.error('数据库查询错误:{}'.format(error)) raise RuntimeError('数据库查询错误:', error) users = [] for user, _ in user_gt_department_avg: users.append(user) p = doc.add_paragraph() p.add_run('{}{}月份个人处理流程平均耗时时长在公司排名为'.format(department, last_month)) p.add_run('{}'.format(rank)).underline = True if len(procinst_duration_by_user) == 1: p.add_run(',{}公司平均水平。'.format( '高于' if d_elapse > procinst_duration_by_company else '低于' if d_elapse < procinst_duration_by_company else '等于')) else: p.add_run(',其中耗时较长的有') p.add_run('{}'.format('、'.join(users))).underline = True p.add_run(',{}({}小时/流程)。' .format('高于公司平均水平和部门平均水平' if d_elapse >= procinst_duration_by_company else '低于公司平均水平但高于部门平均水平', d_elapse)) set_paragraph_format(p) x = [] y1 = [] y2 = [] for user, elapse in procinst_duration_by_user: x.append(user) y1.append(elapse) for name in x: for user, count in procinst_count_by_user: if user == name: y2.append(count) break create_bar_twinx_chart(doc, x, y1, y2, d_elapse, "个人流程处理平均耗时", "{}-流程处理平均耗时.png".format(department)) caption = doc.add_paragraph('图 2-{} {}个人处理流程平均耗时'.format(rank, department)) caption.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER def add_chapter_2(doc): h = doc.add_heading(level=1) r = h.add_run('二、各部门成员详情') set_heading_format(h, r) departments_excluded = ['汇派-质量部', '汇派-生产部', '汇派-计划部', '汇派-人事部', '汇派-采购部', '党建工会', '工程项目中心', '北京技术中心'] d = [] for department, elapse in procinst_duration_by_department: if department not in departments_excluded: d.append((department, elapse)) for index, (department, elapse) in enumerate(d): add_section(doc, index + 1, department, elapse) def add_chapter_1(doc): h = doc.add_heading(level=1) r = h.add_run('一、公司各部门横向比较情况') set_heading_format(h, r) try: qs = db_helper.querystring_user_count(start_time, end_time) user_count = client.execute(qs)[0][0] qs = db_helper.querystring_department_count_gt_avg(start_time, end_time, procinst_duration_by_company) department_count_gt_avg = client.execute(qs)[0][0] qs = db_helper.querystring_user_count_gt_avg(start_time, end_time, procinst_duration_by_company) user_count_gt_avg = client.execute(qs)[0][0] except Exception as error: print('数据库查询错误:', error) logging.error('数据库查询错误:{}'.format(error)) raise RuntimeError('数据库查询错误:', error) if procinst_duration_by_senior is not None and procinst_duration_by_senior > procinst_duration_by_company: department_count_gt_avg += 1 p = doc.add_paragraph() p.add_run('{}年{}月,公司共计'.format(last_year, last_month)) p.add_run('{}'.format(user_count)).underline = True p.add_run('人使用项企平台,个人处理流程平均耗时') p.add_run('{}'.format(procinst_duration_by_company)).underline = True p.add_run('小时/流程,其中,平均单一流程处理耗时高于公司平均数的有') p.add_run('{}'.format(department_count_gt_avg)).underline = True p.add_run('个部门,') p.add_run('{}'.format(user_count_gt_avg)).underline = True p.add_run('人。') set_paragraph_format(p) # 准备数据 x = [] y = [] for department, elapse in procinst_duration_by_department: x.append(department) y.append(elapse) # 创建柱状图并保存为文件 create_bar_chart(doc, x, y, procinst_duration_by_company, "部门流程处理平均耗时", "部门流程处理平均耗时.png") caption = doc.add_paragraph('图 1 部门流程处理平均耗时') caption.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER def upload_to_qiniu(filename): logging.info('上传报表至七牛云...') # 获取 qiniu 配置参数 qn_cfg = config['qiniu'] access_key = os.environ.get('QINIU_AK', qn_cfg['access-key']) secret_key = os.environ.get('QINIU_SK', qn_cfg['secret-key']) bucket_name = os.environ.get('QINIU_BUCKET', qn_cfg['bucket']) domain = os.environ.get('QINIU_DOMAIN', qn_cfg['domain']) q = qiniu.Auth(access_key, secret_key) bucket = qiniu.BucketManager(q) key = 'pep-stats-report/{}'.format(filename) localfile = '{}/{}'.format(REPORT_DIR, filename) ret, info = bucket.stat(bucket_name, key) if info.status_code == 200: print('文件已存在,删除文件...') ret, info = bucket.delete(bucket_name, key) if info.status_code == 200: print('删除成功') else: print('删除失败') print('上传文件...') token = q.upload_token(bucket_name, key) ret, info = qiniu.put_file(token, key, localfile) if info.status_code == 200: print('上传成功') logging.info('上传成功') # 获取文件访问 URL url = '{}/{}?attname='.format(domain, ret['key']) print('文件访问 URL:', url) logging.info('文件访问 URL:{}'.format(url)) else: print('上传失败,错误信息:', info.error) logging.error('上传失败,错误信息:{}'.format(error)) def generate_word_report(): # 创建一个新的Word文档 doc = Document() set_global_normal_style(doc) # 添加标题 h = doc.add_heading(level=0) r = h.add_run('项企流程效能分析结果公示') set_title_format(h, r) # 添加段落 p_first = doc.add_paragraph( '为了提升公司整体工作效率,确保跨部门工作高效推进,充分发挥我司自研平台的优势,现基于项企系统数据,利用Superset工具打造了项企流程效能分析平台,能够对我司项企平台的数据进行分析,进而通过量化数据发现我司办公流程流转中的问题,' \ '现将{}年{}月公司员工处理流程平均响应时长(响应时长为从流程流转到当前节点到当前节点处理完成的耗时,单位:小时)情况结果进行公示如下:' .format(last_year, last_month)) set_paragraph_format(p_first) # 一、公司各部门横向比较情况 add_chapter_1(doc) # 二、各部门成员详情 add_chapter_2(doc) p_last = doc.add_paragraph( '现项企业流程效能分析是公司管理工具逐步完善尝试和摸索的阶段,只是单一评估维度,大家若对评估维度有相应意见,欢迎积极提出,共同促进项企流程效能分析不断完善,进而提升公司整体效率!') set_paragraph_format(p_last) # 保存文档 report_file_name = '项企流程效能分析结果公示(全员)-{}年{}月.docx'.format(last_year, last_month) report_file = f'{REPORT_DIR}/{report_file_name}' doc.save(report_file) logging.info('本地报表创建成功:{}'.format(report_file)) # 将文档转换为图片 png_file = image_helper.word_to_long_image(report_file) # 上传七牛云 upload_to_qiniu(report_file_name) upload_to_qiniu(os.path.basename(png_file)) def create_clickhouse_client(): # 获取 clickhouse 配置参数 ch_cfg = config['clickhouse'] host = os.environ.get('CLICKHOUSE_HOST', ch_cfg['host']) port = int(os.environ.get('CLICKHOUSE_PORT', ch_cfg.getint('port'))) username = os.environ.get('CLICKHOUSE_USERNAME', ch_cfg['username']) password = os.environ.get('CLICKHOUSE_PASSWORD', ch_cfg['password']) database = os.environ.get('CLICKHOUSE_DATABASE', ch_cfg['database']) # 创建 ClickHouse 客户端对象 return Client(host=host, port=port, user=username, password=password, database=database) def try_create_dir(dir_name): # 判断目录是否存在,如果不存在,则创建目录 if not os.path.exists(dir_name): print(f"目录'{dir_name}'不存在,即将创建...") os.makedirs(dir_name) print(f"目录'{dir_name}'创建成功!") if __name__ == '__main__': try: try_create_dir(REPORT_DIR) try_create_dir(LOG_DIR) logging.basicConfig(filename='{}/runtime.log'.format(LOG_DIR), level=logging.INFO, format='%(asctime)s.%(msecs)03d - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') config = configparser.ConfigParser() config.read('config.ini') client = create_clickhouse_client() last_year, last_month = get_report_year_month() start_time, end_time = get_report_start_end() qs1 = db_helper.querystring_procinst_duration_by_company(start_time, end_time) procinst_duration_by_company = client.execute(qs1)[0][0] qs2 = db_helper.querystring_procinst_duration_by_department(start_time, end_time) procinst_duration_by_department = client.execute(qs2) qs3 = db_helper.querystring_procinst_duration_by_senior(start_time, end_time) procinst_duration_by_senior = client.execute(qs3)[0][0] for index, (department, elapse) in enumerate(procinst_duration_by_department): if procinst_duration_by_senior is not None and procinst_duration_by_senior >= elapse: procinst_duration_by_department.insert(index, ('高管', procinst_duration_by_senior)) break generate_word_report() # 生成报表文件 except Exception as error: print('程序运行出错:', error) logging.error('程序运行出错:{}'.format(error)) # input("请按任意键退出...")