From bd0a6167adb706db5fe35b23204bb6b3825f0d72 Mon Sep 17 00:00:00 2001 From: wuqun Date: Tue, 1 Nov 2022 11:35:40 +0800 Subject: [PATCH] =?UTF-8?q?(*)=E8=A7=86=E9=A2=91=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E5=92=8C=E5=BA=94=E7=94=A8=E5=91=8A=E8=AD=A6=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E4=B8=AD=E5=8A=A0=E6=97=A5=E5=BF=97=E8=AE=B0?= =?UTF-8?q?=E5=BD=95;=20kafka=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/app/lib/controllers/alarm/app.js | 42 ++++ api/app/lib/controllers/alarm/video.js | 31 ++- api/app/lib/service/kafka.js | 260 +++++++++++++------------ 3 files changed, 206 insertions(+), 127 deletions(-) diff --git a/api/app/lib/controllers/alarm/app.js b/api/app/lib/controllers/alarm/app.js index 4472c04..51bd88c 100644 --- a/api/app/lib/controllers/alarm/app.js +++ b/api/app/lib/controllers/alarm/app.js @@ -198,6 +198,48 @@ async function apiError(ctx) { storageData.createTime = now storageData.screenshot = screenshot await models.AppAlarm.create(storageData) + + //存告警记录 + let constTypes = { 'element': "元素异常", 'apiError': "接口报错 ", 'timeout': "加载超时" } + let belongsTo = await models.ProjectApp.findOne({ + where: { + id: projectAppId + }, + include: [{ + model: models.ProjectCorrelation, + attributes: ['id'], + where: { del: false } + }] + }) + if (belongsTo) { + let appName = await models.App.findOne({ + where: { + id: belongsTo.appId + }, + attributes: ['name'], + }) + let projects = belongsTo.projectCorrelation.map(d => d.id);//归属项目 + if (projects.length) { + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode: 'AlarmGeneration', sourceName: appName.name, content: alarmContent, type },//AlarmGeneration代表告警首次产生 + time: now, + type: constTypes[type] + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } + } } } diff --git a/api/app/lib/controllers/alarm/video.js b/api/app/lib/controllers/alarm/video.js index b2348fe..02c59c7 100644 --- a/api/app/lib/controllers/alarm/video.js +++ b/api/app/lib/controllers/alarm/video.js @@ -287,9 +287,38 @@ async function confirm (ctx) { async function alarmAdded (ctx) { try { const { models } = ctx.fs.dc; + const { clickHouse } = ctx.app.fs + const { utils: { anxinStrucIdRange } } = ctx.app.fs + let anxinStruc = await anxinStrucIdRange({ ctx }) + + const { serial_no, channel_no, create_time, description, status_id } = ctx.request.body; - + let belongToStruct = await clickHouse.anxinyun.query( + `SELECT name, structure FROM t_video_ipc WHERE serial_no='${serial_no}' and channel_no='${channel_no}'`).toPromise() + let structId = belongToStruct.length ? belongToStruct[0].structure : null + if (structId) { + let exist = anxinStruc.find(s => s.strucId == structId); + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode: 'AlarmGeneration', sourceName: belongToStruct[0].name, status_id, content: description },//AlarmGeneration代表告警首次产生 + time: create_time, + type: description + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } ctx.status = 200; } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: error`); diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js index 0c81913..cc0bb85 100644 --- a/api/app/lib/service/kafka.js +++ b/api/app/lib/service/kafka.js @@ -1,150 +1,158 @@ 'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); -//const { getAxyStructs } = require('../utils/helper'); module.exports = async function factory(app, opts) { + try { + const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); + const producer = new Kafka.HighLevelProducer(client); + producer.on('error', function (err) { + app.fs.logger.log('error', "[FS-KAFKA]", err); + }); + producer.on("ready", function () { + console.log('111111 ready 666666666666') + }) + // const kafka = { + // producer: producer, + // configUpdateMessage: opts.configUpdateMessage || {} + // }; - let structsAche = { - dataList: [], - expireTime: null//10分钟更新一次结构物列表 - } - - const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); - const producer = new Kafka.HighLevelProducer(client); - producer.on('error', function (err) { - app.fs.logger.log('error', "[FS-KAFKA]", err); - }); - - const kafka = { - producer: producer, - configUpdateMessage: opts.configUpdateMessage || {} - }; + // app.fs.kafka = kafka; + app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); - app.fs.kafka = kafka; - app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); - ////------------------------------------------------------------------- try try try + // const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] + // const options = { + // // groupId: 'topic-test-one', + // autoCommit: false, + // //fromOffset: true, + // fromOffset: 'latest' + // } + // const consumer = new Kafka.Consumer(client, topics, options) + // consumer.on("ready", function () { + // console.log('consumer ready 666666666666') + // }) + // // consumer.on("message", function (w) { + // // console.log('consumer ready 666666666666') + // // }) + // consumer.on('message', function (message) { + // const decodedMessage = JSON.parse(message.value) + // console.log('decodedMessage: ', decodedMessage) + // }) + // consumer.on('error', function (err) { + // console.log('consumer err:') + // console.log(err); + // }); - // setTimeout(async () => { - // let msg = { - // "messageMode": "AlarmGeneration", - // "structureId": 1043, - // "structureName": "TPA 数据波动影响分析及实验", - // "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151", - // "sourceName": "忻德TPA1", - // "alarmTypeCode": "3004", - // "alarmCode": "3002", - // "content": "link [soip:40001:00012532] is nil", - // "time": "2022-10-31T11:21:14.000+0800", - // "sourceTypeId": 1, - // "sponsor": "et.recv", - // "extras": null - // } - // await savePullAlarm(msg); - // }, 3000) - const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] - const options = { - groupId: 'topic-test-one', - autoCommit: true, - } - const consumer = new Kafka.Consumer(client, topics, options) - consumer.on("ready", function () { - console.log('consumer ready 666666666666') - }) - consumer.on('message', function (message) { - const buf = new Buffer(String(message.value), 'binary') - const decodedMessage = JSON.parse(buf.toString()) - console.log('decodedMessage: ', decodedMessage) - }) - consumer.on('error', function (err) { - console.log('error', err) - }) - process.on('SIGINT', function () { - consumer.close(true, function () { - process.exit() + let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) + consumer.on('message', async function (message) { + let msg = JSON.parse(message.value) + await savePullAlarm(msg); }) - }) + // let offset = new Kafka.Offset(client); + // consumer.on('offsetOutOfRange', function (topic) { + // console.log('offsetOutOfRange') + // // consumer.setOffset('topic', 0, 0); + // // topic.maxNum = 1; + // offset.fetch([topic], function (err, offsets) { + // if (err) { + // return console.error(err); + // } + // try { + // const max = Math.max.apply(null, offsets[topic.topic][topic.partition]); + // consumer.setOffset(topic.topic, topic.partition, max); + // } catch (error) { + // console.log(error); + // } + // }); + // }); - async function getStructsAche() { - const { utils: { getAxyStructs } } = app.fs - try { - if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { - let structList = await getAxyStructs(); - structsAche.dataList = structList; - structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); + let structsAche = { + dataList: [], + expireTime: null//10分钟更新一次结构物列表 + } + async function getStructsAche() { + const { utils: { getAxyStructs } } = app.fs + try { + if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { + let structList = await getAxyStructs(); + structsAche.dataList = structList; + structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); + } + return structsAche; + } catch (err) { + console.log(`获取结构物列表失败, error: ${err}`); } - return structsAche; - } catch (err) { - console.log(`获取结构物列表失败, error: ${err}`); } - } - //保存告警[发现] - async function savePullAlarm(msg) { - const { clickHouse } = app.fs - try { - let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; - let structsAche = await getStructsAche(); - if (structsAche) { - let structs = structsAche.dataList;//结构物列表 - const { models } = app.fs.dc - let exist = structs.find(s => s.strucId == structureId); - if (exist) { - let alarmType = await clickHouse.anxinyun.query( - `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() - let type = alarmType.length ? alarmType[0].name : '' + //保存告警[发现] + async function savePullAlarm(msg) { + const { clickHouse } = app.fs + try { + let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; + let structsAche = await getStructsAche(); + if (structsAche) { + let structs = structsAche.dataList;//结构物列表 + const { models } = app.fs.dc + let exist = structs.find(s => s.strucId == structureId); + if (exist) { + let alarmType = await clickHouse.anxinyun.query( + `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() + let type = alarmType.length ? alarmType[0].name : '' - let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); - if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 - let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 - return { - projectCorrelationId: d, - alarmInfo: { sourceName, alarmTypeCode }, - time: time, - type//异常类型 - } - }) - let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.time, - alarmAppearId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 1//发现 - } - }) - await models.LatestDynamicList.bulkCreate(dynamics); - } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 - let datas = projects.map(d => { - return { - pepUserId: null, - projectCorrelationId: d, - alarmInfo: { source: sourceName, type },//包含告警id,type,source - confirmTime: time, - confirmContent: '自动恢复' - } - }) - let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.confirmTime, - alarmConfirmId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 4//告警确认 - } - }) - await models.LatestDynamicList.bulkCreate(dynamics); + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode, sourceName, alarmTypeCode, content }, + time: time, + type//异常类型 + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 + let datas = projects.map(d => { + return { + pepUserId: null, + projectCorrelationId: d, + alarmInfo: { source: sourceName, type },//包含告警id,type,source + confirmTime: time, + confirmContent: '自动恢复' + } + }) + let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.confirmTime, + alarmConfirmId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 4//告警确认 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } } + } else { + console.log(`获取结构物列表失败, error: ${err}`); } - } else { - console.log(`获取结构物列表失败, error: ${err}`); + } catch (error) { + console.error(error); } - } catch (error) { - console.error(error); } + } catch (error) { + console.log(error); } } \ No newline at end of file