Browse Source

update 添加es 最新数据的指定设备的排查log

pull/2/head
lucas 2 months ago
parent
commit
e7c1fbec68
  1. 5
      adaptors/安心云最新设备数据toES.go
  2. 5
      config/configStruct.go
  3. 2
      configFiles/config_安心云设备数据_最新同步.json
  4. 0
      configFiles/弃用备份/config_江苏农村公路桥梁.json
  5. 20
      configFiles/弃用备份/config_转发http2axy60000端口.json
  6. 25
      consumers/HTTP_PRPXY/config.go
  7. 18
      consumers/consumerAXYraw.go
  8. 94
      consumers/consumerHTTP_PRPXY.go
  9. 3
      consumers/consumerManage.go
  10. 4
      dbHelper/_kafka/consumerGroupHandler.go
  11. 38
      dbHelper/apiServer.go
  12. 46
      dbHelper/apiServerHelper.go
  13. 4
      dbHelper/httpHelper.go

5
adaptors/安心云最新设备数据toES.go

@ -34,6 +34,11 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw {
if deviceInfo.Name == "" {
return nil
}
//log 日志排查
logTagDeviceId := "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0"
if iotaData.DeviceId == logTagDeviceId {
log.Printf("onData -> needPush 标记设备数据 [%s]", logTagDeviceId)
}
dataType := ""
if _dataType, ok := iotaData.Data.Data["_data_type"]; ok {

5
config/configStruct.go

@ -39,6 +39,11 @@ type HttpConfig struct {
Token Token `json:"token,omitempty"`
}
type ApiServerConfig struct {
Port uint `json:"port"`
Routes map[string]string `json:"routes"`
}
type Token struct {
Static string `json:"static,omitempty"`
RefreshInterval string `json:"refreshInterval,omitempty"`

2
configFiles/弃用备份/config_安心云设备数据_最新同步.json → configFiles/config_安心云设备数据_最新同步.json

@ -6,7 +6,7 @@
"brokers": [
"10.8.30.160:30992"
],
"groupId": "synchronizeRaw",
"groupId": "synchronizeRaw_50",
"topics": [
"RawData"
]

0
configFiles/config_江苏农村公路桥梁.json → configFiles/弃用备份/config_江苏农村公路桥梁.json

20
configFiles/弃用备份/config_转发http2axy60000端口.json

@ -0,0 +1,20 @@
{
"consumer": "consumerHttpProxy",
"ioConfig": {
"in": {
"httpServer": {
"port": 19700,
"userName": "goInOut",
"password": "",
"routes": [
"upload/uds/+",
"upload/ZD/+"
]
}
},
"out": {
"url": "http://127.0.0.1:4009/write?u=mingyuexia_wkd&p=mingyuexia_wkd&db=MingYueXia_Bridge&rp=autogen",
"method": "post"
}
}
}

25
consumers/HTTP_PRPXY/config.go

@ -0,0 +1,25 @@
package HTTP_PRPXY
import "goInOut/config"
type ConfigFile struct {
config.Consumer
IoConfig ioConfig `json:"ioConfig"`
OtherInfo map[string]string `json:"info"`
}
type ioConfig struct {
In In `json:"in"`
Out OUT `json:"out"`
}
type In struct {
ApiServer config.ApiServerConfig `json:"apiServer"`
}
type OUT struct {
HttpPost config.HttpConfig `json:"httpPost"`
}
type SensorInfo struct {
Name string `json:"name"` //测点名称
Code string `json:"code"` //测点编号 宜由“桥名简称-监测类别简称-构件类型编码-截面序号-构件序号-测点编号”组成
}

18
consumers/consumerAXYraw.go

@ -99,6 +99,11 @@ func (the *consumerAXYraw) toSink() {
the.sinkRawMap.Range(func(key, value any) bool {
if v, ok := value.(*models.EsRaw); ok {
raws = append(raws, *v)
//零时打日志用
if v.IotaDevice == logTagDeviceId {
bs, _ := json.Marshal(v)
log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
}
return ok
}
return false
@ -111,12 +116,18 @@ func (the *consumerAXYraw) toSink() {
}
}
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0"
func (the *consumerAXYraw) Work() {
go the.sinkTask()
go func() {
for {
pushEsRaw := <-the.dataCache
//log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
if pushEsRaw.IotaDevice == logTagDeviceId {
bs, _ := json.Marshal(pushEsRaw)
log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
}
//有效数据存入缓存
the.lock.Lock()
@ -137,6 +148,11 @@ func (the *consumerAXYraw) onData(topic string, msg string) bool {
needPush := adaptor.Transform(topic, msg)
if needPush != nil {
//日志标记
if needPush.IotaDevice == logTagDeviceId {
bs, _ := json.Marshal(needPush)
log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
}
the.dataCache <- needPush
}

94
consumers/consumerHTTP_PRPXY.go

@ -0,0 +1,94 @@
package consumers
import (
"encoding/json"
"goInOut/adaptors"
"goInOut/consumers/HTTP_PRPXY"
"goInOut/dbHelper"
"log"
"strings"
"time"
)
type consumerHttpProxy struct {
//数据缓存管道
ch chan []adaptors.NeedPush
//具体配置
Info HTTP_PRPXY.ConfigFile
InApiServer *dbHelper.ApiServerHelper
outHttpPost *dbHelper.HttpHelper
}
func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := json.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerHttpProxy) Initial(cfg string) error {
the.LoadConfigJson(cfg)
err := the.InputInitial()
if err != nil {
return err
}
err = the.OutputInitial()
return err
}
func (the *consumerHttpProxy) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口
the.InApiServer = dbHelper.NewApiServer(
the.Info.IoConfig.In.ApiServer.Port,
the.Info.IoConfig.In.ApiServer.Routes,
)
////inTopic := "Upload/#" //荔枝乌江大桥
//for _, inTopic := range the.Info.IoConfig.In.Mqtt.Topics {
// the.InMqtt.Subscribe(inTopic, the.onData)
//}
the.InApiServer.Initial()
return nil
}
func (the *consumerHttpProxy) OutputInitial() error {
//数据出口
the.outHttpPost.Initial()
return nil
}
func (the *consumerHttpProxy) Work() {
go func() {
for {
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
time.Sleep(100 * time.Millisecond)
}
}()
}
func (the *consumerHttpProxy) onData(inTopic string, Msg string) {
if len(Msg) > 100 {
log.Printf("mqtt-recv:[%s]:%s ...", inTopic, Msg[:100])
}
topicPrefixIndex := strings.LastIndex(inTopic, "/")
matchTopic := inTopic[:topicPrefixIndex]
adaptor := the.getAdaptor(matchTopic)
if adaptor != nil {
needPush := adaptor.Transform(matchTopic, Msg)
if len(needPush) > 0 {
the.ch <- needPush
}
}
}
func (the *consumerHttpProxy) getAdaptor(flag string) (adaptor adaptors.IAdaptor4) {
bridgeCode := ""
if v, ok := the.Info.OtherInfo["bridgeCode"]; ok {
bridgeCode = v
}
if bridgeCode == "" {
panic("无正确的 bridgeCode")
}
return adaptor
}

3
consumers/consumerManage.go

@ -25,6 +25,9 @@ func GetConsumer(name string) (consumer IConsumer) {
case "consumerJSNCGLQL": //工迅-广州高支模平台
consumer = new(consumerJSNCGLQL)
case "consumerHttpProxy":
consumer = new(consumerHttpProxy)
default:
consumer = nil
}

4
dbHelper/_kafka/consumerGroupHandler.go

@ -48,7 +48,7 @@ func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.Consu
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string, string) bool) func(*sarama.ConsumerMessage) bool {
f := func(cm *sarama.ConsumerMessage) bool {
msg := string(cm.Value)
log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
//log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
return handler(cm.Topic, msg)
}
return f
@ -82,7 +82,7 @@ func (h *ConsumerGroupHandler) Worker() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = false
config.Version = sarama.V2_0_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
group, err := sarama.NewConsumerGroup(h.brokers, h.groupId, config)

38
dbHelper/apiServer.go

@ -1,38 +0,0 @@
package dbHelper
import (
"fmt"
"github.com/gin-gonic/gin"
"log"
"net/http"
"time"
)
type RouterFunc struct {
relativePath string //相对路由 如/gzm/data/upload
funcType string // 方法类型 如 post ,get
fun func(c *gin.Context) //方法
}
type ApiServerHelper struct {
Port uint16
RoutFun map[string]RouterFunc
}
func (the *ApiServerHelper) Initial() {
router := gin.Default()
for name, routerFunc := range the.RoutFun {
switch routerFunc.funcType {
case http.MethodGet:
router.GET(routerFunc.relativePath, routerFunc.fun)
case http.MethodPost:
router.GET(routerFunc.relativePath, routerFunc.fun)
default:
log.Printf("不支持的 [%s]方法类型 %s", routerFunc.relativePath, routerFunc.funcType)
continue
}
log.Printf("注册路由 %s,监听地址=%s", name, routerFunc.relativePath)
}
router.Run(fmt.Sprintf("0.0.0.0:%d", the.Port))
time.Sleep(time.Second * 1)
}

46
dbHelper/apiServerHelper.go

@ -0,0 +1,46 @@
package dbHelper
import (
"fmt"
"log"
"net/http"
)
type ApiServerHelper struct {
mux *http.ServeMux
route map[string]string
port uint
}
func NewApiServer(port uint, routes map[string]string) *ApiServerHelper {
return &ApiServerHelper{
mux: http.NewServeMux(),
route: routes,
port: port,
}
}
func (the *ApiServerHelper) Initial() {
the.mux = http.NewServeMux()
// 创建 HTTP 服务器
ser := http.Server{
Handler: the.mux,
Addr: fmt.Sprintf(":%d", the.port),
}
log.Printf("apiServer监听端口 %d", the.port)
go log.Fatal(ser.ListenAndServe())
}
func (the *ApiServerHelper) routeRegister() {
the.mux.HandleFunc("GET /nodeList", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
s := `{"a":1}`
fmt.Fprintf(w, s)
})
the.mux.HandleFunc("GET /namespaceList", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
s := `{"b":1}`
fmt.Fprintf(w, s)
})
}

4
dbHelper/httpHelper.go

@ -111,6 +111,7 @@ func HttpGet(url string, queryBody string) string {
return string(body)
}
// 静态方法
func HttpPost(url string, queryBody string) (string, error) {
client := &http.Client{}
req, err := http.NewRequest("POST", url, strings.NewReader(queryBody))
@ -131,6 +132,7 @@ func HttpPost(url string, queryBody string) (string, error) {
return string(body), err
}
// 静态方法
func HttpPostWithHeader(url string, queryBody string, headers map[string]string) (string, error) {
tr := &http.Transport{
DisableKeepAlives: true,
@ -162,6 +164,7 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string)
return string(body), err
}
// 静态方法
func HttpPostFormDataWithHeader(url string, queryBody string, headers map[string]string) (string, error) {
tr := &http.Transport{
DisableKeepAlives: true,
@ -193,6 +196,7 @@ func HttpPostFormDataWithHeader(url string, queryBody string, headers map[string
return string(body), err
}
// 静态方法
func UploadFile(url string, headers map[string]string, bodyParams map[string]string, fileName string) ([]byte, error) {
body := new(bytes.Buffer)
// 创建 multipart writer

Loading…
Cancel
Save