Browse Source

update gnss上报 优化处理

pull/2/head
lucas 2 weeks ago
parent
commit
6d979cbe2c
  1. 22
      adaptors/司南GNSS_MySql库to安心云.go
  2. 2
      consumers/consumerSinoGnssMySQL.go
  3. 2
      dbOperate/mqttHelper.go

22
adaptors/司南GNSS_MySql库to安心云.go

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"goInOut/consumers/SinoGnssMySQL" "goInOut/consumers/SinoGnssMySQL"
"strconv"
) )
// Adaptor_SINOMYSQL_AXYMQTT 数据 转换 江苏农村公路桥梁监测系统 // Adaptor_SINOMYSQL_AXYMQTT 数据 转换 江苏农村公路桥梁监测系统
@ -12,25 +13,32 @@ type Adaptor_SINOMYSQL_AXYMQTT struct {
func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush { func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush {
var needPush []NeedPush var needPush []NeedPush
var allDxFiles map[string][]SinoGnssMySQL.DxFile allDxFiles := make(map[string][]SinoGnssMySQL.DxFile)
for _, gnssData := range gnssDataList { for _, gnssData := range gnssDataList {
OnceDxFiles := allDxFiles[gnssData.GroupName] OnceDxFiles := allDxFiles[gnssData.GroupName]
OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{
Module: gnssData.StationName, Module: gnssData.StationName,
Channel: 1,
Timespan: gnssData.Time.UnixMilli(), Timespan: gnssData.Time.UnixMilli(),
RawValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, RawValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, //file_mqtt协议里面只解析RV
PhyValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, LimitValue: []float64{},
ThemeValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, PhyValue: []float64{},
ThemeValue: []float64{},
}) })
allDxFiles[gnssData.GroupName] = OnceDxFiles allDxFiles[gnssData.GroupName] = OnceDxFiles
} }
for groupName, groupFile := range allDxFiles { for groupName, groupFileList := range allDxFiles {
bs, _ := json.Marshal(groupFile) contentMap := make(map[string]string)
for i, file := range groupFileList {
bs, _ := json.Marshal(file)
contentMap[strconv.Itoa(i)] = string(bs)
}
gpContent, _ := json.Marshal(contentMap)
topic := fmt.Sprintf("SinoGnss/%s/", groupName) topic := fmt.Sprintf("SinoGnss/%s/", groupName)
needPush = append(needPush, NeedPush{ needPush = append(needPush, NeedPush{
Topic: topic, Topic: topic,
Payload: bs, Payload: gpContent,
}) })
} }

2
consumers/consumerSinoGnssMySQL.go

@ -68,7 +68,7 @@ func (the *consumerSinoGnssMySQL) Work() {
for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics { for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics {
for _, push := range needPushList { for _, push := range needPushList {
log.Printf("推送[%s]: len=%d", outTopic, len(push.Payload)) log.Printf("推送[%s]: len=%d", push.Topic, len(push.Payload))
//hex.EncodeToString(pushBytes) //hex.EncodeToString(pushBytes)
if push.Topic != "" { if push.Topic != "" {
outTopic = push.Topic outTopic = push.Topic

2
dbOperate/mqttHelper.go

@ -83,7 +83,7 @@ func NewTlsConfig(sslPath string) *tls.Config {
func (the *MqttHelper) Publish(topic string, messageBytes []byte) { func (the *MqttHelper) Publish(topic string, messageBytes []byte) {
if the.client != nil { if the.client != nil {
token := the.client.Publish(topic, 0, false, messageBytes) token := the.client.Publish(topic, 1, false, messageBytes)
token.Wait() token.Wait()
//the.client.Disconnect(200) //the.client.Disconnect(200)
fmt.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes)) fmt.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes))

Loading…
Cancel
Save