Compare commits

...

2 Commits

  1. 230
      api/app/lib/service/kafka.js

230
api/app/lib/service/kafka.js

@ -17,7 +17,7 @@ module.exports = async function factory(app, opts) {
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
consumer.on('message', async function (message) { consumer.on('message', async function (message) {
let msg = JSON.parse(message.value) let msg = JSON.parse(message.value)
await savePullAlarm(msg); await handleMsg(msg);
}) })
const kafka = { const kafka = {
@ -29,48 +29,54 @@ module.exports = async function factory(app, opts) {
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
// const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] /*----------------------推送节流---20230103---------------------------------*/
// const options = { //全局变量
// // groupId: 'topic-test-one', let kafkaMsgObj = {
// autoCommit: false, time: null,
// //fromOffset: true, dataMap: [],
// fromOffset: 'latest' toHandle: false//有待处理
// } }
// const consumer = new Kafka.Consumer(client, topics, options) async function handleMsg(msg) {
// consumer.on("ready", function () { try {
// console.log('consumer ready 666666666666') let { messageMode } = msg
// }) if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) {
// // consumer.on("message", function (w) { if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {//首次 || 跟上次时间间隔大于2分钟 直接发送
// // console.log('consumer ready 666666666666') let structsAche = await getStructsAche()
// // }) if (structsAche.dataList.length) {
// consumer.on('message', function (message) { await savePullAlarm(msg, structsAche.dataList)//直接给处理了
// const decodedMessage = JSON.parse(message.value)
// console.log('decodedMessage: ', decodedMessage) kafkaMsgObj.time = moment().format();//记录本次发送时间
// }) kafkaMsgObj.dataMap = []
// consumer.on('error', function (err) { kafkaMsgObj.toHandle = false
// console.log('consumer err:') }
// console.log(err); } else {//放进队列
// }); kafkaMsgObj.dataMap.push(msg);
kafkaMsgObj.toHandle = true;//有待发送
}
// let offset = new Kafka.Offset(client); }
// consumer.on('offsetOutOfRange', function (topic) { } catch (error) {
// console.log('offsetOutOfRange') console.error(error);
// // consumer.setOffset('topic', 0, 0); }
// // topic.maxNum = 1; }
// offset.fetch([topic], function (err, offsets) { setInterval(async () => {
// if (err) { if (kafkaMsgObj.toHandle) {//有待发送
// return console.error(err); let structsAche = await getStructsAche()
// } if (structsAche.dataList.length) {
// try {
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]); kafkaMsgObj.dataMap.forEach(async d => {
// consumer.setOffset(topic.topic, topic.partition, max); await savePullAlarm(d, structsAche.dataList)
// } catch (error) { })
// console.log(error);
// } kafkaMsgObj.time = moment().format()
kafkaMsgObj.dataMap = []
// }); kafkaMsgObj.toHandle = false
// }); } else {
console.log(`获取结构物列表失败`);
}
}
}, 60 * 1000 * 2)//2分钟
/*----------------------推送节流---20230103---------------------------------*/
let structsAche = { let structsAche = {
dataList: [], dataList: [],
@ -100,86 +106,86 @@ module.exports = async function factory(app, opts) {
4: '设备异常', 4: '设备异常',
5: '设备异常' 5: '设备异常'
} }
async function savePullAlarm(msg) { async function savePullAlarm(msg, structs) {
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs
try { try {
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg;
let structsAche = await getStructsAche() // let structsAche = await getStructsAche()
if (structsAche) { // if (structsAche) {
let structs = structsAche.dataList;//结构物列表 //let structs = structsAche.dataList;//结构物列表
const { models } = app.fs.dc const { models } = app.fs.dc
let exist = structs.find(s => s.strucId == structureId); let exist = structs.find(s => s.strucId == structureId);
if (exist) { if (exist) {
let alarm_group = await clickHouse.anxinyun.query( let alarm_group = await clickHouse.anxinyun.query(
`SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() `SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise()
let type = null, alarmGroup = null;//告警类型 异常类型 let type = null, alarmGroup = null;//告警类型 异常类型
if (alarm_group) { if (alarm_group) {
type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型 type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型
let gId = alarm_group[0].alarm_group_unit; let gId = alarm_group[0].alarm_group_unit;
let alarm_group_unit = await clickHouse.anxinyun.query( let alarm_group_unit = await clickHouse.anxinyun.query(
`SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); `SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise();
alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型 alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型
} }
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id);
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return { return {
projectCorrelationId: d, projectCorrelationId: d,
alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup }, alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup },
time: time, time: time,
type//告警类型 type//告警类型
} }
}) })
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => { let dynamics = rslt.map(r => {
return { return {
time: r.time, time: r.time,
alarmAppearId: r.id, alarmAppearId: r.id,
projectCorrelationId: r.projectCorrelationId, projectCorrelationId: r.projectCorrelationId,
type: 1//发现 type: 1//发现
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendAppearToWeb(datas, 'data');
} }
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendAppearToWeb(datas, 'data');
}
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
let datas = projects.map(d => { let datas = projects.map(d => {
return { return {
pepUserId: null, pepUserId: null,
projectCorrelationId: d, projectCorrelationId: d,
alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source
confirmTime: time, confirmTime: time,
confirmContent: '自动恢复' confirmContent: '自动恢复'
}
})
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.confirmTime,
alarmConfirmId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 4//告警确认
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendConfirmToWeb(datas, true);
} }
})
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.confirmTime,
alarmConfirmId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 4//告警确认
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendConfirmToWeb(datas, true);
} }
} }
} else {
console.log(`获取结构物列表失败`);
} }
// } else {
// console.log(`获取结构物列表失败`);
// }
} catch (error) { } catch (error) {
console.error(error); console.error(error);
} }

Loading…
Cancel
Save