Browse Source

update 更新倾角数据 配置信息 获取 结构物和测点映射

dev
lucas 3 months ago
parent
commit
0d01bd4f98
  1. 99
      adaptors/安心云es主题特征to河北公路设施监测.go
  2. 15
      configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml
  3. 10
      consumers/HBJCAS/config.go
  4. 57
      consumers/consumerHBJCAS.go
  5. 4
      models/esThemeAggDateHistogram.go

99
adaptors/安心云es主题特征to河北公路设施监测.go

@ -6,21 +6,21 @@ import (
"goInOut/consumers/HBJCAS/protoFiles_hb" "goInOut/consumers/HBJCAS/protoFiles_hb"
"goInOut/models" "goInOut/models"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"math"
"strconv"
"time" "time"
) )
// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 // Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台
type Adaptor_AXYES_HBGL struct { type Adaptor_AXYES_HBGL struct {
//传感器code转换信息 //传感器code转换信息
GnssMap map[string]string PointInfo map[int64]map[int64]int64
RainMap map[string]string StructInfo map[int64]int64
NBWYMap map[string]string
DXSWMap map[string]string
//一些必要信息 //一些必要信息
Info map[string]string Info map[string]string
} }
func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush { func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := models.EsThemeAggDateHistogram{} esAggDateHistogram := models.EsThemeAggDateHistogram{}
var needPush []NeedPush var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
@ -28,44 +28,41 @@ func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush {
return nil return nil
} }
needPush = append(needPush, NeedPush{ needPush = append(needPush, NeedPush{
Payload: the.EsAggTopToHBJCAS(esAggDateHistogram), Payload: the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram),
}) })
return needPush return needPush
} }
func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(esAggs models.EsThemeAggDateHistogram) (result []byte) { func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs models.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 { if len(buckets) == 0 {
log.Info("es agg数据数量==0") log.Info("es agg数据数量==0")
return return
} }
//设施唯一编码(省平台)
uniqueCode := the.getUniqueCode(structId)
if uniqueCode == 0 {
log.Printf("structId=%d,无匹配省平台uniqueCode", structId)
return
}
//数据汇总 //数据汇总
complexData := &protoFiles_hb.ComplexData{} complexData := &protoFiles_hb.ComplexData{}
for _, sensorBucket := range buckets { for _, sensorBucket := range buckets {
//sensorId := sensorBucket.Key sensorId := sensorBucket.Key
monitorCode := int64(0)
for _, dateBucket := range sensorBucket.GroupDate.Buckets { for _, dateBucket := range sensorBucket.GroupDate.Buckets {
Atime := dateBucket.KeyAsString if _, ok := the.PointInfo[structId]; !ok {
dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ continue
StatisticData: &protoFiles_hb.StatisticData{
MonitorType: protoFiles_hb.MonitoryType_INC,
MonitorCode: 13000100001, //测点唯一编码
EventTime: Atime.Add(-8 * time.Hour).UnixMilli(),
Interval: 60 * 1000,
DataBody: &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{
MaxAbsoluteValueX: 0,
AvgValueX: 0,
RootMeanSquareX: 0,
MaxAbsoluteValueY: 0,
AvgValueY: 0,
RootMeanSquareY: 0,
}},
},
} }
if _, ok := the.PointInfo[structId][sensorId]; !ok {
continue
}
monitorCode = the.PointInfo[structId][sensorId]
dataDefinition := &protoFiles_hb.DataDefinition{ dataDefinition := &protoFiles_hb.DataDefinition{
DataType: protoFiles_hb.DataType_STATISTICS, DataType: protoFiles_hb.DataType_STATISTICS,
UniqueCode: "130109", //乃积沟大桥 UniqueCode: strconv.FormatInt(uniqueCode, 10), //乃积沟大桥
DataBody: dataDefinitionStatisticData, DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
} }
complexData.SensorData = append(complexData.SensorData, dataDefinition) complexData.SensorData = append(complexData.SensorData, dataDefinition)
@ -76,3 +73,53 @@ func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(esAggs models.EsThemeAggDateHisto
result, _ = proto.Marshal(complexData) result, _ = proto.Marshal(complexData)
return result return result
} }
func (the Adaptor_AXYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType {
//桥墩倾斜 15 支座位移20 桥面振动28
switch factorId {
case 15:
return protoFiles_hb.MonitoryType_INC
case 20:
return protoFiles_hb.MonitoryType_AND
case 28:
return protoFiles_hb.MonitoryType_VIB
default:
return protoFiles_hb.MonitoryType_CMM
}
}
func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket models.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
avgValueX := dateBucket.X.Avg
maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min))
avgValueY := dateBucket.Y.Avg
rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count))
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count))
monitoryType := the.getMonitorTypeByFactorId(factorId)
dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{
StatisticData: &protoFiles_hb.StatisticData{
MonitorType: monitoryType,
MonitorCode: monitorCode, //测点唯一编码
EventTime: Atime,
Interval: 60 * 1000,
DataBody: &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{
MaxAbsoluteValueX: float32(maxAbsoluteValueX),
AvgValueX: float32(avgValueX),
RootMeanSquareX: float32(rootMeanSquareX),
MaxAbsoluteValueY: float32(maxAbsoluteValueY),
AvgValueY: float32(avgValueY),
RootMeanSquareY: float32(rootMeanSquareY),
}},
},
}
return dataDefinitionStatisticData
}
func (the Adaptor_AXYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) {
if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v
}
return uniqueCode
}

