diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index 7b18115..4df3b9c 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -52,7 +52,21 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { return EsRaws } +var deviceInfoMap = map[string]models.DeviceInfo{} + func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { + if v, ok := deviceInfoMap[deviceId]; ok { + durationMin := time.Now().Sub(v.RefreshTime).Minutes() + if durationMin < 5 { + return v + } + } + v := the.GetDeviceInfoFromRedis(deviceId) + deviceInfoMap[deviceId] = v + return v +} + +func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) models.DeviceInfo { Key_Iota_device := "iota_device" key_Thing_struct := "thing_struct" key_Iota_meta := "iota_meta" @@ -76,12 +90,14 @@ func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo OrgId: thingStruct.OrgId, } return models.DeviceInfo{ - Id: deviceId, - Name: dev.Name, - Structure: s, - DeviceMeta: devMeta, + Id: deviceId, + Name: dev.Name, + Structure: s, + DeviceMeta: devMeta, + RefreshTime: time.Now(), } } + func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() createNativeRaw := models.EsRaw{ diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go index ae309f8..410f016 100644 --- a/consumers/consumerAXYraw.go +++ b/consumers/consumerAXYraw.go @@ -86,7 +86,7 @@ func (the *consumerAXYraw) sinkTask() { intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) defer ticker.Stop() - for true { + for { <-ticker.C the.toSink() } @@ -116,7 +116,7 @@ func (the *consumerAXYraw) Work() { go func() { for { pushEsRaw := <-the.dataCache - log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) + //log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) //有效数据存入缓存 the.lock.Lock() diff --git a/dbHelper/_kafka/consumerGroupHandler.go b/dbHelper/_kafka/consumerGroupHandler.go index e10a27f..45da072 100644 --- a/dbHelper/_kafka/consumerGroupHandler.go +++ b/dbHelper/_kafka/consumerGroupHandler.go @@ -82,7 +82,7 @@ func (h *ConsumerGroupHandler) Worker() { config := sarama.NewConfig() config.Consumer.Return.Errors = false config.Version = sarama.V2_0_0_0 - config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config) diff --git a/models/deviceInfo.go b/models/deviceInfo.go index 48d0ad7..53329c2 100644 --- a/models/deviceInfo.go +++ b/models/deviceInfo.go @@ -3,13 +3,15 @@ package models import ( "encoding/json" "fmt" + "time" ) type DeviceInfo struct { - Id string `json:"id"` - Name string `json:"name"` - Structure Structure `json:"structure"` - DeviceMeta DeviceMeta `json:"device_meta"` + Id string `json:"id"` + Name string `json:"name"` + Structure Structure `json:"structure"` + DeviceMeta DeviceMeta `json:"device_meta"` + RefreshTime time.Time } type DeviceMeta struct {