10 changed files with 1059 additions and 3 deletions
@ -0,0 +1,185 @@ |
|||
package adaptors |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"fmt" |
|||
"goInOut/consumers/HBJCAS" |
|||
"goInOut/consumers/HBJCAS/protoFiles_hb" |
|||
"goInOut/dbOperate" |
|||
"goInOut/models" |
|||
"google.golang.org/protobuf/proto" |
|||
"log" |
|||
"math" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
// Adaptor_ZWYES_HBGL 知物云的测点数据 同步到 河北省公路基础设施健康监测平台
|
|||
type Adaptor_ZWYES_HBGL struct { |
|||
//传感器code转换信息
|
|||
PointInfo map[int64]map[int64]int64 |
|||
StructInfo map[int64]int64 |
|||
//一些必要信息
|
|||
Info map[string]string |
|||
Redis *dbOperate.RedisHelper |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { |
|||
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{} |
|||
var needPush []NeedPush |
|||
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) |
|||
if err != nil { |
|||
return nil |
|||
} |
|||
|
|||
Payload := the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram) |
|||
if len(Payload) == 0 { |
|||
return needPush |
|||
} |
|||
|
|||
needPush = append(needPush, NeedPush{ |
|||
Payload: Payload, |
|||
}) |
|||
return needPush |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { |
|||
buckets := esAggs.Aggregations.GroupSensor.Buckets |
|||
if len(buckets) == 0 { |
|||
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) |
|||
return |
|||
} |
|||
//设施唯一编码(省平台)
|
|||
uniqueCode := the.getUniqueCode(structId) |
|||
if uniqueCode == 0 { |
|||
log.Printf("structId=%d,无匹配省平台uniqueCode", structId) |
|||
return |
|||
} |
|||
//数据汇总
|
|||
complexData := &protoFiles_hb.ComplexData{} |
|||
for _, sensorBucket := range buckets { |
|||
sensorId := sensorBucket.Key |
|||
for _, dateBucket := range sensorBucket.GroupDate.Buckets { |
|||
//优先redis获取
|
|||
station := models.Station{} |
|||
k1 := fmt.Sprintf("station:%d", sensorId) |
|||
errRedis := the.Redis.GetObj(k1, &station) |
|||
if errRedis != nil { |
|||
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) |
|||
continue |
|||
} |
|||
monitorCode := the.getPointCodeFromLabel(station.Labels) |
|||
if monitorCode == 0 { |
|||
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) |
|||
continue |
|||
} |
|||
|
|||
dataDefinition := &protoFiles_hb.DataDefinition{ |
|||
DataType: protoFiles_hb.DataType_STATISTICS, |
|||
UniqueCode: fmt.Sprintf("%d", uniqueCode), |
|||
DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), |
|||
} |
|||
complexData.SensorData = append(complexData.SensorData, dataDefinition) |
|||
} |
|||
} |
|||
v, _ := json.Marshal(complexData) |
|||
log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v) |
|||
result, _ = proto.Marshal(complexData) |
|||
return result |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType { |
|||
//拱顶沉降 102 桥墩转角 1914 挠度 1917 横向加速度 1919 纵向加速度 1920
|
|||
switch factorId { |
|||
case 102: |
|||
return protoFiles_hb.MonitoryType_GDCJ |
|||
case 1914: |
|||
return protoFiles_hb.MonitoryType_INC |
|||
case 1917: |
|||
return protoFiles_hb.MonitoryType_HPT |
|||
case 1919: |
|||
return protoFiles_hb.MonitoryType_VIB |
|||
case 1920: |
|||
return protoFiles_hb.MonitoryType_VIB |
|||
default: |
|||
log.Printf("factorId=%d,无匹配的MonitorType", factorId) |
|||
return protoFiles_hb.MonitoryType_CMM |
|||
} |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { |
|||
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() |
|||
maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) |
|||
avgValueX := dateBucket.X.Avg |
|||
rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count)) |
|||
|
|||
maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min)) |
|||
avgValueY := dateBucket.Y.Avg |
|||
rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) |
|||
|
|||
monitoryType := the.getMonitorTypeByFactorId(factorId) |
|||
dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ |
|||
StatisticData: &protoFiles_hb.StatisticData{ |
|||
MonitorType: monitoryType, |
|||
MonitorCode: monitorCode, //测点唯一编码
|
|||
EventTime: Atime, |
|||
Interval: 60 * 1000, |
|||
}, |
|||
} |
|||
|
|||
//拱顶沉降 102 桥墩转角 1914 挠度 1917 横向加速度 1919 纵向加速度 1920
|
|||
switch factorId { |
|||
case 102: //拱顶沉降
|
|||
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Gdcj{Gdcj: &protoFiles_hb.GDCJStatistic{ |
|||
MaxAbsoluteValue: float32(maxAbsoluteValueX), |
|||
AvgValue: float32(avgValueX), |
|||
RootMeanSquare: float32(rootMeanSquareX), |
|||
TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), |
|||
}} |
|||
case 1914: //桥墩转角
|
|||
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ |
|||
MaxAbsoluteValueX: float32(maxAbsoluteValueX), |
|||
AvgValueX: float32(avgValueX), |
|||
RootMeanSquareX: float32(rootMeanSquareX), |
|||
MaxAbsoluteValueY: float32(maxAbsoluteValueY), |
|||
AvgValueY: float32(avgValueY), |
|||
RootMeanSquareY: float32(rootMeanSquareY), |
|||
}} |
|||
case 1917: //挠度
|
|||
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Hpt{Hpt: &protoFiles_hb.HPTStatistic{ |
|||
MaxAbsoluteValue: float32(maxAbsoluteValueX), |
|||
AvgValue: float32(avgValueX), |
|||
RootMeanSquare: float32(rootMeanSquareX), |
|||
}} |
|||
case 1919, 1920: //振动
|
|||
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ |
|||
MaxAbsoluteValue: float32(maxAbsoluteValueX), |
|||
RootMeanSquare: float32(rootMeanSquareY), |
|||
}} |
|||
} |
|||
return dataDefinitionStatisticData |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) { |
|||
if v, ok := the.StructInfo[structId]; ok { |
|||
uniqueCode = v |
|||
} |
|||
return uniqueCode |
|||
} |
|||
|
|||
func (the Adaptor_ZWYES_HBGL) getPointCodeFromLabel(label string) int64 { |
|||
//解析label {13010600001}
|
|||
pointUniqueCode := int64(0) |
|||
if len(label) > 2 { |
|||
newLabel := strings.TrimLeft(label, "{") |
|||
str := strings.TrimRight(newLabel, "}") |
|||
codeInt64, err := strconv.ParseInt(str, 10, 64) |
|||
if err != nil { |
|||
log.Printf("测点标签转换异常[%s]", label) |
|||
} |
|||
pointUniqueCode = codeInt64 |
|||
} |
|||
|
|||
return pointUniqueCode |
|||
} |
@ -0,0 +1,106 @@ |
|||
consumer: consumerZWYHBJCAS |
|||
ioConfig: |
|||
in: |
|||
http: |
|||
url: https://esproxy.anxinyun.cn/savoir_themes/_search |
|||
out: |
|||
mqtt: |
|||
host: 123.249.81.52 |
|||
port: 8883 |
|||
userName: bs1321 |
|||
password: 9$TyND#ec$aZFfcl |
|||
clientId: zhangjiakouJTYS |
|||
topics: |
|||
- t/province/1321 |
|||
monitor: |
|||
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 |
|||
cron10min: 6/10 * * * * |
|||
#普通类型 特征数据 |
|||
cron1hour: 45 0/1 * * * |
|||
#推送摄像机状态 1小时 |
|||
camera1hour: 0 0/1 * * * |
|||
#普推送健康度 24小时,每天8点执行 |
|||
health24hour: 0 8 * * * |
|||
info: |
|||
urlIndex: https://hbjcas.hebitt.com/pms/api/v2/bsi/ |
|||
secretKey: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa |
|||
rc4key: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa3Al3 |
|||
systemID: 1321 |
|||
queryComponent: |
|||
redis: |
|||
address: 10.8.30.160:30379 |
|||
#结构物id对应 |
|||
structInfo: |
|||
8926: 130830 |
|||
8928: 130831 |
|||
8929: 130832 |
|||
8930: 130833 |
|||
8921: 130834 |
|||
8922: 130835 |
|||
8923: 130836 |
|||
8931: 130837 |
|||
8932: 130838 |
|||
8936: 136137 |
|||
8940: 136139 |
|||
8939: 136141 |
|||
8938: 136142 |
|||
8935: 136144 |
|||
#点位id对应信息 |
|||
pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 |
|||
#定义http接口用于对外调用 |
|||
httpServer: 0.0.0.0:8425 |
|||
#需要上报健康度的(桥梁|隧道|边坡)唯一编码 |
|||
codeInfo: |
|||
- 130830 |
|||
- 130831 |
|||
- 130832 |
|||
- 130833 |
|||
- 130834 |
|||
- 130835 |
|||
- 130836 |
|||
- 130837 |
|||
- 130838 |
|||
- 136137 |
|||
- 136138 |
|||
- 136139 |
|||
- 136140 |
|||
- 136141 |
|||
- 136142 |
|||
- 136143 |
|||
- 136144 |
|||
- 136145 |
|||
cameraInfo: |
|||
- 13083099901 |
|||
- 13083099902 |
|||
- 13083199903 |
|||
- 13083199904 |
|||
- 13083299905 |
|||
- 13083299906 |
|||
- 13083399907 |
|||
- 13083399908 |
|||
- 13083499909 |
|||
- 13083599910 |
|||
- 13083699911 |
|||
- 13083699912 |
|||
- 13083799913 |
|||
- 13083799914 |
|||
- 13083899915 |
|||
- 13083899916 |
|||
- 13613799917 |
|||
- 13613799918 |
|||
- 13613899919 |
|||
- 13613899920 |
|||
- 13613999921 |
|||
- 13613999922 |
|||
- 13614099923 |
|||
- 13614099924 |
|||
- 13614199925 |
|||
- 13614199926 |
|||
- 13614299927 |
|||
- 13614299928 |
|||
- 13614399929 |
|||
- 13614399930 |
|||
- 13614499931 |
|||
- 13614499932 |
|||
- 13614599933 |
|||
- 13614599934 |
@ -0,0 +1,37 @@ |
|||
consumer: consumerAXYES2GDJKJC |
|||
ioConfig: |
|||
in: |
|||
http: |
|||
url: https://esproxy.anxinyun.cn/anxincloud_themes/_search |
|||
out: |
|||
http: |
|||
url: https://bmsapi.gdjkjc.cn:13579/Data/AddStatData |
|||
method: "post" |
|||
monitor: |
|||
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 |
|||
cron10min: 25 0/1 * * * #6/10 * * * * |
|||
#普通类型 特征数据 |
|||
cron1hour: 53 0/1 * * * |
|||
info: |
|||
5450: #隆江大桥 |
|||
appKey: db43bc5d74534348 |
|||
appSecret: 162c0a92f089464eaf9349358af4a830 |
|||
5455: #螺河特大桥 |
|||
appKey: 11b8f81901134570 |
|||
appSecret: 1505ef9480e4491bb578e2d7d9781620 |
|||
5456: #螺河东大桥 |
|||
appKey: d20477c3247b4012 |
|||
appSecret: 3d9bfb809be545bba11af17a88c81eb3 |
|||
queryComponent: |
|||
redis: |
|||
address: 10.8.30.160:30379 |
|||
#结构物id对应 |
|||
structInfo: |
|||
bridge: |
|||
5450: G15445224L1120/G15445224R1119 #隆江大桥 |
|||
#5455: G15441581L1320/G15441581R1321 #螺河特大桥 |
|||
#5456: G15441581L1310/G15441581R1311 #螺河东大桥 |
|||
slope: #隧道无特征数据 预留 |
|||
#5452: project11223- |
|||
|
|||
|
@ -0,0 +1,97 @@ |
|||
package HBJCAS |
|||
|
|||
type UploadBody struct { |
|||
Data []interface{} `json:"data"` |
|||
} |
|||
|
|||
//摄像机状态
|
|||
type CameraInfo struct { |
|||
PointUniqueCode int64 `json:"pointUniqueCode"` |
|||
Online int `json:"online"` |
|||
} |
|||
|
|||
//桥梁/隧道健康度
|
|||
type HealthInfo struct { |
|||
UniqueCode int `json:"uniqueCode"` |
|||
EntireHealthLevel int `json:"entireHealthLevel"` |
|||
ComponentHealthLevel int `json:"componentHealthLevel"` |
|||
EvaluateTime int64 `json:"evaluateTime"` |
|||
Remark string `json:"remark,omitempty"` |
|||
} |
|||
|
|||
//报警信息内容
|
|||
type WarningInfo struct { |
|||
AlarmId string `json:"alarmId"` |
|||
UniqueCode int `json:"uniqueCode"` |
|||
PointUniqueCode int64 `json:"pointUniqueCode"` |
|||
AlarmLevel int `json:"alarmLevel"` |
|||
MonitorValue string `json:"monitorValue"` |
|||
Unit string `json:"unit"` |
|||
AlarmStartTime int64 `json:"alarmStartTime"` |
|||
ReportToProvinceTime int64 `json:"reportToProvinceTime"` |
|||
ReportToProvinceUser string `json:"reportToProvinceUser"` |
|||
ReportToProvinceUserTel string `json:"reportToProvinceUserTel"` |
|||
AlarmStatus string `json:"alarmStatus"` |
|||
HandleTime int64 `json:"handleTime,omitempty"` |
|||
HandleUser string `json:"handleUser,omitempty"` |
|||
HandleUserTel string `json:"handleUserTel,omitempty"` |
|||
HandleContent string `json:"handleContent,omitempty"` |
|||
Test bool `json:"test,omitempty"` |
|||
} |
|||
|
|||
//特殊事件信息
|
|||
type SpecialEventInfo struct { |
|||
SpecialEventId string `json:"specialEventId"` |
|||
UniqueCode int `json:"uniqueCode"` |
|||
EventType string `json:"eventType"` |
|||
EventName string `json:"eventName"` |
|||
EventContent string `json:"eventContent"` |
|||
EventOccurTime int64 `json:"eventOccurTime"` |
|||
ReportToProvinceTime int64 `json:"reportToProvinceTime"` |
|||
ReportToProvinceUser string `json:"reportToProvinceUser"` |
|||
ReportToProvinceUserTel string `json:"reportToProvinceUserTel"` |
|||
Status string `json:"status"` |
|||
HandleTime int64 `json:"handleTime,omitempty"` |
|||
HandleUser string `json:"handleUser,omitempty"` |
|||
HandleUserTel string `json:"handleUserTel,omitempty"` |
|||
HandleContent string `json:"handleContent,omitempty"` |
|||
Test bool `json:"test,omitempty"` |
|||
} |
|||
|
|||
//特殊事件预案信息
|
|||
type SpecialEventPlanInfo struct { |
|||
UniqueCode int `json:"uniqueCode"` |
|||
File interface{} `json:"file"` |
|||
PlanContactsUser string `json:"planContactsUser"` |
|||
PlanContactsUserTel string `json:"planContactsUserTel"` |
|||
PlanContent string `json:"planContent,omitempty"` |
|||
PlanCreateTime int64 `json:"planCreateTime"` |
|||
PlanName string `json:"planName"` |
|||
PlanRemark string `json:"planRemark,omitempty"` |
|||
PlanType string `json:"planType"` |
|||
ReportToProvinceTime int64 `json:"reportToProvinceTime"` |
|||
ReportToProvinceUser string `json:"reportToProvinceUser"` |
|||
ReportToProvinceUserTel string `json:"reportToProvinceUserTel"` |
|||
Test bool `json:"test,omitempty"` |
|||
} |
|||
|
|||
//特殊事件预案信息删除
|
|||
type EventPlanDel struct { |
|||
UniqueCode int `json:"uniqueCode"` |
|||
PlanType string `json:"planType"` |
|||
} |
|||
|
|||
//报告信息
|
|||
type ReportInfo struct { |
|||
UniqueCode int `json:"uniqueCode"` |
|||
File interface{} `json:"file"` |
|||
ReportName string `json:"reportName"` |
|||
SpecialEventId string `json:"specialEventId,omitempty"` |
|||
ReportTime int64 `json:"reportTime"` |
|||
ReportToProvinceTime int64 `json:"reportToProvinceTime"` |
|||
ReportToProvinceUser string `json:"reportToProvinceUser"` |
|||
ReportToProvinceUserTel string `json:"reportToProvinceUserTel"` |
|||
ReportType string `json:"reportType"` |
|||
UploadType string `json:"uploadType,omitempty"` |
|||
Test bool `json:"test,omitempty"` |
|||
} |
@ -0,0 +1,561 @@ |
|||
package consumers |
|||
|
|||
import ( |
|||
"crypto/hmac" |
|||
"crypto/rc4" |
|||
"encoding/base64" |
|||
"encoding/hex" |
|||
"encoding/json" |
|||
"fmt" |
|||
"github.com/tjfoc/gmsm/sm3" |
|||
"goInOut/adaptors" |
|||
"goInOut/consumers/HBJCAS" |
|||
"goInOut/dbOperate" |
|||
"goInOut/monitors" |
|||
"goInOut/utils" |
|||
"gopkg.in/yaml.v3" |
|||
"io" |
|||
"io/ioutil" |
|||
"log" |
|||
"net/http" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
type consumerZWYHBJCAS struct { |
|||
//数据缓存管道
|
|||
ch chan []adaptors.NeedPush |
|||
//具体配置
|
|||
Info HBJCAS.ConfigFile |
|||
Seq int64 |
|||
SeqDate string |
|||
InHttp *dbOperate.HttpHelper |
|||
outMqtt *dbOperate.MqttHelper |
|||
monitor *monitors.CommonMonitor |
|||
infoRedis *dbOperate.RedisHelper |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) { |
|||
// 将 yaml 格式的数据解析到结构体中
|
|||
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
|||
if err != nil { |
|||
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
|||
panic(err) |
|||
} |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) Initial(cfg string) error { |
|||
the.LoadConfigJson(cfg) |
|||
err := the.InputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.OutputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.infoComponentInitial() |
|||
return err |
|||
} |
|||
func (the *consumerZWYHBJCAS) InputInitial() error { |
|||
the.ch = make(chan []adaptors.NeedPush, 200) |
|||
//数据入口
|
|||
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} |
|||
the.monitor = &monitors.CommonMonitor{ |
|||
MonitorHelper: &monitors.MonitorHelper{}, |
|||
} |
|||
the.Seq = 0 |
|||
the.SeqDate = time.Now().Format("2006-01-02") |
|||
the.monitor.Start() |
|||
for taskName, cron := range the.Info.Monitor { |
|||
switch taskName { |
|||
case "cron10min": |
|||
the.monitor.RegisterTask(cron, the.getEs10minAggData) |
|||
case "cron1hour": |
|||
the.monitor.RegisterTask(cron, the.getEs1HourAggData) |
|||
case "camera1hour": |
|||
the.monitor.RegisterTask(cron, the.UploadCamInfo) |
|||
case "health24hour": |
|||
the.monitor.RegisterTask(cron, the.UploadHeaInfo) |
|||
default: |
|||
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
func (the *consumerZWYHBJCAS) OutputInitial() error { |
|||
//数据出口
|
|||
the.outMqtt = dbOperate.MqttInitial( |
|||
the.Info.IoConfig.Out.Mqtt.Host, |
|||
the.Info.IoConfig.Out.Mqtt.Port, |
|||
the.Info.IoConfig.Out.Mqtt.ClientId, |
|||
the.Info.IoConfig.Out.Mqtt.UserName, |
|||
the.Info.IoConfig.Out.Mqtt.Password, |
|||
true, //按照具体项目来
|
|||
"consumers/HBJCAS/ssl/cacert.pem", |
|||
"consumers/HBJCAS/ssl/client-cert.pem", |
|||
"consumers/HBJCAS/ssl/client-key.pem") |
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) infoComponentInitial() error { |
|||
//数据出口
|
|||
addr := the.Info.QueryComponent.Redis.Address |
|||
the.infoRedis = dbOperate.NewRedisHelper("", addr) |
|||
return nil |
|||
} |
|||
func (the *consumerZWYHBJCAS) Work() { |
|||
go func() { |
|||
for { |
|||
needPushList := <-the.ch |
|||
if len(the.ch) > 0 { |
|||
log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) |
|||
} |
|||
|
|||
for _, push := range needPushList { |
|||
if push.Topic != "" { |
|||
the.outMqtt.Publish(push.Topic, push.Payload) |
|||
continue |
|||
} |
|||
|
|||
//没有标记topic 的 按照配置文件里面的推送
|
|||
for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { |
|||
the.outMqtt.Publish(topic, push.Payload) |
|||
} |
|||
|
|||
} |
|||
|
|||
time.Sleep(100 * time.Millisecond) |
|||
} |
|||
}() |
|||
go func() { |
|||
if the.Info.HttpServer != "" { |
|||
log.Printf("打开本地http接口服务[%s]\n", the.Info.HttpServer) |
|||
the.StartHttp() |
|||
} |
|||
}() |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_ZWYES_HBGL) { |
|||
|
|||
return adaptors.Adaptor_ZWYES_HBGL{ |
|||
Redis: the.infoRedis, |
|||
} |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getStructIds() []int64 { |
|||
var structIds []int64 |
|||
for strutId, _ := range the.Info.StructInfo { |
|||
structIds = append(structIds, strutId) |
|||
} |
|||
return structIds |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getEs1HourAggData() { |
|||
start, end := utils.GetTimeRangeByHour(-1) |
|||
log.Printf("查询数据时间范围 %s - %s", start, end) |
|||
hourFactorIds := []int{15, 18, 20} |
|||
structIds := the.getStructIds() |
|||
for _, structId := range structIds { |
|||
for _, factorId := range hourFactorIds { |
|||
esQuery := the.getESQueryStrByHour(structId, factorId, start, end) |
|||
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
|||
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) |
|||
|
|||
adaptor := the.getAdaptor() |
|||
adaptor.PointInfo = the.Info.PointInfo |
|||
adaptor.StructInfo = the.Info.StructInfo |
|||
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) |
|||
for i := range needPushes { |
|||
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
|||
} |
|||
|
|||
if len(needPushes) > 0 { |
|||
the.ch <- needPushes |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getEs10minAggData() { |
|||
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
|
|||
start, end := utils.GetTimeRangeByHour(-1) |
|||
log.Printf("查询10min数据时间范围 %s - %s", start, end) |
|||
factorIds := []int{28, 592} |
|||
structIds := the.getStructIds() |
|||
for _, structId := range structIds { |
|||
for _, factorId := range factorIds { |
|||
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) |
|||
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
|||
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) |
|||
|
|||
adaptor := the.getAdaptor() |
|||
adaptor.PointInfo = the.Info.PointInfo |
|||
adaptor.StructInfo = the.Info.StructInfo |
|||
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) |
|||
for i := range needPushes { |
|||
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
|||
log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) |
|||
} |
|||
|
|||
if len(needPushes) > 0 { |
|||
the.ch <- needPushes |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) crc16rc4(transBytes []byte) []byte { |
|||
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) |
|||
needRC4 := append(transBytes, resultByCrc16...) |
|||
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] |
|||
if !ok { |
|||
log.Panicf("未配置 rc4key") |
|||
} |
|||
rc4Key := []byte(rc4KeyStr) //the.RC4Key
|
|||
// 加密操作
|
|||
dest1 := make([]byte, len(needRC4)) |
|||
rc4.NewCipher(rc4Key) |
|||
cipher1, _ := rc4.NewCipher(rc4Key) |
|||
cipher1.XORKeyStream(dest1, needRC4) |
|||
return dest1 |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { |
|||
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) |
|||
esQuery := fmt.Sprintf(` |
|||
{ |
|||
"size": 0, |
|||
"query": { |
|||
"bool": { |
|||
"must": [ |
|||
{ |
|||
"term": { |
|||
"structure": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"term": { |
|||
"factor": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"range": { |
|||
"collect_time": { |
|||
"gte": "%s", |
|||
"lt": "%s" |
|||
} |
|||
} |
|||
} |
|||
] |
|||
} |
|||
}, |
|||
"aggs": { |
|||
"groupSensor": { |
|||
"terms": { |
|||
"field": "sensor" |
|||
}, |
|||
"aggs": { |
|||
"groupDate": { |
|||
"date_histogram": { |
|||
"field": "collect_time", |
|||
"interval": "1h", |
|||
"time_zone": "Asia/Shanghai", |
|||
"min_doc_count": 1 |
|||
}, |
|||
"aggs": %s |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
`, structureId, factorId, start, end, aggSubSql) |
|||
|
|||
return esQuery |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { |
|||
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) |
|||
esQuery := fmt.Sprintf(` |
|||
{ |
|||
"size": 0, |
|||
"query": { |
|||
"bool": { |
|||
"must": [ |
|||
{ |
|||
"term": { |
|||
"structure": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"term": { |
|||
"factor": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"range": { |
|||
"collect_time": { |
|||
"gte": "%s", |
|||
"lte": "%s" |
|||
} |
|||
} |
|||
} |
|||
] |
|||
} |
|||
}, |
|||
"aggs": { |
|||
"groupSensor": { |
|||
"terms": { |
|||
"field": "sensor" |
|||
}, |
|||
"aggs": { |
|||
"groupDate": { |
|||
"date_histogram": { |
|||
"field": "collect_time", |
|||
"interval": "10m", |
|||
"time_zone": "Asia/Shanghai", |
|||
"min_doc_count": 1 |
|||
}, |
|||
"aggs": %s |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
`, structureId, factorId, start, end, aggSubSql) |
|||
|
|||
return esQuery |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) getStructureId() string { |
|||
structureId, ok := the.Info.OtherInfo["structureId"] |
|||
if !ok { |
|||
log.Panicf("无法识别有效的structureId") |
|||
} |
|||
return structureId |
|||
} |
|||
|
|||
//获取配置在yaml文件中的cameraInfo对应的摄像机的状态
|
|||
func (the *consumerZWYHBJCAS) getCameraStatus() []interface{} { |
|||
cameraArr := the.Info.CameraInfo |
|||
cameras := make([]interface{}, 0) |
|||
for _, cameraId := range cameraArr { |
|||
//增加根据cameraId获取对应摄像机状态
|
|||
camera := HBJCAS.CameraInfo{ |
|||
PointUniqueCode: cameraId, |
|||
Online: 1, |
|||
} |
|||
cameras = append(cameras, camera) |
|||
} |
|||
return cameras |
|||
} |
|||
|
|||
//获取配置在yaml文件中的codeInfo对应的需要上报健康度的(桥梁|隧道|边坡)的健康度
|
|||
func (the *consumerZWYHBJCAS) getCodeStatus() []interface{} { |
|||
infoArr := the.Info.CodeInfo |
|||
res := make([]interface{}, 0) |
|||
for _, info := range infoArr { |
|||
//增加根据code码获取对应摄像机状态
|
|||
nInfo := HBJCAS.HealthInfo{ |
|||
UniqueCode: info, |
|||
EntireHealthLevel: 0, |
|||
ComponentHealthLevel: 0, |
|||
EvaluateTime: time.Now().UnixNano() / 1e6, |
|||
} |
|||
res = append(res, nInfo) |
|||
} |
|||
return res |
|||
} |
|||
|
|||
// JWT头部
|
|||
type Header struct { |
|||
Typ string `json:"typ"` |
|||
Alg string `json:"alg"` |
|||
} |
|||
|
|||
// JWT载荷
|
|||
type Payload struct { |
|||
SystemID string `json:"systemId"` |
|||
Seq int64 `json:"seq"` |
|||
Timestamp int64 `json:"timestamp"` |
|||
} |
|||
|
|||
type Resp struct { |
|||
Code int `json:"code"` |
|||
Msg string `json:"msg"` |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) GenerateJWT() (string, error) { |
|||
// 创建头部
|
|||
header := Header{ |
|||
Typ: "JWT", |
|||
Alg: "SM3", |
|||
} |
|||
headerBytes, _ := json.Marshal(header) |
|||
headerBase64 := base64.StdEncoding.EncodeToString(headerBytes) |
|||
seq := int64(0) |
|||
if the.Seq != 0 { |
|||
if time.Now().Format("2006-01-02") != the.SeqDate { |
|||
the.Seq = 0 |
|||
} |
|||
} |
|||
seq = the.Seq |
|||
// 创建载荷
|
|||
payload := Payload{ |
|||
SystemID: the.Info.SystemId, |
|||
Seq: seq, |
|||
Timestamp: time.Now().UnixNano() / 1e6, // 当前时间戳,单位毫秒
|
|||
} |
|||
payloadBytes, _ := json.Marshal(payload) |
|||
payloadBase64 := base64.StdEncoding.EncodeToString(payloadBytes) |
|||
|
|||
// 创建签名
|
|||
message := headerBase64 + "." + payloadBase64 |
|||
var signatureBase64 string |
|||
h := sm3.New() |
|||
h.Write([]byte(message)) |
|||
secretKeyStr, ok := the.Info.OtherInfo["secretKey"] |
|||
if !ok { |
|||
log.Println("未配置 secretKey") |
|||
secretKeyStr = "" |
|||
} |
|||
secretKey := []byte(secretKeyStr) |
|||
signature := hmac.New(sm3.New, secretKey) |
|||
signature.Write([]byte(message)) |
|||
signatureBytes := signature.Sum(nil) |
|||
signatureBase64 = base64.StdEncoding.EncodeToString(signatureBytes) |
|||
// 组装JWT
|
|||
jwt := message + "." + signatureBase64 |
|||
return jwt, nil |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { |
|||
urlIndex, ok := the.Info.OtherInfo["urlIndex"] |
|||
if !ok { |
|||
log.Println("未配置省平台业务数据接口=============") |
|||
return |
|||
} |
|||
url := "" |
|||
var bodyInfo []interface{} |
|||
switch uploadType { |
|||
case "cameraInfo": |
|||
url = urlIndex + "cameraInfo/statusReport" |
|||
bodyInfo = the.getCameraStatus() |
|||
if len(bodyInfo) == 0 { |
|||
return |
|||
} |
|||
case "healthInfo": |
|||
url = urlIndex + "healthInfo/sync" |
|||
bodyInfo = the.getCodeStatus() |
|||
if len(bodyInfo) == 0 { |
|||
return |
|||
} |
|||
default: |
|||
return |
|||
} |
|||
tBody := HBJCAS.UploadBody{ |
|||
Data: bodyInfo, |
|||
} |
|||
jsonData, masErr := json.Marshal(tBody) |
|||
if masErr != nil { |
|||
fmt.Println(masErr) |
|||
return |
|||
} |
|||
payLoadStr := string(jsonData) |
|||
err := the.postInfo(url, payLoadStr) |
|||
if err != nil { |
|||
log.Printf("数据上报失败,err=%v\n", err) |
|||
} |
|||
|
|||
} |
|||
func (the *consumerZWYHBJCAS) UploadCamInfo() { |
|||
the.UploadInfo("cameraInfo") |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) UploadHeaInfo() { |
|||
the.UploadInfo("healthInfo") |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) postInfo(url, payloadStr string) error { |
|||
payload := strings.NewReader(payloadStr) |
|||
client := &http.Client{} |
|||
req, requestErr := http.NewRequest("POST", url, payload) |
|||
if requestErr != nil { |
|||
return requestErr |
|||
} |
|||
jwtRes, jwtErr := the.GenerateJWT() |
|||
if jwtErr != nil { |
|||
return jwtErr |
|||
} |
|||
auth := fmt.Sprintf("Bearer %s", jwtRes) |
|||
req.Header.Add("Content-Type", "application/json") |
|||
req.Header.Add("Authorization", auth) |
|||
res, clientErr := client.Do(req) |
|||
if clientErr != nil { |
|||
return clientErr |
|||
} |
|||
defer res.Body.Close() |
|||
|
|||
body, respErr := ioutil.ReadAll(res.Body) |
|||
if respErr != nil { |
|||
return respErr |
|||
} |
|||
var resp Resp |
|||
err := yaml.Unmarshal(body, &resp) |
|||
if err != nil { |
|||
log.Printf("接口返回[%s]转换失败 err=%v", string(body), err) |
|||
return err |
|||
} |
|||
the.Seq++ |
|||
if resp.Code != 100 { |
|||
log.Printf("接口[%s]返回非成功状态 code=%d,msg=[%s]", url, resp.Code, resp.Msg) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerZWYHBJCAS) StartHttp() { |
|||
http.HandleFunc("/ping", RespHandle) |
|||
http.ListenAndServe(the.Info.HttpServer, nil) |
|||
} |
|||
|
|||
func RespHandle(w http.ResponseWriter, r *http.Request) { |
|||
_, err := ioutil.ReadAll(r.Body) |
|||
if err != nil && err != io.EOF { |
|||
fmt.Printf("read body content failed, err:[%s]\n", err.Error()) |
|||
return |
|||
} |
|||
fmt.Fprint(w, "Pong!") |
|||
|
|||
} |
|||
|
|||
func downloadFile(url string) ([]byte, error) { |
|||
// 发送GET请求下载文件
|
|||
response, err := http.Get(url) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("failed to download file[%s],err=%v", url, err) |
|||
} |
|||
defer response.Body.Close() |
|||
|
|||
// 检查响应状态
|
|||
if response.StatusCode != http.StatusOK { |
|||
return nil, fmt.Errorf("down file[%s] server returned: %v", url, response.Status) |
|||
} |
|||
|
|||
// 读取文件内容
|
|||
fileContent, err := io.ReadAll(response.Body) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("failed to read file[%s] content: %v", url, err) |
|||
} |
|||
|
|||
return fileContent, nil |
|||
} |
Loading…
Reference in new issue