package consumers

import (
	"encoding/json"
	"log"
	"os"
	"strings"
	"time"

	"goUpload/adaptors"
	"goUpload/consumers/CQZG"
	"goUpload/dbHelper"
)

// consumerCQZG receives messages from an input MQTT connection, converts them
// through a topic-specific adaptor and republishes the result on an output
// MQTT connection.
type consumerCQZG struct {
	// ch_t500101 buffers transformed payloads between onData (producer) and
	// the goroutine started by Work (consumer).
	ch_t500101 chan []byte
	// Info is the concrete configuration parsed by LoadConfigJson.
	Info    CQZG.ConfigFile
	InMqtt  *dbHelper.MqttHelper
	outMqtt *dbHelper.MqttHelper
}

// LoadConfigJson parses the JSON text cfgStr into the.Info.
//
// It logs and panics when cfgStr is not valid JSON for the configuration
// structure; the method has no error return, so the panic is kept as the
// failure signal.
func (the *consumerCQZG) LoadConfigJson(cfgStr string) {
	// Decode the JSON document into the configuration struct.
	err := json.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration from cfg and then sets up the input side
// followed by the output side. It returns the first error reported by either
// setup step. Invalid configuration JSON panics inside LoadConfigJson.
func (the *consumerCQZG) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}

// InputInitial exports the configured RC4 key through the RC4Key environment
// variable (getAdaptor reads it back from there), creates the buffer channel,
// connects the input MQTT helper and subscribes onData to every configured
// input topic.
//
// It returns an error when the environment variable cannot be set.
func (the *consumerCQZG) InputInitial() error {
	// Previously the Setenv error was dropped, which would leave every adaptor
	// with an empty key without any indication of why.
	if err := os.Setenv("RC4Key", the.Info.Config.Rc4key); err != nil {
		return err
	}
	// NOTE(review): this writes the RC4 key to the log in plaintext — confirm
	// that this is acceptable for the deployment.
	log.Printf("RC4Key=%s", the.Info.Config.Rc4key)

	the.ch_t500101 = make(chan []byte, 200)

	// Data entry point.
	inCfg := the.Info.Config.InMqtt
	the.InMqtt = dbHelper.MqttInitial(
		inCfg.Host,
		inCfg.Port,
		inCfg.ClientId,
		inCfg.UserName,
		inCfg.Password,
		false)

	// inTopic := "Upload/#" // Lizhi Wujiang Bridge
	for _, inTopic := range inCfg.Topics {
		the.InMqtt.Subscribe(inTopic, the.onData)
	}
	return nil
}

// OutputInitial connects the output MQTT helper. It always returns nil.
func (the *consumerCQZG) OutputInitial() error {
	// Data exit point.
	// NOTE(review): the trailing `true` and certificate path presumably enable
	// TLS with that CA file — confirm against dbHelper.MqttInitial.
	outCfg := the.Info.Config.OutMqtt
	the.outMqtt = dbHelper.MqttInitial(
		outCfg.Host,
		outCfg.Port,
		outCfg.ClientId,
		outCfg.UserName,
		outCfg.Password,
		true,
		"consumers/CQZG/ssl/centerCA.crt")
	return nil
}

// Work starts a background goroutine that takes payloads from the buffer
// channel and publishes each one to every configured output topic, pausing
// 100ms after each payload. The goroutine runs for the life of the process.
func (the *consumerCQZG) Work() {
	go func() {
		for {
			pushBytes := <-the.ch_t500101
			log.Printf("取出ch数据,剩余[%d] ", len(the.ch_t500101))

			for _, outTopic := range the.Info.Config.OutMqtt.Topics {
				log.Printf("推送[%s]: len=%d", outTopic, len(pushBytes)) //hex.EncodeToString(pushBytes)
				the.outMqtt.Publish(outTopic, pushBytes)
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// onData is the subscription callback for incoming MQTT messages. It selects
// an adaptor from the topic with its last "/"-separated segment removed,
// transforms Msg, and queues any non-empty result on the buffer channel.
func (the *consumerCQZG) onData(Topic string, Msg string) {
	// Only messages longer than 80 bytes are logged, truncated to 80 bytes.
	if len(Msg) > 80 {
		log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80])
	}

	// A topic without any "/" used to panic on Topic[:-1]. Use the whole topic
	// as the match key instead; it cannot equal any known prefix, so it ends
	// up in getAdaptor's "no match" branch.
	matchTopic := Topic
	if idx := strings.LastIndex(Topic, "/"); idx >= 0 {
		matchTopic = Topic[:idx]
	}

	adaptor := the.getAdaptor(matchTopic)
	if adaptor == nil {
		return
	}
	if needPush := adaptor.Transform(Msg); len(needPush) > 0 {
		the.ch_t500101 <- needPush
	}
}

// getAdaptor returns the adaptor registered for the topic prefix flag,
// configured with the matching sensor map and the RC4 key read from the
// RC4Key environment variable. It returns nil when flag matches no known
// prefix.
func (the *consumerCQZG) getAdaptor(flag string) (adaptor adaptors.IAdaptor) {
	RC4Key := os.Getenv("RC4Key")
	switch flag {
	case "upload/uds":
		log.Printf("[统一采集软件]-上报,准备处理")
		adaptor = adaptors.Adaptor_TYCJ_CQZG{IdMap: the.Info.SensorMap.TYCJsensorNameMap, RC4Key: RC4Key}
	case "upload/CZ":
		log.Printf("[称重软件]-上报,准备处理")
		adaptor = adaptors.Adaptor_OLCZ_CQZG{IdMap: the.Info.SensorMap.CZsensorRoadnoMap, RC4Key: RC4Key}
	case "upload/ZD":
		log.Printf("[振动软件]-上报,准备处理")
		adaptor = adaptors.Adaptor_ZD_CQZG{IdMap: the.Info.SensorMap.ZDsensorMCMap, RC4Key: RC4Key}
	default:
		log.Printf("[无匹配 %s],不处理", flag)
	}
	return adaptor
}