package consumers import ( "encoding/json" "fmt" "goInOut/adaptors" "goInOut/consumers/SinoGnssMySQL" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "log" "os" "time" ) type consumerSinoGnssMySQL struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info SinoGnssMySQL.ConfigFile InDB *dbOperate.DBHelper outMqtt *dbOperate.MqttHelper monitor *monitors.CommonMonitor } func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerSinoGnssMySQL) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() return err } func (the *consumerSinoGnssMySQL) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InDB = dbOperate.NewDBHelper( the.Info.IoConfig.In.Db.Type, the.Info.IoConfig.In.Db.ConnStr) the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, } the.monitor.Start() the.monitor.RegisterFun(the.onData) return nil } func (the *consumerSinoGnssMySQL) OutputInitial() error { //数据出口 the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, false, //按照具体项目来 "") return nil } func (the *consumerSinoGnssMySQL) Work() { go func() { for { needPushList := <-the.ch if len(the.ch) > 0 { log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) } for _, push := range needPushList { if push.Topic != "" { the.outMqtt.Publish(push.Topic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() } func (the *consumerSinoGnssMySQL) onData() { recordInfo, err := readRecord() if err != nil { log.Printf("读取 缓存异常,err=%v", err.Error()) return } sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d LEFT JOIN datasolution as p ON d.station_name=p.sn where p.group_name is not null and d.id > %d and d.id <= %d ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+200) var GnssDatas []SinoGnssMySQL.GnssData err = the.InDB.Query(&GnssDatas, sql) if err != nil || len(GnssDatas) == 0 { log.Printf("当前批次无数据,跳过") return } maxId := MaxId(GnssDatas) log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId) recordInfo.Id = maxId adaptor := the.getAdaptor() needPush := adaptor.Transform(GnssDatas) if len(needPush) > 0 { the.ch <- needPush } fileName := "cache.inout" //发现新月新纪录 newTableName := tableNameNow() if recordInfo.TableName != newTableName { recordInfo.TableName = newTableName recordInfo.Id = 0 } cacheStr, _ := json.Marshal(recordInfo) err = utils.SaveCache2File(string(cacheStr), fileName) if err != nil { log.Panicf("record id to file,error: %v", err.Error()) } } func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} } func readRecord() (SinoGnssMySQL.RecordInfo, error) { fileName := "cache.inout" //文件存在? isExist := utils.FileExists(fileName) if !isExist { // 文件不存在,创建文件 file, err := os.Create(fileName) if err != nil { log.Panicf("Error creating file: %v", err) } defaultRecord := SinoGnssMySQL.RecordInfo{ Id: 0, TableName: tableNameNow(), } str, _ := json.Marshal(defaultRecord) _, err = file.WriteString(string(str)) if err != nil { log.Panicf("file write error: %v", err.Error()) return SinoGnssMySQL.RecordInfo{}, err } } recordStr, err := utils.ReadCache2File(fileName) if err != nil { panic("") } record := SinoGnssMySQL.RecordInfo{} err = json.Unmarshal([]byte(recordStr), &record) return record, err } func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { maxId := GnssDatas[0].Id for _, data := range GnssDatas { if data.Id > maxId { maxId = data.Id } } return maxId } func tableNameNow() string { return "data_gnss_" + time.Now().Format("200601") }