1 changed files with 310 additions and 0 deletions
@ -0,0 +1,310 @@ |
|||||
|
package consumers |
||||
|
|
||||
|
import ( |
||||
|
"crypto/rc4" |
||||
|
"encoding/hex" |
||||
|
"fmt" |
||||
|
"goInOut/adaptors" |
||||
|
"goInOut/consumers/HBJCAS" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/monitors" |
||||
|
"goInOut/utils" |
||||
|
"gopkg.in/yaml.v3" |
||||
|
"log" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type consumerAXYES2GDJKJC struct { |
||||
|
//数据缓存管道
|
||||
|
ch chan []adaptors.NeedPush |
||||
|
//具体配置
|
||||
|
Info HBJCAS.ConfigFile |
||||
|
InHttp *dbOperate.HttpHelper |
||||
|
outHttp *dbOperate.HttpHelper |
||||
|
monitor *monitors.CommonMonitor |
||||
|
infoRedis *dbOperate.RedisHelper |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) { |
||||
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
||||
|
if err != nil { |
||||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) Initial(cfg string) error { |
||||
|
the.LoadConfigJson(cfg) |
||||
|
err := the.InputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.OutputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.infoComponentInitial() |
||||
|
return err |
||||
|
} |
||||
|
func (the *consumerAXYES2GDJKJC) InputInitial() error { |
||||
|
the.ch = make(chan []adaptors.NeedPush, 200) |
||||
|
//数据入口
|
||||
|
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} |
||||
|
the.monitor = &monitors.CommonMonitor{ |
||||
|
MonitorHelper: &monitors.MonitorHelper{}, |
||||
|
} |
||||
|
|
||||
|
the.monitor.Start() |
||||
|
for taskName, cron := range the.Info.Monitor { |
||||
|
switch taskName { |
||||
|
case "cron10min": |
||||
|
the.monitor.RegisterTask(cron, the.getEs10minAggData) |
||||
|
case "cron1hour": |
||||
|
the.monitor.RegisterTask(cron, the.getEs1HourAggData) |
||||
|
default: |
||||
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerAXYES2GDJKJC) OutputInitial() error { |
||||
|
//数据出口
|
||||
|
the.outHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.Out.Http.Url, Token: ""} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) infoComponentInitial() error { |
||||
|
//数据出口
|
||||
|
addr := the.Info.QueryComponent.Redis.Address |
||||
|
the.infoRedis = dbOperate.NewRedisHelper("", addr) |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerAXYES2GDJKJC) Work() { |
||||
|
go func() { |
||||
|
for { |
||||
|
needPushList := <-the.ch |
||||
|
if len(the.ch) > 0 { |
||||
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) |
||||
|
} |
||||
|
|
||||
|
for _, push := range needPushList { |
||||
|
_, err := the.outHttp.Publish(push.Payload) |
||||
|
if err != nil { |
||||
|
log.Printf("数据推送异常=> %s", err.Error()) |
||||
|
return |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
time.Sleep(100 * time.Millisecond) |
||||
|
} |
||||
|
}() |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { |
||||
|
|
||||
|
return adaptors.Adaptor_AXYES_HBGL{ |
||||
|
Redis: the.infoRedis, |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getStructIds() []int64 { |
||||
|
var structIds []int64 |
||||
|
for strutId, _ := range the.Info.StructInfo { |
||||
|
structIds = append(structIds, strutId) |
||||
|
} |
||||
|
return structIds |
||||
|
} |
||||
|
func (the *consumerAXYES2GDJKJC) getEs1HourAggData() { |
||||
|
start, end := utils.GetTimeRangeByHour(-1) |
||||
|
log.Printf("查询数据时间范围 %s - %s", start, end) |
||||
|
hourFactorIds := []int{15, 18, 20} |
||||
|
structIds := the.getStructIds() |
||||
|
for _, structId := range structIds { |
||||
|
for _, factorId := range hourFactorIds { |
||||
|
esQuery := the.getESQueryStrByHour(structId, factorId, start, end) |
||||
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
||||
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) |
||||
|
|
||||
|
adaptor := the.getAdaptor() |
||||
|
adaptor.PointInfo = the.Info.PointInfo |
||||
|
adaptor.StructInfo = the.Info.StructInfo |
||||
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) |
||||
|
for i := range needPushes { |
||||
|
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
||||
|
} |
||||
|
|
||||
|
if len(needPushes) > 0 { |
||||
|
the.ch <- needPushes |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getEs10minAggData() { |
||||
|
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
|
||||
|
start, end := utils.GetTimeRangeByHour(-1) |
||||
|
log.Printf("查询10min数据时间范围 %s - %s", start, end) |
||||
|
factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测
|
||||
|
structIds := the.getStructIds() |
||||
|
for _, structId := range structIds { |
||||
|
for _, factorId := range factorIds { |
||||
|
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) |
||||
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
||||
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) |
||||
|
|
||||
|
adaptor := the.getAdaptor() |
||||
|
adaptor.PointInfo = the.Info.PointInfo |
||||
|
adaptor.StructInfo = the.Info.StructInfo |
||||
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) |
||||
|
for i := range needPushes { |
||||
|
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
||||
|
log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) |
||||
|
} |
||||
|
|
||||
|
if len(needPushes) > 0 { |
||||
|
the.ch <- needPushes |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) crc16rc4(transBytes []byte) []byte { |
||||
|
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) |
||||
|
needRC4 := append(transBytes, resultByCrc16...) |
||||
|
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] |
||||
|
if !ok { |
||||
|
log.Panicf("未配置 rc4key") |
||||
|
} |
||||
|
rc4Key := []byte(rc4KeyStr) //the.RC4Key
|
||||
|
// 加密操作
|
||||
|
dest1 := make([]byte, len(needRC4)) |
||||
|
rc4.NewCipher(rc4Key) |
||||
|
cipher1, _ := rc4.NewCipher(rc4Key) |
||||
|
cipher1.XORKeyStream(dest1, needRC4) |
||||
|
return dest1 |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { |
||||
|
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) |
||||
|
esQuery := fmt.Sprintf(` |
||||
|
{ |
||||
|
"size": 0, |
||||
|
"query": { |
||||
|
"bool": { |
||||
|
"must": [ |
||||
|
{ |
||||
|
"term": { |
||||
|
"structure": { |
||||
|
"value": %d |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
{ |
||||
|
"term": { |
||||
|
"factor": { |
||||
|
"value": %d |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
{ |
||||
|
"range": { |
||||
|
"collect_time": { |
||||
|
"gte": "%s", |
||||
|
"lt": "%s" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
] |
||||
|
} |
||||
|
}, |
||||
|
"aggs": { |
||||
|
"groupSensor": { |
||||
|
"terms": { |
||||
|
"field": "sensor" |
||||
|
}, |
||||
|
"aggs": { |
||||
|
"groupDate": { |
||||
|
"date_histogram": { |
||||
|
"field": "collect_time", |
||||
|
"interval": "1h", |
||||
|
"time_zone": "Asia/Shanghai", |
||||
|
"min_doc_count": 1 |
||||
|
}, |
||||
|
"aggs": %s |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
`, structureId, factorId, start, end, aggSubSql) |
||||
|
|
||||
|
return esQuery |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { |
||||
|
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) |
||||
|
esQuery := fmt.Sprintf(` |
||||
|
{ |
||||
|
"size": 0, |
||||
|
"query": { |
||||
|
"bool": { |
||||
|
"must": [ |
||||
|
{ |
||||
|
"term": { |
||||
|
"structure": { |
||||
|
"value": %d |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
{ |
||||
|
"term": { |
||||
|
"factor": { |
||||
|
"value": %d |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
{ |
||||
|
"range": { |
||||
|
"collect_time": { |
||||
|
"gte": "%s", |
||||
|
"lte": "%s" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
] |
||||
|
} |
||||
|
}, |
||||
|
"aggs": { |
||||
|
"groupSensor": { |
||||
|
"terms": { |
||||
|
"field": "sensor" |
||||
|
}, |
||||
|
"aggs": { |
||||
|
"groupDate": { |
||||
|
"date_histogram": { |
||||
|
"field": "collect_time", |
||||
|
"interval": "10m", |
||||
|
"time_zone": "Asia/Shanghai", |
||||
|
"min_doc_count": 1 |
||||
|
}, |
||||
|
"aggs": %s |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
`, structureId, factorId, start, end, aggSubSql) |
||||
|
|
||||
|
return esQuery |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYES2GDJKJC) getStructureId() string { |
||||
|
structureId, ok := the.Info.OtherInfo["structureId"] |
||||
|
if !ok { |
||||
|
log.Panicf("无法识别有效的structureId") |
||||
|
} |
||||
|
return structureId |
||||
|
} |
Loading…
Reference in new issue