From 070dcde441cc625b3fba23a8c265cbd8b7efb2eb Mon Sep 17 00:00:00 2001 From: wuqun Date: Mon, 31 Oct 2022 16:49:07 +0800 Subject: [PATCH] =?UTF-8?q?(*)kafka=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/.vscode/launch.json | 2 +- api/app/lib/service/kafka.js | 131 +++++++++++++++++++++++++++- api/app/lib/utils/helper.js | 160 +++++++++++++++++++++++++++++++++++ api/package.json | 2 +- 4 files changed, 290 insertions(+), 5 deletions(-) create mode 100644 api/app/lib/utils/helper.js diff --git a/api/.vscode/launch.json b/api/.vscode/launch.json index 69a9028..137834e 100644 --- a/api/.vscode/launch.json +++ b/api/.vscode/launch.json @@ -19,7 +19,7 @@ "-g postgres://postgres:123@10.8.30.32:5432/orational_service", // 测试 // "-g postgres://FashionAdmin:123456@10.8.30.156:5432/POMS", - "-k node35:6667,node36:6667,node37:6667", + "-k 10.8.30.72:29092,10.8.30.73:29092,10.8.30.74:29092", "--iotaProxy http://10.8.30.157:17007", "--redisHost 10.8.30.112", "--redisPort 6379", diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js index f164eee..7e3046d 100644 --- a/api/app/lib/service/kafka.js +++ b/api/app/lib/service/kafka.js @@ -1,11 +1,19 @@ 'use strict'; - +const moment = require('moment'); const Kafka = require('kafka-node'); +const { getAxyStructs } = require('../utils/helper'); + +module.exports = async function factory(app, opts) { + -module.exports = async function factory (app, opts) { + let structsAche = { + dataList: [], + expireTime: null//10分钟更新一次结构物列表 + } + + //const { utils: { getAxyStructs } } = app.fs const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); const producer = new Kafka.HighLevelProducer(client); - producer.on('error', function (err) { app.fs.logger.log('error', "[FS-KAFKA]", err); }); @@ -17,4 +25,121 @@ module.exports = async function factory (app, opts) { app.fs.kafka = kafka; app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); + + ////------------------------------------------------------------------- try try try + + // setTimeout(async () => { + // let msg = { + // "messageMode": "AlarmGeneration", + // "structureId": 1043, + // "structureName": "TPA 数据波动影响分析及实验", + // "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151", + // "sourceName": "忻德TPA1", + // "alarmTypeCode": "3004", + // "alarmCode": "3002", + // "content": "link [soip:40001:00012532] is nil", + // "time": "2022-10-31T11:21:14.000+0800", + // "sourceTypeId": 1, + // "sponsor": "et.recv", + // "extras": null + // } + // await savePullAlarm(msg); + // }, 3000) + + const topics = [{ topic: 'anxinyun_alarm', partition: 0 }] + const options = { + groupId: 'topic-test-one', + autoCommit: true, + } + const consumer = new Kafka.Consumer(client, topics, options) + consumer.on("ready", function () { + console.log('consumer ready 666666666666') + }) + consumer.on('message', function (message) { + const buf = new Buffer(String(message.value), 'binary') + const decodedMessage = JSON.parse(buf.toString()) + console.log('decodedMessage: ', decodedMessage) + }) + consumer.on('error', function (err) { + console.log('error', err) + }) + process.on('SIGINT', function () { + consumer.close(true, function () { + process.exit() + }) + }) + + + async function getStructsAche(app) { + try { + if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { + let structList = await getAxyStructs(app); + structsAche.dataList = structList; + structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); + } + return structsAche; + } catch (err) { + console.log(`获取结构物列表失败, error: ${err}`); + } + } + + //保存告警[发现] + async function savePullAlarm(msg) { + try { + let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; + let structsAche = await getStructsAche(app); + if (structsAche) { + let structs = structsAche.dataList;//结构物列表 + const { models } = app.fs.dc + let exist = structs.find(s => s.strucId == structureId); + if (exist) { + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); + if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 + let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 + return { + projectCorrelationId: d, + alarmInfo: { structureId, sourceName, alarmTypeCode, alarmCode, content }, + time: time, + type: 1//异常类型 TODO + } + }) + let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.time, + alarmAppearId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 1//发现 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2 + let datas = projects.map(d => { + return { + pepUserId: null, + projectCorrelationId: d, + alarmInfo: { sourceName, alarmTypeCode, alarmCode },//包含告警id,type,source TODO + confirmTime: time, + confirmContent: '自动恢复' + } + }) + let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true }); + let dynamics = rslt.map(r => { + return { + time: r.confirmTime, + alarmConfirmId: r.id, + projectCorrelationId: r.projectCorrelationId, + type: 4//告警确认 + } + }) + await models.LatestDynamicList.bulkCreate(dynamics); + } + } + } else { + console.log(`获取结构物列表失败, error: ${err}`); + } + } catch (error) { + console.error(error); + } + } } \ No newline at end of file diff --git a/api/app/lib/utils/helper.js b/api/app/lib/utils/helper.js new file mode 100644 index 0000000..e5e6b04 --- /dev/null +++ b/api/app/lib/utils/helper.js @@ -0,0 +1,160 @@ +'use strict'; + +const moment = require('moment') + +async function getAxyStructs(app, pepProjectId) { + const { models } = app.fs.dc + const { clickHouse } = app.fs + const { database: anxinyun } = clickHouse.anxinyun.opts.config + try { + try { + const { pepProjectRes, bindRes } = await pomsWithPepRangeParams(app, pepProjectId) + // 获取不重复的 安心云项目 id + const anxinProjectIds = [ + ...(bindRes).reduce( + (arr, b) => { + for (let sid of b.anxinProjectId) { + arr.add(sid); + } + return arr; + }, + new Set() + ) + ] + // 查询安心云项目及结构物信息 + const undelStrucRes = anxinProjectIds.length ? + await clickHouse.anxinyun.query( + `SELECT + t_project.id AS projectId, + t_structure.id AS strucId, + t_structure.name AS strucName, + project_state + FROM + t_project + LEFT JOIN + t_project_structure + ON t_project_structure.project = t_project.id + LEFT JOIN + t_project_structuregroup + ON t_project_structuregroup.project = t_project.id + LEFT JOIN + t_structuregroup_structure + ON t_structuregroup_structure.structuregroup = t_project_structuregroup.structuregroup + LEFT JOIN + t_project_construction + ON t_project_construction.project = t_project.id + LEFT JOIN + t_structure_site + ON t_structure_site.siteid = t_project_construction.construction + RIGHT JOIN + t_structure + ON t_structure.id = t_project_structure.structure + OR t_structure.id = t_structuregroup_structure.structure + OR t_structure.id = t_structure_site.structid + WHERE + project_state != -1 + AND + t_project.id IN (${anxinProjectIds.join(',')})`).toPromise() : [] + + // 构建安心云结构物和项企项目的关系 + // 并保存信息至数据 + let undelStruc = [] + for (let s of undelStrucRes) { + let corStruc = undelStruc.find(us => us.strucId == s.strucId) + if (corStruc) { + if (!corStruc.project.some(cp => cp.id == s.projectId)) { + corStruc.project.push({ + id: s.projectId + }) + } + } else { + corStruc = { + strucId: s.strucId, + strucName: s.strucName, + projectId: s.projectId, + project: [{ + id: s.projectId, + }], + pomsProject: [] + } + undelStruc.push(corStruc) + } + for (let { dataValues: br } of bindRes) { + if (br.anxinProjectId.some(braId => braId == s.projectId)) { + let corPepProject = pepProjectRes.find(pp => pp.id == br.pepProjectId) + let corPomsProject = corStruc.pomsProject.find(cp => cp.id == br.id) + + if (corPomsProject) { + // poms 的 project 和 pep 的 project 是一对一的关系 所以这个情况不用处理 + } else { + corStruc.pomsProject.push({ + ...br, + pepProject: corPepProject + }) + } + + } + } + } + return undelStruc + } catch (error) { + console.error(error); + } + } catch (error) { + console.log(error) + } +} + +async function pomsWithPepRangeParams(app, pepProjectId) { + const { models } = app.fs.dc + const { clickHouse } = app.fs + const { database: anxinyun } = clickHouse.anxinyun.opts.config + try { + let findOption = { + where: { + del: false + } + } + if (pepProjectId) { + // 有 特定的项目id 就按此查询 + findOption.where.id = pepProjectId + } + const bindRes = await models.ProjectCorrelation.findAll(findOption); + // 获取不重复的 项企项目id + let pepProjectIds = [] + for (let b of bindRes) { + if (b.pepProjectId) { + pepProjectIds.push(b.pepProjectId) + } + } + // 查询项企项目的信息 + const pepProjectRes = pepProjectIds.length ? + await clickHouse.projectManage.query( + `SELECT + t_pim_project.id AS id, + t_pim_project.project_name AS projectName, + t_pim_project.isdelete AS isdelete, + t_pim_project_construction.construction_status_id AS constructionStatusId, + t_pim_project_state.construction_status AS constructionStatus + FROM + t_pim_project + LEFT JOIN t_pim_project_construction + ON t_pim_project.id = t_pim_project_construction.project_id + LEFT JOIN t_pim_project_state + ON t_pim_project_construction.construction_status_id = t_pim_project_state.id + WHERE + id IN (${pepProjectIds.join(',')})` + ).toPromise() : []; + + return { + pepProjectRes, bindRes + } + + } catch (error) { + console.error(error); + } +} + +module.exports = { + getAxyStructs +}; \ No newline at end of file diff --git a/api/package.json b/api/package.json index 4d3e893..f630fc4 100644 --- a/api/package.json +++ b/api/package.json @@ -22,7 +22,7 @@ "file-saver": "^2.0.2", "fs-web-server-scaffold": "^2.0.2", "ioredis": "^5.0.4", - "kafka-node": "^2.2.3", + "kafka-node": "^5.0.0", "koa-convert": "^1.2.0", "koa-proxy": "^0.9.0", "moment": "^2.24.0",