数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 lines
3.0 KiB

package _kafka
import (
"fmt"
"github.com/IBM/sarama"
"log"
"strings"
"time"
)
// getMsgs builds count raw-data messages whose timestamps start at startTime
// and advance by offsetTime for each successive message.
//
// The i-th message (0-based) is getMsg(startTime + i*offsetTime). A count of
// zero or less yields an empty, non-nil slice.
func getMsgs(count int, startTime time.Time, offsetTime time.Duration) []string {
	// The final length is known up front, so reserve it once instead of
	// letting append regrow the slice. Clamp to zero because make panics on a
	// negative capacity, whereas the loop below simply does not run.
	capacity := count
	if capacity < 0 {
		capacity = 0
	}
	msgs := make([]string, 0, capacity)
	for i := 0; i < count; i++ {
		collectTime := startTime.Add(time.Duration(i) * offsetTime)
		msgs = append(msgs, getMsg(collectTime))
	}
	return msgs
}
// getMsg renders one fixed raw-data JSON message in which triggerTime,
// realTime and finishTime are all set to collectTime formatted as
// time.RFC3339Nano. Every other field of the payload is a constant.
func getMsg(collectTime time.Time) string {
	// The same timestamp fills all three %s placeholders of the template.
	ts := collectTime.Format(time.RFC3339Nano)
	const payload = `
{
"userId": "77804162-837d-4ff9-96c0-beb8e8888f8e",
"thingId": "8e3eec71-c924-47fd-ac8b-2f28c49ad4e9",
"dimensionId": "f65e5990-540d-40ff-9056-af775e5e4c56",
"dimCapId": "aaab627b-5a58-454b-a83f-e236a63930bb",
"capId": "35ec137a-ea04-4c81-bbaf-23ddae650c0e",
"deviceId": "9c94a305-dd18-4809-8fc6-4191c699e726",
"scheduleId": "c5559fbd-eaf4-4fd9-b667-30cc40607ea9",
"taskId": "c05ccef4-017c-48d5-a2e5-c01fac16f797",
"jobId": 1,
"jobRepeatId": 1,
"triggerTime": "%s",
"realTime": "%s",
"finishTime": "%s",
"seq": 0,
"released": false,
"data": {
"type": 1,
"data": {
"pressure": 17.186,
"temperature": 23.25
},
"result": {
"code": 0,
"msg": "",
"detail": null,
"errTimes": 0,
"dropped": false
}
}
}`
	return fmt.Sprintf(payload, ts, ts, ts)
}
// AsyncProductMsg sends every entry of msgs to the given Kafka topic through a
// sarama asynchronous producer connected to the hard-coded broker
// "10.8.30.160:30992".
//
// topic: the topic each message is published to.
// msgs: the message payloads, sent in slice order as string-encoded values.
//
// The function blocks until the producer has shut down, i.e. until all
// buffered messages have been flushed or reported as failed. Each delivery
// failure is logged. The process is terminated via the log package when the
// producer cannot be created or when at least one message could not be
// delivered, matching the original behavior of exiting when closing the
// producer reported errors.
// NOTE(review): exiting the process from a helper is harsh; consider returning
// an error once callers can be updated.
func AsyncProductMsg(topic string, msgs []string) {
	producer, err := sarama.NewAsyncProducer([]string{"10.8.30.160:30992"}, nil)
	if err != nil {
		log.Fatalln(err)
	}

	// With the default configuration sarama reports delivery failures on
	// Errors(), and its documentation requires that channel to be read or the
	// producer can deadlock. Drain it concurrently with the sends below.
	failed := 0
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for perr := range producer.Errors() {
			failed++
			log.Printf("FAILED to send message: %s\n", perr)
		}
	}()

	for _, content := range msgs {
		producer.Input() <- &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(content)}
	}

	// AsyncClose flushes in-flight messages and then closes Errors(), which
	// ends the drain goroutine. Waiting on drained also makes the read of
	// failed below safe.
	producer.AsyncClose()
	<-drained

	if failed > 0 {
		log.Fatalf("kafka: failed to deliver %d messages", failed)
	}
}
// KafkaAsyncProducer wraps a sarama.AsyncProducer; its methods Publish and
// Close operate on the wrapped producer.
type KafkaAsyncProducer struct {
// producer is the underlying sarama asynchronous producer that Publish
// sends to and Close shuts down.
producer sarama.AsyncProducer
}
// Publish sends messageBytes to Kafka by placing a message on the wrapped
// producer's Input channel.
//
// Kafka topics are not hierarchical, so only the part of topic before the
// first '/' is used as the Kafka topic name; a topic without '/' is used
// unchanged.
func (p *KafkaAsyncProducer) Publish(topic string, messageBytes []byte) {
	// Keep only the first '/'-separated segment of the topic.
	if slash := strings.IndexByte(topic, '/'); slash >= 0 {
		topic = topic[:slash]
	}
	p.producer.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(messageBytes),
	}
}
// Close closes the wrapped producer. If closing reports an error, the error
// is logged with log.Fatalln, which terminates the process.
func (p *KafkaAsyncProducer) Close() {
	err := p.producer.Close()
	if err == nil {
		return
	}
	log.Fatalln(err)
}
// NewKafkaAsyncProducer creates a KafkaAsyncProducer connected to brokers,
// using the client ID "et-go-push" and 10-second dial, read and write
// timeouts.
//
// When the underlying producer cannot be created the cause is logged and nil
// is returned, so callers must check the result before calling Publish or
// Close.
//
// The configuration from sarama.NewConfig reports delivery failures on the
// producer's Errors() channel, and sarama's documentation requires that
// channel to be read or the producer can deadlock. A background goroutine
// therefore logs every reported failure; it ends when the producer is closed
// and the channel is closed with it.
func NewKafkaAsyncProducer(brokers []string) *KafkaAsyncProducer {
	config := sarama.NewConfig()
	config.ClientID = "et-go-push"
	config.Net.DialTimeout = time.Second * 10
	config.Net.ReadTimeout = time.Second * 10
	config.Net.WriteTimeout = time.Second * 10

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Printf("_kafka 生产者[%v] 初始化异常 %s", brokers, err.Error())
		return nil
	}

	// Drain delivery errors so a failing broker cannot stall Publish.
	go func() {
		for perr := range producer.Errors() {
			log.Printf("_kafka producer failed to deliver message: %s", perr.Error())
		}
	}()

	// Example values left by the original author (presumably a broker list
	// and a topic): []string{"10.8.30.160:30092"}, etpush/3/197
	return &KafkaAsyncProducer{
		producer: producer,
	}
}