From 6bcf9bff7c7a5c65a50c6616dbbb4a0ae61e88d0 Mon Sep 17 00:00:00 2001 From: yfh Date: Mon, 24 Feb 2025 23:44:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dataSource/channels.go | 30 ------------- dataSource/kafka/aggData.go | 60 -------------------------- dataSource/kafka/iotaData.go | 23 ---------- dataSource/kafka/kafka_handler.go | 71 ------------------------------- 4 files changed, 184 deletions(-) delete mode 100644 dataSource/channels.go delete mode 100644 dataSource/kafka/aggData.go delete mode 100644 dataSource/kafka/iotaData.go delete mode 100644 dataSource/kafka/kafka_handler.go diff --git a/dataSource/channels.go b/dataSource/channels.go deleted file mode 100644 index b6b8b7d..0000000 --- a/dataSource/channels.go +++ /dev/null @@ -1,30 +0,0 @@ -package dataSource - -import ( - "gitea.anxinyun.cn/container/common_models" - "sync" -) - -type DataChannels struct { - RawDataChan chan common_models.IotaData - AggDataChan chan common_models.AggData -} - -var ( - once sync.Once - dataChannels *DataChannels -) - -func InitChannels() *DataChannels { - once.Do(func() { - dataChannels = &DataChannels{ - RawDataChan: make(chan common_models.IotaData, 1), - AggDataChan: make(chan common_models.AggData, 1), - } - }) - return dataChannels -} - -func GetChannels() *DataChannels { - return dataChannels -} diff --git a/dataSource/kafka/aggData.go b/dataSource/kafka/aggData.go deleted file mode 100644 index f358b45..0000000 --- a/dataSource/kafka/aggData.go +++ /dev/null @@ -1,60 +0,0 @@ -package kafka - -import ( - "dataSource" - "encoding/json" - "gitea.anxinyun.cn/container/common_models" - "gitea.anxinyun.cn/container/common_utils" - "gitea.anxinyun.cn/container/common_utils/configLoad" - "log" - "strings" - "time" -) - -type AggDataHandler struct { - configHelper *common_utils.ConfigHelper -} - -func NewAggDataHandler() *AggDataHandler { - redisAddr := configLoad.LoadConfig().GetString("redis.address") - return &AggDataHandler{ - configHelper: common_utils.NewConfigHelper(redisAddr), - } - -} - -func (h AggDataHandler) HandleMessage(message string) bool { - // aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}} - // aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000, - // 在进行 json.Unmarshal() 时报错 - // 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai") - - // 2024-09-28T23:59:59.999+0800 - // 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z - utcTimeStr := strings.Replace(message, "+0800", "+08:00", 1) - utcTimeStr = strings.Replace(utcTimeStr, "+0000", "Z", 1) - - aggData := common_models.AggData{} - err := json.Unmarshal([]byte(utcTimeStr), &aggData) - if err != nil { - log.Printf("json parse error: %v", err) - return false - } - // 转换为中国时区时间("Asia/Shanghai") - loc, _ := time.LoadLocation("Asia/Shanghai") - chinaTime := aggData.Date.In(loc) - aggData.Date = chinaTime - //log.Printf("message:%v\n, cvt: %+v", message, aggData) - if aggData.ThingId == "" { - structure, err := h.configHelper.GetStructure(aggData.StructId) - if err != nil { - log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId) - return false - } - aggData.ThingId = structure.ThingId - } - - log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId) - dataSource.GetChannels().AggDataChan <- aggData - return true -} diff --git a/dataSource/kafka/iotaData.go b/dataSource/kafka/iotaData.go deleted file mode 100644 index 4d2fc59..0000000 --- a/dataSource/kafka/iotaData.go +++ /dev/null @@ -1,23 +0,0 @@ -package kafka - -import ( - "dataSource" - "encoding/json" - "gitea.anxinyun.cn/container/common_models" - "log" -) - -type IotaDataHandler struct{} - -func (h IotaDataHandler) HandleMessage(message string) bool { - // 处理 alarm 消息 - rawData := common_models.IotaData{} - err := json.Unmarshal([]byte(message), &rawData) - if err != nil { - log.Printf("[dataSource/kafka/iotaData/IotaHandler] Parse msg error: %v", err) - return false - } - log.Printf("handler 处理[%s|%s]消息", rawData.DeviceId, rawData.TriggerTime) - dataSource.GetChannels().RawDataChan <- rawData - return true -} diff --git a/dataSource/kafka/kafka_handler.go b/dataSource/kafka/kafka_handler.go deleted file mode 100644 index 67251e9..0000000 --- a/dataSource/kafka/kafka_handler.go +++ /dev/null @@ -1,71 +0,0 @@ -package kafka - -import ( - "dataSource" - "gitea.anxinyun.cn/container/common_utils/configLoad" - "gitea.anxinyun.cn/container/common_utils/kafkaHelper" - "log" -) - -type KafkaDataSource struct { - groupId string - brokers []string - topics map[string]string - DataChannels *dataSource.DataChannels -} - -func NewKafkaDataSource() *KafkaDataSource { - // 初始化所有通道 - dataSource.InitChannels() - - // 读取配置 - config := configLoad.LoadConfig() - groupId := config.GetString("kafka.groupId") - brokers := config.GetStringSlice("kafka.brokers") - topics := config.GetStringMapString("kafka.topics") - - return &KafkaDataSource{ - groupId: groupId, - brokers: brokers, - topics: topics, - DataChannels: dataSource.GetChannels(), - } -} - -// Producer 将 kafka message -> 各数据模型 -> 各数据通道 -func (s *KafkaDataSource) Producer() { - // 消费数据 - kafkaConsumer := kafkaHelper.NewConsumerGroupHandler(s.brokers, s.groupId) - for cfgName, topic := range s.topics { - // 创建消息处理器 - handler := NewMessageHandler(cfgName) - if handler == nil { - log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName) - continue - } - // 订阅主题 和 消息处理 - kafkaConsumer.Subscribe(topic, handler.HandleMessage) - } - - kafkaConsumer.Worker() -} - -// IMessageHandler 是 kafka 消息处理者接口 -type IMessageHandler interface { - HandleMessage(message string) bool -} - -// NewMessageHandler 是 MessageHandler 构造函数 -// cfgName: config.yaml 中 kafka.topics 的配置名 -func NewMessageHandler(cfgName string) IMessageHandler { - switch cfgName { - case "data_raw": - log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName) - return IotaDataHandler{} - case "data_agg": - log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName) - return NewAggDataHandler() - default: - return nil - } -}