15
configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml

@ -3,7 +3,7 @@ ioConfig:
in: in:
http: http:
url: https://esproxy.anxinyun.cn/anxincloud_themes/_search url: https://esproxy.anxinyun.cn/anxincloud_themes/_search
cronStr: 55 0/1 * * * cronStr: 48 0/1 * * *
out: out:
mqtt: mqtt:
host: 10.8.30.160 host: 10.8.30.160
@ -14,10 +14,13 @@ ioConfig:
topics: topics:
- t/province/1307 - t/province/1307
info: info:
structureId: '5016'
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
#结构物id对应
structInfo:
5016: 130109
#点位id对应信息
pointInfo: pointInfo:
'5016': 5016: #河北承德乃积沟大桥
'68684': 13000100001 68384: 13000100001
'68685': 13000100002 68385: 13000100002
'68686': 13000100003 68386: 13000100003

10
consumers/HBJCAS/config.go

@ -3,18 +3,20 @@ package HBJCAS
import "goInOut/config" import "goInOut/config"
type ConfigFile struct { type ConfigFile struct {
IoConfig ioConfig `yaml:"ioConfig"` IoConfig ioConfig `yaml:"ioConfig"`
OtherInfo map[string]string `yaml:"info"` OtherInfo map[string]string `yaml:"info"`
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"`
StructInfo map[int64]int64 `yaml:"structInfo"`
} }
type ioConfig struct { type ioConfig struct {
In In `yaml:"in"` In In `yaml:"in"`
Out OUT `yaml:"out"` Out Out `yaml:"out"`
} }
type In struct { type In struct {
Http config.HttpConfig `yaml:"http"` Http config.HttpConfig `yaml:"http"`
CronStr string `yaml:"cronStr"` CronStr string `yaml:"cronStr"`
} }
type OUT struct { type Out struct {
Mqtt config.MqttConfig `json:"mqtt"` Mqtt config.MqttConfig `json:"mqtt"`
} }

57
consumers/consumerHBJCAS.go

@ -25,7 +25,7 @@ type consumerHBJCAS struct {
} }
func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) { func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中 // 将 yaml 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil { if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
@ -96,27 +96,38 @@ func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
return adaptors.Adaptor_AXYES_HBGL{} return adaptors.Adaptor_AXYES_HBGL{}
} }
func (the *consumerHBJCAS) getStructIds() []int64 {
var structIds []int64
for strutId, _ := range the.Info.PointInfo {
structIds = append(structIds, strutId)
}
return structIds
}
func (the *consumerHBJCAS) getEsData() { func (the *consumerHBJCAS) getEsData() {
structureId := the.getStructureId()
start, end := utils.GetTimeRangeByHour(-1) start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end) log.Printf("查询数据时间范围 %s - %s", start, end)
hourFactorIds := []int{15, 20} //, "28"
structIds := the.getStructIds()
for _, structId := range structIds {
for _, factorId := range hourFactorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResult)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
}
hourFactorIds := []string{"15", "20"} //, "28" if len(needPushes) > 0 {
for _, factorId := range hourFactorIds { the.ch <- needPushes
esQuery := the.getESQueryStrByHour(structureId, factorId, start, end) }
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
needPushes := adaptor.Transform(esAggResult)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
}
if len(needPushes) > 0 {
the.ch <- needPushes
} }
} }
} }
func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte {
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes)
@ -134,7 +145,7 @@ func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte {
log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1))
return dest1 return dest1
} }
func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end string) string { func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
aggSubSql := getEsAggSubSqlByFactorId(factorId) aggSubSql := getEsAggSubSqlByFactorId(factorId)
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`
{ {
@ -145,14 +156,14 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end
{ {
"term": { "term": {
"structure": { "structure": {
"value": %s "value": %d
} }
} }
}, },
{ {
"term": { "term": {
"factor": { "factor": {
"value": %s "value": %d
} }
} }
}, },
@ -190,11 +201,11 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end
return esQuery return esQuery
} }
func getEsAggSubSqlByFactorId(factorId string) string { func getEsAggSubSqlByFactorId(factorId int) string {
//桥墩倾斜 15 支座位移20 桥面振动28 //桥墩倾斜 15 支座位移20 桥面振动28
subAggSQl := "" subAggSQl := ""
switch factorId { switch factorId {
case "15": case 15:
subAggSQl = ` subAggSQl = `
{ {
"x": { "x": {
@ -208,7 +219,7 @@ func getEsAggSubSqlByFactorId(factorId string) string {
} }
} }
}` }`
case "20": case 20:
subAggSQl = ` subAggSQl = `
{ {
"displacement": { "displacement": {
@ -217,7 +228,7 @@ func getEsAggSubSqlByFactorId(factorId string) string {
} }
} }
}` }`
case "28": case 28:
subAggSQl = ` subAggSQl = `
{ {
"trms": { "trms": {

4
models/esThemeAggDateHistogram.go

@ -27,8 +27,8 @@ type GPBySensorIdAggByDateHistogram struct {
} }
type Buckets struct { type Buckets struct {
Key int `json:"key"` Key int64 `json:"key"`
DocCount int `json:"doc_count"` DocCount int64 `json:"doc_count"`
GroupDate struct { GroupDate struct {
Buckets []BucketsXY `json:"buckets"` Buckets []BucketsXY `json:"buckets"`
} `json:"groupDate"` } `json:"groupDate"`

Loading…
Cancel
Save