wenlele
2 years ago
15 changed files with 954 additions and 158 deletions
@ -1,20 +1,171 @@ |
|||
'use strict'; |
|||
|
|||
const moment = require('moment'); |
|||
const Kafka = require('kafka-node'); |
|||
|
|||
module.exports = async function factory(app, opts) { |
|||
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); |
|||
try { |
|||
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); |
|||
const producer = new Kafka.HighLevelProducer(client); |
|||
|
|||
producer.on('error', function (err) { |
|||
app.fs.logger.log('error', "[FS-KAFKA]", err); |
|||
}); |
|||
producer.on("ready", function () { |
|||
console.log('111111 ready 666666666666') |
|||
}) |
|||
|
|||
const kafka = { |
|||
producer: producer, |
|||
configUpdateMessage: opts.configUpdateMessage || {} |
|||
}; |
|||
// const kafka = {
|
|||
// producer: producer,
|
|||
// configUpdateMessage: opts.configUpdateMessage || {}
|
|||
// };
|
|||
|
|||
app.fs.kafka = kafka; |
|||
// app.fs.kafka = kafka;
|
|||
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); |
|||
|
|||
|
|||
// const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
|
|||
// const options = {
|
|||
// // groupId: 'topic-test-one',
|
|||
// autoCommit: false,
|
|||
// //fromOffset: true,
|
|||
// fromOffset: 'latest'
|
|||
// }
|
|||
// const consumer = new Kafka.Consumer(client, topics, options)
|
|||
// consumer.on("ready", function () {
|
|||
// console.log('consumer ready 666666666666')
|
|||
// })
|
|||
// // consumer.on("message", function (w) {
|
|||
// // console.log('consumer ready 666666666666')
|
|||
// // })
|
|||
// consumer.on('message', function (message) {
|
|||
// const decodedMessage = JSON.parse(message.value)
|
|||
// console.log('decodedMessage: ', decodedMessage)
|
|||
// })
|
|||
// consumer.on('error', function (err) {
|
|||
// console.log('consumer err:')
|
|||
// console.log(err);
|
|||
// });
|
|||
|
|||
|
|||
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId: 'yunwei-platform-api', fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) |
|||
consumer.on('message', async function (message) { |
|||
let msg = JSON.parse(message.value) |
|||
console.log('kafka consumer----------接收到消息'); |
|||
await savePullAlarm(msg); |
|||
}) |
|||
// let offset = new Kafka.Offset(client);
|
|||
// consumer.on('offsetOutOfRange', function (topic) {
|
|||
// console.log('offsetOutOfRange')
|
|||
// // consumer.setOffset('topic', 0, 0);
|
|||
// // topic.maxNum = 1;
|
|||
// offset.fetch([topic], function (err, offsets) {
|
|||
// if (err) {
|
|||
// return console.error(err);
|
|||
// }
|
|||
// try {
|
|||
// const max = Math.max.apply(null, offsets[topic.topic][topic.partition]);
|
|||
// consumer.setOffset(topic.topic, topic.partition, max);
|
|||
// } catch (error) {
|
|||
// console.log(error);
|
|||
// }
|
|||
|
|||
// });
|
|||
// });
|
|||
|
|||
let structsAche = { |
|||
dataList: [], |
|||
expireTime: null//10分钟更新一次结构物列表
|
|||
} |
|||
async function getStructsAche() { |
|||
const { utils: { getAxyStructs } } = app.fs |
|||
try { |
|||
if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { |
|||
let structList = await getAxyStructs(); |
|||
structsAche.dataList = structList; |
|||
structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); |
|||
} |
|||
return structsAche; |
|||
} catch (err) { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
//保存告警[发现]
|
|||
async function savePullAlarm(msg) { |
|||
const { clickHouse, utils: { sendAppearToWeb, sendConfirmToWeb } } = app.fs |
|||
try { |
|||
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; |
|||
let structsAche = await getStructsAche(); |
|||
if (structsAche) { |
|||
let structs = structsAche.dataList;//结构物列表
|
|||
const { models } = app.fs.dc |
|||
let exist = structs.find(s => s.strucId == structureId); |
|||
if (exist) { |
|||
let alarmType = await clickHouse.anxinyun.query( |
|||
`SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() |
|||
let type = alarmType.length ? alarmType[0].name : '' |
|||
|
|||
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); |
|||
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
|
|||
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
|
|||
return { |
|||
projectCorrelationId: d, |
|||
alarmInfo: { messageMode, sourceName, alarmTypeCode, content }, |
|||
time: time, |
|||
type//异常类型
|
|||
} |
|||
}) |
|||
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.time, |
|||
alarmAppearId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 1//发现
|
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
|
|||
//消息推送到前端
|
|||
if (datas.length) { |
|||
await sendAppearToWeb(datas, 'data'); |
|||
} |
|||
|
|||
|
|||
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
|
|||
let datas = projects.map(d => { |
|||
return { |
|||
pepUserId: null, |
|||
projectCorrelationId: d, |
|||
alarmInfo: { source: sourceName, type },//包含告警id,type,source
|
|||
confirmTime: time, |
|||
confirmContent: '自动恢复' |
|||
} |
|||
}) |
|||
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); |
|||
let dynamics = rslt.map(r => { |
|||
return { |
|||
time: r.confirmTime, |
|||
alarmConfirmId: r.id, |
|||
projectCorrelationId: r.projectCorrelationId, |
|||
type: 4//告警确认
|
|||
} |
|||
}) |
|||
await models.LatestDynamicList.bulkCreate(dynamics); |
|||
|
|||
|
|||
//消息推送到前端
|
|||
if (datas.length) { |
|||
await sendConfirmToWeb(datas, true); |
|||
} |
|||
} |
|||
} |
|||
} else { |
|||
console.log(`获取结构物列表失败, error: ${err}`); |
|||
} |
|||
} catch (error) { |
|||
console.error(error); |
|||
} |
|||
} |
|||
} catch (error) { |
|||
console.log(error); |
|||
} |
|||
} |
@ -0,0 +1,132 @@ |
|||
'use strict'; |
|||
|
|||
module.exports = function (app, opts) { |
|||
const { models } = app.fs.dc |
|||
const { clickHouse } = app.fs |
|||
|
|||
let constAlarmGroups = { |
|||
1: '数据中断', |
|||
2: '数据异常', |
|||
3: '策略命中', |
|||
4: '设备异常', |
|||
5: '设备异常', |
|||
'video': '视频异常', |
|||
'app': '应用异常' |
|||
} |
|||
async function sendAppearToWeb(datas, ttype) { |
|||
try { |
|||
//告警类型
|
|||
let alarmGroup = null |
|||
|
|||
//项目信息
|
|||
let { projects, pepProjects } = await getProjectsInfo(datas); |
|||
|
|||
//数据类区分alarmGroup
|
|||
if (ttype == 'data') { |
|||
let alarm_group = await clickHouse.anxinyun.query( |
|||
`SELECT alarm_group FROM t_alarm_code WHERE code='${datas[0].alarmInfo.alarmTypeCode}'`).toPromise(); |
|||
|
|||
alarmGroup = alarm_group.length ? constAlarmGroups[alarm_group[0].alarm_group] : null |
|||
} else { |
|||
alarmGroup = constAlarmGroups[ttype] |
|||
} |
|||
|
|||
let sendData = [] |
|||
datas.map(ld => { |
|||
let pepPId = projects.find(p => p.id == ld.projectCorrelationId).pepProjectId; |
|||
sendData.push({ |
|||
projectCorrelationId: ld.projectCorrelationId, |
|||
project: projects.find(p => p.id == ld.projectCorrelationId).name || pepProjects.find(pp => pp.id == pepPId).project_name,//前者为自定义项目名称
|
|||
source: ld.alarmInfo.sourceName, |
|||
type: ld.type, |
|||
time: ld.time, |
|||
alarmGroup//告警类型
|
|||
}) |
|||
}) |
|||
app.socket.emit('alarmSendSocket', { type: 'alarmAppear', sendData }) |
|||
} catch (err) { |
|||
console.log(`告警(发现)推送失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
async function sendConfirmToWeb(logDatas, isAuto) { |
|||
try { |
|||
//用户信息
|
|||
let userName = null |
|||
if (!isAuto) { |
|||
let userPepRes = await clickHouse.pepEmis.query( |
|||
`SELECT DISTINCT user.id AS id, "user"."name" AS name FROM user WHERE user.id=${logDatas[0].pepUserId}`).toPromise(); |
|||
userName = userPepRes.length ? userPepRes[0].name : null |
|||
} |
|||
|
|||
//项目信息
|
|||
let { projects, pepProjects } = await getProjectsInfo(logDatas); |
|||
let sendData = [] |
|||
logDatas.map(ld => { |
|||
let pepPId = projects.find(p => p.id == ld.projectCorrelationId).pepProjectId; |
|||
sendData.push({ |
|||
user: userName, |
|||
projectCorrelationId: ld.projectCorrelationId, |
|||
project: projects.find(p => p.id == ld.projectCorrelationId).name || pepProjects.find(pp => pp.id == pepPId).project_name,//前者为自定义项目名称
|
|||
source: ld.alarmInfo.source, |
|||
type: ld.alarmInfo.type, |
|||
time: ld.confirmTime, |
|||
isAuto//是否为自动恢复,自动恢复时user为null
|
|||
}) |
|||
}) |
|||
app.socket.emit('alarmSendSocket', { type: 'alarmConfirm', sendData })//小飞(处理人) 确认并关闭了A项目(项目) DTU设备(告警源) 状态异常(异常类型)的问题
|
|||
} catch (err) { |
|||
console.log(`告警(确认)推送失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
async function getProjectsInfo(datas) { |
|||
try { |
|||
let pIds = datas.map(l => l.projectCorrelationId);//所有的项目的id
|
|||
let projects = await models.ProjectCorrelation.findAll({ |
|||
where: { id: { $in: pIds } }, |
|||
attributes: ['id', 'name', 'pepProjectId'] |
|||
}); |
|||
|
|||
let pepPojectIds = new Set(); |
|||
for (let p of projects) { |
|||
pepPojectIds.add(p.pepProjectId); |
|||
} |
|||
|
|||
let pepProjects = pepPojectIds.size ? await clickHouse.projectManage.query(` |
|||
SELECT id, project_name FROM t_pim_project WHERE id IN (${[...pepPojectIds]})`).toPromise() : [];
|
|||
|
|||
return { projects, pepProjects }; |
|||
} catch (err) { |
|||
console.log(`获取项目信息失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
async function sendNoticeToWeb(pepUsers, data) { |
|||
try { |
|||
let { cfgName, tactics, tacticsParams, projectCorrelationId, time } = data; |
|||
//项目信息
|
|||
let { projects, pepProjects } = await getProjectsInfo([data]); |
|||
let pepPId = projects.find(p => p.id == projectCorrelationId).pepProjectId; |
|||
|
|||
//需要 策略名称 处理人 项目 策略和参数 时间
|
|||
let sendData = { |
|||
pushConfig: { cfgName, tactics, tacticsParams },//策略信息
|
|||
pepUsers, |
|||
projectCorrelationId: projectCorrelationId, |
|||
project: projects.find(p => p.id == projectCorrelationId).name || pepProjects.find(pp => pp.id == pepPId).project_name,//前者为自定义项目名称
|
|||
time |
|||
} |
|||
|
|||
app.socket.emit('alarmSendSocket', { type: 'alarmNotice', sendData }) |
|||
} catch (err) { |
|||
console.log(`推送通知失败, error: ${err}`); |
|||
} |
|||
} |
|||
|
|||
return { |
|||
sendAppearToWeb,//推送告警发现
|
|||
sendConfirmToWeb,//推送告警确认
|
|||
sendNoticeToWeb//推送通知
|
|||
} |
|||
} |
@ -0,0 +1,159 @@ |
|||
'use strict'; |
|||
|
|||
const moment = require('moment') |
|||
|
|||
module.exports = function (app, opts) { |
|||
const { models } = app.fs.dc |
|||
const { clickHouse } = app.fs |
|||
const { database: anxinyun } = clickHouse.anxinyun.opts.config |
|||
async function getAxyStructs(pepProjectId) { |
|||
try { |
|||
try { |
|||
const { pepProjectRes, bindRes } = await pomsWithPepRangeParams(pepProjectId) |
|||
// 获取不重复的 安心云项目 id
|
|||
const anxinProjectIds = [ |
|||
...(bindRes).reduce( |
|||
(arr, b) => { |
|||
for (let sid of b.anxinProjectId) { |
|||
arr.add(sid); |
|||
} |
|||
return arr; |
|||
}, |
|||
new Set() |
|||
) |
|||
] |
|||
// 查询安心云项目及结构物信息
|
|||
const undelStrucRes = anxinProjectIds.length ? |
|||
await clickHouse.anxinyun.query( |
|||
`SELECT
|
|||
t_project.id AS projectId, |
|||
t_structure.id AS strucId, |
|||
t_structure.name AS strucName, |
|||
project_state |
|||
FROM |
|||
t_project |
|||
LEFT JOIN |
|||
t_project_structure |
|||
ON t_project_structure.project = t_project.id |
|||
LEFT JOIN |
|||
t_project_structuregroup |
|||
ON t_project_structuregroup.project = t_project.id |
|||
LEFT JOIN |
|||
t_structuregroup_structure |
|||
ON t_structuregroup_structure.structuregroup = t_project_structuregroup.structuregroup |
|||
LEFT JOIN |
|||
t_project_construction |
|||
ON t_project_construction.project = t_project.id |
|||
LEFT JOIN |
|||
t_structure_site |
|||
ON t_structure_site.siteid = t_project_construction.construction |
|||
RIGHT JOIN |
|||
t_structure |
|||
ON t_structure.id = t_project_structure.structure |
|||
OR t_structure.id = t_structuregroup_structure.structure |
|||
OR t_structure.id = t_structure_site.structid |
|||
WHERE |
|||
project_state != -1 |
|||
AND |
|||
t_project.id IN (${anxinProjectIds.join(',')})`).toPromise() : []
|
|||
|
|||
// 构建安心云结构物和项企项目的关系
|
|||
// 并保存信息至数据
|
|||
let undelStruc = [] |
|||
for (let s of undelStrucRes) { |
|||
let corStruc = undelStruc.find(us => us.strucId == s.strucId) |
|||
if (corStruc) { |
|||
if (!corStruc.project.some(cp => cp.id == s.projectId)) { |
|||
corStruc.project.push({ |
|||
id: s.projectId |
|||
}) |
|||
} |
|||
} else { |
|||
corStruc = { |
|||
strucId: s.strucId, |
|||
strucName: s.strucName, |
|||
projectId: s.projectId, |
|||
project: [{ |
|||
id: s.projectId, |
|||
}], |
|||
pomsProject: [] |
|||
} |
|||
undelStruc.push(corStruc) |
|||
} |
|||
for (let { dataValues: br } of bindRes) { |
|||
if (br.anxinProjectId.some(braId => braId == s.projectId)) { |
|||
let corPepProject = pepProjectRes.find(pp => pp.id == br.pepProjectId) |
|||
let corPomsProject = corStruc.pomsProject.find(cp => cp.id == br.id) |
|||
|
|||
if (corPomsProject) { |
|||
// poms 的 project 和 pep 的 project 是一对一的关系 所以这个情况不用处理
|
|||
} else { |
|||
corStruc.pomsProject.push({ |
|||
...br, |
|||
pepProject: corPepProject |
|||
}) |
|||
} |
|||
|
|||
} |
|||
} |
|||
} |
|||
return undelStruc |
|||
} catch (error) { |
|||
console.error(error); |
|||
} |
|||
} catch (error) { |
|||
console.log(error) |
|||
} |
|||
} |
|||
|
|||
async function pomsWithPepRangeParams(pepProjectId) { |
|||
try { |
|||
let findOption = { |
|||
where: { |
|||
del: false |
|||
} |
|||
} |
|||
if (pepProjectId) { |
|||
// 有 特定的项目id 就按此查询
|
|||
findOption.where.id = pepProjectId |
|||
} |
|||
const bindRes = await models.ProjectCorrelation.findAll(findOption); |
|||
// 获取不重复的 项企项目id
|
|||
let pepProjectIds = [] |
|||
for (let b of bindRes) { |
|||
if (b.pepProjectId) { |
|||
pepProjectIds.push(b.pepProjectId) |
|||
} |
|||
} |
|||
// 查询项企项目的信息
|
|||
const pepProjectRes = pepProjectIds.length ? |
|||
await clickHouse.projectManage.query( |
|||
`SELECT
|
|||
t_pim_project.id AS id, |
|||
t_pim_project.project_name AS projectName, |
|||
t_pim_project.isdelete AS isdelete, |
|||
t_pim_project_construction.construction_status_id AS constructionStatusId, |
|||
t_pim_project_state.construction_status AS constructionStatus |
|||
FROM |
|||
t_pim_project |
|||
LEFT JOIN t_pim_project_construction |
|||
ON t_pim_project.id = t_pim_project_construction.project_id |
|||
LEFT JOIN t_pim_project_state |
|||
ON t_pim_project_construction.construction_status_id = t_pim_project_state.id |
|||
WHERE |
|||
id IN (${pepProjectIds.join(',')})` |
|||
).toPromise() : []; |
|||
|
|||
return { |
|||
pepProjectRes, bindRes |
|||
} |
|||
|
|||
} catch (error) { |
|||
console.error(error); |
|||
} |
|||
} |
|||
|
|||
return { |
|||
getAxyStructs |
|||
} |
|||
} |
Loading…
Reference in new issue