数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
8.5 KiB

package consumers
import (
"crypto/rc4"
"encoding/hex"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/HBJCAS"
"goInOut/dbOperate"
"goInOut/monitors"
"goInOut/utils"
"gopkg.in/yaml.v3"
"log"
"time"
)
// consumerHBJCAS is the HBJCAS consumer: it holds the parsed configuration
// plus the input (HTTP), output (MQTT), scheduling (monitor) and lookup
// (Redis) helpers that the methods of this type use.
type consumerHBJCAS struct {
	// Buffered channel that carries batches of messages waiting to be published.
	ch chan []adaptors.NeedPush
	// Parsed configuration for this consumer.
	Info HBJCAS.ConfigFile
	// HTTP helper used to run the aggregation queries (data input side).
	InHttp *dbOperate.HttpHelper
	// MQTT helper used to publish the transformed payloads (data output side).
	outMqtt *dbOperate.MqttHelper
	// Scheduler on which the cron-style query tasks are registered.
	monitor *monitors.CommonMonitor
	// Redis helper handed to the adaptor via getAdaptor.
	infoRedis *dbOperate.RedisHelper
}
// LoadConfigJson parses cfgStr into the.Info.
//
// Despite the name, the input is decoded with yaml.Unmarshal. On a decode
// failure the error is logged and the method panics with it.
//
// NOTE(review): the failure log prints the complete configuration text, which
// also carries the MQTT password field read elsewhere in this file.
func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) {
	if err := yaml.Unmarshal([]byte(cfgStr), &the.Info); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}
