package adaptors

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"time"

	"goInOut/consumers/JYES_NJZX"
	"goInOut/consumers/JYES_NJZX/protoDataFiles"
	"goInOut/dbOperate"
	"goInOut/models"
	"google.golang.org/protobuf/proto"
)

// Adaptor_AXYES_NJZX converts Anxinyun Elasticsearch theme (feature) data
// into the protobuf payload pushed to the Nanjing Zhixing (NJZX) platform.
type Adaptor_AXYES_NJZX struct {
	// PointInfo holds sensor code conversion information.
	// NOTE(review): not read by any method in this file — confirm key meaning with callers.
	PointInfo map[int64]map[int64]int64
	// StructInfo maps a structure id to its unique code (see getUniqueCode).
	StructInfo map[int64]int64
	// Info carries additional required settings.
	Info map[string]string
	// Redis is used to look up station metadata under the key "station:<sensorId>".
	Redis *dbOperate.RedisHelper
}

// Transform decodes rawMsg (a JSON Elasticsearch response) and converts it
// into the payloads to push. It returns nil when rawMsg cannot be decoded or
// when the conversion yields an empty payload; otherwise it returns a single
// NeedPush carrying the serialized protobuf message.
func (the *Adaptor_AXYES_NJZX) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
	// Load the Elasticsearch query result into its struct form.
	esData := JYES_NJZX.EsThemeDateValue{}
	if err := json.Unmarshal([]byte(rawMsg), &esData); err != nil {
		log.Printf("Error unmarshalling JSON: %v", err)
		return nil
	}

	payload := the.EsDataValueChangeToNJZX(structId, factorId, esData)
	if len(payload) == 0 {
		return nil
	}
	return []NeedPush{{Payload: payload}}
}

// EsDataValueChangeToNJZX builds one ComplexData message from every hit in
// esDataValue and returns its protobuf encoding.
//
// For each hit the station is fetched from Redis and its label is turned into
// the NJZX point code; hits whose station cannot be read or whose label yields
// no code are logged and skipped. It returns nil if protobuf encoding fails.
func (the *Adaptor_AXYES_NJZX) EsDataValueChangeToNJZX(structId int64, factorId int, esDataValue JYES_NJZX.EsThemeDateValue) (result []byte) {
	// Aggregated message for all sensors in this response.
	complexData := &protoDataFiles.ComplexData{}

	for _, sensorBucket := range esDataValue.Hits.Hits {
		sensorId := sensorBucket.Source.Sensor // Anxinyun measuring point id.

		// Station metadata comes from Redis.
		station := models.Station{}
		key := fmt.Sprintf("station:%d", sensorId)
		if err := the.Redis.GetObj(key, &station); err != nil {
			log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常: %v", structId, factorId, sensorId, err)
			continue
		}

		monitorCodeStr := the.getPointCodeFromLabel(station.Labels)
		if monitorCodeStr == "" {
			log.Printf("redis 获取[s:%d,f:%d]测点[%d],标签信息[%s]转换异常,跳过", structId, factorId, sensorId, station.Labels)
			continue
		}

		complexData.SensorData = append(complexData.SensorData, the.ChangeToNJZXData(factorId, monitorCodeStr, sensorBucket))
	}

	//v, _ := json.Marshal(complexData)
	//log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v)

	payload, err := proto.Marshal(complexData)
	if err != nil {
		// Previously ignored; an encoding failure must not be pushed as data.
		log.Printf("[struct:%d,factor:%d] protobuf marshal failed: %v", structId, factorId, err)
		return nil
	}
	log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(payload))
	return payload
}

// getMonitorTypeByFactorId maps an Anxinyun factor id to the NJZX monitor type.
//
// Factor ids: 2 temperature & humidity, 4 temperature, 18 crack detection,
// 103 clearance convergence, 102 vault settlement, 96 secondary lining strain,
// 107 track bed / arch waist structure settlement, 156 wind speed,
// 578 wind direction.
//
// Unknown factor ids are logged and fall back to MonitoryType_RHS.
func (the *Adaptor_AXYES_NJZX) getMonitorTypeByFactorId(factorId int) protoDataFiles.MonitoryType {
	switch factorId {
	case 2: // temperature & humidity
		return protoDataFiles.MonitoryType_RHS
	case 4: // temperature
		return protoDataFiles.MonitoryType_TMP
	case 18: // crack detection
		return protoDataFiles.MonitoryType_CRK
	case 103: // clearance convergence
		// NOTE(review): original author marked this as "no corresponding type";
		// ChangeToNJZXData sends a DISR body for this factor — confirm the enum.
		return protoDataFiles.MonitoryType_INC
	case 102: // vault settlement
		// NOTE(review): marked "no corresponding type" by the original author.
		return protoDataFiles.MonitoryType_CRK
	case 96: // secondary lining strain
		return protoDataFiles.MonitoryType_RSG
	case 107: // track bed and arch waist structure settlement
		// NOTE(review): marked "no corresponding type" by the original author.
		return protoDataFiles.MonitoryType_VIB
	case 156: // wind speed
		return protoDataFiles.MonitoryType_WDS
	case 578: // wind direction
		return protoDataFiles.MonitoryType_WDD
	default:
		log.Printf("factorId=%d,无匹配的MonitorType", factorId)
		return protoDataFiles.MonitoryType_RHS
	}
}

