package _kafka import ( "log" ) type KafkaHelper struct { Brokers []string GroupId string client *ConsumerGroupHandler } func (the *KafkaHelper) initialClient() *ConsumerGroupHandler { the.client = NewConsumerGroupHandler(the.Brokers, the.GroupId) return the.client } func (the *KafkaHelper) Initial() *ConsumerGroupHandler { return the.initialClient() } func (the *KafkaHelper) Subscribe(topic string, callback func(topic string, msg string) bool) { log.Printf("=================开始订阅 %s [%s]=================", the.Brokers, topic) the.client.Subscribe(topic, callback) } func (the *KafkaHelper) Worker() { go the.client.Worker() }