// Initial loads the configuration from cfg and then initialises the input
// side, the output side and the info (Redis) component, in that order.
//
// It returns the first error reported by one of those steps, or nil when all
// of them succeed. LoadConfigJson panics on an unparsable configuration.
func (the *consumerHBJCAS) Initial(cfg string) error {
	the.LoadConfigJson(cfg)

	// The steps run strictly in this order; the first failure aborts the rest.
	steps := []func() error{
		the.InputInitial,
		the.OutputInitial,
		the.infoComponentInitial,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// InputInitial prepares the input side of the consumer: it creates the
// push-batch channel (capacity 200), the HTTP helper for the configured input
// URL, and the monitor, then registers one task per entry of the.Info.Monitor.
//
// Recognised task names are "cron10min" and "cron1hour"; any other name is
// only logged. The method always returns nil.
func (the *consumerHBJCAS) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)

	// Data entry point.
	the.InHttp = &dbOperate.HttpHelper{
		Url:   the.Info.IoConfig.In.Http.Url,
		Token: "",
	}

	the.monitor = &monitors.CommonMonitor{MonitorHelper: &monitors.MonitorHelper{}}
	the.monitor.Start()

	for name, spec := range the.Info.Monitor {
		switch name {
		case "cron1hour":
			the.monitor.RegisterTask(spec, the.getEs1HourAggData)
		case "cron10min":
			the.monitor.RegisterTask(spec, the.getEs10minAggData)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", name, spec)
		}
	}
	return nil
}
// OutputInitial creates the MQTT helper used as the data exit, from the
// connection settings in the.Info.IoConfig.Out.Mqtt and the certificate files
// under consumers/HBJCAS/ssl. It always returns nil.
func (the *consumerHBJCAS) OutputInitial() error {
	const (
		caCertPath     = "consumers/HBJCAS/ssl/cacert.pem"
		clientCertPath = "consumers/HBJCAS/ssl/client-cert.pem"
		clientKeyPath  = "consumers/HBJCAS/ssl/client-key.pem"
	)

	mqttCfg := the.Info.IoConfig.Out.Mqtt

	// Data exit.
	the.outMqtt = dbOperate.MqttInitial(
		mqttCfg.Host,
		mqttCfg.Port,
		mqttCfg.ClientId,
		mqttCfg.UserName,
		mqttCfg.Password,
		true, // project-specific switch; presumably enables TLS given the certificate paths — TODO confirm
		caCertPath,
		clientCertPath,
		clientKeyPath,
	)
	return nil
}
// infoComponentInitial creates the Redis helper for the address configured in
// the.Info.QueryComponent.Redis.Address and stores it in the.infoRedis.
// It always returns nil.
func (the *consumerHBJCAS) infoComponentInitial() error {
	the.infoRedis = dbOperate.NewRedisHelper("", the.Info.QueryComponent.Redis.Address)
	return nil
}
// Work starts a background goroutine that forever takes batches from the.ch
// and publishes every item over MQTT.
//
// An item with a non-empty Topic is published to that topic only; an item
// without a topic is published to every topic listed in the configuration.
// The goroutine sleeps 100ms after each batch and has no stop mechanism.
func (the *consumerHBJCAS) Work() {
	publishLoop := func() {
		for {
			batch := <-the.ch
			if len(the.ch) > 0 {
				log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			}

			for _, item := range batch {
				if item.Topic == "" {
					// No topic set on the item: fan out to the configured topics.
					for _, configured := range the.Info.IoConfig.Out.Mqtt.Topics {
						the.outMqtt.Publish(configured, item.Payload)
					}
				} else {
					the.outMqtt.Publish(item.Topic, item.Payload)
				}
			}

			time.Sleep(100 * time.Millisecond)
		}
	}
	go publishLoop()
}
// getAdaptor returns a new Adaptor_AXYES_HBGL value whose Redis field is set
// to the consumer's Redis helper; all other fields keep their zero value.
func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
	adaptor.Redis = the.infoRedis
	return adaptor
}
// getStructIds returns the keys of the.Info.StructInfo.
//
// The order follows Go map iteration and is therefore unspecified. The result
// is nil when StructInfo has no entries.
func (the *consumerHBJCAS) getStructIds() []int64 {
	var ids []int64
	for id := range the.Info.StructInfo {
		ids = append(ids, id)
	}
	return ids
}
// getEs1HourAggData queries the hourly aggregation for factors 15, 18 and 20
// of every configured structure over the time range returned by
// utils.GetTimeRangeByHour(-1), transforms each response through the adaptor,
// passes every payload through crc16rc4 and queues non-empty batches on the.ch.
//
// NOTE(review): the Authorization header value is hard-coded in this method.
func (the *consumerHBJCAS) getEs1HourAggData() {
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询数据时间范围 %s - %s", start, end)

	for _, structId := range the.getStructIds() {
		for _, factorId := range []int{15, 18, 20} {
			query := the.getESQueryStrByHour(structId, factorId, start, end)
			header := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
			aggResult := the.InHttp.HttpGetWithHeader(query, header)

			adaptor := the.getAdaptor()
			adaptor.PointInfo = the.Info.PointInfo
			adaptor.StructInfo = the.Info.StructInfo

			batch := adaptor.Transform(structId, factorId, aggResult)
			if len(batch) == 0 {
				continue
			}
			for idx := range batch {
				batch[idx].Payload = the.crc16rc4(batch[idx].Payload)
			}
			the.ch <- batch
		}
	}
}
// getEs10minAggData queries the 10-minute aggregation for factor 28 of every
// configured structure, transforms each response through the adaptor, passes
// every payload through crc16rc4 (logging the result as hex) and queues
// non-empty batches on the.ch.
//
// The time range is the same one-hour window used by the hourly task rather
// than utils.GetTimeRangeBy10min(): because of timeliness problems with the
// vibration data, everything is reported once per hour.
//
// NOTE(review): the Authorization header value is hard-coded in this method.
func (the *consumerHBJCAS) getEs10minAggData() {
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询10min数据时间范围 %s - %s", start, end)

	for _, structId := range the.getStructIds() {
		for _, factorId := range []int{28} {
			query := the.getESQueryStrBy10min(structId, factorId, start, end)
			header := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
			aggResult := the.InHttp.HttpGetWithHeader(query, header)

			adaptor := the.getAdaptor()
			adaptor.PointInfo = the.Info.PointInfo
			adaptor.StructInfo = the.Info.StructInfo

			batch := adaptor.Transform(structId, factorId, aggResult)
			if len(batch) == 0 {
				continue
			}
			for idx := range batch {
				item := &batch[idx]
				item.Payload = the.crc16rc4(item.Payload)
				log.Printf("topic[%s],Payload=> %s", item.Topic, hex.EncodeToString(item.Payload))
			}
			the.ch <- batch
		}
	}
}
// crc16rc4 appends the checksum produced by utils.NewCRC16CCITT().GetWCRCin
// to transBytes and returns the RC4 encryption of the combined bytes.
//
// The RC4 key is read from the.Info.OtherInfo["rc4key"]. The method panics
// (via log.Panicf) when the key is not configured or when crypto/rc4 rejects
// it. transBytes itself is never modified.
//
// NOTE(review): crypto/rc4 is cryptographically broken; it is kept here on
// the assumption that the receiving side requires it — confirm.
func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte {
	checksum := utils.NewCRC16CCITT().GetWCRCin(transBytes)

	rc4KeyStr, ok := the.Info.OtherInfo["rc4key"]
	if !ok {
		log.Panicf("未配置 rc4key")
	}

	// Build the plaintext in a fresh buffer: appending directly to transBytes
	// could write into the caller's backing array when it has spare capacity.
	plain := make([]byte, 0, len(transBytes)+len(checksum))
	plain = append(plain, transBytes...)
	plain = append(plain, checksum...)

	// NewCipher fails for an empty key or one longer than 256 bytes; report
	// that explicitly instead of dereferencing a nil cipher below.
	cipher, err := rc4.NewCipher([]byte(rc4KeyStr))
	if err != nil {
		log.Panicf("rc4key 无效, 无法创建 RC4 cipher: %v", err)
	}

	encrypted := make([]byte, len(plain))
	cipher.XORKeyStream(encrypted, plain)
	return encrypted
}
// getESQueryStrByHour builds the query body for the hourly aggregation.
//
// The query filters on the structure id, the factor id and the collect_time
// range [start, end) (gte/lt), groups by sensor, buckets each sensor into 1h
// date_histogram intervals in the Asia/Shanghai time zone, and applies the
// factor-specific sub-aggregation from getEsAggSubSqlByFactorId.
func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
	// Verbs, in order: structure id, factor id, range start, range end,
	// sub-aggregation JSON.
	const queryTemplate = `
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"structure": {
"value": %d
}
}
},
{
"term": {
"factor": {
"value": %d
}
}
},
{
"range": {
"collect_time": {
"gte": "%s",
"lt": "%s"
}
}
}
]
}
},
"aggs": {
"groupSensor": {
"terms": {
"field": "sensor"
},
"aggs": {
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "1h",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
}
}
}
}
}
`
	subAggs := getEsAggSubSqlByFactorId(factorId)
	return fmt.Sprintf(queryTemplate, structureId, factorId, start, end, subAggs)
}
// getESQueryStrBy10min builds the query body for the 10-minute aggregation.
//
// The query filters on the structure id, the factor id and the collect_time
// range [start, end] (gte/lte), groups by sensor, buckets each sensor into
// 10m date_histogram intervals in the Asia/Shanghai time zone, and applies the
// factor-specific sub-aggregation from getEsAggSubSqlByFactorId.
//
// NOTE(review): the upper bound is inclusive here ("lte") whereas
// getESQueryStrByHour uses an exclusive one ("lt") — confirm this is intended.
func (the *consumerHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
	// Verbs, in order: structure id, factor id, range start, range end,
	// sub-aggregation JSON.
	const queryTemplate = `
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"structure": {
"value": %d
}
}
},
{
"term": {
"factor": {
"value": %d
}
}
},
{
"range": {
"collect_time": {
"gte": "%s",
"lte": "%s"
}
}
}
]
}
},
"aggs": {
"groupSensor": {
"terms": {
"field": "sensor"
},
"aggs": {
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "10m",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
}
}
}
}
}
`
	subAggs := getEsAggSubSqlByFactorId(factorId)
	return fmt.Sprintf(queryTemplate, structureId, factorId, start, end, subAggs)
}
// getEsAggSubSqlByFactorId returns the JSON sub-aggregation (extended_stats
// over the factor's data fields) for the given factor id.
//
// Known factors: 15 pier tilt, 18 crack, 20 bearing displacement,
// 28 deck vibration. Any other id yields the empty string.
func getEsAggSubSqlByFactorId(factorId int) string {
	switch factorId {
	case 15: // pier tilt: data.x / data.y
		return `
{
"x": {
"extended_stats": {
"field": "data.x"
}
},
"y": {
"extended_stats": {
"field": "data.y"
}
}
}`
	case 18: // crack: data.crack
		return `
{
"x": {
"extended_stats": {
"field": "data.crack"
}
}
}`
	case 20: // bearing displacement: data.displacement
		return `
{
"x": {
"extended_stats": {
"field": "data.displacement"
}
}
}`
	case 28: // deck vibration: data.pv / data.trms
		return `
{
"x": {
"extended_stats": {
"field": "data.pv"
}
},
"y": {
"extended_stats": {
"field": "data.trms"
}
}
}`
	default:
		return ""
	}
}
// getStructureId returns the value stored under "structureId" in
// the.Info.OtherInfo, and panics (via log.Panicf) when that key is absent.
func (the *consumerHBJCAS) getStructureId() string {
	if id, found := the.Info.OtherInfo["structureId"]; found {
		return id
	}
	log.Panicf("无法识别有效的structureId")
	return "" // not reached: log.Panicf panics
}