'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); module.exports = async function factory(app, opts) { try { const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); const producer = new Kafka.HighLevelProducer(client); producer.on('error', function (err) { app.fs.logger.log('error', "[FS-KAFKA]", err); }); producer.on("ready", function () { console.log('111111 ready 666666666666') }) let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) consumer.on('message', async function (message) { let msg = JSON.parse(message.value) console.log('kafka consumer----------接收到消息'); await savePullAlarm(msg); }) const kafka = { producer: producer, configUpdateMessage: opts.configUpdateMessage || {} }; app.fs.kafka = kafka; app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); // const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] // const options = { // // groupId: 'topic-test-one', // autoCommit: false, // //fromOffset: true, // fromOffset: 'latest' // } // const consumer = new Kafka.Consumer(client, topics, options) // consumer.on("ready", function () { // console.log('consumer ready 666666666666') // }) // // consumer.on("message", function (w) { // // console.log('consumer ready 666666666666') // // }) // consumer.on('message', function (message) { // const decodedMessage = JSON.parse(message.value) // console.log('decodedMessage: ', decodedMessage) // }) // consumer.on('error', function (err) { // console.log('consumer err:') // console.log(err); // }); // let offset = new Kafka.Offset(client); // consumer.on('offsetOutOfRange', function (topic) { // console.log('offsetOutOfRange') // // consumer.setOffset('topic', 0, 0); // // topic.maxNum = 1; // offset.fetch([topic], function (err, offsets) { // if (err) { // return console.error(err); // } // try { // const max = Math.max.apply(null, offsets[topic.topic][topic.partition]); // consumer.setOffset(topic.topic, topic.partition, max); // } catch (error) { // console.log(error); // } // }); // }); let structsAche = { dataList: [], expireTime: null//10分钟更新一次结构物列表 } async function getStructsAche() { const { utils: { getAxyStructs } } = app.fs try { if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { let structList = await getAxyStructs(); structsAche.dataList = structList; structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); } return structsAche; } catch (err) { console.log(`获取结构物列表失败, error: ${err}`); } } //保存告警[发现] let constAlarmGroups = { 1: '数据中断', 2: '数据异常', 3: '策略命中', 4: '设备异常', 5: '设备异常' } async function savePullAlarm(msg) { const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs try { let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; let structsAche = await getStructsAche(); if (structsAche) { let structs = structsAche.dataList;//结构物列表 const { models } = app.fs.dc let exist = structs.find(s => s.strucId == structureId); if (exist) { let alarm_group = await clickHouse.anxinyun.query( `SELECT alarm_group FROM t_alarm_code WHERE code='${alarmCode}'`).toPromise(); let type = alarm_group.length ? constAlarmGroups[alarm_group[0].alarm_group] : null//告警类型 let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 return { projectCorrelationId: d, alarmInfo: { messageMode, sourceName, alarmTypeCode, alarmCode, content }, time: time, type//告警类型 } }) let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.time, alarmAppearId: r.id, projectCorrelationId: r.projectCorrelationId, type: 1//发现 } }) await models.LatestDynamicList.bulkCreate(dynamics); //消息推送到前端 if (datas.length) { await sendAppearToWeb(datas, 'data'); } } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 let datas = projects.map(d => { return { pepUserId: null, projectCorrelationId: d, alarmInfo: { source: sourceName, type },//包含告警id,type,source confirmTime: time, confirmContent: '自动恢复' } }) let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.confirmTime, alarmConfirmId: r.id, projectCorrelationId: r.projectCorrelationId, type: 4//告警确认 } }) await models.LatestDynamicList.bulkCreate(dynamics); //消息推送到前端 if (datas.length) { await sendConfirmToWeb(datas, true); } } } } else { console.log(`获取结构物列表失败, error: ${err}`); } } catch (error) { console.error(error); } } } catch (error) { console.log(error); } }