Browse Source

update 添加 设备信息缓存

pull/2/head
lucas 2 months ago
parent
commit
86fda6ed1c
  1. 24
      adaptors/安心云最新设备数据toES.go
  2. 4
      consumers/consumerAXYraw.go
  3. 2
      dbHelper/_kafka/consumerGroupHandler.go
  4. 10
      models/deviceInfo.go

24
adaptors/安心云最新设备数据toES.go

@ -52,7 +52,21 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
return EsRaws return EsRaws
} }
var deviceInfoMap = map[string]models.DeviceInfo{}
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo {
if v, ok := deviceInfoMap[deviceId]; ok {
durationMin := time.Now().Sub(v.RefreshTime).Minutes()
if durationMin < 5 {
return v
}
}
v := the.GetDeviceInfoFromRedis(deviceId)
deviceInfoMap[deviceId] = v
return v
}
func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) models.DeviceInfo {
Key_Iota_device := "iota_device" Key_Iota_device := "iota_device"
key_Thing_struct := "thing_struct" key_Thing_struct := "thing_struct"
key_Iota_meta := "iota_meta" key_Iota_meta := "iota_meta"
@ -76,12 +90,14 @@ func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo
OrgId: thingStruct.OrgId, OrgId: thingStruct.OrgId,
} }
return models.DeviceInfo{ return models.DeviceInfo{
Id: deviceId, Id: deviceId,
Name: dev.Name, Name: dev.Name,
Structure: s, Structure: s,
DeviceMeta: devMeta, DeviceMeta: devMeta,
RefreshTime: time.Now(),
} }
} }
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { func toEsRaw(deviceData *models.DeviceData) *models.EsRaw {
dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps()
createNativeRaw := models.EsRaw{ createNativeRaw := models.EsRaw{

4
consumers/consumerAXYraw.go

@ -86,7 +86,7 @@ func (the *consumerAXYraw) sinkTask() {
intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop() defer ticker.Stop()
for true { for {
<-ticker.C <-ticker.C
the.toSink() the.toSink()
} }
@ -116,7 +116,7 @@ func (the *consumerAXYraw) Work() {
go func() { go func() {
for { for {
pushEsRaw := <-the.dataCache pushEsRaw := <-the.dataCache
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) //log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
//有效数据存入缓存 //有效数据存入缓存
the.lock.Lock() the.lock.Lock()

2
dbHelper/_kafka/consumerGroupHandler.go

@ -82,7 +82,7 @@ func (h *ConsumerGroupHandler) Worker() {
config := sarama.NewConfig() config := sarama.NewConfig()
config.Consumer.Return.Errors = false config.Consumer.Return.Errors = false
config.Version = sarama.V2_0_0_0 config.Version = sarama.V2_0_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config) group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config)

10
models/deviceInfo.go

@ -3,13 +3,15 @@ package models
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
) )
type DeviceInfo struct { type DeviceInfo struct {
Id string `json:"id"` Id string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Structure Structure `json:"structure"` Structure Structure `json:"structure"`
DeviceMeta DeviceMeta `json:"device_meta"` DeviceMeta DeviceMeta `json:"device_meta"`
RefreshTime time.Time
} }
type DeviceMeta struct { type DeviceMeta struct {

Loading…
Cancel
Save