package consumers import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/SinoGnssMySQL" "goInOut/dbOperate" "log" "time" ) type consumerSinoGnssMySQL struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info SinoGnssMySQL.ConfigFile InDB *dbOperate.DBHelper outMqtt *dbOperate.MqttHelper } func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerSinoGnssMySQL) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() return err } func (the *consumerSinoGnssMySQL) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InDB = dbOperate.NewDBHelper( the.Info.IoConfig.In.Db.Type, the.Info.IoConfig.In.Db.ConnStr) return nil } func (the *consumerSinoGnssMySQL) OutputInitial() error { //数据出口 the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, false, //按照具体项目来 "") return nil } func (the *consumerSinoGnssMySQL) Work() { //测试 the.onData() go func() { for { needPushList := <-the.ch log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics { for _, push := range needPushList { log.Printf("推送[%s]: len=%d", outTopic, len(push.Payload)) //hex.EncodeToString(pushBytes) if push.Topic != "" { outTopic = push.Topic } the.outMqtt.Publish(outTopic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() } func (the *consumerSinoGnssMySQL) onData() { sql := `select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from data_gnss_202412 as d LEFT JOIN datasolution as p ON d.station_name=p.sn where p.group_name is not null ORDER BY p.group_name limit 10;` var GnssDatas []SinoGnssMySQL.GnssData err := the.InDB.Query(&GnssDatas, sql) if err != nil { return } adaptor := the.getAdaptor() needPush := adaptor.Transform(GnssDatas) if len(needPush) > 0 { the.ch <- needPush } } func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} }