Browse Source

update 更新最新数据存储部分,默认定时30s存储数据

pull/2/head
lucas 2 months ago
parent
commit
5dc6916de9
  1. 145
      adaptors/安心云最新设备数据toES.go
  2. 1
      config/configStruct.go
  3. 5
      configFiles/config_安心云设备数据_最新同步.json
  4. 72
      consumers/consumerAXYraw.go
  5. 2
      dbHelper/elasticsearchHelper.go
  6. 1
      main.go
  7. 6
      models/constant.go
  8. 107
      models/deviceData.go
  9. 27
      models/deviceInfo.go

145
adaptors/安心云最新设备数据toES.go

@ -4,11 +4,9 @@ import (
"encoding/json"
"fmt"
"goUpload/consumers/AXYraw"
"goUpload/consumers/GZGZM"
"goUpload/dbHelper"
"goUpload/models"
"log"
"math"
"time"
)
@ -18,112 +16,83 @@ type Adaptor_AXY_LastRAW struct {
Redis *dbHelper.RedisHelper
}
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) []byte {
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw {
iotaData := models.IotaData{}
json.Unmarshal([]byte(rawMsg), &iotaData)
return the.Theme2GzGZM(iotaData)
return the.raw2es(iotaData)
}
func (the Adaptor_AXY_LastRAW) Theme2GzGZM(iotaData models.IotaData) (result []byte) {
func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
if !iotaData.Data.Success() {
return
return nil
}
log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
the.GetDeviceInfo(iotaData.DeviceId)
return result
}
func (the Adaptor_AXY_LastRAW) getSensorId(sensorId string) GZGZM.SensorInfo {
s := GZGZM.SensorInfo{}
//if v, ok := the.SensorInfoMap[sensorId]; ok {
// s = v
//}
return s
}
func (the Adaptor_AXY_LastRAW) getCodeBytes(sensorCode int16) []byte {
bytes := make([]byte, 0)
bytes = append(bytes,
byte(sensorCode&0xFF),
byte(sensorCode>>8),
)
return bytes
}
func (the Adaptor_AXY_LastRAW) getTimeBytes(sensorTime time.Time) []byte {
deviceInfo := the.GetDeviceInfo(iotaData.DeviceId)
year := int8(sensorTime.Year() - 1900)
month := int8(sensorTime.Month())
day := int8(sensorTime.Day())
hour := int8(sensorTime.Hour())
minute := int8(sensorTime.Minute())
millisecond := uint16(sensorTime.Second()*1000 + sensorTime.Nanosecond()/1e6)
bytes := make([]byte, 0)
bytes = append(bytes,
byte(year),
byte(month),
byte(day),
byte(hour),
byte(minute),
byte(millisecond&0xFF),
byte(millisecond>>8),
)
return bytes
}
func (the Adaptor_AXY_LastRAW) getDatasBytes(datas []float32) []byte {
bytes := make([]byte, 0)
for _, data := range datas {
bits := math.Float32bits(data)
bytes = append(bytes,
byte(bits&0xFF),
byte(bits>>8&0xFF),
byte(bits>>16&0xFF),
byte(bits>>24&0xFF),
)
dataType := ""
if _dataType, ok := iotaData.Data.Data["_data_type"]; ok {
if v, ok := _dataType.(string); ok {
dataType = v
}
}
return bytes
}
func (the Adaptor_AXY_LastRAW) getPayloadHeader(floatCount int16) []byte {
bytes := make([]byte, 0)
bytes = append(bytes,
//报文类型
0x02,
0x00,
//1:上行信息
0x01,
//默认,通讯计算机编号
0x00,
//命令码
0x01,
//报文长度
byte((floatCount*4+9)&0xFF),
byte((floatCount*4+9)>>8),
)
return bytes
devdata := &models.DeviceData{
DeviceId: iotaData.DeviceId,
Name: deviceInfo.Name,
ThingId: iotaData.ThingId,
StructId: deviceInfo.Structure.Id,
AcqTime: iotaData.TriggerTime,
RealTime: iotaData.RealTime,
ErrCode: 0,
Raw: iotaData.Data.Data,
DeviceInfo: deviceInfo,
DimensionId: iotaData.DimensionId,
DataType: dataType,
}
EsRaws := toEsRaw(devdata)
return EsRaws
}
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) []byte {
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo {
Key_Iota_device := "iota_device"
key_Thing_struct := "thing_struct"
key_Iota_meta := "iota_meta"
k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId)
dev := models.IotaDevice{}
ts := models.ThingStruct{}
thingStruct := models.ThingStruct{}
devMeta := models.DeviceMeta{}
err1 := the.Redis.GetObj(k1, &dev)
k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId)
err2 := the.Redis.GetObj(k2, &ts)
err2 := the.Redis.GetObj(k2, &thingStruct)
k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id)
err3 := the.Redis.GetObj(k3, &devMeta)
println(err1, err2, err3)
if err1 != nil || err2 != nil || err3 != nil {
log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3)
}
s := models.Structure{
ThingId: thingStruct.ThingId,
Id: thingStruct.Id,
Name: thingStruct.Name,
OrgId: thingStruct.OrgId,
}
return models.DeviceInfo{
Id: deviceId,
Name: dev.Name,
Structure: s,
DeviceMeta: devMeta,
}
}
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw {
dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps()
createNativeRaw := models.EsRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Data: deviceData.Raw,
CollectTime: deviceData.AcqTime,
Meta: dataOutMeta,
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now(),
}
return make([]byte, 0)
return &createNativeRaw
}

