diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index ee3d1fe..d361ca7 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -4,11 +4,9 @@ import ( "encoding/json" "fmt" "goUpload/consumers/AXYraw" - "goUpload/consumers/GZGZM" "goUpload/dbHelper" "goUpload/models" "log" - "math" "time" ) @@ -18,112 +16,83 @@ type Adaptor_AXY_LastRAW struct { Redis *dbHelper.RedisHelper } -func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) []byte { +func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { iotaData := models.IotaData{} json.Unmarshal([]byte(rawMsg), &iotaData) - return the.Theme2GzGZM(iotaData) + return the.raw2es(iotaData) } -func (the Adaptor_AXY_LastRAW) Theme2GzGZM(iotaData models.IotaData) (result []byte) { +func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { if !iotaData.Data.Success() { - return + return nil } log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) - the.GetDeviceInfo(iotaData.DeviceId) - return result -} -func (the Adaptor_AXY_LastRAW) getSensorId(sensorId string) GZGZM.SensorInfo { - s := GZGZM.SensorInfo{} - //if v, ok := the.SensorInfoMap[sensorId]; ok { - // s = v - //} - return s -} -func (the Adaptor_AXY_LastRAW) getCodeBytes(sensorCode int16) []byte { - - bytes := make([]byte, 0) - bytes = append(bytes, - byte(sensorCode&0xFF), - byte(sensorCode>>8), - ) - - return bytes -} - -func (the Adaptor_AXY_LastRAW) getTimeBytes(sensorTime time.Time) []byte { + deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) - year := int8(sensorTime.Year() - 1900) - month := int8(sensorTime.Month()) - day := int8(sensorTime.Day()) - hour := int8(sensorTime.Hour()) - minute := int8(sensorTime.Minute()) - millisecond := uint16(sensorTime.Second()*1000 + sensorTime.Nanosecond()/1e6) - bytes := make([]byte, 0) - bytes = append(bytes, - byte(year), - byte(month), - byte(day), - byte(hour), - byte(minute), - byte(millisecond&0xFF), - byte(millisecond>>8), - ) - - return bytes -} - -func (the Adaptor_AXY_LastRAW) getDatasBytes(datas []float32) []byte { - - bytes := make([]byte, 0) - for _, data := range datas { - bits := math.Float32bits(data) - bytes = append(bytes, - byte(bits&0xFF), - byte(bits>>8&0xFF), - byte(bits>>16&0xFF), - byte(bits>>24&0xFF), - ) + dataType := "" + if _dataType, ok := iotaData.Data.Data["_data_type"]; ok { + if v, ok := _dataType.(string); ok { + dataType = v + } } - - return bytes -} - -func (the Adaptor_AXY_LastRAW) getPayloadHeader(floatCount int16) []byte { - - bytes := make([]byte, 0) - - bytes = append(bytes, - //报文类型 - 0x02, - 0x00, - //1:上行信息 - 0x01, - //默认,通讯计算机编号 - 0x00, - //命令码 - 0x01, - //报文长度 - byte((floatCount*4+9)&0xFF), - byte((floatCount*4+9)>>8), - ) - - return bytes + devdata := &models.DeviceData{ + DeviceId: iotaData.DeviceId, + Name: deviceInfo.Name, + ThingId: iotaData.ThingId, + StructId: deviceInfo.Structure.Id, + AcqTime: iotaData.TriggerTime, + RealTime: iotaData.RealTime, + ErrCode: 0, + Raw: iotaData.Data.Data, + DeviceInfo: deviceInfo, + DimensionId: iotaData.DimensionId, + DataType: dataType, + } + EsRaws := toEsRaw(devdata) + return EsRaws } -func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) []byte { +func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { Key_Iota_device := "iota_device" key_Thing_struct := "thing_struct" key_Iota_meta := "iota_meta" k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId) dev := models.IotaDevice{} - ts := models.ThingStruct{} + thingStruct := models.ThingStruct{} devMeta := models.DeviceMeta{} err1 := the.Redis.GetObj(k1, &dev) k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId) - err2 := the.Redis.GetObj(k2, &ts) + err2 := the.Redis.GetObj(k2, &thingStruct) k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id) err3 := the.Redis.GetObj(k3, &devMeta) - println(err1, err2, err3) + if err1 != nil || err2 != nil || err3 != nil { + log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3) + } + + s := models.Structure{ + ThingId: thingStruct.ThingId, + Id: thingStruct.Id, + Name: thingStruct.Name, + OrgId: thingStruct.OrgId, + } + return models.DeviceInfo{ + Id: deviceId, + Name: dev.Name, + Structure: s, + DeviceMeta: devMeta, + } +} +func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { + dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() + createNativeRaw := models.EsRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Data: deviceData.Raw, + CollectTime: deviceData.AcqTime, + Meta: dataOutMeta, + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now(), + } - return make([]byte, 0) + return &createNativeRaw } diff --git a/config/configStruct.go b/config/configStruct.go index 306432e..1b83152 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -25,6 +25,7 @@ type EsConfig struct { UserName string `json:"userName"` Password string `json:"password"` } `json:"auth"` + Interval int `json:"interval"` } type UdpConfig struct { diff --git a/configFiles/config_安心云设备数据_最新同步.json b/configFiles/config_安心云设备数据_最新同步.json index 164452c..f7aef29 100644 --- a/configFiles/config_安心云设备数据_最新同步.json +++ b/configFiles/config_安心云设备数据_最新同步.json @@ -14,12 +14,13 @@ }, "out": { "es": { - "address": ["http://10.8.30.142:30092"], + "address": ["http://10.8.30.160:30092"], "index": "anxincloud_raws_last", "auth": { "userName": "post", "password": "123" - } + }, + "interval": 30 } } }, diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go index 0adb36d..2c28651 100644 --- a/consumers/consumerAXYraw.go +++ b/consumers/consumerAXYraw.go @@ -6,18 +6,22 @@ import ( "goUpload/consumers/AXYraw" "goUpload/dbHelper" "goUpload/dbHelper/_kafka" + "goUpload/models" "log" + "sync" "time" ) type consumerAXYraw struct { //数据缓存管道 - dataCache chan []byte + dataCache chan *models.EsRaw //具体配置 ConfigInfo AXYraw.ConfigFile InKafka _kafka.KafkaHelper OutEs dbHelper.ESHelper infoRedis *dbHelper.RedisHelper + sinkRawMap sync.Map + lock sync.Mutex } func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { @@ -30,7 +34,8 @@ func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { } func (the *consumerAXYraw) Initial(cfg string) error { - the.dataCache = make(chan []byte, 200) + the.sinkRawMap = sync.Map{} + the.dataCache = make(chan *models.EsRaw, 200) the.LoadConfigJson(cfg) err := the.inputInitial() @@ -77,52 +82,63 @@ func (the *consumerAXYraw) infoComponentInitial() error { return nil } -func (the *consumerAXYraw) RefreshTask() { - the.tokenRefresh() - ticker := time.NewTicker(24 * time.Hour) +func (the *consumerAXYraw) sinkTask() { + intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval + ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) defer ticker.Stop() for true { <-ticker.C - the.tokenRefresh() + the.toSink() } } -func (the *consumerAXYraw) tokenRefresh() { - +func (the *consumerAXYraw) toSink() { + var raws []models.EsRaw + the.lock.Lock() + defer the.lock.Unlock() + the.sinkRawMap.Range(func(key, value any) bool { + if v, ok := value.(*models.EsRaw); ok { + raws = append(raws, *v) + return ok + } + return false + }) + if len(raws) > 0 { + log.Printf("准备写入es %d条", len(raws)) + index := the.ConfigInfo.IoConfig.Out.Es.Index + the.OutEs.BulkWriteRaws2Es(index, raws) + the.sinkRawMap.Clear() + } } func (the *consumerAXYraw) Work() { - + go the.sinkTask() go func() { for { - pushBytes := <-the.dataCache + pushEsRaw := <-the.dataCache log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) - log.Printf("推送[%v]: len=%d", "OutEs", len(pushBytes)) - //the.OutEs.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutEs.Token}) - time.Sleep(10 * time.Millisecond) + //有效数据存入缓存 + the.lock.Lock() + the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw) + the.lock.Unlock() } }() } func (the *consumerAXYraw) onData(topic string, msg string) bool { - if len(msg) > 80 { - log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //if len(msg) > 80 { + // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //} + adaptor := adaptors.Adaptor_AXY_LastRAW{ + Redis: the.infoRedis, } - adaptor := the.getAdaptor() - if adaptor != nil { - needPush := adaptor.Transform(topic, msg) - if len(needPush) > 0 { - the.dataCache <- needPush - } - } - return true -} -func (the *consumerAXYraw) getAdaptor() (adaptor adaptors.IAdaptor3) { + needPush := adaptor.Transform(topic, msg) - adaptor = adaptors.Adaptor_AXY_LastRAW{ - Redis: the.infoRedis, + if needPush != nil { + the.dataCache <- needPush } - return adaptor + + return true } diff --git a/dbHelper/elasticsearchHelper.go b/dbHelper/elasticsearchHelper.go index 7c18d19..b616537 100644 --- a/dbHelper/elasticsearchHelper.go +++ b/dbHelper/elasticsearchHelper.go @@ -200,7 +200,7 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { for _, raw := range raws { // scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString source, _ := json.Marshal(raw) - _id := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli()) + _id := raw.IotaDevice s := fmt.Sprintf( `{"index": {"_index": "%s","_id": "%s"}} %s diff --git a/main.go b/main.go index eea8e6d..c1555e1 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ func init() { log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds) log.SetOutput(multiWriter) log.Println("=================log start=================") - log.Println("==>") } func main() { diff --git a/models/constant.go b/models/constant.go new file mode 100644 index 0000000..def8050 --- /dev/null +++ b/models/constant.go @@ -0,0 +1,6 @@ +package models + +const ( + RawTypeVib = "vib" + RawTypeDiag = "diag" +) diff --git a/models/deviceData.go b/models/deviceData.go new file mode 100644 index 0000000..a6045ec --- /dev/null +++ b/models/deviceData.go @@ -0,0 +1,107 @@ +package models + +import ( + "time" +) + +type DeviceData struct { + DeviceId string + Name string + ThingId string + StructId int + TaskId string + AcqTime time.Time + RealTime time.Time + ErrCode int + Raw map[string]any + RawUnit map[string]string + DeviceInfo DeviceInfo + DimensionId string + //数据类型 常见有 comm="" ,RawTypeVib="vib" + DataType string +} + +func (d *DeviceData) GetVibrationData() VibrationData { + vibData := VibrationData{} + if d.DataType == RawTypeVib { + if v, ok := d.Raw["filterFreq"]; ok { + if vv, ok := v.(float64); ok { + vibData.FilterFreq = vv + } + } + + if v, ok := d.Raw["sampleFreq"]; ok { + if vv, ok := v.(float64); ok { + vibData.SampleFreq = vv + } + } + + if v, ok := d.Raw["gainAmplifier"]; ok { + if vv, ok := v.(float64); ok { + vibData.GainAmplifier = byte(vv) + } + } + + if v, ok := d.Raw["version"]; ok { + if vv, ok := v.(float64); ok { + vibData.Version = byte(vv) + } + } + + if v, ok := d.Raw["triggerType"]; ok { + if vv, ok := v.(float64); ok { + vibData.TriggerType = byte(vv) + } + } + + if v, ok := d.Raw["physicalvalue"]; ok { + + if vSlice, ok := v.([]any); ok { + for _, vObj := range vSlice { + if vv, ok := vObj.(float64); ok { + vibData.Data = append(vibData.Data, vv) + } + } + + } + + //去直流 + if len(vibData.Data) > 0 { + avg := func(dataArray []float64) float64 { + sum := 0.0 + for _, f := range dataArray { + sum += f + } + return sum / float64(len(dataArray)) + }(vibData.Data) //common_calc.GetAvg(vibData.Data) + + for i := 0; i < len(vibData.Data); i++ { + vibData.Data[i] = vibData.Data[i] - avg + } + } + } + + } + return vibData +} + +// VibrationData 振动数据 +type VibrationData struct { + Version byte + SampleFreq float64 + FilterFreq float64 + GainAmplifier byte + TriggerType byte + Data []float64 // 原始波形数据 + Unit string +} + +func (v *VibrationData) FormatParams() map[string]any { + return map[string]any{ + "sampleFreq": v.SampleFreq, + "version": v.Version, + "filterFreq": v.FilterFreq, + "gainAmplifier": v.GainAmplifier, + "triggerType": v.TriggerType, + } +} diff --git a/models/deviceInfo.go b/models/deviceInfo.go index c612b04..48d0ad7 100644 --- a/models/deviceInfo.go +++ b/models/deviceInfo.go @@ -1,6 +1,9 @@ package models -import "encoding/json" +import ( + "encoding/json" + "fmt" +) type DeviceInfo struct { Id string `json:"id"` @@ -17,6 +20,28 @@ type DeviceMeta struct { Capabilities []IotaCapability `json:"capabilities"` } +func (m *DeviceMeta) GetOutputProps() (out map[string]string) { + out = make(map[string]string) + if len(m.Capabilities) == 0 { + return + } + for _, property := range m.Capabilities[0].Properties { + info := fmt.Sprintf("%s(%s)", property.ShowName, property.Unit) + out[property.Name] = info + } + return +} +func (m *DeviceMeta) GetOutputUnit() (out map[string]string) { + out = make(map[string]string) + if len(m.Capabilities) == 0 { + return + } + for _, property := range m.Capabilities[0].Properties { + out[property.Name] = property.Unit + } + return +} + // redis序列化 func (m *DeviceMeta) MarshalBinary() (data []byte, err error) { return json.Marshal(m)