You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
174 lines
4.3 KiB
174 lines
4.3 KiB
package consumers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/SinoGnssMySQL"
|
|
"goInOut/dbOperate"
|
|
"goInOut/monitors"
|
|
"goInOut/utils"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
type consumerSinoGnssMySQL struct {
|
|
//数据缓存管道
|
|
ch chan []adaptors.NeedPush
|
|
//具体配置
|
|
Info SinoGnssMySQL.ConfigFile
|
|
InDB *dbOperate.DBHelper
|
|
outMqtt *dbOperate.MqttHelper
|
|
monitor *monitors.CommonMonitor
|
|
}
|
|
|
|
func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := json.Unmarshal([]byte(cfgStr), &the.Info)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerSinoGnssMySQL) Initial(cfg string) error {
|
|
the.LoadConfigJson(cfg)
|
|
err := the.InputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.OutputInitial()
|
|
return err
|
|
}
|
|
func (the *consumerSinoGnssMySQL) InputInitial() error {
|
|
the.ch = make(chan []adaptors.NeedPush, 200)
|
|
//数据入口
|
|
the.InDB = dbOperate.NewDBHelper(
|
|
the.Info.IoConfig.In.Db.Type,
|
|
the.Info.IoConfig.In.Db.ConnStr)
|
|
the.monitor = &monitors.CommonMonitor{
|
|
MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr},
|
|
}
|
|
the.monitor.Start()
|
|
the.monitor.RegisterFun(the.onData)
|
|
return nil
|
|
}
|
|
func (the *consumerSinoGnssMySQL) OutputInitial() error {
|
|
//数据出口
|
|
the.outMqtt = dbOperate.MqttInitial(
|
|
the.Info.IoConfig.Out.Mqtt.Host,
|
|
the.Info.IoConfig.Out.Mqtt.Port,
|
|
the.Info.IoConfig.Out.Mqtt.ClientId,
|
|
the.Info.IoConfig.Out.Mqtt.UserName,
|
|
the.Info.IoConfig.Out.Mqtt.Password,
|
|
false, //按照具体项目来
|
|
"")
|
|
return nil
|
|
}
|
|
func (the *consumerSinoGnssMySQL) Work() {
|
|
go func() {
|
|
for {
|
|
needPushList := <-the.ch
|
|
if len(the.ch) > 0 {
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
|
|
}
|
|
|
|
for _, push := range needPushList {
|
|
if push.Topic != "" {
|
|
the.outMqtt.Publish(push.Topic, push.Payload)
|
|
}
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
}
|
|
func (the *consumerSinoGnssMySQL) onData() {
|
|
recordInfo, err := readRecord()
|
|
if err != nil {
|
|
log.Printf("读取 缓存异常,err=%v", err.Error())
|
|
return
|
|
}
|
|
sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d
|
|
LEFT JOIN datasolution as p
|
|
ON d.station_name=p.sn
|
|
where p.group_name is not null
|
|
and d.id > %d and d.id <= %d
|
|
ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+200)
|
|
var GnssDatas []SinoGnssMySQL.GnssData
|
|
err = the.InDB.Query(&GnssDatas, sql)
|
|
if err != nil || len(GnssDatas) == 0 {
|
|
log.Printf("当前批次无数据,跳过")
|
|
return
|
|
}
|
|
maxId := MaxId(GnssDatas)
|
|
log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId)
|
|
recordInfo.Id = maxId
|
|
adaptor := the.getAdaptor()
|
|
needPush := adaptor.Transform(GnssDatas)
|
|
if len(needPush) > 0 {
|
|
the.ch <- needPush
|
|
}
|
|
fileName := "cache.inout"
|
|
//发现新月新纪录
|
|
newTableName := tableNameNow()
|
|
if recordInfo.TableName != newTableName {
|
|
recordInfo.TableName = newTableName
|
|
recordInfo.Id = 0
|
|
}
|
|
cacheStr, _ := json.Marshal(recordInfo)
|
|
err = utils.SaveCache2File(string(cacheStr), fileName)
|
|
if err != nil {
|
|
log.Panicf("record id to file,error: %v", err.Error())
|
|
}
|
|
|
|
}
|
|
func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) {
|
|
|
|
return adaptors.Adaptor_SINOMYSQL_AXYMQTT{}
|
|
}
|
|
|
|
func readRecord() (SinoGnssMySQL.RecordInfo, error) {
|
|
fileName := "cache.inout"
|
|
//文件存在?
|
|
isExist := utils.FileExists(fileName)
|
|
if !isExist {
|
|
// 文件不存在,创建文件
|
|
file, err := os.Create(fileName)
|
|
if err != nil {
|
|
log.Panicf("Error creating file: %v", err)
|
|
}
|
|
defaultRecord := SinoGnssMySQL.RecordInfo{
|
|
Id: 0,
|
|
TableName: tableNameNow(),
|
|
}
|
|
str, _ := json.Marshal(defaultRecord)
|
|
_, err = file.WriteString(string(str))
|
|
if err != nil {
|
|
log.Panicf("file write error: %v", err.Error())
|
|
return SinoGnssMySQL.RecordInfo{}, err
|
|
}
|
|
}
|
|
recordStr, err := utils.ReadCache2File(fileName)
|
|
if err != nil {
|
|
panic("")
|
|
}
|
|
record := SinoGnssMySQL.RecordInfo{}
|
|
err = json.Unmarshal([]byte(recordStr), &record)
|
|
return record, err
|
|
}
|
|
|
|
func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 {
|
|
maxId := GnssDatas[0].Id
|
|
for _, data := range GnssDatas {
|
|
if data.Id > maxId {
|
|
maxId = data.Id
|
|
}
|
|
}
|
|
return maxId
|
|
}
|
|
|
|
func tableNameNow() string {
|
|
return "data_gnss_" + time.Now().Format("200601")
|
|
}
|
|
|