Browse Source

(*)视频告警和应用告警新增接口中加日志记录; kafka接收消息修改

dev
wuqun 3 years ago
parent
commit
bd0a6167ad
  1. 42
      api/app/lib/controllers/alarm/app.js
  2. 29
      api/app/lib/controllers/alarm/video.js
  3. 114
      api/app/lib/service/kafka.js

42
api/app/lib/controllers/alarm/app.js

@ -198,6 +198,48 @@ async function apiError(ctx) {
storageData.createTime = now storageData.createTime = now
storageData.screenshot = screenshot storageData.screenshot = screenshot
await models.AppAlarm.create(storageData) await models.AppAlarm.create(storageData)
//存告警记录
let constTypes = { 'element': "元素异常", 'apiError': "接口报错 ", 'timeout': "加载超时" }
let belongsTo = await models.ProjectApp.findOne({
where: {
id: projectAppId
},
include: [{
model: models.ProjectCorrelation,
attributes: ['id'],
where: { del: false }
}]
})
if (belongsTo) {
let appName = await models.App.findOne({
where: {
id: belongsTo.appId
},
attributes: ['name'],
})
let projects = belongsTo.projectCorrelation.map(d => d.id);//归属项目
if (projects.length) {
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return {
projectCorrelationId: d,
alarmInfo: { messageMode: 'AlarmGeneration', sourceName: appName.name, content: alarmContent, type },//AlarmGeneration代表告警首次产生
time: now,
type: constTypes[type]
}
})
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.time,
alarmAppearId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 1//发现
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
}
}
} }
} }

29
api/app/lib/controllers/alarm/video.js

@ -287,9 +287,38 @@ async function confirm (ctx) {
async function alarmAdded (ctx) { async function alarmAdded (ctx) {
try { try {
const { models } = ctx.fs.dc; const { models } = ctx.fs.dc;
const { clickHouse } = ctx.app.fs
const { utils: { anxinStrucIdRange } } = ctx.app.fs
let anxinStruc = await anxinStrucIdRange({ ctx })
const { serial_no, channel_no, create_time, description, status_id } = ctx.request.body;
let belongToStruct = await clickHouse.anxinyun.query(
`SELECT name, structure FROM t_video_ipc WHERE serial_no='${serial_no}' and channel_no='${channel_no}'`).toPromise()
let structId = belongToStruct.length ? belongToStruct[0].structure : null
if (structId) {
let exist = anxinStruc.find(s => s.strucId == structId);
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id);
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return {
projectCorrelationId: d,
alarmInfo: { messageMode: 'AlarmGeneration', sourceName: belongToStruct[0].name, status_id, content: description },//AlarmGeneration代表告警首次产生
time: create_time,
type: description
}
})
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.time,
alarmAppearId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 1//发现
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
}
ctx.status = 200; ctx.status = 200;
} catch (error) { } catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`); ctx.fs.logger.error(`path: ${ctx.path}, error: error`);

114
api/app/lib/service/kafka.js

@ -1,74 +1,79 @@
'use strict'; 'use strict';
const moment = require('moment'); const moment = require('moment');
const Kafka = require('kafka-node'); const Kafka = require('kafka-node');
//const { getAxyStructs } = require('../utils/helper');
module.exports = async function factory(app, opts) { module.exports = async function factory(app, opts) {
try {
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true });
let structsAche = {
dataList: [],
expireTime: null//10分钟更新一次结构物列表
}
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL });
const producer = new Kafka.HighLevelProducer(client); const producer = new Kafka.HighLevelProducer(client);
producer.on('error', function (err) { producer.on('error', function (err) {
app.fs.logger.log('error', "[FS-KAFKA]", err); app.fs.logger.log('error', "[FS-KAFKA]", err);
}); });
producer.on("ready", function () {
console.log('111111 ready 666666666666')
})
const kafka = { // const kafka = {
producer: producer, // producer: producer,
configUpdateMessage: opts.configUpdateMessage || {} // configUpdateMessage: opts.configUpdateMessage || {}
}; // };
app.fs.kafka = kafka; // app.fs.kafka = kafka;
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
////------------------------------------------------------------------- try try try
// setTimeout(async () => { // const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
// let msg = { // const options = {
// "messageMode": "AlarmGeneration", // // groupId: 'topic-test-one',
// "structureId": 1043, // autoCommit: false,
// "structureName": "TPA 数据波动影响分析及实验", // //fromOffset: true,
// "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151", // fromOffset: 'latest'
// "sourceName": "忻德TPA1",
// "alarmTypeCode": "3004",
// "alarmCode": "3002",
// "content": "link [soip:40001:00012532] is nil",
// "time": "2022-10-31T11:21:14.000+0800",
// "sourceTypeId": 1,
// "sponsor": "et.recv",
// "extras": null
// } // }
// await savePullAlarm(msg); // const consumer = new Kafka.Consumer(client, topics, options)
// }, 3000) // consumer.on("ready", function () {
// console.log('consumer ready 666666666666')
// })
// // consumer.on("message", function (w) {
// // console.log('consumer ready 666666666666')
// // })
// consumer.on('message', function (message) {
// const decodedMessage = JSON.parse(message.value)
// console.log('decodedMessage: ', decodedMessage)
// })
// consumer.on('error', function (err) {
// console.log('consumer err:')
// console.log(err);
// });
const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
const options = { let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
groupId: 'topic-test-one', consumer.on('message', async function (message) {
autoCommit: true, let msg = JSON.parse(message.value)
} await savePullAlarm(msg);
const consumer = new Kafka.Consumer(client, topics, options)
consumer.on("ready", function () {
console.log('consumer ready 666666666666')
})
consumer.on('message', function (message) {
const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = JSON.parse(buf.toString())
console.log('decodedMessage: ', decodedMessage)
})
consumer.on('error', function (err) {
console.log('error', err)
})
process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit()
})
}) })
// let offset = new Kafka.Offset(client);
// consumer.on('offsetOutOfRange', function (topic) {
// console.log('offsetOutOfRange')
// // consumer.setOffset('topic', 0, 0);
// // topic.maxNum = 1;
// offset.fetch([topic], function (err, offsets) {
// if (err) {
// return console.error(err);
// }
// try {
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]);
// consumer.setOffset(topic.topic, topic.partition, max);
// } catch (error) {
// console.log(error);
// }
// });
// });
let structsAche = {
dataList: [],
expireTime: null//10分钟更新一次结构物列表
}
async function getStructsAche() { async function getStructsAche() {
const { utils: { getAxyStructs } } = app.fs const { utils: { getAxyStructs } } = app.fs
try { try {
@ -103,7 +108,7 @@ module.exports = async function factory(app, opts) {
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return { return {
projectCorrelationId: d, projectCorrelationId: d,
alarmInfo: { sourceName, alarmTypeCode }, alarmInfo: { messageMode, sourceName, alarmTypeCode, content },
time: time, time: time,
type//异常类型 type//异常类型
} }
@ -147,4 +152,7 @@ module.exports = async function factory(app, opts) {
console.error(error); console.error(error);
} }
} }
} catch (error) {
console.log(error);
}
} }
Loading…
Cancel
Save