'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); module.exports = async function factory(app, opts) { try { const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); const producer = new Kafka.HighLevelProducer(client); producer.on('error', function (err) { app.fs.logger.log('error', "[FS-KAFKA]", err); }); producer.on("ready", function () { console.log('Kafka ready') }) let groupId = 'group' + Math.random() let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) consumer.on('message', async function (message) { let msg = JSON.parse(message.value) await handleMsg(msg); }) const kafka = { producer: producer, configUpdateMessage: opts.configUpdateMessage || {} }; app.fs.kafka = kafka; app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); /*----------------------推送节流---20230103---------------------------------*/ //全局变量 let kafkaMsgObj = { time: null, dataMap: [], toHandle: false//有待处理 } async function handleMsg(msg) { try { let { messageMode } = msg if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) { if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {//首次 || 跟上次时间间隔大于2分钟 直接发送 let structsAche = await getStructsAche() if (structsAche.dataList.length) { await savePullAlarm(msg, structsAche.dataList)//直接给处理了 kafkaMsgObj.time = moment().format();//记录本次发送时间 kafkaMsgObj.dataMap = [] kafkaMsgObj.toHandle = false } } else {//放进队列 kafkaMsgObj.dataMap.push(msg); kafkaMsgObj.toHandle = true;//有待发送 } } } catch (error) { console.error(error); } } setInterval(async () => { if (kafkaMsgObj.toHandle) {//有待发送 let structsAche = await getStructsAche() if (structsAche.dataList.length) { kafkaMsgObj.dataMap.forEach(async d => { await savePullAlarm(d, structsAche.dataList) }) kafkaMsgObj.time = moment().format() kafkaMsgObj.dataMap = [] kafkaMsgObj.toHandle = false } else { console.log(`获取结构物列表失败`); } } }, 60 * 1000 * 2)//2分钟 /*----------------------推送节流---20230103---------------------------------*/ let structsAche = { dataList: [], expireTime: null//10分钟更新一次结构物列表 } async function getStructsAche() { const { utils: { getAxyStructs } } = app.fs try { if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { let structList = await getAxyStructs(); if (structList && structList.length) { structsAche.dataList = structList; structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); } } return structsAche; } catch (err) { console.log(`获取结构物列表失败, error: ${err}`); } } //保存告警[发现] let constAlarmGroups = { 1: '数据中断', 2: '数据异常', 3: '策略命中', 4: '设备异常', 5: '设备异常' } async function savePullAlarm(msg, structs) { const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs try { let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; // let structsAche = await getStructsAche() // if (structsAche) { //let structs = structsAche.dataList;//结构物列表 const { models } = app.fs.dc let exist = structs.find(s => s.strucId == structureId); if (exist) { let alarm_group = await clickHouse.anxinyun.query( `SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() let type = null, typeId = null, alarmGroup = null;//告警类型 异常类型 if (alarm_group) { typeId = alarm_group[0].alarm_group; type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型 let gId = alarm_group[0].alarm_group_unit; let alarm_group_unit = await clickHouse.anxinyun.query( `SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型 } let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 return { projectCorrelationId: d, alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup, typeId }, time: time, type//告警类型 } }) let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.time, alarmAppearId: r.id, projectCorrelationId: r.projectCorrelationId, type: 1//发现 } }) await models.LatestDynamicList.bulkCreate(dynamics); //消息推送到前端 if (datas.length) { await sendAppearToWeb(datas, 'data', exist); } } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 let datas = projects.map(d => { return { pepUserId: null, projectCorrelationId: d, alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source confirmTime: time, confirmContent: '自动恢复' } }) let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.confirmTime, alarmConfirmId: r.id, projectCorrelationId: r.projectCorrelationId, type: 4//告警确认 } }) await models.LatestDynamicList.bulkCreate(dynamics); //消息推送到前端 if (datas.length) { await sendConfirmToWeb(datas, true); } } } // } else { // console.log(`获取结构物列表失败`); // } } catch (error) { console.error(error); } } } catch (error) { console.log(error); } }