Browse Source

添加 振动数据上报支持

dev
lucas 5 months ago
parent
commit
72d75c6d9f
  1. 13
      adaptors/安心云es主题特征to河北公路设施监测.go
  2. 7
      configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml
  3. 4
      consumers/HBJCAS/config.go
  4. 2
      consumers/HBJCAS/dataModel.go
  5. 2
      consumers/consumerCDJYSN.go
  6. 127
      consumers/consumerHBJCAS.go
  7. 4
      dbOperate/httpHelper.go
  8. 4
      monitors/commonMonitor.go
  9. 5
      monitors/httpMonitor.go
  10. 6
      monitors/monitorHelper.go

13
adaptors/安心云es主题特征to河北公路设施监测.go

@ -3,8 +3,8 @@ package adaptors
import (
"encoding/json"
"github.com/labstack/gommon/log"
"goInOut/consumers/HBJCAS"
"goInOut/consumers/HBJCAS/protoFiles_hb"
"goInOut/models"
"google.golang.org/protobuf/proto"
"math"
"strconv"
@ -21,7 +21,7 @@ type Adaptor_AXYES_HBGL struct {
}
func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := models.EsThemeAggDateHistogram{}
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{}
var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
if err != nil {
@ -39,7 +39,7 @@ func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg str
return needPush
}
func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs models.EsThemeAggDateHistogram) (result []byte) {
func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 {
log.Info("es agg数据数量==0")
@ -93,7 +93,7 @@ func (the Adaptor_AXYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_
}
}
func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket models.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData {
func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
avgValueX := dateBucket.X.Avg
@ -137,6 +137,11 @@ func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int6
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 28: //振动
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareY),
}}
}
return dataDefinitionStatisticData

7
configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml

@ -3,7 +3,6 @@ ioConfig:
in:
http:
url: https://esproxy.anxinyun.cn/anxincloud_themes/_search
cronStr: 11 0/1 * * *
out:
mqtt:
host: 10.8.30.160
@ -13,6 +12,10 @@ ioConfig:
clientId: chengDe
topics:
- t/province/1307
monitor:
#由于振动是触发式,数据迟缓 cron10min也改成1小时一次 上报多条
cron10min: 30 0/1 * * * #6/10 * * * *
cron1hour: 11 0/1 * * *
info:
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
#结构物id对应
@ -27,9 +30,11 @@ pointInfo:
68398: 1301100002
68399: 1301100001
5016: #河北承德乃积沟大桥
#桥墩倾斜
68384: 13010900001
68385: 13010900002
68386: 13010900003
#支座位移
68387: 13010900004
68388: 13010900005
68389: 13010900006

4
consumers/HBJCAS/config.go

