et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

122 lines
3.2 KiB

package et_push
import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"

	"node/stages"
)
// dataOut is one station record queued for publishing.
// The exported fields make up the JSON payload; topic is unexported, so
// encoding/json leaves it out of the encoded message.
type dataOut struct {
// topic is the destination handed to IPushClient.Publish for this record.
topic string
// Id is filled from the station's Info.Id.
Id int `json:"id"`
// Name is filled from the station's Info.Name.
Name string `json:"name"`
// Data is filled from the station's Data.ThemeData.
Data map[string]any `json:"data"`
// CollectTime is filled from the station's Data.CollectTime.
CollectTime time.Time `json:"collect_time"`
}
// IPushClient is the publishing contract every push backend must satisfy.
type IPushClient interface {
// Publish sends messageBytes to topic. The method returns nothing, so a
// delivery failure cannot be reported to the caller through this interface.
Publish(topic string, messageBytes []byte)
}
// PushHandler queues station records produced by the pipeline and publishes
// them in batches to every configured push client.
//
// push (called through the stage) appends to dataQueue while
// publishBatchMonitor drains it from its own goroutine, so every access to
// dataQueue must hold queueMu.
type PushHandler struct {
	stage *stages.Stage

	// pushClients is populated once by addClients and only read afterwards.
	pushClients []IPushClient

	// queueMu guards dataQueue.
	queueMu sync.Mutex

	// dataQueue holds the records waiting for the next batch publish.
	dataQueue []dataOut

	// signBatch wakes the monitor early once batchCount records are queued.
	signBatch chan bool

	// batchCount is the queue length at which push requests an early flush.
	batchCount int
}

// GetStage returns a copy of the handler's stage (the stored pointer is
// dereferenced).
func (the *PushHandler) GetStage() stages.Stage {
	return *the.stage
}

// NewPushHandler builds a handler, creates the configured push clients,
// starts the background batch monitor and registers push as a stage process.
//
// The monitor goroutine runs for the lifetime of the process; there is no
// stop mechanism.
func NewPushHandler() *PushHandler {
	the := &PushHandler{
		stage:      stages.NewStage("测点数据推送"),
		signBatch:  make(chan bool, 1),
		batchCount: 500,
	}
	the.addClients()
	go the.publishBatchMonitor()
	the.stage.AddProcess(the.push)
	return the
}

// addClients creates one push client per backend enabled in configuration
// (push.mqtt.enable / push.kafka.enable) and appends it to pushClients.
func (the *PushHandler) addClients() {
	// Load the configuration once instead of once per key.
	cfg := configLoad.LoadConfig()

	if cfg.GetBool("push.mqtt.enable") {
		mqttHost := cfg.GetString("push.mqtt.host")
		mqttPort := cfg.GetInt("push.mqtt.port")
		clientIdPrefix := cfg.GetString("push.mqtt.clientIdPrefix")
		mq := common_utils.NewMqttHelper(
			mqttHost,
			mqttPort,
			// The start timestamp is appended to the configured prefix to form the client id.
			fmt.Sprintf("%s-%s", clientIdPrefix, time.Now().Format("20060102-150405")),
			"",
			"",
			false,
		)
		the.pushClients = append(the.pushClients, mq)
	}

	if cfg.GetBool("push.kafka.enable") {
		kafkaBrokers := cfg.GetStringSlice("push.kafka.brokers")
		ka := kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers)
		the.pushClients = append(the.pushClients, ka)
	}
}

// push is the stage process: it converts every station in p into a dataOut
// record, queues it, and asks the monitor for an early flush once the queue
// holds at least batchCount records. p is returned unchanged so the pipeline
// can continue. With no push clients configured it is a no-op.
func (the *PushHandler) push(p *common_models.ProcessData) *common_models.ProcessData {
	if len(the.pushClients) == 0 {
		return p
	}

	// Build the records outside the lock so the critical section is a single append.
	records := make([]dataOut, 0, len(p.Stations))
	for _, station := range p.Stations {
		records = append(records, dataOut{
			topic:       fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id),
			Id:          station.Info.Id,
			Name:        station.Info.Name,
			Data:        station.Data.ThemeData,
			CollectTime: station.Data.CollectTime,
		})
	}

	the.queueMu.Lock()
	the.dataQueue = append(the.dataQueue, records...)
	queued := len(the.dataQueue)
	the.queueMu.Unlock()

	if queued >= the.batchCount {
		log.Printf("推送队列 len=%d > %d,触发 批信号", queued, the.batchCount)
		// Non-blocking send: if a wake-up is already pending the monitor will
		// drain the whole queue anyway, so the pipeline must not stall here.
		select {
		case the.signBatch <- true:
		default:
		}
	}
	return p
}

// publish sends every record in dataArrayOut, JSON-encoded, to each push
// client on the record's topic. A record that cannot be encoded is logged and
// skipped rather than published as an empty payload.
func (the *PushHandler) publish(dataArrayOut []dataOut) {
	for i, client := range the.pushClients {
		log.Printf("[client-%d]publish %d 条数据", i, len(dataArrayOut))
		for _, out := range dataArrayOut {
			outBytes, err := json.Marshal(out)
			if err != nil {
				log.Printf("[client-%d]marshal failed, topic=%s: %v", i, out.topic, err)
				continue
			}
			client.Publish(out.topic, outBytes)
		}
	}
}

// publishBatchMonitor loops forever: it waits for either a batch signal or a
// 200ms timeout, then takes everything currently queued and publishes it on a
// new goroutine.
func (the *PushHandler) publishBatchMonitor() {
	for {
		select {
		case <-the.signBatch:
			log.Printf("批推送信号,监控器收到")
		case <-time.After(200 * time.Millisecond):
		}

		// Take ownership of the whole queue under the lock. Resetting to nil
		// makes push allocate a fresh backing array, so the batch handed to
		// publish is never shared with later appends.
		the.queueMu.Lock()
		needPush := the.dataQueue
		the.dataQueue = nil
		the.queueMu.Unlock()

		if len(needPush) == 0 {
			continue
		}
		log.Printf("推送队列长度=%d/%d", len(needPush), cap(needPush))
		go the.publish(needPush)
	}
}