运维服务中台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

636 lines
22 KiB

const moment = require('moment');
//获取异常识别算法
async function findAbnMethods(ctx, next) {
let rslt = null;
let error = { name: 'FindError', message: '异常识别算法获取失败' };
try {
const models = ctx.fs.dc.models;
let abnMethods = await models.AbnTypes.findAll();
rslt = abnMethods.map(s => ({
id: s.id,
name: s.name,
des: s.des
}));
error = null;
} catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`);
}
if (error) {
ctx.status = 400;
ctx.body = error;
} else {
ctx.status = 200;
ctx.body = rslt;
}
}
//获取异常参数配置
async function findAbnParamList(ctx) {
const { factorId } = ctx.query
const id=factorId.split(',')
let rslt = null;
let error = { name: 'FindError', message: '异常参数配置获取失败' };
try {
const models = ctx.fs.dc.models;
let abnParamList = await models.AbnReportParams.findAll({
where: { factorId:{$in:id} }
})
rslt = abnParamList.map(s => ({
key: s.id,
id: s.id,
sensorId: s.sensorId,
sensorName: s.sensorLocationDescription,
abnType: s.abnTypeId,
enabled: s.enabled,
factorId: s.factorId,
factorName: s.factor,
params: s.params,
itemIndex:s.itemIndex
}));
error = null;
} catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`);
}
if (error) {
ctx.status = 400;
ctx.body = error;
} else {
ctx.status = 200;
ctx.body = rslt;
}
}
//新增异常参数配置
async function createAbnParam(ctx) {
let error = { name: 'CreateError', message: '异常参数配置新增失败' };
const models = ctx.fs.dc.models
const data = ctx.request.body
try {
for (let i = 0; i < data.length; i++) {
let dataItem = data[i];
if (dataItem && dataItem.params && dataItem.abnType && dataItem.enabled != null && dataItem.sensorId && dataItem.factorId) {
let dataToSave = {
sensorId: dataItem.sensorId,
sensorLocationDescription:dataItem.sensorName,
enabled: dataItem.enabled,
abnTypeId: dataItem.abnType,
factorId: dataItem.factorId,
factor: dataItem.factorName,
itemIndex: dataItem.itemId,
params: dataItem.params
};
await models.AbnReportParams.create(dataToSave)
error = null
// // 日志信息
// ctx.fs.api = ctx.fs.api || {}
// ctx.fs.api.actionParameter = JSON.stringify(data)
// ctx.fs.api.actionParameterShow = `新增异常推送配置id:${newId}`
}
}
}
catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`)
}
if (error) {
ctx.status = 400
ctx.body = error
} else {
ctx.status = 204
}
}
//
async function batchSwitch(ctx, next) {
const ids = ctx.params.ids.split(',')
const data = ctx.request.body
let error = { name: 'UpdateError', message: data ? '批量启用异常参数配置失败' : '批量禁用异常参数配置失败' };
try {
for (let i = 0; i < ids.length; i++) {
let id = ids[i];
const models = ctx.fs.dc.models;
let abnParam = await models.AbnReportParams.findOne({ where: { id: id } });
if (abnParam) {
let dataToSave = {};
if (data.use == 'switch') {
dataToSave.enabled = data.enabled;//批量启用or禁用
} else {
dataToSave.params = data.paramJson;//批量改参数
}
if (Object.keys(dataToSave).length) {
await models.AbnReportParams.update(dataToSave, { where: { id } });
}
error = null;
// // 日志信息
// ctx.fs.api = ctx.fs.api || {};
// ctx.fs.api.actionParameter = JSON.stringify(data);
// ctx.fs.api.actionParameterShow = `异常参数配置id:${id}`;
} else {
error = { name: 'NotFound', message: `不存在{id=${id}}的异常参数配置` };
}
}
} catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`);
}
if (error) {
ctx.status = 400;
ctx.body = error;
} else {
ctx.status = 204;
}
}
//删除异常参数配置
async function deleteAbnParam(ctx, next) {
let error = { name: 'DeleteError', message: '异常参数配置删除失败' };
const ids = ctx.params.ids.split(',');
//const { id } = ctx.params;
try {
for (let i = 0; i < ids.length; i++) {
let id = ids[i];
const models = ctx.fs.dc.models;
let abnParam = await models.AbnReportParams.findOne({ where: { id } });
if (abnParam) {
await models.AbnReportParams.destroy({ where: { id } });
error = null;
} else {
error = { name: 'NotFound', message: `不存在{id=${id}}的异常参数配置` };
}
}
} catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`);
}
if (error) {
ctx.status = 400;
ctx.body = error;
} else {
ctx.status = 204;
}
}
//修改异常推送配置
async function updateAbnParam(ctx) {
let error = { name: 'UpdateError', message: '异常参数配置修改失败' }
const ids = ctx.params.ids.split(',')
const data = ctx.request.body
if (data && Object.keys(data).length) {
try {
for (let i = 0; i < ids.length; i++) {
let id = ids[i];
const models = ctx.fs.dc.models;
let abnParam = await models.AbnReportParams.findOne({ where: { id: id } });
if (abnParam) {
let dataToSave = {};
const { abnType, params, enabled } = data;
if (enabled != null && enabled != abnParam.enabled)
dataToSave.enabled = enabled;
//中断
if (abnType == 1) {
if (params != null && params.thr_int !== abnParam.params.thr_int) {
dataToSave.params = params;
}
}
//毛刺
if (abnType == 2) {
if (params != null && params.thr_burr !== abnParam.params.thr_burr) {
dataToSave.params = params;
}
}
//趋势
if (abnType == 3) {
if (params != null &&
(params.thr_burr !== abnParam.params.thr_burr || params.win_med !== abnParam.params.win_med
|| params.win_avg !== abnParam.params.win_avg || params.win_grad !== abnParam.params.win_grad
|| params.thr_grad !== abnParam.params.thr_grad || params.thr_der !== abnParam.params.thr_der
|| params.days_Last !== abnParam.params.days_Last)) {
dataToSave.params = params;
}
}
if (Object.keys(dataToSave).length) {
await models.AbnReportParams.update(dataToSave, { where: { id } });
}
error = null;
} else {
error = { name: 'NotFound', message: `不存在{id=${id}}的异常参数配置` };
}
}
} catch (err) {
ctx.fs.logger.error(`path: ${ctx.path}, error: ${err}`);
}
}
if (error) {
ctx.status = 400;
ctx.body = error;
} else {
ctx.status = 204;
}
}
//异常数据对比
async function getAbnTaskResult(ctx, next) {
let error = { name: 'TaskError', message: '异常数据对比失败' }
const models = ctx.fs.dc.models
const structId = ctx.params.id
const startTime = ctx.params.start
const endTime = ctx.params.end
const data = ctx.request.body
const stationId = data.station
let factorProto = await models.Factor.findOne({
where: { id: data.factorId },
attributes: ['id', 'proto']
});
let protoItems = await models.FactorProtoItem.findAll({
where: { proto: factorProto.proto },
attributes: ['id', 'name']
});
let itemName = await models.FactorProtoItem.findOne({
where: { id: data.itemId ? data.itemId : protoItems[0].id },
attributes: ['id', 'field_name', 'name']
});
try {
const itemsObj = await findThemeItems(models, data.factorId)
const filter = {
query: {
bool: {
must: [
{ match: { "sensor": stationId } }
]
}
}
}
if (startTime && endTime) {
filter.query.bool.must.push({ range: { "collect_time": { gte: moment(startTime).toISOString(), lte: moment(endTime).toISOString() } } });
}
const esThemeData = await findThemeDataFromES(ctx.app.fs.esclient, filter)
const stationsData = esThemeData.reduce((p, c) => {
const { sensor, data, collect_time } = c._source;
p.unshift(Object.assign({}, data, { time: moment(collect_time).format('YYYY-MM-DD HH:mm:ss') }));
return p;
}, []);
//获取前一天的最后一条数据
const preFilter = {
query: {
bool: {
must: [
{ match: { "sensor": stationId } },
{ range: { "collect_time": { gte: moment(startTime).add('days', -1).toISOString(), lte: moment(startTime).toISOString() } } }
]
}
}
}
const esPreData = await findThemeDataFromES(ctx.app.fs.esclient, preFilter);
const preOneData = esPreData.reduce((p, c) => {
const { data, collect_time } = c._source;
p.unshift(Object.assign({}, data, { time: moment(collect_time).format('YYYY-MM-DD HH:mm:ss') }));
return p;
}, []);
let one = preOneData && preOneData.length > 0 ? preOneData[preOneData.length - 1] : null;
let itemKey = itemName.get({ plain: true }).field_name;//监测项名称
let itemn = itemName.get({ plain: true }).name;
let calcResult = calcAlgorithm(one, stationsData, data, itemKey);//计算
ctx.status = 200;
ctx.body = {
unit: itemsObj[itemKey].unit,
method: data.abnType,
itemKey: itemKey,
itemName: itemn,
stationData: stationsData,
resultArray: calcResult
};
} catch (err) {
ctx.fs.logger.error(err);
ctx.status = 400;
ctx.body = {
name: "FindError",
message: "异常数据识别-数据对比失败"
}
}
}
let calcAlgorithm = function (dataOne, dataSource, params, itemKey) {
let result;
switch (params.abnType) {
case "interrupt":
result = interrupt(dataOne, dataSource, params, itemKey);
break;
case "burr":
result = burr(dataOne, dataSource, params, itemKey).result;
break;
case "trend":
result = trend(null, dataSource, params, itemKey);
break;
}
return result;
};
//中断
let interrupt = function (dataOne, dataSource, params, key) {
let result = [];
if (dataSource.length != 0) {
if (dataOne == null) {
result.push({
type: "interrupt",
hour: 24.00,
time: dataSource[0].time,
value: dataSource[0][key]
});//第一个点中断
} else {
dataSource.unshift(dataOne);
}
for (let i = 0; i < dataSource.length - 1; i++) {
if (dataSource[i] == null || dataSource[i + 1] == null) continue;
let hour = getHour(dataSource[i + 1].time, dataSource[i].time);
if (hour >= params.params.thr_int) {
result.push({
type: "interrupt",
hour: hour,
time: dataSource[i + 1].time,
value: dataSource[i + 1][key]
});
}
}
}
return result;
}
//毛刺
let burr = function (dataOne, dataSource, params, key) {
let burrTv = params.params.thr_burr;
let result = [];//毛刺点
let dataSAfterBurr = [];//去掉毛刺点的数组
if (dataSource.length != 0) {
if (dataOne != null) {
dataSource.unshift(dataOne);
}
for (let i = 1; i < dataSource.length - 1; i++) {
if (dataSource[i - 1] == null || dataSource[i] == null || dataSource[i + 1] == null) continue
let gap1 = dataSource[i][key] - dataSource[i - 1][key]
let gap2 = dataSource[i][key] - dataSource[i + 1][key]
let gap3 = dataSource[i - 1][key] - dataSource[i][key]
let gap4 = dataSource[i + 1][key] - dataSource[i][key]
let result1 = (gap1 > burrTv && gap2 > burrTv)
let result2 = (gap3 > burrTv && gap4 > burrTv)
if (i == 1) {//第一个点
dataSAfterBurr.push(dataSource[0])
}
if (result1 || result2) {
result.push({
type: "burr",
burr: result1 ? Math.min(gap1, gap2) : Math.min(gap3, gap4),
time: dataSource[i].time,
value: dataSource[i][key]
})
} else {
dataSAfterBurr.push(dataSource[i])
}
if (i == dataSource.length - 2) {//最后一个点
dataSAfterBurr.push(dataSource[dataSource.length - 1])
}
}
}
return { result: result, dataSAfterBurr: dataSAfterBurr }
}
//异常趋势
let trend = function (dataOne, dataSource, params, key) {
let result;
if (dataSource.length != 0) {
//去完毛刺的新数组
let afterBurr = burr(dataOne, dataSource, params, key).dataSAfterBurr;
//滑动中值
let arrAfterMedian = [];
for (let i = 0; i < afterBurr.length; i += parseInt(params.params.win_med)) {
let arr = afterBurr.slice(i, i + parseInt(params.params.win_med))
let oneMedian = calcMedian(arr, key)
arrAfterMedian.push(oneMedian)
}
//滑动均值
let arrAfterAvg = calcMeanValue(arrAfterMedian, params.params.win_avg, key)
//错位相减,相当于求导
let arrAfterDe = []
for (let j = 0; j < arrAfterAvg.length - 1; j++) {
let one = {
value: arrAfterAvg[j + 1].value - arrAfterAvg[j].value,
time: arrAfterAvg[j + 1].time
}
arrAfterDe.push(one);
}
//最后判断
let finalArray = finalJudge(arrAfterDe, arrAfterMedian, params)
result = {
calcFinal: finalArray,
calcPreprocess: arrAfterAvg//要画预处理+滑动均值完了的曲线
};
}
return result
}
let getHour = function (s1, s2) {
s1 = new Date(s1.replace(/-/g, '/'))
s2 = new Date(s2.replace(/-/g, '/'))
let ms = Math.abs(s1.getTime() - s2.getTime())
return ms / 1000 / 60 / 60;
}
//计算一组数据的中值
let calcMedian = function (array, key) {
let result;
if (array != null || array.length > 0) {
array.sort((a, b) => { return a[key] - b[key]})
if (array.length % 2 == 0) {//偶数
let index1 = array.length / 2;
result = {
value: (array[index1][key] + array[index1 - 1][key]) / 2,
time: array[index1].time
}
} else {//奇数
let index = (array.length - 1) / 2
result = {
value: array[index][key],
time: array[index].time
}
}
}
return result;
}
//计算一组数据的均值
let calcMeanValue = function (array, coef, key) {
let result = [];
let sum = 0;
if (array != null || array.length > 0) {
for (let i = 0; i < array.length; i++) {
let value;
if (i < parseInt(coef)) {
sum = sum + array[i].value
value = sum / (i + 1)
} else {
let arr = array.slice(i - parseInt(coef) + 1, i + 1)
let ssum = 0;
for (let s = 0; s < arr.length; s++) {
ssum = ssum + arr[s].value
}
value = ssum / parseInt(coef)
}
let one = {
value: value,
time: array[i].time
}
result.push(one)
}
}
return result
}
let finalJudge = function (array, original, params) {
let ups = 1, downs = 1;
let tempUp = [], tempDown = [];
let point = params.params.win_grad;//渐变点个数
let deTv = params.params.thr_der;//导数阈值
let greTv = params.params.thr_grad;//渐变阈值
let finalArray = [];
for (let i = 0; i < array.length; i++)//对最新数组作阈值判断
{
if (array[i].value > deTv) {
ups = ups + 1
if (ups == 2) {
tempUp.push(original[i])
}
tempUp.push(original[i + 1])
if (tempDown.length >= point) {
let bbb = tempDown[tempDown.length - 1].value - tempDown[0].Value
if (downs >= point && bbb < -greTv) {
let one = {
startTime: tempDown[0].time,
endTime: tempDown[tempDown.length - 1].time,
startValue: tempDown[0].value,
endValue: tempDown[tempDown.length - 1].value,
value: bbb,
des: "异常下降"
};
finalArray.push(one)
}
}
downs = 1
tempDown = []
} else if (array[i].value < -deTv) {
downs = downs + 1;
if (downs == 2) {
tempDown.push(original[i])
}
tempDown.push(original[i + 1])
if (tempUp.length >= point) {
let aaa = tempUp[tempUp.length - 1].value - tempUp[0].value
if (ups >= point && aaa > greTv) {
let one = {
startTime: tempUp[0].time,
endTime: tempUp[tempUp.length - 1].time,
startValue: tempUp[0].value,
endValue: tempUp[tempUp.length - 1].value,
value: aaa,
des: "异常上升"
};
finalArray.push(one)
}
}
ups = 1;
tempUp = []
}
}
if (ups >= point) {
let ccc = tempUp[tempUp.length - 1].value - tempUp[0].value
if (ccc > greTv) {
let one = {
startTime: tempUp[0].time,
endTime: tempUp[tempUp.length - 1].time,
startValue: tempUp[0].value,
endValue: tempUp[tempUp.length - 1].value,
value: ccc,
des: "异常上升"
};
finalArray.push(one)
}
}
if (downs >= point) {
let ddd = tempDown[tempDown.length - 1].value - tempDown[0].value
if (ddd < -greTv) {
let one = {
startTime: tempDown[0].time,
endTime: tempDown[tempDown.length - 1].time,
startValue: tempDown[0].value,
endValue: tempDown[tempDown.length - 1].value,
value: ddd,
des: "异常下降"
};
finalArray.push(one);
}
}
return finalArray
}
async function findThemeItems(models, factor) {
try {
let factorProto = await models.Factor.findOne({
where: { id: factor },
attributes: ['id', 'proto']
});
let protoItems = await models.FactorProtoItem.findAll({
where: { proto: factorProto.proto },
attributes: ['name', 'fieldName'],
include: [{
model: models.ItemUnit,
where: { default: true },
required: true,
attributes: ['name']
}]
});
let itemsObj = protoItems.reduce((p, c) => {
p[c.fieldName] = {
name: c.name,
unit: c.itemUnits[0] ? c.itemUnits[0].name : null
}
return p
}, {})
return itemsObj
} catch (err) {
throw err
}
};
async function findThemeDataFromES(esclient, filter, limit, _source) {
try {
let rslt = []
const client = esclient[THEME_DATA]
let params = {
index: client.config.index,
type: client.config.type,
body: filter
}
params.size = limit
if (limit == null) {
const countRes = await client.count(params)
params.size = countRes.count > 10000 ? 10000 : countRes.count
}
params._source = _source || ["sensor", "collect_time", "data"]
params.body.sort = { "collect_time": { "order": "desc" } }
let res = await client.search(params)
rslt = res.hits.hits
return rslt
} catch (err) {
throw err
}
}
module.exports = {
findAbnMethods,
findAbnParamList,
createAbnParam,
updateAbnParam,
batchSwitch,
deleteAbnParam,
getAbnTaskResult
};