diff --git a/adaptors/知物云es主题特征to中交华联.go b/adaptors/知物云es主题特征to中交华联.go index 2a00d09..4897232 100644 --- a/adaptors/知物云es主题特征to中交华联.go +++ b/adaptors/知物云es主题特征to中交华联.go @@ -4,8 +4,8 @@ import ( "encoding/hex" "encoding/json" "fmt" + "goInOut/consumers/GZG2ZJHL" "goInOut/consumers/GZG2ZJHL/protoFiles_zjhl/protoFiles_zjhl_v3" - "goInOut/consumers/HBJCAS" "goInOut/dbOperate" "goInOut/models" "google.golang.org/protobuf/proto" @@ -27,7 +27,7 @@ type Adaptor_ZWYES_ZJHL struct { } func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { - esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{} + esAggDateHistogram := GZG2ZJHL.EsThemeAggDateHistogram{} var needPush []NeedPush err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) if err != nil { @@ -45,7 +45,7 @@ func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg str return needPush } -func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { +func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs GZG2ZJHL.EsThemeAggDateHistogram) (result []byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) @@ -72,7 +72,7 @@ func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esA } monitorCode := the.getPointCodeFromLabel(station.Labels) if monitorCode == 0 { - log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) + log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签[%s],信息转换int64异常,跳过", structId, factorId, sensorId, station.Labels) continue } @@ -103,6 +103,8 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_ return protoFiles_zjhl_v3.MonitoryType_TMP case 6: return protoFiles_zjhl_v3.MonitoryType_VIC + case 7: + return protoFiles_zjhl_v3.MonitoryType_HSD case 11: return protoFiles_zjhl_v3.MonitoryType_RSG case 15: @@ -115,23 +117,16 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_ return protoFiles_zjhl_v3.MonitoryType_DIS case 28: return protoFiles_zjhl_v3.MonitoryType_VIB - case 592: - return protoFiles_zjhl_v3.MonitoryType_VIB + case 935: + return protoFiles_zjhl_v3.MonitoryType_GNSS default: log.Printf("factorId=%d,无匹配的MonitorType", factorId) return protoFiles_zjhl_v3.MonitoryType_CMM } } -func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_zjhl_v3.DataDefinition_StatisticData { +func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket GZG2ZJHL.BucketsXYZ) *protoFiles_zjhl_v3.DataDefinition_StatisticData { Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() - maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) - avgValueX := dateBucket.X.Avg - rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count)) - - maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min)) - avgValueY := dateBucket.Y.Avg - rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) monitoryType := the.getMonitorTypeByFactorId(factorId) dataDefinitionStatisticData := &protoFiles_zjhl_v3.DataDefinition_StatisticData{ @@ -143,6 +138,18 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 }, } + maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) + avgValueX := dateBucket.X.Avg + rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count)) + + maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min)) + avgValueY := dateBucket.Y.Avg + rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) + + maxAbsoluteValueZ := max(math.Abs(dateBucket.Z.Max), math.Abs(dateBucket.Z.Min)) + avgValueZ := dateBucket.Z.Avg + rootMeanSquareZ := math.Sqrt(dateBucket.Z.SumOfSquares / float64(dateBucket.Z.Count)) + switch factorId { case 4: //结构温度 dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Tmp{Tmp: &protoFiles_zjhl_v3.TMPStatistic{ @@ -158,6 +165,14 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 AvgValue: float32(avgValueX), RootMeanSquare: float32(rootMeanSquareX), }} + case 7: //车流量(车载称重) x->load y->overload + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Hsd{Hsd: &protoFiles_zjhl_v3.HSDStatistic{ + TrafficFlow: int32(dateBucket.X.Count), + MaxTotalLoad: int32(maxAbsoluteValueX), + //MaxAxleLoad: 0, //目前无法支持统计 字符串类型,先不上传 + OverLoadCount: int32(dateBucket.Y.Sum), + AvgLoad: float32(avgValueX), + }} case 11: //应变 dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Rsg{Rsg: &protoFiles_zjhl_v3.RSGStatistic{ MaxAbsoluteValue: float32(maxAbsoluteValueX), @@ -198,7 +213,26 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 RootMeanSquare: float32(rootMeanSquareY), }} case 935: //gnss - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{}} + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{ + // 统计时间范围内的空间变形X绝对最大值 + MaxAbsoluteValueX: float32(maxAbsoluteValueX), + // 统计时间范围内的空间变形X平均值 + AvgValueX: float32(avgValueX), + // 统计时间范围内的空间变形X均方根 + RootMeanSquareX: float32(rootMeanSquareX), + // 统计时间范围内的空间变形Y绝对最大值 + MaxAbsoluteValueY: float32(maxAbsoluteValueY), + // 统计时间范围内的空间变形Y平均值 + AvgValueY: float32(avgValueY), + // 统计时间范围内的空间变形Y均方根 + RootMeanSquareY: float32(rootMeanSquareY), + // 统计时间范围内的空间变形Z绝对最大值 + MaxAbsoluteValueZ: float32(maxAbsoluteValueZ), + // 统计时间范围内的空间变形Z平均值 + AvgValueZ: float32(avgValueZ), + // 统计时间范围内的空间变形Z均方根 + RootMeanSquareZ: float32(rootMeanSquareZ), + }} } return dataDefinitionStatisticData diff --git a/consumers/GZG2ZJHL/dataModel.go b/consumers/GZG2ZJHL/dataModel.go new file mode 100644 index 0000000..281c387 --- /dev/null +++ b/consumers/GZG2ZJHL/dataModel.go @@ -0,0 +1,67 @@ +package GZG2ZJHL + +import "time" + +type EsThemeAggDateHistogram struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []interface{} `json:"hits"` + } `json:"hits"` + Aggregations GPBySensorIdAggByDateHistogram `json:"aggregations"` +} +type GPBySensorIdAggByDateHistogram struct { + GroupSensor struct { + DocCountErrorUpperBound int `json:"doc_count_error_upper_bound"` + SumOtherDocCount int `json:"sum_other_doc_count"` + Buckets []Buckets `json:"buckets"` + } `json:"groupSensor"` +} + +type Buckets struct { + Key int64 `json:"key"` + DocCount int64 `json:"doc_count"` + GroupDate struct { + Buckets []BucketsXYZ `json:"buckets"` + } `json:"groupDate"` +} + +type BucketsX struct { + KeyAsString time.Time `json:"key_as_string"` + Key int64 `json:"key"` + DocCount int `json:"doc_count"` + X ExtendedStats `json:"x"` +} + +type BucketsXY struct { + BucketsX + Y ExtendedStats `json:"y"` +} + +type BucketsXYZ struct { + BucketsXY + Z ExtendedStats `json:"z"` +} + +type ExtendedStats struct { + Count int `json:"count"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Avg float64 `json:"avg"` + Sum float64 `json:"sum"` + SumOfSquares float64 `json:"sum_of_squares"` + Variance float64 `json:"variance"` + StdDeviation float64 `json:"std_deviation"` + StdDeviationBounds struct { + Upper float64 `json:"upper"` + Lower float64 `json:"lower"` + } `json:"std_deviation_bounds"` +} diff --git a/consumers/consumerGZG2ZJHL.go b/consumers/consumerGZG2ZJHL.go index 2b544cd..1e64059 100644 --- a/consumers/consumerGZG2ZJHL.go +++ b/consumers/consumerGZG2ZJHL.go @@ -131,7 +131,7 @@ func (the *consumerGZG2ZJHL) getStructIds() []int64 { func (the *consumerGZG2ZJHL) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) - hourFactorIds := []int{4, 6, 11} //4(温度),6索力, 15, 18, 20 + hourFactorIds := []int{7, 935, 4, 6, 11} //Gnss 935 ,4(温度),6索力, 15, 18, 20 structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range hourFactorIds { @@ -316,7 +316,7 @@ func (the *consumerGZG2ZJHL) getESQueryStrBy10min(structureId int64, factorId in } func getEsAggSubSqlByZwyFactorId(factorId int) string { - //桥墩倾斜 15 裂缝 18 支座位移20 挠度19 桥面振动28 加速度三项监测592(承德隧道专用) + //桥墩倾斜 15 裂缝 18 支座位移20 挠度19 桥面振动28 Gnss935 subAggSQl := "" switch factorId { case 4: //结构温度 @@ -336,6 +336,20 @@ func getEsAggSubSqlByZwyFactorId(factorId int) string { "field": "data.cableForce" } } +}` + case 7: // 称重 车载 + subAggSQl = ` +{ + "x": { + "extended_stats": { + "field": "data.load" + } + }, + "y": { + "extended_stats": { + "field": "data.overload" + } + } }` case 11: //应变 subAggSQl = ` @@ -401,12 +415,22 @@ func getEsAggSubSqlByZwyFactorId(factorId int) string { } } }` - case 592: + case 935: subAggSQl = ` { "x": { "extended_stats": { - "field": "data.z_acc_speed" + "field": "data.x" + } + }, + "y": { + "extended_stats": { + "field": "data.y" + } + }, + "z": { + "extended_stats": { + "field": "data.z" } } }`