Browse Source

update 更新广东省平台处理

dev
lucas 1 month ago
parent
commit
06a0acf53b
  1. 190
      adaptors/安心云es主题特征to广东省平台.go
  2. 11
      config/init.go
  3. 29
      consumers/GDJKJC/config.go
  4. 2
      consumers/HBJCAS/config.go
  5. 8
      consumers/consumerAXYES2GDJKJC.go
  6. 13
      consumers/consumer_test.go

190
adaptors/安心云es主题特征to广东省平台.go

@ -0,0 +1,190 @@
package adaptors
import (
"encoding/json"
"fmt"
"goInOut/consumers/HBJCAS"
"goInOut/consumers/HBJCAS/protoFiles_hb"
"goInOut/dbOperate"
"goInOut/models"
"google.golang.org/protobuf/proto"
"log"
"math"
"strconv"
"strings"
"time"
)
// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台
type Adaptor_AXYES_GDJKJC struct {
//传感器code转换信息
PointInfo map[int64]map[int64]int64
StructInfo map[int64]string
//一些必要信息
Info map[string]string
Redis *dbOperate.RedisHelper
}
func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{}
var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
if err != nil {
return nil
}
Payload := the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram)
if len(Payload) == 0 {
return needPush
}
needPush = append(needPush, NeedPush{
Payload: Payload,
})
return needPush
}
func (the Adaptor_AXYES_GDJKJC) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
return
}
//设施唯一编码(省平台)
uniqueCode := the.getUniqueCode(structId)
if uniqueCode == "" {
log.Printf("structId=%d,无匹配省平台uniqueCode", structId)
return
}
//数据汇总
complexData := &protoFiles_hb.ComplexData{}
for _, sensorBucket := range buckets {
sensorId := sensorBucket.Key
for _, dateBucket := range sensorBucket.GroupDate.Buckets {
//优先redis获取
station := models.Station{}
k1 := fmt.Sprintf("station:%d", sensorId)
errRedis := the.Redis.GetObj(k1, &station)
if errRedis != nil {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
continue
}
monitorCode := the.getPointCodeFromLabel(station.Labels)
if monitorCode == 0 {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId)
continue
}
dataDefinition := &protoFiles_hb.DataDefinition{
DataType: protoFiles_hb.DataType_STATISTICS,
UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥
DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
}
complexData.SensorData = append(complexData.SensorData, dataDefinition)
}
}
v, _ := json.Marshal(complexData)
log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v)
result, _ = proto.Marshal(complexData)
return result
}
func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType {
//桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592
switch factorId {
case 15:
return protoFiles_hb.MonitoryType_INC
case 18:
return protoFiles_hb.MonitoryType_CRK
case 20:
return protoFiles_hb.MonitoryType_DIS
case 28:
return protoFiles_hb.MonitoryType_VIB
case 592:
return protoFiles_hb.MonitoryType_VIB
default:
log.Printf("factorId=%d,无匹配的MonitorType", factorId)
return protoFiles_hb.MonitoryType_CMM
}
}
func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
avgValueX := dateBucket.X.Avg
rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count))
maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min))
avgValueY := dateBucket.Y.Avg
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count))
monitoryType := the.getMonitorTypeByFactorId(factorId)
dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{
StatisticData: &protoFiles_hb.StatisticData{
MonitorType: monitoryType,
MonitorCode: monitorCode, //测点唯一编码
EventTime: Atime,
Interval: 60 * 1000,
},
}
switch factorId {
case 15: //倾角
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{
MaxAbsoluteValueX: float32(maxAbsoluteValueX),
AvgValueX: float32(avgValueX),
RootMeanSquareX: float32(rootMeanSquareX),
MaxAbsoluteValueY: float32(maxAbsoluteValueY),
AvgValueY: float32(avgValueY),
RootMeanSquareY: float32(rootMeanSquareY),
}}
case 18: //裂缝监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Crk{Crk: &protoFiles_hb.CRKStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 20: //支座位移
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Dis{Dis: &protoFiles_hb.DISStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 28: //振动
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareY),
}}
case 592: //加速度三项监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareX),
}}
}
return dataDefinitionStatisticData
}
func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) {
if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v
}
return uniqueCode
}
func (the Adaptor_AXYES_GDJKJC) getPointCodeFromLabel(label string) int64 {
//解析label {13010600001}
pointUniqueCode := int64(0)
if len(label) > 2 {
newLabel := strings.TrimLeft(label, "{")
str := strings.TrimRight(newLabel, "}")
codeInt64, err := strconv.ParseInt(str, 10, 64)
if err != nil {
log.Printf("测点标签转换异常[%s]", label)
}
pointUniqueCode = codeInt64
}
return pointUniqueCode
}

