数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

48 lines
1.1 KiB

package _kafka
import (
"log"
"time"
)
// KafkaHelper bundles a broker list and a group id with a consumer-group
// client and an asynchronous producer built from them.
type KafkaHelper struct {
// Brokers is handed to NewConsumerGroupHandler and NewKafkaAsyncProducer.
Brokers []string
// GroupId is used as-is for the consumer client and with a "_p" suffix for
// the producer.
GroupId string
// client is set by initialClient; Subscribe and Worker delegate to it.
client *ConsumerGroupHandler
// producer is created on the first call to Publish.
producer *KafkaAsyncProducer
}
// initialClient builds a consumer group handler from the helper's Brokers and
// GroupId, stores it on the helper, and returns it. Any previously stored
// client is replaced.
func (the *KafkaHelper) initialClient() *ConsumerGroupHandler {
	handler := NewConsumerGroupHandler(the.Brokers, the.GroupId)
	the.client = handler
	return handler
}
// Initial creates the consumer client for this helper and returns it.
// It is the exported entry point for initialClient.
func (the *KafkaHelper) Initial() *ConsumerGroupHandler {
	client := the.initialClient()
	return client
}
// Subscribe logs the broker list and topic, then registers callback for topic
// on the consumer client.
//
// If the helper was built without calling Initial (for example from a struct
// literal), the consumer client is created here first. This mirrors how
// Publish creates its producer on first use and avoids calling through a nil
// client.
//
// The meaning of callback's bool result is defined by
// ConsumerGroupHandler.Subscribe, which is not visible from this file.
func (the *KafkaHelper) Subscribe(topic string, callback func(topic string, msg string) bool) {
	if the.client == nil {
		the.initialClient()
	}
	log.Printf("=================开始订阅 %s [%s]=================", the.Brokers, topic)
	the.client.Subscribe(topic, callback)
}
// Publish hands bytes to the asynchronous producer for the given topic.
//
// The producer is created on the first call, using the helper's Brokers and
// GroupId with a "_p" suffix, and is reused on later calls.
func (the *KafkaHelper) Publish(topic string, bytes []byte) {
	producer := the.producer
	if producer == nil {
		producer = NewKafkaAsyncProducer(the.Brokers, the.GroupId+"_p")
		the.producer = producer
	}
	producer.Publish(topic, bytes)
}
// Worker starts the consumer client's Worker method in a new goroutine and
// returns immediately. No handle for stopping or waiting on that goroutine is
// exposed here.
//
// If the helper was built without calling Initial, the consumer client is
// created first. Otherwise the nil client would only fail inside the spawned
// goroutine, where the caller has no chance to recover.
func (the *KafkaHelper) Worker() {
	if the.client == nil {
		the.initialClient()
	}
	go the.client.Worker()
}
// KafkaInitial builds a KafkaHelper for the given brokers and group id,
// creates its consumer client via Initial, pauses for one second, and returns
// the helper.
//
// NOTE(review): the one-second pause presumably gives the consumer client time
// to finish setting up — confirm against NewConsumerGroupHandler.
func KafkaInitial(Brokers []string, GroupId string) *KafkaHelper {
	helper := &KafkaHelper{Brokers: Brokers, GroupId: GroupId}
	helper.Initial()
	time.Sleep(time.Second)
	return helper
}