You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
278 lines
7.0 KiB
278 lines
7.0 KiB
package data_source
|
|
|
|
import (
|
|
"fmt"
|
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
|
"github.com/spf13/viper"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// KafkaConfig carries the connection settings handed to one consumer-group
// handler: broker list, group id, topic name and its partition count.
type KafkaConfig struct {
	ClientID   int // unused for now
	Brokers    []string
	GroupID    string
	Topic      string
	Partitions int
}
|
|
|
|
// TopicConfig holds the name and partition count of a single Kafka topic,
// as read from the kafka.Topics.* configuration section.
type TopicConfig struct {
	Topic      string
	Partitions int
}
|
|
|
|
// KafkaConsumerConfig groups the consumer performance settings loaded from
// performance.master.kafkaConsumer. Either pointer is nil when its
// sub-section (data_raw / data_agg) is absent from the configuration.
type KafkaConsumerConfig struct {
	RawData *RawDataConfig //`yaml:"data_raw"`
	AggData *AggDataConfig // `yaml:"data_agg"`
}
|
|
|
|
// RawDataConfig is the performance tuning for the raw-data consumer
// (performance.master.kafkaConsumer.data_raw).
type RawDataConfig struct {
	MaxBatchSize   int `yaml:"maxBatchSize"`
	IotaBufSize    int `yaml:"iotaBufSize"`
	ProcessBufSize int `yaml:"processBufSize"`
}
|
|
|
|
// AggDataConfig is the performance tuning for the aggregated-data consumer
// (performance.master.kafkaConsumer.data_agg).
type AggDataConfig struct {
	MaxBatchSize int `yaml:"maxBatchSize"`
	AggBufSize   int `yaml:"aggBufSize"`
}
|
|
|
|
// KafkaDataSource wires Kafka consumer groups to per-partition message
// handlers for the raw and aggregated data topics.
type KafkaDataSource struct {
	groupId string
	Brokers []string
	Topics  map[string]TopicConfig

	Master_kafkaConsumer_config *KafkaConsumerConfig // consumer performance settings

	// Handler maps are keyed "<topic>_<partition>" (see getDataHandler);
	// RawDataHandlers additionally holds the "DLP_DATA_RAW" dead-letter handler.
	RawDataHandlers *sync.Map // raw-data message handlers
	AggDataHandlers *sync.Map // aggregated-data message handlers

	kafkaPaused bool
	controlChan chan string // control signal channel ("stop" / "resume"), unbuffered
}
|
|
|
|
func NewKafkaDataSource() *KafkaDataSource {
|
|
k := &KafkaDataSource{
|
|
controlChan: make(chan string),
|
|
}
|
|
|
|
k.loadKafkaConfig()
|
|
return k
|
|
}
|
|
|
|
func (s *KafkaDataSource) loadKafkaConfig() {
|
|
vp := configLoad.LoadConfig()
|
|
groupId := vp.GetString("kafka.groupId")
|
|
brokers := vp.GetStringSlice("kafka.Brokers")
|
|
log.Println("消费者组 kafka.groupId:", groupId)
|
|
|
|
s.groupId = groupId
|
|
s.Brokers = brokers
|
|
s.loadTopics(vp)
|
|
s.loadKafkaConsumerConfig(vp)
|
|
}
|
|
func (s *KafkaDataSource) loadTopics(vp *viper.Viper) {
|
|
topics := make(map[string]TopicConfig)
|
|
|
|
// 定义要加载的主题列表
|
|
topicNames := []string{"data_raw", "data_agg"}
|
|
|
|
for _, topicName := range topicNames {
|
|
topic := vp.GetString(fmt.Sprintf("kafka.Topics.%s.topic", topicName))
|
|
if topic == "" {
|
|
log.Printf("主题 kafka.Topics.%s.topic 配置为空", topicName)
|
|
continue
|
|
}
|
|
|
|
partitions := vp.GetInt(fmt.Sprintf("kafka.Topics.%s.partitions", topicName))
|
|
if partitions <= 0 {
|
|
partitions = 1
|
|
}
|
|
|
|
topics[topicName] = TopicConfig{
|
|
Topic: topic,
|
|
Partitions: partitions,
|
|
}
|
|
}
|
|
|
|
s.Topics = topics
|
|
}
|
|
func (s *KafkaDataSource) loadKafkaConsumerConfig(vp *viper.Viper) {
|
|
// 获取 kafkaConsumer 部分的配置
|
|
kafkaConsumerKey := "performance.master.kafkaConsumer"
|
|
if !vp.IsSet(kafkaConsumerKey) {
|
|
log.Panicf("配置 %s 必须存在", kafkaConsumerKey)
|
|
}
|
|
|
|
// 创建 KafkaConsumerConfig 实例
|
|
config := &KafkaConsumerConfig{}
|
|
|
|
// 解析 data_raw 配置
|
|
if vp.IsSet(kafkaConsumerKey + ".data_raw") {
|
|
dataRaw := &RawDataConfig{}
|
|
if err := vp.UnmarshalKey(kafkaConsumerKey+".data_raw", dataRaw); err != nil {
|
|
log.Panicf("解析 data_raw 配置失败: %v\n", err)
|
|
} else {
|
|
config.RawData = dataRaw
|
|
}
|
|
}
|
|
|
|
// 解析 data_agg 配置
|
|
if vp.IsSet(kafkaConsumerKey + ".data_agg") {
|
|
dataAgg := &AggDataConfig{}
|
|
if err := vp.UnmarshalKey(kafkaConsumerKey+".data_agg", dataAgg); err != nil {
|
|
log.Panicf("解析 data_agg 配置失败: %v\n", err)
|
|
} else {
|
|
config.AggData = dataAgg
|
|
}
|
|
}
|
|
|
|
s.Master_kafkaConsumer_config = config
|
|
}
|
|
|
|
func (s *KafkaDataSource) AggDataProducer() {
|
|
var wg sync.WaitGroup
|
|
const topicCfgKey = "data_agg"
|
|
topicCfg := s.Topics[topicCfgKey]
|
|
|
|
if topicCfg.Topic == "" {
|
|
log.Printf("Error: 启动 AggData Producer 失败,无 kafka.topics.data_agg 配置。")
|
|
return
|
|
}
|
|
|
|
if s.Master_kafkaConsumer_config.AggData == nil {
|
|
log.Printf("Error: 启动 AggData Producer 失败,无 performance.master.kafkaConsumer.data_agg 配置。")
|
|
return
|
|
}
|
|
|
|
// 启动工作协程
|
|
wg.Add(1)
|
|
go func(clientID int) {
|
|
defer wg.Done()
|
|
kafkaHandler := NewAggConsumerGroupHandler(KafkaConfig{
|
|
Brokers: s.Brokers,
|
|
GroupID: s.groupId,
|
|
Topic: topicCfg.Topic,
|
|
Partitions: topicCfg.Partitions,
|
|
ClientID: clientID,
|
|
})
|
|
kafkaHandler.ControlChan = s.controlChan
|
|
|
|
// 装载配置
|
|
for partId := 0; partId < topicCfg.Partitions; partId++ {
|
|
key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
|
|
msgHandler, ok := s.getDataHandler(topicCfgKey, key)
|
|
|
|
if !ok || msgHandler == nil {
|
|
log.Panicf("Kafka topic[%s] 未定义data_agg 消息处理者,跳过。\n", key)
|
|
continue
|
|
}
|
|
|
|
// 把消息传递给 dataSource/kafka/AggDataHandler.HandleMessage([]string)
|
|
kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
|
|
}
|
|
|
|
// 失败消息处理者
|
|
dlpKey := "DLP_DATA_RAW"
|
|
dataHandler, ok := s.RawDataHandlers.Load(dlpKey)
|
|
if !ok {
|
|
log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey)
|
|
}
|
|
msgHandler, _ := dataHandler.(IMessageHandler)
|
|
kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage)
|
|
|
|
// 启动消费组
|
|
kafkaHandler.ConnectConsumerGroup()
|
|
|
|
}(1)
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// Producer 将 kafka message -> 各数据模型 -> 各数据通道
|
|
func (s *KafkaDataSource) RawDataProducer() {
|
|
var wg sync.WaitGroup
|
|
const topicCfgKey = "data_raw"
|
|
topicCfg := s.Topics[topicCfgKey]
|
|
|
|
if topicCfg.Topic == "" {
|
|
log.Printf("Error: 启动 RawData Producer 失败,无 kafka.topics.data_raw 配置。")
|
|
return
|
|
}
|
|
|
|
if s.Master_kafkaConsumer_config.RawData == nil {
|
|
log.Printf("Error: 启动 RawData Producer 失败,无 performance.master.kafkaConsumer.data_raw 配置。")
|
|
return
|
|
}
|
|
|
|
// 启动工作协程
|
|
wg.Add(1)
|
|
go func(clientID int) {
|
|
defer wg.Done()
|
|
kafkaHandler := NewRawConsumerGroupHandler(KafkaConfig{
|
|
Brokers: s.Brokers,
|
|
GroupID: s.groupId,
|
|
Topic: topicCfg.Topic,
|
|
Partitions: topicCfg.Partitions,
|
|
ClientID: clientID,
|
|
})
|
|
kafkaHandler.ControlChan = s.controlChan
|
|
|
|
for partId := 0; partId < topicCfg.Partitions; partId++ {
|
|
key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
|
|
msgHandler, ok := s.getDataHandler(topicCfgKey, key)
|
|
if !ok || msgHandler == nil {
|
|
log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", key)
|
|
continue
|
|
}
|
|
// 把消息传递给 dataSource/kafka/RawDataHandler.HandleMessage([]string)
|
|
kafkaHandler.SetTopicHandler(key, msgHandler.HandleMessage)
|
|
}
|
|
|
|
// 失败消息处理者
|
|
dlpKey := "DLP_DATA_RAW"
|
|
dataHandler, ok := s.RawDataHandlers.Load(dlpKey)
|
|
if !ok {
|
|
log.Panicf("Kafka topic[%s] 未定义消息处理者,跳过。\n", dlpKey)
|
|
}
|
|
msgHandler, _ := dataHandler.(IMessageHandler)
|
|
kafkaHandler.SetTopicHandler(dlpKey, msgHandler.HandleMessage)
|
|
|
|
//启动消费
|
|
kafkaHandler.ConnectConsumerGroup()
|
|
|
|
}(1)
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// 根据 key 获取 dataHandler
|
|
func (s *KafkaDataSource) getDataHandler(topicCfg, key string) (IMessageHandler, bool) {
|
|
var dataHandler any
|
|
var exists bool
|
|
|
|
if topicCfg == "data_agg" {
|
|
dataHandler, exists = s.AggDataHandlers.Load(key)
|
|
} else if topicCfg == "data_raw" {
|
|
dataHandler, exists = s.RawDataHandlers.Load(key)
|
|
}
|
|
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
handler, ok := dataHandler.(IMessageHandler)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
return handler, true
|
|
}
|
|
|
|
// 发送停止信号
|
|
func (s *KafkaDataSource) StopConsumers() {
|
|
s.controlChan <- "stop"
|
|
}
|
|
|
|
// 发送恢复信号
|
|
func (s *KafkaDataSource) ResumeConsumers() {
|
|
s.controlChan <- "resume"
|
|
}
|
|
|