You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
5.6 KiB
186 lines
5.6 KiB
package adaptors
|
|
|
|
import (
|
|
"encoding/json"
|
|
"goInOut/consumers/GDJKJC"
|
|
"goInOut/consumers/HBJCAS"
|
|
"goInOut/consumers/HBJCAS/protoFiles_hb"
|
|
"goInOut/dbOperate"
|
|
|
|
"log"
|
|
"math"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台
|
|
type Adaptor_AXYES_GDJKJC struct {
|
|
//传感器code转换信息
|
|
PointInfo map[int64]map[int64]int64
|
|
StructInfo map[int64]string
|
|
//一些必要信息
|
|
Info map[string]string
|
|
Redis *dbOperate.RedisHelper
|
|
}
|
|
|
|
func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
|
|
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{}
|
|
var needPush []NeedPush
|
|
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
Payloads := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram)
|
|
if len(Payloads) == 0 {
|
|
return needPush
|
|
}
|
|
|
|
for _, payload := range Payloads {
|
|
needPush = append(needPush, NeedPush{
|
|
Topic: strconv.FormatInt(structId, 10),
|
|
Payload: payload,
|
|
})
|
|
}
|
|
|
|
return needPush
|
|
}
|
|
|
|
func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result [][]byte) {
|
|
buckets := esAggs.Aggregations.GroupSensor.Buckets
|
|
if len(buckets) == 0 {
|
|
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
|
|
return
|
|
}
|
|
//设施唯一编码(省平台)
|
|
uniqueCode := the.getUniqueCode(structId)
|
|
if uniqueCode == "" {
|
|
log.Printf("structId=%d,无匹配省平台uniqueCode", structId)
|
|
return
|
|
}
|
|
//数据汇总
|
|
for _, sensorBucket := range buckets {
|
|
sensorId := sensorBucket.Key
|
|
for _, dateBucket := range sensorBucket.GroupDate.Buckets {
|
|
//优先redis获取
|
|
//station := models.Station{}
|
|
//k1 := fmt.Sprintf("station:%d", sensorId)
|
|
//errRedis := the.Redis.GetObj(k1, &station)
|
|
//if errRedis != nil {
|
|
// log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
|
|
// continue
|
|
//}
|
|
monitorCode := "LJ-VIB-P01-004-01" //the.getPointCodeFromLabel(station.Labels)
|
|
if monitorCode == "" {
|
|
log.Printf("redis 获取[s:%d,f:%d]测点[%d]信息,异常,跳过", structId, factorId, sensorId)
|
|
continue
|
|
}
|
|
|
|
dataBytes := the.EsAgg2StatisticData(factorId, monitorCode, dateBucket)
|
|
//dataDefinition := &protoFiles_hb.DataDefinition{
|
|
// DataType: protoFiles_hb.DataType_STATISTICS,
|
|
// UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥
|
|
// DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
|
|
//}
|
|
result = append(result, dataBytes)
|
|
log.Printf("[s:%d,f:%d] t=%s, 特征数据=> %s", structId, factorId, dateBucket.KeyAsString, dataBytes)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType {
|
|
//应变11 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592
|
|
switch factorId {
|
|
case 15:
|
|
return protoFiles_hb.MonitoryType_INC
|
|
case 18:
|
|
return protoFiles_hb.MonitoryType_CRK
|
|
case 20:
|
|
return protoFiles_hb.MonitoryType_DIS
|
|
case 28:
|
|
return protoFiles_hb.MonitoryType_VIB
|
|
case 592:
|
|
return protoFiles_hb.MonitoryType_VIB
|
|
default:
|
|
log.Printf("factorId=%d,无匹配的MonitorType", factorId)
|
|
return protoFiles_hb.MonitoryType_CMM
|
|
}
|
|
}
|
|
|
|
func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode string, dateBucket HBJCAS.BucketsXY) (dataBytes []byte) {
|
|
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
|
|
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
|
|
avgValueX := dateBucket.X.Avg
|
|
rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count))
|
|
|
|
maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min))
|
|
avgValueY := dateBucket.Y.Avg
|
|
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count))
|
|
|
|
commonBody := GDJKJC.CommonBody{
|
|
ThirdChannelCode: monitorCode,
|
|
DataTimeUnix: Atime,
|
|
}
|
|
var destStruct any
|
|
switch factorId {
|
|
case 11, 18: //应变11 裂缝18
|
|
destStruct = GDJKJC.A44Strain{
|
|
CommonBody: commonBody,
|
|
Avg: avgValueX,
|
|
Max: dateBucket.X.Max,
|
|
}
|
|
case 15: //倾角
|
|
destStruct = GDJKJC.A43AngleBody{
|
|
CommonBody: commonBody,
|
|
XAvg: avgValueX,
|
|
XMax: maxAbsoluteValueX,
|
|
XRms: rootMeanSquareX, //均方根
|
|
YAvg: avgValueY,
|
|
YMax: maxAbsoluteValueY,
|
|
YRms: rootMeanSquareY, //均方根
|
|
}
|
|
case 28: //振动
|
|
destStruct = GDJKJC.A48Acc{
|
|
CommonBody: commonBody,
|
|
Max: maxAbsoluteValueX / 100, //省平台 加速度单位 m/s² 实际设备单位 cm/s²
|
|
Rms: rootMeanSquareX / 100, //均方根
|
|
}
|
|
case 63: //支护结构变形(主梁位移)
|
|
destStruct = GDJKJC.A40DisplacementBody{
|
|
CommonBody: commonBody,
|
|
Avg: avgValueY,
|
|
Max: maxAbsoluteValueY,
|
|
Rms: rootMeanSquareY,
|
|
}
|
|
}
|
|
if destStruct != nil {
|
|
dataBytes, _ = json.Marshal(destStruct)
|
|
} else {
|
|
log.Printf("!!! [f=%d,s=%s]无匹配 映射数据", factorId, monitorCode)
|
|
}
|
|
return dataBytes
|
|
}
|
|
|
|
func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) {
|
|
if v, ok := the.StructInfo[structId]; ok {
|
|
uniqueCode = v
|
|
}
|
|
return uniqueCode
|
|
}
|
|
|
|
func (the Adaptor_AXYES_GDJKJC) getPointCodeFromLabel(label string) int64 {
|
|
//解析label {13010600001}
|
|
pointUniqueCode := int64(0)
|
|
if len(label) > 2 {
|
|
newLabel := strings.TrimLeft(label, "{")
|
|
str := strings.TrimRight(newLabel, "}")
|
|
codeInt64, err := strconv.ParseInt(str, 10, 64)
|
|
if err != nil {
|
|
log.Printf("测点标签转换异常[%s]", label)
|
|
}
|
|
pointUniqueCode = codeInt64
|
|
}
|
|
|
|
return pointUniqueCode
|
|
}
|
|
|