|
@ -17,7 +17,7 @@ module.exports = async function factory(app, opts) { |
|
|
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|
|
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|
|
consumer.on('message', async function (message) { |
|
|
consumer.on('message', async function (message) { |
|
|
let msg = JSON.parse(message.value) |
|
|
let msg = JSON.parse(message.value) |
|
|
await savePullAlarm(msg); |
|
|
await handleMsg(msg); |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
const kafka = { |
|
|
const kafka = { |
|
@ -29,48 +29,54 @@ module.exports = async function factory(app, opts) { |
|
|
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); |
|
|
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
|
|
|
/*----------------------推送节流---20230103---------------------------------*/ |
|
|
// const options = {
|
|
|
//全局变量
|
|
|
// // groupId: 'topic-test-one',
|
|
|
let kafkaMsgObj = { |
|
|
// autoCommit: false,
|
|
|
time: null, |
|
|
// //fromOffset: true,
|
|
|
dataMap: [], |
|
|
// fromOffset: 'latest'
|
|
|
toHandle: false//有待处理
|
|
|
// }
|
|
|
} |
|
|
// const consumer = new Kafka.Consumer(client, topics, options)
|
|
|
async function handleMsg(msg) { |
|
|
// consumer.on("ready", function () {
|
|
|
try { |
|
|
// console.log('consumer ready 666666666666')
|
|
|
let { messageMode } = msg |
|
|
// })
|
|
|
if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) { |
|
|
// // consumer.on("message", function (w) {
|
|
|
if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {//首次 || 跟上次时间间隔大于2分钟 直接发送
|
|
|
// // console.log('consumer ready 666666666666')
|
|
|
let structsAche = await getStructsAche() |
|
|
// // })
|
|
|
if (structsAche.dataList.length) { |
|
|
// consumer.on('message', function (message) {
|
|
|
await savePullAlarm(msg, structsAche.dataList)//直接给处理了
|
|
|
// const decodedMessage = JSON.parse(message.value)
|
|
|
|
|
|
// console.log('decodedMessage: ', decodedMessage)
|
|
|
kafkaMsgObj.time = moment().format();//记录本次发送时间
|
|
|
// })
|
|
|
kafkaMsgObj.dataMap = [] |
|
|
// consumer.on('error', function (err) {
|
|
|
kafkaMsgObj.toHandle = false |
|
|
// console.log('consumer err:')
|
|
|
} |
|
|
// console.log(err);
|
|
|
} else {//放进队列
|
|
|
// });
|
|
|
kafkaMsgObj.dataMap.push(msg); |
|
|
|
|
|
kafkaMsgObj.toHandle = true;//有待发送
|
|
|
|
|
|
} |
|
|
// let offset = new Kafka.Offset(client);
|
|
|
} |
|
|
// consumer.on('offsetOutOfRange', function (topic) {
|
|
|
} catch (error) { |
|
|
// console.log('offsetOutOfRange')
|
|
|
console.error(error); |
|
|
// // consumer.setOffset('topic', 0, 0);
|
|
|
} |
|
|
// // topic.maxNum = 1;
|
|
|
} |
|
|
// offset.fetch([topic], function (err, offsets) {
|
|
|
setInterval(async () => { |
|
|
// if (err) {
|
|
|
if (kafkaMsgObj.toHandle) {//有待发送
|
|
|
// return console.error(err);
|
|
|
let structsAche = await getStructsAche() |
|
|
// }
|
|
|
if (structsAche.dataList.length) { |
|
|
// try {
|
|
|
|
|
|
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]);
|
|
|
kafkaMsgObj.dataMap.forEach(async d => { |
|
|
// consumer.setOffset(topic.topic, topic.partition, max);
|
|
|
await savePullAlarm(d, structsAche.dataList) |
|
|
// } catch (error) {
|
|
|
}) |
|
|
// console.log(error);
|
|
|
|
|
|
// }
|
|
|
kafkaMsgObj.time = moment().format() |
|
|
|
|
|
kafkaMsgObj.dataMap = [] |
|
|
// });
|
|
|
kafkaMsgObj.toHandle = false |
|
|
// });
|
|
|
} else { |
|
|
|
|
|
console.log(`获取结构物列表失败`); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
}, 60 * 1000 * 2)//2分钟
|
|
|
|
|
|
/*----------------------推送节流---20230103---------------------------------*/ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let structsAche = { |
|
|
let structsAche = { |
|
|
dataList: [], |
|
|
dataList: [], |
|
@ -100,86 +106,86 @@ module.exports = async function factory(app, opts) { |
|
|
4: '设备异常', |
|
|
4: '设备异常', |
|
|
5: '设备异常' |
|
|
5: '设备异常' |
|
|
} |
|
|
} |
|
|
async function savePullAlarm(msg) { |
|
|
async function savePullAlarm(msg, structs) { |
|
|
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs |
|
|
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs |
|
|
try { |
|
|
try { |
|
|
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; |
|
|
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; |
|
|
let structsAche = await getStructsAche() |
|
|
// let structsAche = await getStructsAche()
|
|
|
if (structsAche) { |
|
|
// if (structsAche) {
|
|
|
let structs = structsAche.dataList;//结构物列表
|
|
|
//let structs = structsAche.dataList;//结构物列表
|
|
|
const { models } = app.fs.dc |
|
|
const { models } = app.fs.dc |
|
|
let exist = structs.find(s => s.strucId == structureId); |
|
|
let exist = structs.find(s => s.strucId == structureId); |
|
|
if (exist) { |
|
|
if (exist) { |
|
|
let alarm_group = await clickHouse.anxinyun.query( |
|
|
let alarm_group = await clickHouse.anxinyun.query( |
|
|
`SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() |
|
|
`SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise() |
|
|
|
|
|
|
|
|
let type = null, alarmGroup = null;//告警类型 异常类型
|
|
|
let type = null, alarmGroup = null;//告警类型 异常类型
|
|
|
if (alarm_group) { |
|
|
if (alarm_group) { |
|
|
type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型
|
|
|
type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型
|
|
|
let gId = alarm_group[0].alarm_group_unit; |
|
|
let gId = alarm_group[0].alarm_group_unit; |
|
|
let alarm_group_unit = await clickHouse.anxinyun.query( |
|
|
let alarm_group_unit = await clickHouse.anxinyun.query( |
|
|
`SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); |
|
|
`SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise(); |
|
|
alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型
|
|
|
alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|
|
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|
|
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|
|
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|
|
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|
|
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|
|
return { |
|
|
return { |
|
|
projectCorrelationId: d, |
|
|
projectCorrelationId: d, |
|
|
alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup }, |
|
|
alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup }, |
|
|
time: time, |
|
|
time: time, |
|
|
type//告警类型
|
|
|
type//告警类型
|
|
|
} |
|
|
} |
|
|
}) |
|
|
}) |
|
|
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|
|
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|
|
let dynamics = rslt.map(r => { |
|
|
let dynamics = rslt.map(r => { |
|
|
return { |
|
|
return { |
|
|
time: r.time, |
|
|
time: r.time, |
|
|
alarmAppearId: r.id, |
|
|
alarmAppearId: r.id, |
|
|
projectCorrelationId: r.projectCorrelationId, |
|
|
projectCorrelationId: r.projectCorrelationId, |
|
|
type: 1//发现
|
|
|
type: 1//发现
|
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
await models.LatestDynamicList.bulkCreate(dynamics); |
|
|
|
|
|
|
|
|
|
|
|
//消息推送到前端
|
|
|
|
|
|
if (datas.length) { |
|
|
|
|
|
await sendAppearToWeb(datas, 'data'); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
await models.LatestDynamicList.bulkCreate(dynamics); |
|
|
|
|
|
|
|
|
|
|
|
//消息推送到前端
|
|
|
|
|
|
if (datas.length) { |
|
|
|
|
|
await sendAppearToWeb(datas, 'data'); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
|
|
|
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
|
|
|
let datas = projects.map(d => { |
|
|
let datas = projects.map(d => { |
|
|
return { |
|
|
return { |
|
|
pepUserId: null, |
|
|
pepUserId: null, |
|
|
projectCorrelationId: d, |
|
|
projectCorrelationId: d, |
|
|
alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source
|
|
|
alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source
|
|
|
confirmTime: time, |
|
|
confirmTime: time, |
|
|
confirmContent: '自动恢复' |
|
|
confirmContent: '自动恢复' |
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); |
|
|
|
|
|
let dynamics = rslt.map(r => { |
|
|
|
|
|
return { |
|
|
|
|
|
time: r.confirmTime, |
|
|
|
|
|
alarmConfirmId: r.id, |
|
|
|
|
|
projectCorrelationId: r.projectCorrelationId, |
|
|
|
|
|
type: 4//告警确认
|
|
|
|
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
await models.LatestDynamicList.bulkCreate(dynamics); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//消息推送到前端
|
|
|
|
|
|
if (datas.length) { |
|
|
|
|
|
await sendConfirmToWeb(datas, true); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); |
|
|
|
|
|
let dynamics = rslt.map(r => { |
|
|
|
|
|
return { |
|
|
|
|
|
time: r.confirmTime, |
|
|
|
|
|
alarmConfirmId: r.id, |
|
|
|
|
|
projectCorrelationId: r.projectCorrelationId, |
|
|
|
|
|
type: 4//告警确认
|
|
|
|
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
await models.LatestDynamicList.bulkCreate(dynamics); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//消息推送到前端
|
|
|
|
|
|
if (datas.length) { |
|
|
|
|
|
await sendConfirmToWeb(datas, true); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} else { |
|
|
|
|
|
console.log(`获取结构物列表失败`); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
// } else {
|
|
|
|
|
|
// console.log(`获取结构物列表失败`);
|
|
|
|
|
|
// }
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error(error); |
|
|
console.error(error); |
|
|
} |
|
|
} |
|
|