You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
3.1 KiB
118 lines
3.1 KiB
3 months ago
|
package consumers
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"goUpload/adaptors"
|
||
|
"goUpload/consumers/CQZG"
|
||
|
"goUpload/dbHelper"
|
||
|
"log"
|
||
|
"os"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type consumerCQZG struct {
|
||
|
//数据缓存管道
|
||
|
ch_t500101 chan []byte
|
||
|
//具体配置
|
||
|
Info CQZG.ConfigFile
|
||
|
InMqtt *dbHelper.MqttHelper
|
||
|
outMqtt *dbHelper.MqttHelper
|
||
|
}
|
||
|
|
||
|
func (the *consumerCQZG) LoadConfigJson(cfgStr string) {
|
||
|
// 将 JSON 格式的数据解析到结构体中
|
||
|
err := json.Unmarshal([]byte(cfgStr), &the.Info)
|
||
|
if err != nil {
|
||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (the *consumerCQZG) Initial(cfg string) error {
|
||
|
the.LoadConfigJson(cfg)
|
||
|
err := the.InputInitial()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = the.OutputInitial()
|
||
|
return err
|
||
|
}
|
||
|
func (the *consumerCQZG) InputInitial() error {
|
||
|
os.Setenv("RC4Key", the.Info.Config.Rc4key)
|
||
|
log.Printf("RC4Key=%s", the.Info.Config.Rc4key)
|
||
|
the.ch_t500101 = make(chan []byte, 200)
|
||
|
//数据入口
|
||
|
the.InMqtt = dbHelper.MqttInitial(
|
||
|
the.Info.Config.InMqtt.Host,
|
||
|
the.Info.Config.InMqtt.Port,
|
||
|
the.Info.Config.InMqtt.ClientId,
|
||
|
the.Info.Config.InMqtt.UserName,
|
||
|
the.Info.Config.InMqtt.Password,
|
||
|
false)
|
||
|
//inTopic := "Upload/#" //荔枝乌江大桥
|
||
|
for _, inTopic := range the.Info.Config.InMqtt.Topics {
|
||
|
the.InMqtt.Subscribe(inTopic, the.onData)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerCQZG) OutputInitial() error {
|
||
|
//数据出口
|
||
|
the.outMqtt = dbHelper.MqttInitial(
|
||
|
the.Info.Config.OutMqtt.Host,
|
||
|
the.Info.Config.OutMqtt.Port,
|
||
|
the.Info.Config.OutMqtt.ClientId,
|
||
|
the.Info.Config.OutMqtt.UserName,
|
||
|
the.Info.Config.OutMqtt.Password,
|
||
|
true,
|
||
|
"consumers/CQZG/ssl/centerCA.crt")
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerCQZG) Work() {
|
||
|
go func() {
|
||
|
for {
|
||
|
pushBytes := <-the.ch_t500101
|
||
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch_t500101))
|
||
|
|
||
|
for _, outTopic := range the.Info.Config.OutMqtt.Topics {
|
||
|
log.Printf("推送[%s]: len=%d", outTopic, len(pushBytes))
|
||
|
//hex.EncodeToString(pushBytes)
|
||
|
the.outMqtt.Publish(outTopic, pushBytes)
|
||
|
}
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
func (the *consumerCQZG) onData(Topic string, Msg string) {
|
||
|
if len(Msg) > 80 {
|
||
|
log.Printf("mqtt-recv:[%s]:%s ...", Topic, Msg[:80])
|
||
|
}
|
||
|
var needPush []byte
|
||
|
topicPrefixIndex := strings.LastIndex(Topic, "/")
|
||
|
matchTopic := Topic[:topicPrefixIndex]
|
||
|
adaptor := the.getAdaptor(matchTopic)
|
||
|
if adaptor != nil {
|
||
|
needPush = adaptor.Transform(Msg)
|
||
|
if len(needPush) > 0 {
|
||
|
the.ch_t500101 <- needPush
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
func (the *consumerCQZG) getAdaptor(flag string) (adaptor adaptors.IAdaptor) {
|
||
|
RC4Key := os.Getenv("RC4Key")
|
||
|
switch flag {
|
||
|
case "upload/uds":
|
||
|
log.Printf("[统一采集软件]-上报,准备处理")
|
||
|
adaptor = adaptors.Adaptor_TYCJ_CQZG{IdMap: the.Info.SensorMap.TYCJsensorNameMap, RC4Key: RC4Key}
|
||
|
case "upload/CZ":
|
||
|
log.Printf("[称重软件]-上报,准备处理")
|
||
|
adaptor = adaptors.Adaptor_OLCZ_CQZG{IdMap: the.Info.SensorMap.CZsensorRoadnoMap, RC4Key: RC4Key}
|
||
|
case "upload/ZD":
|
||
|
log.Printf("[振动软件]-上报,准备处理")
|
||
|
adaptor = adaptors.Adaptor_ZD_CQZG{IdMap: the.Info.SensorMap.ZDsensorMCMap, RC4Key: RC4Key}
|
||
|
default:
|
||
|
log.Printf("[无匹配 %s],不处理", flag)
|
||
|
}
|
||
|
return adaptor
|
||
|
}
|