数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.5 KiB

package consumers
import (
"encoding/json"
"goInOut/adaptors"
"goInOut/consumers/JSNCGLQL"
"goInOut/dbOperate"
"log"
"strings"
"time"
)
type consumerJSNCGLQL struct {
//数据缓存管道
ch chan []adaptors.NeedPush
//具体配置
Info JSNCGLQL.ConfigFile
InMqtt *dbOperate.MqttHelper
outMqtt *dbOperate.MqttHelper
}
func (the *consumerJSNCGLQL) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := json.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerJSNCGLQL) Initial(cfg string) error {
the.LoadConfigJson(cfg)
err := the.InputInitial()
if err != nil {
return err
}
err = the.OutputInitial()
return err
}
func (the *consumerJSNCGLQL) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口
the.InMqtt = dbOperate.MqttInitial(
the.Info.IoConfig.In.Mqtt.Host,
the.Info.IoConfig.In.Mqtt.Port,
the.Info.IoConfig.In.Mqtt.ClientId,
the.Info.IoConfig.In.Mqtt.UserName,
the.Info.IoConfig.In.Mqtt.Password,
false)
//inTopic := "Upload/#" //荔枝乌江大桥
for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics {
the.InMqtt.Subscribe(inTopic, the.onData)
}
return nil
}
func (the *consumerJSNCGLQL) OutputInitial() error {
//数据出口
the.outMqtt = dbOperate.MqttInitial(
the.Info.IoConfig.Out.Mqtt.Host,
the.Info.IoConfig.Out.Mqtt.Port,
the.Info.IoConfig.Out.Mqtt.ClientId,
the.Info.IoConfig.Out.Mqtt.UserName,
the.Info.IoConfig.Out.Mqtt.Password,
false, //按照具体项目来
"consumers/CQZG/ssl/centerCA.crt")
return nil
}
func (the *consumerJSNCGLQL) Work() {
go func() {
for {
needPushs := <-the.ch
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics {
for _, push := range needPushs {
log.Printf("推送[%s]: len=%d", outTopic, len(push.Payload))
//hex.EncodeToString(pushBytes)
if push.Topic != "" {
outTopic = push.Topic
}
the.outMqtt.Publish(outTopic, push.Payload)
}
}
time.Sleep(100 * time.Millisecond)
}
}()
}
func (the *consumerJSNCGLQL) onData(inTopic string, Msg string) {
if len(Msg) > 100 {
log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100])
}
topicPrefixIndex := strings.LastIndex(inTopic, "/")
matchTopic := inTopic[:topicPrefixIndex]
adaptor := the.getAdaptor(matchTopic)
if adaptor != nil {
needPush := adaptor.Transform(matchTopic, Msg)
if len(needPush) > 0 {
the.ch <- needPush
}
}
}
func (the *consumerJSNCGLQL) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) {
bridgeCode := ""
if v, ok := the.Info.OtherInfo["bridgeCode"]; ok {
bridgeCode = v
}
if bridgeCode == "" {
panic("无正确的 bridgeCode")
}
switch flag {
case "upload/uds":
log.Printf("[统一采集软件]-上报,准备处理")
adaptor = adaptors.Adaptor_TYCJ_JSNCGLQL{IdMap: the.Info.TYCJsensorMap, BridgeCode: bridgeCode}
case "upload/ZD":
log.Printf("[振动软件]-上报,准备处理")
adaptor = adaptors.Adaptor_ZD_JSNCGLQL{IdMap: the.Info.ZDsensorMap, BridgeCode: bridgeCode}
case "upload/ZDSL":
log.Printf("[振动索力软件]-上报,准备处理")
adaptor = adaptors.Adaptor_ZD_JSNCGLQL{IdMap: the.Info.ZDSLensorMap, BridgeCode: bridgeCode}
case "upload/gdnd":
log.Printf("[光电挠度]-上报,准备处理")
adaptor = adaptors.Adaptor_GDND2LA_JSNCGLQL{IdMap: the.Info.GDNDsensorMap, BridgeCode: bridgeCode}
default:
log.Printf("[无匹配 %s],不处理", flag)
}
return adaptor
}