数据上报
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

104 lines
2.6 KiB

package consumers
import (
"encoding/json"
"goUpload/adaptors"
"goUpload/consumers/WJHP"
"goUpload/dbHelper"
"goUpload/monitors"
"log"
"strings"
"time"
)
// consumerWJHP receives MQTT messages, converts them through an adaptor and
// pushes the converted payloads out through an HTTP helper.
type consumerWJHP struct {
	// dataCache is the data cache channel: onData sends payloads into it and
	// the goroutine started by Work receives them.
	dataCache chan []byte

	// Info is the concrete configuration, filled in by LoadConfigJson.
	Info WJHP.ConfigFile

	// InMqtt is the MQTT input helper created by InputInitial.
	InMqtt *dbHelper.MqttHelper

	// InFileMonitor is not referenced by the methods visible in this file.
	// NOTE(review): presumably a file-based input; confirm whether it is still needed.
	InFileMonitor *monitors.FileMonitor

	// outHttpPost is the HTTP output helper created by OutputInitial.
	outHttpPost *dbHelper.HttpHelper
}
// LoadConfigJson parses the JSON text in cfgStr into the.Info.
//
// If the text cannot be unmarshalled, the failure is logged together with the
// raw configuration text and the method panics with the unmarshal error.
func (the *consumerWJHP) LoadConfigJson(cfgStr string) {
	raw := []byte(cfgStr)
	if err := json.Unmarshal(raw, &the.Info); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}
// Initial loads the JSON configuration in cfg and then sets up the input side
// followed by the output side.
//
// It returns the first error reported by InputInitial or OutputInitial.
// A configuration that cannot be parsed makes LoadConfigJson panic instead of
// returning an error.
func (the *consumerWJHP) Initial(cfg string) error {
	the.LoadConfigJson(cfg)

	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}
// InputInitial prepares the input side of the consumer.
//
// It allocates dataCache with room for 200 payloads, creates the MQTT helper
// from the InMqtt section of the loaded configuration and subscribes onData to
// every configured topic. It always returns nil.
func (the *consumerWJHP) InputInitial() error {
	the.dataCache = make(chan []byte, 200)

	// MQTT data entry point.
	mqttCfg := the.Info.IOConfig.InMqtt
	the.InMqtt = dbHelper.MqttInitial(
		mqttCfg.Host,
		mqttCfg.Port,
		mqttCfg.ClientId,
		mqttCfg.UserName,
		mqttCfg.Password,
		false,
	)

	for _, topic := range mqttCfg.Topics {
		the.InMqtt.Subscribe(topic, the.onData)
	}
	return nil
}
// OutputInitial prepares the output side of the consumer.
//
// It creates the HTTP helper with the URL from the OutHttpPost section of the
// loaded configuration and calls its Initial method. It always returns nil.
func (the *consumerWJHP) OutputInitial() error {
	// Data exit point.
	postURL := the.Info.IOConfig.OutHttpPost.Url
	the.outHttpPost = &dbHelper.HttpHelper{Url: postURL}
	the.outHttpPost.Initial()
	return nil
}
// Work starts a background goroutine that forwards cached payloads to the
// HTTP output and returns immediately.
//
// The goroutine loops forever: it receives one payload from dataCache, logs
// the number of payloads still queued and the payload itself, publishes it
// through outHttpPost, logs the outcome and then sleeps 50ms before the next
// receive. Publish errors are only logged.
func (the *consumerWJHP) Work() {
	forward := func() {
		for {
			payload := <-the.dataCache
			log.Printf("取出ch数据,剩余[%d]", len(the.dataCache))
			log.Printf("准备推送=%s", payload)

			resp, err := the.outHttpPost.Publish(payload)
			log.Printf("推送[%s]%v\n结果[%s]: ", the.outHttpPost.Url, err, resp)

			// Short pause between consecutive pushes.
			time.Sleep(50 * time.Millisecond)
		}
	}
	go forward()
}
// onData handles one MQTT message received on a subscribed topic.
//
// Topic is split at its last "/": the part before it is given to getAdaptor to
// select an adaptor, and the part after it is passed to the adaptor's
// Transform as the device module together with Msg. A non-empty Transform
// result is sent on dataCache.
//
// Messages longer than 80 bytes are logged with a preview of at most 100
// bytes. A topic that contains no "/" is logged and dropped, as is a message
// for which getAdaptor returns nil.
func (the *consumerWJHP) onData(Topic string, Msg string) {
	if len(Msg) > 80 {
		// Msg may still be shorter than 100 bytes here, so clamp the preview
		// length before slicing to avoid a slice-bounds panic.
		preview := Msg
		if len(preview) > 100 {
			preview = preview[:100]
		}
		log.Printf("mqtt-recv:[%s]:%s ...", Topic, preview)
	}

	topicPrefixIndex := strings.LastIndex(Topic, "/")
	if topicPrefixIndex < 0 {
		// LastIndex returns -1 when Topic has no "/"; slicing with it would panic.
		log.Printf("[无效topic %s],不处理", Topic)
		return
	}
	matchTopic := Topic[:topicPrefixIndex]
	deviceModule := Topic[topicPrefixIndex+1:]

	adaptor := the.getAdaptor(matchTopic)
	if adaptor == nil {
		return
	}

	needPush := adaptor.Transform(deviceModule, Msg)
	if len(needPush) > 0 {
		the.dataCache <- needPush
	}
}
// getAdaptor returns the adaptor registered for the topic prefix flag.
//
// Only "/fs-flexometer" is recognised; it yields an Adaptor_SJWY_WJHP whose
// IdMap is the SensorConfig from the loaded configuration. Any other flag is
// logged as unmatched and the result is nil.
func (the *consumerWJHP) getAdaptor(flag string) (adaptor adaptors.IAdaptor3) {
	if flag != "/fs-flexometer" {
		log.Printf("[无匹配 %s],不处理", flag)
		return nil
	}

	log.Printf("[视觉位移数据]-上报,准备处理")
	return adaptors.Adaptor_SJWY_WJHP{IdMap: the.Info.SensorConfig}
}