package consumers import ( "bufio" "encoding/csv" "encoding/json" "fmt" "goInOut/adaptors" "goInOut/consumers/MYX" "goInOut/dbOperate" "goInOut/monitors" "golang.org/x/text/encoding/unicode" "log" "os" "path/filepath" "strings" "time" ) type consumerMYX struct { //数据缓存管道 dataCache chan []byte //具体配置 Info MYX.ConfigFile InMqtt *dbOperate.MqttHelper InFileMonitor *monitors.FileMonitor outHttpPost *dbOperate.HttpHelper } func (the *consumerMYX) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerMYX) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() return err } func (the *consumerMYX) InputInitial() error { the.dataCache = make(chan []byte, 200) //mqtt数据入口 the.InMqtt = dbOperate.MqttInitial( the.Info.IOConfig.InMqtt.Host, the.Info.IOConfig.InMqtt.Port, the.Info.IOConfig.InMqtt.ClientId, the.Info.IOConfig.InMqtt.UserName, the.Info.IOConfig.InMqtt.Password, false) for _, inTopic := range the.Info.IOConfig.InMqtt.Topics { the.InMqtt.Subscribe(inTopic, the.onData) } //文件监视器数据入口 the.InFileMonitor = monitors.FileMonitorInitial( the.Info.IOConfig.InFileMonitor.Directory, the.Info.IOConfig.InFileMonitor.FileNameExtension, the.Info.IOConfig.InFileMonitor.CronStr, ) the.InFileMonitor.Subscribe(the.onFileData) return nil } func (the *consumerMYX) OutputInitial() error { //数据出口 the.outHttpPost = &dbOperate.HttpHelper{ Url: the.Info.IOConfig.OutHttpPost.Url, } the.outHttpPost.Initial() return nil } func (the *consumerMYX) Work() { go func() { for { pushBytes := <-the.dataCache log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) log.Printf("推送[%v]: len=%d", the.outHttpPost.Url, len(pushBytes)) //hex.EncodeToString(pushBytes) the.outHttpPost.Publish(pushBytes) time.Sleep(50 * time.Millisecond) } }() } func (the *consumerMYX) onData(Topic string, Msg string) { if len(Msg) > 80 { log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80]) } var needPush []byte topicPrefixIndex := strings.LastIndex(Topic, "/") matchTopic := Topic[:topicPrefixIndex] adaptor := the.getAdaptor(matchTopic) if adaptor != nil { needPush = adaptor.Transform(Msg) if len(needPush) > 0 { the.dataCache <- needPush } } } func (the *consumerMYX) getAdaptor(flag string) (adaptor adaptors.IAdaptor) { switch flag { case "upload/uds": log.Printf("[统一采集软件]-上报,准备处理") adaptor = adaptors.Adaptor_TYCJ_MYX{IdMap: the.Info.Sensors.TYCJsensorNameMap} case "content/ZD": log.Printf("[振动软件]-上报,准备处理") adaptor = adaptors.Adaptor_ZD_MYX{IdMap: the.Info.Sensors.ZDsensorMCMap} default: log.Printf("[无匹配 %s],不处理", flag) } return adaptor } func (the *consumerMYX) onFileData() { dir := the.InFileMonitor.Directory log.Printf("执行文件监视器-任务 扫描目录:%s....", dir) adaptor := adaptors.Adaptor_GDGS_MYX{ IdMap: getGDGSIdMap(the.Info.Sensors.GDGSsensorNameMap), } err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } isUTF8BOM := isUTF8BOM(path) println(isUTF8BOM) if filepath.Ext(path) == ".bbcsv" { //商用改为.csv file, err := os.Open(path) if err != nil { return err } defer file.Close() reader := csv.NewReader(unicode.UTF8BOM.NewDecoder().Reader(file)) records, err := reader.ReadAll() if err != nil { return err } fmt.Println("文件名:", info.Name()) //内容: payload := adaptor.Transform(records) if len(payload) > 0 { the.dataCache <- payload } } return nil }) if err != nil { fmt.Println("遍历目录时发生错误:", err) } //var needPush []byte // //adaptor := the.getAdaptor(matchTopic) //if adaptor != nil { // needPush = adaptor.Transform(Msg) // if len(needPush) > 0 { // the.dataCache <- needPush // } //} } func getGDGSIdMap(gdgsSensors []MYX.GDGSsensorNameMap) (IdMap map[string]MYX.CHInfo) { IdMap = map[string]MYX.CHInfo{} for _, gdgsSensor := range gdgsSensors { for name, info := range gdgsSensor.Map { IdMap[name] = MYX.CHInfo{ SensorInfo: info, Type: gdgsSensor.Type, Formula: gdgsSensor.Formula, } } } return IdMap } // 检查文件是否是UTF-8格式并带有BOM func isUTF8BOM(path string) bool { file, err := os.Open(path) if err != nil { return false } defer file.Close() reader := bufio.NewReader(file) bytes, err := reader.Peek(3) // 检查文件的前3个字节是否是UTF-8 BOM(0xEF,0xBB,0xBF) if err != nil { return false } if bytes[0] == 0xEF && bytes[1] == 0xBB && bytes[2] == 0xBF { return true // 是UTF-8 BOM,是UTF-8格式的文本文件 } else { return false // 不是UTF-8 BOM,不是UTF-8格式的文本文件或不是文本文件(例如二进制文件) } }