et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

317 lines
10 KiB

package data_source
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/IBM/sarama"
	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)
// AggConsumerGroupHandler consumes aggregated-data Kafka topics as a sarama
// consumer group member. It routes each claimed partition to a handler
// function registered via SetTopicHandler and supports pause/resume via
// ControlChan ("stop" / "resume" signals).
type AggConsumerGroupHandler struct {
	kafkaConfig   KafkaConfig
	topicHandlers sync.Map    // handler funcs keyed by "<topic>_<partition>" (or "DLP_DATA_AGG" for dead-letter)
	kafkaPaused   bool        // whether message consumption is currently paused
	ControlChan   chan string // control-signal channel ("stop" / "resume")
	mu            sync.RWMutex
}
// NewAggConsumerGroupHandler builds a handler for the given Kafka
// configuration with an unbuffered control-signal channel.
func NewAggConsumerGroupHandler(kafkaConfig KafkaConfig) *AggConsumerGroupHandler {
	handler := new(AggConsumerGroupHandler)
	handler.kafkaConfig = kafkaConfig
	handler.ControlChan = make(chan string)
	return handler
}
// ConnectConsumerGroup creates the sarama consumer group for the aggregated
// data topic, starts the error-drain and control-signal goroutines, and blocks
// until the consume loop exits.
//
// Fixes over the previous version:
//   - Consumer.Return.Errors is now true: the goroutine draining
//     client.Errors() previously never received anything because error
//     returning was disabled;
//   - the control-signal goroutine exits on ctx.Done() instead of leaking;
//   - kafkaPaused is written under h.mu (it is also written by
//     SetKafkaPaused from other goroutines).
func (h *AggConsumerGroupHandler) ConnectConsumerGroup() {
	log.Println("AggData kafka init...")
	vp := configLoad.LoadConfig()
	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")

	// Consumer group configuration.
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // deliver consume errors on client.Errors(); drained below
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest message
	config.Consumer.Offsets.AutoCommit.Enable = true      // auto-commit offsets
	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
	config.Consumer.Fetch.Min = minFetch // minimum fetch size (config: ~10 KB)
	config.Consumer.Fetch.Max = maxFetch // maximum fetch size (config: ~5 MB)
	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond
	config.Consumer.Retry.Backoff = 10000 * time.Millisecond // delay before retrying after a consume failure
	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond
	// Heartbeat must be < Session.Timeout, conventionally < Session.Timeout/3.
	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond
	// Round-robin partition assignment across group members.
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}

	// Create the consumer group.
	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = client.Close()
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Drain consume errors (enabled via Consumer.Return.Errors above).
	go func() {
		for err := range client.Errors() {
			log.Println("消费错误:", err)
		}
	}()

	// Handle pause/resume control signals; exits when ctx is cancelled.
	go func() {
		for {
			select {
			case signal := <-h.ControlChan:
				switch signal {
				case "stop":
					log.Printf("[Agg-ConsumerGroup-%d] 收到停止信号,将停止消费.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = true
					h.mu.Unlock()
				case "resume":
					log.Printf("[Agg-ConsumerGroup-%d] 收到恢复信号,将恢复消费.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = false
					h.mu.Unlock()
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	log.Printf("[Agg-ConsumerGroup-%d] 准备启动 Kafka 消费者协程。订阅的主题: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		topics := []string{h.kafkaConfig.Topic}
		for {
			// Consume blocks for the lifetime of a session; loop to rejoin
			// after rebalances until an error or cancellation occurs.
			if err1 := client.Consume(ctx, topics, h); err1 != nil {
				log.Printf("[Agg-ConsumerGroup-%d] 订阅主题[%v]异常。%s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
				return
			}
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
		}
	}()
	log.Println("AggData Sarama consumer up and running ...")
	wg.Wait()
}
// Setup runs at the beginning of a new consumer-group session, before any
// ConsumeClaim goroutines start. It only logs the claimed topic/partitions.
func (h *AggConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary setup tasks here.
	log.Printf("data_agg消费者组会话开始,%+v", session.Claims())
	return nil
}
// Cleanup runs at the end of a consumer-group session, after all ConsumeClaim
// goroutines have exited. It only logs the session's claims.
func (h *AggConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary cleanup tasks here.
	log.Println("data_agg消费者组会话结束,", session.Claims())
	return nil
}
// ConsumeClaim dispatches a claimed partition: topics prefixed "DLP_" are
// dead-letter queues handled by DLPConsumeClaim; everything else goes through
// the batching path in BatchConsumeClaim.
func (h *AggConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("data_agg 处理消费者组会话,%+v. MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
	// Dead-letter topics are identified by the "DLP_" prefix
	// (idiomatic strings.HasPrefix instead of manual length check + slicing).
	if strings.HasPrefix(claim.Topic(), "DLP_") {
		return h.DLPConsumeClaim(session, claim)
	}
	return h.BatchConsumeClaim(session, claim)
}
// BatchConsumeClaim consumes a normal (non-dead-letter) partition claim. It
// groups messages by key (structId) into batches bounded by maxBatchSize
// bytes, flushes idle batches every 20s, and hands each batch to the
// registered handler on a bounded worker pool. Successfully handled messages
// are marked so their offsets get auto-committed.
//
// Fixes over the previous version:
//   - pooled maps are emptied before being returned to sync.Pool (stale
//     entries used to be replayed when a dirty map was reused);
//   - each pooled map is returned exactly once, after its entries have been
//     dispatched (previously Put ran once per key, while workers still held
//     the map);
//   - batchBuffer/currentBatchSize are mutex-protected (the flush-ticker
//     goroutine raced with the main read loop);
//   - the flush goroutine is stopped before messageChannel is closed, so it
//     can no longer send on a closed channel;
//   - remaining buffered messages are flushed when the claim ends instead of
//     being dropped;
//   - the ants.NewPool error is checked.
func (h *AggConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_agg.maxBatchSize")
	messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 50)
	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
		return fmt.Errorf("[Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
	}
	handle := msgHandler.(func(structId string, values []string) bool)

	// Reuse batch maps to reduce allocations.
	messageMapPool := sync.Pool{
		New: func() interface{} {
			return make(map[string][]*sarama.ConsumerMessage)
		},
	}

	var wg sync.WaitGroup
	pool, poolErr := ants.NewPool(100)
	if poolErr != nil {
		return fmt.Errorf("[Agg-ConsumerGroup]创建协程池失败: %w", poolErr)
	}
	defer pool.Release()

	// Rate limiter: at most one batch dispatch per 10ms window.
	aggRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)

	// Dispatcher: fans each received batch out to the worker pool.
	go func() {
		for batch := range messageChannel {
			_ = aggRateLimiter.Wait(context.Background()) // throttle consumption rate
			wg.Add(len(batch))
			for k, v := range batch {
				key := k
				msgs := v
				if submitErr := pool.Submit(func() {
					defer wg.Done()
					if len(msgs) == 0 {
						return
					}
					values := make([]string, len(msgs))
					for i, msg := range msgs {
						values[i] = string(msg.Value)
					}
					// Hand the batch to the handler and mark offsets only on success.
					if !handle(key, values) {
						log.Printf("[Agg-ConsumerGroup] 消息处理失败,键: %s,消息: %v", key, msgs)
					} else {
						for _, msg := range msgs {
							session.MarkMessage(msg, "is handled")
						}
					}
				}); submitErr != nil {
					wg.Done()
					log.Printf("[Agg-ConsumerGroup] 提交任务失败: %v", submitErr)
				}
			}
			// Workers only retain the value slices, never the map itself, so
			// the map can be emptied and returned to the pool exactly once.
			for k := range batch {
				delete(batch, k)
			}
			messageMapPool.Put(batch)
		}
	}()

	var bufMu sync.Mutex // guards batchBuffer and currentBatchSize (main loop vs. flush ticker)
	batchBuffer := make(map[string][]*sarama.ConsumerMessage)
	currentBatchSize := make(map[string]int) // pending byte size per key

	// flush moves one key's pending messages into a pooled map and sends it to
	// the dispatcher. Callers must hold bufMu.
	flush := func(structId string) {
		msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
		msgMap[structId] = batchBuffer[structId]
		delete(batchBuffer, structId)
		delete(currentBatchSize, structId)
		messageChannel <- msgMap
	}

	// Periodic flusher for batches that never reach maxBatchSize.
	flushCtx, stopFlush := context.WithCancel(context.Background())
	defer stopFlush()
	flushDone := make(chan struct{})
	ticker := time.NewTicker(20 * time.Second)
	go func() {
		defer close(flushDone)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				bufMu.Lock()
				for structId, msgs := range batchBuffer {
					if len(msgs) > 0 {
						flush(structId) // deleting while ranging a map is safe in Go
					}
				}
				bufMu.Unlock()
			case <-flushCtx.Done():
				return
			}
		}
	}()

	// Main read loop: runs until the claim's message channel closes
	// (rebalance or shutdown).
	for msg := range claim.Messages() {
		structId := string(msg.Key)
		if structId == "" {
			structId = "structId-null"
		}
		bufMu.Lock()
		batchBuffer[structId] = append(batchBuffer[structId], msg)
		currentBatchSize[structId] += len(msg.Value)
		if currentBatchSize[structId] >= maxBatchSize {
			flush(structId)
		}
		bufMu.Unlock()
	}

	// Stop the flusher before closing messageChannel so it cannot send on a
	// closed channel, then drain whatever is still buffered.
	stopFlush()
	<-flushDone
	bufMu.Lock()
	for structId := range batchBuffer {
		if len(batchBuffer[structId]) > 0 {
			flush(structId)
		}
	}
	bufMu.Unlock()
	close(messageChannel)
	wg.Wait()
	return nil
}
// DLPConsumeClaim consumes a dead-letter (DLP_*) partition claim. Each message
// value is a JSON-encoded []string that is handed, one message at a time, to
// the handler registered under "DLP_DATA_AGG"; successfully handled messages
// are marked for offset commit.
func (h *AggConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	const topicHandlerKey = "DLP_DATA_AGG"
	stored, found := h.topicHandlers.Load(topicHandlerKey)
	if !found {
		log.Printf("[DLP-Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
		return fmt.Errorf("[DLP-Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
	}
	handle := stored.(func(structId string, values []string) bool)

	for message := range claim.Messages() {
		key := string(message.Key)
		if key == "" {
			key = "structId-null"
		}
		// Decode the JSON-encoded string slice carried in the message value.
		var values []string
		if err := json.Unmarshal(message.Value, &values); err != nil {
			log.Printf("[DLP_Agg-ConsumerGroup]Failed to unmarshal value: %v", err)
			continue
		}
		if handle(key, values) {
			// Mark as handled only after successful processing.
			session.MarkMessage(message, "is handled")
		} else {
			log.Printf("[DLP_Agg-ConsumerGroup]消息处理失败,structId: %s,消息: %v", key, values)
		}
	}
	return nil
}
// SetTopicHandler registers the message-processing function for the given
// handler key (typically "<topic>_<partition>" or "DLP_DATA_AGG").
func (h *AggConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(structId string, values []string) bool) {
	h.mu.Lock()
	h.topicHandlers.Store(topicHandlerKey, fun)
	h.mu.Unlock()
}
// SetKafkaPaused toggles the consumption pause flag and logs the transition.
// The write is guarded by h.mu because the flag is also written from the
// control-signal goroutine; the previous version raced on it.
func (h *AggConsumerGroupHandler) SetKafkaPaused(paused bool) {
	h.mu.Lock()
	h.kafkaPaused = paused
	h.mu.Unlock()
	if paused {
		log.Println("Kafka消息消费 已暂停.")
	} else {
		log.Println("Kafka消息消费 已恢复.")
	}
}