From 9f7db97f4e52bd3c8a4d09276fcfbf74bec06b1e Mon Sep 17 00:00:00 2001 From: "gao.zhiyuan" Date: Thu, 22 Sep 2022 21:31:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=91=8A=E8=AD=A6=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E3=80=81=E5=91=8A=E8=AD=A6=E7=A1=AE=E8=AE=A4=20kafka?= =?UTF-8?q?=E5=BC=95=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/.vscode/launch.json | 1 + api/app/lib/controllers/alarm/data.js | 113 ++++++++++++++++++++++---- api/app/lib/routes/alarm/index.js | 6 ++ api/app/lib/service/kafka.js | 20 +++++ api/config.js | 9 ++ 5 files changed, 134 insertions(+), 15 deletions(-) create mode 100644 api/app/lib/service/kafka.js diff --git a/api/.vscode/launch.json b/api/.vscode/launch.json index 8f919f3..57bf5b8 100644 --- a/api/.vscode/launch.json +++ b/api/.vscode/launch.json @@ -19,6 +19,7 @@ "-g postgres://postgres:123@10.8.30.32:5432/orational_service", // 测试 // "-g postgres://FashionAdmin:123456@10.8.30.156:5432/POMS", + "-k node35:6667,node36:6667,node37:6667", "--redisHost 10.8.30.112", "--redisPort 6379", "--axyApiUrl http://127.0.0.1:4100", diff --git a/api/app/lib/controllers/alarm/data.js b/api/app/lib/controllers/alarm/data.js index e193722..82da6a4 100644 --- a/api/app/lib/controllers/alarm/data.js +++ b/api/app/lib/controllers/alarm/data.js @@ -1,13 +1,39 @@ 'use strict'; +async function groupList (ctx) { + try { + const { models } = ctx.fs.dc; + const { clickHouse } = ctx.app.fs + + const groupRes = await clickHouse.anxinyun.query(` + SELECT * FROM t_alarm_group + `).toPromise(); + + for (let g of groupRes) { + g.unit = await await clickHouse.anxinyun.query(` + SELECT * FROM t_alarm_group_unit WHERE group_id = ${g.id} + `).toPromise(); + } + + ctx.status = 200; + ctx.body = groupRes + } catch (error) { + ctx.fs.logger.error(`path: ${ctx.path}, error: error`); + ctx.status = 400; + ctx.body = { + message: typeof error == 'string' ? error : undefined + } + } +} + async function list (ctx) { try { const { models } = ctx.fs.dc; const { clickHouse } = ctx.app.fs const { utils: { judgeSuper, anxinStrucIdRange } } = ctx.app.fs const { database: anxinyun } = clickHouse.anxinyun.opts.config - - const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd } = ctx.query + + const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query const isSuper = judgeSuper(ctx) let anxinStrucIds = null @@ -19,7 +45,7 @@ async function list (ctx) { whereOption.push(`alarms.StructureId IN (${anxinStrucIds.join(",")})`) } if (groupId) { - whereOption.push(`alarms.AlarmGroup=${groupId}`) + whereOption.push(`alarms.AlarmGroup IN (${groupId})`) } if (groupUnitId) { whereOption.push(`alarms.AlarmGroupUnit=${groupId}`) @@ -29,7 +55,7 @@ async function list (ctx) { let momentEnd = moment(sustainTimeEnd).format() whereOption.push(` ( - alarms."StartTime" + alarms."StartTime" BETWEEN '${momentStart}' AND '${momentEnd}' OR "alarms"."EndTime" BETWEEN '${momentStart}' AND '${momentEnd}' @@ -43,26 +69,39 @@ async function list (ctx) { } const alarmRes = await clickHouse.dataAlarm.query(` SELECT - alarms.AlarmId, alarms.State, SourceName, StartTime, EndTime, alarms.CurrentLevel, SourceTypeId, AlarmAdviceProblem,AlarmGroup, AlarmGroupUnit, AlarmAdviceProblem, - ${anxinyun}.t_structure.name AS StructureName, StructureId, - ${anxinyun}.t_alarm_code.name AS AlarmCodeName, - alarm_details.Time, alarm_details.Content + alarms.AlarmId AS AlarmId, + alarms.State AS State, + SourceName, StartTime, EndTime, + alarms.CurrentLevel AS CurrentLevel, + SourceTypeId, + AlarmAdviceProblem, AlarmGroup, AlarmGroupUnit, AlarmAdviceProblem, + ${anxinyun}.t_structure.name AS StructureName, + StructureId, + ${anxinyun}.t_alarm_code.name AS AlarmCodeName + FROM alarms LEFT JOIN ${anxinyun}.t_structure ON ${anxinyun}.t_structure.id = alarms.StructureId LEFT JOIN ${anxinyun}.t_alarm_code ON ${anxinyun}.t_alarm_code.code = alarms.AlarmTypeCode - LEFT JOIN alarm_details - ON alarms.AlarmId = alarm_details.AlarmId - AND alarm_details.Time = ( - SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId - ) + ${whereOption.length ? 'WHERE ' + whereOption.join(' AND ') : ''} + ORDER BY alarms.StartTime DESC + ${limit ? 'LIMIT ' + limit : ''} + ${limit && page ? 'OFFSET ' + parseInt(limit) * parseInt(page) : ''} `).toPromise(); + // alarm_details.Time, alarm_details.Content + + // LEFT JOIN alarm_details + // ON alarms.AlarmId = alarm_details.AlarmId + // AND alarm_details.Time = ( + // SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId + // ) + ctx.status = 200; - ctx.body = [] + ctx.body = alarmRes } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: error`); ctx.status = 400; @@ -93,7 +132,51 @@ async function detail (ctx) { } } +async function confirm (ctx) { + try { + const { models } = ctx.fs.dc; + // 发送告警恢复通知 + // Topic: alarm + /* + * { + * messageMode: "AlarmManElimination", + * sourceId: "", + * alarmTypeCode: "", + * sponsor: userId, + * content: "确认消息", + * time: "YYYY-MM-DDTHH:mm:ss.SSSZ" + * } + */ + + ctx.status = 204; + } catch (error) { + ctx.fs.logger.error(`path: ${ctx.path}, error: error`); + ctx.status = 400; + ctx.body = { + message: typeof error == 'string' ? error : undefined + } + } +} + +async function detailAggregation (ctx) { + try { + const { models } = ctx.fs.dc; + + ctx.status = 200; + ctx.body = [] + } catch (error) { + ctx.fs.logger.error(`path: ${ctx.path}, error: error`); + ctx.status = 400; + ctx.body = { + message: typeof error == 'string' ? error : undefined + } + } +} + module.exports = { list, - detail + detail, + groupList, + confirm, + detailAggregation, }; \ No newline at end of file diff --git a/api/app/lib/routes/alarm/index.js b/api/app/lib/routes/alarm/index.js index c535a31..dd9d3b6 100644 --- a/api/app/lib/routes/alarm/index.js +++ b/api/app/lib/routes/alarm/index.js @@ -26,9 +26,15 @@ module.exports = function (app, router, opts) { router.post('/alarm/application/api_confirm', application.confirmApiError); // 数据告警 + app.fs.api.logAttr['GET/alarm/data/group'] = { content: '获取数据告警分类', visible: true }; + router.get('/alarm/data/group', dataAlarm.groupList); + app.fs.api.logAttr['GET/alarm/data/list'] = { content: '查询数据告警列表', visible: true }; router.get('/alarm/data/list', dataAlarm.list); app.fs.api.logAttr['GET/alarm/data/detail'] = { content: '查询数据告警详情', visible: true }; router.get('/alarm/data/detail', dataAlarm.detail); + + app.fs.api.logAttr['PUT/alarm/data/confirm'] = { content: '确认数据告警', visible: true }; + router.put('/alarm/data/confirm', dataAlarm.confirm); }; diff --git a/api/app/lib/service/kafka.js b/api/app/lib/service/kafka.js new file mode 100644 index 0000000..f164eee --- /dev/null +++ b/api/app/lib/service/kafka.js @@ -0,0 +1,20 @@ +'use strict'; + +const Kafka = require('kafka-node'); + +module.exports = async function factory (app, opts) { + const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL }); + const producer = new Kafka.HighLevelProducer(client); + + producer.on('error', function (err) { + app.fs.logger.log('error', "[FS-KAFKA]", err); + }); + + const kafka = { + producer: producer, + configUpdateMessage: opts.configUpdateMessage || {} + }; + + app.fs.kafka = kafka; + app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success"); +} \ No newline at end of file diff --git a/api/config.js b/api/config.js index 11e9062..dad3c78 100644 --- a/api/config.js +++ b/api/config.js @@ -11,6 +11,7 @@ const dev = process.env.NODE_ENV == 'development'; args.option(['p', 'port'], '启动端口'); args.option(['g', 'pg'], 'postgre服务URL'); args.option(['f', 'fileHost'], '文件中心本地化存储: WebApi 服务器地址(必填), 该服务器提供文件上传Web服务'); +args.option(['k', 'kafka'], 'kafka服务URL'); args.option('redisHost', 'redisHost'); args.option('redisPort', 'redisPort'); @@ -44,6 +45,9 @@ const flags = args.parse(process.argv); const POMS_DB = process.env.POMS_DB || flags.pg; const LOCAL_SVR_ORIGIN = process.env.LOCAL_SVR_ORIGIN || flags.fileHost; +// kafka +const ANXINCLOUD_KAFKA_BROKERS = process.env.ANXINCLOUD_KAFKA_BROKERS || flags.kafka; + // Redis 参数 const IOTA_REDIS_SERVER_HOST = process.env.IOTA_REDIS_SERVER_HOST || flags.redisHost || "localhost";//redis IP const IOTA_REDIS_SERVER_PORT = process.env.IOTA_REDIS_SERVER_PORT || flags.redisPort || "6379";//redis 端口 @@ -81,6 +85,7 @@ const CLICKHOUST_DATA_ALARM = process.env.CLICKHOUST_DATA_ALARM || flags.clickHo if ( !POMS_DB || !IOTA_REDIS_SERVER_HOST || !IOTA_REDIS_SERVER_PORT + || !ANXINCLOUD_KAFKA_BROKERS || !GOD_KEY || !API_ANXINYUN_URL || !API_EMIS_URL @@ -118,6 +123,10 @@ const product = { { p: '/project/app_list', o: 'GET' }, { p: '/alarm/application/api', o: 'POST' } ], // 不做认证的路由,也可以使用 exclude: ["*"] 跳过所有路由 + kafka: { + rootURL: ANXINCLOUD_KAFKA_BROKERS, + // topicPrefix: PLATFORM_NAME + }, redis: { host: IOTA_REDIS_SERVER_HOST, port: IOTA_REDIS_SERVER_PORT,