diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go index bef6f68..07db9cc 100644 --- a/adaptors/安心云es主题特征to河北公路设施监测.go +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -2,8 +2,10 @@ package adaptors import ( "encoding/json" + "goInOut/consumers/HBJCAS/protoFiles_hb" "goInOut/models" - "strings" + "google.golang.org/protobuf/proto" + "time" ) // Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 @@ -24,12 +26,40 @@ func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush { if err != nil { return nil } + needPush = append(needPush, NeedPush{ + Payload: the.EsAggTopToHBJCAS(esAggDateHistogram), + }) return needPush } -func (the Adaptor_AXYES_HBGL) EsAggTopToHBKS(esAggTop models.EsAggTop) (result []byte) { +func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(esAgg models.EsThemeAggDateHistogram) (result []byte) { - fileContent := strings.Builder{} + Atime := time.Now() + QjComplexData := &protoFiles_hb.ComplexData{} + dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ + StatisticData: &protoFiles_hb.StatisticData{ + MonitorType: protoFiles_hb.MonitoryType_INC, + MonitorCode: 13000100001, + EventTime: Atime.Add(-8 * time.Hour).UnixMilli(), + Interval: 60 * 1000, + DataBody: &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ + MaxAbsoluteValueX: 0, + AvgValueX: 0, + RootMeanSquareX: 0, + MaxAbsoluteValueY: 0, + AvgValueY: 0, + RootMeanSquareY: 0, + }}, + }, + } + + dataDefinition := &protoFiles_hb.DataDefinition{ + DataType: protoFiles_hb.DataType_STATISTICS, + UniqueCode: "130109", //乃积沟大桥 + DataBody: dataDefinitionStatisticData, + } - return []byte(fileContent.String()) + QjComplexData.SensorData = append(QjComplexData.SensorData, dataDefinition) + result, _ = proto.Marshal(QjComplexData) + return result } diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json index 10dc0ed..cb64663 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json @@ -5,7 +5,7 @@ "http": { "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" }, - "cronStr": "16 0/1 * * *" + "cronStr": "55 0/1 * * *" }, "out": { "mqtt": { @@ -17,11 +17,11 @@ "Topics": [ "t/province/1307" ] - }, - "rc4key": "sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2" + } } }, "info": { - "structureId": "5016" + "structureId": "5016", + "rc4key": "sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2" } } \ No newline at end of file diff --git a/consumers/HBJCAS/protoFiles/MonitorDataProtocol-Ministry-V3.pb.go b/consumers/HBJCAS/protoFiles_hb/MonitorDataProtocol-Ministry-V3.pb.go similarity index 99% rename from consumers/HBJCAS/protoFiles/MonitorDataProtocol-Ministry-V3.pb.go rename to consumers/HBJCAS/protoFiles_hb/MonitorDataProtocol-Ministry-V3.pb.go index e81ddae..7272747 100644 --- a/consumers/HBJCAS/protoFiles/MonitorDataProtocol-Ministry-V3.pb.go +++ b/consumers/HBJCAS/protoFiles_hb/MonitorDataProtocol-Ministry-V3.pb.go @@ -11,7 +11,7 @@ // protoc v4.22.2 // source: MonitorDataProtocol-Ministry-V3.proto -package protoFiles +package protoFiles_hb import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -9561,8 +9561,9 @@ var file_MonitorDataProtocol_Ministry_V3_proto_rawDesc = []byte{ 0x44, 0x54, 0x51, 0x51, 0x58, 0x10, 0x32, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x42, 0x4c, 0x46, 0x10, 0x33, 0x12, 0x07, 0x0a, 0x03, 0x4b, 0x53, 0x59, 0x10, 0x34, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x58, 0x53, 0x57, 0x10, 0x35, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x4c, 0x44, 0x10, 0x61, 0x12, 0x07, 0x0a, - 0x03, 0x43, 0x4d, 0x4d, 0x10, 0x63, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x03, 0x43, 0x4d, 0x4d, 0x10, 0x63, 0x42, 0x12, 0x5a, 0x10, 0x2e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x5f, 0x68, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go index f06fb65..eebab5f 100644 --- a/consumers/consumerHBJCAS.go +++ b/consumers/consumerHBJCAS.go @@ -1,6 +1,8 @@ package consumers import ( + "crypto/rc4" + "encoding/hex" "encoding/json" "fmt" "goInOut/adaptors" @@ -74,7 +76,14 @@ func (the *consumerHBJCAS) Work() { for _, push := range needPushList { if push.Topic != "" { the.outMqtt.Publish(push.Topic, push.Payload) + continue } + + //没有标记topic 的 按照配置文件里面的推送 + for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { + the.outMqtt.Publish(topic, push.Payload) + } + } time.Sleep(100 * time.Millisecond) @@ -91,24 +100,41 @@ func (the *consumerHBJCAS) getEsData() { structureId := the.getStructureId() start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) - //start := "2024-02-05T00:00:00.000+0800" - //end := "2024-02-05T23:59:59.999+0800" - factorIds := []string{"15", "20", "28"} - for _, factorId := range factorIds { - esQuery := the.getESQueryStr(structureId, factorId, start, end) + hourFactorIds := []string{"15", "20"} //, "28" + for _, factorId := range hourFactorIds { + esQuery := the.getESQueryStrByHour(structureId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() - needPush := adaptor.Transform(esAggResult) - if len(needPush) > 0 { - the.ch <- needPush + needPushes := adaptor.Transform(esAggResult) + for i := range needPushes { + needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + } + + if len(needPushes) > 0 { + the.ch <- needPushes } } } - -func (the *consumerHBJCAS) getESQueryStr(structureId, factorId, start, end string) string { +func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { + resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) + needRC4 := append(transBytes, resultByCrc16...) + rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] + if !ok { + log.Panicf("未配置 rc4key") + } + rc4Key := []byte(rc4KeyStr) //the.RC4Key + // 加密操作 + dest1 := make([]byte, len(needRC4)) + rc4.NewCipher(rc4Key) + cipher1, _ := rc4.NewCipher(rc4Key) + cipher1.XORKeyStream(dest1, needRC4) + log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) + return dest1 +} +func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end string) string { aggSubSql := getEsAggSubSqlByFactorId(factorId) esQuery := fmt.Sprintf(` { @@ -207,7 +233,7 @@ func getEsAggSubSqlByFactorId(factorId string) string { func (the *consumerHBJCAS) getStructureId() string { structureId, ok := the.Info.OtherInfo["structureId"] if !ok { - structureId = "5016" //河北承德乃积沟大桥结构物id=5016 + log.Panicf("无法识别有效的structureId") } return structureId }