16 changed files with 3680 additions and 2 deletions
@ -0,0 +1,8 @@ |
|||
# 默认忽略的文件 |
|||
/shelf/ |
|||
/workspace.xml |
|||
# 基于编辑器的 HTTP 客户端请求 |
|||
/httpRequests/ |
|||
# Datasource local storage ignored files |
|||
/dataSources/ |
|||
/dataSources.local.xml |
@ -0,0 +1,9 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<module type="WEB_MODULE" version="4"> |
|||
<component name="Go" enabled="true" /> |
|||
<component name="NewModuleRootManager"> |
|||
<content url="file://$MODULE_DIR$" /> |
|||
<orderEntry type="inheritedJdk" /> |
|||
<orderEntry type="sourceFolder" forTests="false" /> |
|||
</component> |
|||
</module> |
@ -0,0 +1,8 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project version="4"> |
|||
<component name="ProjectModuleManager"> |
|||
<modules> |
|||
<module fileurl="file://$PROJECT_DIR$/.idea/goInOut.iml" filepath="$PROJECT_DIR$/.idea/goInOut.iml" /> |
|||
</modules> |
|||
</component> |
|||
</project> |
@ -0,0 +1,6 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project version="4"> |
|||
<component name="VcsDirectoryMappings"> |
|||
<mapping directory="" vcs="Git" /> |
|||
</component> |
|||
</project> |
@ -0,0 +1,196 @@ |
|||
package adaptors |
|||
|
|||
import ( |
|||
"encoding/hex" |
|||
"encoding/json" |
|||
"fmt" |
|||
"goInOut/consumers/JYES_NJZX" |
|||
"goInOut/consumers/JYES_NJZX/protoDataFiles" |
|||
"goInOut/dbOperate" |
|||
"goInOut/models" |
|||
"google.golang.org/protobuf/proto" |
|||
"log" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
// Adaptor_AXYES_NJZX 安心云依加尔山es 特征数据 to 南京智行平台
|
|||
type Adaptor_AXYES_NJZX struct { |
|||
//传感器code转换信息
|
|||
PointInfo map[int64]map[int64]int64 |
|||
StructInfo map[int64]int64 |
|||
//一些必要信息
|
|||
Info map[string]string |
|||
Redis *dbOperate.RedisHelper |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) Transform(structId int64, factorId int, rawMsg string) []NeedPush { |
|||
//es查到的数据分装进结构体里面
|
|||
esAggDateHistogram := JYES_NJZX.EsThemeDateValue{} |
|||
var needPush []NeedPush |
|||
|
|||
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) |
|||
if err != nil { |
|||
return nil |
|||
} |
|||
|
|||
Payload := the.EsDataValueChangeToNJZX(structId, factorId, esAggDateHistogram) |
|||
if len(Payload) == 0 { |
|||
return needPush |
|||
} |
|||
|
|||
needPush = append(needPush, NeedPush{ |
|||
Payload: Payload, |
|||
}) |
|||
return needPush |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) EsDataValueChangeToNJZX(structId int64, factorId int, esDataValue JYES_NJZX.EsThemeDateValue) (result []byte) { |
|||
buckets := esDataValue.Hits.Hits |
|||
//数据汇总
|
|||
complexData := &protoDataFiles.ComplexData{} |
|||
for _, sensorBucket := range buckets { |
|||
sensorId := sensorBucket.Source.SensorName //安心云de测点id
|
|||
|
|||
//优先redis获取
|
|||
station := models.Station{} |
|||
k1 := fmt.Sprintf("station:%d", sensorId) |
|||
errRedis := the.Redis.GetObj(k1, &station) |
|||
if errRedis != nil { |
|||
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) |
|||
continue |
|||
} |
|||
monitorCodeStr := the.getPointCodeFromLabel(station.Labels) |
|||
if monitorCodeStr == "" { |
|||
log.Printf("redis 获取[s:%d,f:%d]测点[%d],标签信息[%s]转换int64异常,跳过", structId, factorId, sensorId, station.Labels) |
|||
continue |
|||
} |
|||
|
|||
dataDefinition := the.ChangeToNJZXData(factorId, monitorCodeStr, sensorBucket) |
|||
|
|||
complexData.SensorData = append(complexData.SensorData, dataDefinition) |
|||
} |
|||
|
|||
//v, _ := json.Marshal(complexData)
|
|||
//log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v)
|
|||
result, _ = proto.Marshal(complexData) |
|||
log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(result)) |
|||
return result |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) getMonitorTypeByFactorId(factorId int) protoDataFiles.MonitoryType { |
|||
//监测因素 2温湿度 4温度 18裂缝检测
|
|||
//103净空收敛 102拱顶沉降 96二次衬彻应变
|
|||
//107道床及拱腰结构沉降 156风速 578风向
|
|||
switch factorId { |
|||
case 2: //温湿度
|
|||
return protoDataFiles.MonitoryType_RHS |
|||
case 4: //温度
|
|||
return protoDataFiles.MonitoryType_TMP |
|||
case 18: //裂缝检测
|
|||
return protoDataFiles.MonitoryType_CRK |
|||
case 103: //净空收敛
|
|||
return protoDataFiles.MonitoryType_INC //无对应
|
|||
case 102: //拱顶沉降
|
|||
return protoDataFiles.MonitoryType_CRK //无对应
|
|||
case 96: //二次衬彻应变
|
|||
return protoDataFiles.MonitoryType_RSG |
|||
case 107: //道床及拱腰结构沉降
|
|||
return protoDataFiles.MonitoryType_VIB //无对应
|
|||
case 156: //风速
|
|||
return protoDataFiles.MonitoryType_WDS |
|||
case 578: //风向
|
|||
return protoDataFiles.MonitoryType_WDD |
|||
default: |
|||
log.Printf("factorId=%d,无匹配的MonitorType", factorId) |
|||
return protoDataFiles.MonitoryType_RHS |
|||
} |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) parseTimeToTimestamp(timeStr string) (int64, error) { |
|||
// 解析时间字符串为 time.Time 对象
|
|||
parsedTime, err := time.Parse(time.RFC3339, timeStr) |
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
// 返回 Unix 时间戳(秒数)
|
|||
return parsedTime.Unix(), nil |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) ChangeToNJZXData(factorId int, monitorCodeStr string, dateBucket JYES_NJZX.Hits) *protoDataFiles.SensorData { |
|||
|
|||
Atime, _ := the.parseTimeToTimestamp(dateBucket.Source.CollectTime) |
|||
monitoryType := the.getMonitorTypeByFactorId(factorId) |
|||
dataDefinitionData := &protoDataFiles.SensorData{ |
|||
MonitorType: monitoryType, |
|||
SensorNo: monitorCodeStr, |
|||
UpTime: Atime, |
|||
} |
|||
|
|||
switch factorId { |
|||
case 2: //温湿度
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Rhs{Rhs: &protoDataFiles.RHSRealTime{ |
|||
Temperature: []float32{float32(dateBucket.Source.Data["temperature"])}, |
|||
Humidity: []float32{float32(dateBucket.Source.Data["humidity"])}, |
|||
}} |
|||
case 4: //温度
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Tmp{Tmp: &protoDataFiles.TMPRealTime{ |
|||
Temperature: []float32{float32(dateBucket.Source.Data["temperature"])}, |
|||
}} |
|||
case 18: //裂缝检测
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Crk{Crk: &protoDataFiles.CRKRealTime{ |
|||
CrackWidth: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
}} |
|||
case 103: //净空收敛//123456789
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Rhs{Rhs: &protoDataFiles.RHSRealTime{ |
|||
Temperature: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
Humidity: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
}} |
|||
case 102: //拱顶沉降//123456789
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Rhs{Rhs: &protoDataFiles.RHSRealTime{ |
|||
Temperature: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
Humidity: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
}} |
|||
case 96: //二次衬彻应变
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Rsg{Rsg: &protoDataFiles.RSGRealTime{ |
|||
Strain: []float32{float32(dateBucket.Source.Data["strain"])}, |
|||
}} |
|||
case 107: //道床及拱腰结构沉降//123456789
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Rhs{Rhs: &protoDataFiles.RHSRealTime{ |
|||
Temperature: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
Humidity: []float32{float32(dateBucket.Source.Data["crack"])}, |
|||
}} |
|||
case 156: //风速
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Wds{Wds: &protoDataFiles.WDSRealTime{ |
|||
WindSpeed: []float32{float32(dateBucket.Source.Data["speed"])}, |
|||
}} |
|||
case 578: //风向
|
|||
dataDefinitionData.DataBody = &protoDataFiles.SensorData_Wdd{Wdd: &protoDataFiles.WDDRealTime{ |
|||
WindDirection: []float32{float32(dateBucket.Source.Data["direction"])}, |
|||
}} |
|||
} |
|||
|
|||
return dataDefinitionData |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) getUniqueCode(structId int64) (uniqueCode int64) { |
|||
if v, ok := the.StructInfo[structId]; ok { |
|||
uniqueCode = v |
|||
} |
|||
return uniqueCode |
|||
} |
|||
|
|||
func (the *Adaptor_AXYES_NJZX) getPointCodeFromLabel(label string) string { |
|||
//解析label {code:wd01}
|
|||
pointUniqueCode := "" |
|||
if len(label) > 3 { |
|||
newLabel := strings.TrimLeft(label, "{code:") |
|||
str := strings.TrimRight(newLabel, "}") |
|||
if str == "" { |
|||
log.Printf("测点标签转换异常[%s]", label) |
|||
} |
|||
pointUniqueCode = str |
|||
} |
|||
|
|||
return pointUniqueCode |
|||
} |
@ -0,0 +1,30 @@ |
|||
consumer: consumerJYESNJZX |
|||
ioConfig: |
|||
in: |
|||
http: |
|||
url: https://esproxy.anxinyun.cn/savoir_themes/_search |
|||
out: |
|||
mqtt: |
|||
host: 120.205.24.17 |
|||
port: 1883 |
|||
userName: |
|||
password: |
|||
clientId: bridge_goinout |
|||
topics: |
|||
t/t6540000001/rt |
|||
monitor: |
|||
cron10min: 8/10 * * * * #31 0/1 * * * # |
|||
info: |
|||
rc4key: t/gzgyy0219 |
|||
queryComponent: |
|||
redis: |
|||
address: 10.8.30.160:30379 #按照实际项目来 |
|||
#结构物id对应 |
|||
structInfo: |
|||
#加依尔4983 -> 映射对方平台id |
|||
4983: 56232 |
|||
#点位id对应信息 |
|||
pointInfo: #监测因素 2温湿度 4温度 18裂缝检测 103净空收敛 102拱顶沉降 96二次衬彻应变 107道床及拱腰结构沉降 156风速 578风向 |
|||
|
|||
|
|||
|
@ -0,0 +1,36 @@ |
|||
package JYES_NJZX |
|||
|
|||
type EsThemeDateValue struct { |
|||
Took int `json:"took"` |
|||
TimedOut bool `json:"timed_out"` |
|||
Shards struct { |
|||
Total int `json:"total"` |
|||
Successful int `json:"successful"` |
|||
Skipped int `json:"skipped"` |
|||
Failed int `json:"failed"` |
|||
} `json:"_shards"` |
|||
Hits struct { |
|||
Total int `json:"total"` |
|||
MaxScore float64 `json:"max_score"` |
|||
Hits []Hits `json:"hits"` |
|||
} `json:"hits"` |
|||
} |
|||
type Hits struct { |
|||
Index string `json:"_index"` |
|||
Type int `json:"_type"` |
|||
Id string `json:"_id"` |
|||
Score int `json:"_score"` |
|||
Source Source `json:"_source"` |
|||
} |
|||
type Source struct { |
|||
SensorName string `json:"sensor_name"` |
|||
FactorName string `json:"factor_name"` |
|||
FactorProtoCode string `json:"factor_proto_code"` |
|||
Data map[string]float64 `json:"data"` |
|||
Factor int `json:"factor"` |
|||
CollectTime string `json:"collect_time"` |
|||
Sensor int `json:"sensor"` |
|||
Structure int `json:"structure"` |
|||
IotaDevice []string `json:"iota_device"` |
|||
CreateTime string `json:"create_time"` |
|||
} |
File diff suppressed because it is too large
@ -0,0 +1,205 @@ |
|||
package consumers |
|||
|
|||
import ( |
|||
"crypto/rc4" |
|||
"encoding/hex" |
|||
"fmt" |
|||
"goInOut/adaptors" |
|||
"goInOut/consumers/HBJCAS" |
|||
"goInOut/dbOperate" |
|||
"goInOut/monitors" |
|||
"goInOut/utils" |
|||
"gopkg.in/yaml.v3" |
|||
"log" |
|||
"time" |
|||
) |
|||
|
|||
type consumerJYESNJZX struct { |
|||
//数据缓存管道
|
|||
ch chan []adaptors.NeedPush |
|||
//具体配置
|
|||
Info HBJCAS.ConfigFile |
|||
InHttp *dbOperate.HttpHelper |
|||
outMqtt *dbOperate.MqttHelper |
|||
monitor *monitors.CommonMonitor |
|||
infoRedis *dbOperate.RedisHelper |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) LoadConfig(cfgStr string) { |
|||
// 将 yaml 格式的数据解析到结构体中
|
|||
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
|||
if err != nil { |
|||
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
|||
panic(err) |
|||
} |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) Initial(cfg string) error { |
|||
the.LoadConfig(cfg) |
|||
err := the.InputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.OutputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.infoComponentInitial() |
|||
return err |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) InputInitial() error { |
|||
the.ch = make(chan []adaptors.NeedPush, 200) |
|||
//数据入口
|
|||
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} |
|||
the.monitor = &monitors.CommonMonitor{ |
|||
MonitorHelper: &monitors.MonitorHelper{}, |
|||
} |
|||
|
|||
the.monitor.Start() |
|||
for _, cron := range the.Info.Monitor { |
|||
the.monitor.RegisterTask(cron, the.getEsAggData) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) OutputInitial() error { |
|||
//数据出口
|
|||
the.outMqtt = dbOperate.MqttInitial( |
|||
the.Info.IoConfig.Out.Mqtt.Host, |
|||
the.Info.IoConfig.Out.Mqtt.Port, |
|||
the.Info.IoConfig.Out.Mqtt.ClientId, |
|||
the.Info.IoConfig.Out.Mqtt.UserName, |
|||
the.Info.IoConfig.Out.Mqtt.Password, |
|||
false, //按照具体项目来
|
|||
"") |
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) infoComponentInitial() error { |
|||
//数据出口
|
|||
addr := the.Info.QueryComponent.Redis.Address |
|||
the.infoRedis = dbOperate.NewRedisHelper("", addr) |
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) getEsAggData() { |
|||
start, end := utils.GetTimeRangeBy1minByOffset(-5) |
|||
//取向前偏移5分钟的一分钟的数据
|
|||
log.Printf("查询数据时间范围 %s - %s", start, end) |
|||
factorIds := []int{2, 4, 18, 103, 102, 96, 107, 156, 578} |
|||
//监测因素 2温湿度 4温度 18裂缝检测 103净空收敛 102拱顶沉降 96二次衬彻应变 107道床及拱腰结构沉降 156风速 578风向
|
|||
|
|||
//架伊尔大桥的结构物id
|
|||
var structId int64 |
|||
for strutId, _ := range the.Info.StructInfo { |
|||
structId = strutId |
|||
} |
|||
|
|||
adaptor := the.getAdaptor() |
|||
adaptor.PointInfo = the.Info.PointInfo |
|||
adaptor.StructInfo = the.Info.StructInfo |
|||
for _, factorId := range factorIds { |
|||
esQuery := the.getESQueryStr(structId, factorId, start, end) |
|||
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} |
|||
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) |
|||
log.Printf("esAggResultStr[%s]", esAggResultStr) |
|||
|
|||
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) |
|||
for i := range needPushes { |
|||
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) |
|||
log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) |
|||
} |
|||
|
|||
if len(needPushes) > 0 { |
|||
the.ch <- needPushes |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) getAdaptor() (adaptor adaptors.Adaptor_AXYES_NJZX) { |
|||
|
|||
return adaptors.Adaptor_AXYES_NJZX{ |
|||
Redis: the.infoRedis, |
|||
} |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) crc16rc4(transBytes []byte) []byte { |
|||
resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) |
|||
needRC4 := append(transBytes, resultByCrc16...) |
|||
rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] |
|||
if !ok { |
|||
log.Panicf("未配置 rc4key") |
|||
} |
|||
rc4Key := []byte(rc4KeyStr) //the.RC4Key
|
|||
// 加密操作
|
|||
dest1 := make([]byte, len(needRC4)) |
|||
rc4.NewCipher(rc4Key) |
|||
cipher1, _ := rc4.NewCipher(rc4Key) |
|||
cipher1.XORKeyStream(dest1, needRC4) |
|||
return dest1 |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) getESQueryStr(structureId int64, factorId int, start, end string) string { |
|||
esQuery := fmt.Sprintf(` |
|||
{ |
|||
"size": 20, |
|||
"query": { |
|||
"bool": { |
|||
"must": [ |
|||
{ |
|||
"term": { |
|||
"structure": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"term": { |
|||
"factor": { |
|||
"value": %d |
|||
} |
|||
} |
|||
}, |
|||
{ |
|||
"range": { |
|||
"collect_time": { |
|||
"gte": "%s", |
|||
"lte": "%s" |
|||
} |
|||
} |
|||
} |
|||
] |
|||
} |
|||
}, |
|||
} |
|||
`, structureId, factorId, start, end) |
|||
|
|||
return esQuery |
|||
} |
|||
|
|||
func (the *consumerJYESNJZX) Work() { |
|||
go func() { |
|||
for { |
|||
needPushList := <-the.ch |
|||
if len(the.ch) > 0 { |
|||
log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) |
|||
} |
|||
|
|||
for _, push := range needPushList { |
|||
if push.Topic != "" { |
|||
the.outMqtt.Publish(push.Topic, push.Payload) |
|||
continue |
|||
} |
|||
|
|||
//没有标记topic 的 按照配置文件里面的推送
|
|||
for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { |
|||
the.outMqtt.Publish(topic, push.Payload) |
|||
} |
|||
|
|||
} |
|||
|
|||
time.Sleep(100 * time.Millisecond) |
|||
} |
|||
}() |
|||
} |
Loading…
Reference in new issue