diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index 70d5e66..e18d759 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -34,6 +34,11 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { if deviceInfo.Name == "" { return nil } + //log 日志排查 + logTagDeviceId := "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" + if iotaData.DeviceId == logTagDeviceId { + log.Printf("onData -> needPush 标记设备数据 [%s]", logTagDeviceId) + } dataType := "" if _dataType, ok := iotaData.Data.Data["_data_type"]; ok { diff --git a/config/configStruct.go b/config/configStruct.go index 1b83152..f6ccdf4 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -39,6 +39,11 @@ type HttpConfig struct { Token Token `json:"token,omitempty"` } +type ApiServerConfig struct { + Port uint `json:"port"` + Routes map[string]string `json:"routes"` +} + type Token struct { Static string `json:"static,omitempty"` RefreshInterval string `json:"refreshInterval,omitempty"` diff --git a/configFiles/弃用备份/config_安心云设备数据_最新同步.json b/configFiles/config_安心云设备数据_最新同步.json similarity index 93% rename from configFiles/弃用备份/config_安心云设备数据_最新同步.json rename to configFiles/config_安心云设备数据_最新同步.json index d07688e..bfa8d6a 100644 --- a/configFiles/弃用备份/config_安心云设备数据_最新同步.json +++ b/configFiles/config_安心云设备数据_最新同步.json @@ -6,7 +6,7 @@ "brokers": [ "10.8.30.160:30992" ], - "groupId": "synchronizeRaw", + "groupId": "synchronizeRaw_50", "topics": [ "RawData" ] diff --git a/configFiles/config_江苏农村公路桥梁.json b/configFiles/弃用备份/config_江苏农村公路桥梁.json similarity index 100% rename from configFiles/config_江苏农村公路桥梁.json rename to configFiles/弃用备份/config_江苏农村公路桥梁.json diff --git a/configFiles/弃用备份/config_转发http2axy60000端口.json b/configFiles/弃用备份/config_转发http2axy60000端口.json new file mode 100644 index 0000000..d01b113 --- /dev/null +++ b/configFiles/弃用备份/config_转发http2axy60000端口.json @@ -0,0 +1,20 @@ +{ + "consumer": "consumerHttpProxy", + "ioConfig": { + "in": { + "httpServer": { + "port": 19700, + "userName": "goInOut", + "password": "", + "routes": [ + "upload/uds/+", + "upload/ZD/+" + ] + } + }, + "out": { + "url": "http://127.0.0.1:4009/write?u=mingyuexia_wkd&p=mingyuexia_wkd&db=MingYueXia_Bridge&rp=autogen", + "method": "post" + } + } +} \ No newline at end of file diff --git a/consumers/HTTP_PRPXY/config.go b/consumers/HTTP_PRPXY/config.go new file mode 100644 index 0000000..6f8b04c --- /dev/null +++ b/consumers/HTTP_PRPXY/config.go @@ -0,0 +1,25 @@ +package HTTP_PRPXY + +import "goInOut/config" + +type ConfigFile struct { + config.Consumer + IoConfig ioConfig `json:"ioConfig"` + OtherInfo map[string]string `json:"info"` +} +type ioConfig struct { + In In `json:"in"` + Out OUT `json:"out"` +} +type In struct { + ApiServer config.ApiServerConfig `json:"apiServer"` +} + +type OUT struct { + HttpPost config.HttpConfig `json:"httpPost"` +} + +type SensorInfo struct { + Name string `json:"name"` //测点名称 + Code string `json:"code"` //测点编号 宜由“桥名简称-监测类别简称-构件类型编码-截面序号-构件序号-测点编号”组成 +} diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go index 410f016..46ab795 100644 --- a/consumers/consumerAXYraw.go +++ b/consumers/consumerAXYraw.go @@ -99,6 +99,11 @@ func (the *consumerAXYraw) toSink() { the.sinkRawMap.Range(func(key, value any) bool { if v, ok := value.(*models.EsRaw); ok { raws = append(raws, *v) + //零时打日志用 + if v.IotaDevice == logTagDeviceId { + bs, _ := json.Marshal(v) + log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) + } return ok } return false @@ -111,12 +116,18 @@ func (the *consumerAXYraw) toSink() { } } +const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" + func (the *consumerAXYraw) Work() { go the.sinkTask() go func() { for { pushEsRaw := <-the.dataCache - //log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) + + if pushEsRaw.IotaDevice == logTagDeviceId { + bs, _ := json.Marshal(pushEsRaw) + log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) + } //有效数据存入缓存 the.lock.Lock() @@ -137,6 +148,11 @@ func (the *consumerAXYraw) onData(topic string, msg string) bool { needPush := adaptor.Transform(topic, msg) if needPush != nil { + //日志标记 + if needPush.IotaDevice == logTagDeviceId { + bs, _ := json.Marshal(needPush) + log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) + } the.dataCache <- needPush } diff --git a/consumers/consumerHTTP_PRPXY.go b/consumers/consumerHTTP_PRPXY.go new file mode 100644 index 0000000..b80d9fc --- /dev/null +++ b/consumers/consumerHTTP_PRPXY.go @@ -0,0 +1,94 @@ +package consumers + +import ( + "encoding/json" + "goInOut/adaptors" + "goInOut/consumers/HTTP_PRPXY" + "goInOut/dbHelper" + "log" + "strings" + "time" +) + +type consumerHttpProxy struct { + //数据缓存管道 + ch chan []adaptors.NeedPush + //具体配置 + Info HTTP_PRPXY.ConfigFile + InApiServer *dbHelper.ApiServerHelper + outHttpPost *dbHelper.HttpHelper +} + +func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := json.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerHttpProxy) Initial(cfg string) error { + the.LoadConfigJson(cfg) + err := the.InputInitial() + if err != nil { + return err + } + err = the.OutputInitial() + return err +} +func (the *consumerHttpProxy) InputInitial() error { + the.ch = make(chan []adaptors.NeedPush, 200) + //数据入口 + the.InApiServer = dbHelper.NewApiServer( + the.Info.IoConfig.In.ApiServer.Port, + the.Info.IoConfig.In.ApiServer.Routes, + ) + ////inTopic := "Upload/#" //荔枝乌江大桥 + //for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics { + // the.InMqtt.Subscribe(inTopic, the.onData) + //} + the.InApiServer.Initial() + return nil +} +func (the *consumerHttpProxy) OutputInitial() error { + //数据出口 + the.outHttpPost.Initial() + return nil +} +func (the *consumerHttpProxy) Work() { + go func() { + for { + + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + + time.Sleep(100 * time.Millisecond) + } + }() +} +func (the *consumerHttpProxy) onData(inTopic string, Msg string) { + if len(Msg) > 100 { + log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100]) + } + + topicPrefixIndex := strings.LastIndex(inTopic, "/") + matchTopic := inTopic[:topicPrefixIndex] + adaptor := the.getAdaptor(matchTopic) + if adaptor != nil { + needPush := adaptor.Transform(matchTopic, Msg) + if len(needPush) > 0 { + the.ch <- needPush + } + } +} +func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) { + bridgeCode := "" + if v, ok := the.Info.OtherInfo["bridgeCode"]; ok { + bridgeCode = v + } + if bridgeCode == "" { + panic("无正确的 bridgeCode") + } + + return adaptor +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index 66ab9c7..636cc6e 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -25,6 +25,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerJSNCGLQL": //工迅-广州高支模平台 consumer = new(consumerJSNCGLQL) + + case "consumerHttpProxy": + consumer = new(consumerHttpProxy) default: consumer = nil } diff --git a/dbHelper/_kafka/consumerGroupHandler.go b/dbHelper/_kafka/consumerGroupHandler.go index e10a27f..8e8f9b0 100644 --- a/dbHelper/_kafka/consumerGroupHandler.go +++ b/dbHelper/_kafka/consumerGroupHandler.go @@ -48,7 +48,7 @@ func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.Consu func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string, string) bool) func(*sarama.ConsumerMessage) bool { f := func(cm *sarama.ConsumerMessage) bool { msg := string(cm.Value) - log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset) + //log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset) return handler(cm.Topic, msg) } return f @@ -82,7 +82,7 @@ func (h *ConsumerGroupHandler) Worker() { config := sarama.NewConfig() config.Consumer.Return.Errors = false config.Version = sarama.V2_0_0_0 - config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config) diff --git a/dbHelper/apiServer.go b/dbHelper/apiServer.go deleted file mode 100644 index a75c965..0000000 --- a/dbHelper/apiServer.go +++ /dev/null @@ -1,38 +0,0 @@ -package dbHelper - -import ( - "fmt" - "github.com/gin-gonic/gin" - "log" - "net/http" - "time" -) - -type RouterFunc struct { - relativePath string //相对路由 如/gzm/data/upload - funcType string // 方法类型 如 post ,get - fun func(c *gin.Context) //方法 -} - -type ApiServerHelper struct { - Port uint16 - RoutFun map[string]RouterFunc -} - -func (the *ApiServerHelper) Initial() { - router := gin.Default() - for name, routerFunc := range the.RoutFun { - switch routerFunc.funcType { - case http.MethodGet: - router.GET(routerFunc.relativePath, routerFunc.fun) - case http.MethodPost: - router.GET(routerFunc.relativePath, routerFunc.fun) - default: - log.Printf("不支持的 [%s]方法类型 %s", routerFunc.relativePath, routerFunc.funcType) - continue - } - log.Printf("注册路由 %s,监听地址=%s", name, routerFunc.relativePath) - } - router.Run(fmt.Sprintf("0.0.0.0:%d", the.Port)) - time.Sleep(time.Second * 1) -} diff --git a/dbHelper/apiServerHelper.go b/dbHelper/apiServerHelper.go new file mode 100644 index 0000000..665d224 --- /dev/null +++ b/dbHelper/apiServerHelper.go @@ -0,0 +1,46 @@ +package dbHelper + +import ( + "fmt" + "log" + "net/http" +) + +type ApiServerHelper struct { + mux *http.ServeMux + route map[string]string + port uint +} + +func NewApiServer(port uint, routes map[string]string) *ApiServerHelper { + return &ApiServerHelper{ + mux: http.NewServeMux(), + route: routes, + port: port, + } +} + +func (the *ApiServerHelper) Initial() { + the.mux = http.NewServeMux() + // 创建 HTTP 服务器 + ser := http.Server{ + Handler: the.mux, + Addr: fmt.Sprintf(":%d", the.port), + } + log.Printf("apiServer监听端口 %d", the.port) + go log.Fatal(ser.ListenAndServe()) +} + +func (the *ApiServerHelper) routeRegister() { + the.mux.HandleFunc("GET /nodeList", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + s := `{"a":1}` + fmt.Fprintf(w, s) + }) + + the.mux.HandleFunc("GET /namespaceList", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + s := `{"b":1}` + fmt.Fprintf(w, s) + }) +} diff --git a/dbHelper/httpHelper.go b/dbHelper/httpHelper.go index 2475293..0362e38 100644 --- a/dbHelper/httpHelper.go +++ b/dbHelper/httpHelper.go @@ -111,6 +111,7 @@ func HttpGet(url string, queryBody string) string { return string(body) } +// 静态方法 func HttpPost(url string, queryBody string) (string, error) { client := &http.Client{} req, err := http.NewRequest("POST", url, strings.NewReader(queryBody)) @@ -131,6 +132,7 @@ func HttpPost(url string, queryBody string) (string, error) { return string(body), err } +// 静态方法 func HttpPostWithHeader(url string, queryBody string, headers map[string]string) (string, error) { tr := &http.Transport{ DisableKeepAlives: true, @@ -162,6 +164,7 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string) return string(body), err } +// 静态方法 func HttpPostFormDataWithHeader(url string, queryBody string, headers map[string]string) (string, error) { tr := &http.Transport{ DisableKeepAlives: true, @@ -193,6 +196,7 @@ func HttpPostFormDataWithHeader(url string, queryBody string, headers map[string return string(body), err } +// 静态方法 func UploadFile(url string, headers map[string]string, bodyParams map[string]string, fileName string) ([]byte, error) { body := new(bytes.Buffer) // 创建 multipart writer