|
|
@ -12,6 +12,13 @@ module.exports = async function factory(app, opts) { |
|
|
|
console.log('111111 ready 666666666666') |
|
|
|
}) |
|
|
|
|
|
|
|
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|
|
|
consumer.on('message', async function (message) { |
|
|
|
let msg = JSON.parse(message.value) |
|
|
|
console.log('kafka consumer----------接收到消息'); |
|
|
|
await savePullAlarm(msg); |
|
|
|
}) |
|
|
|
|
|
|
|
const kafka = { |
|
|
|
producer: producer, |
|
|
|
configUpdateMessage: opts.configUpdateMessage || {} |
|
|
@ -45,12 +52,6 @@ module.exports = async function factory(app, opts) { |
|
|
|
// });
|
|
|
|
|
|
|
|
|
|
|
|
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|
|
|
consumer.on('message', async function (message) { |
|
|
|
let msg = JSON.parse(message.value) |
|
|
|
console.log('kafka consumer----------接收到消息'); |
|
|
|
await savePullAlarm(msg); |
|
|
|
}) |
|
|
|
// let offset = new Kafka.Offset(client);
|
|
|
|
// consumer.on('offsetOutOfRange', function (topic) {
|
|
|
|
// console.log('offsetOutOfRange')
|
|
|
@ -89,6 +90,13 @@ module.exports = async function factory(app, opts) { |
|
|
|
} |
|
|
|
|
|
|
|
//保存告警[发现]
|
|
|
|
let constAlarmGroups = { |
|
|
|
1: '数据中断', |
|
|
|
2: '数据异常', |
|
|
|
3: '策略命中', |
|
|
|
4: '设备异常', |
|
|
|
5: '设备异常' |
|
|
|
} |
|
|
|
async function savePullAlarm(msg) { |
|
|
|
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs |
|
|
|
try { |
|
|
@ -99,18 +107,20 @@ module.exports = async function factory(app, opts) { |
|
|
|
const { models } = app.fs.dc |
|
|
|
let exist = structs.find(s => s.strucId == structureId); |
|
|
|
if (exist) { |
|
|
|
let alarmType = await clickHouse.anxinyun.query( |
|
|
|
`SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() |
|
|
|
let type = alarmType.length ? alarmType[0].name : '' |
|
|
|
let alarm_group = await clickHouse.anxinyun.query( |
|
|
|
`SELECT alarm_group FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise(); |
|
|
|
|
|
|
|
let type = alarm_group.length ? constAlarmGroups[alarm_group[0].alarm_group] : null//告警类型
|
|
|
|
|
|
|
|
|
|
|
|
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|
|
|
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|
|
|
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|
|
|
return { |
|
|
|
projectCorrelationId: d, |
|
|
|
alarmInfo: { messageMode, sourceName, alarmTypeCode, content }, |
|
|
|
alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content }, |
|
|
|
time: time, |
|
|
|
type//异常类型
|
|
|
|
type//告警类型
|
|
|
|
} |
|
|
|
}) |
|
|
|
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|
|
|