et-go 20240919重建
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

67 lines
2.0 KiB

package data_source
import (
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"log"
"time"
)
// KafkaProducer bundles a sarama synchronous producer with the list of broker
// addresses it was constructed from.
type KafkaProducer struct {
	// producer is the sarama client used to publish messages.
	producer sarama.SyncProducer
	// brokers holds the broker addresses that were passed to the constructor.
	brokers []string
}
// NewKafkaProducer creates a KafkaProducer backed by a sarama SyncProducer
// that talks to the given brokers.
//
// The producer configuration is fixed here: acknowledgement from all replicas,
// a 10s producer timeout, up to 3 retries with a 100ms backoff, Snappy
// compression, and a 4 MiB per-message size limit.
//
// On failure it returns a nil producer and an error that wraps the underlying
// sarama error, so callers can inspect the cause with errors.Is / errors.As.
func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
	cfg := sarama.NewConfig()

	// Report both successful and failed deliveries back to the producer.
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	// Durability and timing.
	cfg.Producer.RequiredAcks = sarama.WaitForAll // wait for all replicas to acknowledge
	cfg.Producer.Timeout = 10 * time.Second       // producer timeout
	cfg.Producer.Retry.Max = 3                    // maximum number of retries
	cfg.Producer.Retry.Backoff = 100 * time.Millisecond

	// Payload handling.
	cfg.Producer.Compression = sarama.CompressionSnappy
	cfg.Producer.MaxMessageBytes = 1024 * 1024 * 4 // 4 MiB per message

	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		// %w (rather than %v) keeps the sarama error in the chain.
		return nil, fmt.Errorf("failed to create Kafka producer:%w", err)
	}

	return &KafkaProducer{
		producer: producer,
		brokers:  brokers,
	}, nil
}
// SendStringArrayMessage JSON-encodes values and sends the result as a single
// message to topic through the synchronous producer, using msgKey as the
// message key.
//
// On success it logs the key and the number of values and returns nil. On
// failure it returns an error that wraps the underlying marshal or send error,
// so callers can inspect the cause with errors.Is / errors.As.
//
// Note: a nil values slice is encoded by encoding/json as null, not [].
func (kp *KafkaProducer) SendStringArrayMessage(topic, msgKey string, values []string) error {
	// Serialize the values as a JSON array.
	payload, err := json.Marshal(values)
	if err != nil {
		return fmt.Errorf("failed to marshal value: %w", err)
	}

	// Build the Kafka message.
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(msgKey),
		Value: sarama.ByteEncoder(payload),
	}

	// The returned partition and offset are intentionally discarded; only the
	// delivery outcome matters to callers of this method.
	if _, _, err := kp.producer.SendMessage(msg); err != nil {
		return fmt.Errorf("failed to send message to Kafka: %w", err)
	}

	log.Printf("Message sent successfully: key=%s, len(value)=%v", msgKey, len(values))
	return nil
}
// Close shuts down the underlying Kafka producer and returns the error, if
// any, reported by that shutdown.
func (kp *KafkaProducer) Close() error {
	closeErr := kp.producer.Close()
	return closeErr
}