From 5274bf11bfdbdd874cf40b6e0fb9fd9f763fcdf5 Mon Sep 17 00:00:00 2001
From: yfh <yuan.fenghua@free-sun.com.cn>
Date: Mon, 24 Feb 2025 22:11:30 +0800
Subject: [PATCH] =?UTF-8?q?Master=E7=9A=84Kafka=E6=B6=88=E8=B4=B9=E9=87=8D?=
 =?UTF-8?q?=E6=9E=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 master/data_source/data_agg_handler.go        |  36 ++
 .../data_source/data_agg_handler_test.go      |   4 +-
 master/data_source/data_raw_handler.go        |  49 +++
 .../kafka_consumerGroup_aggHandler.go         | 317 +++++++++++++++++
 .../kafka_consumerGroup_iotaHandler.go        | 329 ++++++++++++++++++
 master/data_source/kafka_dataSource.go        | 278 +++++++++++++++
 master/data_source/kafka_producer.go          |  67 ++++
 7 files changed, 1078 insertions(+), 2 deletions(-)
 create mode 100644 master/data_source/data_agg_handler.go
 rename dataSource/kafka/aggData_test.go => master/data_source/data_agg_handler_test.go (97%)
 create mode 100644 master/data_source/data_raw_handler.go
 create mode 100644 master/data_source/kafka_consumerGroup_aggHandler.go
 create mode 100644 master/data_source/kafka_consumerGroup_iotaHandler.go
 create mode 100644 master/data_source/kafka_dataSource.go
 create mode 100644 master/data_source/kafka_producer.go

