package kafkaHelper import ( "log" "os" "strings" "testing" "time" ) func TestInitial(t *testing.T) { //"10.8.30.160:30092" // os.Setenv("kafkaBrokers", "10.8.30.72:29092,10.8.30.73:29092,10.8.30.74:29092") //os.Setenv("kafkaTopics", "RawData2") } func TestConsumer(t *testing.T) { TestInitial(t) //消费者 brokers := strings.Split(os.Getenv("kafkaBrokers"), ",") //"10.8.30.160:30092" //topics := strings.Split(os.Getenv("kafkaTopics"), ",") groupID := "consumer_group_lk2" cgh := NewConsumerGroupHandler(brokers, groupID) cgh.Subscribe("testTopic01", handler1) cgh.Subscribe("RawData11", handler2) cgh.Worker() println("=======") } func handler1(msg string) bool { log.Printf("handler1 处理消息:%s", msg) return true } func handler2(msg string) bool { log.Printf("handler2 处理消息:%s", msg) return true } func TestProductAsync(t *testing.T) { layout := "2006-01-02T15:04:05.000+0800" timeStr := "2024-02-02T00:00:01.001+0800" startTime, err := time.ParseInLocation(layout, timeStr, time.Local) if err != nil { panic("时间异常") } msgs := getMsgs(2000, startTime, time.Millisecond) topic := "go_RawData" AsyncProductMsg(topic, msgs) time.Sleep(time.Second * 2) }