1
config/configStruct.go

@ -25,6 +25,7 @@ type EsConfig struct {
UserName string `json:"userName"`
Password string `json:"password"`
} `json:"auth"`
Interval int `json:"interval"`
}
type UdpConfig struct {

5
configFiles/config_安心云设备数据_最新同步.json

@ -14,12 +14,13 @@
},
"out": {
"es": {
"address": ["http://10.8.30.142:30092"],
"address": ["http://10.8.30.160:30092"],
"index": "anxincloud_raws_last",
"auth": {
"userName": "post",
"password": "123"
}
},
"interval": 30
}
}
},

72
consumers/consumerAXYraw.go

@ -6,18 +6,22 @@ import (
"goUpload/consumers/AXYraw"
"goUpload/dbHelper"
"goUpload/dbHelper/_kafka"
"goUpload/models"
"log"
"sync"
"time"
)
type consumerAXYraw struct {
//数据缓存管道
dataCache chan []byte
dataCache chan *models.EsRaw
//具体配置
ConfigInfo AXYraw.ConfigFile
InKafka _kafka.KafkaHelper
OutEs dbHelper.ESHelper
infoRedis *dbHelper.RedisHelper
sinkRawMap sync.Map
lock sync.Mutex
}
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
@ -30,7 +34,8 @@ func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
}
func (the *consumerAXYraw) Initial(cfg string) error {
the.dataCache = make(chan []byte, 200)
the.sinkRawMap = sync.Map{}
the.dataCache = make(chan *models.EsRaw, 200)
the.LoadConfigJson(cfg)
err := the.inputInitial()
@ -77,52 +82,63 @@ func (the *consumerAXYraw) infoComponentInitial() error {
return nil
}
func (the *consumerAXYraw) RefreshTask() {
the.tokenRefresh()
ticker := time.NewTicker(24 * time.Hour)
func (the *consumerAXYraw) sinkTask() {
intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop()
for true {
<-ticker.C
the.tokenRefresh()
the.toSink()
}
}
func (the *consumerAXYraw) tokenRefresh() {
func (the *consumerAXYraw) toSink() {
var raws []models.EsRaw
the.lock.Lock()
defer the.lock.Unlock()
the.sinkRawMap.Range(func(key, value any) bool {
if v, ok := value.(*models.EsRaw); ok {
raws = append(raws, *v)
return ok
}
return false
})
if len(raws) > 0 {
log.Printf("准备写入es %d条", len(raws))
index := the.ConfigInfo.IoConfig.Out.Es.Index
the.OutEs.BulkWriteRaws2Es(index, raws)
the.sinkRawMap.Clear()
}
}
func (the *consumerAXYraw) Work() {
go the.sinkTask()
go func() {
for {
pushBytes := <-the.dataCache
pushEsRaw := <-the.dataCache
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
log.Printf("推送[%v]: len=%d", "OutEs", len(pushBytes))
//the.OutEs.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutEs.Token})
time.Sleep(10 * time.Millisecond)
//有效数据存入缓存
the.lock.Lock()
the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw)
the.lock.Unlock()
}
}()
}
func (the *consumerAXYraw) onData(topic string, msg string) bool {
if len(msg) > 80 {
log.Printf("recv:[%s]:%s ...", topic, msg[:80])
//if len(msg) > 80 {
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
//}
adaptor := adaptors.Adaptor_AXY_LastRAW{
Redis: the.infoRedis,
}
adaptor := the.getAdaptor()
if adaptor != nil {
needPush := adaptor.Transform(topic, msg)
if len(needPush) > 0 {
the.dataCache <- needPush
}
}
return true
}
func (the *consumerAXYraw) getAdaptor() (adaptor adaptors.IAdaptor3) {
needPush := adaptor.Transform(topic, msg)
adaptor = adaptors.Adaptor_AXY_LastRAW{
Redis: the.infoRedis,
if needPush != nil {
the.dataCache <- needPush
}
return adaptor
return true
}

2
dbHelper/elasticsearchHelper.go

@ -200,7 +200,7 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) {
for _, raw := range raws {
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
source, _ := json.Marshal(raw)
_id := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
_id := raw.IotaDevice
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}
%s

1
main.go

@ -22,7 +22,6 @@ func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds)
log.SetOutput(multiWriter)
log.Println("=================log start=================")
log.Println("==>")
}
func main() {

6
models/constant.go

@ -0,0 +1,6 @@
package models
const (
RawTypeVib = "vib"
RawTypeDiag = "diag"
)

107
models/deviceData.go

@ -0,0 +1,107 @@
package models
import (
"time"
)
type DeviceData struct {
DeviceId string
Name string
ThingId string
StructId int
TaskId string
AcqTime time.Time
RealTime time.Time
ErrCode int
Raw map[string]any
RawUnit map[string]string
DeviceInfo DeviceInfo
DimensionId string
//数据类型 常见有 comm="" ,RawTypeVib="vib"
DataType string
}
func (d *DeviceData) GetVibrationData() VibrationData {
vibData := VibrationData{}
if d.DataType == RawTypeVib {
if v, ok := d.Raw["filterFreq"]; ok {
if vv, ok := v.(float64); ok {
vibData.FilterFreq = vv
}
}
if v, ok := d.Raw["sampleFreq"]; ok {
if vv, ok := v.(float64); ok {
vibData.SampleFreq = vv
}
}
if v, ok := d.Raw["gainAmplifier"]; ok {
if vv, ok := v.(float64); ok {
vibData.GainAmplifier = byte(vv)
}
}
if v, ok := d.Raw["version"]; ok {
if vv, ok := v.(float64); ok {
vibData.Version = byte(vv)
}
}
if v, ok := d.Raw["triggerType"]; ok {
if vv, ok := v.(float64); ok {
vibData.TriggerType = byte(vv)
}
}
if v, ok := d.Raw["physicalvalue"]; ok {
if vSlice, ok := v.([]any); ok {
for _, vObj := range vSlice {
if vv, ok := vObj.(float64); ok {
vibData.Data = append(vibData.Data, vv)
}
}
}
//去直流
if len(vibData.Data) > 0 {
avg := func(dataArray []float64) float64 {
sum := 0.0
for _, f := range dataArray {
sum += f
}
return sum / float64(len(dataArray))
}(vibData.Data) //common_calc.GetAvg(vibData.Data)
for i := 0; i < len(vibData.Data); i++ {
vibData.Data[i] = vibData.Data[i] - avg
}
}
}
}
return vibData
}
// VibrationData 振动数据
type VibrationData struct {
Version byte
SampleFreq float64
FilterFreq float64
GainAmplifier byte
TriggerType byte
Data []float64 // 原始波形数据
Unit string
}
func (v *VibrationData) FormatParams() map[string]any {
return map[string]any{
"sampleFreq": v.SampleFreq,
"version": v.Version,
"filterFreq": v.FilterFreq,
"gainAmplifier": v.GainAmplifier,
"triggerType": v.TriggerType,
}
}

27
models/deviceInfo.go

@ -1,6 +1,9 @@
package models
import "encoding/json"
import (
"encoding/json"
"fmt"
)
type DeviceInfo struct {
Id string `json:"id"`
@ -17,6 +20,28 @@ type DeviceMeta struct {
Capabilities []IotaCapability `json:"capabilities"`
}
func (m *DeviceMeta) GetOutputProps() (out map[string]string) {
out = make(map[string]string)
if len(m.Capabilities) == 0 {
return
}
for _, property := range m.Capabilities[0].Properties {
info := fmt.Sprintf("%s(%s)", property.ShowName, property.Unit)
out[property.Name] = info
}
return
}
func (m *DeviceMeta) GetOutputUnit() (out map[string]string) {
out = make(map[string]string)
if len(m.Capabilities) == 0 {
return
}
for _, property := range m.Capabilities[0].Properties {
out[property.Name] = property.Unit
}
return
}
// redis序列化
func (m *DeviceMeta) MarshalBinary() (data []byte, err error) {
return json.Marshal(m)

Loading…
Cancel
Save