/** * Created by Julin on 2022/11/07. */ 'use strict'; module.exports = async function () { try { // 1. delete all rows in the table await clearStatsProcessNodes(); // 2. insert data into the table let processes = await process.clickhouse['pg_pepca_m'].query(` select v.id as version_id, v.process_id as process_id, p.name as process_name, v.bpmn_json from pg_pepca_m.workflow_process_version as v inner join pg_pepca_m.workflow_process as p on v.process_id=p.id where v.current=true and p.is_enable=true and p.deleted=false order by v.id desc `).toPromise(); let dataToDB = processes.reduce((p, c) => { let nodes = JSON.parse(c.bpmn_json); let taskNodesCount = 0; for (let key in nodes) { if (nodes[key].type == 'bpmn:UserTask') taskNodesCount++; } p.push({ processVersionId: c.version_id, processId: c.process_id, processName: c.process_name, processNodesTotal: taskNodesCount }); return p; }, []); await storageStatsProcessNodes(dataToDB); } catch (err) { process.logger.error('Something error in function [statProcessNodes]:', err); } }; async function clearStatsProcessNodes() { const transaction = await process.postgres.orm.transaction(); const models = process.postgres.models; const { Op } = process.postgres.ORM; try { await models.StatsProcessNodes.destroy({ where: { processVersionId: { [Op.gt]: 0 } }, transaction }); await transaction.commit(); } catch (err) { await transaction.rollback(); process.logger.error('Destroy data from Postgres DB [stats_process_nodes] error:', err); } }; async function storageStatsProcessNodes(data) { const transaction = await process.postgres.orm.transaction(); const models = process.postgres.models; try { await models.StatsProcessNodes.bulkCreate(data, { transaction }); await transaction.commit(); process.logger.info('Sync data to Postgres DB [stats_process_nodes]'); } catch (err) { await transaction.rollback(); process.logger.error('Storage data to Postgres DB [stats_process_nodes] error:', err); } };