You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
67 lines
2.0 KiB
67 lines
2.0 KiB
package data_source
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/IBM/sarama"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type KafkaProducer struct {
|
|
producer sarama.SyncProducer
|
|
brokers []string
|
|
}
|
|
|
|
func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
|
|
// 配置 Kafka 生产者
|
|
producerConfig := sarama.NewConfig()
|
|
producerConfig.Producer.Return.Successes = true // 返回成功发送的消息
|
|
producerConfig.Producer.Return.Errors = true // 返回发送失败的消息
|
|
producerConfig.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
|
|
producerConfig.Producer.Timeout = 10 * time.Second // 生产者超时时间
|
|
producerConfig.Producer.Retry.Max = 3 // 最大重试次数
|
|
producerConfig.Producer.Retry.Backoff = 100 * time.Millisecond // 重试间隔时间
|
|
producerConfig.Producer.Compression = sarama.CompressionSnappy // 使用 Snappy 压缩
|
|
producerConfig.Producer.MaxMessageBytes = 1024 * 1024 * 4 // 单条消息最大 4MB
|
|
|
|
producer, err := sarama.NewSyncProducer(brokers, producerConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Kafka producer:%v", err)
|
|
}
|
|
|
|
return &KafkaProducer{
|
|
producer: producer,
|
|
brokers: brokers,
|
|
}, nil
|
|
}
|
|
|
|
// 实现将 messages 发送到 Kafka 的指定 topic
|
|
func (kp *KafkaProducer) SendStringArrayMessage(topic, msgKey string, values []string) error {
|
|
// 将 value 序列化为 JSON
|
|
valueBytes, err := json.Marshal(values)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal value: %v", err)
|
|
}
|
|
|
|
// 构造 Kafka 消息
|
|
msg := &sarama.ProducerMessage{
|
|
Topic: topic,
|
|
Key: sarama.StringEncoder(msgKey),
|
|
Value: sarama.ByteEncoder(valueBytes),
|
|
}
|
|
|
|
// 发送消息
|
|
_, _, err = kp.producer.SendMessage(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send message to Kafka: %v", err)
|
|
}
|
|
|
|
log.Printf("Message sent successfully: key=%s, len(value)=%v", msgKey, len(values))
|
|
return nil
|
|
}
|
|
|
|
// Close 关闭 Kafka 生产者
|
|
func (kp *KafkaProducer) Close() error {
|
|
return kp.producer.Close()
|
|
}
|
|
|