数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
3.7 KiB

package _kafka
import (
"context"
"fmt"
"github.com/IBM/sarama"
"log"
)
// ConsumerGroupHandler bundles the connection settings of a Kafka consumer
// group with the per-topic callbacks that process incoming messages. Its
// Setup, Cleanup and ConsumeClaim methods let it be passed to sarama as the
// consumer-group handler.
type ConsumerGroupHandler struct {
	// brokers and groupId are handed to sarama.NewConsumerGroup by Worker.
	brokers []string
	groupId string

	// topics is the list of topic names passed to Consume by Worker; it is
	// filled in by Subscribe and SubscribeRaw.
	topics []string
	// onHandlersMap maps a topic name to the callback invoked for each
	// message of that topic. The callback's bool result decides whether the
	// message is marked as handled.
	onHandlersMap map[string]func(*sarama.ConsumerMessage) bool
}
// NewConsumerGroupHandler returns a handler for the given broker addresses and
// consumer group id. The topic list and the handler map start out empty (but
// non-nil) so that Subscribe and SubscribeRaw can be called right away.
func NewConsumerGroupHandler(Brokers []string, GroupId string) *ConsumerGroupHandler {
	handler := new(ConsumerGroupHandler)
	handler.brokers = Brokers
	handler.groupId = GroupId
	handler.topics = []string{}
	handler.onHandlersMap = map[string]func(*sarama.ConsumerMessage) bool{}
	return handler
}
// Setup is the session-start hook of the sarama consumer-group handler
// interface. No preparation is required here; it only prints a marker line to
// standard output and always reports success.
func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	// Any per-session initialisation would go here.
	fmt.Println("_kafka Consumed start set")

	return nil
}
// Cleanup is the session-end hook of the sarama consumer-group handler
// interface. There is nothing to release, so it always reports success.
func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	// Any per-session teardown would go here.

	return nil
}
// Subscribe registers fun as the handler for topic. fun receives the topic
// name and the message value converted to a string, and returns true when the
// message was processed successfully (only then is it marked as handled).
//
// Subscribing to a topic that already has a handler replaces that handler; the
// topic is recorded only once so the list passed to Consume holds no
// duplicates.
func (h *ConsumerGroupHandler) Subscribe(topic string, fun func(string, string) bool) {
	// Append the topic only on first registration; a repeated call just swaps
	// the handler stored in the map.
	if _, registered := h.onHandlersMap[topic]; !registered {
		h.topics = append(h.topics, topic)
	}
	h.onHandlersMap[topic] = h.decorateSubscribeString(fun)
}
// SubscribeRaw registers fun as the handler for topic. fun receives the raw
// sarama message and returns true when the message was processed successfully
// (only then is it marked as handled).
//
// Subscribing to a topic that already has a handler replaces that handler; the
// topic is recorded only once so the list passed to Consume holds no
// duplicates.
func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.ConsumerMessage) bool) {
	// Append the topic only on first registration; a repeated call just swaps
	// the handler stored in the map.
	if _, registered := h.onHandlersMap[topic]; !registered {
		h.topics = append(h.topics, topic)
	}
	h.onHandlersMap[topic] = fun
}
// decorateSubscribeString adapts a (topic, value-as-string) callback to the
// raw-message callback signature stored in onHandlersMap. The returned
// function logs the topic and offset of every message before delegating to
// handler and passes handler's result through unchanged.
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string, string) bool) func(*sarama.ConsumerMessage) bool {
	return func(cm *sarama.ConsumerMessage) bool {
		log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
		return handler(cm.Topic, string(cm.Value))
	}
}
// ConsumeClaim processes the messages of one claim. Each message is dispatched
// to the handler registered for its topic; when the handler returns true the
// message is marked as handled in the session. Messages whose topic has no
// registered handler are logged and skipped without being marked.
//
// The method returns nil when the claim's message channel is closed or when
// the session context is cancelled, whichever happens first.
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {
	messages := Claim.Messages()
	for {
		select {
		case message, open := <-messages:
			if !open {
				// The claim was released; nothing more to read.
				return nil
			}
			handlerFunc, found := h.onHandlersMap[message.Topic]
			if !found {
				log.Printf("不存在 [%s]的解析", message.Topic)
				continue
			}
			if handlerFunc(message) {
				// Mark the message as processed within this session.
				session.MarkMessage(message, "is handled")
			}
		case <-session.Context().Done():
			// sarama expects ConsumeClaim to return promptly once the session
			// context is done (e.g. on rebalance) instead of waiting for the
			// message channel to be closed.
			return nil
		}
	}
}
// Worker connects to the configured brokers as a member of the consumer group
// and consumes the subscribed topics until the process ends. It blocks the
// calling goroutine and never returns normally.
//
// The consumer starts at the newest offset when the group has no committed
// offset, auto-commits offsets, and uses the round-robin rebalance strategy.
// Worker panics if the consumer group cannot be created.
func (h *ConsumerGroupHandler) Worker() {
	config := sarama.NewConfig()
	// Errors must be returned on group.Errors() for the reporting goroutine
	// below to see them; with this set to false nothing is ever sent on that
	// channel.
	config.Consumer.Return.Errors = true
	config.Version = sarama.V2_0_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}

	group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config)
	if err != nil {
		panic(err)
	}
	// The consume loop below never exits, so this only runs if the function
	// unwinds through a panic.
	defer func() { _ = group.Close() }()

	// Report consumer errors; the loop ends when the group is closed.
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	ctx := context.Background()
	for {
		// Consume returns whenever a server-side rebalance ends the session,
		// so it has to be called again in a loop to obtain the new claims.
		log.Printf("订阅 topics=%v", h.topics)
		if err := group.Consume(ctx, h.topics, h); err != nil {
			log.Printf("订阅异常=%s", err.Error())
		}
	}
}
// Send2Topic publishes content to topic using a synchronous producer that is
// created for this single call and closed before returning.
//
// Failures are logged and never terminate the process: if the producer cannot
// be created the function logs the error and returns without sending; send and
// close errors are logged as well.
func Send2Topic(brokers []string, topic, content string) {
	producer, err := sarama.NewSyncProducer(brokers, nil)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		// A close error after the send must not take the process down.
		if err := producer.Close(); err != nil {
			log.Println(err)
		}
	}()

	msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Printf("FAILED to send message: %s\n", err)
		return
	}
	log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}