Browse Source

(*)kafka接收消息调试

dev
wuqun 2 years ago
parent
commit
070dcde441
  1. 2
      api/.vscode/launch.json
  2. 129
      api/app/lib/service/kafka.js
  3. 160
      api/app/lib/utils/helper.js
  4. 2
      api/package.json

2
api/.vscode/launch.json

@ -19,7 +19,7 @@
"-g postgres://postgres:123@10.8.30.32:5432/orational_service",
//
// "-g postgres://FashionAdmin:123456@10.8.30.156:5432/POMS",
"-k node35:6667,node36:6667,node37:6667",
"-k 10.8.30.72:29092,10.8.30.73:29092,10.8.30.74:29092",
"--iotaProxy http://10.8.30.157:17007",
"--redisHost 10.8.30.112",
"--redisPort 6379",

129
api/app/lib/service/kafka.js

@ -1,11 +1,19 @@
'use strict';
const moment = require('moment');
const Kafka = require('kafka-node');
const { getAxyStructs } = require('../utils/helper');
module.exports = async function factory(app, opts) {
let structsAche = {
dataList: [],
expireTime: null//10分钟更新一次结构物列表
}
//const { utils: { getAxyStructs } } = app.fs
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL });
const producer = new Kafka.HighLevelProducer(client);
producer.on('error', function (err) {
app.fs.logger.log('error', "[FS-KAFKA]", err);
});
@ -17,4 +25,121 @@ module.exports = async function factory (app, opts) {
app.fs.kafka = kafka;
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
////------------------------------------------------------------------- try try try
// setTimeout(async () => {
// let msg = {
// "messageMode": "AlarmGeneration",
// "structureId": 1043,
// "structureName": "TPA 数据波动影响分析及实验",
// "sourceId": "727466e9-28c3-48f0-a320-3fb66b7e4151",
// "sourceName": "忻德TPA1",
// "alarmTypeCode": "3004",
// "alarmCode": "3002",
// "content": "link [soip:40001:00012532] is nil",
// "time": "2022-10-31T11:21:14.000+0800",
// "sourceTypeId": 1,
// "sponsor": "et.recv",
// "extras": null
// }
// await savePullAlarm(msg);
// }, 3000)
const topics = [{ topic: 'anxinyun_alarm', partition: 0 }]
const options = {
groupId: 'topic-test-one',
autoCommit: true,
}
const consumer = new Kafka.Consumer(client, topics, options)
consumer.on("ready", function () {
console.log('consumer ready 666666666666')
})
consumer.on('message', function (message) {
const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = JSON.parse(buf.toString())
console.log('decodedMessage: ', decodedMessage)
})
consumer.on('error', function (err) {
console.log('error', err)
})
process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit()
})
})
async function getStructsAche(app) {
try {
if (!structsAche.dataList.length || moment() > moment(structsAche.expireTime)) {
let structList = await getAxyStructs(app);
structsAche.dataList = structList;
structsAche.expireTime = moment().add(10, 'minute').format('YYYY-MM-DD HH:mm:ss');
}
return structsAche;
} catch (err) {
console.log(`获取结构物列表失败, error: ${err}`);
}
}
//保存告警[发现]
async function savePullAlarm(msg) {
try {
let { messageMode, structureId, sourceName, alarmTypeCode, alarmCode, content, time } = msg;
let structsAche = await getStructsAche(app);
if (structsAche) {
let structs = structsAche.dataList;//结构物列表
const { models } = app.fs.dc
let exist = structs.find(s => s.strucId == structureId);
if (exist) {
let projects = exist.pomsProject.filter(d => !d.del).map(p => p.id);
if (messageMode == 'AlarmGeneration') {//告警产生--------------------------------------------------1
let datas = projects.map(d => {//需要 项目,告警源,异常类型,时间
return {
projectCorrelationId: d,
alarmInfo: { structureId, sourceName, alarmTypeCode, alarmCode, content },
time: time,
type: 1//异常类型 TODO
}
})
let rslt = await models.AlarmAppearRecord.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.time,
alarmAppearId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 1//发现
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
} else if (messageMode == 'AlarmAutoElimination') {//告警自动恢复------------------------------------2
let datas = projects.map(d => {
return {
pepUserId: null,
projectCorrelationId: d,
alarmInfo: { sourceName, alarmTypeCode, alarmCode },//包含告警id,type,source TODO
confirmTime: time,
confirmContent: '自动恢复'
}
})
let rslt = await models.AlarmConfirmLog.bulkCreate(datas, { returning: true });
let dynamics = rslt.map(r => {
return {
time: r.confirmTime,
alarmConfirmId: r.id,
projectCorrelationId: r.projectCorrelationId,
type: 4//告警确认
}
})
await models.LatestDynamicList.bulkCreate(dynamics);
}
}
} else {
console.log(`获取结构物列表失败, error: ${err}`);
}
} catch (error) {
console.error(error);
}
}
}

