package data_source

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/IBM/sarama"
	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)

type AggConsumerGroupHandler struct {
	kafkaConfig   KafkaConfig
	topicHandlers sync.Map    // message handlers keyed by topic/partition
	kafkaPaused   bool        // whether data consumption is currently paused
	ControlChan   chan string // control signal channel ("stop" / "resume")
	mu            sync.RWMutex
}

func NewAggConsumerGroupHandler(kafkaConfig KafkaConfig) *AggConsumerGroupHandler {
	return &AggConsumerGroupHandler{
		kafkaConfig: kafkaConfig,
		ControlChan: make(chan string),
	}
}

func (h *AggConsumerGroupHandler) ConnectConsumerGroup() {
	log.Println("AggData kafka init...")
	vp := configLoad.LoadConfig()
	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")

	// Consumer configuration
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false // do not return errors encountered while consuming
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start consuming from the oldest message
	config.Consumer.Offsets.AutoCommit.Enable = true      // enable automatic offset commits
	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
	config.Consumer.Fetch.Min = minFetch                                        // minimum fetch size (10 KB)
	config.Consumer.Fetch.Max = maxFetch                                        // maximum fetch size (5 MB)
	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // maximum wait time in ms
	config.Consumer.Retry.Backoff = 10000 * time.Millisecond                    // delay before retrying after a consume failure
	//config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond   // consumer group session timeout (default 10s)
	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond // must be less than session.timeout.ms, usually < session.timeout.ms/3 (default 3s)
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // round-robin partition assignment

	// Create the consumer group
	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = client.Close() }()

	// Error handling goroutine
	go func() {
		for err := range client.Errors() {
			log.Println("consume error:", err)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle control signals
	go func() {
		for {
			select {
			case signal := <-h.ControlChan:
				switch signal {
				case "stop":
					log.Printf("[Agg-ConsumerGroup-%d] received stop signal, consumption will be paused.", h.kafkaConfig.ClientID)
					h.kafkaPaused = true
				case "resume":
					log.Printf("[Agg-ConsumerGroup-%d] received resume signal, consumption will be resumed.", h.kafkaConfig.ClientID)
					h.kafkaPaused = false
				}
			}
		}
	}()

	log.Printf("[Agg-ConsumerGroup-%d] starting Kafka consumer goroutine. Subscribed topic: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)

	// The handler itself implements sarama.ConsumerGroupHandler
	consumerInstance := h
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			topics := []string{h.kafkaConfig.Topic}
			err1 := client.Consume(ctx, topics, consumerInstance)
			if err1 != nil {
				log.Printf("[Agg-ConsumerGroup-%d] error subscribing to topic [%v]: %s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
				return
			}
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
		}
	}()

	log.Println("AggData Sarama consumer up and running ...")
	wg.Wait()
}

func (h *AggConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary setup tasks here.
	log.Printf("data_agg consumer group session started, %+v", session.Claims())
	return nil
}

func (h *AggConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary cleanup tasks here.
	log.Println("data_agg consumer group session ended,", session.Claims())
	return nil
}
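// Configuration sketch. The exact file layout consumed by configLoad is not shown in
// this file; assuming a viper-style YAML source, the keys read in ConnectConsumerGroup
// above (plus data_agg.maxBatchSize, read in BatchConsumeClaim below) would look
// roughly like the following. The numeric values are illustrative only.
//
//	performance:
//	  master:
//	    kafkaConsumer:
//	      consumerCfg:
//	        minFetch: 10240       # bytes (10 KB)
//	        maxFetch: 5242880     # bytes (5 MB)
//	        maxWaitTime: 500      # milliseconds
//	      data_agg:
//	        maxBatchSize: 1048576 # bytes buffered per key before a batch is flushed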
func (h *AggConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("data_agg handling consumer group session, %+v. MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())

	topic := claim.Topic()
	isDeadLetterQueue := false // whether this claim belongs to a dead letter queue topic
	if len(topic) > 4 && topic[:4] == "DLP_" {
		isDeadLetterQueue = true
	}

	if isDeadLetterQueue {
		return h.DLPConsumeClaim(session, claim)
	}
	return h.BatchConsumeClaim(session, claim)
}

func (h *AggConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_agg.maxBatchSize")
	messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 50)

	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[Agg-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[Agg-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
	}

	// Reuse map objects via sync.Pool
	messageMapPool := sync.Pool{
		New: func() interface{} {
			return make(map[string][]*sarama.ConsumerMessage)
		},
	}

	// Goroutine pool for processing batches
	var wg sync.WaitGroup
	pool, _ := ants.NewPool(100)
	defer pool.Release()

	// Rate limiter: at most one operation every 10 milliseconds.
	var aggRateLimiter = rate.NewLimiter(rate.Every(10*time.Millisecond), 1)

	go func() {
		for structMessages := range messageChannel {
			_ = aggRateLimiter.Wait(context.Background()) // throttle consumption rate
			wg.Add(len(structMessages))
			for k, v := range structMessages {
				key := k
				msgs := v
				_ = pool.Submit(func() {
					defer wg.Done()
					// Clear the map before returning it to the pool so stale entries are
					// not carried into later batches (each map sent on messageChannel
					// holds a single key).
					defer func() {
						for mk := range structMessages {
							delete(structMessages, mk)
						}
						messageMapPool.Put(structMessages)
					}()
					if len(msgs) == 0 {
						return
					}
					values := make([]string, len(msgs))
					for i, msg := range msgs {
						values[i] = string(msg.Value)
					}

					// Invoke the handler and check whether processing succeeded
					isProcessed := msgHandler.(func(structId string, values []string) bool)(key, values)
					if !isProcessed {
						log.Printf("[Agg-ConsumerGroup] message processing failed, key: %s, messages: %v", key, msgs)
					} else {
						// Mark the messages as handled only after successful processing
						for _, msg := range msgs {
							session.MarkMessage(msg, "is handled")
						}
					}
				})
			}
		}
	}()

	// bufMu guards batchBuffer and currentBatchSize, which are accessed by both the
	// flush ticker goroutine and the read loop below.
	var bufMu sync.Mutex
	batchBuffer := make(map[string][]*sarama.ConsumerMessage) // per-key batch buffer
	currentBatchSize := make(map[string]int)                  // current batch size (bytes) per key

	// Ticker used to flush leftover messages
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	go func() {
		for {
			select {
			case <-ticker.C:
				// Flush any remaining buffered messages
				bufMu.Lock()
				for structId, msgs := range batchBuffer {
					if len(msgs) > 0 {
						// Take a map from the pool
						msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
						msgMap[structId] = msgs
						messageChannel <- msgMap
						delete(batchBuffer, structId) // remove the flushed key
						delete(currentBatchSize, structId)
					}
				}
				bufMu.Unlock()
			case <-ctx.Done():
				return // exit the goroutine
			}
		}
	}()

	// Read messages
	for msg := range claim.Messages() {
		structId := string(msg.Key)
		if structId == "" {
			structId = "structId-null"
		}

		bufMu.Lock()
		// Append the message to the batch buffer
		batchBuffer[structId] = append(batchBuffer[structId], msg)

		// Track the accumulated size for this key
		currentBatchSize[structId] += len(msg.Value)

		// When the batch reaches maxBatchSize, send it to the channel and reset
		if currentBatchSize[structId] >= maxBatchSize {
			// Take a map from the pool
			msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
			msgMap[structId] = batchBuffer[structId]
			messageChannel <- msgMap
			delete(batchBuffer, structId)      // remove the flushed key
			delete(currentBatchSize, structId) // reset the tracked size
		}
		bufMu.Unlock()
	}

	close(messageChannel)
	wg.Wait()
	return nil
}
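// DLPConsumeClaim below expects dead-letter messages whose key is the structId and
// whose value is a JSON-encoded array of strings. A minimal producer-side sketch with
// sarama (the producer instance, topic name, and payload contents are assumptions,
// not defined in this file):
//
//	values := []string{`{"sensor":1}`, `{"sensor":2}`} // hypothetical payloads
//	payload, _ := json.Marshal(values)
//	msg := &sarama.ProducerMessage{
//		Topic: "DLP_DATA_AGG", // any "DLP_"-prefixed topic is routed to DLPConsumeClaim
//		Key:   sarama.StringEncoder("structId-123"),
//		Value: sarama.ByteEncoder(payload),
//	}
//	// _, _, err := producer.SendMessage(msg)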
func (h *AggConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	topicHandlerKey := "DLP_DATA_AGG"
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[DLP-Agg-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[DLP-Agg-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
	}

	for msg := range claim.Messages() {
		structId := string(msg.Key)
		if structId == "" {
			structId = "structId-null"
		}

		// Parse the value (a JSON-encoded array of strings)
		var value []string
		err := json.Unmarshal(msg.Value, &value)
		if err != nil {
			log.Printf("[DLP_Agg-ConsumerGroup] failed to unmarshal value: %v", err)
			continue
		}

		isProcessed := msgHandler.(func(structId string, values []string) bool)(structId, value)
		if !isProcessed {
			log.Printf("[DLP_Agg-ConsumerGroup] message processing failed, structId: %s, messages: %v", structId, value)
		} else {
			// Mark the message as handled only after successful processing
			session.MarkMessage(msg, "is handled")
		}
	}
	return nil
}

func (h *AggConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(structId string, values []string) bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.topicHandlers.Store(topicHandlerKey, fun)
}

// SetKafkaPaused controls whether message consumption is paused.
func (h *AggConsumerGroupHandler) SetKafkaPaused(paused bool) {
	h.kafkaPaused = paused
	if paused {
		log.Println("Kafka message consumption paused.")
	} else {
		log.Println("Kafka message consumption resumed.")
	}
}
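// Usage sketch. KafkaConfig is defined elsewhere in this package; the broker address,
// group id, topic name, client id, and the "<topic>_<partition>" handler-key format
// below are assumptions for illustration (the key format follows BatchConsumeClaim
// above). This is not part of the package API.
//
//	func runAggConsumer() {
//		cfg := KafkaConfig{
//			Brokers:  []string{"localhost:9092"}, // assumed field types
//			GroupID:  "agg_group",
//			Topic:    "agg_data",
//			ClientID: 1,
//		}
//		h := NewAggConsumerGroupHandler(cfg)
//
//		// Register a handler for partition 0 of the subscribed topic.
//		h.SetTopicHandler("agg_data_0", func(structId string, values []string) bool {
//			log.Printf("structId=%s, %d messages", structId, len(values))
//			return true // returning true marks the batch as handled
//		})
//
//		go h.ConnectConsumerGroup()
//
//		// Pause and resume consumption via the control channel.
//		h.ControlChan <- "stop"
//		h.ControlChan <- "resume"
//	}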