You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
490 lines
11 KiB
490 lines
11 KiB
package consumers
|
|
|
|
import (
|
|
"crypto/rc4"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/HBJCAS"
|
|
"goInOut/dbOperate"
|
|
"goInOut/monitors"
|
|
"goInOut/utils"
|
|
"gopkg.in/yaml.v3"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type consumerGZG2ZJHL struct {
|
|
//数据缓存管道
|
|
ch chan []adaptors.NeedPush
|
|
//具体配置
|
|
Info HBJCAS.ConfigFile
|
|
InHttp *dbOperate.HttpHelper
|
|
outMqtt *dbOperate.MqttHelper
|
|
monitor *monitors.CommonMonitor
|
|
infoRedis *dbOperate.RedisHelper
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) LoadConfigJson(cfgStr string) {
|
|
// 将 yaml 格式的数据解析到结构体中
|
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) Initial(cfg string) error {
|
|
the.LoadConfigJson(cfg)
|
|
err := the.InputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.OutputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.infoComponentInitial()
|
|
return err
|
|
}
|
|
func (the *consumerGZG2ZJHL) InputInitial() error {
|
|
the.ch = make(chan []adaptors.NeedPush, 200)
|
|
//数据入口
|
|
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
|
|
the.monitor = &monitors.CommonMonitor{
|
|
MonitorHelper: &monitors.MonitorHelper{},
|
|
}
|
|
|
|
the.monitor.Start()
|
|
for taskName, cron := range the.Info.Monitor {
|
|
switch taskName {
|
|
case "cron10min":
|
|
the.monitor.RegisterTask(cron, the.getEs10minAggData)
|
|
case "cron1hour":
|
|
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
|
|
default:
|
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func (the *consumerGZG2ZJHL) OutputInitial() error {
|
|
//数据出口
|
|
the.outMqtt = dbOperate.MqttInitial(
|
|
the.Info.IoConfig.Out.Mqtt.Host,
|
|
the.Info.IoConfig.Out.Mqtt.Port,
|
|
the.Info.IoConfig.Out.Mqtt.ClientId,
|
|
the.Info.IoConfig.Out.Mqtt.UserName,
|
|
the.Info.IoConfig.Out.Mqtt.Password,
|
|
false, //按照具体项目来
|
|
"")
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) infoComponentInitial() error {
|
|
//数据出口
|
|
addr := the.Info.QueryComponent.Redis.Address
|
|
the.infoRedis = dbOperate.NewRedisHelper("", addr)
|
|
return nil
|
|
}
|
|
func (the *consumerGZG2ZJHL) Work() {
|
|
go func() {
|
|
for {
|
|
needPushList := <-the.ch
|
|
if len(the.ch) > 0 {
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
|
|
}
|
|
|
|
for _, push := range needPushList {
|
|
if push.Topic != "" {
|
|
the.outMqtt.Publish(push.Topic, push.Payload)
|
|
continue
|
|
}
|
|
|
|
//没有标记topic 的 按照配置文件里面的推送
|
|
for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics {
|
|
the.outMqtt.Publish(topic, push.Payload)
|
|
}
|
|
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) getAdaptor() (adaptor adaptors.Adaptor_ZWYES_ZJHL) {
|
|
|
|
return adaptors.Adaptor_ZWYES_ZJHL{
|
|
Redis: the.infoRedis,
|
|
}
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) getStructIds() []int64 {
|
|
var structIds []int64
|
|
for strutId, _ := range the.Info.StructInfo {
|
|
structIds = append(structIds, strutId)
|
|
}
|
|
return structIds
|
|
}
|
|
func (the *consumerGZG2ZJHL) getEs1HourAggData() {
|
|
start, end := utils.GetTimeRangeByHour(-1)
|
|
log.Printf("查询数据时间范围 %s - %s", start, end)
|
|
hourFactorIds := []int{4, 6, 7, 11, 19, 24, 883, 935}
|
|
//hourFactorIds := []int{4, 883}
|
|
//湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据
|
|
//温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度
|
|
//6500030003(温度测点 72179,湿度测点 72191),6500030004(温度测点 72178,湿度测点 72192),6500030005(温度测点 72180,湿度测点 72166)
|
|
|
|
//935(Gnss) ,883(湿度,果子沟特有),4(温度),6索力, 15, 18, 20
|
|
structIds := the.getStructIds()
|
|
for _, structId := range structIds {
|
|
adaptor := the.getAdaptor()
|
|
adaptor.PointInfo = the.Info.PointInfo
|
|
adaptor.StructInfo = the.Info.StructInfo
|
|
for _, factorId := range hourFactorIds {
|
|
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
|
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
|
|
|
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
|
|
for i := range needPushes {
|
|
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
|
|
println("mqtt 推送报文=", hex.EncodeToString(needPushes[i].Payload))
|
|
}
|
|
|
|
if len(needPushes) > 0 {
|
|
the.ch <- needPushes
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) getEs10minAggData() {
|
|
start, end := utils.GetTimeRangeBy10minByOffset(-20)
|
|
//start, end := "2025-02-28T15:30:00.000+08:00", "2025-02-28T15:40:00.000+08:00"
|
|
log.Printf("查询10min数据时间范围 %s - %s", start, end)
|
|
factorIds := []int{756, 156, 225} //监测因素 加速度756 风速156 风向225 需要合并风向
|
|
structIds := the.getStructIds()
|
|
for _, structId := range structIds {
|
|
adaptor := the.getAdaptor()
|
|
adaptor.PointInfo = the.Info.PointInfo
|
|
adaptor.StructInfo = the.Info.StructInfo
|
|
for _, factorId := range factorIds {
|
|
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
|
|
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
|
|
|
|
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
|
|
for i := range needPushes {
|
|
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
|
|
log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload))
|
|
}
|
|
|
|
if len(needPushes) > 0 {
|
|
the.ch <- needPushes
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) crc16rc4(transBytes []byte) []byte {
|
|
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes)
|
|
needRC4 := append(transBytes, resultByCrc16...)
|
|
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"]
|
|
if !ok {
|
|
log.Panicf("未配置 rc4key")
|
|
}
|
|
rc4Key := []byte(rc4KeyStr) //the.RC4Key
|
|
// 加密操作
|
|
dest1 := make([]byte, len(needRC4))
|
|
rc4.NewCipher(rc4Key)
|
|
cipher1, _ := rc4.NewCipher(rc4Key)
|
|
cipher1.XORKeyStream(dest1, needRC4)
|
|
return dest1
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
|
|
aggSubSql := getEsAggSubSqlByZwyFactorId(factorId)
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 0,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"factor": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"range": {
|
|
"collect_time": {
|
|
"gte": "%s",
|
|
"lt": "%s"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"aggs": {
|
|
"groupSensor": {
|
|
"terms": {
|
|
"field": "sensor",
|
|
"size": 100
|
|
},
|
|
"aggs": {
|
|
"groupDate": {
|
|
"date_histogram": {
|
|
"field": "collect_time",
|
|
"interval": "1h",
|
|
"time_zone": "Asia/Shanghai",
|
|
"min_doc_count": 1
|
|
},
|
|
"aggs": %s
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`, structureId, factorId, start, end, aggSubSql)
|
|
|
|
return esQuery
|
|
}
|
|
|
|
func (the *consumerGZG2ZJHL) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
|
|
aggSubSql := getEsAggSubSqlByZwyFactorId(factorId)
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 0,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"factor": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"range": {
|
|
"collect_time": {
|
|
"gte": "%s",
|
|
"lte": "%s"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"aggs": {
|
|
"groupSensor": {
|
|
"terms": {
|
|
"field": "sensor",
|
|
"size": 100
|
|
},
|
|
"aggs": {
|
|
"groupDate": {
|
|
"date_histogram": {
|
|
"field": "collect_time",
|
|
"interval": "10m",
|
|
"time_zone": "Asia/Shanghai",
|
|
"min_doc_count": 1
|
|
},
|
|
"aggs": %s
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`, structureId, factorId, start, end, aggSubSql)
|
|
|
|
return esQuery
|
|
}
|
|
|
|
// getEsAggSubSqlByZwyFactorId returns the JSON object of per-bucket
// sub-aggregations for the given monitoring factor. Each data field of the
// factor gets one extended_stats aggregation; the aggregations are named
// "x", "y" and "z" in field order. An unknown factor id yields "".
//
// Factor ids noted in this file: pier tilt 15, crack 18, bearing displacement
// 20, deflection 19, deck vibration 28, wind speed 156, wind direction 225,
// acceleration 756, GNSS 935.
func getEsAggSubSqlByZwyFactorId(factorId int) string {
	switch factorId {
	case 4: // structure temperature
		return zwyExtendedStatsAggs("data.temperature")
	case 6: // cable force
		return zwyExtendedStatsAggs("data.cableForce")
	case 7: // weighing / vehicle load
		return zwyExtendedStatsAggs("data.load", "data.overload")
	case 11: // strain
		return zwyExtendedStatsAggs("data.strain")
	case 15:
		return zwyExtendedStatsAggs("data.x", "data.y")
	case 18:
		return zwyExtendedStatsAggs("data.crack")
	case 19:
		return zwyExtendedStatsAggs("data.deflection")
	case 20, 24:
		return zwyExtendedStatsAggs("data.displacement")
	case 28:
		return zwyExtendedStatsAggs("data.pv", "data.trms")
	case 156:
		return zwyExtendedStatsAggs("data.speed")
	case 225:
		return zwyExtendedStatsAggs("data.direction")
	case 756: // acceleration, m/s²
		return zwyExtendedStatsAggs("data.acceler")
	case 883: // humidity (later merged with 3 temperature points into temperature-humidity)
		return zwyExtendedStatsAggs("data.humidity")
	case 935:
		return zwyExtendedStatsAggs("data.x", "data.y", "data.z")
	default:
		return ""
	}
}

// zwyExtendedStatsAggs renders a JSON object holding one extended_stats
// aggregation per field, named "x", "y", "z" by position. It accepts at most
// three fields and panics with an index error beyond that.
func zwyExtendedStatsAggs(fields ...string) string {
	aggNames := [...]string{"x", "y", "z"}

	out := "\n{"
	for i, field := range fields {
		if i > 0 {
			out += ","
		}
		// %q yields a double-quoted string, which is valid JSON for these
		// plain ASCII names.
		out += fmt.Sprintf("\n  %q: {\n    \"extended_stats\": {\n      \"field\": %q\n    }\n  }", aggNames[i], field)
	}
	return out + "\n}"
}
|
|
|
|
func (the *consumerGZG2ZJHL) getStructureId() string {
|
|
structureId, ok := the.Info.OtherInfo["structureId"]
|
|
if !ok {
|
|
log.Panicf("无法识别有效的structureId")
|
|
}
|
|
return structureId
|
|
}
|
|
|