package consumers import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/JSNCGLQL" "goInOut/dbHelper" "log" "strings" "time" ) type consumerJSNCGLQL struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info JSNCGLQL.ConfigFile InMqtt *dbHelper.MqttHelper outMqtt *dbHelper.MqttHelper } func (the *consumerJSNCGLQL) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerJSNCGLQL) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() return err } func (the *consumerJSNCGLQL) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InMqtt = dbHelper.MqttInitial( the.Info.IoConfig.In.Mqtt.Host, the.Info.IoConfig.In.Mqtt.Port, the.Info.IoConfig.In.Mqtt.ClientId, the.Info.IoConfig.In.Mqtt.UserName, the.Info.IoConfig.In.Mqtt.Password, false) //inTopic := "Upload/#" //荔枝乌江大桥 for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics { the.InMqtt.Subscribe(inTopic, the.onData) } return nil } func (the *consumerJSNCGLQL) OutputInitial() error { //数据出口 the.outMqtt = dbHelper.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, false, //按照具体项目来 "consumers/CQZG/ssl/centerCA.crt") return nil } func (the *consumerJSNCGLQL) Work() { go func() { for { needPushs := <-the.ch log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics { for _, push := range needPushs { log.Printf("推送[%s]: len=%d", outTopic, len(push.Payload)) //hex.EncodeToString(pushBytes) if push.Topic != "" { outTopic = push.Topic } the.outMqtt.Publish(outTopic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() } func (the *consumerJSNCGLQL) onData(inTopic string, Msg string) { if len(Msg) > 100 { log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100]) } topicPrefixIndex := strings.LastIndex(inTopic, "/") matchTopic := inTopic[:topicPrefixIndex] adaptor := the.getAdaptor(matchTopic) if adaptor != nil { needPush := adaptor.Transform(inTopic, Msg) if len(needPush) > 0 { the.ch <- needPush } } } func (the *consumerJSNCGLQL) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) { switch flag { case "upload/uds": log.Printf("[统一采集软件]-上报,准备处理") //adaptor = adaptors.Adaptor_TYCJ_JSNCGLQL{IdMap: the.Info.TYCJsensorMap} case "upload/ZD": log.Printf("[振动软件]-上报,准备处理") //adaptor = adaptors.Adaptor_ZD_CQZG{IdMap: the.Info.SensorMap.ZDsensorMCMap, RC4Key: RC4Key} default: log.Printf("[无匹配 %s],不处理", flag) } return adaptor }