Browse Source

数据告警获取、告警确认 kafka引入

dev
巴林闲侠 2 years ago
parent
commit
9f7db97f4e
  1. 1
      api/.vscode/launch.json
  2. 109
      api/app/lib/controllers/alarm/data.js
  3. 6
      api/app/lib/routes/alarm/index.js
  4. 20
      api/app/lib/service/kafka.js
  5. 9
      api/config.js

1
api/.vscode/launch.json

@ -19,6 +19,7 @@
"-g postgres://postgres:123@10.8.30.32:5432/orational_service",
//
// "-g postgres://FashionAdmin:123456@10.8.30.156:5432/POMS",
"-k node35:6667,node36:6667,node37:6667",
"--redisHost 10.8.30.112",
"--redisPort 6379",
"--axyApiUrl http://127.0.0.1:4100",

109
api/app/lib/controllers/alarm/data.js

@ -1,5 +1,31 @@
'use strict';
async function groupList (ctx) {
try {
const { models } = ctx.fs.dc;
const { clickHouse } = ctx.app.fs
const groupRes = await clickHouse.anxinyun.query(`
SELECT * FROM t_alarm_group
`).toPromise();
for (let g of groupRes) {
g.unit = await await clickHouse.anxinyun.query(`
SELECT * FROM t_alarm_group_unit WHERE group_id = ${g.id}
`).toPromise();
}
ctx.status = 200;
ctx.body = groupRes
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
ctx.body = {
message: typeof error == 'string' ? error : undefined
}
}
}
async function list (ctx) {
try {
const { models } = ctx.fs.dc;
@ -7,7 +33,7 @@ async function list (ctx) {
const { utils: { judgeSuper, anxinStrucIdRange } } = ctx.app.fs
const { database: anxinyun } = clickHouse.anxinyun.opts.config
const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd } = ctx.query
const { pepProjectId, groupId, groupUnitId, sustainTimeStart, sustainTimeEnd, limit, page } = ctx.query
const isSuper = judgeSuper(ctx)
let anxinStrucIds = null
@ -19,7 +45,7 @@ async function list (ctx) {
whereOption.push(`alarms.StructureId IN (${anxinStrucIds.join(",")})`)
}
if (groupId) {
whereOption.push(`alarms.AlarmGroup=${groupId}`)
whereOption.push(`alarms.AlarmGroup IN (${groupId})`)
}
if (groupUnitId) {
whereOption.push(`alarms.AlarmGroupUnit=${groupId}`)
@ -43,26 +69,39 @@ async function list (ctx) {
}
const alarmRes = await clickHouse.dataAlarm.query(`
SELECT
alarms.AlarmId, alarms.State, SourceName, StartTime, EndTime, alarms.CurrentLevel, SourceTypeId, AlarmAdviceProblem,AlarmGroup, AlarmGroupUnit, AlarmAdviceProblem,
${anxinyun}.t_structure.name AS StructureName, StructureId,
${anxinyun}.t_alarm_code.name AS AlarmCodeName,
alarm_details.Time, alarm_details.Content
alarms.AlarmId AS AlarmId,
alarms.State AS State,
SourceName, StartTime, EndTime,
alarms.CurrentLevel AS CurrentLevel,
SourceTypeId,
AlarmAdviceProblem, AlarmGroup, AlarmGroupUnit, AlarmAdviceProblem,
${anxinyun}.t_structure.name AS StructureName,
StructureId,
${anxinyun}.t_alarm_code.name AS AlarmCodeName
FROM
alarms
LEFT JOIN ${anxinyun}.t_structure
ON ${anxinyun}.t_structure.id = alarms.StructureId
LEFT JOIN ${anxinyun}.t_alarm_code
ON ${anxinyun}.t_alarm_code.code = alarms.AlarmTypeCode
LEFT JOIN alarm_details
ON alarms.AlarmId = alarm_details.AlarmId
AND alarm_details.Time = (
SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId
)
${whereOption.length ? 'WHERE ' + whereOption.join(' AND ') : ''}
ORDER BY alarms.StartTime DESC
${limit ? 'LIMIT ' + limit : ''}
${limit && page ? 'OFFSET ' + parseInt(limit) * parseInt(page) : ''}
`).toPromise();
// alarm_details.Time, alarm_details.Content
// LEFT JOIN alarm_details
// ON alarms.AlarmId = alarm_details.AlarmId
// AND alarm_details.Time = (
// SELECT MAX(alarm_details.Time) from alarm_details WHERE AlarmId = alarms.AlarmId
// )
ctx.status = 200;
ctx.body = []
ctx.body = alarmRes
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
@ -93,7 +132,51 @@ async function detail (ctx) {
}
}
async function confirm (ctx) {
try {
const { models } = ctx.fs.dc;
// 发送告警恢复通知
// Topic: alarm
/*
* {
* messageMode: "AlarmManElimination",
* sourceId: "",
* alarmTypeCode: "",
* sponsor: userId
* content: "确认消息"
* time: "YYYY-MM-DDTHH:mm:ss.SSSZ"
* }
*/
ctx.status = 204;
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
ctx.body = {
message: typeof error == 'string' ? error : undefined
}
}
}
async function detailAggregation (ctx) {
try {
const { models } = ctx.fs.dc;
ctx.status = 200;
ctx.body = []
} catch (error) {
ctx.fs.logger.error(`path: ${ctx.path}, error: error`);
ctx.status = 400;
ctx.body = {
message: typeof error == 'string' ? error : undefined
}
}
}
module.exports = {
list,
detail
detail,
groupList,
confirm,
detailAggregation,
};

6
api/app/lib/routes/alarm/index.js

@ -26,9 +26,15 @@ module.exports = function (app, router, opts) {
router.post('/alarm/application/api_confirm', application.confirmApiError);
// 数据告警
app.fs.api.logAttr['GET/alarm/data/group'] = { content: '获取数据告警分类', visible: true };
router.get('/alarm/data/group', dataAlarm.groupList);
app.fs.api.logAttr['GET/alarm/data/list'] = { content: '查询数据告警列表', visible: true };
router.get('/alarm/data/list', dataAlarm.list);
app.fs.api.logAttr['GET/alarm/data/detail'] = { content: '查询数据告警详情', visible: true };
router.get('/alarm/data/detail', dataAlarm.detail);
app.fs.api.logAttr['PUT/alarm/data/confirm'] = { content: '确认数据告警', visible: true };
router.put('/alarm/data/confirm', dataAlarm.confirm);
};

20
api/app/lib/service/kafka.js

@ -0,0 +1,20 @@
'use strict';
const Kafka = require('kafka-node');
module.exports = async function factory (app, opts) {
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL });
const producer = new Kafka.HighLevelProducer(client);
producer.on('error', function (err) {
app.fs.logger.log('error', "[FS-KAFKA]", err);
});
const kafka = {
producer: producer,
configUpdateMessage: opts.configUpdateMessage || {}
};
app.fs.kafka = kafka;
app.fs.logger.log('debug', "[FS-KAFKA]", "Init.Success");
}

