巴林闲侠
2 years ago
5 changed files with 351 additions and 138 deletions
@ -1,150 +1,171 @@ |
|||
'use strict'; |
|||
const moment = require('moment'); |
|||
const Kafka = require('kafka-node'); |
|||
//const { getAxyStructs } = require('../utils/helper');
|
|||
|
|||
const { sendAppearToWeb, sendConfirmToWeb } = require('../controllers/alarm/alarmConfirmLog') |
|||
module.exports = async function factory(app, opts) { |
|||
try { |
|||
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); |
|||
const producer = new Kafka.HighLevelProducer(client); |
|||
producer.on('error', function (err) { |
|||
app.fs.logger.log('error', "[FS-KAFKA]", err); |
|||
}); |
|||
producer.on("ready", function () { |
|||
console.log('111111 ready 666666666666') |
|||
}) |
|||
|
|||
// const kafka = {
|
|||
// producer: producer,
|
|||
// configUpdateMessage: opts.configUpdateMessage || {}
|
|||
// };
|
|||
|
|||
let structsAche = { |
|||
dataList: [], |
|||
expireTime: null//10分钟更新一次结构物列表
|
|||
} |
|||
|
|||
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); |
|||
const producer = new Kafka.HighLevelProducer(client); |
|||
producer.on('error', function (err) { |
|||
app.fs.logger.log('error', "[FS-KAFKA]", err); |
|||
}); |
|||
|
|||
const kafka = { |
|||
producer: producer, |
|||
configUpdateMessage: opts.configUpdateMessage || {} |
|||
}; |
|||
// app.fs.kafka = kafka;
|
|||
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); |
|||
|
|||
app.fs.kafka = kafka; |
|||
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); |
|||
|
|||
////------------------------------------------------------------------- try try try
|
|||
// const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
|
|||
// const options = {
|
|||
// // groupId: 'topic-test-one',
|
|||
// autoCommit: false,
|
|||
// //fromOffset: true,
|
|||
// fromOffset: 'latest'
|
|||
// }
|
|||
// const consumer = new Kafka.Consumer(client, topics, options)
|
|||
// consumer.on("ready", function () {
|
|||
// console.log('consumer ready 666666666666')
|
|||
// })
|
|||
// // consumer.on("message", function (w) {
|
|||
// // console.log('consumer ready 666666666666')
|
|||
// // })
|
|||
// consumer.on('message', function (message) {
|
|||
// const decodedMessage = JSON.parse(message.value)
|
|||
// console.log('decodedMessage: ', decodedMessage)
|
|||
// })
|
|||
// consumer.on('error', function (err) {
|
|||
// console.log('consumer err:')
|
|||
// console.log(err);
|
|||
// });
|
|||
|
|||
// setTimeout(async () => {
|
|||
// let msg = {
|
|||
// "messageMode": "AlarmGeneration",
|
|||
// "structureId": 1043,
|
|||
// "structureName": "TPA 数据波动影响分析及实验",
|
|||
// "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151",
|
|||
// "sourceName": "忻德TPA1",
|
|||
// "alarmTypeCode": "3004",
|
|||
// "alarmCode": "3002",
|
|||
// "content": "link [soip:40001:00012532] is nil",
|
|||
// "time": "2022-10-31T11:21:14.000+0800",
|
|||
// "sourceTypeId": 1,
|
|||
// "sponsor": "et.recv",
|
|||
// "extras": null
|
|||
// }
|
|||
// await savePullAlarm(msg);
|
|||
// }, 3000)
|
|||
|
|||
const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] |
|||
const options = { |
|||
groupId: 'topic-test-one', |
|||
autoCommit: true, |
|||
} |
|||
const consumer = new Kafka.Consumer(client, topics, options) |
|||
consumer.on("ready", function () { |
|||
console.log('consumer ready 666666666666') |
|||
}) |
|||
consumer.on('message', function (message) { |
|||
const buf = new Buffer(String(message.value), 'binary') |
|||
const decodedMessage = JSON.parse(buf.toString()) |
|||
console.log('decodedMessage: ', decodedMessage) |
|||
}) |
|||
consumer.on('error', function (err) { |
|||
console.log('error', err) |
|||
}) |
|||
process.on('SIGINT', function () { |
|||
consumer.close(true, function () { |
|||
process.exit() |
|||
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|||
consumer.on('message', async function (message) { |
|||
let msg = JSON.parse(message.value) |
|||
await savePullAlarm(msg); |
|||
}) |
|||
}) |
|||
// let offset = new Kafka.Offset(client);
|
|||
// consumer.on('offsetOutOfRange', function (topic) {
|
|||
// console.log('offsetOutOfRange')
|
|||
// // consumer.setOffset('topic', 0, 0);
|
|||
// // topic.maxNum = 1;
|
|||
// offset.fetch([topic], function (err, offsets) {
|
|||
// if (err) {
|
|||
// return console.error(err);
|
|||
// }
|
|||
// try {
|
|||
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]);
|
|||
// consumer.setOffset(topic.topic, topic.partition, max);
|
|||
// } catch (error) {
|
|||
// console.log(error);
|
|||
// }
|
|||
|
|||
// });
|
|||
// });
|
|||
|
|||
async function getStructsAche() { |
|||
const { utils: { getAxyStructs } } = app.fs |
|||
try { |
|||
if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { |
|||
let structList = await getAxyStructs(); |
|||
structsAche.dataList = structList; |
|||
structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); |
|||
let structsAche = { |
|||
dataList: [], |
|||
expireTime: null//10分钟更新一次结构物列表
|
|||
} |
|||
async function getStructsAche() { |
|||
const { utils: { getAxyStructs } } = app.fs |
|||
try { |
|||
if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { |
|||
let structList = await getAxyStructs(); |
|||
structsAche.dataList = structList; |
|||
structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); |
|||
} |
|||
return structsAche; |
|||
} catch (err) { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} |
|||
return structsAche; |
|||
} catch (err) { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
//保存告警[发现]
|
|||
async function savePullAlarm(msg) { |
|||
const { clickHouse } = app.fs |
|||
try { |
|||
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; |
|||
let structsAche = await getStructsAche(); |
|||
if (structsAche) { |
|||
let structs = structsAche.dataList;//结构物列表
|
|||
const { models } = app.fs.dc |
|||
let exist = structs.find(s => s.strucId == structureId); |
|||
if (exist) { |
|||
let alarmType = await clickHouse.anxinyun.query( |
|||
`SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() |
|||
let type = alarmType.length ? alarmType[0].name : '' |
|||
//保存告警[发现]
|
|||
async function savePullAlarm(msg) { |
|||
const { clickHouse } = app.fs |
|||
try { |
|||
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; |
|||
let structsAche = await getStructsAche(); |
|||
if (structsAche) { |
|||
let structs = structsAche.dataList;//结构物列表
|
|||
const { models } = app.fs.dc |
|||
let exist = structs.find(s => s.strucId == structureId); |
|||
if (exist) { |
|||
let alarmType = await clickHouse.anxinyun.query( |
|||
`SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() |
|||
let type = alarmType.length ? alarmType[0].name : '' |
|||
|
|||
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|||
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|||
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|||
return { |
|||
projectCorrelationId: d, |
|||
alarmInfo: { sourceName, alarmTypeCode }, |
|||
time: time, |
|||
type//异常类型
|
|||
} |
|||
}) |
|||
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.time, |
|||
alarmAppearId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 1//发现
|
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
|
|||
let datas = projects.map(d => { |
|||
return { |
|||
pepUserId: null, |
|||
projectCorrelationId: d, |
|||
alarmInfo: { source: sourceName, type },//包含告警id,type,source
|
|||
confirmTime: time, |
|||
confirmContent: '自动恢复' |
|||
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|||
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|||
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|||
return { |
|||
projectCorrelationId: d, |
|||
alarmInfo: { messageMode, sourceName, alarmTypeCode, content }, |
|||
time: time, |
|||
type//异常类型
|
|||
} |
|||
}) |
|||
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.time, |
|||
alarmAppearId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 1//发现
|
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
|
|||
//消息推送到前端
|
|||
if (datas.length) { |
|||
await sendAppearToWeb(app, models, clickHouse, datas, 'data'); |
|||
} |
|||
}) |
|||
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.confirmTime, |
|||
alarmConfirmId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 4//告警确认
|
|||
|
|||
|
|||
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
|
|||
let datas = projects.map(d => { |
|||
return { |
|||
pepUserId: null, |
|||
projectCorrelationId: d, |
|||
alarmInfo: { source: sourceName, type },//包含告警id,type,source
|
|||
confirmTime: time, |
|||
confirmContent: '自动恢复' |
|||
} |
|||
}) |
|||
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.confirmTime, |
|||
alarmConfirmId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 4//告警确认
|
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
|
|||
|
|||
//消息推送到前端
|
|||
if (datas.length) { |
|||
await sendConfirmToWeb(app, models, clickHouse, datas, true); |
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
} |
|||
} |
|||
} else { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} |
|||
} else { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} catch (error) { |
|||
console.error(error); |
|||
} |
|||
} catch (error) { |
|||
console.error(error); |
|||
} |
|||
} catch (error) { |
|||
console.log(error); |
|||
} |
|||
} |
Loading…
Reference in new issue