diff --git a/adaptors/安心云es主题特征to广东省平台.go b/adaptors/安心云es主题特征to广东省平台.go index 772f1f8..d6b7371 100644 --- a/adaptors/安心云es主题特征to广东省平台.go +++ b/adaptors/安心云es主题特征to广东省平台.go @@ -2,12 +2,11 @@ package adaptors import ( "encoding/json" - "fmt" + "goInOut/consumers/GDJKJC" "goInOut/consumers/HBJCAS" "goInOut/consumers/HBJCAS/protoFiles_hb" "goInOut/dbOperate" - "goInOut/models" - "google.golang.org/protobuf/proto" + "log" "math" "strconv" @@ -33,18 +32,22 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s return nil } - Payload := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram) - if len(Payload) == 0 { + Payloads := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram) + if len(Payloads) == 0 { return needPush } - needPush = append(needPush, NeedPush{ - Payload: Payload, - }) + for _, payload := range Payloads { + needPush = append(needPush, NeedPush{ + Topic: strconv.FormatInt(structId, 10), + Payload: payload, + }) + } + return needPush } -func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { +func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result [][]byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) @@ -57,35 +60,33 @@ func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, e return } //数据汇总 - complexData := &protoFiles_hb.ComplexData{} for _, sensorBucket := range buckets { sensorId := sensorBucket.Key for _, dateBucket := range sensorBucket.GroupDate.Buckets { //优先redis获取 - station := models.Station{} - k1 := fmt.Sprintf("station:%d", sensorId) - errRedis := the.Redis.GetObj(k1, &station) - if errRedis != nil { - log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) - continue - } - monitorCode := the.getPointCodeFromLabel(station.Labels) - if monitorCode == 0 { - log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) + //station := models.Station{} + //k1 := fmt.Sprintf("station:%d", sensorId) + //errRedis := the.Redis.GetObj(k1, &station) + //if errRedis != nil { + // log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) + // continue + //} + monitorCode := "LJ-VIB-P01-004-01" //the.getPointCodeFromLabel(station.Labels) + if monitorCode == "" { + log.Printf("redis 获取[s:%d,f:%d]测点[%d]信息,异常,跳过", structId, factorId, sensorId) continue } - dataDefinition := &protoFiles_hb.DataDefinition{ - DataType: protoFiles_hb.DataType_STATISTICS, - UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥 - DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), - } - complexData.SensorData = append(complexData.SensorData, dataDefinition) + dataBytes := the.EsAgg2StatisticData(factorId, monitorCode, dateBucket) + //dataDefinition := &protoFiles_hb.DataDefinition{ + // DataType: protoFiles_hb.DataType_STATISTICS, + // UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥 + // DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), + //} + result = append(result, dataBytes) + log.Printf("[s:%d,f:%d] t=%s, 特征数据=> %s", structId, factorId, dateBucket.KeyAsString, dataBytes) } } - v, _ := json.Marshal(complexData) - log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v) - result, _ = proto.Marshal(complexData) return result } func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType { @@ -107,7 +108,7 @@ func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFile } } -func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { +func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode string, dateBucket HBJCAS.BucketsXY) (dataBytes []byte) { Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) avgValueX := dateBucket.X.Avg @@ -117,53 +118,35 @@ func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode in avgValueY := dateBucket.Y.Avg rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) - monitoryType := the.getMonitorTypeByFactorId(factorId) - dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ - StatisticData: &protoFiles_hb.StatisticData{ - MonitorType: monitoryType, - MonitorCode: monitorCode, //测点唯一编码 - EventTime: Atime, - Interval: 60 * 1000, - }, + commonBody := GDJKJC.CommonBody{ + ThirdChannelCode: monitorCode, + DataTimeUnix: Atime, } - + var destStruct any switch factorId { case 15: //倾角 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ - MaxAbsoluteValueX: float32(maxAbsoluteValueX), - AvgValueX: float32(avgValueX), - RootMeanSquareX: float32(rootMeanSquareX), - MaxAbsoluteValueY: float32(maxAbsoluteValueY), - AvgValueY: float32(avgValueY), - RootMeanSquareY: float32(rootMeanSquareY), - }} - case 18: //裂缝监测 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Crk{Crk: &protoFiles_hb.CRKStatistic{ - MaxAbsoluteValue: float32(maxAbsoluteValueX), - AvgValue: float32(avgValueX), - RootMeanSquare: float32(rootMeanSquareX), - TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), - }} - case 20: //支座位移 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Dis{Dis: &protoFiles_hb.DISStatistic{ - MaxAbsoluteValue: float32(maxAbsoluteValueX), - AvgValue: float32(avgValueX), - RootMeanSquare: float32(rootMeanSquareX), - TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), - }} + destStruct = GDJKJC.A43AngleBody{ + CommonBody: commonBody, + XAvg: avgValueX, + XMax: maxAbsoluteValueX, + XRms: rootMeanSquareX, //均方根 + YAvg: avgValueY, + YMax: maxAbsoluteValueY, + YRms: rootMeanSquareY, //均方根 + } case 28: //振动 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ - MaxAbsoluteValue: float32(maxAbsoluteValueX), - RootMeanSquare: float32(rootMeanSquareY), - }} - case 592: //加速度三项监测 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ - MaxAbsoluteValue: float32(maxAbsoluteValueX), - RootMeanSquare: float32(rootMeanSquareX), - }} + destStruct = GDJKJC.A48Acc{ + CommonBody: commonBody, + Max: maxAbsoluteValueX / 100, //省平台 加速度单位 m/s² 实际设备单位 cm/s² + Rms: rootMeanSquareX / 100, //均方根 + } } - - return dataDefinitionStatisticData + if destStruct != nil { + dataBytes, _ = json.Marshal(destStruct) + } else { + log.Printf("!!! [f=%d,s=%s]无匹配 映射数据", factorId, monitorCode) + } + return dataBytes } func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) { diff --git a/configFiles/config_安心云测点特征数据_广东省平台.yaml b/configFiles/config_安心云测点特征数据_广东省平台.yaml index 0a4a2f4..636c815 100644 --- a/configFiles/config_安心云测点特征数据_广东省平台.yaml +++ b/configFiles/config_安心云测点特征数据_广东省平台.yaml @@ -9,19 +9,31 @@ ioConfig: method: "post" monitor: #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 - cron10min: 22 0/1 * * * #6/10 * * * * + cron10min: 36 0/1 * * * #6/10 * * * * #普通类型 特征数据 cron1hour: 20 0/1 * * * info: - rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 + 5450: #隆江大桥 + appKey: db43bc5d74534348 + appSecret: 162c0a92f089464eaf9349358af4a830 + 5452: #深汕东边坡 + appKey: cdf1493747174885 + appSecret: 563e5d752181450da3b9d1617f75b527 + 5455: #螺河特大桥 + appKey: 11b8f81901134570 + appSecret: 1505ef9480e4491bb578e2d7d9781620 + 5456: #螺河东大桥 + appKey: d20477c3247b4012 + appSecret: 3d9bfb809be545bba11af17a88c81eb3 + queryComponent: redis: address: 10.8.30.160:30379 #结构物id对应 structInfo: - 5450: G0000000000001 - 5452: G0000000000002 - 5455: G0000000000003 - 5456: G0000000000004 + 5450: G0000000000001 #隆江大桥 + 5452: G0000000000002 #深汕东边坡 + 5455: G0000000000003 #螺河特大桥 + 5456: G0000000000004 #螺河东大桥 diff --git a/consumers/GDJKJC/config.go b/consumers/GDJKJC/config.go index 007257c..2b721e8 100644 --- a/consumers/GDJKJC/config.go +++ b/consumers/GDJKJC/config.go @@ -4,7 +4,7 @@ import "goInOut/config" type ConfigFile struct { IoConfig ioConfig `yaml:"ioConfig"` - OtherInfo map[string]string `yaml:"info"` + Info map[string]AppKeySecret `yaml:"info"` PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` StructInfo map[int64]string `yaml:"structInfo"` Monitor map[string]string `yaml:"monitor"` @@ -27,3 +27,8 @@ type queryComponent struct { Address string `yaml:"address"` } `yaml:"redis"` } + +type AppKeySecret struct { + AppKey string `yaml:"appKey"` + AppSecret string `yaml:"appSecret"` +} diff --git a/consumers/consumerAXYES2GDJKJC.go b/consumers/consumerAXYES2GDJKJC.go index d9ad787..a9edf27 100644 --- a/consumers/consumerAXYES2GDJKJC.go +++ b/consumers/consumerAXYES2GDJKJC.go @@ -1,8 +1,6 @@ package consumers import ( - "crypto/rc4" - "encoding/hex" "fmt" "goInOut/adaptors" "goInOut/consumers/GDJKJC" @@ -11,6 +9,7 @@ import ( "goInOut/utils" "gopkg.in/yaml.v3" "log" + "strconv" "time" ) @@ -18,15 +17,15 @@ type consumerAXYES2GDJKJC struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 - Info GDJKJC.ConfigFile - InHttp *dbOperate.HttpHelper - outHttp *dbOperate.HttpHelper - monitor *monitors.CommonMonitor - infoRedis *dbOperate.RedisHelper + ConfigInfo GDJKJC.ConfigFile + InHttp *dbOperate.HttpHelper + outHttp *dbOperate.HttpHelper + monitor *monitors.CommonMonitor + infoRedis *dbOperate.RedisHelper } func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) { - err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + err := yaml.Unmarshal([]byte(cfgStr), &the.ConfigInfo) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) @@ -49,13 +48,13 @@ func (the *consumerAXYES2GDJKJC) Initial(cfg string) error { func (the *consumerAXYES2GDJKJC) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 - the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} + the.InHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } the.monitor.Start() - for taskName, cron := range the.Info.Monitor { + for taskName, cron := range the.ConfigInfo.Monitor { switch taskName { case "cron10min": the.monitor.RegisterTask(cron, the.GetEs10minAggData) @@ -70,13 +69,13 @@ func (the *consumerAXYES2GDJKJC) InputInitial() error { } func (the *consumerAXYES2GDJKJC) OutputInitial() error { //数据出口 - the.outHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.Out.Http.Url, Token: ""} + the.outHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.Out.Http.Url, Token: ""} return nil } func (the *consumerAXYES2GDJKJC) infoComponentInitial() error { //数据出口 - addr := the.Info.QueryComponent.Redis.Address + addr := the.ConfigInfo.QueryComponent.Redis.Address the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } @@ -89,7 +88,22 @@ func (the *consumerAXYES2GDJKJC) Work() { } for _, push := range needPushList { - _, err := the.outHttp.Publish(push.Payload) + structIdStr := push.Topic + if _, ok := the.ConfigInfo.Info[structIdStr]; !ok { + log.Printf("structId=%s 无匹配的省平台AppKeySecret", structIdStr) + continue + } + + appKey := the.ConfigInfo.Info[structIdStr].AppKey + appSecret := the.ConfigInfo.Info[structIdStr].AppSecret + rnd := strconv.FormatInt(time.Now().Unix(), 10) + Header := map[string]string{ + "appKey": appKey, + "rnd": rnd, + "sign": utils.GetSign(string(push.Payload), rnd, appKey, appSecret), + } + + _, err := the.outHttp.PublishWithHeader(push.Payload, Header) if err != nil { log.Printf("数据推送异常=> %s", err.Error()) return @@ -110,7 +124,7 @@ func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GD func (the *consumerAXYES2GDJKJC) getStructIds() []int64 { var structIds []int64 - for strutId, _ := range the.Info.StructInfo { + for strutId, _ := range the.ConfigInfo.StructInfo { structIds = append(structIds, strutId) } return structIds @@ -127,12 +141,9 @@ func (the *consumerAXYES2GDJKJC) getEs1HourAggData() { esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() - adaptor.PointInfo = the.Info.PointInfo - adaptor.StructInfo = the.Info.StructInfo + adaptor.PointInfo = the.ConfigInfo.PointInfo + adaptor.StructInfo = the.ConfigInfo.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) - for i := range needPushes { - needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) - } if len(needPushes) > 0 { the.ch <- needPushes @@ -155,13 +166,9 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() { esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() - adaptor.PointInfo = the.Info.PointInfo - adaptor.StructInfo = the.Info.StructInfo + adaptor.PointInfo = the.ConfigInfo.PointInfo + adaptor.StructInfo = the.ConfigInfo.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) - for i := range needPushes { - needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) - log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) - } if len(needPushes) > 0 { the.ch <- needPushes @@ -171,22 +178,6 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() { } -func (the *consumerAXYES2GDJKJC) crc16rc4(transBytes []byte) []byte { - resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) - needRC4 := append(transBytes, resultByCrc16...) - rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] - if !ok { - log.Panicf("未配置 rc4key") - } - rc4Key := []byte(rc4KeyStr) //the.RC4Key - // 加密操作 - dest1 := make([]byte, len(needRC4)) - rc4.NewCipher(rc4Key) - cipher1, _ := rc4.NewCipher(rc4Key) - cipher1.XORKeyStream(dest1, needRC4) - return dest1 -} - func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) esQuery := fmt.Sprintf(` @@ -300,11 +291,3 @@ func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorI return esQuery } - -func (the *consumerAXYES2GDJKJC) getStructureId() string { - structureId, ok := the.Info.OtherInfo["structureId"] - if !ok { - log.Panicf("无法识别有效的structureId") - } - return structureId -} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index ac5e1e5..a76f30e 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -41,6 +41,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerJYESNJZX": consumer = new(consumerJYESNJZX) + case "consumerAXYES2GDJKJC": + consumer = new(consumerAXYES2GDJKJC) + default: consumer = nil } diff --git a/dbOperate/dbHelper.go b/dbOperate/dbHelper.go index 373b67a..55142a4 100644 --- a/dbOperate/dbHelper.go +++ b/dbOperate/dbHelper.go @@ -57,7 +57,7 @@ func (the *DBHelper) dbOpen() error { func (the *DBHelper) Exec(dbRecordSQL string) error { if the.dbClient == nil { if openErr := the.dbOpen(); openErr != nil { - //logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr) + //logger.ConfigInfo("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr) return openErr } } diff --git a/dbOperate/httpHelper.go b/dbOperate/httpHelper.go index b491521..236ae87 100644 --- a/dbOperate/httpHelper.go +++ b/dbOperate/httpHelper.go @@ -92,7 +92,9 @@ func (the *HttpHelper) Publish(messageBytes []byte) (string, error) { func (the *HttpHelper) PublishWithHeader(messageBytes []byte, headers map[string]string) (string, error) { resp, err := HttpPostWithHeader(the.Url, string(messageBytes), headers) if err != nil { - log.Printf("数据推送异常 err=%s,resp=%s", err.Error(), resp) + log.Printf("数据推送,异常 err=%s,resp=%s", err.Error(), resp) + } else { + log.Printf("数据推送,正常 resp=%s", resp) } return resp, err } @@ -178,10 +180,10 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string) req.Header.Add(k, v) } - fmt.Printf("http post 开始请求,%s,\n,%v \n", url, req) + log.Printf("http post 开始请求,%s,\n,%v \n", url, req) resp, err := client.Do(req) if err != nil { - fmt.Println("请求POST异常 ", err, resp) + log.Println("请求POST异常 ", err, resp) return "", err } defer resp.Body.Close() @@ -190,7 +192,7 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string) body, err := io.ReadAll(resp.Body) if err != nil { - fmt.Println("请求ReadAll异常 ", err, resp) + log.Println("请求ReadAll异常 ", err, resp) return "", err } return string(body), err diff --git a/utils/signByGDJKJC.go b/utils/signByGDJKJC.go new file mode 100644 index 0000000..9e4b1e4 --- /dev/null +++ b/utils/signByGDJKJC.go @@ -0,0 +1,49 @@ +package utils + +import ( + "crypto/sha1" + "fmt" + "log" + "os" + "time" +) + +func GetSHA1(rawStr string) (result string) { + //s := "sha2 this string" + + /* + 生成一个hash的模式是sha1.New() + sha1.Write(bytes) + sha1.Sum() + */ + + h := sha1.New() + h.Write([]byte(rawStr)) + bs := h.Sum(nil) + fmt.Printf("%x \n", bs) + result = fmt.Sprintf("%X", bs) + return +} + +func GetSHA1FromFile(fileName string) (result string) { + + content, err := os.ReadFile(fileName) + log.Printf("err=%v", err) + h := sha1.New() + h.Write(content) + bs := h.Sum(nil) + result = fmt.Sprintf("%X", bs) + log.Printf("SHA1_file=%s", result) + return +} + +// GetRnd 获取当前时间戳 到秒 +func GetRnd() (rnd string) { + t := time.Now().Unix() + log.Printf("%d", t) + return +} + +func GetSign(postStr, rnd, AppKey, AppSecret string) (sign string) { + return GetSHA1(postStr + rnd + AppKey + AppSecret) +}