数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
2.5 KiB

package consumers
import (
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/HTTP_PRPXY"
"goInOut/dbHelper"
"goInOut/utils"
"io"
"log"
"net/http"
)
type consumerHttpProxy struct {
//数据缓存管道
ch chan []adaptors.NeedPush
//具体配置
Info HTTP_PRPXY.ConfigFile
InApiServer *dbHelper.ApiServerHelper
outHttpPost *dbHelper.HttpHelper
}
func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := json.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerHttpProxy) Initial(cfg string) error {
the.LoadConfigJson(cfg)
err := the.InputInitial()
if err != nil {
return err
}
err = the.OutputInitial()
return err
}
func (the *consumerHttpProxy) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口
the.InApiServer = dbHelper.NewApiServer(
the.Info.IoConfig.In.ApiServer.Port,
the.Info.IoConfig.In.ApiServer.Routes,
)
the.InApiServer.Initial()
return nil
}
func (the *consumerHttpProxy) OutputInitial() error {
//数据出口
the.outHttpPost = &dbHelper.HttpHelper{
Url: the.Info.IoConfig.Out.HttpPost.Url,
}
the.outHttpPost.Initial()
return nil
}
func (the *consumerHttpProxy) Work() {
go func() {
router := "POST /onData"
the.InApiServer.RouteRegister(router, the.onData)
the.InApiServer.Run()
}()
}
func (the *consumerHttpProxy) onData(w http.ResponseWriter, r *http.Request) {
route := "/onData"
w.Header().Set("Content-Type", "application/json")
body, err := io.ReadAll(r.Body)
log.Printf("收到 %s 请求 %s", route, body)
bodyObj := HTTP_PRPXY.ABStatus{}
err = json.Unmarshal(body, &bodyObj)
if err != nil {
log.Printf("body 解析失败,请检查 %s", err.Error())
return
}
idTemplate := the.Info.OtherInfo["idTemplate"]
id, err := utils.TextTemplateMatch(bodyObj, idTemplate)
if err != nil {
log.Printf("解析id 失败,请检查 %s", err.Error())
}
params := map[string]string{
"id": id,
}
resp, err := the.outHttpPost.HttpPostWithParams(string(body), params)
if err != nil {
return
}
log.Printf("应答: %s", resp)
defer fmt.Fprintf(w, resp)
}
func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) {
bridgeCode := ""
if v, ok := the.Info.OtherInfo["bridgeCode"]; ok {
bridgeCode = v
}
if bridgeCode == "" {
panic("无正确的 bridgeCode")
}
return adaptor
}