Compare commits
4 Commits
3a1ef52537
...
17c0f9d0b0
Author | SHA1 | Date |
---|---|---|
|
17c0f9d0b0 | 2 months ago |
|
6f0ee40d06 | 2 months ago |
|
6bcf9bff7c | 2 months ago |
|
c566ed4c83 | 2 months ago |
11 changed files with 17 additions and 280 deletions
@ -1,30 +0,0 @@ |
|||||
package dataSource |
|
||||
|
|
||||
import ( |
|
||||
"gitea.anxinyun.cn/container/common_models" |
|
||||
"sync" |
|
||||
) |
|
||||
|
|
||||
type DataChannels struct { |
|
||||
RawDataChan chan common_models.IotaData |
|
||||
AggDataChan chan common_models.AggData |
|
||||
} |
|
||||
|
|
||||
var ( |
|
||||
once sync.Once |
|
||||
dataChannels *DataChannels |
|
||||
) |
|
||||
|
|
||||
func InitChannels() *DataChannels { |
|
||||
once.Do(func() { |
|
||||
dataChannels = &DataChannels{ |
|
||||
RawDataChan: make(chan common_models.IotaData, 1), |
|
||||
AggDataChan: make(chan common_models.AggData, 1), |
|
||||
} |
|
||||
}) |
|
||||
return dataChannels |
|
||||
} |
|
||||
|
|
||||
func GetChannels() *DataChannels { |
|
||||
return dataChannels |
|
||||
} |
|
@ -1,69 +0,0 @@ |
|||||
module dataSource |
|
||||
|
|
||||
go 1.23.1 |
|
||||
|
|
||||
require ( |
|
||||
gitea.anxinyun.cn/container/common_models v0.0.10 |
|
||||
gitea.anxinyun.cn/container/common_utils v0.0.8 |
|
||||
) |
|
||||
|
|
||||
require ( |
|
||||
github.com/IBM/sarama v1.43.0 // indirect |
|
||||
github.com/allegro/bigcache v1.2.1 // indirect |
|
||||
github.com/beorn7/perks v1.0.1 // indirect |
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect |
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect |
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect |
|
||||
github.com/eapache/go-resiliency v1.6.0 // indirect |
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect |
|
||||
github.com/eapache/queue v1.1.0 // indirect |
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect |
|
||||
github.com/eko/gocache/lib/v4 v4.1.5 // indirect |
|
||||
github.com/eko/gocache/store/bigcache/v4 v4.2.1 // indirect |
|
||||
github.com/eko/gocache/store/redis/v4 v4.2.1 // indirect |
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect |
|
||||
github.com/golang/mock v1.6.0 // indirect |
|
||||
github.com/golang/protobuf v1.5.3 // indirect |
|
||||
github.com/golang/snappy v0.0.4 // indirect |
|
||||
github.com/gorilla/websocket v1.5.0 // indirect |
|
||||
github.com/hashicorp/errwrap v1.0.0 // indirect |
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect |
|
||||
github.com/hashicorp/go-uuid v1.0.3 // indirect |
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect |
|
||||
github.com/jcmturner/aescts/v2 v2.0.0 // indirect |
|
||||
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect |
|
||||
github.com/jcmturner/gofork v1.7.6 // indirect |
|
||||
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect |
|
||||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect |
|
||||
github.com/klauspost/compress v1.17.7 // indirect |
|
||||
github.com/magiconair/properties v1.8.7 // indirect |
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect |
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect |
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 // indirect |
|
||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect |
|
||||
github.com/prometheus/client_golang v1.14.0 // indirect |
|
||||
github.com/prometheus/client_model v0.3.0 // indirect |
|
||||
github.com/prometheus/common v0.37.0 // indirect |
|
||||
github.com/prometheus/procfs v0.8.0 // indirect |
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect |
|
||||
github.com/redis/go-redis/v9 v9.5.1 // indirect |
|
||||
github.com/sagikazarmark/locafero v0.4.0 // indirect |
|
||||
github.com/sagikazarmark/slog-shim v0.1.0 // indirect |
|
||||
github.com/sourcegraph/conc v0.3.0 // indirect |
|
||||
github.com/spf13/afero v1.11.0 // indirect |
|
||||
github.com/spf13/cast v1.6.0 // indirect |
|
||||
github.com/spf13/pflag v1.0.5 // indirect |
|
||||
github.com/spf13/viper v1.18.2 // indirect |
|
||||
github.com/subosito/gotenv v1.6.0 // indirect |
|
||||
go.uber.org/atomic v1.9.0 // indirect |
|
||||
go.uber.org/multierr v1.9.0 // indirect |
|
||||
golang.org/x/crypto v0.19.0 // indirect |
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect |
|
||||
golang.org/x/net v0.21.0 // indirect |
|
||||
golang.org/x/sync v0.6.0 // indirect |
|
||||
golang.org/x/sys v0.17.0 // indirect |
|
||||
golang.org/x/text v0.14.0 // indirect |
|
||||
google.golang.org/protobuf v1.31.0 // indirect |
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect |
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect |
|
||||
) |
|
@ -1,60 +0,0 @@ |
|||||
package kafka |
|
||||
|
|
||||
import ( |
|
||||
"dataSource" |
|
||||
"encoding/json" |
|
||||
"gitea.anxinyun.cn/container/common_models" |
|
||||
"gitea.anxinyun.cn/container/common_utils" |
|
||||
"gitea.anxinyun.cn/container/common_utils/configLoad" |
|
||||
"log" |
|
||||
"strings" |
|
||||
"time" |
|
||||
) |
|
||||
|
|
||||
type AggDataHandler struct { |
|
||||
configHelper *common_utils.ConfigHelper |
|
||||
} |
|
||||
|
|
||||
func NewAggDataHandler() *AggDataHandler { |
|
||||
redisAddr := configLoad.LoadConfig().GetString("redis.address") |
|
||||
return &AggDataHandler{ |
|
||||
configHelper: common_utils.NewConfigHelper(redisAddr), |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
func (h AggDataHandler) HandleMessage(message string) bool { |
|
||||
// aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
|
|
||||
// aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000,
|
|
||||
// 在进行 json.Unmarshal() 时报错
|
|
||||
// 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai")
|
|
||||
|
|
||||
// 2024-09-28T23:59:59.999+0800
|
|
||||
// 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z
|
|
||||
utcTimeStr := strings.Replace(message, "+0800", "+08:00", 1) |
|
||||
utcTimeStr = strings.Replace(utcTimeStr, "+0000", "Z", 1) |
|
||||
|
|
||||
aggData := common_models.AggData{} |
|
||||
err := json.Unmarshal([]byte(utcTimeStr), &aggData) |
|
||||
if err != nil { |
|
||||
log.Printf("json parse error: %v", err) |
|
||||
return false |
|
||||
} |
|
||||
// 转换为中国时区时间("Asia/Shanghai")
|
|
||||
loc, _ := time.LoadLocation("Asia/Shanghai") |
|
||||
chinaTime := aggData.Date.In(loc) |
|
||||
aggData.Date = chinaTime |
|
||||
//log.Printf("message:%v\n, cvt: %+v", message, aggData)
|
|
||||
if aggData.ThingId == "" { |
|
||||
structure, err := h.configHelper.GetStructure(aggData.StructId) |
|
||||
if err != nil { |
|
||||
log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId) |
|
||||
return false |
|
||||
} |
|
||||
aggData.ThingId = structure.ThingId |
|
||||
} |
|
||||
|
|
||||
log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId) |
|
||||
dataSource.GetChannels().AggDataChan <- aggData |
|
||||
return true |
|
||||
} |
|
@ -1,23 +0,0 @@ |
|||||
package kafka |
|
||||
|
|
||||
import ( |
|
||||
"dataSource" |
|
||||
"encoding/json" |
|
||||
"gitea.anxinyun.cn/container/common_models" |
|
||||
"log" |
|
||||
) |
|
||||
|
|
||||
type IotaDataHandler struct{} |
|
||||
|
|
||||
func (h IotaDataHandler) HandleMessage(message string) bool { |
|
||||
// 处理 alarm 消息
|
|
||||
rawData := common_models.IotaData{} |
|
||||
err := json.Unmarshal([]byte(message), &rawData) |
|
||||
if err != nil { |
|
||||
log.Printf("[dataSource/kafka/iotaData/IotaHandler] Parse msg error: %v", err) |
|
||||
return false |
|
||||
} |
|
||||
log.Printf("handler 处理[%s|%s]消息", rawData.DeviceId, rawData.TriggerTime) |
|
||||
dataSource.GetChannels().RawDataChan <- rawData |
|
||||
return true |
|
||||
} |
|
@ -1,71 +0,0 @@ |
|||||
package kafka |
|
||||
|
|
||||
import ( |
|
||||
"dataSource" |
|
||||
"gitea.anxinyun.cn/container/common_utils/configLoad" |
|
||||
"gitea.anxinyun.cn/container/common_utils/kafkaHelper" |
|
||||
"log" |
|
||||
) |
|
||||
|
|
||||
type KafkaDataSource struct { |
|
||||
groupId string |
|
||||
brokers []string |
|
||||
topics map[string]string |
|
||||
DataChannels *dataSource.DataChannels |
|
||||
} |
|
||||
|
|
||||
func NewKafkaDataSource() *KafkaDataSource { |
|
||||
// 初始化所有通道
|
|
||||
dataSource.InitChannels() |
|
||||
|
|
||||
// 读取配置
|
|
||||
config := configLoad.LoadConfig() |
|
||||
groupId := config.GetString("kafka.groupId") |
|
||||
brokers := config.GetStringSlice("kafka.brokers") |
|
||||
topics := config.GetStringMapString("kafka.topics") |
|
||||
|
|
||||
return &KafkaDataSource{ |
|
||||
groupId: groupId, |
|
||||
brokers: brokers, |
|
||||
topics: topics, |
|
||||
DataChannels: dataSource.GetChannels(), |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// Producer 将 kafka message -> 各数据模型 -> 各数据通道
|
|
||||
func (s *KafkaDataSource) Producer() { |
|
||||
// 消费数据
|
|
||||
kafkaConsumer := kafkaHelper.NewConsumerGroupHandler(s.brokers, s.groupId) |
|
||||
for cfgName, topic := range s.topics { |
|
||||
// 创建消息处理器
|
|
||||
handler := NewMessageHandler(cfgName) |
|
||||
if handler == nil { |
|
||||
log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName) |
|
||||
continue |
|
||||
} |
|
||||
// 订阅主题 和 消息处理
|
|
||||
kafkaConsumer.Subscribe(topic, handler.HandleMessage) |
|
||||
} |
|
||||
|
|
||||
kafkaConsumer.Worker() |
|
||||
} |
|
||||
|
|
||||
// IMessageHandler 是 kafka 消息处理者接口
|
|
||||
type IMessageHandler interface { |
|
||||
HandleMessage(message string) bool |
|
||||
} |
|
||||
|
|
||||
// NewMessageHandler 是 MessageHandler 构造函数
|
|
||||
// cfgName: config.yaml 中 kafka.topics 的配置名
|
|
||||
func NewMessageHandler(cfgName string) IMessageHandler { |
|
||||
switch cfgName { |
|
||||
case "data_raw": |
|
||||
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName) |
|
||||
return IotaDataHandler{} |
|
||||
case "data_agg": |
|
||||
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName) |
|
||||
return NewAggDataHandler() |
|
||||
default: |
|
||||
return nil |
|
||||
} |
|
||||
} |
|
Loading…
Reference in new issue