package consumers

import (
	"encoding/json"
	"log"
	"strings"
	"time"

	"goUpload/adaptors"
	"goUpload/consumers/HTJC"
	"goUpload/dbHelper"
)

// consumerHTJC subscribes to MQTT topics, converts each received message with
// a topic-specific adaptor and forwards the resulting payloads over UDP.
type consumerHTJC struct {
	// dataCache is the buffered channel that carries transformed payloads
	// from the MQTT callback (onData) to the sender goroutine started by Work.
	dataCache chan []byte

	// Info is the parsed configuration.
	Info HTJC.ConfigFile

	// InMqtt is the MQTT input helper created by InputInitial.
	InMqtt *dbHelper.MqttHelper

	// outUdp is the UDP output helper created by OutputInitial.
	outUdp *dbHelper.UdpHelper
}

// LoadConfigJson parses the JSON document in cfgStr into the.Info.
//
// If the JSON cannot be decoded, the error is logged and the method panics.
func (the *consumerHTJC) LoadConfigJson(cfgStr string) {
	// Decode the JSON text into the configuration struct.
	err := json.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the JSON configuration in cfg and then sets up the input and
// the output side, in that order.
//
// It returns the first error reported by InputInitial or OutputInitial. An
// invalid cfg does not produce an error value: LoadConfigJson panics instead.
func (the *consumerHTJC) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}

// InputInitial creates the payload channel (capacity 1000), builds the MQTT
// helper from the.Info.Config.InMqtt and subscribes onData to every configured
// topic. It always returns nil.
func (the *consumerHTJC) InputInitial() error {
	the.dataCache = make(chan []byte, 1000)

	// Data input.
	mqttCfg := the.Info.Config.InMqtt
	the.InMqtt = dbHelper.MqttInitial(
		mqttCfg.Host,
		mqttCfg.Port,
		mqttCfg.ClientId,
		mqttCfg.UserName,
		mqttCfg.Password,
		false)

	for _, inTopic := range mqttCfg.Topics {
		the.InMqtt.Subscribe(inTopic, the.onData)
	}
	return nil
}

// OutputInitial builds the UDP helper from the.Info.Config.OutUdp and calls
// its Initial method. It always returns nil.
func (the *consumerHTJC) OutputInitial() error {
	// Data output.
	the.outUdp = &dbHelper.UdpHelper{
		Host: the.Info.Config.OutUdp.Host,
		Port: the.Info.Config.OutUdp.Port,
	}
	the.outUdp.Initial()
	return nil
}

// Work starts a background goroutine that drains dataCache and publishes each
// payload through the UDP helper.
//
// The goroutine loops forever; this method provides no way to stop it. Every
// payload is published from its own goroutine, so this code does not enforce
// the order in which Publish calls run. The loop sleeps 2ms after each payload.
func (the *consumerHTJC) Work() {
	go func() {
		for {
			pushBytes := <-the.dataCache
			log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))

			log.Printf("推送[%v]: len=%d", the.outUdp, len(pushBytes))
			go the.outUdp.Publish(pushBytes)

			time.Sleep(2 * time.Millisecond)
		}
	}()
}

// onData is the MQTT subscription callback. It selects an adaptor from the
// topic, transforms Msg and queues every non-empty result on dataCache.
//
// The adaptor is chosen from the topic with its last "/"-separated level
// removed. A topic without any "/" is passed to getAdaptor unchanged.
// Sending on dataCache blocks while the channel is full.
func (the *consumerHTJC) onData(Topic string, Msg string) {
	// Only messages longer than 80 bytes are logged, truncated to 80 bytes.
	if len(Msg) > 80 {
		log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80])
	}

	// strings.LastIndex returns -1 when Topic has no "/"; slicing with that
	// index would panic, so fall back to the whole topic in that case.
	matchTopic := Topic
	if idx := strings.LastIndex(Topic, "/"); idx >= 0 {
		matchTopic = Topic[:idx]
	}

	adaptor := the.getAdaptor(matchTopic)
	if adaptor == nil {
		return
	}

	for _, needPush := range adaptor.Transform(Msg) {
		if len(needPush) > 0 {
			the.dataCache <- needPush
		}
	}
}

// getAdaptor returns the adaptor registered for the topic prefix flag, or nil
// when flag matches none of the known prefixes. The outcome is logged either
// way.
func (the *consumerHTJC) getAdaptor(flag string) (adaptor adaptors.IAdaptor2) {
	switch flag {
	case "upload/uds":
		log.Printf("[统一采集软件]-上报,准备处理")
		adaptor = adaptors.Adaptor_TYCJ_XTJC{IdMap: the.Info.SensorMap.TYCJsensorNameMap}
	case "upload/ZD":
		log.Printf("[振动软件]-上报,准备处理")
		adaptor = adaptors.Adaptor_ZD_XTJC{IdMap: the.Info.SensorMap.ZDsensorMCMap}
	default:
		log.Printf("[无匹配 %s],不处理", flag)
	}
	return adaptor
}