diff --git a/et_push/pushHandler.go b/et_push/pushHandler.go index 5ba8495..0e38c91 100644 --- a/et_push/pushHandler.go +++ b/et_push/pushHandler.go @@ -13,7 +13,7 @@ import ( ) type dataOut struct { - topic string + StructId int `json:"structId"` Id int `json:"id"` Name string `json:"name"` Data map[string]any `json:"data"` @@ -40,7 +40,7 @@ func NewPushHandler() *PushHandler { the := &PushHandler{ stage: stages.NewStage("测点数据推送"), signBatch: make(chan bool, 1), - batchCount: 500, + batchCount: 100, } the.addClients() go the.publishBatchMonitor() @@ -64,6 +64,7 @@ func (the *PushHandler) addClients() { ) the.pushClients = append(the.pushClients, mq) } + kafkaEnable := configLoad.LoadConfig().GetBool("push.kafka.enable") if kafkaEnable { kafkaBrokers := configLoad.LoadConfig().GetStringSlice("push.kafka.brokers") @@ -71,26 +72,32 @@ func (the *PushHandler) addClients() { the.pushClients = append(the.pushClients, ka) } } -func (the *PushHandler) push(p *common_models.ProcessData) *common_models.ProcessData { + +func (the *PushHandler) push(data []*common_models.ProcessData) []*common_models.ProcessData { if len(the.pushClients) == 0 { - return p + return data } - for _, station := range p.Stations { - dataPush := dataOut{ - topic: fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id), - Id: station.Info.Id, - Name: station.Info.Name, - Data: station.Data.ThemeData, - CollectTime: station.Data.CollectTime, + for _, p := range data { + for _, station := range p.Stations { + dataPush := dataOut{ + //topic: fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id), + StructId: station.Info.StructureId, + Id: station.Info.Id, + Name: station.Info.Name, + Data: station.Data.ThemeData, + CollectTime: station.Data.CollectTime, + } + the.dataQueue = append(the.dataQueue, dataPush) } - the.dataQueue = append(the.dataQueue, dataPush) } + if len(the.dataQueue) >= the.batchCount { log.Printf("推送队列 len=%d > %d,触发 批信号", len(the.dataQueue), the.batchCount) the.signBatch <- true } - return p + + return data } func (the *PushHandler) publish(dataArrayOut []dataOut) { @@ -98,10 +105,24 @@ func (the *PushHandler) publish(dataArrayOut []dataOut) { log.Printf("[client-%d]publish %d 条数据", i, len(dataArrayOut)) for _, out := range dataArrayOut { outBytes, _ := json.Marshal(out) - client.Publish(out.topic, outBytes) + topic := fmt.Sprintf("etpush/%d/%d", out.StructId, out.Id) + client.Publish(topic, outBytes) } } +} +func (the *PushHandler) publishArray(dataArrayOut []dataOut) { + outBytes, err := json.Marshal(dataArrayOut) + if err != nil { + log.Printf("JSON Marshal error: %v", err) + return + } + + // 遍历每个客户端并推送数据 + for i, client := range the.pushClients { + log.Printf("[client-%d] publish %d 条数据", i, len(dataArrayOut)) + client.Publish("etpush/0/0", outBytes) + } } func (the *PushHandler) publishBatchMonitor() { @@ -116,7 +137,8 @@ func (the *PushHandler) publishBatchMonitor() { count := len(the.dataQueue) needPush := the.dataQueue[:count] the.dataQueue = the.dataQueue[count:] - go the.publish(needPush) + //go the.publish(needPush) + go the.publishArray(needPush) } } }