160
api/app/lib/utils/helper.js

@ -0,0 +1,160 @@
'use strict';
const moment = require('moment')
async function getAxyStructs(app, pepProjectId) {
const { models } = app.fs.dc
const { clickHouse } = app.fs
const { database: anxinyun } = clickHouse.anxinyun.opts.config
try {
try {
const { pepProjectRes, bindRes } = await pomsWithPepRangeParams(app, pepProjectId)
// 获取不重复的 安心云项目 id
const anxinProjectIds = [
...(bindRes).reduce(
(arr, b) => {
for (let sid of b.anxinProjectId) {
arr.add(sid);
}
return arr;
},
new Set()
)
]
// 查询安心云项目及结构物信息
const undelStrucRes = anxinProjectIds.length ?
await clickHouse.anxinyun.query(
`SELECT
t_project.id AS projectId,
t_structure.id AS strucId,
t_structure.name AS strucName,
project_state
FROM
t_project
LEFT JOIN
t_project_structure
ON t_project_structure.project = t_project.id
LEFT JOIN
t_project_structuregroup
ON t_project_structuregroup.project = t_project.id
LEFT JOIN
t_structuregroup_structure
ON t_structuregroup_structure.structuregroup = t_project_structuregroup.structuregroup
LEFT JOIN
t_project_construction
ON t_project_construction.project = t_project.id
LEFT JOIN
t_structure_site
ON t_structure_site.siteid = t_project_construction.construction
RIGHT JOIN
t_structure
ON t_structure.id = t_project_structure.structure
OR t_structure.id = t_structuregroup_structure.structure
OR t_structure.id = t_structure_site.structid
WHERE
project_state != -1
AND
t_project.id IN (${anxinProjectIds.join(',')})`).toPromise() : []
// 构建安心云结构物和项企项目的关系
// 并保存信息至数据
let undelStruc = []
for (let s of undelStrucRes) {
let corStruc = undelStruc.find(us => us.strucId == s.strucId)
if (corStruc) {
if (!corStruc.project.some(cp => cp.id == s.projectId)) {
corStruc.project.push({
id: s.projectId
})
}
} else {
corStruc = {
strucId: s.strucId,
strucName: s.strucName,
projectId: s.projectId,
project: [{
id: s.projectId,
}],
pomsProject: []
}
undelStruc.push(corStruc)
}
for (let { dataValues: br } of bindRes) {
if (br.anxinProjectId.some(braId => braId == s.projectId)) {
let corPepProject = pepProjectRes.find(pp => pp.id == br.pepProjectId)
let corPomsProject = corStruc.pomsProject.find(cp => cp.id == br.id)
if (corPomsProject) {
// poms 的 project 和 pep 的 project 是一对一的关系 所以这个情况不用处理
} else {
corStruc.pomsProject.push({
...br,
pepProject: corPepProject
})
}
}
}
}
return undelStruc
} catch (error) {
console.error(error);
}
} catch (error) {
console.log(error)
}
}
async function pomsWithPepRangeParams(app, pepProjectId) {
const { models } = app.fs.dc
const { clickHouse } = app.fs
const { database: anxinyun } = clickHouse.anxinyun.opts.config
try {
let findOption = {
where: {
del: false
}
}
if (pepProjectId) {
// 有 特定的项目id 就按此查询
findOption.where.id = pepProjectId
}
const bindRes = await models.ProjectCorrelation.findAll(findOption);
// 获取不重复的 项企项目id
let pepProjectIds = []
for (let b of bindRes) {
if (b.pepProjectId) {
pepProjectIds.push(b.pepProjectId)
}
}
// 查询项企项目的信息
const pepProjectRes = pepProjectIds.length ?
await clickHouse.projectManage.query(
`SELECT
t_pim_project.id AS id,
t_pim_project.project_name AS projectName,
t_pim_project.isdelete AS isdelete,
t_pim_project_construction.construction_status_id AS constructionStatusId,
t_pim_project_state.construction_status AS constructionStatus
FROM
t_pim_project
LEFT JOIN t_pim_project_construction
ON t_pim_project.id = t_pim_project_construction.project_id
LEFT JOIN t_pim_project_state
ON t_pim_project_construction.construction_status_id = t_pim_project_state.id
WHERE
id IN (${pepProjectIds.join(',')})`
).toPromise() : [];
return {
pepProjectRes, bindRes
}
} catch (error) {
console.error(error);
}
}
module.exports = {
getAxyStructs
};

2
api/package.json

@ -22,7 +22,7 @@
"file-saver": "^2.0.2",
"fs-web-server-scaffold": "^2.0.2",
"ioredis": "^5.0.4",
"kafka-node": "^2.2.3",
"kafka-node": "^5.0.0",
"koa-convert": "^1.2.0",
"koa-proxy": "^0.9.0",
"moment": "^2.24.0",

Loading…
Cancel
Save