数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

149 lines
4.0 KiB

package adaptors
import (
"encoding/json"
"fmt"
"goInOut/consumers/AXYraw"
"goInOut/dbOperate"
"goInOut/models"
"log"
"sync"
"time"
)
// Adaptor_AXY_LastRAW converts Anxinyun Kafka iota data into ES device data.
// It embeds AXYraw.Info and carries the Redis helper used to look up device
// information.
type Adaptor_AXY_LastRAW struct {
AXYraw.Info
// Redis is the helper GetDeviceInfoFromRedis reads device, structure and
// device-meta objects through.
Redis *dbOperate.RedisHelper
}
// Transform decodes rawMsg as a JSON-encoded models.IotaData and converts it
// to an ES raw record via raw2es.
//
// topic is only used for logging. The result is nil when rawMsg cannot be
// decoded, or when raw2es produces no record.
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw {
	iotaData := models.IotaData{}
	// A decode failure must not be ignored: json.Unmarshal can leave the
	// target partially populated, and such a message should not be forwarded.
	if err := json.Unmarshal([]byte(rawMsg), &iotaData); err != nil {
		log.Printf("topic[%s] failed to decode iota message: %v", topic, err)
		return nil
	}
	return the.raw2es(iotaData)
}
// raw2es turns one decoded iota message into an ES raw record.
//
// It returns nil when the message carries no data fields, or when the device
// info resolved for the message's device id has an empty Name. For messages
// whose Data reports failure, the JSON-encoded Result is stored in the data
// map under "errMsg" before conversion.
func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
	if len(iotaData.Data.Data) == 0 {
		return nil
	}

	if !iotaData.Data.Success() {
		// The marshal error is deliberately dropped, as in the original code;
		// on failure the stored message is simply empty.
		encoded, _ := json.Marshal(iotaData.Data.Result)
		iotaData.Data.Data["errMsg"] = string(encoded)
	}

	info := the.GetDeviceInfo(iotaData.DeviceId)
	// Data for a device without resolvable info is discarded.
	if info.Name == "" {
		log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime)
		return nil
	}

	// Stays "" when "_data_type" is absent or is not a string.
	kind, _ := iotaData.Data.Data["_data_type"].(string)

	return toEsRaw(&models.DeviceData{
		DeviceId:    iotaData.DeviceId,
		Name:        info.Name,
		ThingId:     iotaData.ThingId,
		StructId:    info.Structure.Id,
		AcqTime:     iotaData.TriggerTime,
		RealTime:    iotaData.RealTime,
		ErrCode:     0,
		Raw:         iotaData.Data.Data,
		DeviceInfo:  info,
		DimensionId: iotaData.DimensionId,
		DataType:    kind,
	})
}
// deviceInfoMap is a plain map from string keys to models.DeviceInfo values.
// NOTE(review): the only reference in the visible code is commented out, so it
// appears unused here — confirm against the rest of the package before removing.
var deviceInfoMap = map[string]models.DeviceInfo{}
// deviceInfoMap2 is the cache GetDeviceInfo loads from and stores into, keyed
// by device id and holding models.DeviceInfo values.
var deviceInfoMap2 = sync.Map{}
// GetDeviceInfo returns the device info for deviceId, serving it from the
// package-level cache when the cached entry was refreshed less than five
// minutes ago and otherwise reloading it through GetDeviceInfoFromRedis.
//
// Whatever the reload returns is stored in the cache, including the value that
// accompanies an error, so a failed lookup is also reused until the entry
// expires. Lookup failures are logged together with their cause.
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo {
	// cacheTTL is how long a cached entry is considered fresh.
	const cacheTTL = 5 * time.Minute

	if cached, ok := deviceInfoMap2.Load(deviceId); ok {
		if info, ok := cached.(models.DeviceInfo); ok && time.Since(info.RefreshTime) < cacheTTL {
			return info
		}
	}

	info, err := the.GetDeviceInfoFromRedis(deviceId)
	if err != nil {
		// Include the cause so a Redis failure can be told apart from a
		// missing key when reading the logs.
		log.Printf("设备%s 无法查询到对应信息: %v", deviceId, err)
	}
	deviceInfoMap2.Store(deviceId, info)
	return info
}
// GetDeviceInfoFromRedis assembles a models.DeviceInfo for deviceId from three
// Redis objects read in order:
//
//	iota_device:<deviceId>        -> the device record
//	thing_struct:<dev.ThingId>    -> the structure the device belongs to
//	iota_meta:<dev.DeviceMeta.Id> -> the device meta
//
// If any read fails, the error is returned unchanged together with a
// DeviceInfo that has only RefreshTime set (to the current time).
func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) {
	// placeholder builds the value that accompanies an error.
	placeholder := func() models.DeviceInfo {
		return models.DeviceInfo{RefreshTime: time.Now()}
	}

	var device models.IotaDevice
	if err := the.Redis.GetObj(fmt.Sprintf("iota_device:%s", deviceId), &device); err != nil {
		return placeholder(), err
	}

	var structure models.ThingStruct
	if err := the.Redis.GetObj(fmt.Sprintf("thing_struct:%s", device.ThingId), &structure); err != nil {
		return placeholder(), err
	}

	var meta models.DeviceMeta
	if err := the.Redis.GetObj(fmt.Sprintf("iota_meta:%s", device.DeviceMeta.Id), &meta); err != nil {
		return placeholder(), err
	}

	info := models.DeviceInfo{
		Id:   deviceId,
		Name: device.Name,
		Structure: models.Structure{
			ThingId: structure.ThingId,
			Id:      structure.Id,
			Name:    structure.Name,
			OrgId:   structure.OrgId,
		},
		DeviceMeta:  meta,
		RefreshTime: time.Now(),
	}
	return info, nil
}
// toEsRaw maps a DeviceData onto a new models.EsRaw.
//
// A nil deviceData yields nil. The collect time is the acquisition time
// converted to a fixed UTC+8 zone and formatted with millisecond precision;
// CreateTime is the time of the call.
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw {
	if deviceData == nil {
		return nil
	}

	// Fixed UTC+8 zone used when rendering the collect time.
	utc8 := time.FixedZone("CST", 8*3600)
	outputProps := deviceData.DeviceInfo.DeviceMeta.GetOutputProps()
	collectedAt := deviceData.AcqTime.In(utc8).Format("2006-01-02T15:04:05.000+0800")

	return &models.EsRaw{
		StructId:       deviceData.StructId,
		IotaDeviceName: deviceData.Name,
		Data:           deviceData.Raw,
		CollectTime:    collectedAt,
		Meta:           outputProps,
		IotaDevice:     deviceData.DeviceId,
		CreateTime:     time.Now(),
	}
}