Browse Source

update 更新 最新 郑州旭宸 -司南gnss 上报

pull/2/head
lucas 2 weeks ago
parent
commit
c681f9e48e
  1. 9
      adaptors/司南GNSS_MySql库to安心云.go
  2. 8
      configFiles/config_司南GnssMySQL.json
  3. 2
      configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json
  4. 9
      consumers/SinoGnssMySQL/config.go
  5. 108
      consumers/consumerSinoGnssMySQL.go
  6. 4
      dbOperate/mqttHelper.go
  7. 14
      monitors/commonMonitor.go
  8. 53
      utils/cacheFile.go

9
adaptors/司南GNSS_MySql库to安心云.go

@ -17,10 +17,11 @@ func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.Gnss
for _, gnssData := range gnssDataList {
OnceDxFiles := allDxFiles[gnssData.GroupName]
OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{
Module: gnssData.StationName,
Channel: 1,
Timespan: gnssData.Time.UnixMilli(),
RawValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, //file_mqtt协议里面只解析RV
Module: gnssData.StationName,
Channel: 1,
Timespan: gnssData.Time.UnixMilli(),
//file_mqtt协议里面只解析RV, m=> mm
RawValue: []float64{gnssData.X * 1000, gnssData.Y * 1000, gnssData.H * 1000},
LimitValue: []float64{},
PhyValue: []float64{},
ThemeValue: []float64{},

8
configFiles/config_司南GnssMySQL.json

@ -5,7 +5,8 @@
"db": {
"type": "mysql",
"connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true"
}
},
"cronStr": "0/1 * * * *"
},
"out": {
"mqtt": {
@ -14,13 +15,10 @@
"userName": "upload",
"password": "",
"clientId": "goInOut_SinoGnssMySQL",
"Topics": [
"SinoGnss/{{.group_name}}/{{.station_name}}"
]
"Topics": []
}
}
},
"info": {
"bridgeCode": "G2320281L0012"
}
}

2
configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json

