package adaptors

import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"goInOut/consumers/AXYraw"
	"goInOut/dbHelper"
	"goInOut/models"
)

// Adaptor_AXY_LastRAW converts AnXinYun (安心云) Kafka iota data into ES
// device data. Device metadata needed for the conversion is read through
// Redis and cached in-process (see GetDeviceInfo).
type Adaptor_AXY_LastRAW struct {
	AXYraw.Info
	Redis *dbHelper.RedisHelper
}

// Transform decodes rawMsg as a JSON-encoded models.IotaData and converts it
// into an ES raw record.
//
// It returns nil when the message cannot be decoded, or when raw2es rejects
// it. topic is only used to give the decode-failure log line some context.
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw {
	iotaData := models.IotaData{}
	// A decode error must not be ignored: otherwise an empty or partially
	// filled IotaData would be processed as if it were a real message.
	if err := json.Unmarshal([]byte(rawMsg), &iotaData); err != nil {
		log.Printf("topic[%s] unmarshal iota data failed: %v", topic, err)
		return nil
	}
	return the.raw2es(iotaData)
}

// raw2es builds the ES raw record for one iota message.
//
// It returns nil when iotaData.Data.Success() is false, or when the device
// info resolved for the device has an empty Name.
func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
	if !iotaData.Data.Success() {
		return nil
	}
	log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)

	deviceInfo := the.GetDeviceInfo(iotaData.DeviceId)
	// Drop data whose device info could not be resolved.
	if deviceInfo.Name == "" {
		log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime)
		return nil
	}

	// "_data_type" is optional; a missing key or a non-string value leaves
	// dataType as the empty string.
	dataType, _ := iotaData.Data.Data["_data_type"].(string)

	devdata := &models.DeviceData{
		DeviceId:    iotaData.DeviceId,
		Name:        deviceInfo.Name,
		ThingId:     iotaData.ThingId,
		StructId:    deviceInfo.Structure.Id,
		AcqTime:     iotaData.TriggerTime,
		RealTime:    iotaData.RealTime,
		ErrCode:     0,
		Raw:         iotaData.Data.Data,
		DeviceInfo:  deviceInfo,
		DimensionId: iotaData.DimensionId,
		DataType:    dataType,
	}
	return toEsRaw(devdata)
}

var (
	// deviceInfoMap is not used by the code in this file; it is kept because
	// other files of the package may still reference it.
	deviceInfoMap = map[string]models.DeviceInfo{}

	// deviceInfoMap2 caches models.DeviceInfo values keyed by device id.
	deviceInfoMap2 = sync.Map{}
)

// GetDeviceInfo returns the device info for deviceId.
//
// A cached entry is returned as long as its RefreshTime is less than five
// minutes old; otherwise the info is reloaded through GetDeviceInfoFromRedis
// and the cache entry is replaced. A failed reload is logged and its result
// (which carries only a RefreshTime) is cached as well, so a device that
// cannot be resolved is retried at most once per cache window.
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo {
	const cacheTTL = 5 * time.Minute

	if cached, ok := deviceInfoMap2.Load(deviceId); ok {
		if info, ok := cached.(models.DeviceInfo); ok && time.Since(info.RefreshTime) < cacheTTL {
			return info
		}
	}

	info, err := the.GetDeviceInfoFromRedis(deviceId)
	if err != nil {
		// Include the cause so a failed lookup can be diagnosed from the log.
		log.Printf("设备%s 无法查询到对应信息: %v", deviceId, err)
	}
	deviceInfoMap2.Store(deviceId, info)
	return info
}

// GetDeviceInfoFromRedis assembles a models.DeviceInfo for deviceId from
// three Redis objects, read in order:
//
//	iota_device:<deviceId>          -> models.IotaDevice
//	thing_struct:<device ThingId>   -> models.ThingStruct
//	iota_meta:<device DeviceMeta.Id> -> models.DeviceMeta
//
// The first failing read aborts the lookup: its error is returned unchanged
// together with a DeviceInfo whose only populated field is RefreshTime
// (GetDeviceInfo stores that value to rate-limit retries).
func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) {
	const (
		keyIotaDevice  = "iota_device"
		keyThingStruct = "thing_struct"
		keyIotaMeta    = "iota_meta"
	)

	dev := models.IotaDevice{}
	if err := the.Redis.GetObj(fmt.Sprintf("%s:%s", keyIotaDevice, deviceId), &dev); err != nil {
		return models.DeviceInfo{RefreshTime: time.Now()}, err
	}

	thingStruct := models.ThingStruct{}
	if err := the.Redis.GetObj(fmt.Sprintf("%s:%s", keyThingStruct, dev.ThingId), &thingStruct); err != nil {
		return models.DeviceInfo{RefreshTime: time.Now()}, err
	}

	devMeta := models.DeviceMeta{}
	if err := the.Redis.GetObj(fmt.Sprintf("%s:%s", keyIotaMeta, dev.DeviceMeta.Id), &devMeta); err != nil {
		return models.DeviceInfo{RefreshTime: time.Now()}, err
	}

	return models.DeviceInfo{
		Id:   deviceId,
		Name: dev.Name,
		Structure: models.Structure{
			ThingId: thingStruct.ThingId,
			Id:      thingStruct.Id,
			Name:    thingStruct.Name,
			OrgId:   thingStruct.OrgId,
		},
		DeviceMeta:  devMeta,
		RefreshTime: time.Now(),
	}, nil
}

// toEsRaw maps a DeviceData onto the ES raw record. It returns nil for a nil
// input. CreateTime is set to the time of the call; Meta is taken from the
// device meta's GetOutputProps().
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw {
	if deviceData == nil {
		return nil
	}
	return &models.EsRaw{
		StructId:       deviceData.StructId,
		IotaDeviceName: deviceData.Name,
		Data:           deviceData.Raw,
		CollectTime:    deviceData.AcqTime,
		Meta:           deviceData.DeviceInfo.DeviceMeta.GetOutputProps(),
		IotaDevice:     deviceData.DeviceId,
		CreateTime:     time.Now(),
	}
}