package kafkaHelper import ( "context" "fmt" "github.com/IBM/sarama" "log" ) type ConsumerGroupHandler struct { brokers []string topics []string groupId string onHandlersMap map[string]func(*sarama.ConsumerMessage) bool } func NewConsumerGroupHandler(Brokers []string, GroupId string) *ConsumerGroupHandler { return &ConsumerGroupHandler{ brokers: Brokers, groupId: GroupId, topics: make([]string, 0), onHandlersMap: make(map[string]func(*sarama.ConsumerMessage) bool), } } func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { // 在此执行任何必要的设置任务。 fmt.Println("kafka Consumed start set") return nil } func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { // 在此执行任何必要的清理任务。 return nil } func (h *ConsumerGroupHandler) Subscribe(topic string, fun func(string) bool) { h.topics = append(h.topics, topic) h.onHandlersMap[topic] = h.decorateSubscribeString(fun) } func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.ConsumerMessage) bool) { h.topics = append(h.topics, topic) h.onHandlersMap[topic] = fun } func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool { f := func(cm *sarama.ConsumerMessage) bool { msg := string(cm.Value) log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset) return handler(msg) } return f } func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for message := range Claim.Messages() { // 在这里处理消息。 topic := message.Topic //log.Printf("[%s]Message: %s\n", topic, string(message.Value)) allOk := true handlerFunc, isValid := h.onHandlersMap[topic] if !isValid { log.Printf("不存在 [%s]的解析", topic) continue } allOk = handlerFunc(message) if allOk { // 将消息标记为在会话中已处理。 session.MarkMessage(message, "is handled") } } return nil } func (h *ConsumerGroupHandler) Worker() { config := sarama.NewConfig() config.Consumer.Return.Errors = false config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config) if err != nil { panic(err) } defer func() { _ = group.Close() }() // Track errors go func() { for err := range group.Errors() { fmt.Println("ERROR", err) } }() // Iterate over consumer sessions. ctx := context.Background() for { //topics := []string{"anxinyun_data"} //my_topic //handler := consumerGroupHandler{} // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims handler := h err := group.Consume(ctx, h.topics, handler) log.Printf("订阅 topics=%v", h.topics) if err != nil { log.Printf("订阅异常=%s", err.Error()) } } } // 同步生产模式 func Send2Topic(brokers []string, topic, content string) { producer, err := sarama.NewSyncProducer(brokers, nil) if err != nil { log.Fatalln(err) } defer func() { if err := producer.Close(); err != nil { log.Fatalln(err) } }() msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)} partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("FAILED to send message: %s\n", err) } else { log.Printf("> message sent to partition %d at offset %d\n", partition, offset) } }