运维服务中台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

197 lines
7.8 KiB

'use strict';
const moment = require('moment');
const Kafka = require('kafka-node');
2 years ago
module.exports = async function factory(app, opts) {
try {
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true });
const producer = new Kafka.HighLevelProducer(client);
producer.on('error', function (err) {
app.fs.logger.log('error', "[FS-KAFKA]", err);
});
producer.on("ready", function () {
2 years ago
console.log('Kafka ready')
})
let groupId = 'group' + Math.random()
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
consumer.on('message', async function (message) {
let msg = JSON.parse(message.value)
await handleMsg(msg);
})
const kafka = {
producer: producer,
configUpdateMessage: opts.configUpdateMessage || {}
};
app.fs.kafka = kafka;
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
/*----------------------推送节流---20230103---------------------------------*/
//全局变量
let kafkaMsgObj = {
time: null,
dataMap: [],
toHandle: false//有待处理
}
async function handleMsg(msg) {
try {
let { messageMode } = msg
if (['AlarmGeneration', 'AlarmAutoElimination'].indexOf(messageMode) != -1) {
if (!kafkaMsgObj.time || moment() > moment(kafkaMsgObj.time).add(2, 'minute')) {//首次 || 跟上次时间间隔大于2分钟 直接发送
let structsAche = await getStructsAche()
if (structsAche.dataList.length) {
await savePullAlarm(msg, structsAche.dataList)//直接给处理了
kafkaMsgObj.time = moment().format();//记录本次发送时间
kafkaMsgObj.dataMap = []
kafkaMsgObj.toHandle = false
}
} else {//放进队列
kafkaMsgObj.dataMap.push(msg);
kafkaMsgObj.toHandle = true;//有待发送
}
}
} catch (error) {
console.error(error);
}
}
setInterval(async () => {
if (kafkaMsgObj.toHandle) {//有待发送
let structsAche = await getStructsAche()
if (structsAche.dataList.length) {
kafkaMsgObj.dataMap.forEach(async d => {
await savePullAlarm(d, structsAche.dataList)
})
kafkaMsgObj.time = moment().format()
kafkaMsgObj.dataMap = []
kafkaMsgObj.toHandle = false
} else {
console.log(`获取结构物列表失败`);
}
}
}, 60 * 1000 * 2)//2分钟
/*----------------------推送节流---20230103---------------------------------*/
let structsAche = {
dataList: [],
expireTime: null//10分钟更新一次结构物列表
}
async function getStructsAche() {
const { utils: { getAxyStructs } } = app.fs
try {
if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) {
let structList = await getAxyStructs();
if (structList && structList.length) {
structsAche.dataList = structList;
structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss');
}
}
return structsAche;
} catch (err) {
console.log(`获取结构物列表失败, error: ${err}`);
}
}
//保存告警[发现]
let constAlarmGroups = {
1: '数据中断',
2: '数据异常',
3: '策略命中',
4: '设备异常',
5: '设备异常'
}
async function savePullAlarm(msg, structs) {
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs
try {
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg;
// let structsAche = await getStructsAche()
// if (structsAche) {
//let structs = structsAche.dataList;//结构物列表
const { models } = app.fs.dc
let exist = structs.find(s => s.strucId == structureId);
if (exist) {
let alarm_group = await clickHouse.anxinyun.query(
`SELECT alarm_group, alarm_group_unit FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise()
let type = null, typeId = null, alarmGroup = null;//告警类型 异常类型
if (alarm_group) {
typeId = alarm_group[0].alarm_group;
type = constAlarmGroups[alarm_group[0].alarm_group];//告警类型
let gId = alarm_group[0].alarm_group_unit;
let alarm_group_unit = await clickHouse.anxinyun.query(
`SELECT name FROM t_alarm_group_unit WHERE id=${gId}`).toPromise();
alarmGroup = alarm_group_unit.length ? alarm_group_unit[0].name : null//异常类型
}
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id);
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return {
projectCorrelationId: d,
alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content, type: alarmGroup, typeId },
time: time,
type//告警类型
}
})
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.time,
alarmAppearId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 1//发现
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendAppearToWeb(datas, 'data', exist);
}
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
let datas = projects.map(d => {
return {
pepUserId: null,
projectCorrelationId: d,
alarmInfo: { source: sourceName, type: alarmGroup },//包含告警id,type,source
confirmTime: time,
confirmContent: '自动恢复'
}
})
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.confirmTime,
alarmConfirmId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 4//告警确认
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
//消息推送到前端
if (datas.length) {
await sendConfirmToWeb(datas, true);
}
}
}
// } else {
// console.log(`获取结构物列表失败`);
// }
} catch (error) {
console.error(error);
}
}
} catch (error) {
console.log(error);
}
}