package data_source

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/IBM/sarama"
	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)

type RawConsumerGroupHandler struct {
	kafkaConfig   KafkaConfig
	topicHandlers sync.Map    // message handlers keyed by topic_partition
	kafkaPaused   bool        // whether message consumption is currently paused
	ControlChan   chan string // control-signal channel ("stop" / "resume")
	mu            sync.RWMutex
}

func NewRawConsumerGroupHandler(kafkaConfig KafkaConfig) *RawConsumerGroupHandler {
	return &RawConsumerGroupHandler{
		kafkaConfig: kafkaConfig,
		ControlChan: make(chan string),
	}
}

func (h *RawConsumerGroupHandler) ConnectConsumerGroup() {
	log.Println("RawData kafka init...")
	vp := configLoad.LoadConfig()
	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")

	// Consumer configuration.
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false // do not surface consume errors on the Errors channel
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest message
	config.Consumer.Offsets.AutoCommit.Enable = true      // auto-commit offsets
	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
	config.Consumer.Fetch.Min = minFetch // minimum fetch size in bytes (e.g. 10 KB)
	config.Consumer.Fetch.Max = maxFetch // maximum fetch size in bytes (e.g. 5 MB)
	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // maximum broker wait time, in ms
	config.Consumer.Retry.Backoff = 10000 * time.Millisecond                    // delay before retrying a failed consume
	//config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond   // session timeout (default 10s, raised to 60s)
	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond // must be < session timeout, usually < 1/3 of it (default 3s)
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // round-robin partition assignment

	// Create the consumer group.
	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = client.Close() }()

	// Error-handling goroutine.
	go func() {
		for err := range client.Errors() {
			log.Println("consume error:", err)
		}
	}()

	// Handle control signals.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		for {
			select {
			case signal := <-h.ControlChan:
				switch signal {
				case "stop":
					log.Printf("[Raw-ConsumerGroup-%d] received stop signal; consumption will pause.", h.kafkaConfig.ClientID)
					h.SetKafkaPaused(true)
				case "resume":
					log.Printf("[Raw-ConsumerGroup-%d] received resume signal; consumption will resume.", h.kafkaConfig.ClientID)
					h.SetKafkaPaused(false)
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	log.Printf("[Raw-ConsumerGroup-%d] starting Kafka consumer goroutine. Subscribed topic: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Join the consumer group and subscribe to the topic;
			// Kafka assigns partitions to each group member.
			topics := []string{h.kafkaConfig.Topic}
			if err := client.Consume(ctx, topics, h); err != nil {
				log.Printf("[Raw-ConsumerGroup-%d] error subscribing to topic [%v]: %s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err.Error())
				return
			}
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
		}
	}()

	log.Println("RawData Sarama consumer up and running ...")
	wg.Wait()
}
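// Illustrative configuration fragment for the keys read in ConnectConsumerGroup
// and BatchConsumeClaim. This is an assumed YAML layout inferred from the
// viper-style lookups above, not a copy of the real config file; the values
// are examples only.
//
//	performance:
//	  master:
//	    kafkaConsumer:
//	      consumerCfg:
//	        minFetch: 10240      # bytes, minimum fetch size (10 KB)
//	        maxFetch: 5242880    # bytes, maximum fetch size (5 MB)
//	        maxWaitTime: 500     # ms, broker max wait time
//	      data_raw:
//	        maxBatchSize: 51200  # bytes per thingId batch (50 KB)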
// Setup runs at the start of a new session.
func (h *RawConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary setup tasks here.
	log.Printf("data_raw consumer group session started, %+v \n", session.Claims())
	return nil
}

// Cleanup runs at the end of a session.
func (h *RawConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary cleanup tasks here.
	log.Println("data_raw consumer group session ended,", session.Claims())
	return nil
}

// ConsumeClaim runs the consumer loop for one partition claim.
func (h *RawConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("data_raw handling consumer group session, %+v, MemberID: %v, Topic: %v, Partition: %v \n",
		session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())

	// Messages on topics prefixed with "DLP_" come from the dead-letter queue.
	if topic := claim.Topic(); len(topic) > 4 && topic[:4] == "DLP_" {
		return h.DLPConsumeClaim(session, claim)
	}
	return h.BatchConsumeClaim(session, claim)
}

func (h *RawConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Maximum number of bytes per batch (e.g. 50 KB), read from configuration.
	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_raw.maxBatchSize")
	messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 100)

	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[Raw-ConsumerGroup] no message handler for topic [%s] partition [%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[Raw-ConsumerGroup] no message handler for topic [%s] partition [%d]", claim.Topic(), claim.Partition())
	}

	messageMapPool := sync.Pool{
		New: func() interface{} {
			return make(map[string][]*sarama.ConsumerMessage)
		},
	}

	// Dispatcher goroutine: drains messageChannel and hands batches to the worker pool.
	var wg sync.WaitGroup
	pool, _ := ants.NewPool(100)
	defer pool.Release()
	rawRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
	dispatchDone := make(chan struct{})
	go func() {
		defer close(dispatchDone)
		for thingMessages := range messageChannel {
			_ = rawRateLimiter.Wait(context.Background()) // throttle consumption
			wg.Add(len(thingMessages))
			for k, v := range thingMessages {
				key := k
				msgs := v
				if err := pool.Submit(func() {
					defer wg.Done()
					if len(msgs) == 0 {
						return
					}
					values := make([]string, len(msgs))
					for i, msg := range msgs {
						values[i] = string(msg.Value)
					}
					// Invoke the handler and check whether it succeeded.
					isProcessed := msgHandler.(func(thingId string, values []string) bool)(key, values)
					if !isProcessed {
						log.Printf("[Raw-ConsumerGroup] message handling failed, key: %s, messages: %v", key, msgs)
					} else {
						// Mark messages as handled only after successful processing.
						for _, msg := range msgs {
							session.MarkMessage(msg, "is handled")
						}
					}
				}); err != nil {
					wg.Done() // Submit failed: balance the Add above
				}
			}
			// Clear the map before returning it to the pool so stale batches
			// are not re-delivered on the next Get.
			for k := range thingMessages {
				delete(thingMessages, k)
			}
			messageMapPool.Put(thingMessages)
		}
	}()

	var batchMu sync.Mutex                                     // guards batchBuffer and currentBatchSize
	batchBuffer := make(map[string][]*sarama.ConsumerMessage)  // pending batches keyed by thingId
	currentBatchSize := make(map[string]int)                   // current batch size in bytes per thingId

	// Periodic flush so slowly accumulating keys still make progress.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()
	flushDone := make(chan struct{})
	go func() {
		defer close(flushDone)
		for {
			select {
			case <-ticker.C:
				batchMu.Lock()
				for thingId, msgs := range batchBuffer {
					if len(msgs) > 0 {
						msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
						msgMap[thingId] = msgs
						messageChannel <- msgMap
						delete(batchBuffer, thingId)
						delete(currentBatchSize, thingId) // clean up both maps together
					}
				}
				batchMu.Unlock()
			case <-ctx.Done():
				return // exit the goroutine
			}
		}
	}()

	// Read messages from the claim.
	for msg := range claim.Messages() {
		thingId := string(msg.Key)
		if thingId == "" {
			thingId = "thingId-null"
		}

		batchMu.Lock()
		// Append the message to the batch buffer and track the batch size for this key.
		batchBuffer[thingId] = append(batchBuffer[thingId], msg)
		currentBatchSize[thingId] += len(msg.Value)

		// Once the batch for this key reaches maxBatchSize, hand it off and reset.
		if currentBatchSize[thingId] >= maxBatchSize {
			thingMessages := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
			thingMessages[thingId] = batchBuffer[thingId]
			messageChannel <- thingMessages
			delete(batchBuffer, thingId)      // drop the dispatched batch
			delete(currentBatchSize, thingId) // and its size counter
		}
		batchMu.Unlock()
	}

	// Stop the flush goroutine before closing messageChannel so it cannot send
	// on a closed channel, then wait for the dispatcher and all in-flight batches.
	cancel()
	<-flushDone
	close(messageChannel)
	<-dispatchDone
	wg.Wait()
	return nil
}
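// Sketch of a handler matching the func(thingId string, values []string) bool
// contract that BatchConsumeClaim type-asserts. The body is hypothetical
// (processRawValue is an illustrative placeholder); a real handler would
// decode and persist the raw payloads, returning false to leave the batch
// unmarked so it is redelivered.
//
//	h.SetTopicHandler("data_raw_0", func(thingId string, values []string) bool {
//	    for _, v := range values {
//	        if err := processRawValue(thingId, v); err != nil {
//	            return false // batch stays uncommitted and will be retried
//	        }
//	    }
//	    return true
//	})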
func (h *RawConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	topicHandlerKey := "DLP_DATA_RAW"
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[DLP-Raw-ConsumerGroup] no message handler for topic [%s] partition [%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[DLP-Raw-ConsumerGroup] no message handler for topic [%s] partition [%d]", claim.Topic(), claim.Partition())
	}

	for msg := range claim.Messages() {
		thingId := string(msg.Key)
		if thingId == "" {
			thingId = "thingId-null"
		}

		// Parse the value as a JSON array of strings.
		var value []string
		if err := json.Unmarshal(msg.Value, &value); err != nil {
			log.Printf("[DLP_Raw-ConsumerGroup] failed to unmarshal value: %v", err)
			continue
		}

		isProcessed := msgHandler.(func(thingId string, values []string) bool)(thingId, value)
		if !isProcessed {
			log.Printf("[DLP_Raw-ConsumerGroup] message handling failed, thingId: %s, messages: %v", thingId, value)
		} else {
			// Mark the message as handled only after successful processing.
			session.MarkMessage(msg, "is handled")
		}
	}
	return nil
}

func (h *RawConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(thingId string, values []string) bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.topicHandlers.Store(topicHandlerKey, fun)
}

// SetKafkaPaused toggles message consumption on or off.
func (h *RawConsumerGroupHandler) SetKafkaPaused(paused bool) {
	h.mu.Lock()
	h.kafkaPaused = paused
	h.mu.Unlock()
	if paused {
		log.Println("Kafka message consumption paused.")
	} else {
		log.Println("Kafka message consumption resumed.")
	}
}

//// TODO dynamically adjust the consume rate
//func (h *RawConsumerGroupHandler) SetConsumeRate(interval time.Duration) {
//	h.mu.Lock()
//	defer h.mu.Unlock()
//	rateLimiter.SetLimit(rate.Every(interval))
//}
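// End-to-end usage sketch (illustrative only; the KafkaConfig field names and
// values are assumptions inferred from their usage in this file, and
// handleRawBatch is a hypothetical handler):
//
//	cfg := KafkaConfig{
//	    Brokers: []string{"localhost:9092"}, // assumed broker address
//	    GroupID: "data_raw_group",
//	    Topic:   "data_raw",
//	}
//	h := NewRawConsumerGroupHandler(cfg)
//	h.SetTopicHandler("data_raw_0", handleRawBatch)   // one handler per topic_partition key
//	h.SetTopicHandler("DLP_DATA_RAW", handleRawBatch) // dead-letter-queue handler
//	go h.ConnectConsumerGroup()
//
//	// Pause and resume consumption via the control channel:
//	h.ControlChan <- "stop"
//	h.ControlChan <- "resume"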