// et-go 20240919 重建 (rebuilt 2024-09-19)
package data_source
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/IBM/sarama"
	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)
// RawConsumerGroupHandler implements sarama.ConsumerGroupHandler for the
// raw-data topic: it joins a consumer group and routes each claimed
// topic/partition to a handler function registered via SetTopicHandler.
type RawConsumerGroupHandler struct {
	kafkaConfig KafkaConfig // brokers, group ID, topic, client ID (project-defined type)
	topicHandlers sync.Map // handler funcs keyed by "<topic>_<partition>" (plus the fixed key "DLP_DATA_RAW")
	kafkaPaused bool // whether consumption is paused; NOTE(review): written from several goroutines without consistent locking — verify
	ControlChan chan string // control signal channel; accepts "stop" / "resume"
	mu sync.RWMutex // guards handler registration (and, per review, should guard kafkaPaused)
}
// NewRawConsumerGroupHandler builds a handler around the given Kafka
// configuration. The control channel is created unbuffered, so a sender
// blocks until the control loop in ConnectConsumerGroup receives the signal.
func NewRawConsumerGroupHandler(kafkaConfig KafkaConfig) *RawConsumerGroupHandler {
	handler := &RawConsumerGroupHandler{}
	handler.kafkaConfig = kafkaConfig
	handler.ControlChan = make(chan string)
	return handler
}
// ConnectConsumerGroup creates the sarama consumer group for the configured
// raw-data topic and blocks for the life of the consumer. Consume is called
// in a loop because it returns after every rebalance. Fetch sizes and wait
// time come from the performance section of the loaded configuration.
//
// Fix over the previous revision: Consumer.Return.Errors was false, which
// made the goroutine draining client.Errors() dead code (sarama only
// delivers to that channel when the flag is true). It is now enabled so
// consume errors are actually logged. Log format verbs for ClientID were
// also changed from %d to %v, which is correct for any underlying type.
func (h *RawConsumerGroupHandler) ConnectConsumerGroup() {
	log.Println("RawData kafka init...")
	vp := configLoad.LoadConfig()
	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")

	// Consumer configuration.
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // deliver consume errors to client.Errors(); drained below
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest message
	config.Consumer.Offsets.AutoCommit.Enable = true      // auto-commit offsets
	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
	config.Consumer.Fetch.Min = minFetch // minimum fetch, e.g. 10 KB (from config)
	config.Consumer.Fetch.Max = maxFetch // maximum fetch, e.g. 5 MB (from config)
	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond
	config.Consumer.Retry.Backoff = 10000 * time.Millisecond // delay before retrying after a consume failure
	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond
	// Heartbeat must stay well below the session timeout (typically <= 1/3).
	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond
	// Round-robin partition assignment across group members.
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}

	// Create the consumer group client.
	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = client.Close()
	}()

	// Drain and log consume errors (enabled via Return.Errors above).
	go func() {
		for err := range client.Errors() {
			log.Println("消费错误:", err)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Control loop: toggles the pause flag on external "stop"/"resume" signals.
	// The flag writes are guarded by h.mu to match SetKafkaPaused.
	go func() {
		for {
			select {
			case signal := <-h.ControlChan:
				switch signal {
				case "stop":
					log.Printf("[Raw-ConsumerGroup-%v] 收到停止信号,将停止消费.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = true
					h.mu.Unlock()
				case "resume":
					log.Printf("[Raw-ConsumerGroup-%v] 收到恢复信号,将恢复消费.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = false
					h.mu.Unlock()
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	log.Printf("[Raw-ConsumerGroup-%v] 准备启动 Kafka 消费者协程。订阅的主题: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		topics := []string{h.kafkaConfig.Topic}
		for {
			// Consume joins the group, gets partitions assigned, and blocks
			// for the session; it returns on rebalance, hence the loop.
			if err1 := client.Consume(ctx, topics, h); err1 != nil {
				log.Printf("[Raw-ConsumerGroup-%v] 订阅主题[%v]异常。%s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
				return
			}
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
		}
	}()
	log.Println("RawData Sarama consumer up and running ...")
	wg.Wait()
}
// Setup 在新会话开始时运行
// Setup is invoked once when a new consumer-group session is established,
// before any ConsumeClaim loop starts. No initialization is needed here;
// the assigned claims are only logged.
func (h *RawConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
	log.Printf("data_raw消费者组会话开始,%+v \n", sess.Claims())
	return nil
}
// Cleanup 在会话结束时运行
// Cleanup is invoked once when the consumer-group session ends, after all
// ConsumeClaim loops have exited. No teardown is needed; the session's
// claims are only logged.
func (h *RawConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
	log.Println("data_raw消费者组会话结束,", sess.Claims())
	return nil
}
// ConsumeClaim 启动消费者循环
// ConsumeClaim dispatches a claimed topic/partition to the appropriate
// consume loop: topics whose name starts with "DLP_" are dead-letter queues
// and take the per-message DLP path; everything else is batch-consumed.
// (The manual topic[:4] slice comparison is replaced with the clearer and
// length-safe strings.HasPrefix.)
func (h *RawConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("data_raw 处理消费者组会话,%+v, MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
	if strings.HasPrefix(claim.Topic(), "DLP_") {
		return h.DLPConsumeClaim(session, claim)
	}
	return h.BatchConsumeClaim(session, claim)
}
// BatchConsumeClaim consumes a partition in batches grouped by message key
// (thingId). Messages accumulate per key until their combined value size
// reaches maxBatchSize (from config) or the 20s flush tick fires; each batch
// is then handed to the registered topic handler via a worker pool, at a
// rate limited to one batch map per 10ms.
//
// Fixes over the previous revision:
//   - maps recycled through the sync.Pool are cleared before being returned,
//     so a reused map no longer carries stale keys (stale keys inflated the
//     WaitGroup count, which could deadlock the final Wait);
//   - each pooled map is returned exactly once, after all of its batches
//     have been submitted (it was previously Put once per key, from inside
//     workers that could still be sharing it);
//   - the periodic flush now runs in the same goroutine as the read loop via
//     a select, removing an unsynchronized concurrent map access on
//     batchBuffer/currentBatchSize;
//   - wg.Wait now runs only after the dispatch goroutine has finished all of
//     its wg.Add calls, closing an Add/Wait race at shutdown.
func (h *RawConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_raw.maxBatchSize")
	messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 100)
	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
		return fmt.Errorf("[Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
	}
	handle := msgHandler.(func(thingId string, values []string) bool)

	// Pool of per-dispatch maps to avoid allocating one per flush.
	messageMapPool := sync.Pool{
		New: func() interface{} {
			return make(map[string][]*sarama.ConsumerMessage)
		},
	}

	var wg sync.WaitGroup
	pool, err := ants.NewPool(100)
	if err != nil {
		return fmt.Errorf("[Raw-ConsumerGroup] 创建协程池失败: %w", err)
	}
	defer pool.Release()
	rawRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)

	// Dispatch goroutine: drains messageChannel and submits one worker task
	// per key. dispatchDone closes only after every wg.Add has happened, so
	// the final wg.Wait below cannot race with pending Adds.
	dispatchDone := make(chan struct{})
	go func() {
		defer close(dispatchDone)
		for thingMessages := range messageChannel {
			_ = rawRateLimiter.Wait(context.Background()) // throttle consumption rate
			for k, v := range thingMessages {
				key, msgs := k, v
				if len(msgs) == 0 {
					continue
				}
				wg.Add(1)
				if submitErr := pool.Submit(func() {
					defer wg.Done()
					values := make([]string, len(msgs))
					for i, msg := range msgs {
						values[i] = string(msg.Value)
					}
					// Hand the batch to the handler; mark offsets only on
					// success so failed batches are redelivered later.
					if !handle(key, values) {
						log.Printf("[Raw-ConsumerGroup] 消息处理失败,键: %s,消息: %v", key, msgs)
					} else {
						for _, msg := range msgs {
							session.MarkMessage(msg, "is handled")
						}
					}
				}); submitErr != nil {
					wg.Done() // submission failed: undo the Add so Wait can complete
					log.Printf("[Raw-ConsumerGroup] 任务提交失败: %v", submitErr)
				}
			}
			// The per-key slices are captured by the tasks above, so the map
			// itself can be cleared and recycled now — exactly once per map.
			for k := range thingMessages {
				delete(thingMessages, k)
			}
			messageMapPool.Put(thingMessages)
		}
	}()

	batchBuffer := make(map[string][]*sarama.ConsumerMessage) // pending messages per thingId
	currentBatchSize := make(map[string]int)                  // accumulated value bytes per thingId

	// flush hands one thingId's pending batch to the dispatch goroutine and
	// clears its buffer entries. Only ever called from the loop below.
	flush := func(thingId string) {
		msgs := batchBuffer[thingId]
		if len(msgs) == 0 {
			return
		}
		msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
		msgMap[thingId] = msgs
		messageChannel <- msgMap
		delete(batchBuffer, thingId)
		delete(currentBatchSize, thingId)
	}

	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	// Read loop. The periodic flush shares this select with message intake,
	// so batchBuffer is touched by exactly one goroutine.
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// Claim closed (rebalance/shutdown): flush leftovers, stop
				// dispatch, then wait for in-flight batches to finish.
				for thingId := range batchBuffer {
					flush(thingId)
				}
				close(messageChannel)
				<-dispatchDone
				wg.Wait()
				return nil
			}
			thingId := string(msg.Key)
			if thingId == "" {
				thingId = "thingId-null"
			}
			batchBuffer[thingId] = append(batchBuffer[thingId], msg)
			currentBatchSize[thingId] += len(msg.Value)
			// Size threshold reached: ship this key's batch immediately.
			if currentBatchSize[thingId] >= maxBatchSize {
				flush(thingId)
			}
		case <-ticker.C:
			// Periodic flush of everything still buffered.
			for thingId := range batchBuffer {
				flush(thingId)
			}
		}
	}
}
// DLPConsumeClaim processes dead-letter-queue messages one at a time. Each
// message value is expected to be a JSON-encoded []string, which is passed
// to the handler registered under the fixed key "DLP_DATA_RAW". Offsets are
// marked only for messages the handler reports as successfully processed.
func (h *RawConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	const topicHandlerKey = "DLP_DATA_RAW"
	raw, ok := h.topicHandlers.Load(topicHandlerKey)
	if !ok {
		log.Printf("[DLP-Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
		return fmt.Errorf("[DLP-Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
	}
	handle := raw.(func(thingId string, values []string) bool)
	for message := range claim.Messages() {
		// Fall back to a sentinel key when the message carries none.
		key := string(message.Key)
		if key == "" {
			key = "thingId-null"
		}
		// Decode the JSON string-array payload.
		var values []string
		if err := json.Unmarshal(message.Value, &values); err != nil {
			log.Printf("[DLP_Raw-ConsumerGroup]Failed to unmarshal value: %v", err)
			continue
		}
		if handle(key, values) {
			// Success: commit progress for this message.
			session.MarkMessage(message, "is handled")
		} else {
			log.Printf("[DLP_Raw-ConsumerGroup]消息处理失败,thingId: %s,消息: %v", key, values)
		}
	}
	return nil
}
// SetTopicHandler registers the processing function for a topic/partition
// key (format "<topic>_<partition>", or the fixed "DLP_DATA_RAW"). The
// sync.Map is safe on its own; the mutex is retained to preserve the
// original locking behavior around registration.
func (h *RawConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(thingId string, values []string) bool) {
	h.mu.Lock()
	h.topicHandlers.Store(topicHandlerKey, fun)
	h.mu.Unlock()
}
// 消息消费启动控制
// SetKafkaPaused flips the consumption pause flag and logs the transition.
// The write is now guarded by h.mu because the flag is also written by the
// control-signal goroutine in ConnectConsumerGroup.
// NOTE(review): readers of kafkaPaused elsewhere do not take this lock —
// confirm and align them, or migrate the flag to an atomic.Bool.
func (h *RawConsumerGroupHandler) SetKafkaPaused(paused bool) {
	h.mu.Lock()
	h.kafkaPaused = paused
	h.mu.Unlock()
	if paused {
		log.Println("Kafka消息消费 已暂停.")
	} else {
		log.Println("Kafka消息消费 已恢复.")
	}
}
//// TODO 动态调整消费速率
//func (h *RawConsumerGroupHandler) SetConsumeRate(interval time.Duration) {
// h.mu.Lock()
// defer h.mu.Unlock()
// rateLimiter.SetLimit(rate.Every(interval))
//}