diff --git a/api/.vscode/launch.json b/api/.vscode/launch.json index 5e3848a..8125a3e 100644 --- a/api/.vscode/launch.json +++ b/api/.vscode/launch.json @@ -59,7 +59,7 @@ // 测试 "--clickHouseAnxincloud anxinyun88", "--clickHousePepEmis pepca8", - "--clickHouseProjectManage peppm8", + "--clickHouseProjectManage peppm", "--clickHouseVcmp video_access_dev", "--clickHouseDataAlarm default", "--clickHouseIot iot", diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js index 5e85899..889b402 100644 --- a/api/app/lib/service/kafka.js +++ b/api/app/lib/service/kafka.js @@ -17,7 +17,7 @@ module.exports = async function factory(app, opts) { let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) consumer.on('message', async function (message) { let msg = JSON.parse(message.value) - await savePullAlarm(msg); + await handleMsg(msg); }) const kafka = { @@ -29,48 +29,54 @@ module.exports = async function factory(app, opts) { app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); - // const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] - // const options = { - // // groupId: 'topic-test-one', - // autoCommit: false, - // //fromOffset: true, - // fromOffset: 'latest' - // } - // const consumer = new Kafka.Consumer(client, topics, options) - // consumer.on("ready", function () { - // console.log('consumer ready 666666666666') - // }) - // // consumer.on("message", function (w) { - // // console.log('consumer ready 666666666666') - // // }) - // consumer.on('message', function (message) { - // const decodedMessage = JSON.parse(message.value) - // console.log('decodedMessage: ', decodedMessage) - // }) - // consumer.on('error', function (err) { - // console.log('consumer err:') - // console.log(err); - // }); - - - // let offset = new Kafka.Offset(client); - // consumer.on('offsetOutOfRange', function (topic) { - // console.log('offsetOutOfRange') - // // consumer.setOffset('topic', 0, 0); - // // topic.maxNum = 1; - // offset.fetch([topic], function (err, offsets) { - // if (err) { - // return console.error(err); - // } - // try { - // const max = Math.max.apply(null, offsets[topic.topic][topic.partition]); - // consumer.setOffset(topic.topic, topic.partition, max); - // } catch (error) { - // console.log(error); - // } - - // }); - // }); + /*----------------------推送节流---20230103---------------------------------*/ + //全局变量 + let kafkaMsgObj = { + time: null, + dataMap: [], + toHandle: false//有待处理 + } + async function handleMsg(msg) { + try { + let { messageMode } = msg + if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) { + if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {//首次 || 跟上次时间间隔大于2分钟 直接发送 + let structsAche = await getStructsAche() + if (structsAche.dataList.length) { + await savePullAlarm(msg, structsAche.dataList)//直接给处理了 + + kafkaMsgObj.time = moment().format();//记录本次发送时间 + kafkaMsgObj.dataMap = [] + kafkaMsgObj.toHandle = false + } + } else {//放进队列 + kafkaMsgObj.dataMap.push(msg); + kafkaMsgObj.toHandle = true;//有待发送 + } + } + } catch (error) { + console.error(error); + } + } + setInterval(async () => { + if (kafkaMsgObj.toHandle) {//有待发送 + let structsAche = await getStructsAche() + if (structsAche.dataList.length) { + + kafkaMsgObj.dataMap.forEach(async d => { + await savePullAlarm(d, structsAche.dataList) + }) + + kafkaMsgObj.time = moment().format() + kafkaMsgObj.dataMap = [] + kafkaMsgObj.toHandle = false + } else { + console.log(`获取结构物列表失败`); + } + } + }, 60 * 1000 * 2)//2分钟 + /*----------------------推送节流---20230103---------------------------------*/ + let structsAche = { dataList: [], @@ -100,86 +106,86 @@ module.exports = async function factory(app, opts) { 4: '设备异常', 5: '设备异常' } - async function savePullAlarm(msg) { + async function savePullAlarm(msg, structs) { const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs try { let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; - let structsAche = await getStructsAche() - if (structsAche) { - let structs = structsAche.dataList;//结构物列表 - const { models } = app.fs.dc - let exist = structs.find(s => s.strucId == structureId); - if (exist) { - let alarm_group = await clickHouse.anxinyun.query( - `SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() - - let type = null, alarmGroup = null;//告警类型 异常类型 - if (alarm_group) { - type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型 - let gId = alarm_group[0].alarm_group_unit; - let alarm_group_unit = await clickHouse.anxinyun.query( - `SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); - alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型 - } + // let structsAche = await getStructsAche() + // if (structsAche) { + //let structs = structsAche.dataList;//结构物列表 + const { models } = app.fs.dc + let exist = structs.find(s => s.strucId == structureId); + if (exist) { + let alarm_group = await clickHouse.anxinyun.query( + `SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() + + let type = null, alarmGroup = null;//告警类型 异常类型 + if (alarm_group) { + type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型 + let gId = alarm_group[0].alarm_group_unit; + let alarm_group_unit = await clickHouse.anxinyun.query( + `SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); + alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型 + } - let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); - if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 - let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 - return { - projectCorrelationId: d, - alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup }, - time: time, - type//告警类型 - } - }) - let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.time, - alarmAppearId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 1//发现 - } - }) - await models.LatestDynamicList.bulkCreate(dynamics); - - //消息推送到前端 - if (datas.length) { - await sendAppearToWeb(datas, 'data'); + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup }, + time: time, + type//告警类型 + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + + //消息推送到前端 + if (datas.length) { + await sendAppearToWeb(datas, 'data'); + } - } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 - let datas = projects.map(d => { - return { - pepUserId: null, - projectCorrelationId: d, - alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source - confirmTime: time, - confirmContent: '自动恢复' - } - }) - let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.confirmTime, - alarmConfirmId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 4//告警确认 - } - }) - await models.LatestDynamicList.bulkCreate(dynamics); - - - //消息推送到前端 - if (datas.length) { - await sendConfirmToWeb(datas, true); + } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 + let datas = projects.map(d => { + return { + pepUserId: null, + projectCorrelationId: d, + alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source + confirmTime: time, + confirmContent: '自动恢复' } + }) + let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.confirmTime, + alarmConfirmId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 4//告警确认 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + + + //消息推送到前端 + if (datas.length) { + await sendConfirmToWeb(datas, true); } } - } else { - console.log(`获取结构物列表失败`); } + // } else { + // console.log(`获取结构物列表失败`); + // } } catch (error) { console.error(error); }