@ -7,14 +7,14 @@ type ConfigFile struct {
OtherInfo map[string]string `yaml:"info"`
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"`
StructInfo map[int64]int64 `yaml:"structInfo"`
Monitor map[string]string `yaml:"monitor"`
}
type ioConfig struct {
In In `yaml:"in"`
Out Out `yaml:"out"`
}
type In struct {
Http config.HttpConfig `yaml:"http"`
CronStr string `yaml:"cronStr"`
Http config.HttpConfig `yaml:"http"`
}
type Out struct {

2
models/esThemeAggDateHistogram.go → consumers/HBJCAS/dataModel.go

@ -1,4 +1,4 @@
package models
package HBJCAS
import "time"

2
consumers/consumerCDJYSN.go

@ -52,7 +52,7 @@ func (the *consumerCDJYSN) InputInitial() error {
}
the.InHttp.Start()
the.InHttp.RegisterFun(the.getEsData)
the.InHttp.RegisterTask(the.ConfigInfo.IoConfig.In.CronStr, the.getEsData)
return nil
}
func (the *consumerCDJYSN) OutputInitial() error {

127
consumers/consumerHBJCAS.go

@ -47,10 +47,20 @@ func (the *consumerHBJCAS) InputInitial() error {
//数据入口
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
the.monitor = &monitors.CommonMonitor{
MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr},
MonitorHelper: &monitors.MonitorHelper{},
}
for taskName, cron := range the.Info.Monitor {
switch taskName {
case "cron10min":
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
case "cron1hour":
the.monitor.RegisterTask(cron, the.getEs10minAggData)
default:
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
}
}
the.monitor.Start()
the.monitor.RegisterFun(the.getEsHourAggData)
return nil
}
func (the *consumerHBJCAS) OutputInitial() error {
@ -103,21 +113,48 @@ func (the *consumerHBJCAS) getStructIds() []int64 {
}
return structIds
}
func (the *consumerHBJCAS) getEsHourAggData() {
func (the *consumerHBJCAS) getEs1HourAggData() {
start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end)
hourFactorIds := []int{18, 15, 20} //15, 20 , 28
hourFactorIds := []int{15, 18, 20}
structIds := the.getStructIds()
for _, structId := range structIds {
for _, factorId := range hourFactorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth)
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
}
if len(needPushes) > 0 {
the.ch <- needPushes
}
}
}
}
func (the *consumerHBJCAS) getEs10minAggData() {
start, end := utils.GetTimeRangeBy10min()
log.Printf("查询数据时间范围 %s - %s", start, end)
factorIds := []int{28}
structIds := the.getStructIds()
for _, structId := range structIds {
for _, factorId := range factorIds {
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResult)
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
}
@ -129,6 +166,7 @@ func (the *consumerHBJCAS) getEsHourAggData() {
}
}
func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte {
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes)
needRC4 := append(transBytes, resultByCrc16...)
@ -187,8 +225,66 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int,
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "hour",
"time_zone": "Asia/Shanghai"
"interval": "1h",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
}
}
}
}
}
`, structureId, factorId, start, end, aggSubSql)
return esQuery
}
func (the *consumerHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
aggSubSql := getEsAggSubSqlByFactorId(factorId)
esQuery := fmt.Sprintf(`
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"structure": {
"value": %d
}
}
},
{
"term": {
"factor": {
"value": %d
}
}
},
{
"range": {
"collect_time": {
"gte": "%s",
"lte": "%s"
}
}
}
]
}
},
"aggs": {
"groupSensor": {
"terms": {
"field": "sensor"
},
"aggs": {
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "10m",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
}
@ -240,11 +336,16 @@ func getEsAggSubSqlByFactorId(factorId int) string {
case 28:
subAggSQl = `
{
"trms": {
"extended_stats": {
"field": "data.trms"
}
}
"x": {
"extended_stats": {
"field": "data.pv"
}
},
"y": {
"extended_stats": {
"field": "data.trms"
}
}
}`
}
return subAggSQl

4
dbOperate/httpHelper.go

@ -36,7 +36,7 @@ func (the *HttpHelper) HttpGet(queryBody string) string {
fmt.Println(err)
}
defer resp.Body.Close()
fmt.Println("http get 请求,url", url, " <- code=", resp.StatusCode)
log.Println("http get 请求,url", url, " <- code=", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
return string(body)
}
@ -64,7 +64,7 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string]
}
}
}(resp)
fmt.Println("http get 请求,url", url, " <- code=", resp.StatusCode)
log.Println("http get 请求,url", url, " <- code=", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
return string(body)
}

4
monitors/commonMonitor.go

@ -4,8 +4,8 @@ type CommonMonitor struct {
*MonitorHelper
}
func (the *CommonMonitor) RegisterFun(fun func()) {
the.registerFun(fun)
func (the *CommonMonitor) RegisterTask(cron string, fun func()) {
the.registerTask(cron, fun)
}
func (the *CommonMonitor) Start() {

5
monitors/httpMonitor.go

@ -7,10 +7,9 @@ type HttpMonitor struct {
*MonitorHelper
}
func (the *HttpMonitor) RegisterFun(fun func()) {
the.registerFun(fun)
func (the *HttpMonitor) RegisterTask(cron string, fun func()) {
the.registerTask(cron, fun)
}
func (the *HttpMonitor) Start() {
the.HttpClient.Initial()
the.initial()

6
monitors/monitorHelper.go

@ -15,9 +15,9 @@ func (the *MonitorHelper) initial() {
log.Printf("cronStr=%s", the.CronStr)
}
// RegisterFun 注册定时器方法
func (the *MonitorHelper) registerFun(task func()) {
entryID, err := the.Cron.AddFunc(the.CronStr, task)
// RegisterTask 注册定时器方法
func (the *MonitorHelper) registerTask(cron string, task func()) {
entryID, err := the.Cron.AddFunc(cron, task)
if err != nil {
log.Printf("cron 定时任务[%v]添加异常:%s", entryID, err.Error())
}

Loading…
Cancel
Save