const moment = require('moment') let isDev = false // isDev = true let proDebug = false proDebug = true module.exports = function (app, opts) { const alarmsPush = app.fs.scheduleInit( { interval: isDev ? '24 0 */1 * * *' : // 延长运行时间 好debug '12 */1 * * * *', // interval: '12 0 0 0 */1 *', immediate: isDev, proRun: !isDev, // disabled: true }, async () => { try { const { models, ORM: sequelize } = app.fs.dc const { apMergeDeVeAnxinProjectId = '' } = opts const { clickHouse } = app.fs const { database: anxinyun } = clickHouse.anxinyun.opts.config const { database: dataAlarm } = clickHouse.dataAlarm.opts.config const { pushBySms, pushByEmail, sendNoticeToWeb } = app.fs.utils const curMinOfYear = moment().diff(moment().startOf('year'), 'minutes') const pLog = (msg) => { if (proDebug) { console.log(msg) } } const configListRes = await models.AlarmPushConfig.findAll({ where: { del: false, disable: false }, order: ['id'], }) let pomsProjectId = new Set() let pepProjectIds = new Set() for (let { dataValues: c } of configListRes) { if (c.pomsProjectId) { c.pomsProjectId.forEach(pid => pomsProjectId.add(pid)) } } const pomsProjectRes = pomsProjectId.size ? await models.ProjectCorrelation.findAll({ where: { id: { $in: [...pomsProjectId] } } }) : [] for (let { dataValues: c } of pomsProjectRes) { if (c.pepProjectId) { pepProjectIds.add(c.pepProjectId) } } const pepProjectRes = pepProjectIds.size ? await clickHouse.projectManage.query( ` SELECT t_pim_project.id AS id, t_pim_project.project_name AS project_name, t_pim_project.isdelete AS isdelete, t_pim_project_construction.construction_status_id AS construction_status_id, t_pim_project_state.construction_status AS construction_status FROM t_pim_project LEFT JOIN t_pim_project_construction ON t_pim_project.id = t_pim_project_construction.project_id LEFT JOIN t_pim_project_state ON t_pim_project_construction.construction_status_id = t_pim_project_state.id WHERE id IN (${[...pepProjectIds].join(',')}, -1) ` ).toPromise() : [] const calcMinute2DHM = (minute) => { if (!minute && minute != 0) { return minute } let title = '' let dm = 24 * 60 let d = Math.floor(minute / dm) let h = Math.floor(minute % dm / 60) let m = (minute % dm) % 60 if (d) { title = d + '天' } if (h) { title = title + h + '时' } if (h) { title = title + m + '分' } return title } pLog('EM推送列表_' + configListRes.length,); pLog(`当前时间:${moment().format('YYYY-MM-DD HH:mm:ss')}`); pLog(`curMinOfYear / ${curMinOfYear}`) for (let { dataValues: c } of configListRes) { if (c.tacticsParams && c.tactics) { pLog(`当前运行EM配置:id=${c.id} name=${c.name}`); // pomsProjectId 是个数组 [] const { strucId, pomsProjectId, pomsStrucFactorId } = c let { interval, deviceProportion } = c.tacticsParams if (c.tactics == 'abnormal_rate') { interval = parseInt(interval) * 60 } pLog(`tactics = ${c.tactics}`) pLog(`interval / ${interval} /${curMinOfYear % parseInt(interval)}`) if (curMinOfYear % parseInt(interval) == 0 || isDev) { pLog(`符合时间断点`) const corPomsProject = pomsProjectRes.filter(poms => pomsProjectId.includes(poms.id)) let curAnxinProjectId = new Set() let pepProjectName_ = [] let pepProject_name = [] //当前有关联的项目,后面往对应项目里面插入对应的结构物-监测因素-告警源 try { for (let { dataValues: poms } of corPomsProject) { if (poms.pepProjectId) { // 找对应的项企项目 const corPepProject = pepProjectRes.find(p => p.id == poms.pepProjectId) if (corPepProject && c.timeType.some(ct => ct == corPepProject.construction_status_id)) { pepProjectName_.push(corPepProject.project_name) pepProject_name.push({ id: poms.id, anxinProjectId: poms.anxinProjectId, name: corPepProject.project_name }) } else { // 不符合当前项目的时间节点 continue } } else { // 是自定义项目 if (poms.name) { pepProjectName_.push(poms.name) pepProject_name.push({ id: poms.id, anxinProjectId: poms.anxinProjectId, name: poms.name }) } } // 筛选全部的 anxinProjectId pepProjectId for (let axId of poms.anxinProjectId) { curAnxinProjectId.add(axId) } } } catch (error) { console.error(error); throw error } const anxinProjectId = [...curAnxinProjectId] // 查当前 poms 下的结构物 并把不包含的去掉 // 可能有结构物已解绑 const strucListRes = strucId.length && anxinProjectId.length ? await clickHouse.anxinyun.query( ` SELECT DISTINCT id, t_structure.id AS id, t_structure.name AS name, t_structure.iota_thing_id AS iotaThingId, t_project.id AS projectId FROM t_project LEFT JOIN t_project_structure ON t_project_structure.project = t_project.id LEFT JOIN t_project_structuregroup ON t_project_structuregroup.project = t_project.id LEFT JOIN t_structuregroup_structure ON t_structuregroup_structure.structuregroup = t_project_structuregroup.structuregroup LEFT JOIN t_project_construction ON t_project_construction.project = t_project.id LEFT JOIN t_structure_site ON t_structure_site.siteid = t_project_construction.construction RIGHT JOIN t_structure ON t_structure.id = t_project_structure.structure OR t_structure.id = t_structuregroup_structure.structure OR t_structure.id = t_structure_site.structid WHERE project_state != -1 AND t_project.id IN (${anxinProjectId.join(',')}) AND t_structure.id IN (${strucId.join(',')}) ` ).toPromise() : [] //在数据里加入项企项目或自定义项目id strucListRes.map(f => { f.pepProject = [] pepProject_name.map(s => { if (s.anxinProjectId.includes(f.projectId)) { f.pepProject.push(s.id) } }) }) let strucThingId = [] let strucMap = {} let searchStrucIds = strucListRes.map(s => { if (s.iotaThingId) { strucThingId.push(s.iotaThingId) } strucMap[s.id] = { ...s, factor: [] } return s.id }) if (searchStrucIds.length) { searchStrucIds.unshift(-1) } else { // 没有结构物可查 continue } let pepProjectName = pepProjectName_.length ? pepProjectName_.join('
') : '' let emailTitle = `${pepProjectName_.length ? pepProjectName_.join('、') : ''}-${c.name}-` let emailSubTitle = '' let dataAlarmOption = [] let dataAlarmGroupOption = [] let dataAlarmSubType = [] let dataAlarms = [] let videoAlarmWhereOption = [] let videoAlarms = [] let appAlarmWhereOption = { confirmTime: null, } let appAlarms = [] let deviceCount = 0 let alarmDeviceCount = 0 let cameraCount = 0 let alarmCameraCount = 0 // 判断推送策略 let nowTime = moment().startOf('minute') let pointTime = moment() .subtract( parseInt(interval), // + 1440 * 365, 'minute' ) .startOf('minute') .format('YYYY-MM-DD HH:mm:ss') let newAddStartTime = pointTime let newAddEndTime = nowTime.clone() if (c.tactics == 'immediately') { // !查所有未解决告警 所以时间范围大可不必 // dataAlarmOption.push(`StartTime >= '${pointTime}'`); // appAlarmWhereOption.createTime = { $gte: pointTime } // videoAlarmWhereOption.push(`camera_status_alarm.create_time >= '${pointTime}'`) emailTitle += `即时推送服务` emailSubTitle += `截止${moment(pointTime).format('YYYY年MM月DD日 HH时mm分')}-${moment(nowTime).format('HH时mm分')}:` } else if (c.tactics == 'continue' || c.tactics == 'abnormal_rate') { // 新增的应该是上一个时间节点到上上个节点之间 dataAlarmOption.push(`StartTime <= '${pointTime}'`); appAlarmWhereOption.createTime = { $lte: pointTime } videoAlarmWhereOption.push(`camera_status_alarm.create_time <= '${pointTime}'`) // 新增的应该是上一个时间节点到上上个节点之间 newAddStartTime = moment(pointTime).subtract(parseInt(interval), 'minute').format('YYYY-MM-DD HH:mm:ss') newAddEndTime = pointTime if (c.tactics == 'continue') { emailTitle += `持续时长推送服务` emailSubTitle += `告警持续时长超${interval}分钟的告警源,详情如下:` } else { if (c.alarmType.includes('data_outages') || c.alarmType.includes('data_exception')) { // 查了设备异常率 去安心云查当前项目下的设备数量 let deviceCountRes = strucThingId.length ? await clickHouse.iot.query(` SELECT count(id) AS count FROM Device WHERE thingId IN (${strucThingId.map(t => `'${t}'`).join(',')}, '-1') `).toPromise() : [] deviceCount = deviceCountRes.length ? deviceCountRes[0].count : 0 pLog(`查得安心云当前项目下设备总数量 - ${deviceCount}`) } if (c.alarmType.includes('video_exception')) { // 查了视频异常 去安心云查 接入的 萤石 设备数量 cameraCount = searchStrucIds.length ? (await clickHouse.anxinyun.query(` SELECT count(*) AS count FROM t_video_ipc WHERE structure IN (${searchStrucIds.join(',')}) `).toPromise())[0].count : 0 pLog(`查得安心云当前项目下萤石设备总数量 - ${cameraCount}`) } emailTitle += `异常率推送服务` emailSubTitle += `持续产生时间超过${(interval / 60).toFixed(0)}小时的异常设备数量**个,异常率达到项目或结构物内设备总数量${parseInt(deviceCount) + parseInt(cameraCount)}个的 --%,详情如下` // --% ** 是在下面计算得 } } emailTitle += '_POMS飞尚运维中台' // 判断告警数据范围 if (c.alarmType.includes('data_outages')) { dataAlarmGroupOption.push(1) if (c.alarmSubType) dataAlarmSubType = dataAlarmSubType.concat(c.alarmSubType['data_outages'] || []) } if (c.alarmType.includes('data_exception')) { dataAlarmGroupOption.push(2) if (c.alarmSubType) dataAlarmSubType = dataAlarmSubType.concat(c.alarmSubType['data_exception'] || []) } if (c.alarmType.includes('strategy_hit')) { dataAlarmGroupOption.push(3) if (c.alarmSubType) dataAlarmSubType = dataAlarmSubType.concat(c.alarmSubType['strategy_hit'] || []) } if (c.alarmType.includes('video_exception')) { let videoAlarmSubType = c.alarmSubType ? ( c.alarmSubType['video_exception'] || [] ) : [] if (videoAlarmSubType && videoAlarmSubType.length == 1) { // 一个参数的时候不能兼容 sql 的 in 方法 in (1,2,3) videoAlarmSubType.push(-1) } videoAlarms = searchStrucIds.length && ( !c.alarmSubType || (videoAlarmSubType && videoAlarmSubType.length > 0) ) ? await clickHouse.vcmp.query( ` SELECT cameraAlarm.cameraId AS cameraId, cameraAlarm.cameraName AS cameraName, cameraAlarm.cameraSerialNo AS cameraSerialNo, cameraAlarm.cameraChannelNo AS cameraChannelNo, cameraAlarm.alarmId AS alarmId, cameraAlarm.createTime AS createTime, cameraAlarm.confirmContent AS confirmContent, cameraAlarm.confirmTime AS confirmTime, cameraAlarm.autoRestore AS autoRestore, camera_status.describe AS statusDescribe, camera_kind.kind AS cameraKind, "gbCamera".online AS cameraOnline, anxinIpc.t_video_ipc.name AS anxinIpcPosition, anxinStation.id AS anxinStationId, anxinStation.name AS anxinStationName, anxinStation.factor AS anxinStationFactorId, anxinStruc.name AS strucName, anxinStruc.id AS strucId FROM ( SELECT camera.id AS cameraId, camera.gb_id AS gbId, camera.name AS cameraName, camera.kind_id AS cameraKindId, camera_status_alarm.id AS alarmId, camera_status_alarm.platform AS platform, camera_status_alarm.create_time AS createTime, camera_status_alarm.status_id AS statusId, camera_status_alarm.serial_no AS cameraSerialNo, camera_status_alarm.channel_no AS cameraChannelNo, camera_status_alarm.confirm AS confirmContent, camera_status_alarm.auto_restore AS autoRestore, camera_status_alarm.confirm_time AS confirmTime FROM camera_status_alarm INNER JOIN ${anxinyun}.t_video_ipc ON toString(${anxinyun}.t_video_ipc.channel_no) = camera_status_alarm.channel_no AND ${anxinyun}.t_video_ipc.serial_no = camera_status_alarm.serial_no AND ${anxinyun}.t_video_ipc.structure IN (${searchStrucIds.join(',')}) INNER JOIN camera ON camera.serial_no = camera_status_alarm.serial_no AND camera.channel_no = camera_status_alarm.channel_no AND camera.delete = false AND camera.recycle_time is null ${c.alarmSubType ? `AND camera.kind_id in (${videoAlarmSubType.join(',')})` : ""} WHERE camera_status_alarm.confirm_time IS null ${videoAlarmWhereOption.length ? ` AND ${videoAlarmWhereOption.join(' AND ')}` : ''} ) AS cameraAlarm LEFT JOIN camera_status ON cameraAlarm.platform = camera_status.platform AND cameraAlarm.statusId = camera_status.id LEFT JOIN "gbCamera" ON "gbCamera".id = cameraAlarm.gbId LEFT JOIN camera_kind ON camera_kind.id = cameraAlarm.cameraKindId LEFT JOIN ${anxinyun}.t_video_ipc AS anxinIpc ON toString(anxinIpc.channel_no) = cameraAlarm.cameraChannelNo AND anxinIpc.serial_no = cameraAlarm.cameraSerialNo LEFT JOIN ${anxinyun}.t_structure AS anxinStruc ON anxinStruc.id = anxinIpc.structure AND anxinStruc.id IN (${searchStrucIds.join(',')}) LEFT JOIN ${anxinyun}.t_video_ipc_station AS anxinIpcStation ON anxinIpcStation.ipc = anxinIpc.id LEFT JOIN ${anxinyun}.t_sensor AS anxinStation ON anxinStation.id = anxinIpcStation.station ORDER BY cameraAlarm.createTime DESC ` ).toPromise() : [] let returnD = [] let positionD = {} // 每个设备一个告警 for (let a of videoAlarms) { if (positionD[a.cameraId]) { let curD = returnD[positionD[a.cameraId].positionReturnD] if (a.strucId && !curD.struc.some(s => s.id == a.strucId)) { curD.struc.push({ id: a.strucId, projectId: a.projectId, name: a.strucName }) } if (a.anxinStationId && !curD.station.some(s => s.id == a.anxinStationId)) { curD.station.push({ id: a.anxinStationId, name: a.anxinStationName, position: a.anxinIpcPosition }) } } else { /**按监测因素 factor 筛选告警*/ // if (pomsStrucFactorId) { // if (!a.strucId || !a.anxinStationFactorId) { // // 当前告警没有绑定结构物或者摄像头没有绑定测点 // continue // } else if (!pomsStrucFactorId[a.strucId]) { // // 推送配置没配置这个结构物 // continue // } else if (!pomsStrucFactorId[a.strucId].includes(a.anxinStationFactorId)) { // // 不包含这个监测因素 // continue // } // } let d = { cameraId: a.cameraId, cameraName: a.cameraName, camerOnline: a.cameraOnline, cameraSerialNo: a.cameraSerialNo, cameraChannelNo: a.cameraChannelNo, autoRestore: a.autoRestore, createTime: a.createTime, updateTime: a.updateTime, platform: a.platform, statusDescribe: a.statusDescribe, alarmId: a.alarmId, confirmContent: a.confirmContent, confirmTime: a.confirmTime, cameraKind: a.cameraKind, factorId: a.anxinStationFactorId, struc: [], station: [] } if (a.strucId) { d.struc.push({ id: a.strucId, projectId: a.projectId, name: a.strucName }) } if (a.anxinStationId) { d.station.push({ id: a.anxinStationId, name: a.anxinStationName, position: a.anxinIpcPosition }) } returnD.push(d) positionD[a.cameraId] = { positionReturnD: returnD.length - 1 } } } videoAlarms = returnD } if (c.alarmType.includes('app_exception')) { if (c.alarmSubType) { appAlarmWhereOption.type = { $in: c.alarmSubType['app_exception'] || [] } } appAlarms = c.alarmSubType && c.alarmSubType['app_exception'] && c.alarmSubType['app_exception'].length ? await models.AppAlarm.findAll({ where: appAlarmWhereOption, order: [['createTime', 'DESC']], include: [{ model: models.App, required: true, include: [{ model: models.ProjectApp, where: { projectId: { $in: pomsProjectId } }, required: true, }] }] }) : [] } if (c.alarmType.includes('device_exception')) { dataAlarmGroupOption.push(4) dataAlarmGroupOption.push(5) if (c.alarmSubType) dataAlarmSubType = dataAlarmSubType.concat(c.alarmSubType['device_exception']) } // 查数据告警 三警合一 if (dataAlarmGroupOption.length && searchStrucIds.length) { dataAlarmGroupOption.push(-1) dataAlarmOption.push(`AlarmGroup IN (${dataAlarmGroupOption.join(',')})`) let dataAlarmSubType_ = dataAlarmSubType.filter(s => s) if (c.alarmSubType && dataAlarmSubType_.length) { dataAlarmSubType_.push(-1) dataAlarmOption.push(`AlarmGroupUnit IN (${dataAlarmSubType_.join(',')})`) } dataAlarms = !c.alarmSubType || dataAlarmSubType_.length ? await clickHouse.dataAlarm.query(` SELECT * FROM alarms LEFT JOIN ${anxinyun}.t_sensor AS anxinStation ON toString(anxinStation.id) = alarms.SourceId AND alarms.SourceTypeId = 2 WHERE ${`State NOT IN (3, 4) AND `} StructureId IN (${searchStrucIds.join(',')}) ${dataAlarmOption.length ? ' AND ' + dataAlarmOption.join(' AND ') : ''} ORDER BY StartTime DESC `).toPromise() : [] } let dataAlarmTitle2 = [{ n: '项目', k: '', v: pepProjectName }, { n: '结构物', k: '', f: (d) => { return (strucMap[d.StructureId] || { name: '' }).name // return (strucListRes.find(s => s.id == d.StructureId) || { name: '' }).name } }, { n: '告警源名称', k: 'SourceName' }, { n: '告警源类型', k: '', f: (d) => { switch (d.SourceTypeId) { case 0: return 'DTU' case 1: return '传感器' case 2: return '测点' default: return '' } } }, { n: '告警信息', k: 'AlarmContent' }, { n: '合理值', k: '', f: (d) => d.AlarmTypeCode == 3018 ? "是" : "否" }, { n: '告警等级(当前)', k: '', f: (d) => { switch (d.CurrentLevel) { case 1: return '一级' case 2: return '二级' case 3: return '三级' default: return '' } } }, { n: '持续时间', k: '', f: (d) => { return d.StartTime ? '超过' + calcMinute2DHM(moment().diff(moment(d.StartTime), 'minutes')) : '' } },] let dataAlarmTitle = [{ n: '项目', k: '', v: pepProjectName }, { n: '结构物', k: '', f: (d) => { return (strucMap[d.StructureId] || { name: '' }).name // return (strucListRes.find(s => s.id == d.StructureId) || { name: '' }).name } }, { n: '告警源名称', k: 'SourceName' }, { n: '告警源类型', k: '', f: (d) => { switch (d.SourceTypeId) { case 0: return 'DTU' case 1: return '传感器' case 2: return '测点' default: return '' } } }, { n: '告警信息', k: 'AlarmContent' }, { }, { n: '告警等级(当前)', k: '', f: (d) => { switch (d.CurrentLevel) { case 1: return '一级' case 2: return '二级' case 3: return '三级' default: return '' } } }, { n: '持续时间', k: '', f: (d) => { return d.StartTime ? '超过' + calcMinute2DHM(moment().diff(moment(d.StartTime), 'minutes')) : '' } },] // let dataLnterruptTitle = [{ // n: '项目', // k: '', // v: pepProjectName // }, { // n: '结构物', // k: '', // f: (d) => { // return (strucMap[d.StructureId] || { name: '' }).name // // return (strucListRes.find(s => s.id == d.StructureId) || { name: '' }).name // } // }, { // n: '监测因素(中断比例)', // k: '', // f: (d) => { // // d.factor.join('') // // console.log(21211231131,d.factor); // let data = [] // d.factor.map(f => data.push(f.name + '(' + f.breakData + '/' + f.sum + ')')) // return data.join('
') // } // }, { // n: '告警源名称', // k: 'SourceName' // }, { // n: '告警源类型', // k: '', // f: (d) => { // switch (d.SourceTypeId) { // case 0: // return 'DTU' // case 1: // return '传感器' // case 2: // return '测点' // default: // return '' // } // } // }, { // n: '告警信息', // k: 'AlarmContent' // }, { // n: '告警等级(当前)', // k: '', // f: (d) => { // switch (d.CurrentLevel) { // case 1: // return '一级' // case 2: // return '二级' // case 3: // return '三级' // default: // return '' // } // } // }, { // n: '持续时间', // k: '', // f: (d) => { // return d.StartTime ? // '超过' + calcMinute2DHM(moment().diff(moment(d.StartTime), 'minutes')) : '' // } // }, // { // n: '中断时间', // k: '', // f: (d) => { // return d.StartTime ? // moment(d.StartTime).format('YYYY-MM-DD HH:mm:ss') : '' // } // },] let dataLnterruptTitle = [{ n: '项目', k: 'name', // v: pepProjectName }, { n: '结构物', k: 'Structure', // f: (d) => { // return (strucMap[d.StructureId] || { name: '' }).name // // return (strucListRes.find(s => s.id == d.StructureId) || { name: '' }).name // } }, { n: '监测因素(中断比例)', k: 'factor', // f: (d) => { // // d.factor.join('') // let data = [] // d.factor.map(f => data.push(f.name + '(' + f.breakData + '/' + f.sum + ')')) // return data.join('
') // } }, { n: '告警源', k: 'SourceName' }, { n: '中断时间', k: '', f: (d) => { return d.StartTime ? moment(d.StartTime).format('YYYY-MM-DD HH:mm:ss') : '' } },] let videoAlarmTitle = [{ n: '项目', k: '', v: pepProjectName }, { n: '结构物', k: '', f: (d) => { return d.struc.map(ds => ds.name).join('、') } }, { n: '告警源名称', k: 'cameraName' }, { n: '告警源类型', k: 'cameraKind' }, { n: '序列号', k: 'cameraSerialNo' }, { n: '通道号', k: 'cameraChannelNo' }, { n: '测点', k: '', f: (d) => { return d.station ? d.station.map(ds => ds.name).join('、') : '' } }, { n: '位置', k: '', f: (d) => { return d.station ? d.station.map(ds => ds.position).join('、') : '' } }, { n: '告警信息', k: 'statusDescribe' }, { n: '持续时间', k: '', f: (d) => { return d.createTime ? '超过' + calcMinute2DHM(moment().diff(moment(d.createTime), 'minutes')) : '' } },] let appAlarmTitle = [{ n: '项目', k: '', v: pepProjectName }, { n: '应用名称', k: '', f: (d) => { return d.app ? d.app.name : '' } }, { n: '异常类型', k: '', f: (d) => { if (d.type == 'element') { return '元素异常' } else if (d.type == 'apiError') { return '接口报错' } else { return '' } } }, { n: '异常信息', k: 'alarmContent' }, { n: 'URL地址', k: 'cameraName', f: (d) => { return d.app && d.app.url ? `${d.app.url}` : '' } }, { n: '持续时间', k: '', f: (d) => { return d.createTime ? '超过' + calcMinute2DHM(moment().diff(moment(d.createTime), 'minutes')) : '' } },] let ifEmailSend = false let tableTitlePostfix = ',详情如下:' function packageTableTitle (titleArr) { let tableTitle = '' for (let t of titleArr) { tableTitle += `${t.n}` } tableTitle += '' return tableTitle } function packageTableData ({ data, titleArr }) { let tableData = '' for (let t of titleArr) { if (t.v) { tableData += `${t.v || ''}` } else if (t.f) { tableData += `${t.f(data) || ''}` } else if (t.k) { tableData += `${data[t.k] || ''}` } else { tableData += `` } } tableData += '' return tableData } let apMergeDeVeAnxinProjectId_ = apMergeDeVeAnxinProjectId ? apMergeDeVeAnxinProjectId.split(',') : []; let apMergeDeVeAlarms = { // 结构物id :{ // data_exception 数据异常告警:[], // video_exception 视频异常告警:[] // } } let dataAlarmG1 = []; let dataAlarmG2 = []; let dataAlarmG3 = []; let dataAlarmG45 = []; let deviceStatistic = new Set() let dataAlarmDetails = [] pLog(`查得数据告警 ${dataAlarms.length} 条`); // pLog(dataAlarms); if (dataAlarms.length) { const alarmIds = dataAlarms .map(ar => "'" + ar.AlarmId + "'") dataAlarmDetails = await clickHouse.dataAlarm.query(` SELECT * FROM alarm_details WHERE AlarmId IN (${alarmIds.join(',')}, '-1') AND AlarmState = 0 `).toPromise() } pLog(`查得数据告警详情数据 ${dataAlarmDetails.length} 条`); let deviceIds = new Set() for (let d of dataAlarms) { d = { ...d, stationId: d.id } /** 按监测因素筛选 且为测点告警 */ // if (pomsStrucFactorId && d.SourceTypeId == 2) { // // 做了监测因素筛选 且当前告警有监测因素 // if (!d.factor || d.factor == 0) { // // 监测因素不对劲 // continue // } else if (!d.StructureId) { // // 当前告警没有绑定结构物 // continue // } else if (!pomsStrucFactorId[d.StructureId]) { // // 推送配置没配置这个结构物 // continue // } else if (!pomsStrucFactorId[d.StructureId].includes(d.factor)) { // // 不包含这个监测因素 // continue // } // } if (d.AlarmGroup == 1) { // if (d.StructureId) { // //查询结构物的监测因素 // const factorData = await clickHouse.anxinyun.query( // ` // SELECT // t_structure_factor.structure AS structureId, // t_factor.name AS name, // t_factor.id AS id // FROM // t_structure_factor // LEFT JOIN t_factor // ON t_factor.id = t_structure_factor.factor // WHERE // t_structure_factor.structure = (${d.StructureId}) // ` // ).toPromise() || [] // let factorId = factorData.map(f => f.id) // //查询结构物对应的设备 // const equipment = await clickHouse.anxinyun.query( // ` // SELECT // t_sensor.id AS id, // t_sensor.name AS name, // t_sensor.structure AS structureId, // t_sensor.factor AS factorId, // t_device_sensor.iota_device_id AS iotaDeviceId // FROM // t_sensor // LEFT JOIN t_device_sensor // ON t_device_sensor.sensor = t_sensor.id // WHERE // t_sensor.structure = (${d.StructureId}) // AND // t_sensor.factor IN (${factorId.join(',')}) // ` // ).toPromise() || [] // const alarmDatas = await clickHouse.dataAlarm.query( // ` // SELECT // alarms.AlarmId AS alarmId, // alarms.State AS state, // alarms.AlarmGroup AS alarmGroup, // alarms.SourceId AS sourceId // FROM // alarms // WHERE // alarms.StructureId = (${d.StructureId}) // AND // alarms.AlarmGroup = 1 // AND // alarms.State < 3 // ` // ).toPromise() || [] // equipment.map(r => { // alarmDatas.map(u => { // if (r.iotaDeviceId == u.sourceId) { // r.sourceId = u.sourceId // } // }) // }) // d.factor = [] // factorData.map(c => { // let breakData = equipment.filter(m => (m.factorId == c.id && m.sourceId)) // d.factor.push({ // name: c.name, // sum: equipment.filter(d => d.factorId == c.id).length, // breakData: breakData.length // }) // d.factors = equipment.filter(d => d.factorId == c.id) // d.breakData = breakData // }) // } // dataAlarmG1.push(d) } else if (d.AlarmGroup == 2) { dataAlarmG2.push(d) /** 注1 * 根据指定的安心云结构物把数据异常和视频告警合并在一起的代码 * 还要指定是扬尘设备 * 以测点关联 */ if ( isDev || ( apMergeDeVeAnxinProjectId_.length && d.SourceName && d.SourceName.includes('扬尘') && apMergeDeVeAnxinProjectId_.some(pid => pid == (strucMap[d.StructureId] || {}).projectId) ) ) { // SourceTypeId 0: 'DTU' / 1: '传感器' / 2: '测点' if (d.SourceTypeId != 2) { deviceIds.add(d.SourceId) } if (apMergeDeVeAlarms[d.StructureId]) { apMergeDeVeAlarms[d.StructureId].data_exception.push(d) } else { apMergeDeVeAlarms[d.StructureId] = { data_exception: [d], video_exception: [] } } } // 注1 END } else if (d.AlarmGroup == 3) { /** 按监测因项 factor-item 的名称筛选*/ if (pomsStrucFactorId) { // 兼容之前的设置 有这个信息才进行判断 if (!d.StructureId) { // 当前告警没有绑定结构物 continue } else if (!pomsStrucFactorId[d.StructureId]) { // 推送配置没配置这个结构物 continue } else { let corDetail = dataAlarmDetails.find(da => da.AlarmId == d.AlarmId) if (proDebug) { console.log(`相应告警详情(策略命中):`, corDetail); } if (corDetail) { // 判断告警详情信息里有没有监测项关键字 if (!pomsStrucFactorId[d.StructureId].some(factorItemName => { return corDetail.Content.includes(factorItemName) })) { continue } } else { continue } } } if (proDebug) { console.log(`符合条件的策略命中 + 1`, d); } dataAlarmG3.push(d) } else if (d.AlarmGroup == 4 || d.AlarmGroup == 5) { dataAlarmG45.push(d) } deviceStatistic.add(d.SourceId) } if (c.tactics == 'abnormal_rate') { pLog(`异常设备数量 - ${deviceStatistic.size}`) pLog(`异常视频告警数量 - ${videoAlarms.length}`) let abnormalDeviceCount = deviceStatistic.size + videoAlarms.length let rate = (abnormalDeviceCount / (parseInt(deviceCount) + parseInt(cameraCount))) * 100; pLog(`异常比率 ${rate} 设定值 ${deviceProportion}`) if (rate < parseFloat(deviceProportion)) { // 设备异常率低于设定值 continue } emailSubTitle = emailSubTitle .replace('--%', rate.toFixed(1) + '%') .replace('**', abnormalDeviceCount.toFixed(0)) } // 注1 if (apMergeDeVeAnxinProjectId_.length || isDev) { for (let a of videoAlarms) { let existStruc = a.struc.find(asc => apMergeDeVeAlarms[asc.id] || isDev) if (existStruc) { apMergeDeVeAlarms[existStruc.id].video_exception.push(a) } } } // 注1 END let html = `
${emailSubTitle}
` let dataAlarmG1Data = dataAlarms.filter(a => a.AlarmGroup == 1) if (dataAlarmG1Data.length) { let dataAlarmG1StructureId = new Set() dataAlarmG1Data.map(c => { dataAlarmG1StructureId.add(c.StructureId) }) //查询结构物的监测因素 const factorData = await clickHouse.anxinyun.query( ` SELECT t_structure_factor.structure AS structureId, t_factor.name AS name, t_factor.id AS id FROM t_structure_factor LEFT JOIN t_factor ON t_factor.id = t_structure_factor.factor WHERE t_structure_factor.structure IN (${[...dataAlarmG1StructureId]}) ` ).toPromise() || [] let factorId = factorData.map(f => f.id) //查询结构物对应的测点 const equipment = await clickHouse.anxinyun.query( ` SELECT t_sensor.id AS id, t_sensor.name AS name, t_sensor.structure AS structureId, t_sensor.factor AS factorId, t_device_sensor.iota_device_id AS iotaDeviceId FROM t_sensor LEFT JOIN t_device_sensor ON t_device_sensor.sensor = t_sensor.id WHERE t_sensor.structure IN (${[...dataAlarmG1StructureId]},) AND t_sensor.factor IN (${factorId.join(',')}) ` ).toPromise() || [] const alarmDataRes = await clickHouse.dataAlarm.query( ` SELECT alarms.AlarmId AS alarmId, alarms.State AS state, alarms.AlarmGroup AS alarmGroup, alarms.SourceId AS sourceId, alarms.StartTime AS StartTime, alarms.SourceName AS SourceName, alarms.AlarmCode AS AlarmCode, alarms.SourceTypeId AS SourceTypeId, alarms.StructureId AS StructureId FROM alarms WHERE alarms.StructureId In (${[...dataAlarmG1StructureId]}) AND alarms.AlarmGroup = 1 AND alarms.State < 3 ` // , // alarms.subStations AS subStations ).toPromise() || [] let alarmDatas = [] for (let alarms of alarmDataRes) { if (alarms.subStations && alarms.subStations.length) { pLog('拆解 subStations' + alarms.alarmId + ' ' + alarms.subStations.length + '个') for (let subStation of alarms.subStations) { alarmDatas.push({ ...alarms, alarmId: subStation }) } } else { alarmDatas.push(alarms) } } // 为设备分配自己产生的告警数据 let matchedAlarmIds = [] // 这一步能被分配的告警应该也能最终体现在邮件中 equipment.map(f => { f.alarmDatas = [] alarmDatas.map(r => { if (r.sourceId == f.iotaDeviceId) { f.alarmDatas.push({ ...r }) matchedAlarmIds.push(r.alarmId) } }) }) // 为监测因素分配绑定的设备 factorData.map(v => { v.devices = [] equipment.map(f => { if (v.id == f.factorId && v.structureId == f.structureId) { v.devices.push({ ...f }) } }) if (strucMap[v.structureId]) { // 并为 strucMap 补充 factor 监测因素信息 strucMap[v.structureId].factor.push({ ...v }) } }) let projectList = [] for (let key in strucMap) { if (strucMap[key].factor.length > 0) { // 如果有监测因素信息 // 存结构物信息至 projectList projectList.push(strucMap[key]) } } if (projectList.length) { pepProject_name.map(s => { s.projects = [] projectList.map(f => { // 匹配 pepProject_name 中的项目 // 匹配到则向 projects 中插入 f 也就是结构物信息 + 项目的id if (s.anxinProjectId.includes(f.projectId)) { s.projects.push(f) } }) }) } pLog('数据组合' + JSON.stringify(pepProject_name)) // 上面的代码自内而外构建数据 // 下面的逻辑自外向内!!! // 处理那些没有被匹配到的告警信息 // 将设备没有绑定到测点的告警也推出去 for (let a of alarmDatas) { if (!matchedAlarmIds.includes(a.alarmId)) { pLog('未匹配到的告警 ' + a.alarmId) // let curStruc = strucMap[a.StructureId] // if (curStruc) { // // curStruc 可以推出 告警属于哪个 project -> pomsProject // let curProject = pepProject_name.filter(p => p.anxinProjectId && p.anxinProjectId.includes(curStruc.projectId)); // for (let cp of curProject) { // // cp.projects 是结构物信息 // let curStruc = cp.projects.find(cpp => cpp.id == a.StructureId) // if (!curStruc) { // console.error(`没有查到结构物信息:${a.StructureId}?!不应该不应该!`); // } else { // // let emptyFactor = { id: -1 } // let emptyFactor = curStruc.factor.find(f => f.id == -1) // if (!emptyFactor) { // curStruc.factor.push({ // id: -1, // devices: [{ // alarmDatas: [ // { ...a } // ], // iotaDeviceId: a.sourceId, // name: a.SourceName, // structureId: a.StructureId, // }] // }) // } else { // let curDevice = emptyFactor.devices.find(d => d.iotaDeviceId == a.sourceId) // if (!curDevice) { // emptyFactor.devices.push({ // alarmDatas: [ // { ...a } // ], // iotaDeviceId: a.sourceId, // name: a.SourceName, // structureId: a.StructureId, // }) // } else { // curDevice.alarmDatas.push({ ...a }) // } // } // } // } // } } } pepProject_name.forEach(h => { let rowspan1 = 0 if (h.projects.length) { h.projects.forEach(x => { let rowspan2 = 0 if (x.factor.length) { x.factor.forEach(f => { let rowspan3 = 0 let problem = 0 if (f.devices.length) { f.devicesLength = f.devices.length f.devices = f.devices.filter(b => b.alarmDatas && b.alarmDatas.length > 0) if (f.devices.length) { f.devices.forEach(c => { if (c.alarmDatas.length) { problem += 1 let grouped = c.alarmDatas.reduce((acc, cur) => { if (!acc[cur.AlarmCode]) { acc[cur.AlarmCode] = [] } acc[cur.AlarmCode].push(cur) return acc }, {}) || {} let alarmData = [] for (let key in grouped) { rowspan1 += 1 rowspan2 += 1 rowspan3 += 1 grouped[key].sort((a, b) => new Date(b.StartTime) - new Date(a.StartTime)) alarmData.push(grouped[key][0]) } c.alarmDatas = alarmData } else { delete c.alarmDatas } }) // 进一步筛选了有告警的设备 f.devices = f.devices.filter(b => b.alarmDatas && b.alarmDatas.length > 0) } else { delete f.devices } } else { delete f.devices } f.rowspan = rowspan3 f.problem = problem }) x.factor = x.factor.filter(b => b.devices && b.devices.length > 0) } else { delete x.factor } x.rowspan = rowspan2 }) h.projects = h.projects.filter(b => b.factor && b.factor.length > 0) } else { delete h.projects } h.rowspan = rowspan1 }) dataAlarmG1 = pepProject_name //.filter(b => b.projects && b.projects.length > 0) || [] } function packageAlarmData2Table ({ titlePrefix, alarmData, alarmTitleArr, keyOfStartTime = 'StartTime', type }) { pLog(`${titlePrefix} ${alarmData.length}`) if (!alarmData.length) { return '' } ifEmailSend = true let tableTitlePrefix = titlePrefix + '告警源' let newAddCount = 0 let alarmHtml = '' let alarmContent = '' let alarmHtmlTitle = packageTableTitle(alarmTitleArr) let accumulate = 0 if (type == 1) { alarmData.map((h, hi) => { accumulate += h.rowspan || 0 if (h.projects && h.projects.length) { h.projects.map((x, xi) => { let showOne1 = (xi == 0) ? true : false if (x.factor && x.factor.length) { x.factor.map((f, fi) => { let showOne2 = (fi == 0) ? true : false if (f.devices && f.devices.length) { f.devices.map((c, ci) => { let showOne3 = (ci == 0) ? true : false if (c.alarmDatas && c.alarmDatas.length) { //取设备里面告警最新的一条数据 if (c.alarmDatas.length > 0) { c.alarmDatas.map((d, di) => { let showOne4 = (di == 0) ? true : false let tableData = '' for (let t of alarmTitleArr) { if (t.f) { tableData += `` } else if (t.k) { switch (t.k) { case 'name': if (showOne1 && showOne2 && showOne3 && showOne4) { tableData += `` } break; case 'Structure': if (showOne2 && showOne3 && showOne4) { tableData += `` } break; case 'factor': if (showOne3 && showOne4) { tableData += `` } break; case 'SourceName': tableData += `` break; default: break; } } } tableData += '' alarmContent += tableData if (d[keyOfStartTime] && moment(d[keyOfStartTime]).isBetween(newAddStartTime, newAddEndTime)) { newAddCount++ } }) } } }) } }) } }) } }) } else { for (let [index, a] of alarmData.entries()) { alarmContent += packageTableData({ data: a, titleArr: alarmTitleArr }) if (a[keyOfStartTime] && moment(a[keyOfStartTime]).isBetween(newAddStartTime, newAddEndTime)) { newAddCount++ } } accumulate = alarmData.length } tableTitlePrefix += titlePrefix != '数据异常&视频异常' ? c.tactics == 'abnormal_rate' ? `${alarmData.length}个` : `新增${newAddCount}个,未解决累计${accumulate}个` + tableTitlePostfix : '' alarmHtml += `' alarmHtml += alarmHtmlTitle alarmHtml += alarmContent alarmHtml += '
${t.f(d) || ''}${h[t.k] || ''}${x['name'] || ''}${((f['name'] || '--') + '(' + f.problem + '/' + f.devicesLength + ')') || ''}${d[t.k] || ''}
` + tableTitlePrefix + '

' return alarmHtml } if (c.alarmType.includes('data_outages')) { html += packageAlarmData2Table({ titlePrefix: '数据中断', alarmData: dataAlarmG1, alarmTitleArr: dataLnterruptTitle, type: 1 }) } if (c.alarmType.includes('data_exception')) { html += packageAlarmData2Table({ titlePrefix: '数据异常', alarmData: dataAlarmG2, alarmTitleArr: dataAlarmTitle2, }) } if (c.alarmType.includes('strategy_hit')) { html += packageAlarmData2Table({ titlePrefix: '策略命中', alarmData: dataAlarmG3, alarmTitleArr: dataAlarmTitle, }) } if (c.alarmType.includes('video_exception')) { html += packageAlarmData2Table({ titlePrefix: '视频异常', alarmData: videoAlarms, alarmTitleArr: videoAlarmTitle, keyOfStartTime: 'createTime', }) } if (c.alarmType.includes('app_exception')) { html += packageAlarmData2Table({ titlePrefix: '应用异常', alarmData: appAlarms, alarmTitleArr: appAlarmTitle, keyOfStartTime: 'createTime', }) } if (c.alarmType.includes('device_exception')) { html += packageAlarmData2Table({ titlePrefix: '设备异常', alarmData: dataAlarmG45, alarmTitleArr: dataAlarmTitle, }) } if (Object.keys(apMergeDeVeAlarms).length) { if (proDebug) { console.log(`查得数据异常、视频异常合并の告警:`); console.log(apMergeDeVeAlarms); } let deviceSensorRes = [] if (deviceIds.size) { const device4Search = [...deviceIds] .map(id => "'" + id + "'") deviceSensorRes = await clickHouse.anxinyun.query(` SELECT iota_device_id, sensor FROM t_device_sensor WHERE iota_device_id ${device4Search.length > 1 ? `IN (${device4Search.join(',')})` : `= ${device4Search[0]}`} `).toPromise() } let alarmTitle = dataAlarmTitle.concat( videoAlarmTitle.slice(2).map(v => { return { ...v, n: v.n == '持续时间' ? '摄像头告警' + v.n : '摄像头' + v.n } }) ) let alarmData = [] for (let aKey in apMergeDeVeAlarms) { let curStrucAlarm = apMergeDeVeAlarms[aKey] for (let de of curStrucAlarm.data_exception) { if (!de.id) { let corSensor = deviceSensorRes.find(ds => ds.iota_device_id == de.SourceId) if (corSensor) { de.id = corSensor.sensor } } let corVideoException = curStrucAlarm.video_exception.filter(v => { // ! de.id 是告警信息关联查出来的测点的id return v.station.some(vs => vs.id == de.id) }) if (!corVideoException.length) { // 构造一个长度 以便在for循环里把 data_exception 放进去 corVideoException.push({}) } for (let ve of corVideoException) { alarmData.push({ ...de, ...ve }) } } } html += packageAlarmData2Table({ titlePrefix: '数据异常&视频异常', alarmData: alarmData, alarmTitleArr: alarmTitle, }) } pLog(`ifEmailSend:${ifEmailSend}`); if (ifEmailSend) { // 查接收人的信息 const receiverRes = c.receiverPepUserId.length ? await clickHouse.pepEmis.query(` SELECT id, name, email FROM user WHERE id IN (${c.receiverPepUserId.join(',')},-1) `).toPromise() : [] let receiverId = [] let emails = receiverRes.reduce((arr, r) => { if (r.email) { arr.push(r.email) receiverId.push({ id: r.id, name: r.name }) } return arr }, []) if (isDev) { // !开发测试用的数据 emails = ['1650192445@qq.com'] // emails = ['wen.lele@free-sun.com.cn'] } if (emails.length) { pLog(`推送给${emails.length}人`); await pushByEmail({ email: emails, title: emailTitle, text: '', html: html }) //存日志 存动态 socket到前端 let dataToSave = { time: moment().format(), pushConfigId: c.id, cfgName: c.name,//策略名称 tactics: c.tactics, tacticsParams: c.tacticsParams, projectCorrelationId: pomsProjectId, toPepUserIds: receiverId.map(r => r.id) } let r = await models.EmailSendLog.create(dataToSave, { returning: true }) let dynamic = { time: r.dataValues.time, emailSendId: r.dataValues.id, // projectCorrelationId: r.dataValues.projectCorrelationId, type: 2//通知 } await models.LatestDynamicList.create(dynamic); //消息推送到前端 await sendNoticeToWeb(receiverId, dataToSave); } } } } } } catch (error) { console.error(error); } } ) return { alarmsPush, } }