数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

206 lines
5.1 KiB

package consumers
import (
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/MYX"
"goInOut/dbHelper"
"goInOut/monitors"
"golang.org/x/text/encoding/unicode"
"log"
"os"
"path/filepath"
"strings"
"time"
)
// consumerMYX bundles the configuration, the two data inputs (MQTT and the
// file monitor) and the HTTP output used by the MYX consumer.
type consumerMYX struct {
	// dataCache is the channel that buffers payloads between the input
	// callbacks and the push goroutine started by Work.
	dataCache chan []byte

	// Info is the configuration decoded by LoadConfigJson.
	Info MYX.ConfigFile

	// InMqtt is the MQTT input created in InputInitial.
	InMqtt *dbHelper.MqttHelper

	// InFileMonitor is the file-monitor input created in InputInitial.
	InFileMonitor *monitors.FileMonitor

	// outHttpPost is the HTTP output created in OutputInitial.
	outHttpPost *dbHelper.HttpHelper
}
// LoadConfigJson decodes the JSON document cfgStr into the.Info.
//
// If decoding fails, the configuration text and the error are logged and the
// method panics with the decode error.
func (the *consumerMYX) LoadConfigJson(cfgStr string) {
	decodeErr := json.Unmarshal([]byte(cfgStr), &the.Info)
	if decodeErr == nil {
		return
	}
	log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, decodeErr.Error())
	panic(decodeErr)
}
// Initial loads the JSON configuration cfg and then sets up the inputs
// followed by the output.
//
// It returns the first error reported by InputInitial or OutputInitial.
// LoadConfigJson panics on a configuration it cannot decode, so that failure
// is not reported through the returned error.
func (the *consumerMYX) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}
// InputInitial creates the data cache and wires up both data inputs:
// the MQTT client (one subscription per configured topic, handled by onData)
// and the file monitor (handled by onFileData).
//
// It always returns nil.
func (the *consumerMYX) InputInitial() error {
	// Buffered channel shared by both inputs; capacity is 200 payloads.
	the.dataCache = make(chan []byte, 200)

	// MQTT input.
	mqttCfg := the.Info.IOConfig.InMqtt
	the.InMqtt = dbHelper.MqttInitial(
		mqttCfg.Host,
		mqttCfg.Port,
		mqttCfg.ClientId,
		mqttCfg.UserName,
		mqttCfg.Password,
		false,
	)
	for _, topic := range mqttCfg.Topics {
		the.InMqtt.Subscribe(topic, the.onData)
	}

	// File-monitor input.
	monitorCfg := the.Info.IOConfig.InFileMonitor
	the.InFileMonitor = monitors.FileMonitorInitial(
		monitorCfg.Directory,
		monitorCfg.FileNameExtension,
		monitorCfg.CronStr,
	)
	the.InFileMonitor.Subscribe(the.onFileData)

	return nil
}
// OutputInitial creates the HTTP output helper with the URL taken from the
// configuration and initialises it.
//
// It always returns nil.
func (the *consumerMYX) OutputInitial() error {
	the.outHttpPost = new(dbHelper.HttpHelper)
	the.outHttpPost.Url = the.Info.IOConfig.OutHttpPost.Url
	the.outHttpPost.Initial()
	return nil
}
// Work starts a goroutine that loops forever: it takes one payload from
// dataCache, logs the remaining queue length and the payload size, publishes
// the payload through outHttpPost, and then sleeps 50ms before the next one.
//
// Work itself returns immediately.
func (the *consumerMYX) Work() {
	// pushNext handles exactly one payload; it blocks until one is available.
	pushNext := func() {
		payload := <-the.dataCache
		log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
		log.Printf("推送[%v]: len=%d", the.outHttpPost.Url, len(payload))
		the.outHttpPost.Publish(payload)
		time.Sleep(50 * time.Millisecond)
	}

	go func() {
		for {
			pushNext()
		}
	}()
}
// onData is the MQTT callback subscribed in InputInitial.
//
// The adaptor is chosen from the topic with its last "/"-separated segment
// removed (for example "upload/uds/abc" selects on "upload/uds"). When an
// adaptor matches, Msg is transformed and a non-empty result is queued on
// dataCache for the push goroutine.
//
// Topic is the full topic the message arrived on; Msg is the message payload.
func (the *consumerMYX) onData(Topic string, Msg string) {
	// Only messages longer than 80 bytes are logged, cut to their first 80 bytes.
	if len(Msg) > 80 {
		log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80])
	}

	// LastIndex returns -1 when Topic contains no "/"; slicing with that index
	// would panic, so such a topic is reported as unmatched and dropped.
	topicPrefixIndex := strings.LastIndex(Topic, "/")
	if topicPrefixIndex < 0 {
		log.Printf("[无匹配 %s],不处理", Topic)
		return
	}

	adaptor := the.getAdaptor(Topic[:topicPrefixIndex])
	if adaptor == nil {
		return
	}

	if needPush := adaptor.Transform(Msg); len(needPush) > 0 {
		the.dataCache <- needPush
	}
}
// getAdaptor returns the adaptor registered for the topic prefix flag, or nil
// when no adaptor matches. Every outcome is logged.
//
// Known prefixes:
//   - "upload/uds": Adaptor_TYCJ_MYX, built with Sensors.TYCJsensorNameMap
//   - "content/ZD": Adaptor_ZD_MYX, built with Sensors.ZDsensorMCMap
func (the *consumerMYX) getAdaptor(flag string) (adaptor adaptors.IAdaptor) {
	if flag == "upload/uds" {
		log.Printf("[统一采集软件]-上报,准备处理")
		return adaptors.Adaptor_TYCJ_MYX{IdMap: the.Info.Sensors.TYCJsensorNameMap}
	}
	if flag == "content/ZD" {
		log.Printf("[振动软件]-上报,准备处理")
		return adaptors.Adaptor_ZD_MYX{IdMap: the.Info.Sensors.ZDsensorMCMap}
	}

	log.Printf("[无匹配 %s],不处理", flag)
	return nil
}
// onFileData is the file-monitor callback subscribed in InputInitial.
//
// It walks the monitor's directory, parses every matching CSV file (a leading
// UTF-8 BOM is stripped by the decoder), transforms the records with
// Adaptor_GDGS_MYX and queues each non-empty payload on dataCache.
//
// A file that cannot be opened or parsed is logged and skipped so that one bad
// file does not stop the remaining files from being processed. An error
// reported by the walk itself still ends the scan and is printed.
func (the *consumerMYX) onFileData() {
	dir := the.InFileMonitor.Directory
	log.Printf("执行文件监视器-任务 扫描目录:%s....", dir)

	adaptor := adaptors.Adaptor_GDGS_MYX{
		IdMap: getGDGSIdMap(the.Info.Sensors.GDGSsensorNameMap),
	}

	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Skip directories and non-matching files before doing any file I/O.
		if info.IsDir() || filepath.Ext(path) != ".bbcsv" { // change to .csv for production use
			return nil
		}
		// Diagnostic only: the decoder below handles files with or without a BOM.
		log.Printf("文件[%s] UTF8-BOM=%v", path, isUTF8BOM(path))

		file, err := os.Open(path)
		if err != nil {
			log.Printf("打开文件[%s]异常 err=%v", path, err)
			return nil
		}
		// The callback is its own function, so this closes the file per entry.
		defer file.Close()

		reader := csv.NewReader(unicode.UTF8BOM.NewDecoder().Reader(file))
		records, err := reader.ReadAll()
		if err != nil {
			log.Printf("解析文件[%s]异常 err=%v", path, err)
			return nil
		}

		fmt.Println("文件名:", info.Name())
		if payload := adaptor.Transform(records); len(payload) > 0 {
			the.dataCache <- payload
		}
		return nil
	})
	if err != nil {
		fmt.Println("遍历目录时发生错误:", err)
	}
}
// getGDGSIdMap flattens the configured sensor groups into one lookup table
// keyed by the keys of each group's Map.
//
// Each entry carries the per-key value from the group's Map together with the
// Type and Formula of the group it came from. When the same key appears in
// several groups, the entry from the later group wins. The returned map is
// never nil.
func getGDGSIdMap(gdgsSensors []MYX.GDGSsensorNameMap) (IdMap map[string]MYX.CHInfo) {
	IdMap = make(map[string]MYX.CHInfo)
	for i := range gdgsSensors {
		group := gdgsSensors[i]
		for sensorName, sensorInfo := range group.Map {
			entry := MYX.CHInfo{
				SensorInfo: sensorInfo,
				Type:       group.Type,
				Formula:    group.Formula,
			}
			IdMap[sensorName] = entry
		}
	}
	return IdMap
}
// isUTF8BOM reports whether the file at path begins with the UTF-8 byte order
// mark (0xEF, 0xBB, 0xBF).
//
// It returns false when the file cannot be opened or when its first three
// bytes cannot be read, which includes files shorter than three bytes.
func isUTF8BOM(path string) bool {
	f, err := os.Open(path)
	if err != nil {
		return false
	}
	defer f.Close()

	// Peek fails when fewer than three bytes are available.
	head, err := bufio.NewReader(f).Peek(3)
	if err != nil {
		return false
	}
	return string(head) == "\xEF\xBB\xBF"
}