diff --git a/api/app/lib/controllers/alarm/alarmConfirmLog.js b/api/app/lib/controllers/alarm/alarmConfirmLog.js index 378c584..11cfd5b 100644 --- a/api/app/lib/controllers/alarm/alarmConfirmLog.js +++ b/api/app/lib/controllers/alarm/alarmConfirmLog.js @@ -5,6 +5,7 @@ const moment = require('moment') async function alarmConfirmLog(ctx, confirmPost, content) { try { const { models } = ctx.fs.dc; + const { clickHouse } = ctx.app.fs; //存日志 let logDatas = []; confirmPost.map(cp => { @@ -33,9 +34,9 @@ async function alarmConfirmLog(ctx, confirmPost, content) { await models.LatestDynamicList.bulkCreate(dynamics); //消息推送到前端 - //ctx.app.socket.emit('TEST', { someProperty: '【广播】呼叫青铜时代号!!!', }) - - + if (logDatas.length) { + await sendConfirmToWeb(ctx.app, models, clickHouse, logDatas, false); + } } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: ${error}`); ctx.status = 400; @@ -45,6 +46,105 @@ async function alarmConfirmLog(ctx, confirmPost, content) { } } +let constAlarmGroups = { + 1: '数据中断', + 2: '数据异常', + 3: '策略命中', + 4: '设备异常', + 5: '设备异常', + 'video': '视频异常', + 'app': '应用异常' +} +async function sendAppearToWeb(app, models, clickHouse, datas, ttype) { + try { + //告警类型 + let alarmGroup = null + + //项目信息 + let { projects, pepProjects } = await getProjectsInfo(models, clickHouse, datas); + + //数据类区分alarmGroup + if (ttype == 'data') { + let alarm_group = await clickHouse.anxinyun.query( + `SELECT alarm_group FROM t_alarm_code WHERE code='${datas[0].alarmInfo.alarmTypeCode}'`).toPromise(); + + alarmGroup = alarm_group.length ? constAlarmGroups[alarm_group[0].alarm_group] : null + } else { + alarmGroup = constAlarmGroups[ttype] + } + + let sendData = [] + datas.map(ld => { + let pepPId = projects.find(p => p.id == ld.projectCorrelationId).pepProjectId; + sendData.push({ + projectCorrelationId: ld.projectCorrelationId, + project: projects.find(p => p.id == ld.projectCorrelationId).name || pepProjects.find(pp => pp.id == pepPId).project_name,//前者为自定义项目名称 + source: ld.alarmInfo.sourceName, + type: ld.type, + time: ld.time, + alarmGroup//告警类型 + }) + }) + app.socket.emit('alarmSendSocket', { type: 'alarmAppear', sendData }) + } catch (err) { + console.log(`告警(发现)推送失败, error: ${err}`); + } +} + +async function getProjectsInfo(models, clickHouse, logDatas) { + try { + let pIds = logDatas.map(l => l.projectCorrelationId);//所有的项目的id + let projects = await models.ProjectCorrelation.findAll({ + where: { id: { $in: pIds } }, + attributes: ['id', 'name', 'pepProjectId'] + }); + + let pepPojectIds = new Set(); + for (let p of projects) { + pepPojectIds.add(p.pepProjectId); + } + + let pepProjects = pepPojectIds.size ? await clickHouse.projectManage.query(` + SELECT id, project_name FROM t_pim_project WHERE id IN (${[...pepPojectIds]})`).toPromise() : []; + + return { projects, pepProjects }; + } catch (err) { + console.log(`获取项目信息失败, error: ${err}`); + } +} +async function sendConfirmToWeb(app, models, clickHouse, logDatas, isAuto) { + try { + //用户信息 + let userName = null + if (!isAuto) { + let userPepRes = await clickHouse.pepEmis.query( + `SELECT DISTINCT user.id AS id, "user"."name" AS name FROM user WHERE user.id=${logDatas[0].pepUserId}`).toPromise(); + userName = userPepRes.length ? userPepRes[0].name : null + } + + //项目信息 + let { projects, pepProjects } = await getProjectsInfo(models, clickHouse, logDatas); + let sendData = [] + logDatas.map(ld => { + let pepPId = projects.find(p => p.id == ld.projectCorrelationId).pepProjectId; + sendData.push({ + user: userName, + projectCorrelationId: ld.projectCorrelationId, + project: projects.find(p => p.id == ld.projectCorrelationId).name || pepProjects.find(pp => pp.id == pepPId).project_name,//前者为自定义项目名称 + source: ld.alarmInfo.source, + type: ld.alarmInfo.type, + time: ld.confirmTime, + isAuto//是否为自动恢复,自动恢复时user为null + }) + }) + app.socket.emit('alarmSendSocket', { type: 'alarmConfirm', sendData })//小飞(处理人) 确认并关闭了A项目(项目) DTU设备(告警源) 状态异常(异常类型)的问题 + } catch (err) { + console.log(`告警(确认)推送失败, error: ${err}`); + } +} + module.exports = { - alarmConfirmLog + alarmConfirmLog, + sendAppearToWeb,//推送告警发现 + sendConfirmToWeb//推送告警确认 }; \ No newline at end of file diff --git a/api/app/lib/controllers/alarm/app.js b/api/app/lib/controllers/alarm/app.js index 4472c04..89a5896 100644 --- a/api/app/lib/controllers/alarm/app.js +++ b/api/app/lib/controllers/alarm/app.js @@ -1,7 +1,7 @@ 'use strict'; const moment = require('moment') -const { alarmConfirmLog } = require('./alarmConfirmLog'); +const { alarmConfirmLog, sendAppearToWeb } = require('./alarmConfirmLog'); async function inspection(ctx) { // 巡查 try { @@ -143,6 +143,7 @@ async function notedInspection(ctx) { async function apiError(ctx) { try { + const { clickHouse } = ctx.app.fs const models = ctx.fs.dc.models; const { projectAppId, alarmContent, router, statusCode, screenshot = '', type } = ctx.request.body const now = moment().format() @@ -198,6 +199,46 @@ async function apiError(ctx) { storageData.createTime = now storageData.screenshot = screenshot await models.AppAlarm.create(storageData) + + //存告警记录 + let constTypes = { 'element': "元素异常", 'apiError': "接口报错 ", 'timeout': "加载超时" } + let belongsTo = await models.ProjectApp.findOne({ + where: { + id: projectAppId + }, + include: [{ + model: models.ProjectCorrelation, + attributes: ['id'], + where: { del: false } + }] + }) + if (belongsTo) { + let appName = await models.App.findOne({ + where: { + id: belongsTo.appId + }, + attributes: ['name'], + }) + let pId = belongsTo.projectCorrelation.dataValues.id;//归属项目 + let data = { + projectCorrelationId: pId, + alarmInfo: { messageMode: 'AlarmGeneration', sourceName: appName.name, content: alarmContent, type },//AlarmGeneration代表告警首次产生 + time: now, + type: constTypes[type] + } + let r = await models.AlarmAppearRecord.create(data, { returning: true }); + let dynamic = { + time: r.dataValues.time, + alarmAppearId: r.dataValues.id, + projectCorrelationId: r.dataValues.projectCorrelationId, + type: 1//发现 + } + await models.LatestDynamicList.create(dynamic); + + + //消息推送到前端 + await sendAppearToWeb(ctx.app, models, clickHouse, [data], 'app'); + } } } diff --git a/api/app/lib/controllers/alarm/video.js b/api/app/lib/controllers/alarm/video.js index b2348fe..8fd296b 100644 --- a/api/app/lib/controllers/alarm/video.js +++ b/api/app/lib/controllers/alarm/video.js @@ -1,7 +1,7 @@ 'use strict'; const moment = require('moment') -const { alarmConfirmLog } = require('./alarmConfirmLog'); -async function deviceType (ctx) { +const { alarmConfirmLog, sendAppearToWeb } = require('./alarmConfirmLog'); +async function deviceType(ctx) { try { const { models } = ctx.fs.dc; const { clickHouse } = ctx.app.fs @@ -21,7 +21,7 @@ async function deviceType (ctx) { } } -async function alarmList (ctx) { +async function alarmList(ctx) { try { const { models } = ctx.fs.dc; const { clickHouse } = ctx.app.fs @@ -262,7 +262,7 @@ async function alarmList (ctx) { } } -async function confirm (ctx) { +async function confirm(ctx) { try { const { alarmId, content, confirmPost } = ctx.request.body; // TODO: 以视频·应用的秘钥进行鉴权 @@ -284,12 +284,46 @@ async function confirm (ctx) { } } -async function alarmAdded (ctx) { +async function alarmAdded(ctx) { try { const { models } = ctx.fs.dc; + const { clickHouse } = ctx.app.fs + const { utils: { anxinStrucIdRange } } = ctx.app.fs + let anxinStruc = await anxinStrucIdRange({ ctx }) + + const { serial_no, channel_no, create_time, description, status_id } = ctx.request.body; - + let belongToStruct = await clickHouse.anxinyun.query( + `SELECT name, structure FROM t_video_ipc WHERE serial_no='${serial_no}' and channel_no='${channel_no}'`).toPromise() + let structId = belongToStruct.length ? belongToStruct[0].structure : null + if (structId) { + let exist = anxinStruc.find(s => s.strucId == structId); + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode: 'AlarmGeneration', sourceName: belongToStruct[0].name, status_id, content: description },//AlarmGeneration代表告警首次产生 + time: create_time, + type: description + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + + //消息推送到前端 + if (datas.length) { + await sendAppearToWeb(ctx.app, models, clickHouse, datas, 'video'); + } + } ctx.status = 200; } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: error`); @@ -300,7 +334,7 @@ async function alarmAdded (ctx) { } } -async function vcmpAppAuthToken (ctx) { +async function vcmpAppAuthToken(ctx) { try { const { models } = ctx.fs.dc; const { utils: { vcmpAuth } } = ctx.app.fs diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js index 0c81913..3e2d7e2 100644 --- a/api/app/lib/service/kafka.js +++ b/api/app/lib/service/kafka.js @@ -1,150 +1,171 @@ 'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); -//const { getAxyStructs } = require('../utils/helper'); - +const { sendAppearToWeb, sendConfirmToWeb } = require('../controllers/alarm/alarmConfirmLog') module.exports = async function factory(app, opts) { + try { + const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); + const producer = new Kafka.HighLevelProducer(client); + producer.on('error', function (err) { + app.fs.logger.log('error', "[FS-KAFKA]", err); + }); + producer.on("ready", function () { + console.log('111111 ready 666666666666') + }) + // const kafka = { + // producer: producer, + // configUpdateMessage: opts.configUpdateMessage || {} + // }; - let structsAche = { - dataList: [], - expireTime: null//10分钟更新一次结构物列表 - } - - const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); - const producer = new Kafka.HighLevelProducer(client); - producer.on('error', function (err) { - app.fs.logger.log('error', "[FS-KAFKA]", err); - }); - - const kafka = { - producer: producer, - configUpdateMessage: opts.configUpdateMessage || {} - }; + // app.fs.kafka = kafka; + app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); - app.fs.kafka = kafka; - app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); - ////------------------------------------------------------------------- try try try + // const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] + // const options = { + // // groupId: 'topic-test-one', + // autoCommit: false, + // //fromOffset: true, + // fromOffset: 'latest' + // } + // const consumer = new Kafka.Consumer(client, topics, options) + // consumer.on("ready", function () { + // console.log('consumer ready 666666666666') + // }) + // // consumer.on("message", function (w) { + // // console.log('consumer ready 666666666666') + // // }) + // consumer.on('message', function (message) { + // const decodedMessage = JSON.parse(message.value) + // console.log('decodedMessage: ', decodedMessage) + // }) + // consumer.on('error', function (err) { + // console.log('consumer err:') + // console.log(err); + // }); - // setTimeout(async () => { - // let msg = { - // "messageMode": "AlarmGeneration", - // "structureId": 1043, - // "structureName": "TPA 数据波动影响分析及实验", - // "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151", - // "sourceName": "忻德TPA1", - // "alarmTypeCode": "3004", - // "alarmCode": "3002", - // "content": "link [soip:40001:00012532] is nil", - // "time": "2022-10-31T11:21:14.000+0800", - // "sourceTypeId": 1, - // "sponsor": "et.recv", - // "extras": null - // } - // await savePullAlarm(msg); - // }, 3000) - const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] - const options = { - groupId: 'topic-test-one', - autoCommit: true, - } - const consumer = new Kafka.Consumer(client, topics, options) - consumer.on("ready", function () { - console.log('consumer ready 666666666666') - }) - consumer.on('message', function (message) { - const buf = new Buffer(String(message.value), 'binary') - const decodedMessage = JSON.parse(buf.toString()) - console.log('decodedMessage: ', decodedMessage) - }) - consumer.on('error', function (err) { - console.log('error', err) - }) - process.on('SIGINT', function () { - consumer.close(true, function () { - process.exit() + let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) + consumer.on('message', async function (message) { + let msg = JSON.parse(message.value) + await savePullAlarm(msg); }) - }) + // let offset = new Kafka.Offset(client); + // consumer.on('offsetOutOfRange', function (topic) { + // console.log('offsetOutOfRange') + // // consumer.setOffset('topic', 0, 0); + // // topic.maxNum = 1; + // offset.fetch([topic], function (err, offsets) { + // if (err) { + // return console.error(err); + // } + // try { + // const max = Math.max.apply(null, offsets[topic.topic][topic.partition]); + // consumer.setOffset(topic.topic, topic.partition, max); + // } catch (error) { + // console.log(error); + // } + // }); + // }); - async function getStructsAche() { - const { utils: { getAxyStructs } } = app.fs - try { - if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { - let structList = await getAxyStructs(); - structsAche.dataList = structList; - structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); + let structsAche = { + dataList: [], + expireTime: null//10分钟更新一次结构物列表 + } + async function getStructsAche() { + const { utils: { getAxyStructs } } = app.fs + try { + if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { + let structList = await getAxyStructs(); + structsAche.dataList = structList; + structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); + } + return structsAche; + } catch (err) { + console.log(`获取结构物列表失败, error: ${err}`); } - return structsAche; - } catch (err) { - console.log(`获取结构物列表失败, error: ${err}`); } - } - //保存告警[发现] - async function savePullAlarm(msg) { - const { clickHouse } = app.fs - try { - let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; - let structsAche = await getStructsAche(); - if (structsAche) { - let structs = structsAche.dataList;//结构物列表 - const { models } = app.fs.dc - let exist = structs.find(s => s.strucId == structureId); - if (exist) { - let alarmType = await clickHouse.anxinyun.query( - `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() - let type = alarmType.length ? alarmType[0].name : '' + //保存告警[发现] + async function savePullAlarm(msg) { + const { clickHouse } = app.fs + try { + let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; + let structsAche = await getStructsAche(); + if (structsAche) { + let structs = structsAche.dataList;//结构物列表 + const { models } = app.fs.dc + let exist = structs.find(s => s.strucId == structureId); + if (exist) { + let alarmType = await clickHouse.anxinyun.query( + `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() + let type = alarmType.length ? alarmType[0].name : '' - let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); - if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 - let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 - return { - projectCorrelationId: d, - alarmInfo: { sourceName, alarmTypeCode }, - time: time, - type//异常类型 - } - }) - let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.time, - alarmAppearId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 1//发现 - } - }) - await models.LatestDynamicList.bulkCreate(dynamics); - } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 - let datas = projects.map(d => { - return { - pepUserId: null, - projectCorrelationId: d, - alarmInfo: { source: sourceName, type },//包含告警id,type,source - confirmTime: time, - confirmContent: '自动恢复' + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { messageMode, sourceName, alarmTypeCode, content }, + time: time, + type//异常类型 + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + + //消息推送到前端 + if (datas.length) { + await sendAppearToWeb(app, models, clickHouse, datas, 'data'); } - }) - let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); - let dynamics = rslt.map(r => { - return { - time: r.confirmTime, - alarmConfirmId: r.id, - projectCorrelationId: r.projectCorrelationId, - type: 4//告警确认 + + + } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 + let datas = projects.map(d => { + return { + pepUserId: null, + projectCorrelationId: d, + alarmInfo: { source: sourceName, type },//包含告警id,type,source + confirmTime: time, + confirmContent: '自动恢复' + } + }) + let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.confirmTime, + alarmConfirmId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 4//告警确认 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + + + //消息推送到前端 + if (datas.length) { + await sendConfirmToWeb(app, models, clickHouse, datas, true); } - }) - await models.LatestDynamicList.bulkCreate(dynamics); + } } + } else { + console.log(`获取结构物列表失败, error: ${err}`); } - } else { - console.log(`获取结构物列表失败, error: ${err}`); + } catch (error) { + console.error(error); } - } catch (error) { - console.error(error); } + } catch (error) { + console.log(error); } } \ No newline at end of file diff --git a/web/client/src/sections/control/containers/control.jsx b/web/client/src/sections/control/containers/control.jsx index 801caad..78f4867 100644 --- a/web/client/src/sections/control/containers/control.jsx +++ b/web/client/src/sections/control/containers/control.jsx @@ -48,6 +48,23 @@ const Control = (props) => { const exhibition = useRef({ workbench: [], statistical: [] }) //页面结构 const FormApi = useRef() + // websocket 使用测试 + useEffect(() => { + if (socket) { + socket.on('alarmSendSocket', function (msg) { + //console.info(msg); + if (msg.type == "alarmAppear") {//告警出现 + + } else if (msg.type == "alarmConfirm") {//告警确认 + + } + }); + return () => { + socket.off("alarmSendSocket"); + } + } + }, [socket]) + useEffect(() => { consoleToollink() @@ -770,7 +787,7 @@ function mapStateToProps (state) { actions: global.actions, pepProjectId: global.pepProjectId, // members: members.data, - // socket: webSocket.socket + socket: webSocket.socket }; }