package kafkaHelper import ( "fmt" "github.com/IBM/sarama" "log" "strings" "time" ) func getMsgs(count int, startTime time.Time, offsetTime time.Duration) []string { msgs := make([]string, 0) for i := 0; i < count; i++ { t := startTime.Add(time.Duration(i) * offsetTime) msgs = append(msgs, getMsg(t)) } return msgs } func getMsg(collectTime time.Time) string { collectTimeStr := collectTime.Format(time.RFC3339Nano) rawDataMsg := fmt.Sprintf(` { "userId": "77804162-837d-4ff9-96c0-beb8e8888f8e", "thingId": "8e3eec71-c924-47fd-ac8b-2f28c49ad4e9", "dimensionId": "f65e5990-540d-40ff-9056-af775e5e4c56", "dimCapId": "aaab627b-5a58-454b-a83f-e236a63930bb", "capId": "35ec137a-ea04-4c81-bbaf-23ddae650c0e", "deviceId": "9c94a305-dd18-4809-8fc6-4191c699e726", "scheduleId": "c5559fbd-eaf4-4fd9-b667-30cc40607ea9", "taskId": "c05ccef4-017c-48d5-a2e5-c01fac16f797", "jobId": 1, "jobRepeatId": 1, "triggerTime": "%s", "realTime": "%s", "finishTime": "%s", "seq": 0, "released": false, "data": { "type": 1, "data": { "pressure": 17.186, "temperature": 23.25 }, "result": { "code": 0, "msg": "", "detail": null, "errTimes": 0, "dropped": false } } }`, collectTimeStr, collectTimeStr, collectTimeStr) return rawDataMsg } // AsyncProductMsg 是一个异步发送消息到Kafka的函数 // topic: 消息的主题 // msgs: 需要发送的消息的内容列表 func AsyncProductMsg(topic string, msgs []string) { producer, err := sarama.NewAsyncProducer([]string{"10.8.30.160:30992"}, nil) if err != nil { log.Fatalln(err) } defer func() { if err := producer.Close(); err != nil { log.Fatalln(err) } }() for i := 0; i < len(msgs); i++ { content := msgs[i] msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)} producer.Input() <- msg } if err != nil { log.Printf("FAILED to send message: %s\n", err) } } type KafkaAsyncProducer struct { producer sarama.AsyncProducer } func (p *KafkaAsyncProducer) Publish(topic string, messageBytes []byte) { //由于kafka topic 无法分级 需要特殊处理 topic = strings.Split(topic, "/")[0] msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(messageBytes)} p.producer.Input() <- msg } func (p *KafkaAsyncProducer) Close() { if err := p.producer.Close(); err != nil { log.Fatalln(err) } } func NewKafkaAsyncProducer(brokers []string) *KafkaAsyncProducer { config := sarama.NewConfig() config.ClientID = "et-go-push" config.Net.DialTimeout = time.Second * 10 config.Net.ReadTimeout = time.Second * 10 config.Net.WriteTimeout = time.Second * 10 producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { log.Printf("kafka 生产者[%v] 初始化异常 %s", brokers, err.Error()) return nil } //[]string{"10.8.30.160:30092"} //etpush/3/197 return &KafkaAsyncProducer{ producer: producer, } }