@ -5,7 +5,7 @@
"http": {
"url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search"
},
"cronStr": "18 0/1 * * *"
"cronStr": "0/1 * * * *"
},
"out": {
"file": {

9
consumers/SinoGnssMySQL/config.go

@ -12,9 +12,16 @@ type ioConfig struct {
Out OUT `json:"out"`
}
type In struct {
Db config.DbConfig `json:"db"`
Db config.DbConfig `json:"db"`
CronStr string `json:"cronStr"`
}
type OUT struct {
Mqtt config.MqttConfig `json:"mqtt"`
}
// 缓存用
type RecordInfo struct {
Id int64 `json:"id"`
TableName string `json:"table_name"`
}

108
consumers/consumerSinoGnssMySQL.go

@ -2,10 +2,14 @@ package consumers
import (
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/SinoGnssMySQL"
"goInOut/dbOperate"
"goInOut/monitors"
"goInOut/utils"
"log"
"os"
"time"
)
@ -16,6 +20,7 @@ type consumerSinoGnssMySQL struct {
Info SinoGnssMySQL.ConfigFile
InDB *dbOperate.DBHelper
outMqtt *dbOperate.MqttHelper
monitor *monitors.CommonMonitorMonitor
}
func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) {
@ -42,7 +47,11 @@ func (the *consumerSinoGnssMySQL) InputInitial() error {
the.InDB = dbOperate.NewDBHelper(
the.Info.IoConfig.In.Db.Type,
the.Info.IoConfig.In.Db.ConnStr)
the.monitor = &monitors.CommonMonitorMonitor{
MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr},
}
the.monitor.Start()
the.monitor.RegisterFun(the.onData)
return nil
}
func (the *consumerSinoGnssMySQL) OutputInitial() error {
@ -58,51 +67,108 @@ func (the *consumerSinoGnssMySQL) OutputInitial() error {
return nil
}
func (the *consumerSinoGnssMySQL) Work() {
//测试
the.onData()
go func() {
for {
needPushList := <-the.ch
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
if len(the.ch) > 0 {
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
}
for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics {
for _, push := range needPushList {
log.Printf("推送[%s]: len=%d", push.Topic, len(push.Payload))
//hex.EncodeToString(pushBytes)
if push.Topic != "" {
outTopic = push.Topic
}
the.outMqtt.Publish(outTopic, push.Payload)
for _, push := range needPushList {
if push.Topic != "" {
the.outMqtt.Publish(push.Topic, push.Payload)
}
}
time.Sleep(100 * time.Millisecond)
}
}()
}
func (the *consumerSinoGnssMySQL) onData() {
sql := `select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from data_gnss_202412 as d
recordInfo, err := readRecord()
if err != nil {
log.Printf("读取 缓存异常,err=%v", err.Error())
return
}
sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d
LEFT JOIN datasolution as p
ON d.station_name=p.sn
where p.group_name is not null
ORDER BY p.group_name
limit 10;`
and d.id > %d and d.id <= %d
ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+100)
var GnssDatas []SinoGnssMySQL.GnssData
err := the.InDB.Query(&GnssDatas, sql)
if err != nil {
err = the.InDB.Query(&GnssDatas, sql)
if err != nil || len(GnssDatas) == 0 {
log.Printf("当前批次无数据,跳过")
return
}
maxId := MaxId(GnssDatas)
log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId)
recordInfo.Id = maxId
adaptor := the.getAdaptor()
needPush := adaptor.Transform(GnssDatas)
if len(needPush) > 0 {
the.ch <- needPush
}
fileName := "cache.inout"
//发现新月新纪录
newTableName := tableNameNow()
if recordInfo.TableName != newTableName {
recordInfo.TableName = newTableName
recordInfo.Id = 0
}
cacheStr, _ := json.Marshal(recordInfo)
err = utils.SaveCache2File(string(cacheStr), fileName)
if err != nil {
log.Panicf("record id to file,error: %v", err.Error())
}
}
func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) {
return adaptors.Adaptor_SINOMYSQL_AXYMQTT{}
}
func readRecord() (SinoGnssMySQL.RecordInfo, error) {
fileName := "cache.inout"
//文件存在?
isExist := utils.FileExists(fileName)
if !isExist {
// 文件不存在,创建文件
file, err := os.Create(fileName)
if err != nil {
log.Panicf("Error creating file: %v", err)
}
defaultRecord := SinoGnssMySQL.RecordInfo{
Id: 0,
TableName: tableNameNow(),
}
str, _ := json.Marshal(defaultRecord)
_, err = file.WriteString(string(str))
if err != nil {
log.Panicf("file write error: %v", err.Error())
return SinoGnssMySQL.RecordInfo{}, err
}
}
recordStr, err := utils.ReadCache2File(fileName)
if err != nil {
panic("")
}
record := SinoGnssMySQL.RecordInfo{}
err = json.Unmarshal([]byte(recordStr), &record)
return record, err
}
func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 {
maxId := GnssDatas[0].Id
for _, data := range GnssDatas {
if data.Id > maxId {
maxId = data.Id
}
}
return maxId
}
func tableNameNow() string {
return "data_gnss_" + time.Now().Format("200601")
}

4
dbOperate/mqttHelper.go

@ -25,7 +25,7 @@ type subscribeCall struct {
}
func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) {
log.Println("mqtt触发重连后的重订阅")
log.Println("mqtt触发链接后的自动订阅")
for _, call := range the.subscribeCalls {
the.Subscribe(call.topic, call.f)
}
@ -86,7 +86,7 @@ func (the *MqttHelper) Publish(topic string, messageBytes []byte) {
token := the.client.Publish(topic, 1, false, messageBytes)
token.Wait()
//the.client.Disconnect(200)
fmt.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes))
log.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes))
}
}

14
monitors/commonMonitor.go

@ -0,0 +1,14 @@
package monitors
type CommonMonitorMonitor struct {
*MonitorHelper
}
func (the *CommonMonitorMonitor) RegisterFun(fun func()) {
the.registerFun(fun)
}
func (the *CommonMonitorMonitor) Start() {
the.initial()
the.monitorStart()
}

53
utils/cacheFile.go

@ -0,0 +1,53 @@
package utils
import (
"fmt"
"log"
"os"
)
func SaveCache2File(cacheStr string, fileName string) error {
// 打开文件,如果文件不存在则创建
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil {
log.Println("Error opening file:", err)
return err
}
defer file.Close()
// 将变量写入文件
_, err = file.WriteString(cacheStr)
if err != nil {
log.Println("Error writing to file:", err)
}
return err
}
func ReadCache2File(fileName string) (string, error) {
// 打开文件
file, err := os.Open(fileName)
if err != nil {
log.Println("Error opening file:", err)
return "", err
}
defer file.Close()
// 读取文件内容
var content string
_, err = fmt.Fscanf(file, "%s", &content)
if err != nil {
log.Println("Error reading from file:", err)
}
return content, err
}
func FileExists(filePath string) bool {
_, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
Loading…
Cancel
Save