Browse Source

推送JSON数组

dev
yfh 1 month ago
parent
commit
62668afdda
  1. 52
      et_push/pushHandler.go

52
et_push/pushHandler.go

@ -13,7 +13,7 @@ import (
)
type dataOut struct {
topic string
StructId int `json:"structId"`
Id int `json:"id"`
Name string `json:"name"`
Data map[string]any `json:"data"`
@ -40,7 +40,7 @@ func NewPushHandler() *PushHandler {
the := &PushHandler{
stage: stages.NewStage("测点数据推送"),
signBatch: make(chan bool, 1),
batchCount: 500,
batchCount: 100,
}
the.addClients()
go the.publishBatchMonitor()
@ -64,6 +64,7 @@ func (the *PushHandler) addClients() {
)
the.pushClients = append(the.pushClients, mq)
}
kafkaEnable := configLoad.LoadConfig().GetBool("push.kafka.enable")
if kafkaEnable {
kafkaBrokers := configLoad.LoadConfig().GetStringSlice("push.kafka.brokers")
@ -71,26 +72,32 @@ func (the *PushHandler) addClients() {
the.pushClients = append(the.pushClients, ka)
}
}
func (the *PushHandler) push(p *common_models.ProcessData) *common_models.ProcessData {
func (the *PushHandler) push(data []*common_models.ProcessData) []*common_models.ProcessData {
if len(the.pushClients) == 0 {
return p
return data
}
for _, station := range p.Stations {
dataPush := dataOut{
topic: fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id),
Id: station.Info.Id,
Name: station.Info.Name,
Data: station.Data.ThemeData,
CollectTime: station.Data.CollectTime,
for _, p := range data {
for _, station := range p.Stations {
dataPush := dataOut{
//topic: fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id),
StructId: station.Info.StructureId,
Id: station.Info.Id,
Name: station.Info.Name,
Data: station.Data.ThemeData,
CollectTime: station.Data.CollectTime,
}
the.dataQueue = append(the.dataQueue, dataPush)
}
the.dataQueue = append(the.dataQueue, dataPush)
}
if len(the.dataQueue) >= the.batchCount {
log.Printf("推送队列 len=%d > %d,触发 批信号", len(the.dataQueue), the.batchCount)
the.signBatch <- true
}
return p
return data
}
func (the *PushHandler) publish(dataArrayOut []dataOut) {
@ -98,10 +105,24 @@ func (the *PushHandler) publish(dataArrayOut []dataOut) {
log.Printf("[client-%d]publish %d 条数据", i, len(dataArrayOut))
for _, out := range dataArrayOut {
outBytes, _ := json.Marshal(out)
client.Publish(out.topic, outBytes)
topic := fmt.Sprintf("etpush/%d/%d", out.StructId, out.Id)
client.Publish(topic, outBytes)
}
}
}
func (the *PushHandler) publishArray(dataArrayOut []dataOut) {
outBytes, err := json.Marshal(dataArrayOut)
if err != nil {
log.Printf("JSON Marshal error: %v", err)
return
}
// 遍历每个客户端并推送数据
for i, client := range the.pushClients {
log.Printf("[client-%d] publish %d 条数据", i, len(dataArrayOut))
client.Publish("etpush/0/0", outBytes)
}
}
func (the *PushHandler) publishBatchMonitor() {
@ -116,7 +137,8 @@ func (the *PushHandler) publishBatchMonitor() {
count := len(the.dataQueue)
needPush := the.dataQueue[:count]
the.dataQueue = the.dataQueue[count:]
go the.publish(needPush)
//go the.publish(needPush)
go the.publishArray(needPush)
}
}
}

Loading…
Cancel
Save