package data_source

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/IBM/sarama"
	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)

type RawConsumerGroupHandler struct {
	kafkaConfig   KafkaConfig
	topicHandlers sync.Map // per-topic/partition message handlers

	kafkaPaused bool        // whether message consumption is currently paused
	ControlChan chan string // control signal channel ("stop" / "resume")

	mu sync.RWMutex
}

func NewRawConsumerGroupHandler(kafkaConfig KafkaConfig) *RawConsumerGroupHandler {
	return &RawConsumerGroupHandler{
		kafkaConfig: kafkaConfig,
		ControlChan: make(chan string),
	}
}
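
// KafkaConfig is defined elsewhere in this package. As a reference sketch
// (an assumption, not the authoritative definition), the fields this file
// relies on look like:
//
//	type KafkaConfig struct {
//	    Brokers  []string // broker addresses, e.g. "localhost:9092"
//	    GroupID  string   // consumer group id
//	    Topic    string   // subscribed topic
//	    ClientID int      // worker identifier, used only in log output here
//	}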

func (h *RawConsumerGroupHandler) ConnectConsumerGroup() {
	log.Println("RawData kafka init...")
	vp := configLoad.LoadConfig()
	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")

	// Consumer configuration
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false // do not surface consume errors on the Errors channel
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest message
	config.Consumer.Offsets.AutoCommit.Enable = true      // enable automatic offset commits
	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
	config.Consumer.Fetch.Min = minFetch // minimum bytes per fetch (e.g. 10 KB)
	config.Consumer.Fetch.Max = maxFetch // maximum bytes per fetch (e.g. 5 MB)
	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // maximum broker wait time, in ms
	config.Consumer.Retry.Backoff = 10000 * time.Millisecond                    // delay before retrying after a consume failure
	//config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond   // session timeout (sarama default is 10s; raised to 60s here)
	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond // must be below the session timeout, usually under a third of it (default 3s)
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // round-robin partition assignment

	// Create the consumer group
	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = client.Close()
	}()

	// Error-handling goroutine
	go func() {
		for err := range client.Errors() {
			log.Println("consume error:", err)
		}
	}()

	// Handle control signals
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for {
			select {
			case signal := <-h.ControlChan:
				switch signal {
				case "stop":
					log.Printf("[Raw-ConsumerGroup-%v] received stop signal; pausing consumption.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = true
					h.mu.Unlock()
				case "resume":
					log.Printf("[Raw-ConsumerGroup-%v] received resume signal; resuming consumption.", h.kafkaConfig.ClientID)
					h.mu.Lock()
					h.kafkaPaused = false
					h.mu.Unlock()
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	log.Printf("[Raw-ConsumerGroup-%v] starting the Kafka consumer goroutine. Subscribed topic: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Join the consumer group and subscribe to the topic; Kafka assigns
			// partitions to each member. Consume must be called in a loop, since
			// it returns whenever a rebalance happens.
			topics := []string{h.kafkaConfig.Topic}
			if err1 := client.Consume(ctx, topics, h); err1 != nil {
				log.Printf("[Raw-ConsumerGroup-%v] subscribing to topic [%v] failed: %s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
				return
			}
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
		}
	}()

	log.Println("RawData Sarama consumer up and running ...")
	wg.Wait()
}
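
// Pause/resume sketch (hypothetical caller): while ConnectConsumerGroup is
// running, consumption state is driven by sending signals on ControlChan:
//
//	h.ControlChan <- "stop"   // sets kafkaPaused
//	h.ControlChan <- "resume" // clears kafkaPaused
//
// Note: kafkaPaused is currently only a flag; the consume loop does not yet
// act on it (e.g. via the consumer group's PauseAll/ResumeAll).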

// Setup runs at the start of a new session, before ConsumeClaim.
func (h *RawConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary setup tasks here.
	log.Printf("data_raw consumer group session started, %+v", session.Claims())
	return nil
}

// Cleanup runs at the end of a session, after all ConsumeClaim goroutines have exited.
func (h *RawConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// Perform any necessary cleanup tasks here.
	log.Println("data_raw consumer group session ended,", session.Claims())
	return nil
}

// ConsumeClaim runs the consume loop for one claimed partition.
func (h *RawConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("data_raw handling consumer group session, %+v, MemberID: %v, Topic: %v, Partition: %v", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
	topic := claim.Topic()

	// Dead-letter topics are identified by the "DLP_" name prefix.
	isDeadLetterQueue := len(topic) > 4 && topic[:4] == "DLP_"

	if isDeadLetterQueue {
		return h.DLPConsumeClaim(session, claim)
	}
	return h.BatchConsumeClaim(session, claim)
}
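
// Routing example: a claim on topic "DLP_DATA_RAW" takes the dead-letter path
// (DLPConsumeClaim, keyed by "DLP_DATA_RAW"), while a claim on partition 0 of
// a topic such as "data_raw" takes the batch path (BatchConsumeClaim, keyed
// by "data_raw_0"). Only the "DLP_" prefix is significant; the topic names
// here are illustrative.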

func (h *RawConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// TODO: tune the maximum batch size in bytes, e.g. 50 KB.
	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_raw.maxBatchSize")
	messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 100)
	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[Raw-ConsumerGroup] no message handler for topic [%s] partition [%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[Raw-ConsumerGroup] no message handler for topic [%s] partition [%d]", claim.Topic(), claim.Partition())
	}

	messageMapPool := sync.Pool{
		New: func() interface{} {
			return make(map[string][]*sarama.ConsumerMessage)
		},
	}

	// Dispatcher goroutine: drains messageChannel and hands batches to the worker pool.
	var wg sync.WaitGroup

	pool, _ := ants.NewPool(100)
	defer pool.Release()

	rawRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
	dispatchDone := make(chan struct{})
	go func() {
		defer close(dispatchDone)
		for thingMessages := range messageChannel {
			_ = rawRateLimiter.Wait(context.Background()) // throttle the consume rate

			wg.Add(len(thingMessages))
			for k, v := range thingMessages {
				key := k
				msgs := v
				task := func() {
					defer wg.Done()
					if len(msgs) == 0 {
						return
					}
					values := make([]string, len(msgs))
					for i, msg := range msgs {
						values[i] = string(msg.Value)
					}

					// Process the batch and check whether it succeeded.
					isProcessed := msgHandler.(func(thingId string, values []string) bool)(key, values)
					if !isProcessed {
						log.Printf("[Raw-ConsumerGroup] batch processing failed, key: %s, messages: %v", key, msgs)
					} else {
						// Mark messages as handled only after successful processing.
						for _, msg := range msgs {
							session.MarkMessage(msg, "is handled")
						}
					}
				}
				if err := pool.Submit(task); err != nil {
					wg.Done() // keep the WaitGroup balanced if the pool rejects the task
					log.Printf("[Raw-ConsumerGroup] failed to submit batch task: %v", err)
				}
			}

			// Return the map to the pool exactly once per batch, after its entries
			// have been captured by the submitted tasks. (Putting it once per key,
			// and without clearing it, would corrupt later batches.)
			for k := range thingMessages {
				delete(thingMessages, k)
			}
			messageMapPool.Put(thingMessages)
		}
	}()

	batchBuffer := make(map[string][]*sarama.ConsumerMessage) // per-thingId batch buffer
	currentBatchSize := make(map[string]int)                  // current batch size in bytes per thingId
	var bufMu sync.Mutex                                      // guards both maps against the flush goroutine

	// Periodic flush for batches that never reach maxBatchSize.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()
	tickerDone := make(chan struct{})
	go func() {
		defer close(tickerDone)
		for {
			select {
			case <-ticker.C:
				// Flush whatever has accumulated so far.
				bufMu.Lock()
				for thingId, msgs := range batchBuffer {
					if len(msgs) > 0 {
						msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
						msgMap[thingId] = msgs
						messageChannel <- msgMap
						delete(batchBuffer, thingId)
						delete(currentBatchSize, thingId)
					}
				}
				bufMu.Unlock()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Read messages from the claimed partition.
	for msg := range claim.Messages() {
		thingId := string(msg.Key)
		if thingId == "" {
			thingId = "thingId-null"
		}

		bufMu.Lock()
		// Append the message to the batch buffer and track the batch size.
		batchBuffer[thingId] = append(batchBuffer[thingId], msg)
		currentBatchSize[thingId] += len(msg.Value)

		// Once a batch reaches maxBatchSize, send it to the channel and reset.
		if currentBatchSize[thingId] >= maxBatchSize {
			thingMessages := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
			thingMessages[thingId] = batchBuffer[thingId]
			messageChannel <- thingMessages
			delete(batchBuffer, thingId)
			delete(currentBatchSize, thingId)
		}
		bufMu.Unlock()
	}

	// The claim's message channel is closed (e.g. on rebalance). Stop the
	// periodic flush first so nothing can send on a closed messageChannel,
	// flush the remainder, then wait for the dispatcher and all workers.
	cancel()
	<-tickerDone

	for thingId, msgs := range batchBuffer {
		if len(msgs) > 0 {
			msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
			msgMap[thingId] = msgs
			messageChannel <- msgMap
		}
	}

	close(messageChannel)
	<-dispatchDone // every wg.Add has happened once the dispatcher exits
	wg.Wait()

	return nil
}
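
// Batching example: with maxBatchSize = 50*1024 (the 50 KB figure from the
// original comment), messages for thingId "abc" accumulate in batchBuffer
// until their combined Value bytes reach 50 KB or the 20s ticker fires; the
// batch is then dispatched as a single handler call: handler("abc", values).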

func (h *RawConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	topicHandlerKey := "DLP_DATA_RAW"
	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
	if !isValid {
		log.Printf("[DLP-Raw-ConsumerGroup] no message handler for topic [%s] partition [%d].", claim.Topic(), claim.Partition())
		return fmt.Errorf("[DLP-Raw-ConsumerGroup] no message handler for topic [%s] partition [%d]", claim.Topic(), claim.Partition())
	}

	for msg := range claim.Messages() {
		thingId := string(msg.Key)
		if thingId == "" {
			thingId = "thingId-null"
		}

		// Parse the value: a JSON-encoded array of raw message strings.
		var value []string
		if err := json.Unmarshal(msg.Value, &value); err != nil {
			log.Printf("[DLP_Raw-ConsumerGroup] failed to unmarshal value: %v", err)
			continue
		}

		isProcessed := msgHandler.(func(thingId string, values []string) bool)(thingId, value)
		if !isProcessed {
			log.Printf("[DLP_Raw-ConsumerGroup] message processing failed, thingId: %s, message: %v", thingId, value)
		} else {
			// Mark the message as handled only after successful processing.
			session.MarkMessage(msg, "is handled")
		}
	}

	return nil
}
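
// Dead-letter payload sketch (hypothetical values): the message key carries
// the thingId and the value is a JSON array of raw message strings, e.g.
//
//	key:   thing-001
//	value: ["{\"raw\":1}", "{\"raw\":2}"]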

// SetTopicHandler registers the processing function for a handler key.
// (sync.Map is already safe for concurrent use; the lock is kept to order
// writes with the other mu-guarded state.)
func (h *RawConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(thingId string, values []string) bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.topicHandlers.Store(topicHandlerKey, fun)
}
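
// Usage sketch (hypothetical caller): register handlers before starting the
// group. Keys follow the "<topic>_<partition>" convention used by
// BatchConsumeClaim, or "DLP_DATA_RAW" for the dead-letter path:
//
//	h := NewRawConsumerGroupHandler(cfg)
//	h.SetTopicHandler("data_raw_0", func(thingId string, values []string) bool {
//	    // persist or forward the batch; return true on success
//	    return true
//	})
//	go h.ConnectConsumerGroup()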

// SetKafkaPaused toggles the consumption-paused flag.
func (h *RawConsumerGroupHandler) SetKafkaPaused(paused bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.kafkaPaused = paused
	if paused {
		log.Println("Kafka message consumption paused.")
	} else {
		log.Println("Kafka message consumption resumed.")
	}
}

//// TODO dynamically adjust the consume rate
//func (h *RawConsumerGroupHandler) SetConsumeRate(interval time.Duration) {
//	h.mu.Lock()
//	defer h.mu.Unlock()
//	rateLimiter.SetLimit(rate.Every(interval))
//}