From 89110bb4a5ebab19e2d88a07941a8b41a9be4472 Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 15 Jan 2025 15:24:16 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=92=8C=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...s主题特征to河北公路设施监测.go | 8 +-- ...监测_承德_轻量化特征数据.yaml | 14 +++--- consumers/consumerHBJCAS.go | 10 ++-- dbOperate/mqttHelper.go | 50 +++++++++++++------ 4 files changed, 54 insertions(+), 28 deletions(-) diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go index f827736..f9dd09b 100644 --- a/adaptors/安心云es主题特征to河北公路设施监测.go +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -2,10 +2,10 @@ package adaptors import ( "encoding/json" - "github.com/labstack/gommon/log" "goInOut/consumers/HBJCAS" "goInOut/consumers/HBJCAS/protoFiles_hb" "google.golang.org/protobuf/proto" + "log" "math" "strconv" "time" @@ -42,7 +42,7 @@ func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg str func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { - log.Info("es agg数据数量==0") + log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) return } //设施唯一编码(省平台) @@ -73,7 +73,8 @@ func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA complexData.SensorData = append(complexData.SensorData, dataDefinition) } } - + v, _ := json.Marshal(complexData) + log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v) result, _ = proto.Marshal(complexData) return result } @@ -89,6 +90,7 @@ func (the Adaptor_AXYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_ case 28: return protoFiles_hb.MonitoryType_VIB default: + log.Printf("factorId=%d,无匹配的MonitorType", factorId) return protoFiles_hb.MonitoryType_CMM } } diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml index 5acc086..a3772cc 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml @@ -5,16 +5,16 @@ ioConfig: url: https://esproxy.anxinyun.cn/anxincloud_themes/_search out: mqtt: - host: 10.8.30.160 - port: 30883 + host: 123.249.81.52 + port: 8883 userName: bs1307 password: 7vEh(xomn9DX4X(L clientId: chengDe topics: - t/province/1307 monitor: - #振动是触发式,数据迟缓 cron10min也改成1小时一次 上报多条,不进行实时上报 - cron10min: 40 0/1 * * * #6/10 * * * * + #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 + cron10min: 20 0/1 * * * #6/10 * * * * #普通类型 特征数据 cron1hour: 11 0/1 * * * info: @@ -24,9 +24,9 @@ structInfo: 5011: 130110 5016: 130109 #点位id对应信息 -pointInfo: - 5011: #河北承德横河子中桥 - #裂缝 crack +pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 + 5011: #河北承德横河子中桥 (axy structureId) + #裂缝 axy sensorId映射 省平台 pointUniqueCode 68397: 1301100001 68398: 1301100002 68399: 1301100003 diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go index dc1d0da..b110626 100644 --- a/consumers/consumerHBJCAS.go +++ b/consumers/consumerHBJCAS.go @@ -72,8 +72,10 @@ func (the *consumerHBJCAS) OutputInitial() error { the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, - false, //按照具体项目来 - "") + true, //按照具体项目来 + "consumers/HBJCAS/ssl/cacert.pem", + "consumers/HBJCAS/ssl/client-cert.pem", + "consumers/HBJCAS/ssl/client-key.pem") return nil } func (the *consumerHBJCAS) Work() { @@ -142,7 +144,7 @@ func (the *consumerHBJCAS) getEs1HourAggData() { } func (the *consumerHBJCAS) getEs10minAggData() { - //utils.GetTimeRangeBy10min() 由于振动数据 + //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) factorIds := []int{28} @@ -159,6 +161,7 @@ func (the *consumerHBJCAS) getEs10minAggData() { needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) } if len(needPushes) > 0 { @@ -182,7 +185,6 @@ func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { rc4.NewCipher(rc4Key) cipher1, _ := rc4.NewCipher(rc4Key) cipher1.XORKeyStream(dest1, needRC4) - log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) return dest1 } func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { diff --git a/dbOperate/mqttHelper.go b/dbOperate/mqttHelper.go index da40482..c0f66b2 100644 --- a/dbOperate/mqttHelper.go +++ b/dbOperate/mqttHelper.go @@ -30,22 +30,26 @@ func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) { the.Subscribe(call.topic, call.f) } } -func (the *MqttHelper) initialClient(sslEnable bool, caPath string) { +func (the *MqttHelper) initialClient(sslEnable bool, caCertPath, clientCertPath, clientKeyPath string) { maxReConnCount := 3 ReConnDurationSec := 30 reConn: - mqttConnectStr := fmt.Sprintf("tcp://%v:%d", the.Host, the.Port) + tag := "tcp" + if sslEnable { + tag = "ssl" + } + mqttConnectStr := fmt.Sprintf("%s://%v:%d", tag, the.Host, the.Port) opts := mqtt.NewClientOptions().AddBroker(mqttConnectStr) opts.SetUsername(the.UserName) opts.SetPassword(the.Password) opts.SetClientID(the.ClientId) opts.SetOnConnectHandler(the.reConn2Subscribe) if sslEnable { - opts.SetTLSConfig(NewTlsConfig(caPath)) + opts.SetTLSConfig(NewTlsConfig(caCertPath, clientCertPath, clientKeyPath)) } the.client = mqtt.NewClient(opts) if token := the.client.Connect(); token.Wait() && token.Error() != nil { - log.Printf("mqtt连接状态异常 %v(u:%v,p:%v,cid:%v) [err=%s]", mqttConnectStr, the.UserName, the.Password, the.ClientId, token.Error()) + log.Printf("mqtt连接状态异常 %v [ u:%v,p:%v,cid:%v ] [err=%s]", mqttConnectStr, the.UserName, the.Password, the.ClientId, token.Error()) log.Printf("mqtt重连,%ds后尝试,剩余次数=%d", ReConnDurationSec, maxReConnCount) if maxReConnCount > 0 { maxReConnCount-- @@ -59,26 +63,36 @@ reConn: } func (the *MqttHelper) Initial() { - the.initialClient(false, "") + the.initialClient(false, "", "", "") } -func (the *MqttHelper) InitialWithSSL(caPath string) { - the.initialClient(true, caPath) +func (the *MqttHelper) InitialWithSSL(caCertPath, clientCertPath, clientKeyPath string) { + the.initialClient(true, caCertPath, clientCertPath, clientKeyPath) } -func NewTlsConfig(sslPath string) *tls.Config { +func NewTlsConfig(caCertPath, clientCertPath, clientKeyPath string) *tls.Config { //"ssl/centerCA.crt" - certpool := x509.NewCertPool() - ca, err := os.ReadFile(sslPath) + certPool := x509.NewCertPool() + ca, err := os.ReadFile(caCertPath) if err != nil { log.Fatalln(err.Error()) } - certpool.AppendCertsFromPEM(ca) - return &tls.Config{ - //RootCAs: certpool, + certPool.AppendCertsFromPEM(ca) + + tlsConfig := &tls.Config{ + RootCAs: certPool, InsecureSkipVerify: true, } + if len(clientCertPath) > 0 && len(clientKeyPath) > 0 { + // 读取客户端证书和密钥 + clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath) + if err != nil { + fmt.Printf("Error loading client certificate and key: %v\n", err) + } + tlsConfig.Certificates = append(tlsConfig.Certificates, clientCert) + } + return tlsConfig } func (the *MqttHelper) Publish(topic string, messageBytes []byte) { @@ -123,7 +137,15 @@ func MqttInitial(host string, port int, clientId string, userName string, passwo } if isSSL && len(caPtah) > 0 { log.Println("SSL mqHelpers初始化") - mqHelpers.InitialWithSSL(caPtah[0]) + switch len(caPtah) { + case 1: + mqHelpers.InitialWithSSL(caPtah[0], "", "") + case 3: + mqHelpers.InitialWithSSL(caPtah[0], caPtah[1], caPtah[2]) + default: + log.Printf("caPtah 参数量错误,请注意") + } + } else { mqHelpers.Initial() }