|
|
@ -1,6 +1,8 @@ |
|
|
|
package consumers |
|
|
|
|
|
|
|
import ( |
|
|
|
"crypto/rc4" |
|
|
|
"encoding/hex" |
|
|
|
"encoding/json" |
|
|
|
"fmt" |
|
|
|
"goInOut/adaptors" |
|
|
@ -74,7 +76,14 @@ func (the *consumerHBJCAS) Work() { |
|
|
|
for _, push := range needPushList { |
|
|
|
if push.Topic != "" { |
|
|
|
the.outMqtt.Publish(push.Topic, push.Payload) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
//没有标记topic 的 按照配置文件里面的推送
|
|
|
|
for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { |
|
|
|
the.outMqtt.Publish(topic, push.Payload) |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
@ -91,24 +100,41 @@ func (the *consumerHBJCAS) getEsData() { |
|
|
|
structureId := the.getStructureId() |
|
|
|
start, end := utils.GetTimeRangeByHour(-1) |
|
|
|
log.Printf("查询数据时间范围 %s - %s", start, end) |
|
|
|
//start := "2024-02-05T00:00:00.000+0800"
|
|
|
|
//end := "2024-02-05T23:59:59.999+0800"
|
|
|
|
|
|
|
|
factorIds := []string{"15", "20", "28"} |
|
|
|
for _, factorId := range factorIds { |
|
|
|
esQuery := the.getESQueryStr(structureId, factorId, start, end) |
|
|
|
hourFactorIds := []string{"15", "20"} //, "28"
|
|
|
|
for _, factorId := range hourFactorIds { |
|
|
|
esQuery := the.getESQueryStrByHour(structureId, factorId, start, end) |
|
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
|
|
|
esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) |
|
|
|
|
|
|
|
adaptor := the.getAdaptor() |
|
|
|
needPush := adaptor.Transform(esAggResult) |
|
|
|
if len(needPush) > 0 { |
|
|
|
the.ch <- needPush |
|
|
|
needPushes := adaptor.Transform(esAggResult) |
|
|
|
for i := range needPushes { |
|
|
|
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
|
|
|
} |
|
|
|
|
|
|
|
if len(needPushes) > 0 { |
|
|
|
the.ch <- needPushes |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerHBJCAS) getESQueryStr(structureId, factorId, start, end string) string { |
|
|
|
} |
|
|
|
func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { |
|
|
|
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) |
|
|
|
needRC4 := append(transBytes, resultByCrc16...) |
|
|
|
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] |
|
|
|
if !ok { |
|
|
|
log.Panicf("未配置 rc4key") |
|
|
|
} |
|
|
|
rc4Key := []byte(rc4KeyStr) //the.RC4Key
|
|
|
|
// 加密操作
|
|
|
|
dest1 := make([]byte, len(needRC4)) |
|
|
|
rc4.NewCipher(rc4Key) |
|
|
|
cipher1, _ := rc4.NewCipher(rc4Key) |
|
|
|
cipher1.XORKeyStream(dest1, needRC4) |
|
|
|
log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) |
|
|
|
return dest1 |
|
|
|
} |
|
|
|
func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end string) string { |
|
|
|
aggSubSql := getEsAggSubSqlByFactorId(factorId) |
|
|
|
esQuery := fmt.Sprintf(` |
|
|
|
{ |
|
|
@ -207,7 +233,7 @@ func getEsAggSubSqlByFactorId(factorId string) string { |
|
|
|
func (the *consumerHBJCAS) getStructureId() string { |
|
|
|
structureId, ok := the.Info.OtherInfo["structureId"] |
|
|
|
if !ok { |
|
|
|
structureId = "5016" //河北承德乃积沟大桥结构物id=5016
|
|
|
|
log.Panicf("无法识别有效的structureId") |
|
|
|
} |
|
|
|
return structureId |
|
|
|
} |
|
|
|