From bb15e9b7f956caae29fe4574c26af36701be8ba4 Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 19 Nov 2024 16:01:16 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20=E5=AE=89?= =?UTF-8?q?=E5=85=A8=E5=B8=A6=E7=8A=B6=E6=80=81=E6=95=B0=E6=8D=AE=E8=BD=AC?= =?UTF-8?q?=E5=8F=91=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config_转发http2axy60000端口.json | 9 ++- consumers/HTTP_PRPXY/dataModel.go | 44 ++++++++++++++ consumers/consumerHTTP_PRPXY.go | 60 ++++++++++++------- dbHelper/apiServerHelper.go | 25 +++++--- dbHelper/httpHelper.go | 35 ++++++++++- testUnit/安心云http转发_test.go | 38 ++++++++++++ utils/textTemplateHandler.go | 22 +++++++ 7 files changed, 197 insertions(+), 36 deletions(-) create mode 100644 consumers/HTTP_PRPXY/dataModel.go create mode 100644 testUnit/安心云http转发_test.go create mode 100644 utils/textTemplateHandler.go diff --git a/configFiles/config_转发http2axy60000端口.json b/configFiles/config_转发http2axy60000端口.json index a45dd7a..5b35c55 100644 --- a/configFiles/config_转发http2axy60000端口.json +++ b/configFiles/config_转发http2axy60000端口.json @@ -13,8 +13,13 @@ } }, "out": { - "url": "http://127.0.0.1:4009/write", - "method": "post" + "httpPost": { + "url": "http://218.3.126.49:60000/upload", + "method": "post" + } } + }, + "info": { + "idTemplate": "{{.Values.UcId}}" } } \ No newline at end of file diff --git a/consumers/HTTP_PRPXY/dataModel.go b/consumers/HTTP_PRPXY/dataModel.go new file mode 100644 index 0000000..15e8f3f --- /dev/null +++ b/consumers/HTTP_PRPXY/dataModel.go @@ -0,0 +1,44 @@ +package HTTP_PRPXY + +type ABStatus struct { + Values ABStatusValues `json:"values"` + Secret string `json:"secret"` + Type string `json:"type"` +} + +type ABStatusValues struct { + CreateTime int64 `json:"createTime"` + Updater string `json:"updater"` + Deleted bool `json:"deleted"` + Id string `json:"id"` + WorkId string `json:"workId"` + UcId string `json:"ucId"` + DeviceCode string `json:"deviceCode"` + UserId string `json:"userId"` + UserName string `json:"userName"` + DataTime int64 `json:"dataTime"` + BeltStatus string `json:"beltStatus"` + DeviceStatus string `json:"deviceStatus"` + StartDate int64 `json:"startDate"` + EndDate int64 `json:"endDate"` + MainVol int `json:"mainVol"` + AscendWorkCertificate string `json:"ascend_work_certificate"` + BeltPoseLeft string `json:"beltPoseLeft"` + BeltPoseRight string `json:"beltPoseRight"` + BeltStatusLeft string `json:"beltStatusLeft"` + BeltStatusRight string `json:"beltStatusRight"` + TickbVol int `json:"tickbVol"` + //Avatar interface{} `json:"avatar"` + //AlarmType interface{} `json:"alarmType"` + //Identity interface{} `json:"identity"` + //IdentityImg interface{} `json:"identityImg"` + //IdentityImgF interface{} `json:"identityImgF"` + //Phone interface{} `json:"phone"` + Versions int `json:"versions"` + //Team interface{} `json:"team"` + Longitude string `json:"longitude"` + Latitude string `json:"latitude"` + Outsource string `json:"outsource"` + LocTime int64 `json:"locTime"` + //WorkLen interface{} `json:"workLen"` +} diff --git a/consumers/consumerHTTP_PRPXY.go b/consumers/consumerHTTP_PRPXY.go index b80d9fc..5636d24 100644 --- a/consumers/consumerHTTP_PRPXY.go +++ b/consumers/consumerHTTP_PRPXY.go @@ -2,12 +2,14 @@ package consumers import ( "encoding/json" + "fmt" "goInOut/adaptors" "goInOut/consumers/HTTP_PRPXY" "goInOut/dbHelper" + "goInOut/utils" + "io" "log" - "strings" - "time" + "net/http" ) type consumerHttpProxy struct { @@ -44,43 +46,55 @@ func (the *consumerHttpProxy) InputInitial() error { the.Info.IoConfig.In.ApiServer.Port, the.Info.IoConfig.In.ApiServer.Routes, ) - ////inTopic := "Upload/#" //荔枝乌江大桥 - //for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics { - // the.InMqtt.Subscribe(inTopic, the.onData) - //} the.InApiServer.Initial() return nil } func (the *consumerHttpProxy) OutputInitial() error { //数据出口 + the.outHttpPost = &dbHelper.HttpHelper{ + Url: the.Info.IoConfig.Out.HttpPost.Url, + } the.outHttpPost.Initial() return nil } func (the *consumerHttpProxy) Work() { go func() { - for { - - log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) - - time.Sleep(100 * time.Millisecond) - } + router := "POST /onData" + the.InApiServer.RouteRegister(router, the.onData) + the.InApiServer.Run() }() } -func (the *consumerHttpProxy) onData(inTopic string, Msg string) { - if len(Msg) > 100 { - log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100]) +func (the *consumerHttpProxy) onData(w http.ResponseWriter, r *http.Request) { + route := "/onData" + w.Header().Set("Content-Type", "application/json") + body, err := io.ReadAll(r.Body) + + log.Printf("收到 %s 请求 %s", route, body) + + bodyObj := HTTP_PRPXY.ABStatus{} + err = json.Unmarshal(body, &bodyObj) + if err != nil { + log.Printf("body 解析失败,请检查 %s", err.Error()) + return } + idTemplate := the.Info.OtherInfo["idTemplate"] - topicPrefixIndex := strings.LastIndex(inTopic, "/") - matchTopic := inTopic[:topicPrefixIndex] - adaptor := the.getAdaptor(matchTopic) - if adaptor != nil { - needPush := adaptor.Transform(matchTopic, Msg) - if len(needPush) > 0 { - the.ch <- needPush - } + id, err := utils.TextTemplateMatch(bodyObj, idTemplate) + if err != nil { + log.Printf("解析id 失败,请检查 %s", err.Error()) + } + params := map[string]string{ + "id": id, + } + resp, err := the.outHttpPost.HttpPostWithParams(string(body), params) + if err != nil { + return } + log.Printf("应答: %s", resp) + defer fmt.Fprintf(w, resp) + } + func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) { bridgeCode := "" if v, ok := the.Info.OtherInfo["bridgeCode"]; ok { diff --git a/dbHelper/apiServerHelper.go b/dbHelper/apiServerHelper.go index 06c5d79..29a98a0 100644 --- a/dbHelper/apiServerHelper.go +++ b/dbHelper/apiServerHelper.go @@ -10,6 +10,7 @@ type ApiServerHelper struct { mux *http.ServeMux routes map[string]string port uint + server http.Server } func NewApiServer(port uint, routes map[string]string) *ApiServerHelper { @@ -24,26 +25,34 @@ func (the *ApiServerHelper) Initial() { the.mux = http.NewServeMux() // 创建 HTTP 服务器 - ser := http.Server{ + the.server = http.Server{ Handler: the.mux, Addr: fmt.Sprintf(":%d", the.port), } the.routeRegister() - log.Printf("apiServer监听端口 %d", the.port) - go log.Fatal(ser.ListenAndServe()) } +func (the *ApiServerHelper) Run() { + the.server.ListenAndServe() + log.Printf("apiServer监听端口 %d", the.port) +} func (the *ApiServerHelper) routeRegister() { - for _, rote := range the.routes { - the.mux.HandleFunc(rote, func(w http.ResponseWriter, r *http.Request) { + for _, route := range the.routes { + the.mux.HandleFunc(route, func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - s := fmt.Sprintf(`{"rote":"%s","resp":"安全带状态应答"}`, rote) - println("收到请求", rote) + s := fmt.Sprintf(`{"route":"%s","resp":"安全带状态应答"}`, route) + println("收到请求", route) fmt.Fprintf(w, s) }) - log.Printf("注册路由 %s", rote) + log.Printf("注册路由 %s", route) } } +func (the *ApiServerHelper) RouteRegister(route string, handler func(w http.ResponseWriter, r *http.Request)) { + + the.mux.HandleFunc(route, handler) + log.Printf("注册路由 %s", route) + +} diff --git a/dbHelper/httpHelper.go b/dbHelper/httpHelper.go index 0362e38..b5b7b3a 100644 --- a/dbHelper/httpHelper.go +++ b/dbHelper/httpHelper.go @@ -8,9 +8,9 @@ import ( "log" "mime/multipart" "net/http" + "net/url" "os" "strings" - "time" ) type HttpHelper struct { @@ -23,8 +23,6 @@ type HttpHelper struct { func (the *HttpHelper) Initial() { the.client = http.Client{} - time.Sleep(time.Second * 1) - } func (the *HttpHelper) HttpGet(queryBody string) string { url := the.Url @@ -95,6 +93,37 @@ func (the *HttpHelper) PublishWithHeader(messageBytes []byte, headers map[string } return resp, err } +func (the *HttpHelper) HttpPostWithParams(queryBody string, params map[string]string) (string, error) { + tr := &http.Transport{ + DisableKeepAlives: true, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + UrlParams := url.Values{} + for k, v := range params { + UrlParams.Set(k, v) + } + targetUrl, _ := url.ParseRequestURI(the.Url) + targetUrl.RawQuery = UrlParams.Encode() + + req, err := http.NewRequest("POST", targetUrl.String(), strings.NewReader(queryBody)) + req.Header.Set("Content-Type", "application/json") + + log.Printf("http post 开始请求,%s", targetUrl) + resp, err := client.Do(req) + if err != nil { + fmt.Println("请求POST异常 ", err, resp) + return "", err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("请求ReadAll异常 ", err, resp) + return "", err + } + return string(body), err +} // 静态方法 func HttpGet(url string, queryBody string) string { diff --git a/testUnit/安心云http转发_test.go b/testUnit/安心云http转发_test.go new file mode 100644 index 0000000..0cea01e --- /dev/null +++ b/testUnit/安心云http转发_test.go @@ -0,0 +1,38 @@ +package testUnit + +import ( + "fmt" + "goInOut/utils" + "testing" +) + +// 定义一个结构体,包含嵌套的结构体字段 +type Person struct { + Name Name `json:"name"` +} + +type Name struct { + First string `json:"first"` + Last string `json:"last"` +} + +func Test_httpProxy(t *testing.T) { + +} + +func Test_template(t *testing.T) { + // 创建一个Person实例 + person := Person{ + Name: Name{ + First: "John", + Last: "Doe", + }, + } + + templateStr := "Hello, {{.Name.First}} !" + sbs, err := utils.TextTemplateMatch(person, templateStr) + println(sbs) + if err != nil { + fmt.Println(err.Error()) + } +} diff --git a/utils/textTemplateHandler.go b/utils/textTemplateHandler.go new file mode 100644 index 0000000..4da4159 --- /dev/null +++ b/utils/textTemplateHandler.go @@ -0,0 +1,22 @@ +package utils + +import ( + "strings" + "text/template" +) + +func TextTemplateMatch(obj any, textTemplate string) (string, error) { + // 定义模板字符串 + tmpl, err := template.New("template").Parse(textTemplate) + if err != nil { + panic(err) + } + + sb := strings.Builder{} + err = tmpl.Execute(&sb, obj) + if err != nil { + return "", err + } + sbs := sb.String() + return sbs, err +}