You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
2.1 KiB
95 lines
2.1 KiB
2 months ago
|
package consumers
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"goInOut/adaptors"
|
||
|
"goInOut/consumers/HTTP_PRPXY"
|
||
|
"goInOut/dbHelper"
|
||
|
"log"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type consumerHttpProxy struct {
|
||
|
//数据缓存管道
|
||
|
ch chan []adaptors.NeedPush
|
||
|
//具体配置
|
||
|
Info HTTP_PRPXY.ConfigFile
|
||
|
InApiServer *dbHelper.ApiServerHelper
|
||
|
outHttpPost *dbHelper.HttpHelper
|
||
|
}
|
||
|
|
||
|
func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) {
|
||
|
// 将 JSON 格式的数据解析到结构体中
|
||
|
err := json.Unmarshal([]byte(cfgStr), &the.Info)
|
||
|
if err != nil {
|
||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (the *consumerHttpProxy) Initial(cfg string) error {
|
||
|
the.LoadConfigJson(cfg)
|
||
|
err := the.InputInitial()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = the.OutputInitial()
|
||
|
return err
|
||
|
}
|
||
|
func (the *consumerHttpProxy) InputInitial() error {
|
||
|
the.ch = make(chan []adaptors.NeedPush, 200)
|
||
|
//数据入口
|
||
|
the.InApiServer = dbHelper.NewApiServer(
|
||
|
the.Info.IoConfig.In.ApiServer.Port,
|
||
|
the.Info.IoConfig.In.ApiServer.Routes,
|
||
|
)
|
||
|
////inTopic := "Upload/#" //荔枝乌江大桥
|
||
|
//for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics {
|
||
|
// the.InMqtt.Subscribe(inTopic, the.onData)
|
||
|
//}
|
||
|
the.InApiServer.Initial()
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerHttpProxy) OutputInitial() error {
|
||
|
//数据出口
|
||
|
the.outHttpPost.Initial()
|
||
|
return nil
|
||
|
}
|
||
|
func (the *consumerHttpProxy) Work() {
|
||
|
go func() {
|
||
|
for {
|
||
|
|
||
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
|
||
|
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
func (the *consumerHttpProxy) onData(inTopic string, Msg string) {
|
||
|
if len(Msg) > 100 {
|
||
|
log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100])
|
||
|
}
|
||
|
|
||
|
topicPrefixIndex := strings.LastIndex(inTopic, "/")
|
||
|
matchTopic := inTopic[:topicPrefixIndex]
|
||
|
adaptor := the.getAdaptor(matchTopic)
|
||
|
if adaptor != nil {
|
||
|
needPush := adaptor.Transform(matchTopic, Msg)
|
||
|
if len(needPush) > 0 {
|
||
|
the.ch <- needPush
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) {
|
||
|
bridgeCode := ""
|
||
|
if v, ok := the.Info.OtherInfo["bridgeCode"]; ok {
|
||
|
bridgeCode = v
|
||
|
}
|
||
|
if bridgeCode == "" {
|
||
|
panic("无正确的 bridgeCode")
|
||
|
}
|
||
|
|
||
|
return adaptor
|
||
|
}
|