wenlele 2 years ago
parent
commit
297349b212
  1. 2
      api/.vscode/launch.json
  2. 110
      api/app/lib/controllers/alarm/data.js
  3. 3
      api/app/lib/index.js
  4. 5
      api/app/lib/routes/alarm/index.js
  5. 1
      api/app/lib/utils/index.js
  6. 21
      api/app/lib/utils/kafkaSend.js
  7. 16
      api/config.js
  8. 1
      api/package.json

2
api/.vscode/launch.json

@ -56,6 +56,8 @@
"--clickHouseProjectManage peppm8",
"--clickHouseVcmp video_accrss1",
"--clickHouseDataAlarm default",
"--confirmAlarmAnxinUserId 1",
]
},
{

110
api/app/lib/controllers/alarm/data.js

@ -1,4 +1,5 @@
'use strict';
const moment = require('moment');
async function groupList (ctx) {
try {
@ -33,7 +34,7 @@ async function list (ctx) {
const { utils: { judgeSuper, anxinStrucIdRange } } = ctx.app.fs
const { database: anxinyun } = clickHouse.anxinyun.opts.config
const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query
const { pepProjectId, keyword, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query
const isSuper = judgeSuper(ctx)
let anxinStrucIds = null
@ -67,6 +68,7 @@ async function list (ctx) {
)
`)
}
const alarmRes = await clickHouse.dataAlarm.query(`
SELECT
alarms.AlarmId AS AlarmId,
@ -100,6 +102,9 @@ async function list (ctx) {
// SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId
// )
// State = 3 是 自动恢复 / 4 是 人工恢复 / 其他数字 是 需要恢复
ctx.status = 200;
ctx.body = alarmRes
} catch (error) {
@ -132,23 +137,86 @@ async function detail (ctx) {
}
}
async function confirm (ctx) {
function confirm (opts) {
return async function (ctx) {
try {
const { models } = ctx.fs.dc;
const { utils: { kfkSendAsync } } = ctx.app.fs
const { clickHouse } = ctx.app.fs
const { content = '', alarmId } = ctx.request.body
// 发送告警恢复通知
// Topic: alarm
/*
* {
* messageMode: "AlarmManElimination",
* sourceId: "",
* alarmTypeCode: "",
* sponsor: userId
* content: "确认消息"
* time: "YYYY-MM-DDTHH:mm:ss.SSSZ"
* }
*/
const alarmRes = await clickHouse.dataAlarm.query(`
SELECT * FROM alarms WHERE AlarmId = '${alarmId}'
`).toPromise();
if (!alarmRes.length) {
throw '没有查询到对应的告警信息'
}
const [corAlarm] = alarmRes
if ([3, 4].some(s => s == corAlarm.State)) {
throw '告警信息已确认'
}
const message = {
messageMode: "AlarmManElimination",
sourceId: corAlarm.SourceId,
alarmTypeCode: corAlarm.AlarmTypeCode,
sponsor: opts.anxinCloud.confirmAlarmAnxinUserId,
content: content,
time: moment().toISOString()
};
const payloads = [{
topic: `${opts.kafka.topicPrefix}_alarm`,
messages: [JSON.stringify(message)],
partition: 0
}];
await kfkSendAsync(payloads)
ctx.status = 204;
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
ctx.body = {
message: typeof error == 'string' ? error : undefined
}
}
}
}
async function detailAggregation (ctx) {
try {
const { models } = ctx.fs.dc;
// 发送告警恢复通知
// Topic: alarm
/*
* {
* messageMode: "AlarmManElimination",
* sourceId: "",
* alarmTypeCode: "",
* sponsor: userId
* content: "确认消息"
* time: "YYYY-MM-DDTHH:mm:ss.SSSZ"
* }
*/
ctx.status = 204;
const { alarmId } = ctx.query
const { clickHouse } = ctx.app.fs
const alarmDetailAggRes = await clickHouse.dataAlarm.query(`
SELECT
formatDateTime(Time,'%F %H') hours, count(AlarmId) count
FROM
alarm_details
WHERE
AlarmId=${alarmId}
GROUP BY
hours;
`).toPromise();
ctx.status = 200;
ctx.body = alarmDetailAggRes
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
@ -158,18 +226,17 @@ async function confirm (ctx) {
}
}
async function detailAggregation (ctx) {
async function alarmCount (ctx) {
try {
const { models } = ctx.fs.dc;
const { alarmId } = ctx.query
const { clickHouse } = ctx.app.fs
const alarmDetailAggRes = await clickHouse.dataAlarm.query(`
SELECT formatDateTime(Time,'%F %H') days, count(AlarmId) count from alarm_details WHERE AlarmId=${AlarmId} group by days;
const alarmUnconfirmedAggRes = await clickHouse.dataAlarm.query(`
SELECT count(AlarmId) count, AlarmGroup from alarms GROUP BY AlarmGroup;
`).toPromise();
ctx.status = 200;
ctx.body = alarmDetailAggRes
ctx.body = alarmUnconfirmedAggRes
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
@ -185,4 +252,5 @@ module.exports = {
groupList,
confirm,
detailAggregation,
alarmCount,
};

3
api/app/lib/index.js

@ -10,6 +10,7 @@ const mqttVideoServer = require('./service/mqttServer')
const paasRequest = require('./service/paasRequest');
const authenticator = require('./middlewares/authenticator');
const clickHouseClient = require('./service/clickHouseClient')
const kafka = require('./service/kafka')
const schedule = require('./schedule')
// const apiLog = require('./middlewares/api-log');
@ -29,6 +30,8 @@ module.exports.entry = function (app, router, opts) {
// 实例其他平台请求方法
paasRequest(app, opts)
kafka(app, opts)
// clickHouse 数据库 client
clickHouseClient(app, opts)

5
api/app/lib/routes/alarm/index.js

@ -29,6 +29,9 @@ module.exports = function (app, router, opts) {
app.fs.api.logAttr['GET/alarm/data/group'] = { content: '获取数据告警分类', visible: true };
router.get('/alarm/data/group', dataAlarm.groupList);
app.fs.api.logAttr['GET/alarm/data/count'] = { content: '查询数据告警未确认数量', visible: true };
router.get('/alarm/data/count', dataAlarm.alarmCount);
app.fs.api.logAttr['GET/alarm/data/list'] = { content: '查询数据告警列表', visible: true };
router.get('/alarm/data/list', dataAlarm.list);
@ -39,5 +42,5 @@ module.exports = function (app, router, opts) {
router.get('/alarm/data/detail_agg', dataAlarm.detailAggregation);
app.fs.api.logAttr['PUT/alarm/data/confirm'] = { content: '确认数据告警', visible: true };
router.put('/alarm/data/confirm', dataAlarm.confirm);
router.put('/alarm/data/confirm', dataAlarm.confirm(opts));
};

1
api/app/lib/utils/index.js

@ -7,6 +7,7 @@ module.exports = async function (app, opts) {
fs.readdirSync(__dirname).forEach((filename) => {
if (!['index.js'].some(f => filename == f)) {
const utils = require(`./${filename}`)(app, opts)
console.log(`载入 ${filename} 工具集成功`);
app.fs.utils = {
...app.fs.utils,
...utils,

21
api/app/lib/utils/kafkaSend.js

@ -0,0 +1,21 @@
'use strict';
module.exports = function (app, opts) {
async function kfkSendAsync (payloads) {
const { producer } = app.fs.kafka
return new Promise((resolve, reject) => {
producer.send(payloads, function (err) {
if (err) {
reject(err);
} else {
resolve();
}
});
})
}
return {
kfkSendAsync
}
}

16
api/config.js

@ -9,9 +9,9 @@ const dev = process.env.NODE_ENV == 'development';
// 启动参数
args.option(['p', 'port'], '启动端口');
args.option(['g', 'pg'], 'postgre服务URL');
args.option(['g', 'pg'], 'postgre 服务 URL');
args.option(['f', 'fileHost'], '文件中心本地化存储: WebApi 服务器地址(必填), 该服务器提供文件上传Web服务');
args.option(['k', 'kafka'], 'kafka服务URL');
args.option(['k', 'kafka'], 'kafka 服务 URL');
args.option('redisHost', 'redisHost');
args.option('redisPort', 'redisPort');
@ -40,6 +40,8 @@ args.option('clickHouseProjectManage', 'clickHouse 项目管理数据库名称')
args.option('clickHouseVcmp', 'clickHouse 视频平台数据库名称');
args.option('clickHouseDataAlarm', 'clickHouse 视频平台数据告警库名称');
args.option('confirmAlarmAnxinUserId', '确认告警时保存到 ES 的安心云的用户的 id');
const flags = args.parse(process.argv);
const POMS_DB = process.env.POMS_DB || flags.pg;
@ -82,6 +84,10 @@ const CLICKHOUST_PROJECT_MANAGE = process.env.CLICKHOUST_PROJECT_MANAGE || flags
const CLICKHOUST_VCMP = process.env.CLICKHOUST_VCMP || flags.clickHouseVcmp
const CLICKHOUST_DATA_ALARM = process.env.CLICKHOUST_DATA_ALARM || flags.clickHouseDataAlarm
const CONFIRM_ALARM_ANXIN_USER_ID = process.env.CONFIRM_ALARM_ANXIN_USER_ID || flags.confirmAlarmAnxinUserId
const PLATFORM_NAME = process.env.PLATFORM_NAME || flags.platformName || 'anxinyun';
if (
!POMS_DB
|| !IOTA_REDIS_SERVER_HOST || !IOTA_REDIS_SERVER_PORT
@ -92,6 +98,7 @@ if (
|| !QINIU_DOMAIN_QNDMN_RESOURCE || !QINIU_BUCKET_RESOURCE || !QINIU_AK || !QINIU_SK
|| !CLICKHOUST_URL || !CLICKHOUST_PORT
|| !CLICKHOUST_ANXINCLOUD || !CLICKHOUST_PEP_EMIS || !CLICKHOUST_PROJECT_MANAGE || !CLICKHOUST_VCMP || !CLICKHOUST_DATA_ALARM
|| !CONFIRM_ALARM_ANXIN_USER_ID
) {
console.log('缺少启动参数,异常退出');
args.showHelp();
@ -123,9 +130,12 @@ const product = {
{ p: '/project/app_list', o: 'GET' },
{ p: '/alarm/application/api', o: 'POST' }
], // 不做认证的路由,也可以使用 exclude: ["*"] 跳过所有路由
anxinCloud: {
confirmAlarmAnxinUserId: CONFIRM_ALARM_ANXIN_USER_ID
},
kafka: {
rootURL: ANXINCLOUD_KAFKA_BROKERS,
// topicPrefix: PLATFORM_NAME
topicPrefix: PLATFORM_NAME
},
redis: {
host: IOTA_REDIS_SERVER_HOST,

1
api/package.json

@ -22,6 +22,7 @@
"file-saver": "^2.0.2",
"fs-web-server-scaffold": "^2.0.2",
"ioredis": "^5.0.4",
"kafka-node": "^2.2.3",
"koa-convert": "^1.2.0",
"koa-proxy": "^0.9.0",
"moment": "^2.24.0",

Loading…
Cancel
Save