From 81598a01056e4827c87b217180735e59a09effe5 Mon Sep 17 00:00:00 2001 From: wuqun Date: Mon, 31 Oct 2022 17:19:27 +0800 Subject: [PATCH] =?UTF-8?q?(*)=E4=BF=AE=E5=A4=8D=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E5=8F=B0=E6=8A=A5=E9=94=99;=20kafka=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E5=A4=84=E7=90=86=E5=8A=A0=E5=BC=82=E5=B8=B8=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/app/lib/service/kafka.js | 21 ++-- api/app/lib/utils/helper.js | 183 +++++++++++++++++------------------ 2 files changed, 104 insertions(+), 100 deletions(-) diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js index 7e3046d..0c81913 100644 --- a/api/app/lib/service/kafka.js +++ b/api/app/lib/service/kafka.js @@ -1,7 +1,7 @@ 'use strict'; const moment = require('moment'); const Kafka = require('kafka-node'); -const { getAxyStructs } = require('../utils/helper'); +//const { getAxyStructs } = require('../utils/helper'); module.exports = async function factory(app, opts) { @@ -11,7 +11,6 @@ module.exports = async function factory(app, opts) { expireTime: null//10分钟更新一次结构物列表 } - //const { utils: { getAxyStructs } } = app.fs const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); const producer = new Kafka.HighLevelProducer(client); producer.on('error', function (err) { @@ -70,10 +69,11 @@ module.exports = async function factory(app, opts) { }) - async function getStructsAche(app) { + async function getStructsAche() { + const { utils: { getAxyStructs } } = app.fs try { if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) { - let structList = await getAxyStructs(app); + let structList = await getAxyStructs(); structsAche.dataList = structList; structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss'); } @@ -85,22 +85,27 @@ module.exports = async function factory(app, opts) { //保存告警[发现] async function savePullAlarm(msg) { + const { clickHouse } = app.fs try { let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg; - let structsAche = await getStructsAche(app); + let structsAche = await getStructsAche(); if (structsAche) { let structs = structsAche.dataList;//结构物列表 const { models } = app.fs.dc let exist = structs.find(s => s.strucId == structureId); if (exist) { + let alarmType = await clickHouse.anxinyun.query( + `SELECT name FROM t_alarm_type WHERE code='${alarmTypeCode}'`).toPromise() + let type = alarmType.length ? alarmType[0].name : '' + let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id); if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1 let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间 return { projectCorrelationId: d, - alarmInfo: { structureId, sourceName, alarmTypeCode, alarmCode, content }, + alarmInfo: { sourceName, alarmTypeCode }, time: time, - type: 1//异常类型 TODO + type//异常类型 } }) let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true }); @@ -118,7 +123,7 @@ module.exports = async function factory(app, opts) { return { pepUserId: null, projectCorrelationId: d, - alarmInfo: { sourceName, alarmTypeCode, alarmCode },//包含告警id,type,source TODO + alarmInfo: { source: sourceName, type },//包含告警id,type,source confirmTime: time, confirmContent: '自动恢复' } diff --git a/api/app/lib/utils/helper.js b/api/app/lib/utils/helper.js index e5e6b04..f2f56d5 100644 --- a/api/app/lib/utils/helper.js +++ b/api/app/lib/utils/helper.js @@ -2,29 +2,30 @@ const moment = require('moment') -async function getAxyStructs(app, pepProjectId) { +module.exports = function (app, opts) { const { models } = app.fs.dc const { clickHouse } = app.fs const { database: anxinyun } = clickHouse.anxinyun.opts.config - try { + async function getAxyStructs(pepProjectId) { try { - const { pepProjectRes, bindRes } = await pomsWithPepRangeParams(app, pepProjectId) - // 获取不重复的 安心云项目 id - const anxinProjectIds = [ - ...(bindRes).reduce( - (arr, b) => { - for (let sid of b.anxinProjectId) { - arr.add(sid); - } - return arr; - }, - new Set() - ) - ] - // 查询安心云项目及结构物信息 - const undelStrucRes = anxinProjectIds.length ? - await clickHouse.anxinyun.query( - `SELECT + try { + const { pepProjectRes, bindRes } = await pomsWithPepRangeParams(pepProjectId) + // 获取不重复的 安心云项目 id + const anxinProjectIds = [ + ...(bindRes).reduce( + (arr, b) => { + for (let sid of b.anxinProjectId) { + arr.add(sid); + } + return arr; + }, + new Set() + ) + ] + // 查询安心云项目及结构物信息 + const undelStrucRes = anxinProjectIds.length ? + await clickHouse.anxinyun.query( + `SELECT t_project.id AS projectId, t_structure.id AS strucId, t_structure.name AS strucName, @@ -56,81 +57,78 @@ async function getAxyStructs(app, pepProjectId) { AND t_project.id IN (${anxinProjectIds.join(',')})`).toPromise() : [] - // 构建安心云结构物和项企项目的关系 - // 并保存信息至数据 - let undelStruc = [] - for (let s of undelStrucRes) { - let corStruc = undelStruc.find(us => us.strucId == s.strucId) - if (corStruc) { - if (!corStruc.project.some(cp => cp.id == s.projectId)) { - corStruc.project.push({ - id: s.projectId - }) - } - } else { - corStruc = { - strucId: s.strucId, - strucName: s.strucName, - projectId: s.projectId, - project: [{ - id: s.projectId, - }], - pomsProject: [] - } - undelStruc.push(corStruc) - } - for (let { dataValues: br } of bindRes) { - if (br.anxinProjectId.some(braId => braId == s.projectId)) { - let corPepProject = pepProjectRes.find(pp => pp.id == br.pepProjectId) - let corPomsProject = corStruc.pomsProject.find(cp => cp.id == br.id) - - if (corPomsProject) { - // poms 的 project 和 pep 的 project 是一对一的关系 所以这个情况不用处理 - } else { - corStruc.pomsProject.push({ - ...br, - pepProject: corPepProject + // 构建安心云结构物和项企项目的关系 + // 并保存信息至数据 + let undelStruc = [] + for (let s of undelStrucRes) { + let corStruc = undelStruc.find(us => us.strucId == s.strucId) + if (corStruc) { + if (!corStruc.project.some(cp => cp.id == s.projectId)) { + corStruc.project.push({ + id: s.projectId }) } + } else { + corStruc = { + strucId: s.strucId, + strucName: s.strucName, + projectId: s.projectId, + project: [{ + id: s.projectId, + }], + pomsProject: [] + } + undelStruc.push(corStruc) + } + for (let { dataValues: br } of bindRes) { + if (br.anxinProjectId.some(braId => braId == s.projectId)) { + let corPepProject = pepProjectRes.find(pp => pp.id == br.pepProjectId) + let corPomsProject = corStruc.pomsProject.find(cp => cp.id == br.id) + if (corPomsProject) { + // poms 的 project 和 pep 的 project 是一对一的关系 所以这个情况不用处理 + } else { + corStruc.pomsProject.push({ + ...br, + pepProject: corPepProject + }) + } + + } } } + return undelStruc + } catch (error) { + console.error(error); } - return undelStruc } catch (error) { - console.error(error); + console.log(error) } - } catch (error) { - console.log(error) } -} -async function pomsWithPepRangeParams(app, pepProjectId) { - const { models } = app.fs.dc - const { clickHouse } = app.fs - const { database: anxinyun } = clickHouse.anxinyun.opts.config - try { - let findOption = { - where: { - del: false + async function pomsWithPepRangeParams(pepProjectId) { + try { + let findOption = { + where: { + del: false + } } - } - if (pepProjectId) { - // 有 特定的项目id 就按此查询 - findOption.where.id = pepProjectId - } - const bindRes = await models.ProjectCorrelation.findAll(findOption); - // 获取不重复的 项企项目id - let pepProjectIds = [] - for (let b of bindRes) { - if (b.pepProjectId) { - pepProjectIds.push(b.pepProjectId) + if (pepProjectId) { + // 有 特定的项目id 就按此查询 + findOption.where.id = pepProjectId } - } - // 查询项企项目的信息 - const pepProjectRes = pepProjectIds.length ? - await clickHouse.projectManage.query( - `SELECT + const bindRes = await models.ProjectCorrelation.findAll(findOption); + // 获取不重复的 项企项目id + let pepProjectIds = [] + for (let b of bindRes) { + if (b.pepProjectId) { + pepProjectIds.push(b.pepProjectId) + } + } + // 查询项企项目的信息 + const pepProjectRes = pepProjectIds.length ? + await clickHouse.projectManage.query( + `SELECT t_pim_project.id AS id, t_pim_project.project_name AS projectName, t_pim_project.isdelete AS isdelete, @@ -144,17 +142,18 @@ async function pomsWithPepRangeParams(app, pepProjectId) { ON t_pim_project_construction.construction_status_id = t_pim_project_state.id WHERE id IN (${pepProjectIds.join(',')})` - ).toPromise() : []; + ).toPromise() : []; - return { - pepProjectRes, bindRes - } + return { + pepProjectRes, bindRes + } - } catch (error) { - console.error(error); + } catch (error) { + console.error(error); + } } -} -module.exports = { - getAxyStructs -}; \ No newline at end of file + return { + getAxyStructs + } +} \ No newline at end of file