9
api/config.js

@ -11,6 +11,7 @@ const dev = process.env.NODE_ENV == 'development';
args.option(['p', 'port'], '启动端口');
args.option(['g', 'pg'], 'postgre服务URL');
args.option(['f', 'fileHost'], '文件中心本地化存储: WebApi 服务器地址(必填), 该服务器提供文件上传Web服务');
args.option(['k', 'kafka'], 'kafka服务URL');
args.option('redisHost', 'redisHost');
args.option('redisPort', 'redisPort');
@ -44,6 +45,9 @@ const flags = args.parse(process.argv);
const POMS_DB = process.env.POMS_DB || flags.pg;
const LOCAL_SVR_ORIGIN = process.env.LOCAL_SVR_ORIGIN || flags.fileHost;
// kafka
const ANXINCLOUD_KAFKA_BROKERS = process.env.ANXINCLOUD_KAFKA_BROKERS || flags.kafka;
// Redis 参数
const IOTA_REDIS_SERVER_HOST = process.env.IOTA_REDIS_SERVER_HOST || flags.redisHost || "localhost";//redis IP
const IOTA_REDIS_SERVER_PORT = process.env.IOTA_REDIS_SERVER_PORT || flags.redisPort || "6379";//redis 端口
@ -81,6 +85,7 @@ const CLICKHOUST_DATA_ALARM = process.env.CLICKHOUST_DATA_ALARM || flags.clickHo
if (
!POMS_DB
|| !IOTA_REDIS_SERVER_HOST || !IOTA_REDIS_SERVER_PORT
|| !ANXINCLOUD_KAFKA_BROKERS
|| !GOD_KEY
|| !API_ANXINYUN_URL
|| !API_EMIS_URL
@ -118,6 +123,10 @@ const product = {
{ p: '/project/app_list', o: 'GET' },
{ p: '/alarm/application/api', o: 'POST' }
], // 不做认证的路由,也可以使用 exclude: ["*"] 跳过所有路由
kafka: {
rootURL: ANXINCLOUD_KAFKA_BROKERS,
// topicPrefix: PLATFORM_NAME
},
redis: {
host: IOTA_REDIS_SERVER_HOST,
port: IOTA_REDIS_SERVER_PORT,

Loading…
Cancel
Save