'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); //const { getAxyStructs } = require('../utils/helper'); module.exports = async function factory(app, opts) { let structsAche = { dataList: [], expireTime: null//10分钟更新一次结构物列表 } const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); const producer = new Kafka.HighLevelProducer(client); producer.on('error', function (err) { app.fs.logger.log('error', "[FS-KAFKA]", err); }); const kafka = { producer: producer, configUpdateMessage: opts.configUpdateMessage || {} }; app.fs.kafka = kafka; app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); ////------------------------------------------------------------------- try try try // setTimeout(async () => { // let msg = { // "messageMode": "AlarmGeneration", // "structureId": 1043, // "structureName": "TPA 数据波动影响分析及实验", // "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151", // "sourceName": "忻德TPA1", // "alarmTypeCode": "3004", // "alarmCode": "3002", // "content": "link [soip:40001:00012532] is nil", // "time": "2022-10-31T11:21:14.000+0800", // "sourceTypeId": 1, // "sponsor": "et.recv", // "extras": null // } // await savePullAlarm(msg); // }, 3000) const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] const options = { groupId: 'topic-test-one', autoCommit: true, } const consumer = new Kafka.Consumer(client, topics, options) consumer.on("ready", function () { console.log('consumer ready 666666666666') }) consumer.on('message', function (message) { const buf = new Buffer(String(message.value), 'binary') const decodedMessage = JSON.parse(buf.toString()) console.log('decodedMessage: ', decodedMessage) }) consumer.on('error', function (err) { console.log('error', err) }) process.on('SIGINT', function () { consumer.close(true, function () { process.exit() }) }) async function getStructsAche() { const { utils: { getAxyStructs } } = app.fs try { if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { let structList = await getAxyStructs(); structsAche.dataList = structList; structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); } return structsAche; } catch (err) { console.log(`获取结构物列表失败, error: ${err}`); } } //保存告警[发现] async function savePullAlarm(msg) { const { clickHouse } = app.fs try { let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; let structsAche = await getStructsAche(); if (structsAche) { let structs = structsAche.dataList;//结构物列表 const { models } = app.fs.dc let exist = structs.find(s => s.strucId == structureId); if (exist) { let alarmType = await clickHouse.anxinyun.query( `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() let type = alarmType.length ? alarmType[0].name : '' let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 return { projectCorrelationId: d, alarmInfo: { sourceName, alarmTypeCode }, time: time, type//异常类型 } }) let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.time, alarmAppearId: r.id, projectCorrelationId: r.projectCorrelationId, type: 1//发现 } }) await models.LatestDynamicList.bulkCreate(dynamics); } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 let datas = projects.map(d => { return { pepUserId: null, projectCorrelationId: d, alarmInfo: { source: sourceName, type },//包含告警id,type,source confirmTime: time, confirmContent: '自动恢复' } }) let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); let dynamics = rslt.map(r => { return { time: r.confirmTime, alarmConfirmId: r.id, projectCorrelationId: r.projectCorrelationId, type: 4//告警确认 } }) await models.LatestDynamicList.bulkCreate(dynamics); } } } else { console.log(`获取结构物列表失败, error: ${err}`); } } catch (error) { console.error(error); } } }