Compare commits

2 Commits

104
api/app/lib/service/kafka.js

@@ -17,7 +17,7 @@ module.exports = async function factory(app, opts) {
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
consumer.on('message', async function (message) {
let msg = JSON.parse(message.value)
await savePullAlarm(msg);
await handleMsg(msg);
})
const kafka = {
@@ -29,48 +29,54 @@ module.exports = async function factory(app, opts) {
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
// const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
// const options = {
// // groupId: 'topic-test-one',
// autoCommit: false,
// //fromOffset: true,
// fromOffset: 'latest'
// }
// const consumer = new Kafka.Consumer(client, topics, options)
// consumer.on("ready", function () {
// console.log('consumer ready 666666666666')
// })
// // consumer.on("message", function (w) {
// // console.log('consumer ready 666666666666')
// // })
// consumer.on('message', function (message) {
// const decodedMessage = JSON.parse(message.value)
// console.log('decodedMessage: ', decodedMessage)
// })
// consumer.on('error', function (err) {
// console.log('consumer err:')
// console.log(err);
// });
// let offset = new Kafka.Offset(client);
// consumer.on('offsetOutOfRange', function (topic) {
// console.log('offsetOutOfRange')
// // consumer.setOffset('topic', 0, 0);
// // topic.maxNum = 1;
// offset.fetch([topic], function (err, offsets) {
// if (err) {
// return console.error(err);
// }
// try {
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]);
// consumer.setOffset(topic.topic, topic.partition, max);
// } catch (error) {
// console.log(error);
// }
/*---------------------- Push throttling --- 20230103 ---------------------------------*/
// Global variables
let kafkaMsgObj = {
time: null,
dataMap: [],
toHandle: false// has messages waiting to be handled
}
async function handleMsg(msg) {
try {
let { messageMode } = msg
if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) {
if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {// first message, or more than 2 minutes since the last send: deliver immediately
let structsAche = await getStructsAche()
if (structsAche.dataList.length) {
await savePullAlarm(msg, structsAche.dataList)// handle it right away
kafkaMsgObj.time = moment().format();// record the time of this send
kafkaMsgObj.dataMap = []
kafkaMsgObj.toHandle = false
}
} else {// put it in the queue
kafkaMsgObj.dataMap.push(msg);
kafkaMsgObj.toHandle = true;// pending send
}
}
} catch (error) {
console.error(error);
}
}
setInterval(async () => {
if (kafkaMsgObj.toHandle) {// there are queued messages to send
let structsAche = await getStructsAche()
if (structsAche.dataList.length) {
kafkaMsgObj.dataMap.forEach(async d => {
await savePullAlarm(d, structsAche.dataList)
})
kafkaMsgObj.time = moment().format()
kafkaMsgObj.dataMap = []
kafkaMsgObj.toHandle = false
} else {
console.log(`获取结构物列表失败`);
}
}
}, 60 * 1000 * 2)// every 2 minutes
/*---------------------- Push throttling --- 20230103 ---------------------------------*/
// });
// });
let structsAche = {
dataList: [],
@@ -100,13 +106,13 @@ module.exports = async function factory(app, opts) {
4: '设备异常',
5: '设备异常'
}
async function savePullAlarm(msg) {
async function savePullAlarm(msg, structs) {
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs
try {
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg;
let structsAche = await getStructsAche()
if (structsAche) {
let structs = structsAche.dataList;// list of structures
// let structsAche = await getStructsAche()
// if (structsAche) {
//let structs = structsAche.dataList;// list of structures
const { models } = app.fs.dc
let exist = structs.find(s => s.strucId == structureId);
if (exist) {
@@ -177,9 +183,9 @@ module.exports = async function factory(app, opts) {
}
}
}
} else {
console.log(`获取结构物列表失败`);
}
// } else {
// console.log(`获取结构物列表失败`);
// }
} catch (error) {
console.error(error);
}
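
The change above throttles the alarm push: the first message in a two-minute window is handled immediately, later ones are queued in kafkaMsgObj.dataMap, and a setInterval timer flushes the queue every two minutes (logging 获取结构物列表失败, "failed to fetch the structure list", when the structure cache comes back empty). As a reading aid only, and not part of the commit, here is a minimal sketch of that throttle-and-flush pattern in isolation; deliver is a placeholder for savePullAlarm, the getStructsAche lookup is omitted, and plain Date.now() replaces moment:

// Minimal sketch of the throttle-and-flush pattern (illustrative only).
const WINDOW_MS = 2 * 60 * 1000; // two-minute window, matching the commit

const state = {
  lastSentAt: null, // time of the last immediate delivery
  queue: [],        // messages waiting for the next flush
  pending: false    // true while the queue holds undelivered messages
};

async function handleThrottled(msg, deliver) {
  const now = Date.now();
  if (!state.lastSentAt || now - state.lastSentAt > WINDOW_MS) {
    // First message, or more than two minutes since the last send: deliver immediately.
    await deliver(msg);
    state.lastSentAt = now;
    state.queue = [];
    state.pending = false;
  } else {
    // Inside the window: queue the message for the periodic flush.
    state.queue.push(msg);
    state.pending = true;
  }
}

function startFlushTimer(deliver) {
  return setInterval(async () => {
    if (!state.pending) return;
    for (const msg of state.queue) {
      await deliver(msg); // for...of keeps the deliveries sequential
    }
    state.lastSentAt = Date.now();
    state.queue = [];
    state.pending = false;
  }, WINDOW_MS);
}

One detail worth noting in the committed code: kafkaMsgObj.dataMap.forEach(async d => { await savePullAlarm(d, structsAche.dataList) }) does not wait for each call, because forEach ignores the promises returned by an async callback, so the queued alarms are pushed concurrently; a for...of loop, as in the sketch above, would process them one at a time.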
