From f1d9e7625d2d1ba22579d73a8734fa7c7f8c9af4 Mon Sep 17 00:00:00 2001 From: "gao.zhiyuan" Date: Fri, 23 Sep 2022 17:46:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=91=8A=E8=AD=A6=E8=AF=A6=E6=83=85=E8=81=9A?= =?UTF-8?q?=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/.vscode/launch.json | 2 + api/app/lib/controllers/alarm/data.js | 110 +++++++++++++++++++++----- api/app/lib/index.js | 3 + api/app/lib/routes/alarm/index.js | 5 +- api/app/lib/utils/index.js | 1 + api/app/lib/utils/kafkaSend.js | 21 +++++ api/config.js | 16 +++- api/package.json | 1 + 8 files changed, 134 insertions(+), 25 deletions(-) create mode 100644 api/app/lib/utils/kafkaSend.js diff --git a/api/.vscode/launch.json b/api/.vscode/launch.json index 57bf5b8..aaedb1e 100644 --- a/api/.vscode/launch.json +++ b/api/.vscode/launch.json @@ -56,6 +56,8 @@ "--clickHouseProjectManage peppm8", "--clickHouseVcmp video_accrss1", "--clickHouseDataAlarm default", + + "--confirmAlarmAnxinUserId 1", ] }, { diff --git a/api/app/lib/controllers/alarm/data.js b/api/app/lib/controllers/alarm/data.js index 76a52f5..7d1b960 100644 --- a/api/app/lib/controllers/alarm/data.js +++ b/api/app/lib/controllers/alarm/data.js @@ -1,4 +1,5 @@ 'use strict'; +const moment = require('moment'); async function groupList (ctx) { try { @@ -33,7 +34,7 @@ async function list (ctx) { const { utils: { judgeSuper, anxinStrucIdRange } } = ctx.app.fs const { database: anxinyun } = clickHouse.anxinyun.opts.config - const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query + const { pepProjectId, keyword, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query const isSuper = judgeSuper(ctx) let anxinStrucIds = null @@ -67,6 +68,7 @@ async function list (ctx) { ) `) } + const alarmRes = await clickHouse.dataAlarm.query(` SELECT alarms.AlarmId AS AlarmId, @@ -100,6 +102,9 @@ async function list (ctx) { // SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId // ) + // State = 3 是 自动恢复 / 4 是 人工恢复 / 其他数字 是 需要恢复 + + ctx.status = 200; ctx.body = alarmRes } catch (error) { @@ -132,23 +137,86 @@ async function detail (ctx) { } } -async function confirm (ctx) { +function confirm (opts) { + return async function (ctx) { + try { + const { models } = ctx.fs.dc; + const { utils: { kfkSendAsync } } = ctx.app.fs + const { clickHouse } = ctx.app.fs + const { content = '', alarmId } = ctx.request.body + // 发送告警恢复通知 + // Topic: alarm + /* + * { + * messageMode: "AlarmManElimination", + * sourceId: "", + * alarmTypeCode: "", + * sponsor: userId, + * content: "确认消息", + * time: "YYYY-MM-DDTHH:mm:ss.SSSZ" + * } + */ + + const alarmRes = await clickHouse.dataAlarm.query(` + SELECT * FROM alarms WHERE AlarmId = '${alarmId}' + `).toPromise(); + + if (!alarmRes.length) { + throw '没有查询到对应的告警信息' + } + + const [corAlarm] = alarmRes + if ([3, 4].some(s => s == corAlarm.State)) { + throw '告警信息已确认' + } + const message = { + messageMode: "AlarmManElimination", + sourceId: corAlarm.SourceId, + alarmTypeCode: corAlarm.AlarmTypeCode, + sponsor: opts.anxinCloud.confirmAlarmAnxinUserId, + content: content, + time: moment().toISOString() + }; + + const payloads = [{ + topic: `${opts.kafka.topicPrefix}_alarm`, + messages: [JSON.stringify(message)], + partition: 0 + }]; + + await kfkSendAsync(payloads) + + ctx.status = 204; + } catch (error) { + ctx.fs.logger.error(`path: ${ctx.path}, error: error`); + ctx.status = 400; + ctx.body = { + message: typeof error == 'string' ? error : undefined + } + } + } +} + + +async function detailAggregation (ctx) { try { const { models } = ctx.fs.dc; - // 发送告警恢复通知 - // Topic: alarm - /* - * { - * messageMode: "AlarmManElimination", - * sourceId: "", - * alarmTypeCode: "", - * sponsor: userId, - * content: "确认消息", - * time: "YYYY-MM-DDTHH:mm:ss.SSSZ" - * } - */ - - ctx.status = 204; + const { alarmId } = ctx.query + const { clickHouse } = ctx.app.fs + + const alarmDetailAggRes = await clickHouse.dataAlarm.query(` + SELECT + formatDateTime(Time,'%F %H') hours, count(AlarmId) count + FROM + alarm_details + WHERE + AlarmId=${alarmId} + GROUP BY + hours; + `).toPromise(); + + ctx.status = 200; + ctx.body = alarmDetailAggRes } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: error`); ctx.status = 400; @@ -158,18 +226,17 @@ async function confirm (ctx) { } } -async function detailAggregation (ctx) { +async function alarmCount (ctx) { try { const { models } = ctx.fs.dc; - const { alarmId } = ctx.query const { clickHouse } = ctx.app.fs - const alarmDetailAggRes = await clickHouse.dataAlarm.query(` - SELECT formatDateTime(Time,'%F %H') days, count(AlarmId) count from alarm_details WHERE AlarmId=${AlarmId} group by days; + const alarmUnconfirmedAggRes = await clickHouse.dataAlarm.query(` + SELECT count(AlarmId) count, AlarmGroup from alarms GROUP BY AlarmGroup; `).toPromise(); ctx.status = 200; - ctx.body = alarmDetailAggRes + ctx.body = alarmUnconfirmedAggRes } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: error`); ctx.status = 400; @@ -185,4 +252,5 @@ module.exports = { groupList, confirm, detailAggregation, + alarmCount, }; \ No newline at end of file diff --git a/api/app/lib/index.js b/api/app/lib/index.js index 21063b4..bd5cde2 100644 --- a/api/app/lib/index.js +++ b/api/app/lib/index.js @@ -10,6 +10,7 @@ const mqttVideoServer = require('./service/mqttServer') const paasRequest = require('./service/paasRequest'); const authenticator = require('./middlewares/authenticator'); const clickHouseClient = require('./service/clickHouseClient') +const kafka = require('./service/kafka') const schedule = require('./schedule') // const apiLog = require('./middlewares/api-log'); @@ -29,6 +30,8 @@ module.exports.entry = function (app, router, opts) { // 实例其他平台请求方法 paasRequest(app, opts) + kafka(app, opts) + // clickHouse 数据库 client clickHouseClient(app, opts) diff --git a/api/app/lib/routes/alarm/index.js b/api/app/lib/routes/alarm/index.js index ca64541..60a9792 100644 --- a/api/app/lib/routes/alarm/index.js +++ b/api/app/lib/routes/alarm/index.js @@ -29,6 +29,9 @@ module.exports = function (app, router, opts) { app.fs.api.logAttr['GET/alarm/data/group'] = { content: '获取数据告警分类', visible: true }; router.get('/alarm/data/group', dataAlarm.groupList); + app.fs.api.logAttr['GET/alarm/data/count'] = { content: '查询数据告警未确认数量', visible: true }; + router.get('/alarm/data/count', dataAlarm.alarmCount); + app.fs.api.logAttr['GET/alarm/data/list'] = { content: '查询数据告警列表', visible: true }; router.get('/alarm/data/list', dataAlarm.list); @@ -39,5 +42,5 @@ module.exports = function (app, router, opts) { router.get('/alarm/data/detail_agg', dataAlarm.detailAggregation); app.fs.api.logAttr['PUT/alarm/data/confirm'] = { content: '确认数据告警', visible: true }; - router.put('/alarm/data/confirm', dataAlarm.confirm); + router.put('/alarm/data/confirm', dataAlarm.confirm(opts)); }; diff --git a/api/app/lib/utils/index.js b/api/app/lib/utils/index.js index 4d3272e..19d9745 100644 --- a/api/app/lib/utils/index.js +++ b/api/app/lib/utils/index.js @@ -7,6 +7,7 @@ module.exports = async function (app, opts) { fs.readdirSync(__dirname).forEach((filename) => { if (!['index.js'].some(f => filename == f)) { const utils = require(`./${filename}`)(app, opts) + console.log(`载入 ${filename} 工具集成功`); app.fs.utils = { ...app.fs.utils, ...utils, diff --git a/api/app/lib/utils/kafkaSend.js b/api/app/lib/utils/kafkaSend.js new file mode 100644 index 0000000..2956b8a --- /dev/null +++ b/api/app/lib/utils/kafkaSend.js @@ -0,0 +1,21 @@ +'use strict'; + +module.exports = function (app, opts) { + + async function kfkSendAsync (payloads) { + const { producer } = app.fs.kafka + return new Promise((resolve, reject) => { + producer.send(payloads, function (err) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }) + } + + return { + kfkSendAsync + } +} \ No newline at end of file diff --git a/api/config.js b/api/config.js index dad3c78..e294fc3 100644 --- a/api/config.js +++ b/api/config.js @@ -9,9 +9,9 @@ const dev = process.env.NODE_ENV == 'development'; // 启动参数 args.option(['p', 'port'], '启动端口'); -args.option(['g', 'pg'], 'postgre服务URL'); +args.option(['g', 'pg'], 'postgre 服务 URL'); args.option(['f', 'fileHost'], '文件中心本地化存储: WebApi 服务器地址(必填), 该服务器提供文件上传Web服务'); -args.option(['k', 'kafka'], 'kafka服务URL'); +args.option(['k', 'kafka'], 'kafka 服务 URL'); args.option('redisHost', 'redisHost'); args.option('redisPort', 'redisPort'); @@ -40,6 +40,8 @@ args.option('clickHouseProjectManage', 'clickHouse 项目管理数据库名称') args.option('clickHouseVcmp', 'clickHouse 视频平台数据库名称'); args.option('clickHouseDataAlarm', 'clickHouse 视频平台数据告警库名称'); +args.option('confirmAlarmAnxinUserId', '确认告警时保存到 ES 的安心云的用户的 id'); + const flags = args.parse(process.argv); const POMS_DB = process.env.POMS_DB || flags.pg; @@ -82,6 +84,10 @@ const CLICKHOUST_PROJECT_MANAGE = process.env.CLICKHOUST_PROJECT_MANAGE || flags const CLICKHOUST_VCMP = process.env.CLICKHOUST_VCMP || flags.clickHouseVcmp const CLICKHOUST_DATA_ALARM = process.env.CLICKHOUST_DATA_ALARM || flags.clickHouseDataAlarm +const CONFIRM_ALARM_ANXIN_USER_ID = process.env.CONFIRM_ALARM_ANXIN_USER_ID || flags.confirmAlarmAnxinUserId + +const PLATFORM_NAME = process.env.PLATFORM_NAME || flags.platformName || 'anxinyun'; + if ( !POMS_DB || !IOTA_REDIS_SERVER_HOST || !IOTA_REDIS_SERVER_PORT @@ -92,6 +98,7 @@ if ( || !QINIU_DOMAIN_QNDMN_RESOURCE || !QINIU_BUCKET_RESOURCE || !QINIU_AK || !QINIU_SK || !CLICKHOUST_URL || !CLICKHOUST_PORT || !CLICKHOUST_ANXINCLOUD || !CLICKHOUST_PEP_EMIS || !CLICKHOUST_PROJECT_MANAGE || !CLICKHOUST_VCMP || !CLICKHOUST_DATA_ALARM + || !CONFIRM_ALARM_ANXIN_USER_ID ) { console.log('缺少启动参数,异常退出'); args.showHelp(); @@ -123,9 +130,12 @@ const product = { { p: '/project/app_list', o: 'GET' }, { p: '/alarm/application/api', o: 'POST' } ], // 不做认证的路由,也可以使用 exclude: ["*"] 跳过所有路由 + anxinCloud: { + confirmAlarmAnxinUserId: CONFIRM_ALARM_ANXIN_USER_ID + }, kafka: { rootURL: ANXINCLOUD_KAFKA_BROKERS, - // topicPrefix: PLATFORM_NAME + topicPrefix: PLATFORM_NAME }, redis: { host: IOTA_REDIS_SERVER_HOST, diff --git a/api/package.json b/api/package.json index d2ec263..cffb191 100644 --- a/api/package.json +++ b/api/package.json @@ -22,6 +22,7 @@ "file-saver": "^2.0.2", "fs-web-server-scaffold": "^2.0.2", "ioredis": "^5.0.4", + "kafka-node": "^2.2.3", "koa-convert": "^1.2.0", "koa-proxy": "^0.9.0", "moment": "^2.24.0",