diff --git a/adaptors/司南GNSS_MySql库to安心云.go b/adaptors/司南GNSS_MySql库to安心云.go index 065cfae..80cf393 100644 --- a/adaptors/司南GNSS_MySql库to安心云.go +++ b/adaptors/司南GNSS_MySql库to安心云.go @@ -17,10 +17,11 @@ func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.Gnss for _, gnssData := range gnssDataList { OnceDxFiles := allDxFiles[gnssData.GroupName] OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ - Module: gnssData.StationName, - Channel: 1, - Timespan: gnssData.Time.UnixMilli(), - RawValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, //file_mqtt协议里面只解析RV + Module: gnssData.StationName, + Channel: 1, + Timespan: gnssData.Time.UnixMilli(), + //file_mqtt协议里面只解析RV, m=> mm + RawValue: []float64{gnssData.X * 1000, gnssData.Y * 1000, gnssData.H * 1000}, LimitValue: []float64{}, PhyValue: []float64{}, ThemeValue: []float64{}, diff --git a/configFiles/config_司南GnssMySQL.json b/configFiles/config_司南GnssMySQL.json index 179b69b..c32c4a1 100644 --- a/configFiles/config_司南GnssMySQL.json +++ b/configFiles/config_司南GnssMySQL.json @@ -5,7 +5,8 @@ "db": { "type": "mysql", "connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true" - } + }, + "cronStr": "0/1 * * * *" }, "out": { "mqtt": { @@ -14,13 +15,10 @@ "userName": "upload", "password": "", "clientId": "goInOut_SinoGnssMySQL", - "Topics": [ - "SinoGnss/{{.group_name}}/{{.station_name}}" - ] + "Topics": [] } } }, "info": { - "bridgeCode": "G2320281L0012" } } \ No newline at end of file diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json index 8c60ede..e5722dd 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json @@ -5,7 +5,7 @@ "http": { "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" }, - "cronStr": "18 0/1 * * *" + "cronStr": "0/1 * * * *" }, "out": { "file": { diff --git a/consumers/SinoGnssMySQL/config.go b/consumers/SinoGnssMySQL/config.go index a6989db..2f0ec37 100644 --- a/consumers/SinoGnssMySQL/config.go +++ b/consumers/SinoGnssMySQL/config.go @@ -12,9 +12,16 @@ type ioConfig struct { Out OUT `json:"out"` } type In struct { - Db config.DbConfig `json:"db"` + Db config.DbConfig `json:"db"` + CronStr string `json:"cronStr"` } type OUT struct { Mqtt config.MqttConfig `json:"mqtt"` } + +// 缓存用 +type RecordInfo struct { + Id int64 `json:"id"` + TableName string `json:"table_name"` +} diff --git a/consumers/consumerSinoGnssMySQL.go b/consumers/consumerSinoGnssMySQL.go index 6876446..1cd39d8 100644 --- a/consumers/consumerSinoGnssMySQL.go +++ b/consumers/consumerSinoGnssMySQL.go @@ -2,10 +2,14 @@ package consumers import ( "encoding/json" + "fmt" "goInOut/adaptors" "goInOut/consumers/SinoGnssMySQL" "goInOut/dbOperate" + "goInOut/monitors" + "goInOut/utils" "log" + "os" "time" ) @@ -16,6 +20,7 @@ type consumerSinoGnssMySQL struct { Info SinoGnssMySQL.ConfigFile InDB *dbOperate.DBHelper outMqtt *dbOperate.MqttHelper + monitor *monitors.CommonMonitorMonitor } func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { @@ -42,7 +47,11 @@ func (the *consumerSinoGnssMySQL) InputInitial() error { the.InDB = dbOperate.NewDBHelper( the.Info.IoConfig.In.Db.Type, the.Info.IoConfig.In.Db.ConnStr) - + the.monitor = &monitors.CommonMonitorMonitor{ + MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, + } + the.monitor.Start() + the.monitor.RegisterFun(the.onData) return nil } func (the *consumerSinoGnssMySQL) OutputInitial() error { @@ -58,51 +67,108 @@ func (the *consumerSinoGnssMySQL) OutputInitial() error { return nil } func (the *consumerSinoGnssMySQL) Work() { - //测试 - the.onData() - go func() { for { needPushList := <-the.ch - log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + if len(the.ch) > 0 { + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + } - for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics { - for _, push := range needPushList { - log.Printf("推送[%s]: len=%d", push.Topic, len(push.Payload)) - //hex.EncodeToString(pushBytes) - if push.Topic != "" { - outTopic = push.Topic - } - the.outMqtt.Publish(outTopic, push.Payload) + for _, push := range needPushList { + if push.Topic != "" { + the.outMqtt.Publish(push.Topic, push.Payload) } - } + time.Sleep(100 * time.Millisecond) } }() } func (the *consumerSinoGnssMySQL) onData() { - sql := `select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from data_gnss_202412 as d + recordInfo, err := readRecord() + if err != nil { + log.Printf("读取 缓存异常,err=%v", err.Error()) + return + } + sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d LEFT JOIN datasolution as p ON d.station_name=p.sn where p.group_name is not null -ORDER BY p.group_name -limit 10;` +and d.id > %d and d.id <= %d +ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+100) var GnssDatas []SinoGnssMySQL.GnssData - err := the.InDB.Query(&GnssDatas, sql) - if err != nil { + err = the.InDB.Query(&GnssDatas, sql) + if err != nil || len(GnssDatas) == 0 { + log.Printf("当前批次无数据,跳过") return } - + maxId := MaxId(GnssDatas) + log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId) + recordInfo.Id = maxId adaptor := the.getAdaptor() - needPush := adaptor.Transform(GnssDatas) if len(needPush) > 0 { the.ch <- needPush } + fileName := "cache.inout" + //发现新月新纪录 + newTableName := tableNameNow() + if recordInfo.TableName != newTableName { + recordInfo.TableName = newTableName + recordInfo.Id = 0 + } + cacheStr, _ := json.Marshal(recordInfo) + err = utils.SaveCache2File(string(cacheStr), fileName) + if err != nil { + log.Panicf("record id to file,error: %v", err.Error()) + } } func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} } + +func readRecord() (SinoGnssMySQL.RecordInfo, error) { + fileName := "cache.inout" + //文件存在? + isExist := utils.FileExists(fileName) + if !isExist { + // 文件不存在,创建文件 + file, err := os.Create(fileName) + if err != nil { + log.Panicf("Error creating file: %v", err) + } + defaultRecord := SinoGnssMySQL.RecordInfo{ + Id: 0, + TableName: tableNameNow(), + } + str, _ := json.Marshal(defaultRecord) + _, err = file.WriteString(string(str)) + if err != nil { + log.Panicf("file write error: %v", err.Error()) + return SinoGnssMySQL.RecordInfo{}, err + } + } + recordStr, err := utils.ReadCache2File(fileName) + if err != nil { + panic("") + } + record := SinoGnssMySQL.RecordInfo{} + err = json.Unmarshal([]byte(recordStr), &record) + return record, err +} + +func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { + maxId := GnssDatas[0].Id + for _, data := range GnssDatas { + if data.Id > maxId { + maxId = data.Id + } + } + return maxId +} + +func tableNameNow() string { + return "data_gnss_" + time.Now().Format("200601") +} diff --git a/dbOperate/mqttHelper.go b/dbOperate/mqttHelper.go index 1888293..6a74fc7 100644 --- a/dbOperate/mqttHelper.go +++ b/dbOperate/mqttHelper.go @@ -25,7 +25,7 @@ type subscribeCall struct { } func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) { - log.Println("mqtt触发重连后的重订阅") + log.Println("mqtt触发链接后的自动订阅") for _, call := range the.subscribeCalls { the.Subscribe(call.topic, call.f) } @@ -86,7 +86,7 @@ func (the *MqttHelper) Publish(topic string, messageBytes []byte) { token := the.client.Publish(topic, 1, false, messageBytes) token.Wait() //the.client.Disconnect(200) - fmt.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes)) + log.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes)) } } diff --git a/monitors/commonMonitor.go b/monitors/commonMonitor.go new file mode 100644 index 0000000..dd04e10 --- /dev/null +++ b/monitors/commonMonitor.go @@ -0,0 +1,14 @@ +package monitors + +type CommonMonitorMonitor struct { + *MonitorHelper +} + +func (the *CommonMonitorMonitor) RegisterFun(fun func()) { + the.registerFun(fun) +} + +func (the *CommonMonitorMonitor) Start() { + the.initial() + the.monitorStart() +} diff --git a/utils/cacheFile.go b/utils/cacheFile.go new file mode 100644 index 0000000..17ae27f --- /dev/null +++ b/utils/cacheFile.go @@ -0,0 +1,53 @@ +package utils + +import ( + "fmt" + "log" + "os" +) + +func SaveCache2File(cacheStr string, fileName string) error { + // 打开文件,如果文件不存在则创建 + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + if err != nil { + log.Println("Error opening file:", err) + return err + } + defer file.Close() + + // 将变量写入文件 + _, err = file.WriteString(cacheStr) + if err != nil { + log.Println("Error writing to file:", err) + } + return err + +} + +func ReadCache2File(fileName string) (string, error) { + // 打开文件 + file, err := os.Open(fileName) + if err != nil { + log.Println("Error opening file:", err) + return "", err + } + defer file.Close() + + // 读取文件内容 + var content string + _, err = fmt.Fscanf(file, "%s", &content) + if err != nil { + log.Println("Error reading from file:", err) + } + return content, err +} + +func FileExists(filePath string) bool { + _, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +}