You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
317 lines
7.8 KiB
317 lines
7.8 KiB
package consumers
|
|
|
|
import (
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/GDJKJC"
|
|
"goInOut/dbOperate"
|
|
"goInOut/monitors"
|
|
"goInOut/utils"
|
|
"gopkg.in/yaml.v3"
|
|
"log"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
type consumerAXYES2GDJKJC struct {
|
|
//数据缓存管道
|
|
ch chan []adaptors.NeedPush
|
|
//具体配置
|
|
ConfigInfo GDJKJC.ConfigFile
|
|
InHttp *dbOperate.HttpHelper
|
|
outHttp *dbOperate.HttpHelper
|
|
monitor *monitors.CommonMonitor
|
|
infoRedis *dbOperate.RedisHelper
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) {
|
|
err := yaml.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) Initial(cfg string) error {
|
|
the.LoadConfigJson(cfg)
|
|
err := the.InputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.OutputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.infoComponentInitial()
|
|
return err
|
|
}
|
|
func (the *consumerAXYES2GDJKJC) InputInitial() error {
|
|
the.ch = make(chan []adaptors.NeedPush, 200)
|
|
//数据入口
|
|
the.InHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""}
|
|
the.monitor = &monitors.CommonMonitor{
|
|
MonitorHelper: &monitors.MonitorHelper{},
|
|
}
|
|
|
|
the.monitor.Start()
|
|
for taskName, cron := range the.ConfigInfo.Monitor {
|
|
switch taskName {
|
|
case "cron10min":
|
|
the.monitor.RegisterTask(cron, the.GetEs10minAggData)
|
|
case "cron1hour":
|
|
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
|
|
default:
|
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func (the *consumerAXYES2GDJKJC) OutputInitial() error {
|
|
//数据出口
|
|
the.outHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.Out.Http.Url, Token: ""}
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) infoComponentInitial() error {
|
|
//数据出口
|
|
addr := the.ConfigInfo.QueryComponent.Redis.Address
|
|
the.infoRedis = dbOperate.NewRedisHelper("", addr)
|
|
return nil
|
|
}
|
|
func (the *consumerAXYES2GDJKJC) Work() {
|
|
go func() {
|
|
for {
|
|
needPushList := <-the.ch
|
|
if len(the.ch) > 0 {
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
|
|
}
|
|
|
|
for _, push := range needPushList {
|
|
structIdStr := push.Topic
|
|
if _, ok := the.ConfigInfo.Info[structIdStr]; !ok {
|
|
log.Printf("structId=%s 无匹配的省平台AppKeySecret", structIdStr)
|
|
continue
|
|
}
|
|
|
|
appKey := the.ConfigInfo.Info[structIdStr].AppKey
|
|
appSecret := the.ConfigInfo.Info[structIdStr].AppSecret
|
|
rnd := strconv.FormatInt(time.Now().Unix(), 10)
|
|
Header := map[string]string{
|
|
"appKey": appKey,
|
|
"rnd": rnd,
|
|
"sign": utils.GetSign(string(push.Payload), rnd, appKey, appSecret),
|
|
}
|
|
|
|
log.Printf("推送数据=%s", push.Payload)
|
|
_, err := the.outHttp.PublishWithHeader(push.Payload, Header)
|
|
if err != nil {
|
|
log.Printf("数据推送异常=> %s", err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GDJKJC) {
|
|
|
|
return adaptors.Adaptor_AXYES_GDJKJC{
|
|
Redis: the.infoRedis,
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) getStructIdsByBridge() []int {
|
|
var structIds []int
|
|
for strutId, _ := range the.ConfigInfo.StructInfo.Bridge {
|
|
structIds = append(structIds, strutId)
|
|
}
|
|
return structIds
|
|
}
|
|
func (the *consumerAXYES2GDJKJC) getStructIdsBySlope() []int {
|
|
var structIds []int
|
|
for strutId, _ := range the.ConfigInfo.StructInfo.Slope {
|
|
structIds = append(structIds, strutId)
|
|
}
|
|
return structIds
|
|
}
|
|
func (the *consumerAXYES2GDJKJC) getEs1HourAggData() {
|
|
start, end := utils.GetTimeRangeByHour(-1)
|
|
log.Printf("查询数据时间范围 %s - %s", start, end)
|
|
hourFactorIds := []int{11, 15, 18, 63} //应变11 桥墩倾斜15 裂缝监测18 支护结构变形63
|
|
structIds := the.getStructIdsByBridge()
|
|
the.handlerHourAggData(start, end, "bridge", structIds, hourFactorIds)
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) handlerHourAggData(start, end, structType string, structIds, factorIds []int) {
|
|
adaptor := the.getAdaptor()
|
|
switch structType {
|
|
case "bridge":
|
|
adaptor.StructInfo = the.ConfigInfo.StructInfo.Bridge
|
|
case "slope":
|
|
adaptor.StructInfo = the.ConfigInfo.StructInfo.Slope
|
|
default:
|
|
log.Printf("无 匹配的结构物类型 => %s", structType)
|
|
return
|
|
}
|
|
|
|
for _, structId := range structIds {
|
|
for _, factorId := range factorIds {
|
|
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
|
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
|
|
|
|
lenRes := len(esAggResultStr)
|
|
if lenRes < 250 {
|
|
log.Printf("[s=%d,f=%d],es agg 返回无数据 len<250", structId, factorId)
|
|
continue
|
|
}
|
|
|
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
|
|
|
|
if len(needPushes) > 0 {
|
|
the.ch <- needPushes
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
|
|
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
|
|
start, end := utils.GetTimeRangeByHour(-1)
|
|
log.Printf("查询10min数据时间范围 %s - %s", start, end)
|
|
factorIds := []int{28} //桥面振动 28
|
|
structIds := the.getStructIdsByBridge()
|
|
for _, structId := range structIds {
|
|
for _, factorId := range factorIds {
|
|
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
|
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
|
|
|
|
adaptor := the.getAdaptor()
|
|
adaptor.StructInfo = the.ConfigInfo.StructInfo.Bridge
|
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
|
|
|
|
if len(needPushes) > 0 {
|
|
the.ch <- needPushes
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int, factorId int, start, end string) string {
|
|
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 0,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"factor": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"range": {
|
|
"collect_time": {
|
|
"gte": "%s",
|
|
"lt": "%s"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"aggs": {
|
|
"groupSensor": {
|
|
"terms": {
|
|
"field": "sensor"
|
|
},
|
|
"aggs": {
|
|
"groupDate": {
|
|
"date_histogram": {
|
|
"field": "collect_time",
|
|
"interval": "1h",
|
|
"time_zone": "Asia/Shanghai",
|
|
"min_doc_count": 1
|
|
},
|
|
"aggs": %s
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`, structureId, factorId, start, end, aggSubSql)
|
|
|
|
return esQuery
|
|
}
|
|
|
|
func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int, factorId int, start, end string) string {
|
|
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 0,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"factor": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"range": {
|
|
"collect_time": {
|
|
"gte": "%s",
|
|
"lte": "%s"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"aggs": {
|
|
"groupSensor": {
|
|
"terms": {
|
|
"field": "sensor"
|
|
},
|
|
"aggs": {
|
|
"groupDate": {
|
|
"date_histogram": {
|
|
"field": "collect_time",
|
|
"interval": "10m",
|
|
"time_zone": "Asia/Shanghai",
|
|
"min_doc_count": 1
|
|
},
|
|
"aggs": %s
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`, structureId, factorId, start, end, aggSubSql)
|
|
|
|
return esQuery
|
|
}
|
|
|