You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
123 lines
3.2 KiB
123 lines
3.2 KiB
1 month ago
|
package et_push
|
||
|
|
||
|
import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
	"node/stages"
)
|
||
|
|
||
|
// dataOut is one queued push message: the JSON payload for a single station
// plus the topic it is published on.
//
// Field order is significant: encoding/json emits keys in declaration order,
// so reordering the exported fields changes the wire format.
type dataOut struct {
	// topic is the destination topic. It is unexported and therefore never
	// serialized; it only routes the message.
	topic string
	// Id is copied from the station's Info.Id by the producer of this value.
	Id int `json:"id"`
	// Name is copied from the station's Info.Name.
	Name string `json:"name"`
	// Data is the station's theme data, keyed by field name.
	Data map[string]any `json:"data"`
	// CollectTime is the collection timestamp of the station data.
	CollectTime time.Time `json:"collect_time"`
}
|
||
|
|
||
|
// IPushClient is the minimal contract a push backend must satisfy to receive
// station data from PushHandler.
type IPushClient interface {
	// Publish sends messageBytes to the given topic. It returns nothing, so
	// delivery failures are not visible to the caller through this interface.
	Publish(topic string, messageBytes []byte)
}
|
||
|
|
||
|
type PushHandler struct {
|
||
|
stage *stages.Stage
|
||
|
pushClients []IPushClient
|
||
|
dataQueue []dataOut
|
||
|
signBatch chan bool
|
||
|
batchCount int
|
||
|
}
|
||
|
|
||
|
func (the *PushHandler) GetStage() stages.Stage {
|
||
|
return *the.stage
|
||
|
}
|
||
|
|
||
|
func NewPushHandler() *PushHandler {
|
||
|
the := &PushHandler{
|
||
|
stage: stages.NewStage("测点数据推送"),
|
||
|
signBatch: make(chan bool, 1),
|
||
|
batchCount: 500,
|
||
|
}
|
||
|
the.addClients()
|
||
|
go the.publishBatchMonitor()
|
||
|
the.stage.AddProcess(the.push)
|
||
|
return the
|
||
|
}
|
||
|
|
||
|
func (the *PushHandler) addClients() {
|
||
|
mqttEnable := configLoad.LoadConfig().GetBool("push.mqtt.enable")
|
||
|
if mqttEnable {
|
||
|
mqttHost := configLoad.LoadConfig().GetString("push.mqtt.host")
|
||
|
mqttPort := configLoad.LoadConfig().GetInt("push.mqtt.port") //clientIdPrefix
|
||
|
clientIdPrefix := configLoad.LoadConfig().GetString("push.mqtt.clientIdPrefix")
|
||
|
mq := common_utils.NewMqttHelper(
|
||
|
mqttHost,
|
||
|
mqttPort,
|
||
|
fmt.Sprintf("%s-%s", clientIdPrefix, time.Now().Format("20060102-150405")),
|
||
|
"",
|
||
|
"",
|
||
|
false,
|
||
|
)
|
||
|
the.pushClients = append(the.pushClients, mq)
|
||
|
}
|
||
|
kafkaEnable := configLoad.LoadConfig().GetBool("push.kafka.enable")
|
||
|
if kafkaEnable {
|
||
|
kafkaBrokers := configLoad.LoadConfig().GetStringSlice("push.kafka.brokers")
|
||
|
ka := kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers)
|
||
|
the.pushClients = append(the.pushClients, ka)
|
||
|
}
|
||
|
}
|
||
|
func (the *PushHandler) push(p *common_models.ProcessData) *common_models.ProcessData {
|
||
|
if len(the.pushClients) == 0 {
|
||
|
return p
|
||
|
}
|
||
|
|
||
|
for _, station := range p.Stations {
|
||
|
dataPush := dataOut{
|
||
|
topic: fmt.Sprintf("etpush/%d/%d", station.Info.StructureId, station.Info.Id),
|
||
|
Id: station.Info.Id,
|
||
|
Name: station.Info.Name,
|
||
|
Data: station.Data.ThemeData,
|
||
|
CollectTime: station.Data.CollectTime,
|
||
|
}
|
||
|
the.dataQueue = append(the.dataQueue, dataPush)
|
||
|
}
|
||
|
if len(the.dataQueue) >= the.batchCount {
|
||
|
log.Printf("推送队列 len=%d > %d,触发 批信号", len(the.dataQueue), the.batchCount)
|
||
|
the.signBatch <- true
|
||
|
}
|
||
|
return p
|
||
|
}
|
||
|
|
||
|
func (the *PushHandler) publish(dataArrayOut []dataOut) {
|
||
|
for i, client := range the.pushClients {
|
||
|
log.Printf("[client-%d]publish %d 条数据", i, len(dataArrayOut))
|
||
|
for _, out := range dataArrayOut {
|
||
|
outBytes, _ := json.Marshal(out)
|
||
|
client.Publish(out.topic, outBytes)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
func (the *PushHandler) publishBatchMonitor() {
|
||
|
for {
|
||
|
select {
|
||
|
case <-the.signBatch:
|
||
|
log.Printf("批推送信号,监控器收到")
|
||
|
case <-time.After(200 * time.Millisecond):
|
||
|
}
|
||
|
if len(the.dataQueue) > 0 {
|
||
|
log.Printf("推送队列长度=%d/%d", len(the.dataQueue), cap(the.dataQueue))
|
||
|
count := len(the.dataQueue)
|
||
|
needPush := the.dataQueue[:count]
|
||
|
the.dataQueue = the.dataQueue[count:]
|
||
|
go the.publish(needPush)
|
||
|
}
|
||
|
}
|
||
|
}
|