Browse Source

update 更新广东平台基础流程

dev
lucas 1 month ago
parent
commit
f5b6e6625a
  1. 4
      adaptors/安心云es主题特征to广东省平台.go
  2. 6
      consumers/consumerAXYES2GDJKJC.go
  3. 2
      consumers/consumer_test.go

4
adaptors/安心云es主题特征to广东省平台.go

@ -33,7 +33,7 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s
return nil
}
Payload := the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram)
Payload := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram)
if len(Payload) == 0 {
return needPush
}
@ -44,7 +44,7 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s
return needPush
}
func (the Adaptor_AXYES_GDJKJC) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)

6
consumers/consumerAXYES2GDJKJC.go

@ -58,7 +58,7 @@ func (the *consumerAXYES2GDJKJC) InputInitial() error {
for taskName, cron := range the.Info.Monitor {
switch taskName {
case "cron10min":
the.monitor.RegisterTask(cron, the.getEs10minAggData)
the.monitor.RegisterTask(cron, the.GetEs10minAggData)
case "cron1hour":
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
default:
@ -142,11 +142,11 @@ func (the *consumerAXYES2GDJKJC) getEs1HourAggData() {
}
func (the *consumerAXYES2GDJKJC) getEs10minAggData() {
func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询10min数据时间范围 %s - %s", start, end)
factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测
factorIds := []int{28} //桥面振动 28
structIds := the.getStructIds()
for _, structId := range structIds {
for _, factorId := range factorIds {

2
consumers/consumer_test.go

@ -268,6 +268,6 @@ func Test_GDJKJC(t *testing.T) {
_consumerAXYES2GDJKJC := new(consumerAXYES2GDJKJC)
_consumerAXYES2GDJKJC.Initial(configBody)
//go _consumerGZGZM.RefreshTask()
_consumerAXYES2GDJKJC.GetEs10minAggData()
time.Sleep(2 * time.Second)
}

Loading…
Cancel
Save