package consumers

import (
	"encoding/json"
	"log"
	"strings"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/HTTP_PRPXY"
	"goInOut/dbHelper"
)

// consumerHttpProxy is a consumer that takes data in through an API server
// and is configured with an HTTP helper for output.
type consumerHttpProxy struct {
	// ch is the data buffering channel; it carries batches of records
	// waiting to be pushed.
	ch chan []adaptors.NeedPush
	// Info is the concrete configuration, filled in by LoadConfigJson.
	Info        HTTP_PRPXY.ConfigFile
	InApiServer *dbHelper.ApiServerHelper
	outHttpPost *dbHelper.HttpHelper
}

// LoadConfigJson parses the JSON document cfgStr into the.Info.
//
// It logs and then panics when cfgStr is not valid JSON for the
// configuration struct.
func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) {
	// Decode the JSON data into the configuration struct.
	err := json.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration from cfg and then sets up the input side
// followed by the output side. It returns the first error reported by either
// step. A malformed cfg makes LoadConfigJson panic rather than return an error.
func (the *consumerHttpProxy) Initial(cfg string) error {
	the.LoadConfigJson(cfg)

	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}

// InputInitial creates the buffering channel (capacity 200) and builds and
// initialises the inbound API server from the configured port and routes.
// It always returns nil.
func (the *consumerHttpProxy) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)

	// Data entry point.
	the.InApiServer = dbHelper.NewApiServer(
		the.Info.IoConfig.In.ApiServer.Port,
		the.Info.IoConfig.In.ApiServer.Routes,
	)

	// Disabled MQTT input, kept for reference:
	//
	// inTopic := "Upload/#" // Lizhi Wujiang Bridge
	// for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics {
	// 	the.InMqtt.Subscribe(inTopic, the.onData)
	// }

	the.InApiServer.Initial()
	return nil
}

// OutputInitial initialises the outbound HTTP helper. It always returns nil.
//
// NOTE(review): nothing in this file assigns the.outHttpPost before this call,
// so the receiver may be nil here unless it is set elsewhere in the package —
// confirm.
func (the *consumerHttpProxy) OutputInitial() error {
	// Data exit point.
	the.outHttpPost.Initial()
	return nil
}

// Work starts a background goroutine that logs the number of batches queued
// in the.ch every 100 milliseconds.
//
// The goroutine only reports the queue length; it does not receive from the
// channel, and it has no stop condition.
func (the *consumerHttpProxy) Work() {
	go func() {
		for {
			log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// onData handles one inbound message: it derives the match key from inTopic
// (everything before the last "/"), looks up the adaptor for that key, and
// queues whatever the adaptor's Transform returns when that is non-empty.
//
// Only messages longer than 100 bytes are logged, truncated to their first
// 100 bytes.
func (the *consumerHttpProxy) onData(inTopic string, Msg string) {
	if len(Msg) > 100 {
		log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100])
	}

	// strings.LastIndex returns -1 when inTopic contains no "/"; slicing with
	// that index would panic, so fall back to the whole topic as the key.
	matchTopic := inTopic
	if idx := strings.LastIndex(inTopic, "/"); idx >= 0 {
		matchTopic = inTopic[:idx]
	}

	adaptor := the.getAdaptor(matchTopic)
	if adaptor == nil {
		return
	}

	needPush := adaptor.Transform(matchTopic, Msg)
	if len(needPush) > 0 {
		the.ch <- needPush
	}
}

// getAdaptor reads "bridgeCode" from the.Info.OtherInfo and panics when it is
// missing or empty.
//
// The named result is never assigned, so as written this always returns the
// zero value (nil) when it does not panic; flag is currently unused.
func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) {
	bridgeCode := ""
	if v, ok := the.Info.OtherInfo["bridgeCode"]; ok {
		bridgeCode = v
	}
	if bridgeCode == "" {
		panic("无正确的 bridgeCode")
	}
	return adaptor
}