diff --git a/master/data_source/data_agg_handler.go b/master/data_source/data_agg_handler.go
new file mode 100644
index 0000000..a3d2f8b
--- /dev/null
+++ b/master/data_source/data_agg_handler.go
@@ -0,0 +1,36 @@
+package data_source
+
+import (
+	"log"
+	"time"
+)
+
+// AggDataHandler forwards aggregated-data batches onto dataChannel.
+type AggDataHandler struct {
+	key, topic  string
+	partitionID int
+	dataChannel chan *RPCPayload // carries the packed payloads
+}
+
+// NewAggDataHandler builds an AggDataHandler for the given key, topic and
+// partition; its data channel is buffered to hold 10 payloads.
+func NewAggDataHandler(key, topic string, partitionID int) *AggDataHandler {
+	return &AggDataHandler{
+		dataChannel: make(chan *RPCPayload, 10),
+		partitionID: partitionID,
+		topic:       topic,
+		key:         key,
+	}
+}
+
+func (h *AggDataHandler) HandleMessage(structId string, values []string) bool { // publishes one batch; always reports true
+	h.dataChannel <- &RPCPayload{Id: structId, Messages: values} // blocks while the channel buffer is full
+	log.Printf("****** AggDataHandler.HandleMessage() ,h.dataChannel【%p】通道数据量:%d/%d", h.dataChannel, len(h.dataChannel), cap(h.dataChannel)) // channel fill level after the send
+	time.Sleep(50 * time.Millisecond) // fixed 50ms pause after every batch
+	return true
+}
+
+// GetDataChannel exposes the channel that HandleMessage publishes payloads on.
+func (h *AggDataHandler) GetDataChannel() chan *RPCPayload {
+	return h.dataChannel
+}
diff --git a/dataSource/kafka/aggData_test.go b/master/data_source/data_agg_handler_test.go
similarity index 97%
rename from dataSource/kafka/aggData_test.go
rename to master/data_source/data_agg_handler_test.go
index 907162d..0391c2a 100644
--- a/dataSource/kafka/aggData_test.go
+++ b/master/data_source/data_agg_handler_test.go
@@ -1,4 +1,4 @@
-package kafka
+package data_source
 
 import (
 	"encoding/json"
@@ -13,7 +13,7 @@ func TestAggDataHandler_HandleMessage(t *testing.T) {
 	aggDataMsg := `
 {"date":"2024-09-19T09:39:59.999+0800","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
 `
-	h.HandleMessage(aggDataMsg)
+	h.HandleMessage("1", []string{aggDataMsg})
 }
 
 func TestFormatTime(t *testing.T) {
diff --git a/master/data_source/data_raw_handler.go b/master/data_source/data_raw_handler.go
new file mode 100644
index 0000000..2268451
--- /dev/null
+++ b/master/data_source/data_raw_handler.go
@@ -0,0 +1,49 @@
+package data_source
+
+import (
+	"log"
+	"time"
+)
+
+// IMessageHandler is the contract implemented by the Kafka message handlers.
+type IMessageHandler interface {
+	HandleMessage(key string, values []string) bool // handles one keyed batch of message values
+	GetDataChannel() chan *RPCPayload               // channel owned by the handler
+}
+
+type RPCPayload struct { // one batch of message values tagged with an identifier
+	Id       string   // the structId / thingId handed to HandleMessage
+	Messages []string // message values of the batch
+}
+
+// RawDataHandler forwards raw-data batches onto dataChannel.
+type RawDataHandler struct {
+	key, topic  string
+	partitionID int
+	dataChannel chan *RPCPayload
+}
+
+// NewRawDataHandler builds a RawDataHandler bound to the given key, topic and
+// partition. Its data channel is created with a buffer of 10 payloads.
+func NewRawDataHandler(key, topic string, partitionID int) *RawDataHandler {
+	payloads := make(chan *RPCPayload, 10)
+	return &RawDataHandler{
+		key:         key,
+		topic:       topic,
+		partitionID: partitionID,
+		dataChannel: payloads,
+	}
+}
+
+// HandleMessage publishes values as one RPCPayload tagged with thingId; it is wired up as a topic handler in kafka_dataSource.go.
+func (h *RawDataHandler) HandleMessage(thingId string, values []string) bool {
+	h.dataChannel <- &RPCPayload{Id: thingId, Messages: values} // blocks while the channel buffer is full
+	log.Printf("--> RawDataHandler%d ,h.dataChannel【%p】通道数据量: %d/%d", h.partitionID, h.dataChannel, len(h.dataChannel), cap(h.dataChannel)) // channel fill level after the send
+	time.Sleep(50 * time.Millisecond) // fixed 50ms pause after every batch
+	return true // always reports success
+}
+
+// GetDataChannel exposes the channel that HandleMessage publishes payloads on.
+func (h *RawDataHandler) GetDataChannel() chan *RPCPayload {
+	return h.dataChannel
+}
diff --git a/master/data_source/kafka_consumerGroup_aggHandler.go b/master/data_source/kafka_consumerGroup_aggHandler.go
new file mode 100644
index 0000000..0de36a8
--- /dev/null
+++ b/master/data_source/kafka_consumerGroup_aggHandler.go
@@ -0,0 +1,317 @@
+package data_source
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"gitea.anxinyun.cn/container/common_utils/configLoad"
+	"github.com/IBM/sarama"
+	"github.com/panjf2000/ants/v2"
+	"golang.org/x/time/rate"
+	"log"
+	"sync"
+	"time"
+)
+
+type AggConsumerGroupHandler struct { // consumer-group handler for the aggregated-data topic
+	kafkaConfig   KafkaConfig
+	topicHandlers sync.Map // handler funcs per topic key, stored by SetTopicHandler
+
+	kafkaPaused bool        // whether message reception is paused; written but not read in this file
+	ControlChan chan string // control-signal channel ("stop" / "resume")
+
+	mu sync.RWMutex // taken by SetTopicHandler while it stores a handler
+}
+
+// NewAggConsumerGroupHandler returns a handler for kafkaConfig with a new unbuffered ControlChan.
+func NewAggConsumerGroupHandler(kafkaConfig KafkaConfig) *AggConsumerGroupHandler {
+	h := &AggConsumerGroupHandler{kafkaConfig: kafkaConfig}
+	h.ControlChan = make(chan string)
+	return h
+}
+
+func (h *AggConsumerGroupHandler) ConnectConsumerGroup() {
+	log.Println("AggData kafka init...")
+	vp := configLoad.LoadConfig()
+	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
+	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
+	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")
+
+	// 消费者配置信息
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = false // 不返回消费过程中的错误
+	config.Version = sarama.V2_0_0_0
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最旧的消息开始消费
+	config.Consumer.Offsets.AutoCommit.Enable = true      // 启动自动提交偏移量
+	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
+	config.Consumer.Fetch.Min = minFetch                                        // 最小拉取 10 KB
+	config.Consumer.Fetch.Max = maxFetch                                        // 最大拉取 5 MB
+	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // 最大等待时间 ms
+	config.Consumer.Retry.Backoff = 10000 * time.Millisecond                    // 消费失败后重试的延迟时间
+	//config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
+	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond                                                  // 消费者的心跳 默认10s -> 30s
+	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond                                                // Heartbeat 这个值必须小于 session.timeout.ms ,一般小于 session.timeout.ms/3,默认是3s
+	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // 设置消费者组的负载均衡策略为轮询策略
+
+	// 创建消费者组
+	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
+	if err != nil {
+		panic(err)
+	}
+	defer func() {
+		_ = client.Close()
+	}()
+
+	// 启动错误处理协程
+	go func() {
+		for err := range client.Errors() {
+			log.Println("消费错误:", err)
+		}
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// 接收控制信号
+	go func() {
+		for {
+			select {
+			case signal := <-h.ControlChan:
+				switch signal {
+				case "stop":
+					log.Printf("[Agg-ConsumerGroup-%d] 收到停止信号,将停止消费.", h.kafkaConfig.ClientID)
+					h.kafkaPaused = true
+				case "resume":
+					log.Printf("[Agg-ConsumerGroup-%d] 收到恢复信号,将恢复消费.", h.kafkaConfig.ClientID)
+					h.kafkaPaused = false
+				}
+			}
+		}
+	}()
+
+	log.Printf("[Agg-ConsumerGroup-%d] 准备启动 Kafka 消费者协程。订阅的主题: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)
+
+	// 创建消费者实例
+	consumerInstance := h
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			topics := []string{h.kafkaConfig.Topic}
+			err1 := client.Consume(ctx, topics, consumerInstance)
+			if err1 != nil {
+				log.Printf("[Agg-ConsumerGroup-%d] 订阅主题[%v]异常。%s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
+				return
+			}
+
+			if ctx.Err() != nil {
+				log.Println(ctx.Err())
+				return
+			}
+		}
+	}()
+
+	log.Println("AggData Sarama consumer up and running ...")
+	wg.Wait()
+}
+
+// Setup is the session-start hook; it only logs the session's claims.
+func (h *AggConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+	log.Printf("data_agg消费者组会话开始,%+v", session.Claims())
+	return nil
+}
+
+// Cleanup is the session-end hook; it only logs the session's claims.
+func (h *AggConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	log.Println("data_agg消费者组会话结束,", session.Claims())
+	return nil
+}
+
+func (h *AggConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	log.Printf("data_agg 处理消费者组会话,%+v. MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
+	topic := claim.Topic()
+	isDeadLetterQueue := false // 是否为死信队列消息
+	if len(topic) > 4 && topic[:4] == "DLP_" {
+		isDeadLetterQueue = true
+	}
+
+	if isDeadLetterQueue {
+		return h.DLPConsumeClaim(session, claim)
+	} else {
+		return h.BatchConsumeClaim(session, claim)
+	}
+}
+
+// BatchConsumeClaim batches the claim's messages per key (structId) and passes
+// each batch to the handler registered under "<topic>_<partition>". A batch is
+// sent when its value bytes reach maxBatchSize, when the 20s flush timer fires,
+// or when the claim ends; sends are limited to one per 10ms and run on a pool
+// of up to 100 workers. Messages are marked only after the handler returns true.
+//
+// The batch buffers are touched by the reader loop only, the reader is the sole
+// sender on (and closer of) messageChannel, and shutdown waits for the
+// dispatcher before the workers, so no batch is dropped or sent after close.
+//
+// NOTE(review): keys are marked independently, so a later offset can be marked
+// while an earlier message of another key is still pending — confirm this.
+func (h *AggConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_agg.maxBatchSize")
+	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
+	loaded, isValid := h.topicHandlers.Load(topicHandlerKey)
+	if !isValid {
+		log.Printf("[Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+		return fmt.Errorf("[Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+	}
+	msgHandler, isFunc := loaded.(func(structId string, values []string) bool)
+	if !isFunc {
+		return fmt.Errorf("[Agg-ConsumerGroup]Topic[%s]Partitions[%d] handler has unexpected type %T", claim.Topic(), claim.Partition(), loaded)
+	}
+
+	pool, err := ants.NewPool(100)
+	if err != nil {
+		return fmt.Errorf("[Agg-ConsumerGroup] create worker pool: %w", err)
+	}
+	defer pool.Release()
+
+	// keyedBatch carries one key's messages from the reader to a worker.
+	type keyedBatch struct {
+		key  string
+		msgs []*sarama.ConsumerMessage
+	}
+	messageChannel := make(chan keyedBatch, 50)
+
+	// process runs on a pool worker: call the handler, mark on success.
+	process := func(batch keyedBatch) {
+		values := make([]string, len(batch.msgs))
+		for i, msg := range batch.msgs {
+			values[i] = string(msg.Value)
+		}
+		if !msgHandler(batch.key, values) {
+			log.Printf("[Agg-ConsumerGroup] 消息处理失败,键: %s,消息: %v", batch.key, batch.msgs)
+			return
+		}
+		for _, msg := range batch.msgs {
+			session.MarkMessage(msg, "is handled")
+		}
+	}
+
+	// Dispatcher: rate-limits batches and submits them to the pool.
+	var workers sync.WaitGroup
+	dispatcherDone := make(chan struct{})
+	aggRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
+	go func() {
+		defer close(dispatcherDone)
+		for batch := range messageChannel {
+			_ = aggRateLimiter.Wait(context.Background()) // cannot fail: background context, n == burst == 1
+			batch := batch                                // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
+			workers.Add(1)
+			submitErr := pool.Submit(func() {
+				defer workers.Done()
+				process(batch)
+			})
+			if submitErr != nil {
+				workers.Done() // the task never ran; release its slot here
+				log.Printf("[Agg-ConsumerGroup] submit batch for key %s: %v", batch.key, submitErr)
+			}
+		}
+	}()
+
+	batchBuffer := make(map[string][]*sarama.ConsumerMessage) // pending messages per key
+	currentBatchSize := make(map[string]int)                  // pending value bytes per key
+	flush := func(structId string) {
+		if msgs := batchBuffer[structId]; len(msgs) > 0 {
+			messageChannel <- keyedBatch{key: structId, msgs: msgs}
+		}
+		delete(batchBuffer, structId)
+		delete(currentBatchSize, structId)
+	}
+	flushAll := func() {
+		for structId := range batchBuffer {
+			flush(structId)
+		}
+	}
+
+	ticker := time.NewTicker(20 * time.Second)
+	defer ticker.Stop()
+	// Reader loop: the only goroutine that touches the buffers.
+reading:
+	for {
+		select {
+		case msg, open := <-claim.Messages():
+			if !open {
+				break reading
+			}
+			structId := string(msg.Key)
+			if structId == "" {
+				structId = "structId-null"
+			}
+			batchBuffer[structId] = append(batchBuffer[structId], msg)
+			currentBatchSize[structId] += len(msg.Value)
+			if currentBatchSize[structId] >= maxBatchSize {
+				flush(structId)
+			}
+		case <-ticker.C:
+			flushAll() // push out batches still below maxBatchSize
+		case <-session.Context().Done():
+			break reading // session is ending; stop reading promptly
+		}
+	}
+
+	flushAll()            // hand over whatever is still buffered
+	close(messageChannel) // safe: this goroutine is the only sender
+	<-dispatcherDone      // all batches submitted, no further workers.Add
+	workers.Wait()        // all submitted batches finished
+	return nil
+}
+
+// DLPConsumeClaim consumes a dead-letter claim one message at a time with the
+// handler stored under "DLP_DATA_AGG". It fails when no such handler exists;
+// otherwise it runs until the claim's message channel is closed.
+func (h *AggConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	const topicHandlerKey = "DLP_DATA_AGG"
+	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+	if !isValid {
+		log.Printf("[DLP-Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+		return fmt.Errorf("[DLP-Agg-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+	}
+
+	for msg := range claim.Messages() {
+		structId := "structId-null"
+		if len(msg.Key) > 0 {
+			structId = string(msg.Key)
+		}
+		// The value is decoded as a JSON array of strings; undecodable messages are skipped.
+		var value []string
+		if err := json.Unmarshal(msg.Value, &value); err != nil {
+			log.Printf("[DLP_Agg-ConsumerGroup]Failed to unmarshal value: %v", err)
+			continue
+		}
+
+		handle := msgHandler.(func(structId string, values []string) bool)
+		if handle(structId, value) {
+			session.MarkMessage(msg, "is handled") // only handled messages are marked
+			continue
+		}
+		log.Printf("[DLP_Agg-ConsumerGroup]消息处理失败,structId: %s,消息: %v", structId, value)
+	}
+
+	return nil
+}
+
+func (h *AggConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(structId string, values []string) bool) { // registers fun under topicHandlerKey
+	h.mu.Lock() // registrations are serialized through h.mu
+	defer h.mu.Unlock()
+	h.topicHandlers.Store(topicHandlerKey, fun) // looked up later by BatchConsumeClaim / DLPConsumeClaim
+}
+
+// SetKafkaPaused records the paused flag and logs the new state.
+func (h *AggConsumerGroupHandler) SetKafkaPaused(paused bool) {
+	h.kafkaPaused = paused
+	state := "Kafka消息消费 已恢复."
+	if paused {
+		state = "Kafka消息消费 已暂停."
+	}
+	log.Println(state)
+}
diff --git a/master/data_source/kafka_consumerGroup_iotaHandler.go b/master/data_source/kafka_consumerGroup_iotaHandler.go
new file mode 100644
index 0000000..15a7076
--- /dev/null
+++ b/master/data_source/kafka_consumerGroup_iotaHandler.go
@@ -0,0 +1,329 @@
+package data_source
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"gitea.anxinyun.cn/container/common_utils/configLoad"
+	"github.com/IBM/sarama"
+	"github.com/panjf2000/ants/v2"
+	"golang.org/x/time/rate"
+	"log"
+	"sync"
+	"time"
+)
+
+type RawConsumerGroupHandler struct { // consumer-group handler for the raw-data topic
+	kafkaConfig   KafkaConfig
+	topicHandlers sync.Map // handler funcs per topic/partition key, stored by SetTopicHandler
+
+	kafkaPaused bool        // whether message reception is paused; written but not read in this file
+	ControlChan chan string // control-signal channel ("stop" / "resume")
+
+	mu sync.RWMutex // taken by SetTopicHandler while it stores a handler
+}
+
+// NewRawConsumerGroupHandler returns a handler for kafkaConfig with a new unbuffered ControlChan.
+func NewRawConsumerGroupHandler(kafkaConfig KafkaConfig) *RawConsumerGroupHandler {
+	h := &RawConsumerGroupHandler{kafkaConfig: kafkaConfig}
+	h.ControlChan = make(chan string)
+	return h
+}
+
+func (h *RawConsumerGroupHandler) ConnectConsumerGroup() {
+	log.Println("RawData kafka init...")
+	vp := configLoad.LoadConfig()
+	minFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.minFetch")
+	maxFetch := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxFetch")
+	maxWaitTime := vp.GetInt32("performance.master.kafkaConsumer.consumerCfg.maxWaitTime")
+
+	// 消费者配置信息
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = false // 不返回消费过程中的错误
+	config.Version = sarama.V2_0_0_0
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最旧的消息开始消费
+	config.Consumer.Offsets.AutoCommit.Enable = true      // 启动自动提交偏移量
+	config.Consumer.Offsets.AutoCommit.Interval = 1000 * time.Millisecond
+	config.Consumer.Fetch.Min = minFetch                                        // 最小拉取 10 KB
+	config.Consumer.Fetch.Max = maxFetch                                        // 最大拉取 5 MB
+	config.Consumer.MaxWaitTime = time.Duration(maxWaitTime) * time.Millisecond // 最大等待时间 ms
+	config.Consumer.Retry.Backoff = 10000 * time.Millisecond                    // 消费失败后重试的延迟时间
+	//config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {}
+	config.Consumer.Group.Session.Timeout = 60000 * time.Millisecond                                                  // 消费者的心跳 默认10s -> 30s
+	config.Consumer.Group.Heartbeat.Interval = 6000 * time.Millisecond                                                // Heartbeat 这个值必须小于 session.timeout.ms ,一般小于 session.timeout.ms/3,默认是3s
+	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} // 设置消费者组的负载均衡策略为轮询策略
+
+	// 创建消费者组
+	client, err := sarama.NewConsumerGroup(h.kafkaConfig.Brokers, h.kafkaConfig.GroupID, config)
+	if err != nil {
+		panic(err)
+	}
+	defer func() {
+		_ = client.Close()
+	}()
+
+	// 启动错误处理协程
+	go func() {
+		for err := range client.Errors() {
+			log.Println("消费错误:", err)
+		}
+	}()
+
+	// 接收控制信号
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		for {
+			select {
+			case signal := <-h.ControlChan:
+				switch signal {
+				case "stop":
+					log.Printf("[Raw-ConsumerGroup-%d] 收到停止信号,将停止消费.", h.kafkaConfig.ClientID)
+					h.kafkaPaused = true
+				case "resume":
+					log.Printf("[Raw-ConsumerGroup-%d] 收到恢复信号,将恢复消费.", h.kafkaConfig.ClientID)
+					h.kafkaPaused = false
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	log.Printf("[Raw-ConsumerGroup-%d] 准备启动 Kafka 消费者协程。订阅的主题: %v", h.kafkaConfig.ClientID, h.kafkaConfig.Topic)
+
+	// 创建消费者实例
+	consumerInstance := h
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			// 消费 Kafka 消息
+			topics := []string{h.kafkaConfig.Topic}
+			err1 := client.Consume(ctx, topics, consumerInstance) // 加入消费者组,并订阅指定主题。Kafka会为每个消费者分配相应的分区。
+			if err1 != nil {
+				log.Printf("[Raw-ConsumerGroup-%d] 订阅主题[%v]异常。%s", h.kafkaConfig.ClientID, h.kafkaConfig.Topic, err1.Error())
+				return
+			}
+
+			if ctx.Err() != nil {
+				log.Println(ctx.Err())
+				return
+			}
+		}
+	}()
+
+	log.Println("RawData Sarama consumer up and running ...")
+	wg.Wait()
+}
+
+// Setup runs when a new session starts.
+func (h *RawConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+	// Nothing to prepare; only the session's claims are logged.
+	log.Printf("data_raw消费者组会话开始,%+v \n", session.Claims())
+	return nil
+}
+
+// Cleanup runs when a session ends.
+func (h *RawConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	// Nothing to release; only the session's claims are logged.
+	log.Println("data_raw消费者组会话结束,", session.Claims())
+	return nil
+}
+
+// ConsumeClaim 启动消费者循环
+func (h *RawConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	log.Printf("data_raw 处理消费者组会话,%+v, MemberID: %v, Topic: %v, Partition: %v \n", session.Claims(), session.MemberID(), claim.Topic(), claim.Partition())
+	topic := claim.Topic()
+	isDeadLetterQueue := false // 是否为死信队列消息
+	if len(topic) > 4 && topic[:4] == "DLP_" {
+		isDeadLetterQueue = true
+	}
+
+	if isDeadLetterQueue {
+		return h.DLPConsumeClaim(session, claim)
+	} else {
+		return h.BatchConsumeClaim(session, claim)
+	}
+}
+
+// BatchConsumeClaim batches the claim's messages per key (thingId) and passes
+// each batch to the handler registered under "<topic>_<partition>". A batch is
+// sent when its value bytes reach maxBatchSize, when the 20s flush timer fires,
+// or when the claim ends; sends are limited to one per 10ms and run on a pool
+// of up to 100 workers. Messages are marked only after the handler returns true.
+//
+// Only the reader loop touches the buffers and sends on messageChannel, and
+// shutdown waits for the dispatcher before the workers, so nothing is dropped.
+//
+// NOTE(review): keys are marked independently, so a later offset can be marked
+// while an earlier message of another key is still pending — confirm this.
+func (h *RawConsumerGroupHandler) BatchConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	maxBatchSize := configLoad.LoadConfig().GetInt("performance.master.kafkaConsumer.data_raw.maxBatchSize")
+	topicHandlerKey := fmt.Sprintf("%s_%d", claim.Topic(), claim.Partition())
+	loaded, isValid := h.topicHandlers.Load(topicHandlerKey)
+	if !isValid {
+		log.Printf("[Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+		return fmt.Errorf("[Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+	}
+	msgHandler, isFunc := loaded.(func(thingId string, values []string) bool)
+	if !isFunc {
+		return fmt.Errorf("[Raw-ConsumerGroup]Topic[%s]Partitions[%d] handler has unexpected type %T", claim.Topic(), claim.Partition(), loaded)
+	}
+
+	pool, err := ants.NewPool(100)
+	if err != nil {
+		return fmt.Errorf("[Raw-ConsumerGroup] create worker pool: %w", err)
+	}
+	defer pool.Release()
+
+	// keyedBatch carries one key's messages from the reader to a worker.
+	type keyedBatch struct {
+		key  string
+		msgs []*sarama.ConsumerMessage
+	}
+	messageChannel := make(chan keyedBatch, 100)
+
+	// process runs on a pool worker: call the handler, mark on success.
+	process := func(batch keyedBatch) {
+		values := make([]string, len(batch.msgs))
+		for i, msg := range batch.msgs {
+			values[i] = string(msg.Value)
+		}
+		if !msgHandler(batch.key, values) {
+			log.Printf("[Raw-ConsumerGroup] 消息处理失败,键: %s,消息: %v", batch.key, batch.msgs)
+			return
+		}
+		for _, msg := range batch.msgs {
+			session.MarkMessage(msg, "is handled")
+		}
+	}
+
+	// Dispatcher: rate-limits batches and submits them to the pool.
+	var workers sync.WaitGroup
+	dispatcherDone := make(chan struct{})
+	rawRateLimiter := rate.NewLimiter(rate.Every(10*time.Millisecond), 1)
+	go func() {
+		defer close(dispatcherDone)
+		for batch := range messageChannel {
+			_ = rawRateLimiter.Wait(context.Background()) // cannot fail: background context, n == burst == 1
+			batch := batch                                // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
+			workers.Add(1)
+			submitErr := pool.Submit(func() {
+				defer workers.Done()
+				process(batch)
+			})
+			if submitErr != nil {
+				workers.Done() // the task never ran; release its slot here
+				log.Printf("[Raw-ConsumerGroup] submit batch for key %s: %v", batch.key, submitErr)
+			}
+		}
+	}()
+
+	batchBuffer := make(map[string][]*sarama.ConsumerMessage) // pending messages per thingId
+	currentBatchSize := make(map[string]int)                  // pending value bytes per thingId
+	flush := func(thingId string) {
+		if msgs := batchBuffer[thingId]; len(msgs) > 0 {
+			messageChannel <- keyedBatch{key: thingId, msgs: msgs}
+		}
+		delete(batchBuffer, thingId)
+		delete(currentBatchSize, thingId)
+	}
+	flushAll := func() {
+		for thingId := range batchBuffer {
+			flush(thingId)
+		}
+	}
+
+	ticker := time.NewTicker(20 * time.Second)
+	defer ticker.Stop()
+	// Reader loop: the only goroutine that touches the buffers.
+reading:
+	for {
+		select {
+		case msg, open := <-claim.Messages():
+			if !open {
+				break reading
+			}
+			thingId := string(msg.Key)
+			if thingId == "" {
+				thingId = "thingId-null"
+			}
+			batchBuffer[thingId] = append(batchBuffer[thingId], msg)
+			currentBatchSize[thingId] += len(msg.Value)
+			if currentBatchSize[thingId] >= maxBatchSize {
+				flush(thingId)
+			}
+		case <-ticker.C:
+			flushAll() // push out batches still below maxBatchSize
+		case <-session.Context().Done():
+			break reading // session is ending; stop reading promptly
+		}
+	}
+
+	flushAll()            // hand over whatever is still buffered
+	close(messageChannel) // safe: this goroutine is the only sender
+	<-dispatcherDone      // all batches submitted, no further workers.Add
+	workers.Wait()        // all submitted batches finished
+	return nil
+}
+
+// DLPConsumeClaim consumes a dead-letter claim one message at a time with the
+// handler stored under "DLP_DATA_RAW". It fails when no such handler exists;
+// otherwise it runs until the claim's message channel is closed.
+func (h *RawConsumerGroupHandler) DLPConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	const topicHandlerKey = "DLP_DATA_RAW"
+	msgHandler, isValid := h.topicHandlers.Load(topicHandlerKey)
+	if !isValid {
+		log.Printf("[DLP-Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+		return fmt.Errorf("[DLP-Raw-ConsumerGroup]Topic[%s]Partitions[%d]无消息处理者。", claim.Topic(), claim.Partition())
+	}
+
+	for msg := range claim.Messages() {
+		thingId := "thingId-null"
+		if len(msg.Key) > 0 {
+			thingId = string(msg.Key)
+		}
+		// The value is decoded as a JSON array of strings; undecodable messages are skipped.
+		var value []string
+		if err := json.Unmarshal(msg.Value, &value); err != nil {
+			log.Printf("[DLP_Raw-ConsumerGroup]Failed to unmarshal value: %v", err)
+			continue
+		}
+
+		handle := msgHandler.(func(thingId string, values []string) bool)
+		if handle(thingId, value) {
+			session.MarkMessage(msg, "is handled") // only handled messages are marked
+			continue
+		}
+		log.Printf("[DLP_Raw-ConsumerGroup]消息处理失败,thingId: %s,消息: %v", thingId, value)
+	}
+
+	return nil
+}
+
+func (h *RawConsumerGroupHandler) SetTopicHandler(topicHandlerKey string, fun func(thingId string, values []string) bool) { // registers fun under topicHandlerKey
+	h.mu.Lock() // registrations are serialized through h.mu
+	defer h.mu.Unlock()
+	h.topicHandlers.Store(topicHandlerKey, fun) // looked up later by BatchConsumeClaim / DLPConsumeClaim
+}
+
+// SetKafkaPaused records the paused flag and logs the new state.
+func (h *RawConsumerGroupHandler) SetKafkaPaused(paused bool) {
+	h.kafkaPaused = paused
+	state := "Kafka消息消费 已恢复."
+	if paused {
+		state = "Kafka消息消费 已暂停."
+	}
+	log.Println(state)
+}
+
+//// TODO: adjust the consume rate dynamically
+//func (h *RawConsumerGroupHandler) SetConsumeRate(interval time.Duration) {
+//	h.mu.Lock()
+//	defer h.mu.Unlock()
+//	rateLimiter.SetLimit(rate.Every(interval))
+//}
diff --git a/master/data_source/kafka_dataSource.go b/master/data_source/kafka_dataSource.go
new file mode 100644
index 0000000..4ddc23f
--- /dev/null
+++ b/master/data_source/kafka_dataSource.go
@@ -0,0 +1,278 @@
+package data_source
+
+import (
+	"fmt"
+	"gitea.anxinyun.cn/container/common_utils/configLoad"
+	"github.com/spf13/viper"
+	"log"
+	"sync"
+)
+
+// KafkaConfig describes one consumer-group connection.
+type KafkaConfig struct {
+	ClientID       int // not used functionally for now; it only appears in log output
+	Brokers        []string
+	GroupID, Topic string
+	Partitions     int
+}
+
+type TopicConfig struct { // one entry of kafka.Topics as loaded by loadTopics
+	Topic      string // Kafka topic name (kafka.Topics.<name>.topic)
+	Partitions int    // partition count; loadTopics substitutes 1 when the configured value is <= 0
+}
+
+type KafkaConsumerConfig struct { // settings found under performance.master.kafkaConsumer
+	RawData *RawDataConfig // "data_raw" section; stays nil when that section is not set
+	AggData *AggDataConfig // "data_agg" section; stays nil when that section is not set
+}
+
+type RawDataConfig struct { // "data_raw" consumer settings; NOTE(review): decoded through viper.UnmarshalKey — confirm the yaml tags are honoured
+	MaxBatchSize   int `yaml:"maxBatchSize"`   // same key RawConsumerGroupHandler.BatchConsumeClaim reads as its per-key flush threshold
+	IotaBufSize    int `yaml:"iotaBufSize"`    // not used in this file — presumably a buffer size; confirm with callers
+	ProcessBufSize int `yaml:"processBufSize"` // not used in this file — presumably a buffer size; confirm with callers
+}
+
+type AggDataConfig struct { // "data_agg" consumer settings; NOTE(review): decoded through viper.UnmarshalKey — confirm the yaml tags are honoured
+	MaxBatchSize int `yaml:"maxBatchSize"` // same key AggConsumerGroupHandler.BatchConsumeClaim reads as its per-key flush threshold
+	AggBufSize   int `yaml:"aggBufSize"`   // not used in this file — presumably a buffer size; confirm with callers
+}
+
+type KafkaDataSource struct { // configuration plus handler registries used to start the Kafka consumers
+	groupId                     string
+	Brokers                     []string
+	Topics                      map[string]TopicConfig
+	Master_kafkaConsumer_config *KafkaConsumerConfig // performance settings
+	RawDataHandlers             *sync.Map            // raw-data handlers; not initialised by NewKafkaDataSource
+	AggDataHandlers             *sync.Map            // aggregated-data handlers; not initialised by NewKafkaDataSource
+
+	kafkaPaused bool        // never assigned in this file
+	controlChan chan string // control-signal channel shared with every consumer-group handler
+}
+
+// NewKafkaDataSource creates a data source with an unbuffered control channel
+// and fills it from configuration through loadKafkaConfig.
+func NewKafkaDataSource() *KafkaDataSource {
+	source := new(KafkaDataSource)
+	source.controlChan = make(chan string)
+	source.loadKafkaConfig()
+	return source
+}
+
+// loadKafkaConfig reads kafka.groupId and kafka.Brokers from the loaded
+// configuration, then delegates to loadTopics and loadKafkaConsumerConfig.
+func (s *KafkaDataSource) loadKafkaConfig() {
+	vp := configLoad.LoadConfig()
+	s.groupId = vp.GetString("kafka.groupId")
+	s.Brokers = vp.GetStringSlice("kafka.Brokers")
+	log.Println("消费者组 kafka.groupId:", s.groupId)
+
+	s.loadTopics(vp)
+	s.loadKafkaConsumerConfig(vp)
+}
+// loadTopics fills s.Topics with the "data_raw" and "data_agg" entries found
+// under kafka.Topics. An entry whose topic name is empty is logged and left
+// out; a partition count that is missing or not positive is replaced by 1.
+// s.Topics is replaced as a whole once both entries have been examined.
+func (s *KafkaDataSource) loadTopics(vp *viper.Viper) {
+	loaded := make(map[string]TopicConfig)
+
+	// The set of topic entries is fixed.
+	for _, name := range []string{"data_raw", "data_agg"} {
+		prefix := "kafka.Topics." + name
+		topic := vp.GetString(prefix + ".topic")
+		if topic == "" {
+			log.Printf("主题 kafka.Topics.%s.topic 配置为空", name)
+			continue
+		}
+
+		partitions := vp.GetInt(prefix + ".partitions")
+		if partitions <= 0 {
+			partitions = 1
+		}
+
+		loaded[name] = TopicConfig{Topic: topic, Partitions: partitions}
+	}
+
+	s.Topics = loaded
+}
+func (s *KafkaDataSource) loadKafkaConsumerConfig(vp *viper.Viper) {
+	// 获取 kafkaConsumer 部分的配置
+	kafkaConsumerKey := "performance.master.kafkaConsumer"
+	if !vp.IsSet(kafkaConsumerKey) {
+		log.Panicf("配置 %s 必须存在", kafkaConsumerKey)
+	}
+
+	// 创建 KafkaConsumerConfig 实例
+	config := &KafkaConsumerConfig{}
+
+	// 解析 data_raw 配置
+	if vp.IsSet(kafkaConsumerKey + ".data_raw") {
+		dataRaw := &RawDataConfig{}
+		if err := vp.UnmarshalKey(kafkaConsumerKey+".data_raw", dataRaw); err != nil {
+			log.Panicf("解析 data_raw 配置失败: %v\n", err)
+		} else {
+			config.RawData = dataRaw
+		}
+	}
+
+	// 解析 data_agg 配置
+	if vp.IsSet(kafkaConsumerKey + ".data_agg") {
+		dataAgg := &AggDataConfig{}
+		if err := vp.UnmarshalKey(kafkaConsumerKey+".data_agg", dataAgg); err != nil {
+			log.Panicf("解析 data_agg 配置失败: %v\n", err)
+		} else {
+			config.AggData = dataAgg
+		}
+	}
+
+	s.Master_kafkaConsumer_config = config
+}
+
+// AggDataProducer wires the aggregated-data handlers to a consumer group and
+// runs it, blocking while the consumer runs. It returns early (after logging)
+// when the data_agg topic or its performance settings are not configured, and
+// panics when a partition or the dead-letter queue has no usable handler.
+func (s *KafkaDataSource) AggDataProducer() {
+	const topicCfgKey = "data_agg"
+	topicCfg := s.Topics[topicCfgKey]
+
+	if topicCfg.Topic == "" {
+		log.Printf("Error: 启动 AggData Producer 失败,无 kafka.topics.data_agg 配置。")
+		return
+	}
+	if s.Master_kafkaConsumer_config.AggData == nil {
+		log.Printf("Error: 启动 AggData Producer 失败,无 performance.master.kafkaConsumer.data_agg 配置。")
+		return
+	}
+
+	kafkaHandler := NewAggConsumerGroupHandler(KafkaConfig{
+		Brokers:    s.Brokers,
+		GroupID:    s.groupId,
+		Topic:      topicCfg.Topic,
+		Partitions: topicCfg.Partitions,
+		ClientID:   1,
+	})
+	kafkaHandler.ControlChan = s.controlChan
+
+	// Register one handler per partition under "<topic>_<partition>".
+	for partId := 0; partId < topicCfg.Partitions; partId++ {
+		key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+		msgHandler, ok := s.getDataHandler(topicCfgKey, key)
+		if !ok || msgHandler == nil {
+			log.Panicf("Kafka topic[%s] 未定义data_agg 消息处理者,跳过。\n", key)
+		}
+		kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
+	}
+
+	// Dead-letter handler. AggConsumerGroupHandler.DLPConsumeClaim looks up
+	// "DLP_DATA_AGG", so prefer an aggregated-data handler under that key; the
+	// previous wiring (raw handler under "DLP_DATA_RAW") is kept as a fallback
+	// so existing setups keep starting.
+	dlpKey := "DLP_DATA_AGG"
+	dataHandler, ok := s.AggDataHandlers.Load(dlpKey)
+	if !ok {
+		dlpKey = "DLP_DATA_RAW"
+		dataHandler, ok = s.RawDataHandlers.Load(dlpKey)
+	}
+	if !ok {
+		log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey)
+	}
+	dlpHandler, isHandler := dataHandler.(IMessageHandler)
+	if !isHandler {
+		log.Panicf("Kafka topic[%s] handler has unexpected type %T", dlpKey, dataHandler)
+	}
+	kafkaHandler.SetTopicHandler(dlpKey, dlpHandler.HandleMessage)
+
+	// Start the consumer group; this blocks until it stops.
+	kafkaHandler.ConnectConsumerGroup()
+}
+
+// RawDataProducer wires the raw-data handlers to a consumer group and runs
+// it: every partition handler from RawDataHandlers ("<topic>_<partition>") and
+// the dead-letter handler ("DLP_DATA_RAW") are registered, then the consumer
+// group is started. The call blocks while the consumer runs. It returns early
+// (after logging) when the data_raw topic or its performance settings are
+// missing, and panics when a required handler is absent or of the wrong type.
+func (s *KafkaDataSource) RawDataProducer() {
+	const topicCfgKey = "data_raw"
+	topicCfg := s.Topics[topicCfgKey]
+
+	if topicCfg.Topic == "" {
+		log.Printf("Error: 启动 RawData Producer 失败,无 kafka.topics.data_raw 配置。")
+		return
+	}
+
+	if s.Master_kafkaConsumer_config.RawData == nil {
+		log.Printf("Error: 启动 RawData Producer 失败,无 performance.master.kafkaConsumer.data_raw 配置。")
+		return
+	}
+
+	kafkaHandler := NewRawConsumerGroupHandler(KafkaConfig{
+		Brokers:    s.Brokers,
+		GroupID:    s.groupId,
+		Topic:      topicCfg.Topic,
+		Partitions: topicCfg.Partitions,
+		ClientID:   1,
+	})
+	kafkaHandler.ControlChan = s.controlChan
+
+	// Register one handler per partition under "<topic>_<partition>".
+	for partId := 0; partId < topicCfg.Partitions; partId++ {
+		key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+		msgHandler, ok := s.getDataHandler(topicCfgKey, key)
+		if !ok || msgHandler == nil {
+			log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", key)
+		}
+		kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
+	}
+
+	// Dead-letter (failed message) handler.
+	dlpKey := "DLP_DATA_RAW"
+	dataHandler, ok := s.RawDataHandlers.Load(dlpKey)
+	if !ok {
+		log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey)
+	}
+	// Check the assertion: a stored value of another type would otherwise
+	// surface later as a nil-interface panic with no context.
+	dlpHandler, isHandler := dataHandler.(IMessageHandler)
+	if !isHandler {
+		log.Panicf("Kafka topic[%s] handler has unexpected type %T", dlpKey, dataHandler)
+	}
+	kafkaHandler.SetTopicHandler(dlpKey, dlpHandler.HandleMessage)
+
+	// Start the consumer group; this blocks until it stops.
+	kafkaHandler.ConnectConsumerGroup()
+}
+
+// getDataHandler looks up the handler stored under key in the registry that
+// belongs to topicCfg: AggDataHandlers for "data_agg", RawDataHandlers for
+// "data_raw". It reports false for any other topicCfg, for a missing key, and
+// for a stored value that does not implement IMessageHandler.
+func (s *KafkaDataSource) getDataHandler(topicCfg, key string) (IMessageHandler, bool) {
+	var registry *sync.Map
+	switch topicCfg {
+	case "data_agg":
+		registry = s.AggDataHandlers
+	case "data_raw":
+		registry = s.RawDataHandlers
+	default:
+		return nil, false
+	}
+
+	value, found := registry.Load(key)
+	if !found {
+		return nil, false
+	}
+	handler, ok := value.(IMessageHandler)
+	return handler, ok
+}
+
+// StopConsumers sends the "stop" signal; controlChan is unbuffered and shared, so the call blocks until one consumer receives it.
+func (s *KafkaDataSource) StopConsumers() {
+	s.controlChan <- "stop" // NOTE(review): a single send reaches only one of the handlers listening on this channel
+}
+
+// ResumeConsumers sends the "resume" signal; controlChan is unbuffered and shared, so the call blocks until one consumer receives it.
+func (s *KafkaDataSource) ResumeConsumers() {
+	s.controlChan <- "resume" // NOTE(review): a single send reaches only one of the handlers listening on this channel
+}
diff --git a/master/data_source/kafka_producer.go b/master/data_source/kafka_producer.go
new file mode 100644
index 0000000..c223717
--- /dev/null
+++ b/master/data_source/kafka_producer.go
@@ -0,0 +1,67 @@
+package data_source
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/IBM/sarama"
+	"log"
+	"time"
+)
+
+type KafkaProducer struct { // thin wrapper around a sarama synchronous producer
+	producer sarama.SyncProducer // used by SendStringArrayMessage and Close
+	brokers  []string            // broker list the producer was created with
+}
+
+// NewKafkaProducer creates a synchronous producer for brokers. A creation failure is wrapped (%w) so callers can inspect the cause.
+func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
+	producerConfig := sarama.NewConfig()
+	producerConfig.Producer.Return.Successes = true                // report successfully sent messages
+	producerConfig.Producer.Return.Errors = true                   // report messages that failed to send
+	producerConfig.Producer.RequiredAcks = sarama.WaitForAll       // wait for all replicas to acknowledge
+	producerConfig.Producer.Timeout = 10 * time.Second             // producer timeout
+	producerConfig.Producer.Retry.Max = 3                          // maximum number of retries
+	producerConfig.Producer.Retry.Backoff = 100 * time.Millisecond // pause between retries
+	producerConfig.Producer.Compression = sarama.CompressionSnappy // Snappy compression
+	producerConfig.Producer.MaxMessageBytes = 1024 * 1024 * 4      // at most 4MB per message
+
+	producer, err := sarama.NewSyncProducer(brokers, producerConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Kafka producer:%w", err)
+	}
+
+	return &KafkaProducer{
+		producer: producer,
+		brokers:  brokers,
+	}, nil
+}
+
+// SendStringArrayMessage sends values, JSON-encoded as an array, to topic with msgKey as the message key; errors are wrapped with %w.
+func (kp *KafkaProducer) SendStringArrayMessage(topic, msgKey string, values []string) error {
+	// Serialize the values as JSON.
+	valueBytes, err := json.Marshal(values)
+	if err != nil {
+		return fmt.Errorf("failed to marshal value: %w", err)
+	}
+
+	// Build the Kafka message.
+	msg := &sarama.ProducerMessage{
+		Topic: topic,
+		Key:   sarama.StringEncoder(msgKey),
+		Value: sarama.ByteEncoder(valueBytes),
+	}
+
+	// Send it; the returned partition and offset are not needed here.
+	_, _, err = kp.producer.SendMessage(msg)
+	if err != nil {
+		return fmt.Errorf("failed to send message to Kafka: %w", err)
+	}
+
+	log.Printf("Message sent successfully: key=%s, len(value)=%v", msgKey, len(values))
+	return nil
+}
+
+// Close shuts down the underlying Kafka producer and returns its error, if any.
+func (kp *KafkaProducer) Close() error {
+	return kp.producer.Close()
+}