package consumers import ( "encoding/json" "goUpload/adaptors" "goUpload/consumers/WJHP" "goUpload/dbHelper" "goUpload/monitors" "log" "strings" "time" ) type consumerWJHP struct { //数据缓存管道 dataCache chan []byte //具体配置 Info WJHP.ConfigFile InMqtt *dbHelper.MqttHelper InFileMonitor *monitors.FileMonitor outHttpPost *dbHelper.HttpHelper } func (the *consumerWJHP) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerWJHP) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() return err } func (the *consumerWJHP) InputInitial() error { the.dataCache = make(chan []byte, 200) //mqtt数据入口 the.InMqtt = dbHelper.MqttInitial( the.Info.IOConfig.InMqtt.Host, the.Info.IOConfig.InMqtt.Port, the.Info.IOConfig.InMqtt.ClientId, the.Info.IOConfig.InMqtt.UserName, the.Info.IOConfig.InMqtt.Password, false) for _, inTopic := range the.Info.IOConfig.InMqtt.Topics { the.InMqtt.Subscribe(inTopic, the.onData) } return nil } func (the *consumerWJHP) OutputInitial() error { //数据出口 the.outHttpPost = &dbHelper.HttpHelper{ Url: the.Info.IOConfig.OutHttpPost.Url, } the.outHttpPost.Initial() return nil } func (the *consumerWJHP) Work() { go func() { for { pushBytes := <-the.dataCache log.Printf("取出ch数据,剩余[%d]", len(the.dataCache)) log.Printf("准备推送=%s", pushBytes) resp, err := the.outHttpPost.Publish(pushBytes) log.Printf("推送[%s]%v\n结果[%s]: ", the.outHttpPost.Url, err, resp) time.Sleep(50 * time.Millisecond) } }() } func (the *consumerWJHP) onData(Topic string, Msg string) { if len(Msg) > 80 { log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:100]) } var needPush []byte topicPrefixIndex := strings.LastIndex(Topic, "/") matchTopic := Topic[:topicPrefixIndex] deviceModule := Topic[topicPrefixIndex+1:] adaptor := the.getAdaptor(matchTopic) if adaptor != nil { needPush = adaptor.Transform(deviceModule, Msg) if len(needPush) > 0 { the.dataCache <- needPush } } } func (the *consumerWJHP) getAdaptor(flag string) (adaptor adaptors.IAdaptor3) { switch flag { case "/fs-flexometer": log.Printf("[视觉位移数据]-上报,准备处理") adaptor = adaptors.Adaptor_SJWY_WJHP{IdMap: the.Info.SensorConfig} default: log.Printf("[无匹配 %s],不处理", flag) } return adaptor }