// parseTimeToTimestamp parses an RFC 3339 time string and returns it as a
// Unix timestamp in seconds. It returns 0 and the parse error on failure.
func (the *Adaptor_AXYES_NJZX) parseTimeToTimestamp(timeStr string) (int64, error) {
	parsedTime, err := time.Parse(time.RFC3339, timeStr)
	if err != nil {
		return 0, err
	}
	return parsedTime.Unix(), nil
}

// ChangeToNJZXData converts one Elasticsearch hit into an NJZX SensorData
// message for the point monitorCodeStr.
//
// UpTime is the hit's collect time in Unix seconds; if the collect time cannot
// be parsed the failure is logged and UpTime is left at 0. The data body is
// chosen by factorId; factor ids without a case here produce a message with
// no DataBody. A value missing from the hit's data map is sent as 0.
func (the *Adaptor_AXYES_NJZX) ChangeToNJZXData(factorId int, monitorCodeStr string, dateBucket JYES_NJZX.Hits) *protoDataFiles.SensorData {
	upTime, err := the.parseTimeToTimestamp(dateBucket.Source.CollectTime)
	if err != nil {
		log.Printf("sensor[%s] collect time %q is not RFC 3339, UpTime falls back to 0: %v",
			monitorCodeStr, dateBucket.Source.CollectTime, err)
	}

	sensorData := &protoDataFiles.SensorData{
		MonitorType: the.getMonitorTypeByFactorId(factorId),
		SensorNo:    monitorCodeStr,
		UpTime:      upTime,
	}

	// single wraps the named measurement as the one-element slice the
	// protobuf real-time messages expect.
	single := func(field string) []float32 {
		return []float32{float32(dateBucket.Source.Data[field])}
	}

	switch factorId {
	case 2: // temperature & humidity
		sensorData.DataBody = &protoDataFiles.SensorData_Rhs{Rhs: &protoDataFiles.RHSRealTime{
			Temperature: single("temperature"),
			Humidity:    single("humidity"),
		}}
	case 4: // temperature
		sensorData.DataBody = &protoDataFiles.SensorData_Tmp{Tmp: &protoDataFiles.TMPRealTime{
			Temperature: single("temperature"),
		}}
	case 18: // crack detection
		sensorData.DataBody = &protoDataFiles.SensorData_Crk{Crk: &protoDataFiles.CRKRealTime{
			CrackWidth: single("crack"),
		}}
	case 102, 103, 107:
		// Vault settlement, clearance convergence and track bed / arch waist
		// settlement are all reported as a displacement body.
		sensorData.DataBody = &protoDataFiles.SensorData_Disr{Disr: &protoDataFiles.DISRRealTime{
			Displacement: single("displacement"),
		}}
	case 96: // secondary lining strain
		sensorData.DataBody = &protoDataFiles.SensorData_Rsg{Rsg: &protoDataFiles.RSGRealTime{
			Strain: single("strain"),
		}}
	case 156: // wind speed
		sensorData.DataBody = &protoDataFiles.SensorData_Wds{Wds: &protoDataFiles.WDSRealTime{
			WindSpeed: single("speed"),
		}}
	case 578: // wind direction
		sensorData.DataBody = &protoDataFiles.SensorData_Wdd{Wdd: &protoDataFiles.WDDRealTime{
			WindDirection: single("direction"),
		}}
	}

	return sensorData
}

// getUniqueCode returns the unique code registered for structId in
// StructInfo, or 0 when the structure is not registered.
func (the *Adaptor_AXYES_NJZX) getUniqueCode(structId int64) (uniqueCode int64) {
	// A missing key (or a nil map) yields the zero value.
	return the.StructInfo[structId]
}

// getPointCodeFromLabel extracts the point code from a station label of the
// form "{code:wd01}", returning "wd01". Labels of three characters or fewer
// yield "". A label that reduces to an empty code is logged and "" is
// returned.
func (the *Adaptor_AXYES_NJZX) getPointCodeFromLabel(label string) string {
	const (
		labelPrefix = "{code:"
		labelSuffix = "}"
	)

	if len(label) <= 3 {
		return ""
	}

	// TrimPrefix/TrimSuffix remove the exact wrapper. The previous
	// TrimLeft(label, "{code:") treated the argument as a character set and
	// also ate leading 'c', 'o', 'd', 'e' of the code itself
	// (e.g. "{code:cd01}" became "01").
	code := strings.TrimSuffix(strings.TrimPrefix(label, labelPrefix), labelSuffix)
	if code == "" {
		log.Printf("测点标签转换异常[%s]", label)
	}
	return code
}