11
config/init.go

@ -36,10 +36,15 @@ func LoadConfigJson() map[string]string {
return allConfig
}
func LoadConfig() map[string]string {
func LoadConfig(path ...string) map[string]string {
allConfig := make(map[string]string)
files, err := os.ReadDir("configFiles")
defaultDir := "configFiles"
if len(path) != 0 {
defaultDir = path[0]
}
files, err := os.ReadDir(defaultDir)
if err != nil {
log.Fatal(err)
}
@ -51,7 +56,7 @@ func LoadConfig() map[string]string {
continue
}
// 读取配置
configBytes, _ := os.ReadFile(fmt.Sprintf("configFiles/%s", file.Name()))
configBytes, _ := os.ReadFile(fmt.Sprintf("%s/%s", defaultDir, file.Name()))
consumer := new(Consumer)
// 将 JSON 格式的数据解析到结构体中
err = yaml.Unmarshal(configBytes, &consumer)

29
consumers/GDJKJC/config.go

@ -0,0 +1,29 @@
package GDJKJC
import "goInOut/config"
type ConfigFile struct {
IoConfig ioConfig `yaml:"ioConfig"`
OtherInfo map[string]string `yaml:"info"`
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"`
StructInfo map[int64]string `yaml:"structInfo"`
Monitor map[string]string `yaml:"monitor"`
QueryComponent queryComponent `yaml:"queryComponent"`
}
type ioConfig struct {
In In `yaml:"in"`
Out Out `yaml:"out"`
}
type In struct {
Http config.HttpConfig `yaml:"http"`
}
type Out struct {
Http config.HttpConfig `yaml:"http"`
}
type queryComponent struct {
Redis struct {
Address string `yaml:"address"`
} `yaml:"redis"`
}

2
consumers/HBJCAS/config.go

@ -19,7 +19,7 @@ type In struct {
}
type Out struct {
Http config.HttpConfig `yaml:"http"`
Mqtt config.MqttConfig `yaml:"mqtt"`
}
type queryComponent struct {

8
consumers/consumerAXYES2GDJKJC.go

@ -5,7 +5,7 @@ import (
"encoding/hex"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/HBJCAS"
"goInOut/consumers/GDJKJC"
"goInOut/dbOperate"
"goInOut/monitors"
"goInOut/utils"
@ -18,7 +18,7 @@ type consumerAXYES2GDJKJC struct {
//数据缓存管道
ch chan []adaptors.NeedPush
//具体配置
Info HBJCAS.ConfigFile
Info GDJKJC.ConfigFile
InHttp *dbOperate.HttpHelper
outHttp *dbOperate.HttpHelper
monitor *monitors.CommonMonitor
@ -101,9 +101,9 @@ func (the *consumerAXYES2GDJKJC) Work() {
}()
}
func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GDJKJC) {
return adaptors.Adaptor_AXYES_HBGL{
return adaptors.Adaptor_AXYES_GDJKJC{
Redis: the.infoRedis,
}
}

13
consumers/consumer_test.go

@ -2,7 +2,7 @@ package consumers
import (
"fmt"
"goUpload/config"
"goInOut/config"
"log"
"testing"
"time"
@ -260,3 +260,14 @@ func Test_MYX_GDGS(t *testing.T) {
consumerMyx.onFileData()
}
func Test_GDJKJC(t *testing.T) {
log.Printf("广州省平台")
myConfigs := config.LoadConfig("../configFiles")
configBody := myConfigs["consumerAXYES2GDJKJC"]
_consumerAXYES2GDJKJC := new(consumerAXYES2GDJKJC)
_consumerAXYES2GDJKJC.Initial(configBody)
//go _consumerGZGZM.RefreshTask()
time.Sleep(2 * time.Second)
}

Loading…
Cancel
Save