|
|
|
package kafkaHelper
|
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)
|
|
|
|
|
|
|
|
type ConsumerGroupHandler struct {
|
|
|
|
brokers []string
|
|
|
|
topics []string
|
|
|
|
groupId string
|
|
|
|
onHandlersMap map[string]func(*sarama.ConsumerMessage) bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewConsumerGroupHandler(Brokers []string, GroupId string) *ConsumerGroupHandler {
|
|
|
|
return &ConsumerGroupHandler{
|
|
|
|
brokers: Brokers,
|
|
|
|
groupId: GroupId,
|
|
|
|
topics: make([]string, 0),
|
|
|
|
onHandlersMap: make(map[string]func(*sarama.ConsumerMessage) bool),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
|
|
|
|
// 在此执行任何必要的设置任务。
|
|
|
|
fmt.Println("kafka Consumed start set")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
|
|
|
|
// 在此执行任何必要的清理任务。
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func (h *ConsumerGroupHandler) Subscribe(topic string, fun func(string) bool) {
|
|
|
|
h.topics = append(h.topics, topic)
|
|
|
|
|
|
|
|
h.onHandlersMap[topic] = h.decorateSubscribeString(fun)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.ConsumerMessage) bool) {
|
|
|
|
h.topics = append(h.topics, topic)
|
|
|
|
|
|
|
|
h.onHandlersMap[topic] = fun
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool {
|
|
|
|
f := func(cm *sarama.ConsumerMessage) bool {
|
|
|
|
msg := string(cm.Value)
|
|
|
|
log.Printf("处理topic[%s]消息 offset=[%d]", cm.Topic, cm.Offset)
|
|
|
|
return handler(msg)
|
|
|
|
}
|
|
|
|
return f
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {
|
|
|
|
|
|
|
|
for message := range Claim.Messages() {
|
|
|
|
// 在这里处理消息。
|
|
|
|
topic := message.Topic
|
|
|
|
//log.Printf("[%s]Message: %s\n", topic, string(message.Value))
|
|
|
|
|
|
|
|
allOk := true
|
|
|
|
handlerFunc, isValid := h.onHandlersMap[topic]
|
|
|
|
if !isValid {
|
|
|
|
log.Printf("不存在 [%s]的解析", topic)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
allOk = handlerFunc(message)
|
|
|
|
|
|
|
|
if allOk {
|
|
|
|
// 将消息标记为在会话中已处理。
|
|
|
|
session.MarkMessage(message, "is handled")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *ConsumerGroupHandler) Worker() {
|
|
|
|
config := sarama.NewConfig()
|
|
|
|
config.Consumer.Return.Errors = false
|
|
|
|
config.Version = sarama.V2_0_0_0
|
|
|
|
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
|
|
|
config.Consumer.Offsets.AutoCommit.Enable = true
|
|
|
|
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
|
|
|
|
group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
defer func() { _ = group.Close() }()
|
|
|
|
|
|
|
|
// Track errors
|
|
|
|
go func() {
|
|
|
|
for err := range group.Errors() {
|
|
|
|
fmt.Println("ERROR", err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
// Iterate over consumer sessions.
|
|
|
|
ctx := context.Background()
|
|
|
|
for {
|
|
|
|
//topics := []string{"anxinyun_data"} //my_topic
|
|
|
|
//handler := consumerGroupHandler{}
|
|
|
|
// `Consume` should be called inside an infinite loop, when a
|
|
|
|
// server-side rebalance happens, the consumer session will need to be
|
|
|
|
// recreated to get the new claims
|
|
|
|
handler := h
|
|
|
|
err := group.Consume(ctx, h.topics, handler)
|
|
|
|
log.Printf("订阅 topics=%v", h.topics)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("订阅异常=%s", err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// 同步生产模式
|
|
|
|
func Send2Topic(brokers []string, topic, content string) {
|
|
|
|
producer, err := sarama.NewSyncProducer(brokers, nil)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalln(err)
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
if err := producer.Close(); err != nil {
|
|
|
|
log.Fatalln(err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)}
|
|
|
|
partition, offset, err := producer.SendMessage(msg)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("FAILED to send message: %s\n", err)
|
|
|
|
} else {
|
|
|
|
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|