|
|
|
package adaptors
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"goInOut/consumers/AXYraw"
|
|
|
|
"goInOut/dbOperate"
|
|
|
|
"goInOut/models"
|
|
|
|
"log"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Adaptor_AXY_LastRAW 安心云 kafka iota数据 转换 es设备数据
|
|
|
|
type Adaptor_AXY_LastRAW struct {
|
|
|
|
AXYraw.Info
|
|
|
|
Redis *dbOperate.RedisHelper
|
|
|
|
}
|
|
|
|
|
|
|
|
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw {
|
|
|
|
iotaData := models.IotaData{}
|
|
|
|
json.Unmarshal([]byte(rawMsg), &iotaData)
|
|
|
|
return the.raw2es(iotaData)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
|
|
|
|
if len(iotaData.Data.Data) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if !iotaData.Data.Success() {
|
|
|
|
msg, _ := json.Marshal(iotaData.Data.Result)
|
|
|
|
iotaData.Data.Data["errMsg"] = string(msg)
|
|
|
|
}
|
|
|
|
//log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
|
|
|
deviceInfo := the.GetDeviceInfo(iotaData.DeviceId)
|
|
|
|
|
|
|
|
//查不到信息的数据
|
|
|
|
if deviceInfo.Name == "" {
|
|
|
|
log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
dataType := ""
|
|
|
|
if _dataType, ok := iotaData.Data.Data["_data_type"]; ok {
|
|
|
|
if v, ok := _dataType.(string); ok {
|
|
|
|
dataType = v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
devdata := &models.DeviceData{
|
|
|
|
DeviceId: iotaData.DeviceId,
|
|
|
|
Name: deviceInfo.Name,
|
|
|
|
ThingId: iotaData.ThingId,
|
|
|
|
StructId: deviceInfo.Structure.Id,
|
|
|
|
AcqTime: iotaData.TriggerTime,
|
|
|
|
RealTime: iotaData.RealTime,
|
|
|
|
ErrCode: 0,
|
|
|
|
Raw: iotaData.Data.Data,
|
|
|
|
DeviceInfo: deviceInfo,
|
|
|
|
DimensionId: iotaData.DimensionId,
|
|
|
|
DataType: dataType,
|
|
|
|
}
|
|
|
|
EsRaws := toEsRaw(devdata)
|
|
|
|
return EsRaws
|
|
|
|
}
|
|
|
|
|
|
|
|
var deviceInfoMap = map[string]models.DeviceInfo{}
|
|
|
|
var deviceInfoMap2 = sync.Map{}
|
|
|
|
|
|
|
|
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo {
|
|
|
|
if vObj, ok := deviceInfoMap2.Load(deviceId); ok {
|
|
|
|
if v, ok := vObj.(models.DeviceInfo); ok {
|
|
|
|
durationMin := time.Now().Sub(v.RefreshTime).Minutes()
|
|
|
|
if durationMin < 5 {
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
v, err := the.GetDeviceInfoFromRedis(deviceId)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("设备%s 无法查询到对应信息", deviceId)
|
|
|
|
}
|
|
|
|
//deviceInfoMap[deviceId] = v
|
|
|
|
deviceInfoMap2.Store(deviceId, v)
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) {
|
|
|
|
Key_Iota_device := "iota_device"
|
|
|
|
key_Thing_struct := "thing_struct"
|
|
|
|
key_Iota_meta := "iota_meta"
|
|
|
|
k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId)
|
|
|
|
dev := models.IotaDevice{}
|
|
|
|
thingStruct := models.ThingStruct{}
|
|
|
|
devMeta := models.DeviceMeta{}
|
|
|
|
err1 := the.Redis.GetObj(k1, &dev)
|
|
|
|
if err1 != nil {
|
|
|
|
return models.DeviceInfo{RefreshTime: time.Now()}, err1
|
|
|
|
}
|
|
|
|
k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId)
|
|
|
|
err2 := the.Redis.GetObj(k2, &thingStruct)
|
|
|
|
if err2 != nil {
|
|
|
|
return models.DeviceInfo{RefreshTime: time.Now()}, err2
|
|
|
|
}
|
|
|
|
k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id)
|
|
|
|
err3 := the.Redis.GetObj(k3, &devMeta)
|
|
|
|
if err3 != nil {
|
|
|
|
return models.DeviceInfo{RefreshTime: time.Now()}, err3
|
|
|
|
}
|
|
|
|
//if err1 != nil || err2 != nil || err3 != nil {
|
|
|
|
// log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3)
|
|
|
|
// return models.DeviceInfo{}
|
|
|
|
//}
|
|
|
|
|
|
|
|
s := models.Structure{
|
|
|
|
ThingId: thingStruct.ThingId,
|
|
|
|
Id: thingStruct.Id,
|
|
|
|
Name: thingStruct.Name,
|
|
|
|
OrgId: thingStruct.OrgId,
|
|
|
|
}
|
|
|
|
return models.DeviceInfo{
|
|
|
|
Id: deviceId,
|
|
|
|
Name: dev.Name,
|
|
|
|
Structure: s,
|
|
|
|
DeviceMeta: devMeta,
|
|
|
|
RefreshTime: time.Now(),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw {
|
|
|
|
|
|
|
|
if deviceData == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
var cstZone = time.FixedZone("CST", 8*3600) // 东八区 用以解决
|
|
|
|
dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps()
|
|
|
|
createNativeRaw := models.EsRaw{
|
|
|
|
StructId: deviceData.StructId,
|
|
|
|
IotaDeviceName: deviceData.Name,
|
|
|
|
Data: deviceData.Raw,
|
|
|
|
CollectTime: deviceData.AcqTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800"),
|
|
|
|
Meta: dataOutMeta,
|
|
|
|
IotaDevice: deviceData.DeviceId,
|
|
|
|
CreateTime: time.Now(),
|
|
|
|
}
|
|
|
|
|
|
|
|
return &createNativeRaw
|
|
|
|
}
|