You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
207 lines
5.1 KiB
207 lines
5.1 KiB
2 weeks ago
|
package consumers
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"encoding/csv"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"goUpload/adaptors"
|
||
|
"goUpload/consumers/MYX"
|
||
|
"goUpload/dbHelper"
|
||
|
"goUpload/monitors"
|
||
|
"golang.org/x/text/encoding/unicode"
|
||
|
"log"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type consumerMYX struct {
|
||
|
//数据缓存管道
|
||
|
dataCache chan []byte
|
||
|
//具体配置
|
||
|
Info MYX.ConfigFile
|
||
|
InMqtt *dbHelper.MqttHelper
|
||
|
InFileMonitor *monitors.FileMonitor
|
||
|
outHttpPost *dbHelper.HttpHelper
|
||
|
}
|
||
|
|
||
|
func (the *consumerMYX) LoadConfigJson(cfgStr string) {
|
||
|
// 将 JSON 格式的数据解析到结构体中
|
||
|
err := json.Unmarshal([]byte(cfgStr), &the.Info)
|
||
|
if err != nil {
|
||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (the *consumerMYX) Initial(cfg string) error {
|
||
|
the.LoadConfigJson(cfg)
|
||
|
err := the.InputInitial()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = the.OutputInitial()
|
||
|
return err
|
||
|
}
|
||
|
func (the *consumerMYX) InputInitial() error {
|
||
|
the.dataCache = make(chan []byte, 200)
|
||
|
//mqtt数据入口
|
||
|
the.InMqtt = dbHelper.MqttInitial(
|
||
|
the.Info.IOConfig.InMqtt.Host,
|
||
|
the.Info.IOConfig.InMqtt.Port,
|
||
|
the.Info.IOConfig.InMqtt.ClientId,
|
||
|
the.Info.IOConfig.InMqtt.UserName,
|
||
|
the.Info.IOConfig.InMqtt.Password,
|
||
|
false)
|
||
|
|
||
|
for _, inTopic := range the.Info.IOConfig.InMqtt.Topics {
|
||
|
the.InMqtt.Subscribe(inTopic, the.onData)
|
||
|
}
|
||
|
|
||
|
//文件监视器数据入口
|
||
|
the.InFileMonitor = monitors.FileMonitorInitial(
|
||
|
the.Info.IOConfig.InFileMonitor.Directory,
|
||
|
the.Info.IOConfig.InFileMonitor.FileNameExtension,
|
||
|
the.Info.IOConfig.InFileMonitor.CronStr,
|
||
|
)
|
||
|
the.InFileMonitor.Subscribe(the.onFileData)
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerMYX) OutputInitial() error {
|
||
|
//数据出口
|
||
|
the.outHttpPost = &dbHelper.HttpHelper{
|
||
|
Url: the.Info.IOConfig.OutHttpPost.Url,
|
||
|
}
|
||
|
the.outHttpPost.Initial()
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerMYX) Work() {
|
||
|
go func() {
|
||
|
for {
|
||
|
pushBytes := <-the.dataCache
|
||
|
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
|
||
|
|
||
|
log.Printf("推送[%v]: len=%d", the.outHttpPost.Url, len(pushBytes))
|
||
|
//hex.EncodeToString(pushBytes)
|
||
|
the.outHttpPost.Publish(pushBytes)
|
||
|
time.Sleep(50 * time.Millisecond)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
func (the *consumerMYX) onData(Topic string, Msg string) {
|
||
|
if len(Msg) > 80 {
|
||
|
log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80])
|
||
|
}
|
||
|
var needPush []byte
|
||
|
topicPrefixIndex := strings.LastIndex(Topic, "/")
|
||
|
matchTopic := Topic[:topicPrefixIndex]
|
||
|
adaptor := the.getAdaptor(matchTopic)
|
||
|
if adaptor != nil {
|
||
|
needPush = adaptor.Transform(Msg)
|
||
|
if len(needPush) > 0 {
|
||
|
the.dataCache <- needPush
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
func (the *consumerMYX) getAdaptor(flag string) (adaptor adaptors.IAdaptor) {
|
||
|
switch flag {
|
||
|
case "upload/uds":
|
||
|
log.Printf("[统一采集软件]-上报,准备处理")
|
||
|
adaptor = adaptors.Adaptor_TYCJ_MYX{IdMap: the.Info.Sensors.TYCJsensorNameMap}
|
||
|
case "content/ZD":
|
||
|
log.Printf("[振动软件]-上报,准备处理")
|
||
|
adaptor = adaptors.Adaptor_ZD_MYX{IdMap: the.Info.Sensors.ZDsensorMCMap}
|
||
|
default:
|
||
|
log.Printf("[无匹配 %s],不处理", flag)
|
||
|
}
|
||
|
return adaptor
|
||
|
}
|
||
|
|
||
|
func (the *consumerMYX) onFileData() {
|
||
|
dir := the.InFileMonitor.Directory
|
||
|
log.Printf("执行文件监视器-任务 扫描目录:%s....", dir)
|
||
|
adaptor := adaptors.Adaptor_GDGS_MYX{
|
||
|
IdMap: getGDGSIdMap(the.Info.Sensors.GDGSsensorNameMap),
|
||
|
}
|
||
|
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
isUTF8BOM := isUTF8BOM(path)
|
||
|
println(isUTF8BOM)
|
||
|
if filepath.Ext(path) == ".bbcsv" { //商用改为.csv
|
||
|
file, err := os.Open(path)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer file.Close()
|
||
|
|
||
|
reader := csv.NewReader(unicode.UTF8BOM.NewDecoder().Reader(file))
|
||
|
records, err := reader.ReadAll()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
fmt.Println("文件名:", info.Name())
|
||
|
//内容:
|
||
|
payload := adaptor.Transform(records)
|
||
|
|
||
|
if len(payload) > 0 {
|
||
|
the.dataCache <- payload
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
fmt.Println("遍历目录时发生错误:", err)
|
||
|
}
|
||
|
//var needPush []byte
|
||
|
//
|
||
|
//adaptor := the.getAdaptor(matchTopic)
|
||
|
//if adaptor != nil {
|
||
|
// needPush = adaptor.Transform(Msg)
|
||
|
// if len(needPush) > 0 {
|
||
|
// the.dataCache <- needPush
|
||
|
// }
|
||
|
//}
|
||
|
}
|
||
|
|
||
|
func getGDGSIdMap(gdgsSensors []MYX.GDGSsensorNameMap) (IdMap map[string]MYX.CHInfo) {
|
||
|
IdMap = map[string]MYX.CHInfo{}
|
||
|
for _, gdgsSensor := range gdgsSensors {
|
||
|
for name, info := range gdgsSensor.Map {
|
||
|
IdMap[name] = MYX.CHInfo{
|
||
|
SensorInfo: info,
|
||
|
Type: gdgsSensor.Type,
|
||
|
Formula: gdgsSensor.Formula,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return IdMap
|
||
|
}
|
||
|
|
||
|
// isUTF8BOM reports whether the file at path starts with the UTF-8 byte order
// mark (0xEF, 0xBB, 0xBF).
//
// It returns false when the file cannot be opened or holds fewer than three
// bytes. A false result only means "no BOM": the file may still be valid
// UTF-8 text without a BOM, or may not be text at all.
func isUTF8BOM(path string) bool {
	file, err := os.Open(path)
	if err != nil {
		return false
	}
	defer file.Close()

	// Peek fails (io.EOF) when fewer than three bytes are available.
	head, err := bufio.NewReader(file).Peek(3)
	if err != nil {
		return false
	}

	return head[0] == 0xEF && head[1] == 0xBB && head[2] == 0xBF
}
|