You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
139 lines
3.7 KiB
139 lines
3.7 KiB
1 month ago
|
package kafkaHelper
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"github.com/IBM/sarama"
|
||
|
"log"
|
||
|
)
|
||
|
|
||
|
type ConsumerGroupHandler struct {
|
||
|
brokers []string
|
||
|
topics []string
|
||
|
groupId string
|
||
|
onHandlersMap map[string]func(*sarama.ConsumerMessage) bool
|
||
|
}
|
||
|
|
||
|
func NewConsumerGroupHandler(Brokers []string, GroupId string) *ConsumerGroupHandler {
|
||
|
return &ConsumerGroupHandler{
|
||
|
brokers: Brokers,
|
||
|
groupId: GroupId,
|
||
|
topics: make([]string, 0),
|
||
|
onHandlersMap: make(map[string]func(*sarama.ConsumerMessage) bool),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
|
||
|
// 在此执行任何必要的设置任务。
|
||
|
fmt.Println("kafka Consumed start set")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
|
||
|
// 在此执行任何必要的清理任务。
|
||
|
return nil
|
||
|
}
|
||
|
func (h *ConsumerGroupHandler) Subscribe(topic string, fun func(string) bool) {
|
||
|
h.topics = append(h.topics, topic)
|
||
|
|
||
|
h.onHandlersMap[topic] = h.decorateSubscribeString(fun)
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.ConsumerMessage) bool) {
|
||
|
h.topics = append(h.topics, topic)
|
||
|
|
||
|
h.onHandlersMap[topic] = fun
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool {
|
||
|
f := func(cm *sarama.ConsumerMessage) bool {
|
||
|
msg := string(cm.Value)
|
||
|
log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
|
||
|
return handler(msg)
|
||
|
}
|
||
|
return f
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {
|
||
|
|
||
|
for message := range Claim.Messages() {
|
||
|
// 在这里处理消息。
|
||
|
topic := message.Topic
|
||
|
//log.Printf("[%s]Message: %s\n", topic, string(message.Value))
|
||
|
|
||
|
allOk := true
|
||
|
handlerFunc, isValid := h.onHandlersMap[topic]
|
||
|
if !isValid {
|
||
|
log.Printf("不存在 [%s]的解析", topic)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
allOk = handlerFunc(message)
|
||
|
|
||
|
if allOk {
|
||
|
// 将消息标记为在会话中已处理。
|
||
|
session.MarkMessage(message, "is handled")
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (h *ConsumerGroupHandler) Worker() {
|
||
|
config := sarama.NewConfig()
|
||
|
config.Consumer.Return.Errors = false
|
||
|
config.Version = sarama.V2_0_0_0
|
||
|
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||
|
config.Consumer.Offsets.AutoCommit.Enable = true
|
||
|
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
|
||
|
group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
defer func() { _ = group.Close() }()
|
||
|
|
||
|
// Track errors
|
||
|
go func() {
|
||
|
for err := range group.Errors() {
|
||
|
fmt.Println("ERROR", err)
|
||
|
}
|
||
|
}()
|
||
|
// Iterate over consumer sessions.
|
||
|
ctx := context.Background()
|
||
|
for {
|
||
|
//topics := []string{"anxinyun_data"} //my_topic
|
||
|
//handler := consumerGroupHandler{}
|
||
|
// `Consume` should be called inside an infinite loop, when a
|
||
|
// server-side rebalance happens, the consumer session will need to be
|
||
|
// recreated to get the new claims
|
||
|
handler := h
|
||
|
err := group.Consume(ctx, h.topics, handler)
|
||
|
log.Printf("订阅 topics=%v", h.topics)
|
||
|
if err != nil {
|
||
|
log.Printf("订阅异常=%s", err.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
// 同步生产模式
|
||
|
func Send2Topic(brokers []string, topic, content string) {
|
||
|
producer, err := sarama.NewSyncProducer(brokers, nil)
|
||
|
if err != nil {
|
||
|
log.Fatalln(err)
|
||
|
}
|
||
|
defer func() {
|
||
|
if err := producer.Close(); err != nil {
|
||
|
log.Fatalln(err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)}
|
||
|
partition, offset, err := producer.SendMessage(msg)
|
||
|
if err != nil {
|
||
|
log.Printf("FAILED to send message: %s\n", err)
|
||
|
} else {
|
||
|
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
|
||
|
}
|
||
|
|
||
|
}
|