package data_source import ( "fmt" "gitea.anxinyun.cn/container/common_utils/configLoad" "github.com/spf13/viper" "log" "sync" ) type KafkaConfig struct { ClientID int // 暂时无用 Brokers []string GroupID string Topic string Partitions int } type TopicConfig struct { Topic string Partitions int } type KafkaConsumerConfig struct { RawData *RawDataConfig //`yaml:"data_raw"` AggData *AggDataConfig // `yaml:"data_agg"` } type RawDataConfig struct { MaxBatchSize int `yaml:"maxBatchSize"` IotaBufSize int `yaml:"iotaBufSize"` ProcessBufSize int `yaml:"processBufSize"` } type AggDataConfig struct { MaxBatchSize int `yaml:"maxBatchSize"` AggBufSize int `yaml:"aggBufSize"` } type KafkaDataSource struct { groupId string Brokers []string Topics map[string]TopicConfig Master_kafkaConsumer_config *KafkaConsumerConfig // 性能配置 RawDataHandlers *sync.Map // 原始数据处理器 AggDataHandlers *sync.Map // 聚集数据处理器 kafkaPaused bool controlChan chan string // 控制信号通道 } func NewKafkaDataSource() *KafkaDataSource { k := &KafkaDataSource{ controlChan: make(chan string), } k.loadKafkaConfig() return k } func (s *KafkaDataSource) loadKafkaConfig() { vp := configLoad.LoadConfig() groupId := vp.GetString("kafka.groupId") brokers := vp.GetStringSlice("kafka.Brokers") log.Println("消费者组 kafka.groupId:", groupId) s.groupId = groupId s.Brokers = brokers s.loadTopics(vp) s.loadKafkaConsumerConfig(vp) } func (s *KafkaDataSource) loadTopics(vp *viper.Viper) { topics := make(map[string]TopicConfig) // 定义要加载的主题列表 topicNames := []string{"data_raw", "data_agg"} for _, topicName := range topicNames { topic := vp.GetString(fmt.Sprintf("kafka.Topics.%s.topic", topicName)) if topic == "" { log.Printf("主题 kafka.Topics.%s.topic 配置为空", topicName) continue } partitions := vp.GetInt(fmt.Sprintf("kafka.Topics.%s.partitions", topicName)) if partitions <= 0 { partitions = 1 } topics[topicName] = TopicConfig{ Topic: topic, Partitions: partitions, } } s.Topics = topics } func (s *KafkaDataSource) loadKafkaConsumerConfig(vp *viper.Viper) { // 获取 kafkaConsumer 部分的配置 kafkaConsumerKey := "performance.master.kafkaConsumer" if !vp.IsSet(kafkaConsumerKey) { log.Panicf("配置 %s 必须存在", kafkaConsumerKey) } // 创建 KafkaConsumerConfig 实例 config := &KafkaConsumerConfig{} // 解析 data_raw 配置 if vp.IsSet(kafkaConsumerKey + ".data_raw") { dataRaw := &RawDataConfig{} if err := vp.UnmarshalKey(kafkaConsumerKey+".data_raw", dataRaw); err != nil { log.Panicf("解析 data_raw 配置失败: %v\n", err) } else { config.RawData = dataRaw } } // 解析 data_agg 配置 if vp.IsSet(kafkaConsumerKey + ".data_agg") { dataAgg := &AggDataConfig{} if err := vp.UnmarshalKey(kafkaConsumerKey+".data_agg", dataAgg); err != nil { log.Panicf("解析 data_agg 配置失败: %v\n", err) } else { config.AggData = dataAgg } } s.Master_kafkaConsumer_config = config } func (s *KafkaDataSource) AggDataProducer() { var wg sync.WaitGroup const topicCfgKey = "data_agg" topicCfg := s.Topics[topicCfgKey] if topicCfg.Topic == "" { log.Printf("Error: 启动 AggData Producer 失败,无 kafka.topics.data_agg 配置。") return } if s.Master_kafkaConsumer_config.AggData == nil { log.Printf("Error: 启动 AggData Producer 失败,无 performance.master.kafkaConsumer.data_agg 配置。") return } // 启动工作协程 wg.Add(1) go func(clientID int) { defer wg.Done() kafkaHandler := NewAggConsumerGroupHandler(KafkaConfig{ Brokers: s.Brokers, GroupID: s.groupId, Topic: topicCfg.Topic, Partitions: topicCfg.Partitions, ClientID: clientID, }) kafkaHandler.ControlChan = s.controlChan // 装载配置 for partId := 0; partId < topicCfg.Partitions; partId++ { key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId) msgHandler, ok := s.getDataHandler(topicCfgKey, key) if !ok || msgHandler == nil { log.Panicf("Kafka topic[%s] 未定义data_agg 消息处理者,跳过。\n", key) continue } // 把消息传递给 dataSource/kafka/AggDataHandler.HandleMessage([]string) kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage) } // 失败消息处理者 dlpKey := "DLP_DATA_RAW" dataHandler, ok := s.RawDataHandlers.Load(dlpKey) if !ok { log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey) } msgHandler, _ := dataHandler.(IMessageHandler) kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage) // 启动消费组 kafkaHandler.ConnectConsumerGroup() }(1) wg.Wait() } // Producer 将 kafka message -> 各数据模型 -> 各数据通道 func (s *KafkaDataSource) RawDataProducer() { var wg sync.WaitGroup const topicCfgKey = "data_raw" topicCfg := s.Topics[topicCfgKey] if topicCfg.Topic == "" { log.Printf("Error: 启动 RawData Producer 失败,无 kafka.topics.data_raw 配置。") return } if s.Master_kafkaConsumer_config.RawData == nil { log.Printf("Error: 启动 RawData Producer 失败,无 performance.master.kafkaConsumer.data_raw 配置。") return } // 启动工作协程 wg.Add(1) go func(clientID int) { defer wg.Done() kafkaHandler := NewRawConsumerGroupHandler(KafkaConfig{ Brokers: s.Brokers, GroupID: s.groupId, Topic: topicCfg.Topic, Partitions: topicCfg.Partitions, ClientID: clientID, }) kafkaHandler.ControlChan = s.controlChan for partId := 0; partId < topicCfg.Partitions; partId++ { key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId) msgHandler, ok := s.getDataHandler(topicCfgKey, key) if !ok || msgHandler == nil { log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", key) continue } // 把消息传递给 dataSource/kafka/RawDataHandler.HandleMessage([]string) kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage) } // 失败消息处理者 dlpKey := "DLP_DATA_RAW" dataHandler, ok := s.RawDataHandlers.Load(dlpKey) if !ok { log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey) } msgHandler, _ := dataHandler.(IMessageHandler) kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage) //启动消费 kafkaHandler.ConnectConsumerGroup() }(1) wg.Wait() } // 根据 key 获取 dataHandler func (s *KafkaDataSource) getDataHandler(topicCfg, key string) (IMessageHandler, bool) { var dataHandler any var exists bool if topicCfg == "data_agg" { dataHandler, exists = s.AggDataHandlers.Load(key) } else if topicCfg == "data_raw" { dataHandler, exists = s.RawDataHandlers.Load(key) } if !exists { return nil, false } handler, ok := dataHandler.(IMessageHandler) if !ok { return nil, false } return handler, true } // 发送停止信号 func (s *KafkaDataSource) StopConsumers() { s.controlChan <- "stop" } // 发送恢复信号 func (s *KafkaDataSource) ResumeConsumers() { s.controlChan <- "resume" }