You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
71 lines
1.8 KiB
71 lines
1.8 KiB
package kafka
|
|
|
|
import (
|
|
"dataSource"
|
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
|
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
|
|
"log"
|
|
)
|
|
|
|
type KafkaDataSource struct {
|
|
groupId string
|
|
brokers []string
|
|
topics map[string]string
|
|
DataChannels *dataSource.DataChannels
|
|
}
|
|
|
|
func NewKafkaDataSource() *KafkaDataSource {
|
|
// 初始化所有通道
|
|
dataSource.InitChannels()
|
|
|
|
// 读取配置
|
|
config := configLoad.LoadConfig()
|
|
groupId := config.GetString("kafka.groupId")
|
|
brokers := config.GetStringSlice("kafka.brokers")
|
|
topics := config.GetStringMapString("kafka.topics")
|
|
|
|
return &KafkaDataSource{
|
|
groupId: groupId,
|
|
brokers: brokers,
|
|
topics: topics,
|
|
DataChannels: dataSource.GetChannels(),
|
|
}
|
|
}
|
|
|
|
// Producer 将 kafka message -> 各数据模型 -> 各数据通道
|
|
func (s *KafkaDataSource) Producer() {
|
|
// 消费数据
|
|
kafkaConsumer := kafkaHelper.NewConsumerGroupHandler(s.brokers, s.groupId)
|
|
for cfgName, topic := range s.topics {
|
|
// 创建消息处理器
|
|
handler := NewMessageHandler(cfgName)
|
|
if handler == nil {
|
|
log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
|
|
continue
|
|
}
|
|
// 订阅主题 和 消息处理
|
|
kafkaConsumer.Subscribe(topic, handler.HandleMessage)
|
|
}
|
|
|
|
kafkaConsumer.Worker()
|
|
}
|
|
|
|
// IMessageHandler 是 kafka 消息处理者接口
|
|
type IMessageHandler interface {
|
|
HandleMessage(message string) bool
|
|
}
|
|
|
|
// NewMessageHandler 是 MessageHandler 构造函数
|
|
// cfgName: config.yaml 中 kafka.topics 的配置名
|
|
func NewMessageHandler(cfgName string) IMessageHandler {
|
|
switch cfgName {
|
|
case "data_raw":
|
|
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
|
|
return IotaDataHandler{}
|
|
case "data_agg":
|
|
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
|
|
return NewAggDataHandler()
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|