// Package et_push queues processed station data and publishes it, in batches,
// to the MQTT and/or Kafka clients enabled in the configuration.
package et_push

import (
	"encoding/json"
	"fmt"
	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
	"log"
	"node/stages"
	"sync"
	"time"
)

// dataOut is the message published for a single station.
type dataOut struct {
	// topic is the destination topic. It is unexported, so encoding/json
	// leaves it out of the serialized payload.
	topic       string
	Id          int            `json:"id"`
	Name        string         `json:"name"`
	Data        map[string]any `json:"data"`
	CollectTime time.Time      `json:"collect_time"`
}

// IPushClient is the minimal publishing capability required from a push
// backend: deliver messageBytes to topic.
type IPushClient interface {
	Publish(topic string, messageBytes []byte)
}

// PushHandler collects station data produced by the pipeline stage and hands
// it to every configured push client from a background goroutine.
type PushHandler struct {
	stage       *stages.Stage
	pushClients []IPushClient

	// mu guards dataQueue, which is appended to by push and drained by the
	// publishBatchMonitor goroutine.
	mu        sync.Mutex
	dataQueue []dataOut

	// signBatch wakes the monitor early once batchCount items are queued.
	signBatch  chan bool
	batchCount int
}

// GetStage returns a copy of the stage that runs this handler's push process.
func (the *PushHandler) GetStage() stages.Stage {
	return *the.stage
}

// NewPushHandler builds a PushHandler, creates the push clients enabled in the
// configuration, starts the background batch monitor and registers push as a
// process of the stage.
func NewPushHandler() *PushHandler {
	the := &PushHandler{
		stage:      stages.NewStage("测点数据推送"),
		signBatch:  make(chan bool, 1),
		batchCount: 500,
	}
	the.addClients()
	go the.publishBatchMonitor()
	the.stage.AddProcess(the.push)
	return the
}

// addClients appends an MQTT client when "push.mqtt.enable" is set and a Kafka
// async producer when "push.kafka.enable" is set.
func (the *PushHandler) addClients() {
	mqttEnable := configLoad.LoadConfig().GetBool("push.mqtt.enable")
	if mqttEnable {
		mqttHost := configLoad.LoadConfig().GetString("push.mqtt.host")
		mqttPort := configLoad.LoadConfig().GetInt("push.mqtt.port")
		clientIdPrefix := configLoad.LoadConfig().GetString("push.mqtt.clientIdPrefix")
		// The client id is the configured prefix plus the start-up timestamp.
		mq := common_utils.NewMqttHelper(
			mqttHost,
			mqttPort,
			fmt.Sprintf("%s-%s", clientIdPrefix, time.Now().Format("20060102-150405")),
			"",
			"",
			false,
		)
		the.pushClients = append(the.pushClients, mq)
	}

	kafkaEnable := configLoad.LoadConfig().GetBool("push.kafka.enable")
	if kafkaEnable {
		kafkaBrokers := configLoad.LoadConfig().GetStringSlice("push.kafka.brokers")
		ka := kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers)
		the.pushClients = append(the.pushClients, ka)
	}
}

// push is the stage process: it queues one dataOut per station in p and
// signals the batch monitor once the queue holds at least batchCount items.
// p is always returned unchanged; with no push clients nothing is queued.
func (the *PushHandler) push(p *common_models.ProcessData) *common_models.ProcessData {
	if len(the.pushClients) == 0 {
		return p
	}

	// Build the messages before taking the lock to keep the critical section
	// down to a single append.
	pending := make([]dataOut, 0, len(p.Stations))
	for _, station := range p.Stations {
		pending = append(pending, dataOut{
			topic:       fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id),
			Id:          station.Info.Id,
			Name:        station.Info.Name,
			Data:        station.Data.ThemeData,
			CollectTime: station.Data.CollectTime,
		})
	}

	the.mu.Lock()
	the.dataQueue = append(the.dataQueue, pending...)
	queued := len(the.dataQueue)
	the.mu.Unlock()

	if queued >= the.batchCount {
		log.Printf("推送队列 len=%d > %d,触发 批信号", queued, the.batchCount)
		select {
		case the.signBatch <- true:
		default:
			// A signal is already pending and the monitor drains the whole
			// queue when it wakes up, so never block the pipeline here.
		}
	}
	return p
}

// publish serializes every item of dataArrayOut to JSON and publishes it on
// the item's topic through each push client. Items that cannot be serialized
// are logged and skipped.
func (the *PushHandler) publish(dataArrayOut []dataOut) {
	for i, client := range the.pushClients {
		log.Printf("[client-%d]publish %d 条数据", i, len(dataArrayOut))
		for _, out := range dataArrayOut {
			outBytes, err := json.Marshal(out)
			if err != nil {
				// E.g. a NaN/Inf float inside Data; do not publish an empty payload.
				log.Printf("[client-%d]marshal push data for topic %s: %v", i, out.topic, err)
				continue
			}
			client.Publish(out.topic, outBytes)
		}
	}
}

// publishBatchMonitor runs forever: on every batch signal, and at least every
// 200ms, it takes everything queued so far and publishes it in a new
// goroutine.
func (the *PushHandler) publishBatchMonitor() {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-the.signBatch:
			log.Printf("批推送信号,监控器收到")
		case <-ticker.C:
		}

		// Take ownership of the queued items and leave an empty queue behind,
		// so the publishing goroutine never shares a backing array with push.
		the.mu.Lock()
		needPush := the.dataQueue
		the.dataQueue = nil
		the.mu.Unlock()

		if len(needPush) == 0 {
			continue
		}
		log.Printf("推送队列长度=%d/%d", len(needPush), cap(needPush))
		go the.publish(needPush)
	}
}