diff --git a/master/data_source/data_agg_handler.go b/master/data_source/data_agg_handler.go
new file mode 100644
index 0000000..a3d2f8b
--- /dev/null
+++ b/master/data_source/data_agg_handler.go
@@ -0,0 +1,36 @@
+package data_source
+
+import (
+    "log"
+    "time"
+)
+
+type AggDataHandler struct {
+    key         string
+    topic       string
+    partitionID int
+    dataChannel chan *RPCPayload // carries batched payloads downstream
+}
+
+func NewAggDataHandler(key, topic string, partitionID int) *AggDataHandler {
+    handler := &AggDataHandler{
+        key:         key,
+        topic:       topic,
+        partitionID: partitionID,
+        dataChannel: make(chan *RPCPayload, 10),
+    }
+
+    return handler
+}
+
+func (h *AggDataHandler) HandleMessage(structId string, values []string) bool {
+    h.dataChannel <- &RPCPayload{Id: structId, Messages: values}
+    log.Printf("****** AggDataHandler.HandleMessage(), dataChannel[%p] queued: %d/%d", h.dataChannel, len(h.dataChannel), cap(h.dataChannel))
+    time.Sleep(50 * time.Millisecond) // crude throttle to smooth bursts
+    return true
+}
+
+// GetDataChannel returns the handler's payload channel.
+func (h *AggDataHandler) GetDataChannel() chan *RPCPayload {
+    return h.dataChannel
+}
diff --git a/dataSource/kafka/aggData_test.go b/master/data_source/data_agg_handler_test.go
similarity index 97%
rename from dataSource/kafka/aggData_test.go
rename to master/data_source/data_agg_handler_test.go
index 907162d..0391c2a 100644
--- a/dataSource/kafka/aggData_test.go
+++ b/master/data_source/data_agg_handler_test.go
@@ -1,4 +1,4 @@
-package kafka
+package data_source
 
 import (
     "encoding/json"
@@ -13,7 +13,7 @@ func TestAggDataHandler_HandleMessage(t *testing.T) {
     aggDataMsg := `
 {"date":"2024-09-19T09:39:59.999+0800","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
 `
-    h.HandleMessage(aggDataMsg)
+    h.HandleMessage("1", []string{aggDataMsg})
 }
 
 func TestFormatTime(t *testing.T) {
diff --git a/master/data_source/data_raw_handler.go b/master/data_source/data_raw_handler.go
new file mode 100644
index 0000000..2268451
--- /dev/null
+++ b/master/data_source/data_raw_handler.go
@@ -0,0 +1,49 @@
+package data_source
+
+import (
+    "log"
+    "time"
+)
+
+// IMessageHandler is the interface implemented by Kafka message handlers.
+type IMessageHandler interface {
+    HandleMessage(key string, values []string) bool
+    GetDataChannel() chan *RPCPayload
+}
+
+type RPCPayload struct {
+    Id       string
+    Messages []string
+}
+
+type RawDataHandler struct {
+    key         string
+    topic       string
+    partitionID int
+    dataChannel chan *RPCPayload
+}
+
+// NewRawDataHandler creates a new RawDataHandler instance.
+func NewRawDataHandler(key, topic string, partitionID int) *RawDataHandler {
+    handler := &RawDataHandler{
+        key:         key,
+        topic:       topic,
+        partitionID: partitionID,
+        dataChannel: make(chan *RPCPayload, 10),
+    }
+
+    return handler
+}
+
+// HandleMessage is called from Producer() in kafka_dataSource.go.
+func (h *RawDataHandler) HandleMessage(thingId string, values []string) bool {
+    h.dataChannel <- &RPCPayload{Id: thingId, Messages: values}
+    log.Printf("--> RawDataHandler%d, dataChannel[%p] queued: %d/%d", h.partitionID, h.dataChannel, len(h.dataChannel), cap(h.dataChannel))
+    time.Sleep(50 * time.Millisecond) // crude throttle to smooth bursts
+    return true
+}
+
+// GetDataChannel returns the handler's payload channel.
+func (h *RawDataHandler) GetDataChannel() chan *RPCPayload {
+    return h.dataChannel
+}
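Both handlers push batches into a small buffered channel that a downstream consumer is expected to drain via `GetDataChannel()`. A minimal sketch of that pattern (the drain loop is assumed; it is not part of this change):

```go
package data_source

import "fmt"

// exampleDrain is an illustrative sketch, not part of the diff: the consumer
// group calls HandleMessage, while a downstream worker drains the channel.
func exampleDrain() {
	h := NewRawDataHandler("data_raw_0", "data_raw", 0)

	// Producer side: the consumer group hands over one batch.
	go h.HandleMessage("thing-1", []string{`{"raw":1}`, `{"raw":2}`})

	// Consumer side: drain the buffered channel (capacity 10). HandleMessage
	// blocks once the buffer is full, which is the intended back-pressure.
	payload := <-h.GetDataChannel()
	fmt.Println(payload.Id, len(payload.Messages)) // thing-1 2
}
```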
"encoding/json" + "fmt" + "gitea.anxinyun.cn/container/common_utils/configLoad" + "github.com/IBM/sarama" + "github.com/panjf2000/ants/v2" + "golang.org/x/time/rate" + "log" + "sync" + "time" +) + +type AggConsumerGroupHandler struct { + kafkaConfig KafkaConfig + topicHandlers sync.Map // 主题处理方法 + + kafkaPaused bool //是否处于暂停数据接收状态 + ControlChan chan string // 控制信号通道 + + mu sync.RWMutex +} + +func NewAggConsumerGroupHandler(kafkaConfig KafkaConfig) *AggConsumerGroupHandler { + return &AggConsumerGroupHandler{ + kafkaConfig: kafkaConfig, + ControlChan: make(chan string), + } +} + +func (h *AggConsumerGroupHandler) ConnectConsumerGroup() { + log.Println("AggData kafka init...") + vp := configLoad.LoadConfig() + minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch") + maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch") + maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime") + + // 消费者配置信息 + config := sarama.NewConfig() + config.Consumer.Return.Errors = false // 不返回消费过程中的错误 + config.Version = sarama.V2_0_0_0 + config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最旧的消息开始消费 + config.Consumer.Offsets.AutoCommit.Enable = true // 启动自动提交偏移量 + config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond + config.Consumer.Fetch.Min = minFetch // 最小拉取 10 KB + config.Consumer.Fetch.Max = maxFetch // 最大拉取 5 MB + config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // 最大等待时间 ms + config.Consumer.Retry.Backoff = 10000 * time.Millisecond // 消费失败后重试的延迟时间 + //config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {} + config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond // 消费者的心跳 默认10s -> 30s + config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond // Heartbeat 这个值必须小于 session.timeout.ms ,一般小于 session.timeout.ms/3,默认是3s + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // 设置消费者组的负载均衡策略为轮询策略 + + // 创建消费者组 + client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config) + if err != nil { + panic(err) + } + defer func() { + _ = client.Close() + }() + + // 启动错误处理协程 + go func() { + for err := range client.Errors() { + log.Println("消费错误:", err) + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 接收控制信号 + go func() { + for { + select { + case signal := <-h.ControlChan: + switch signal { + case "stop": + log.Printf("[Agg-ConsumerGroup-%d] 收到停止信号,将停止消费.", h.kafkaConfig.ClientID) + h.kafkaPaused = true + case "resume": + log.Printf("[Agg-ConsumerGroup-%d] 收到恢复信号,将恢复消费.", h.kafkaConfig.ClientID) + h.kafkaPaused = false + } + } + } + }() + + log.Printf("[Agg-ConsumerGroup-%d] 准备启动 Kafka 消费者协程。订阅的主题: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic) + + // 创建消费者实例 + consumerInstance := h + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + topics := []string{h.kafkaConfig.Topic} + err1 := client.Consume(ctx, topics, consumerInstance) + if err1 != nil { + log.Printf("[Agg-ConsumerGroup-%d] 订阅主题[%v]异常。%s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error()) + return + } + + if ctx.Err() != nil { + log.Println(ctx.Err()) + return + } + } + }() + + log.Println("AggData Sarama consumer up and running ...") + wg.Wait() +} + +func (h *AggConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { + // 在此执行任何必要的设置任务。 + log.Printf("data_agg消费者组会话开始,%+v", session.Claims()) + return nil +} + 
+
+func (h *AggConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+    // Perform any necessary setup here.
+    log.Printf("data_agg consumer group session started, claims: %+v", session.Claims())
+    return nil
+}
+
+func (h *AggConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+    // Perform any necessary cleanup here.
+    log.Println("data_agg consumer group session ended, claims:", session.Claims())
+    return nil
+}
+
+func (h *AggConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    log.Printf("data_agg handling consumer group session %+v. MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
+    // Topics prefixed with "DLP_" carry dead-letter messages.
+    if strings.HasPrefix(claim.Topic(), "DLP_") {
+        return h.DLPConsumeClaim(session, claim)
+    }
+    return h.BatchConsumeClaim(session, claim)
+}
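+
+// BatchConsumeClaim groups incoming messages by key (structId) and flushes a
+// batch to messageChannel when either condition holds:
+//   - the accumulated value size for that key reaches maxBatchSize, or
+//   - the 20-second ticker fires while the key still has buffered messages.
+// For example, with maxBatchSize = 50*1024 (an illustrative value; the real
+// one comes from config), fifty 1 KB messages for one structId flush
+// immediately, while three stray messages wait for the next tick.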
+func (h *AggConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_agg.maxBatchSize")
+    messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 50)
+    topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
+    msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+    if !isValid {
+        log.Printf("[Agg-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
+        return fmt.Errorf("[Agg-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
+    }
+
+    // Reuse batch maps through a sync.Pool.
+    messageMapPool := sync.Pool{
+        New: func() interface{} {
+            return make(map[string][]*sarama.ConsumerMessage)
+        },
+    }
+
+    // Dispatch batches to a worker pool.
+    var wg sync.WaitGroup
+    pool, _ := ants.NewPool(100)
+    defer pool.Release()
+
+    // Rate limiter: at most one batch dispatch per 10 ms.
+    var aggRateLimiter = rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
+    go func() {
+        for structMessages := range messageChannel {
+            _ = aggRateLimiter.Wait(context.Background()) // throttle consumption
+
+            wg.Add(len(structMessages))
+
+            for k, v := range structMessages {
+                key := k
+                msgs := v
+
+                _ = pool.Submit(func() {
+                    defer wg.Done()
+
+                    if len(msgs) == 0 {
+                        return
+                    }
+
+                    values := make([]string, len(msgs))
+                    for i, msg := range msgs {
+                        values[i] = string(msg.Value)
+                    }
+
+                    // Process the batch and check the result.
+                    isProcessed := msgHandler.(func(structId string, values []string) bool)(key, values)
+                    if !isProcessed {
+                        log.Printf("[Agg-ConsumerGroup] batch processing failed, key: %s, messages: %v", key, msgs)
+                    } else {
+                        // Mark messages as handled only after successful processing.
+                        for _, msg := range msgs {
+                            session.MarkMessage(msg, "is handled")
+                        }
+                    }
+                })
+            }
+
+            // Clear stale keys before returning the map to the pool; a reused
+            // map would otherwise resurface old entries on the next Get.
+            for k := range structMessages {
+                delete(structMessages, k)
+            }
+            messageMapPool.Put(structMessages)
+        }
+    }()
+
+    batchBuffer := make(map[string][]*sarama.ConsumerMessage) // per-key batch buffer
+    currentBatchSize := make(map[string]int)                  // current batch size in bytes per key
+    var bufMu sync.Mutex                                      // guards batchBuffer/currentBatchSize against the flush goroutine
+
+    // Periodic flusher for leftover messages.
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ticker := time.NewTicker(20 * time.Second)
+    defer ticker.Stop()
+    flusherDone := make(chan struct{})
+    go func() {
+        defer close(flusherDone)
+        for {
+            select {
+            case <-ticker.C:
+                // Flush whatever is still buffered.
+                bufMu.Lock()
+                for structId, msgs := range batchBuffer {
+                    if len(msgs) > 0 {
+                        // Take a map from the pool.
+                        msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
+                        msgMap[structId] = msgs
+                        messageChannel <- msgMap
+                        delete(batchBuffer, structId) // drop the flushed key
+                        delete(currentBatchSize, structId)
+                    }
+                }
+                bufMu.Unlock()
+            case <-ctx.Done():
+                return
+            }
+        }
+    }()
+
+    // Read messages.
+    for msg := range claim.Messages() {
+        structId := string(msg.Key)
+        if structId == "" {
+            structId = "structId-null"
+        }
+
+        bufMu.Lock()
+        // Append the message to the batch buffer.
+        batchBuffer[structId] = append(batchBuffer[structId], msg)
+
+        // Track the accumulated value size for this key.
+        currentBatchSize[structId] += len(msg.Value)
+
+        // Once the batch reaches maxBatchSize, hand it off and reset.
+        if currentBatchSize[structId] >= maxBatchSize {
+            msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
+            msgMap[structId] = batchBuffer[structId]
+            messageChannel <- msgMap
+            delete(batchBuffer, structId)      // drop the flushed key
+            delete(currentBatchSize, structId) // drop the flushed size
+        }
+        bufMu.Unlock()
+    }
+
+    // Stop the flusher before closing the channel it sends to.
+    cancel()
+    <-flusherDone
+    close(messageChannel)
+    wg.Wait()
+
+    return nil
+}
+
+func (h *AggConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    topicHandlerKey := "DLP_DATA_AGG"
+    msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+    if !isValid {
+        log.Printf("[DLP-Agg-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
+        return fmt.Errorf("[DLP-Agg-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
+    }
+
+    for msg := range claim.Messages() {
+        structId := string(msg.Key)
+        if structId == "" {
+            structId = "structId-null"
+        }
+
+        // The value is a JSON-encoded []string.
+        var value []string
+        err := json.Unmarshal(msg.Value, &value)
+        if err != nil {
+            log.Printf("[DLP_Agg-ConsumerGroup] failed to unmarshal value: %v", err)
+            continue
+        }
+
+        isProcessed := msgHandler.(func(structId string, values []string) bool)(structId, value)
+        if !isProcessed {
+            log.Printf("[DLP_Agg-ConsumerGroup] message processing failed, structId: %s, messages: %v", structId, value)
+        } else {
+            // Mark the message as handled after successful processing.
+            session.MarkMessage(msg, "is handled")
+        }
+    }
+
+    return nil
+}
+
+func (h *AggConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(structId string, values []string) bool) {
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    h.topicHandlers.Store(topicHandlerKey, fun)
+}
+
+// SetKafkaPaused toggles message consumption.
+func (h *AggConsumerGroupHandler) SetKafkaPaused(paused bool) {
+    h.kafkaPaused = paused
+    if paused {
+        log.Println("Kafka consumption paused.")
+    } else {
+        log.Println("Kafka consumption resumed.")
+    }
+}
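Both consumer-group handlers recycle their per-batch maps through a `sync.Pool`. The subtle requirement, reflected in the code above, is that a map must be emptied before `Put`, or the next `Get` can observe stale keys. A self-contained sketch of the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := sync.Pool{
		New: func() interface{} { return make(map[string][]string) },
	}

	m := pool.Get().(map[string][]string)
	m["struct-1"] = []string{"a", "b"}

	// ...hand m's contents to workers (capture values first)...

	// Empty the map before returning it to the pool; otherwise a later
	// Get may resurface "struct-1" next to an unrelated key.
	for k := range m {
		delete(m, k)
	}
	pool.Put(m)

	fmt.Println(len(pool.Get().(map[string][]string))) // 0
}
```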
diff --git a/master/data_source/kafka_consumerGroup_iotaHandler.go b/master/data_source/kafka_consumerGroup_iotaHandler.go
new file mode 100644
index 0000000..15a7076
--- /dev/null
+++ b/master/data_source/kafka_consumerGroup_iotaHandler.go
@@ -0,0 +1,329 @@
+package data_source
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "gitea.anxinyun.cn/container/common_utils/configLoad"
+    "github.com/IBM/sarama"
+    "github.com/panjf2000/ants/v2"
+    "golang.org/x/time/rate"
+    "log"
+    "strings"
+    "sync"
+    "time"
+)
+
+type RawConsumerGroupHandler struct {
+    kafkaConfig   KafkaConfig
+    topicHandlers sync.Map // message handlers keyed by "topic_partition"
+
+    kafkaPaused bool        // whether consumption is paused (recorded; not yet enforced in ConsumeClaim)
+    ControlChan chan string // control-signal channel ("stop" / "resume")
+
+    mu sync.RWMutex
+}
+
+func NewRawConsumerGroupHandler(kafkaConfig KafkaConfig) *RawConsumerGroupHandler {
+    return &RawConsumerGroupHandler{
+        kafkaConfig: kafkaConfig,
+        ControlChan: make(chan string),
+    }
+}
+
+func (h *RawConsumerGroupHandler) ConnectConsumerGroup() {
+    log.Println("RawData kafka init...")
+    vp := configLoad.LoadConfig()
+    minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
+    maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
+    maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")
+
+    // Consumer configuration.
+    config := sarama.NewConfig()
+    config.Consumer.Return.Errors = true // surface consume errors on client.Errors(); drained below
+    config.Version = sarama.V2_0_0_0
+    config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest message
+    config.Consumer.Offsets.AutoCommit.Enable = true      // auto-commit offsets
+    config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
+    config.Consumer.Fetch.Min = minFetch // minimum fetch size in bytes, e.g. 10 KB
+    config.Consumer.Fetch.Max = maxFetch // maximum fetch size in bytes, e.g. 5 MB
+    config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // maximum broker wait time in ms
+    config.Consumer.Retry.Backoff = 10000 * time.Millisecond // delay before retrying after a consume failure
+    //config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
+    config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond   // session timeout (default 10s, raised to 60s)
+    config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond // must stay below the session timeout, typically <= 1/3 of it (default 3s)
+    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // round-robin partition assignment
+
+    // Create the consumer group.
+    client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
+    if err != nil {
+        panic(err)
+    }
+    defer func() {
+        _ = client.Close()
+    }()
+
+    // Drain consume errors.
+    go func() {
+        for err := range client.Errors() {
+            log.Println("consume error:", err)
+        }
+    }()
+
+    // Listen for control signals.
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    go func() {
+        for {
+            select {
+            case signal := <-h.ControlChan:
+                switch signal {
+                case "stop":
+                    log.Printf("[Raw-ConsumerGroup-%d] received stop signal; pausing consumption.", h.kafkaConfig.ClientID)
+                    h.kafkaPaused = true
+                case "resume":
+                    log.Printf("[Raw-ConsumerGroup-%d] received resume signal; resuming consumption.", h.kafkaConfig.ClientID)
+                    h.kafkaPaused = false
+                }
+            case <-ctx.Done():
+                return
+            }
+        }
+    }()
+
+    log.Printf("[Raw-ConsumerGroup-%d] starting Kafka consumer goroutine. Subscribed topic: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)
+
+    // The handler itself implements sarama.ConsumerGroupHandler.
+    consumerInstance := h
+
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for {
+            // Join the consumer group and subscribe to the topic; Kafka
+            // assigns partitions to each group member.
+            topics := []string{h.kafkaConfig.Topic}
+            err1 := client.Consume(ctx, topics, consumerInstance)
+            if err1 != nil {
+                log.Printf("[Raw-ConsumerGroup-%d] error subscribing topic[%v]: %s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
+                return
+            }
+
+            if ctx.Err() != nil {
+                log.Println(ctx.Err())
+                return
+            }
+        }
+    }()
+
+    log.Println("RawData Sarama consumer up and running ...")
+    wg.Wait()
+}
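+
+// Illustrative control flow (the callers are not part of this change): the
+// owning KafkaDataSource pauses and resumes consumption through ControlChan:
+//
+//    ds.StopConsumers()   // sends "stop" on the shared control channel
+//    ds.ResumeConsumers() // sends "resume"
+//
+// A single send is received by exactly one listening goroutine, so with both
+// the raw and agg handlers sharing one channel, one signal per running
+// consumer group is required.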
+
+// Setup runs at the start of a new session.
+func (h *RawConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+    // Perform any necessary setup here.
+    log.Printf("data_raw consumer group session started, claims: %+v \n", session.Claims())
+    return nil
+}
+
+// Cleanup runs at the end of a session.
+func (h *RawConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+    // Perform any necessary cleanup here.
+    log.Println("data_raw consumer group session ended, claims:", session.Claims())
+    return nil
+}
+
+// ConsumeClaim runs the consumer loop.
+func (h *RawConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    log.Printf("data_raw handling consumer group session %+v, MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
+    // Topics prefixed with "DLP_" carry dead-letter messages.
+    if strings.HasPrefix(claim.Topic(), "DLP_") {
+        return h.DLPConsumeClaim(session, claim)
+    }
+    return h.BatchConsumeClaim(session, claim)
+}
+
+// BatchConsumeClaim mirrors AggConsumerGroupHandler.BatchConsumeClaim: it
+// batches messages per thingId and flushes on size or on the 20s ticker.
+func (h *RawConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    //const maxBatchSize = 50 * 1024 // TODO maximum bytes per batch, e.g. 50 KB
+    maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_raw.maxBatchSize")
+    messageChannel := make(chan map[string][]*sarama.ConsumerMessage, 100)
+    topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
+    msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+    if !isValid {
+        log.Printf("[Raw-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
+        return fmt.Errorf("[Raw-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
+    }
+
+    // Reuse batch maps through a sync.Pool.
+    messageMapPool := sync.Pool{
+        New: func() interface{} {
+            return make(map[string][]*sarama.ConsumerMessage)
+        },
+    }
+
+    // Dispatch batches to a worker pool.
+    var wg sync.WaitGroup
+
+    pool, _ := ants.NewPool(100)
+    defer pool.Release()
+
+    // Rate limiter: at most one batch dispatch per 10 ms.
+    var rawRateLimiter = rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
+    go func() {
+        for thingMessages := range messageChannel {
+            _ = rawRateLimiter.Wait(context.Background()) // throttle consumption
+
+            wg.Add(len(thingMessages))
+
+            for k, v := range thingMessages {
+                key := k
+                msgs := v
+
+                _ = pool.Submit(func() {
+                    defer wg.Done()
+
+                    if len(msgs) == 0 {
+                        return
+                    }
+
+                    values := make([]string, len(msgs))
+                    for i, msg := range msgs {
+                        values[i] = string(msg.Value)
+                    }
+
+                    // Process the batch and check the result.
+                    isProcessed := msgHandler.(func(thingId string, values []string) bool)(key, values)
+                    if !isProcessed {
+                        log.Printf("[Raw-ConsumerGroup] batch processing failed, key: %s, messages: %v", key, msgs)
+                    } else {
+                        // Mark messages as handled only after successful processing.
+                        for _, msg := range msgs {
+                            session.MarkMessage(msg, "is handled")
+                        }
+                    }
+                })
+            }
+
+            // Clear stale keys before returning the map to the pool; a reused
+            // map would otherwise resurface old entries on the next Get.
+            for k := range thingMessages {
+                delete(thingMessages, k)
+            }
+            messageMapPool.Put(thingMessages)
+        }
+    }()
+
+    batchBuffer := make(map[string][]*sarama.ConsumerMessage) // per-thingId batch buffer
+    currentBatchSize := make(map[string]int)                  // current batch size in bytes per thingId
+    var bufMu sync.Mutex                                      // guards batchBuffer/currentBatchSize against the flush goroutine
+
+    // Periodic flusher for leftover messages.
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ticker := time.NewTicker(20 * time.Second)
+    defer ticker.Stop()
+    flusherDone := make(chan struct{})
+    go func() {
+        defer close(flusherDone)
+        for {
+            select {
+            case <-ticker.C:
+                // Flush whatever is still buffered.
+                bufMu.Lock()
+                for thingId, msgs := range batchBuffer {
+                    if len(msgs) > 0 {
+                        msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
+                        msgMap[thingId] = msgs
+                        messageChannel <- msgMap
+                        delete(batchBuffer, thingId)
+                        delete(currentBatchSize, thingId)
+                    }
+                }
+                bufMu.Unlock()
+            case <-ctx.Done():
+                return
+            }
+        }
+    }()
+
+    // Read messages.
+    for msg := range claim.Messages() {
+        thingId := string(msg.Key)
+        if thingId == "" {
+            thingId = "thingId-null"
+        }
+
+        bufMu.Lock()
+        // Append the message to the batch buffer.
+        batchBuffer[thingId] = append(batchBuffer[thingId], msg)
+
+        // Track the accumulated value size for this thingId.
+        currentBatchSize[thingId] += len(msg.Value)
+
+        // Once the batch reaches maxBatchSize, hand it off and reset.
+        if currentBatchSize[thingId] >= maxBatchSize {
+            msgMap := messageMapPool.Get().(map[string][]*sarama.ConsumerMessage)
+            msgMap[thingId] = batchBuffer[thingId]
+            messageChannel <- msgMap
+            delete(batchBuffer, thingId)      // drop the flushed key
+            delete(currentBatchSize, thingId) // drop the flushed size
+        }
+        bufMu.Unlock()
+    }
+
+    // Stop the flusher before closing the channel it sends to.
+    cancel()
+    <-flusherDone
+    close(messageChannel)
+    wg.Wait()
+
+    return nil
+}
+
+func (h *RawConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    topicHandlerKey := "DLP_DATA_RAW"
+    msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+    if !isValid {
+        log.Printf("[DLP-Raw-ConsumerGroup] no message handler for topic[%s] partition[%d].", claim.Topic(), claim.Partition())
+        return fmt.Errorf("[DLP-Raw-ConsumerGroup] no message handler for topic[%s] partition[%d]", claim.Topic(), claim.Partition())
+    }
+
+    for msg := range claim.Messages() {
+        thingId := string(msg.Key)
+        if thingId == "" {
+            thingId = "thingId-null"
+        }
+
+        // The value is a JSON-encoded []string.
+        var value []string
+        err := json.Unmarshal(msg.Value, &value)
+        if err != nil {
log.Printf("[DLP_Raw-ConsumerGroup]Failed to unmarshal value: %v", err) + continue + } + + isProcessed := msgHandler.(func(thingId string, values []string) bool)(thingId, value) + if !isProcessed { + log.Printf("[DLP_Raw-ConsumerGroup]消息处理失败,thingId: %s,消息: %v", thingId, value) + } else { + // 处理成功后,消息标记为已处理 + session.MarkMessage(msg, "is handled") + } + } + + return nil +} + +func (h *RawConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(thingId string, values []string) bool) { + h.mu.Lock() + defer h.mu.Unlock() + h.topicHandlers.Store(topicHandlerKey, fun) +} + +// 消息消费启动控制 +func (h *RawConsumerGroupHandler) SetKafkaPaused(paused bool) { + h.kafkaPaused = paused + if paused { + log.Println("Kafka消息消费 已暂停.") + } else { + log.Println("Kafka消息消费 已恢复.") + } +} + +//// TODO 动态调整消费速率 +//func (h *RawConsumerGroupHandler) SetConsumeRate(interval time.Duration) { +// h.mu.Lock() +// defer h.mu.Unlock() +// rateLimiter.SetLimit(rate.Every(interval)) +//} diff --git a/master/data_source/kafka_dataSource.go b/master/data_source/kafka_dataSource.go new file mode 100644 index 0000000..4ddc23f --- /dev/null +++ b/master/data_source/kafka_dataSource.go @@ -0,0 +1,278 @@ +package data_source + +import ( + "fmt" + "gitea.anxinyun.cn/container/common_utils/configLoad" + "github.com/spf13/viper" + "log" + "sync" +) + +type KafkaConfig struct { + ClientID int // 暂时无用 + Brokers []string + GroupID string + Topic string + Partitions int +} + +type TopicConfig struct { + Topic string + Partitions int +} + +type KafkaConsumerConfig struct { + RawData *RawDataConfig //`yaml:"data_raw"` + AggData *AggDataConfig // `yaml:"data_agg"` +} + +type RawDataConfig struct { + MaxBatchSize int `yaml:"maxBatchSize"` + IotaBufSize int `yaml:"iotaBufSize"` + ProcessBufSize int `yaml:"processBufSize"` +} + +type AggDataConfig struct { + MaxBatchSize int `yaml:"maxBatchSize"` + AggBufSize int `yaml:"aggBufSize"` +} + +type KafkaDataSource struct { + groupId string + Brokers []string + Topics map[string]TopicConfig + Master_kafkaConsumer_config *KafkaConsumerConfig // 性能配置 + RawDataHandlers *sync.Map // 原始数据处理器 + AggDataHandlers *sync.Map // 聚集数据处理器 + + kafkaPaused bool + controlChan chan string // 控制信号通道 +} + +func NewKafkaDataSource() *KafkaDataSource { + k := &KafkaDataSource{ + controlChan: make(chan string), + } + + k.loadKafkaConfig() + return k +} + +func (s *KafkaDataSource) loadKafkaConfig() { + vp := configLoad.LoadConfig() + groupId := vp.GetString("kafka.groupId") + brokers := vp.GetStringSlice("kafka.Brokers") + log.Println("消费者组 kafka.groupId:", groupId) + + s.groupId = groupId + s.Brokers = brokers + s.loadTopics(vp) + s.loadKafkaConsumerConfig(vp) +} +func (s *KafkaDataSource) loadTopics(vp *viper.Viper) { + topics := make(map[string]TopicConfig) + + // 定义要加载的主题列表 + topicNames := []string{"data_raw", "data_agg"} + + for _, topicName := range topicNames { + topic := vp.GetString(fmt.Sprintf("kafka.Topics.%s.topic", topicName)) + if topic == "" { + log.Printf("主题 kafka.Topics.%s.topic 配置为空", topicName) + continue + } + + partitions := vp.GetInt(fmt.Sprintf("kafka.Topics.%s.partitions", topicName)) + if partitions <= 0 { + partitions = 1 + } + + topics[topicName] = TopicConfig{ + Topic: topic, + Partitions: partitions, + } + } + + s.Topics = topics +} +func (s *KafkaDataSource) loadKafkaConsumerConfig(vp *viper.Viper) { + // 获取 kafkaConsumer 部分的配置 + kafkaConsumerKey := "performance.master.kafkaConsumer" + if !vp.IsSet(kafkaConsumerKey) { + log.Panicf("配置 %s 必须存在", kafkaConsumerKey) + } + + // 创建 
diff --git a/master/data_source/kafka_dataSource.go b/master/data_source/kafka_dataSource.go
new file mode 100644
index 0000000..4ddc23f
--- /dev/null
+++ b/master/data_source/kafka_dataSource.go
@@ -0,0 +1,278 @@
+package data_source
+
+import (
+    "fmt"
+    "gitea.anxinyun.cn/container/common_utils/configLoad"
+    "github.com/spf13/viper"
+    "log"
+    "sync"
+)
+
+type KafkaConfig struct {
+    ClientID   int // currently unused
+    Brokers    []string
+    GroupID    string
+    Topic      string
+    Partitions int
+}
+
+type TopicConfig struct {
+    Topic      string
+    Partitions int
+}
+
+type KafkaConsumerConfig struct {
+    RawData *RawDataConfig //`yaml:"data_raw"`
+    AggData *AggDataConfig // `yaml:"data_agg"`
+}
+
+type RawDataConfig struct {
+    MaxBatchSize   int `yaml:"maxBatchSize"`
+    IotaBufSize    int `yaml:"iotaBufSize"`
+    ProcessBufSize int `yaml:"processBufSize"`
+}
+
+type AggDataConfig struct {
+    MaxBatchSize int `yaml:"maxBatchSize"`
+    AggBufSize   int `yaml:"aggBufSize"`
+}
+
+type KafkaDataSource struct {
+    groupId                     string
+    Brokers                     []string
+    Topics                      map[string]TopicConfig
+    Master_kafkaConsumer_config *KafkaConsumerConfig // performance tuning
+    RawDataHandlers             *sync.Map            // raw-data handlers
+    AggDataHandlers             *sync.Map            // aggregated-data handlers
+
+    kafkaPaused bool
+    controlChan chan string // control-signal channel
+}
+
+func NewKafkaDataSource() *KafkaDataSource {
+    k := &KafkaDataSource{
+        controlChan: make(chan string),
+    }
+
+    k.loadKafkaConfig()
+    return k
+}
+
+func (s *KafkaDataSource) loadKafkaConfig() {
+    vp := configLoad.LoadConfig()
+    groupId := vp.GetString("kafka.groupId")
+    brokers := vp.GetStringSlice("kafka.Brokers")
+    log.Println("consumer group kafka.groupId:", groupId)
+
+    s.groupId = groupId
+    s.Brokers = brokers
+    s.loadTopics(vp)
+    s.loadKafkaConsumerConfig(vp)
+}
+func (s *KafkaDataSource) loadTopics(vp *viper.Viper) {
+    topics := make(map[string]TopicConfig)
+
+    // Topics to load.
+    topicNames := []string{"data_raw", "data_agg"}
+
+    for _, topicName := range topicNames {
+        topic := vp.GetString(fmt.Sprintf("kafka.Topics.%s.topic", topicName))
+        if topic == "" {
+            log.Printf("empty config for kafka.Topics.%s.topic", topicName)
+            continue
+        }
+
+        partitions := vp.GetInt(fmt.Sprintf("kafka.Topics.%s.partitions", topicName))
+        if partitions <= 0 {
+            partitions = 1
+        }
+
+        topics[topicName] = TopicConfig{
+            Topic:      topic,
+            Partitions: partitions,
+        }
+    }
+
+    s.Topics = topics
+}
+func (s *KafkaDataSource) loadKafkaConsumerConfig(vp *viper.Viper) {
+    // The kafkaConsumer section must exist.
+    kafkaConsumerKey := "performance.master.kafkaConsumer"
+    if !vp.IsSet(kafkaConsumerKey) {
+        log.Panicf("config %s must exist", kafkaConsumerKey)
+    }
+
+    // Build the KafkaConsumerConfig instance.
+    config := &KafkaConsumerConfig{}
+
+    // Parse the data_raw section.
+    if vp.IsSet(kafkaConsumerKey + ".data_raw") {
+        dataRaw := &RawDataConfig{}
+        if err := vp.UnmarshalKey(kafkaConsumerKey+".data_raw", dataRaw); err != nil {
+            log.Panicf("failed to parse data_raw config: %v", err)
+        }
+        config.RawData = dataRaw
+    }
+
+    // Parse the data_agg section.
+    if vp.IsSet(kafkaConsumerKey + ".data_agg") {
+        dataAgg := &AggDataConfig{}
+        if err := vp.UnmarshalKey(kafkaConsumerKey+".data_agg", dataAgg); err != nil {
+            log.Panicf("failed to parse data_agg config: %v", err)
+        }
+        config.AggData = dataAgg
+    }
+
+    s.Master_kafkaConsumer_config = config
+}
+
+func (s *KafkaDataSource) AggDataProducer() {
+    var wg sync.WaitGroup
+    const topicCfgKey = "data_agg"
+    topicCfg := s.Topics[topicCfgKey]
+
+    if topicCfg.Topic == "" {
+        log.Printf("Error: cannot start AggData producer, kafka.topics.data_agg is not configured.")
+        return
+    }
+
+    if s.Master_kafkaConsumer_config.AggData == nil {
+        log.Printf("Error: cannot start AggData producer, performance.master.kafkaConsumer.data_agg is not configured.")
+        return
+    }
+
+    // Start the worker goroutine.
+    wg.Add(1)
+    go func(clientID int) {
+        defer wg.Done()
+        kafkaHandler := NewAggConsumerGroupHandler(KafkaConfig{
+            Brokers:    s.Brokers,
+            GroupID:    s.groupId,
+            Topic:      topicCfg.Topic,
+            Partitions: topicCfg.Partitions,
+            ClientID:   clientID,
+        })
+        kafkaHandler.ControlChan = s.controlChan
+
+        // Register per-partition handlers.
+        for partId := 0; partId < topicCfg.Partitions; partId++ {
+            key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+            msgHandler, ok := s.getDataHandler(topicCfgKey, key)
+
+            if !ok || msgHandler == nil {
+                log.Printf("Kafka topic[%s] has no data_agg message handler; skipping.\n", key)
+                continue
+            }
+
+            // Hand messages to AggDataHandler.HandleMessage.
+            kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
+        }
+
+        // Dead-letter message handler.
+        dlpKey := "DLP_DATA_AGG"
+        dataHandler, ok := s.AggDataHandlers.Load(dlpKey)
+        if !ok {
+            log.Panicf("Kafka topic[%s] has no message handler defined.\n", dlpKey)
+        }
+        msgHandler, _ := dataHandler.(IMessageHandler)
+        kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage)
+
+        // Start the consumer group.
+        kafkaHandler.ConnectConsumerGroup()
+
+    }(1)
+
+    wg.Wait()
+}
+
+// RawDataProducer turns Kafka messages into data models and feeds the data channels.
+func (s *KafkaDataSource) RawDataProducer() {
+    var wg sync.WaitGroup
+    const topicCfgKey = "data_raw"
+    topicCfg := s.Topics[topicCfgKey]
+
+    if topicCfg.Topic == "" {
+        log.Printf("Error: cannot start RawData producer, kafka.topics.data_raw is not configured.")
+        return
+    }
+
+    if s.Master_kafkaConsumer_config.RawData == nil {
+        log.Printf("Error: cannot start RawData producer, performance.master.kafkaConsumer.data_raw is not configured.")
+        return
+    }
+
+    // Start the worker goroutine.
+    wg.Add(1)
+    go func(clientID int) {
+        defer wg.Done()
+        kafkaHandler := NewRawConsumerGroupHandler(KafkaConfig{
+            Brokers:    s.Brokers,
+            GroupID:    s.groupId,
+            Topic:      topicCfg.Topic,
+            Partitions: topicCfg.Partitions,
+            ClientID:   clientID,
+        })
+        kafkaHandler.ControlChan = s.controlChan
+
+        for partId := 0; partId < topicCfg.Partitions; partId++ {
+            key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+            msgHandler, ok := s.getDataHandler(topicCfgKey, key)
+            if !ok || msgHandler == nil {
+                log.Printf("Kafka topic[%s] has no message handler; skipping.\n", key)
+                continue
+            }
+            // Hand messages to RawDataHandler.HandleMessage.
+            kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
+        }
+
+        // Dead-letter message handler.
+        dlpKey := "DLP_DATA_RAW"
+        dataHandler, ok := s.RawDataHandlers.Load(dlpKey)
+        if !ok {
+            log.Panicf("Kafka topic[%s] has no message handler defined.\n", dlpKey)
+        }
+        msgHandler, _ := dataHandler.(IMessageHandler)
+        kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage)
+
+        // Start consuming.
+        kafkaHandler.ConnectConsumerGroup()
+
+    }(1)
+
+    wg.Wait()
+}
+
+// getDataHandler returns the data handler registered under key.
+func (s *KafkaDataSource) getDataHandler(topicCfg, key string) (IMessageHandler, bool) {
+    var dataHandler any
+    var exists bool
+
+    if topicCfg == "data_agg" {
+        dataHandler, exists = s.AggDataHandlers.Load(key)
+    } else if topicCfg == "data_raw" {
+        dataHandler, exists = s.RawDataHandlers.Load(key)
+    }
+
+    if !exists {
+        return nil, false
+    }
+
+    handler, ok := dataHandler.(IMessageHandler)
+    if !ok {
+        return nil, false
+    }
+
+    return handler, true
+}
+
+// StopConsumers sends a stop signal. Note that a single send is delivered to
+// only one goroutine listening on the shared control channel.
+func (s *KafkaDataSource) StopConsumers() {
+    s.controlChan <- "stop"
+}
+
+// ResumeConsumers sends a resume signal.
+func (s *KafkaDataSource) ResumeConsumers() {
+    s.controlChan <- "resume"
+}
diff --git a/master/data_source/kafka_producer.go b/master/data_source/kafka_producer.go
new file mode 100644
index 0000000..c223717
--- /dev/null
+++ b/master/data_source/kafka_producer.go
@@ -0,0 +1,67 @@
+package data_source
+
+import (
+    "encoding/json"
+    "fmt"
+    "github.com/IBM/sarama"
+    "log"
+    "time"
+)
+
+type KafkaProducer struct {
+    producer sarama.SyncProducer
+    brokers  []string
+}
+
+func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
+    // Configure the Kafka producer.
+    producerConfig := sarama.NewConfig()
+    producerConfig.Producer.Return.Successes = true                // report successfully delivered messages
+    producerConfig.Producer.Return.Errors = true                   // report failed deliveries
+    producerConfig.Producer.RequiredAcks = sarama.WaitForAll       // wait for all replicas to acknowledge
+    producerConfig.Producer.Timeout = 10 * time.Second             // producer timeout
+    producerConfig.Producer.Retry.Max = 3                          // maximum retries
+    producerConfig.Producer.Retry.Backoff = 100 * time.Millisecond // delay between retries
+    producerConfig.Producer.Compression = sarama.CompressionSnappy // Snappy compression
+    producerConfig.Producer.MaxMessageBytes = 1024 * 1024 * 4      // up to 4 MB per message
+
+    producer, err := sarama.NewSyncProducer(brokers, producerConfig)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create Kafka producer: %v", err)
+    }
+
+    return &KafkaProducer{
+        producer: producer,
+        brokers:  brokers,
+    }, nil
+}
+
+// SendStringArrayMessage sends messages to the given Kafka topic.
+func (kp *KafkaProducer) SendStringArrayMessage(topic, msgKey string, values []string) error {
+    // Serialize the values as JSON.
+    valueBytes, err := json.Marshal(values)
+    if err != nil {
+        return fmt.Errorf("failed to marshal value: %v", err)
+    }
+
+    // Build the Kafka message.
+    msg := &sarama.ProducerMessage{
+        Topic: topic,
+        Key:   sarama.StringEncoder(msgKey),
+        Value: sarama.ByteEncoder(valueBytes),
+    }
+
+    // Send it.
+    _, _, err = kp.producer.SendMessage(msg)
+    if err != nil {
+        return fmt.Errorf("failed to send message to Kafka: %v", err)
+    }
+
+    log.Printf("Message sent successfully: key=%s, len(value)=%v", msgKey, len(values))
+    return nil
+}
+
+// Close shuts down the Kafka producer.
+func (kp *KafkaProducer) Close() error {
+    return kp.producer.Close()
+}
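The dead-letter consumers `json.Unmarshal` each message value into a `[]string`, which is exactly the shape `SendStringArrayMessage` produces. A sketch of requeueing a failed batch onto a dead-letter topic (broker address and topic name are assumptions):

```go
package data_source

import "log"

// requeueToDLP is an illustrative sketch, not part of the diff: it forwards a
// failed batch in the []string-as-JSON shape that DLPConsumeClaim expects.
func requeueToDLP(structId string, values []string) {
	kp, err := NewKafkaProducer([]string{"localhost:9092"}) // assumed broker
	if err != nil {
		log.Printf("producer init failed: %v", err)
		return
	}
	defer kp.Close()

	// The value becomes json.Marshal(values); DLPConsumeClaim unmarshals it
	// back into []string and hands it to the handler registered under
	// "DLP_DATA_AGG".
	if err := kp.SendStringArrayMessage("DLP_DATA_AGG", structId, values); err != nil {
		log.Printf("requeue failed: %v", err)
	}
}
```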