Browse Source

update 安心云2广东省平台 主体功能版本

dev
lucas 6 months ago
parent
commit
1e38ee2367
  1. 123
      adaptors/安心云es主题特征to广东省平台.go
  2. 24
      configFiles/config_安心云测点特征数据_广东省平台.yaml
  3. 7
      consumers/GDJKJC/config.go
  4. 73
      consumers/consumerAXYES2GDJKJC.go
  5. 3
      consumers/consumerManage.go
  6. 2
      dbOperate/dbHelper.go
  7. 10
      dbOperate/httpHelper.go
  8. 49
      utils/signByGDJKJC.go

123
adaptors/安心云es主题特征to广东省平台.go

@ -2,12 +2,11 @@ package adaptors
import (
"encoding/json"
"fmt"
"goInOut/consumers/GDJKJC"
"goInOut/consumers/HBJCAS"
"goInOut/consumers/HBJCAS/protoFiles_hb"
"goInOut/dbOperate"
"goInOut/models"
"google.golang.org/protobuf/proto"
"log"
"math"
"strconv"
@ -33,18 +32,22 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s
return nil
}
Payload := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram)
if len(Payload) == 0 {
Payloads := the.EsAggTopToGDJKJC(structId, factorId, esAggDateHistogram)
if len(Payloads) == 0 {
return needPush
}
for _, payload := range Payloads {
needPush = append(needPush, NeedPush{
Payload: Payload,
Topic: strconv.FormatInt(structId, 10),
Payload: payload,
})
}
return needPush
}
func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) {
func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result [][]byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
@ -57,35 +60,33 @@ func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, e
return
}
//数据汇总
complexData := &protoFiles_hb.ComplexData{}
for _, sensorBucket := range buckets {
sensorId := sensorBucket.Key
for _, dateBucket := range sensorBucket.GroupDate.Buckets {
//优先redis获取
station := models.Station{}
k1 := fmt.Sprintf("station:%d", sensorId)
errRedis := the.Redis.GetObj(k1, &station)
if errRedis != nil {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
continue
}
monitorCode := the.getPointCodeFromLabel(station.Labels)
if monitorCode == 0 {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId)
//station := models.Station{}
//k1 := fmt.Sprintf("station:%d", sensorId)
//errRedis := the.Redis.GetObj(k1, &station)
//if errRedis != nil {
// log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
// continue
//}
monitorCode := "LJ-VIB-P01-004-01" //the.getPointCodeFromLabel(station.Labels)
if monitorCode == "" {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]信息,异常,跳过", structId, factorId, sensorId)
continue
}
dataDefinition := &protoFiles_hb.DataDefinition{
DataType: protoFiles_hb.DataType_STATISTICS,
UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥
DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
}
complexData.SensorData = append(complexData.SensorData, dataDefinition)
dataBytes := the.EsAgg2StatisticData(factorId, monitorCode, dateBucket)
//dataDefinition := &protoFiles_hb.DataDefinition{
// DataType: protoFiles_hb.DataType_STATISTICS,
// UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥
// DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
//}
result = append(result, dataBytes)
log.Printf("[s:%d,f:%d] t=%s, 特征数据=> %s", structId, factorId, dateBucket.KeyAsString, dataBytes)
}
}
v, _ := json.Marshal(complexData)
log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v)
result, _ = proto.Marshal(complexData)
return result
}
func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType {
@ -107,7 +108,7 @@ func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFile
}
}
func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData {
func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode string, dateBucket HBJCAS.BucketsXY) (dataBytes []byte) {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min))
avgValueX := dateBucket.X.Avg
@ -117,53 +118,35 @@ func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode in
avgValueY := dateBucket.Y.Avg
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count))
monitoryType := the.getMonitorTypeByFactorId(factorId)
dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{
StatisticData: &protoFiles_hb.StatisticData{
MonitorType: monitoryType,
MonitorCode: monitorCode, //测点唯一编码
EventTime: Atime,
Interval: 60 * 1000,
},
commonBody := GDJKJC.CommonBody{
ThirdChannelCode: monitorCode,
DataTimeUnix: Atime,
}
var destStruct any
switch factorId {
case 15: //倾角
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{
MaxAbsoluteValueX: float32(maxAbsoluteValueX),
AvgValueX: float32(avgValueX),
RootMeanSquareX: float32(rootMeanSquareX),
MaxAbsoluteValueY: float32(maxAbsoluteValueY),
AvgValueY: float32(avgValueY),
RootMeanSquareY: float32(rootMeanSquareY),
}}
case 18: //裂缝监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Crk{Crk: &protoFiles_hb.CRKStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
case 20: //支座位移
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Dis{Dis: &protoFiles_hb.DISStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
AvgValue: float32(avgValueX),
RootMeanSquare: float32(rootMeanSquareX),
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min),
}}
destStruct = GDJKJC.A43AngleBody{
CommonBody: commonBody,
XAvg: avgValueX,
XMax: maxAbsoluteValueX,
XRms: rootMeanSquareX, //均方根
YAvg: avgValueY,
YMax: maxAbsoluteValueY,
YRms: rootMeanSquareY, //均方根
}
case 28: //振动
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareY),
}}
case 592: //加速度三项监测
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareX),
}}
destStruct = GDJKJC.A48Acc{
CommonBody: commonBody,
Max: maxAbsoluteValueX / 100, //省平台 加速度单位 m/s² 实际设备单位 cm/s²
Rms: rootMeanSquareX / 100, //均方根
}
return dataDefinitionStatisticData
}
if destStruct != nil {
dataBytes, _ = json.Marshal(destStruct)
} else {
log.Printf("!!! [f=%d,s=%s]无匹配 映射数据", factorId, monitorCode)
}
return dataBytes
}
func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) {

24
configFiles/config_安心云测点特征数据_广东省平台.yaml

@ -9,19 +9,31 @@ ioConfig:
method: "post"
monitor:
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报
cron10min: 22 0/1 * * * #6/10 * * * *
cron10min: 36 0/1 * * * #6/10 * * * *
#普通类型 特征数据
cron1hour: 20 0/1 * * *
info:
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
5450: #隆江大桥
appKey: db43bc5d74534348
appSecret: 162c0a92f089464eaf9349358af4a830
5452: #深汕东边坡
appKey: cdf1493747174885
appSecret: 563e5d752181450da3b9d1617f75b527
5455: #螺河特大桥
appKey: 11b8f81901134570
appSecret: 1505ef9480e4491bb578e2d7d9781620
5456: #螺河东大桥
appKey: d20477c3247b4012
appSecret: 3d9bfb809be545bba11af17a88c81eb3
queryComponent:
redis:
address: 10.8.30.160:30379
#结构物id对应
structInfo:
5450: G0000000000001
5452: G0000000000002
5455: G0000000000003
5456: G0000000000004
5450: G0000000000001 #隆江大桥
5452: G0000000000002 #深汕东边坡
5455: G0000000000003 #螺河特大桥
5456: G0000000000004 #螺河东大桥

7
consumers/GDJKJC/config.go

@ -4,7 +4,7 @@ import "goInOut/config"
type ConfigFile struct {
IoConfig ioConfig `yaml:"ioConfig"`
OtherInfo map[string]string `yaml:"info"`
Info map[string]AppKeySecret `yaml:"info"`
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"`
StructInfo map[int64]string `yaml:"structInfo"`
Monitor map[string]string `yaml:"monitor"`
@ -27,3 +27,8 @@ type queryComponent struct {
Address string `yaml:"address"`
} `yaml:"redis"`
}
type AppKeySecret struct {
AppKey string `yaml:"appKey"`
AppSecret string `yaml:"appSecret"`
}

73
consumers/consumerAXYES2GDJKJC.go

@ -1,8 +1,6 @@
package consumers
import (
"crypto/rc4"
"encoding/hex"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/GDJKJC"
@ -11,6 +9,7 @@ import (
"goInOut/utils"
"gopkg.in/yaml.v3"
"log"
"strconv"
"time"
)
@ -18,7 +17,7 @@ type consumerAXYES2GDJKJC struct {
//数据缓存管道
ch chan []adaptors.NeedPush
//具体配置
Info GDJKJC.ConfigFile
ConfigInfo GDJKJC.ConfigFile
InHttp *dbOperate.HttpHelper
outHttp *dbOperate.HttpHelper
monitor *monitors.CommonMonitor
@ -26,7 +25,7 @@ type consumerAXYES2GDJKJC struct {
}
func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) {
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
err := yaml.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
@ -49,13 +48,13 @@ func (the *consumerAXYES2GDJKJC) Initial(cfg string) error {
func (the *consumerAXYES2GDJKJC) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
the.InHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""}
the.monitor = &monitors.CommonMonitor{
MonitorHelper: &monitors.MonitorHelper{},
}
the.monitor.Start()
for taskName, cron := range the.Info.Monitor {
for taskName, cron := range the.ConfigInfo.Monitor {
switch taskName {
case "cron10min":
the.monitor.RegisterTask(cron, the.GetEs10minAggData)
@ -70,13 +69,13 @@ func (the *consumerAXYES2GDJKJC) InputInitial() error {
}
func (the *consumerAXYES2GDJKJC) OutputInitial() error {
//数据出口
the.outHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.Out.Http.Url, Token: ""}
the.outHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.Out.Http.Url, Token: ""}
return nil
}
func (the *consumerAXYES2GDJKJC) infoComponentInitial() error {
//数据出口
addr := the.Info.QueryComponent.Redis.Address
addr := the.ConfigInfo.QueryComponent.Redis.Address
the.infoRedis = dbOperate.NewRedisHelper("", addr)
return nil
}
@ -89,7 +88,22 @@ func (the *consumerAXYES2GDJKJC) Work() {
}
for _, push := range needPushList {
_, err := the.outHttp.Publish(push.Payload)
structIdStr := push.Topic
if _, ok := the.ConfigInfo.Info[structIdStr]; !ok {
log.Printf("structId=%s 无匹配的省平台AppKeySecret", structIdStr)
continue
}
appKey := the.ConfigInfo.Info[structIdStr].AppKey
appSecret := the.ConfigInfo.Info[structIdStr].AppSecret
rnd := strconv.FormatInt(time.Now().Unix(), 10)
Header := map[string]string{
"appKey": appKey,
"rnd": rnd,
"sign": utils.GetSign(string(push.Payload), rnd, appKey, appSecret),
}
_, err := the.outHttp.PublishWithHeader(push.Payload, Header)
if err != nil {
log.Printf("数据推送异常=> %s", err.Error())
return
@ -110,7 +124,7 @@ func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GD
func (the *consumerAXYES2GDJKJC) getStructIds() []int64 {
var structIds []int64
for strutId, _ := range the.Info.StructInfo {
for strutId, _ := range the.ConfigInfo.StructInfo {
structIds = append(structIds, strutId)
}
return structIds
@ -127,12 +141,9 @@ func (the *consumerAXYES2GDJKJC) getEs1HourAggData() {
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
adaptor.PointInfo = the.ConfigInfo.PointInfo
adaptor.StructInfo = the.ConfigInfo.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
}
if len(needPushes) > 0 {
the.ch <- needPushes
@ -155,13 +166,9 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
adaptor.PointInfo = the.ConfigInfo.PointInfo
adaptor.StructInfo = the.ConfigInfo.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload))
}
if len(needPushes) > 0 {
the.ch <- needPushes
@ -171,22 +178,6 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
}
func (the *consumerAXYES2GDJKJC) crc16rc4(transBytes []byte) []byte {
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes)
needRC4 := append(transBytes, resultByCrc16...)
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"]
if !ok {
log.Panicf("未配置 rc4key")
}
rc4Key := []byte(rc4KeyStr) //the.RC4Key
// 加密操作
dest1 := make([]byte, len(needRC4))
rc4.NewCipher(rc4Key)
cipher1, _ := rc4.NewCipher(rc4Key)
cipher1.XORKeyStream(dest1, needRC4)
return dest1
}
func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
esQuery := fmt.Sprintf(`
@ -300,11 +291,3 @@ func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorI
return esQuery
}
func (the *consumerAXYES2GDJKJC) getStructureId() string {
structureId, ok := the.Info.OtherInfo["structureId"]
if !ok {
log.Panicf("无法识别有效的structureId")
}
return structureId
}

3
consumers/consumerManage.go

@ -41,6 +41,9 @@ func GetConsumer(name string) (consumer IConsumer) {
case "consumerJYESNJZX":
consumer = new(consumerJYESNJZX)
case "consumerAXYES2GDJKJC":
consumer = new(consumerAXYES2GDJKJC)
default:
consumer = nil
}

2
dbOperate/dbHelper.go

@ -57,7 +57,7 @@ func (the *DBHelper) dbOpen() error {
func (the *DBHelper) Exec(dbRecordSQL string) error {
if the.dbClient == nil {
if openErr := the.dbOpen(); openErr != nil {
//logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr)
//logger.ConfigInfo("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr)
return openErr
}
}

10
dbOperate/httpHelper.go

@ -92,7 +92,9 @@ func (the *HttpHelper) Publish(messageBytes []byte) (string, error) {
func (the *HttpHelper) PublishWithHeader(messageBytes []byte, headers map[string]string) (string, error) {
resp, err := HttpPostWithHeader(the.Url, string(messageBytes), headers)
if err != nil {
log.Printf("数据推送异常 err=%s,resp=%s", err.Error(), resp)
log.Printf("数据推送,异常 err=%s,resp=%s", err.Error(), resp)
} else {
log.Printf("数据推送,正常 resp=%s", resp)
}
return resp, err
}
@ -178,10 +180,10 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string)
req.Header.Add(k, v)
}
fmt.Printf("http post 开始请求,%s,\n,%v \n", url, req)
log.Printf("http post 开始请求,%s,\n,%v \n", url, req)
resp, err := client.Do(req)
if err != nil {
fmt.Println("请求POST异常 ", err, resp)
log.Println("请求POST异常 ", err, resp)
return "", err
}
defer resp.Body.Close()
@ -190,7 +192,7 @@ func HttpPostWithHeader(url string, queryBody string, headers map[string]string)
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("请求ReadAll异常 ", err, resp)
log.Println("请求ReadAll异常 ", err, resp)
return "", err
}
return string(body), err

49
utils/signByGDJKJC.go

@ -0,0 +1,49 @@
package utils
import (
"crypto/sha1"
"fmt"
"log"
"os"
"time"
)
func GetSHA1(rawStr string) (result string) {
//s := "sha2 this string"
/*
生成一个hash的模式是sha1.New()
sha1.Write(bytes)
sha1.Sum()
*/
h := sha1.New()
h.Write([]byte(rawStr))
bs := h.Sum(nil)
fmt.Printf("%x \n", bs)
result = fmt.Sprintf("%X", bs)
return
}
func GetSHA1FromFile(fileName string) (result string) {
content, err := os.ReadFile(fileName)
log.Printf("err=%v", err)
h := sha1.New()
h.Write(content)
bs := h.Sum(nil)
result = fmt.Sprintf("%X", bs)
log.Printf("SHA1_file=%s", result)
return
}
// GetRnd 获取当前时间戳 到秒
func GetRnd() (rnd string) {
t := time.Now().Unix()
log.Printf("%d", t)
return
}
func GetSign(postStr, rnd, AppKey, AppSecret string) (sign string) {
return GetSHA1(postStr + rnd + AppKey + AppSecret)
}
Loading…
Cancel
Save