数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
8.1 KiB

package adaptors
import (
"encoding/hex"
"encoding/json"
"fmt"
"goInOut/consumers/GZG2ZJHL/protoFiles_zjhl/protoFiles_zjhl_v3"
"goInOut/consumers/HBJCAS"
"goInOut/dbOperate"
"goInOut/models"
"google.golang.org/protobuf/proto"
"log"
"math"
"strconv"
"strings"
"time"
)
// Adaptor_ZWYES_ZJHL 知物云果子沟es 特征数据 to 中交华联平台
type Adaptor_ZWYES_ZJHL struct {
//传感器code转换信息
PointInfo map[int64]map[int64]int64
StructInfo map[int64]int64
//一些必要信息
Info map[string]string
Redis *dbOperate.RedisHelper
}
func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{}
var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
if err != nil {
return nil
}
Payload := the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram)
if len(Payload) >= 0 {
return needPush
}
needPush = append(needPush, NeedPush{
Payload: Payload,
})
return needPush
}
func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
return
}
//设施唯一编码(省平台)
uniqueCode := the.getUniqueCode(structId)
if uniqueCode == 0 {
log.Printf("structId=%d,无匹配省平台uniqueCode", structId)
return
}
//数据汇总
complexData := &protoFiles_zjhl_v3.ComplexData{}
for _, sensorBucket := range buckets {
sensorId := sensorBucket.Key
for _, dateBucket := range sensorBucket.GroupDate.Buckets {
//优先redis获取
station := models.Station{}
k1 := fmt.Sprintf("station:%d", sensorId)
errRedis := the.Redis.GetObj(k1, &station)
if errRedis != nil {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
continue
}
monitorCode := the.getPointCodeFromLabel(station.Labels)
if monitorCode == 0 {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId)
continue
}
dataBody := the.EsAgg2StatisticData(factorId, monitorCode, dateBucket)
if dataBody == nil {
log.Printf("[s:%d,f:%d]测点[%d] 特征数据组包异常,跳过", structId, factorId, sensorId)
continue
}
dataDefinition := &protoFiles_zjhl_v3.DataDefinition{
DataType: protoFiles_zjhl_v3.DataType_STATISTICS,
//BridgeCode: fmt.Sprintf("%d", uniqueCode), //提示 不传该字段
DataBody: dataBody,
}
complexData.SensorData = append(complexData.SensorData, dataDefinition)
}
}
v, _ := json.Marshal(complexData)
log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v)
result, _ = proto.Marshal(complexData)
log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(result))
return result
}
func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType {
//结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592
switch factorId {
case 4:
return protoFiles_zjhl_v3.MonitoryType_TMP
case 6:
return protoFiles_zjhl_v3.MonitoryType_VIC
case 11:
return protoFiles_zjhl_v3.MonitoryType_RSG
case 15:
return protoFiles_zjhl_v3.MonitoryType_INC
case 18:
return protoFiles_zjhl_v3.MonitoryType_CRK
case 19:
return protoFiles_zjhl_v3.MonitoryType_HPT
case 20, 24:
return protoFiles_zjhl_v3.MonitoryType_DIS
case 28:
return protoFiles_zjhl_v3.MonitoryType_VIB
case 592:
return protoFiles_zjhl_v3.MonitoryType_VIB
default:
log.Printf("factorId=%d,无匹配的MonitorType", factorId)
return protoFiles_zjhl_v3.MonitoryType_CMM
}
}
func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_zjhl_v3.DataDefinition_StatisticData {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
avgValueX := dateBucket.X.Avg
rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count))
maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min))
avgValueY := dateBucket.Y.Avg
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count))
monitoryType := the.getMonitorTypeByFactorId(factorId)
dataDefinitionStatisticData := &protoFiles_zjhl_v3.DataDefinition_StatisticData{
StatisticData: &protoFiles_zjhl_v3.StatisticData{
MonitorType: monitoryType,
MonitorCode: monitorCode, //测点唯一编码
EventTime: Atime,
Interval: 60 * 1000,
},
}
switch factorId {
case 4: //结构温度
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Tmp{Tmp: &protoFiles_zjhl_v3.TMPStatistic{
MaxTemperature: float32(dateBucket.X.Max),
MinTemperature: float32(dateBucket.X.Min),
AvgTemperature: float32(avgValueX),
MaxDifference: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 6: //索力
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Vic{Vic: &protoFiles_zjhl_v3.VICStatistic{
MaxValue: float32(dateBucket.X.Max),
MinValue: float32(dateBucket.X.Min),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
}}
case 11: //应变
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Rsg{Rsg: &protoFiles_zjhl_v3.RSGStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
}}
case 15: //倾角
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Inc{Inc: &protoFiles_zjhl_v3.INCStatistic{
MaxAbsoluteValueX: float32(maxAbsoluteValueX),
AvgValueX: float32(avgValueX),
RootMeanSquareX: float32(rootMeanSquareX),
MaxAbsoluteValueY: float32(maxAbsoluteValueY),
AvgValueY: float32(avgValueY),
RootMeanSquareY: float32(rootMeanSquareY),
}}
case 18: //裂缝监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Crk{Crk: &protoFiles_zjhl_v3.CRKStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 19: //挠度监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Hpt{Hpt: &protoFiles_zjhl_v3.HPTStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
}}
case 20, 24: //支座位移20, 位移24
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Dis{Dis: &protoFiles_zjhl_v3.DISStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 28: //振动
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Vib{Vib: &protoFiles_zjhl_v3.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareY),
}}
case 935: //gnss
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{}}
}
return dataDefinitionStatisticData
}
func (the Adaptor_ZWYES_ZJHL) getUniqueCode(structId int64) (uniqueCode int64) {
if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v
}
return uniqueCode
}
func (the Adaptor_ZWYES_ZJHL) getPointCodeFromLabel(label string) int64 {
//解析label {ID:6500030042}
pointUniqueCode := int64(0)
if len(label) > 5 {
newLabel := strings.TrimLeft(label, "{ID:")
str := strings.TrimRight(newLabel, "}")
codeInt64, err := strconv.ParseInt(str, 10, 64)
if err != nil {
log.Printf("测点标签转换异常[%s]", label)
}
pointUniqueCode = codeInt64
}
return pointUniqueCode
}