Compare commits
53 Commits
0da55d9f35
...
4f09121f6f
Author | SHA1 | Date |
---|---|---|
lucas2 | 4f09121f6f | 2 weeks ago |
lucas | 075f8b40d3 | 2 weeks ago |
lucas | d4c10f1a10 | 2 weeks ago |
lucas | b5925b054e | 2 weeks ago |
lucas | d75d767904 | 2 weeks ago |
lucas | e7cb98f992 | 2 weeks ago |
lucas | c681f9e48e | 2 weeks ago |
lucas | 6d979cbe2c | 2 weeks ago |
lucas | 0ef19cf4f9 | 3 weeks ago |
lucas | 0082f882ca | 1 month ago |
lucas | 35ad2d7295 | 1 month ago |
lucas | 04e3f32654 | 1 month ago |
lucas | 4f452dbfcd | 1 month ago |
lucas | 4b8f5921f5 | 2 months ago |
lucas | dd1a3397e2 | 2 months ago |
lucas | 3e0cd261e6 | 2 months ago |
lucas | e586dbab16 | 2 months ago |
lucas | 9067faeb58 | 2 months ago |
lucas | 507dc26e61 | 2 months ago |
lucas | da5078826c | 2 months ago |
lucas | f3f164a36a | 2 months ago |
lucas | bb15e9b7f9 | 2 months ago |
lucas | 68b672cfcd | 2 months ago |
lucas | c58ff60910 | 2 months ago |
lucas | 995aa2e415 | 2 months ago |
lucas | e5ea7a485a | 2 months ago |
lucas | 1e187bfdff | 2 months ago |
lucas | 5c95f682a5 | 2 months ago |
lucas | 0d195cb3a0 | 2 months ago |
lucas | 189692eade | 2 months ago |
lucas | e7c1fbec68 | 2 months ago |
lucas | cdcc7ddc28 | 2 months ago |
lucas | 72d18dde47 | 2 months ago |
lucas | 7b38baa228 | 2 months ago |
lucas | becd732f62 | 2 months ago |
lucas | 14509000d6 | 2 months ago |
lucas | 30316c6d6f | 2 months ago |
lucas | be231c81bb | 2 months ago |
lucas | 6c02504ccd | 2 months ago |
lucas | bc2e575067 | 2 months ago |
lucas | de9a85cfa0 | 2 months ago |
lucas | b29428e567 | 2 months ago |
lucas | d589c16790 | 2 months ago |
lucas | 9ec508c38b | 2 months ago |
lucas | 86fda6ed1c | 2 months ago |
lucas | 23d196993f | 2 months ago |
lucas | 4a8f314727 | 2 months ago |
lucas | 5dc6916de9 | 2 months ago |
lucas | 225129594a | 2 months ago |
lucas | e59c21c27d | 3 months ago |
lucas | 93c974bddb | 3 months ago |
lucas | 0592ec7e07 | 3 months ago |
lucas2 | d42506f9a1 | 3 months ago |
97 changed files with 2672 additions and 354 deletions
@ -0,0 +1 @@ |
|||||
|
/logs/ |
@ -1,19 +1,21 @@ |
|||||
# goUpload |
# goInOut |
||||
## 功能说明: |
## 功能说明: |
||||
功能:用于设备采集数据上报第三方。 |
功能:用于数据的输入输出处理,可应对各中数据处理场景,典型的设备采集数据上报第三方。 |
||||
数据入口->各类本地采集软件 |
数据入口->各类本地采集软件 |
||||
数据出口->各类第三方数据接口 |
数据出口->各类第三方数据接口 |
||||
部署环境:用于windows环境 |
部署环境:用于windows环境或k8s 镜像部署 |
||||
|
|
||||
## 启用配置: |
## 启用配置: |
||||
goUpload.exe + configFiles目录下的json配置 |
app.exe + configFiles目录下的json配置 |
||||
(configFiles目录下有几个配置就会生效几个功能,实际部署时,不相干的项目配置不要放进去) |
(configFiles目录下有几个配置就会生效几个功能,实际部署时,不相干的项目配置不要放进去) |
||||
|
|
||||
支持-软件列表: |
支持-软件列表: |
||||
-------------------- |
-------------------- |
||||
**统一采集软件** |
**统一采集软件_mqtt** |
||||
|
|
||||
**DAAS振动软件** |
**DAAS振动软件_mqtt** |
||||
|
|
||||
**称重软件** |
**称重软件_mqtt** |
||||
**中科-光电txt** |
**中科-光电txt** |
||||
|
|
||||
|
**kafka** |
||||
|
@ -0,0 +1,84 @@ |
|||||
|
package adaptors |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/binary" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/models" |
||||
|
"goInOut/utils" |
||||
|
"log" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Adaptor_GDND2LA_JSNCGLQL 光电挠度2路安数据 转换 江苏农村公路桥梁监测系统
|
||||
|
type Adaptor_GDND2LA_JSNCGLQL struct { |
||||
|
IdMap map[string]string |
||||
|
BridgeCode string |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_GDND2LA_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { |
||||
|
gdnd := models.GDND2luAn{} |
||||
|
json.Unmarshal([]byte(rawMsg), &gdnd) |
||||
|
return the.RawToJSNCGLQL(gdnd) |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_GDND2LA_JSNCGLQL) RawToJSNCGLQL(raw models.GDND2luAn) (result []NeedPush) { |
||||
|
Atime, err := time.Parse("2006-01-02 15:04:05", raw.Time) |
||||
|
if err != nil { |
||||
|
log.Printf("光电扰度 设备[%s] 数据时间 %s 解析错误", raw.Id, raw.Time) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
sensorDataList := raw.Value[raw.Id] |
||||
|
//获取对方系统 id
|
||||
|
sensorCode := the.getSensorCode(raw.Id) |
||||
|
if sensorCode == "" { |
||||
|
log.Printf("光电扰度 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", raw.Id) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) |
||||
|
|
||||
|
var transBytes []byte |
||||
|
//添加时间段
|
||||
|
transBytes = append(transBytes, the.getTimeBytes(Atime)...) |
||||
|
|
||||
|
for _, sensorData := range sensorDataList { |
||||
|
|
||||
|
//添加数据值段
|
||||
|
bs := utils.Float32ToBytes(sensorData) |
||||
|
transBytes = append(transBytes, bs...) |
||||
|
} |
||||
|
result = append(result, NeedPush{ |
||||
|
Topic: topic, |
||||
|
Payload: transBytes, |
||||
|
}) |
||||
|
|
||||
|
return result |
||||
|
} |
||||
|
func (the Adaptor_GDND2LA_JSNCGLQL) getSensorCode(rawSensorName string) string { |
||||
|
v, isValid := the.IdMap[rawSensorName] |
||||
|
if !isValid { |
||||
|
v = "" |
||||
|
} |
||||
|
return v |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_GDND2LA_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { |
||||
|
|
||||
|
year := uint16(sensorTime.Year()) |
||||
|
month := int8(sensorTime.Month()) |
||||
|
day := int8(sensorTime.Day()) |
||||
|
hour := int8(sensorTime.Hour()) |
||||
|
minute := int8(sensorTime.Minute()) |
||||
|
second := int8(sensorTime.Second()) |
||||
|
bytesBuffer := bytes.NewBuffer([]byte{}) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, year) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, month) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, day) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, hour) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, minute) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, second) |
||||
|
return bytesBuffer.Bytes() |
||||
|
} |
@ -0,0 +1,47 @@ |
|||||
|
package adaptors |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/consumers/SinoGnssMySQL" |
||||
|
"strconv" |
||||
|
) |
||||
|
|
||||
|
// Adaptor_SINOMYSQL_AXYMQTT 数据 转换 江苏农村公路桥梁监测系统
|
||||
|
type Adaptor_SINOMYSQL_AXYMQTT struct { |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush { |
||||
|
var needPush []NeedPush |
||||
|
allDxFiles := make(map[string][]SinoGnssMySQL.DxFile) |
||||
|
for _, gnssData := range gnssDataList { |
||||
|
OnceDxFiles := allDxFiles[gnssData.GroupName] |
||||
|
OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ |
||||
|
Module: gnssData.StationName, |
||||
|
Channel: 1, |
||||
|
Timespan: gnssData.Time.UnixMilli(), |
||||
|
//file_mqtt协议里面只解析RV, m=> mm
|
||||
|
RawValue: []float64{gnssData.X * 1000, gnssData.Y * 1000, gnssData.H * 1000}, |
||||
|
LimitValue: []float64{}, |
||||
|
PhyValue: []float64{}, |
||||
|
ThemeValue: []float64{}, |
||||
|
}) |
||||
|
allDxFiles[gnssData.GroupName] = OnceDxFiles |
||||
|
} |
||||
|
|
||||
|
for groupName, groupFileList := range allDxFiles { |
||||
|
contentMap := make(map[string]string) |
||||
|
for i, file := range groupFileList { |
||||
|
bs, _ := json.Marshal(file) |
||||
|
contentMap[strconv.Itoa(i)] = string(bs) |
||||
|
} |
||||
|
gpContent, _ := json.Marshal(contentMap) |
||||
|
topic := fmt.Sprintf("SinoGnss/%s/", groupName) |
||||
|
needPush = append(needPush, NeedPush{ |
||||
|
Topic: topic, |
||||
|
Payload: gpContent, |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
return needPush |
||||
|
} |
@ -0,0 +1,148 @@ |
|||||
|
package adaptors |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/consumers/AXYraw" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/models" |
||||
|
"log" |
||||
|
"sync" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Adaptor_AXY_LastRAW 安心云 kafka iota数据 转换 es设备数据
|
||||
|
type Adaptor_AXY_LastRAW struct { |
||||
|
AXYraw.Info |
||||
|
Redis *dbOperate.RedisHelper |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { |
||||
|
iotaData := models.IotaData{} |
||||
|
json.Unmarshal([]byte(rawMsg), &iotaData) |
||||
|
return the.raw2es(iotaData) |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { |
||||
|
if len(iotaData.Data.Data) == 0 { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
if !iotaData.Data.Success() { |
||||
|
msg, _ := json.Marshal(iotaData.Data.Result) |
||||
|
iotaData.Data.Data["errMsg"] = string(msg) |
||||
|
} |
||||
|
//log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
||||
|
deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) |
||||
|
|
||||
|
//查不到信息的数据
|
||||
|
if deviceInfo.Name == "" { |
||||
|
log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
dataType := "" |
||||
|
if _dataType, ok := iotaData.Data.Data["_data_type"]; ok { |
||||
|
if v, ok := _dataType.(string); ok { |
||||
|
dataType = v |
||||
|
} |
||||
|
} |
||||
|
devdata := &models.DeviceData{ |
||||
|
DeviceId: iotaData.DeviceId, |
||||
|
Name: deviceInfo.Name, |
||||
|
ThingId: iotaData.ThingId, |
||||
|
StructId: deviceInfo.Structure.Id, |
||||
|
AcqTime: iotaData.TriggerTime, |
||||
|
RealTime: iotaData.RealTime, |
||||
|
ErrCode: 0, |
||||
|
Raw: iotaData.Data.Data, |
||||
|
DeviceInfo: deviceInfo, |
||||
|
DimensionId: iotaData.DimensionId, |
||||
|
DataType: dataType, |
||||
|
} |
||||
|
EsRaws := toEsRaw(devdata) |
||||
|
return EsRaws |
||||
|
} |
||||
|
|
||||
|
var deviceInfoMap = map[string]models.DeviceInfo{} |
||||
|
var deviceInfoMap2 = sync.Map{} |
||||
|
|
||||
|
func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { |
||||
|
if vObj, ok := deviceInfoMap2.Load(deviceId); ok { |
||||
|
if v, ok := vObj.(models.DeviceInfo); ok { |
||||
|
durationMin := time.Now().Sub(v.RefreshTime).Minutes() |
||||
|
if durationMin < 5 { |
||||
|
return v |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
v, err := the.GetDeviceInfoFromRedis(deviceId) |
||||
|
if err != nil { |
||||
|
log.Printf("设备%s 无法查询到对应信息", deviceId) |
||||
|
} |
||||
|
//deviceInfoMap[deviceId] = v
|
||||
|
deviceInfoMap2.Store(deviceId, v) |
||||
|
return v |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) { |
||||
|
Key_Iota_device := "iota_device" |
||||
|
key_Thing_struct := "thing_struct" |
||||
|
key_Iota_meta := "iota_meta" |
||||
|
k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId) |
||||
|
dev := models.IotaDevice{} |
||||
|
thingStruct := models.ThingStruct{} |
||||
|
devMeta := models.DeviceMeta{} |
||||
|
err1 := the.Redis.GetObj(k1, &dev) |
||||
|
if err1 != nil { |
||||
|
return models.DeviceInfo{RefreshTime: time.Now()}, err1 |
||||
|
} |
||||
|
k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId) |
||||
|
err2 := the.Redis.GetObj(k2, &thingStruct) |
||||
|
if err2 != nil { |
||||
|
return models.DeviceInfo{RefreshTime: time.Now()}, err2 |
||||
|
} |
||||
|
k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id) |
||||
|
err3 := the.Redis.GetObj(k3, &devMeta) |
||||
|
if err3 != nil { |
||||
|
return models.DeviceInfo{RefreshTime: time.Now()}, err3 |
||||
|
} |
||||
|
//if err1 != nil || err2 != nil || err3 != nil {
|
||||
|
// log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3)
|
||||
|
// return models.DeviceInfo{}
|
||||
|
//}
|
||||
|
|
||||
|
s := models.Structure{ |
||||
|
ThingId: thingStruct.ThingId, |
||||
|
Id: thingStruct.Id, |
||||
|
Name: thingStruct.Name, |
||||
|
OrgId: thingStruct.OrgId, |
||||
|
} |
||||
|
return models.DeviceInfo{ |
||||
|
Id: deviceId, |
||||
|
Name: dev.Name, |
||||
|
Structure: s, |
||||
|
DeviceMeta: devMeta, |
||||
|
RefreshTime: time.Now(), |
||||
|
}, nil |
||||
|
} |
||||
|
|
||||
|
func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { |
||||
|
|
||||
|
if deviceData == nil { |
||||
|
return nil |
||||
|
} |
||||
|
var cstZone = time.FixedZone("CST", 8*3600) // 东八区 用以解决
|
||||
|
dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() |
||||
|
createNativeRaw := models.EsRaw{ |
||||
|
StructId: deviceData.StructId, |
||||
|
IotaDeviceName: deviceData.Name, |
||||
|
Data: deviceData.Raw, |
||||
|
CollectTime: deviceData.AcqTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800"), |
||||
|
Meta: dataOutMeta, |
||||
|
IotaDevice: deviceData.DeviceId, |
||||
|
CreateTime: time.Now(), |
||||
|
} |
||||
|
|
||||
|
return &createNativeRaw |
||||
|
} |
@ -0,0 +1,132 @@ |
|||||
|
package adaptors |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/binary" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/models" |
||||
|
"goInOut/utils" |
||||
|
"log" |
||||
|
"strconv" |
||||
|
"strings" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Adaptor_ZD_JSNCGLQL 数据 转换 江苏农村公路桥梁监测系统
|
||||
|
type Adaptor_ZD_JSNCGLQL struct { |
||||
|
IdMap map[string]string |
||||
|
BridgeCode string |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_ZD_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { |
||||
|
zd := models.ZD{} |
||||
|
err := json.Unmarshal([]byte(rawMsg), &zd) |
||||
|
if err != nil { |
||||
|
return nil |
||||
|
} |
||||
|
lowerTopic := strings.ToLower(inTopic) |
||||
|
if strings.Contains(lowerTopic, "zdsl") || |
||||
|
strings.Contains(lowerTopic, "cableforce") { |
||||
|
return the.ZDSLtoJSNCGLQL(zd) |
||||
|
} |
||||
|
|
||||
|
return the.ZDtoJSNCGLQL(zd) |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_ZD_JSNCGLQL) ZDtoJSNCGLQL(zd models.ZD) (result []NeedPush) { |
||||
|
Atime := time.UnixMilli(zd.Ticks) |
||||
|
|
||||
|
sensorDataList := zd.AccValues |
||||
|
//获取对方系统 id
|
||||
|
sensorCode := the.getSensorCode(zd.Module) |
||||
|
if sensorCode == "" { |
||||
|
log.Printf("振动 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) |
||||
|
//数据秒数
|
||||
|
seconds := len(zd.AccValues) / zd.Frequency |
||||
|
|
||||
|
for i := 0; i < seconds; i++ { |
||||
|
var transBytes []byte |
||||
|
onceTime := Atime.Add(time.Duration(i) * time.Second) |
||||
|
//1.添加时间段
|
||||
|
transBytes = append(transBytes, the.getTimeBytes(onceTime)...) |
||||
|
|
||||
|
startIndex := (0 + i) * zd.Frequency |
||||
|
endIndex := (1 + i) * zd.Frequency |
||||
|
if endIndex > len(sensorDataList) { |
||||
|
endIndex = len(sensorDataList) |
||||
|
} |
||||
|
subDataList := sensorDataList[startIndex:endIndex] |
||||
|
|
||||
|
for _, sensorData := range subDataList { |
||||
|
|
||||
|
//4.添加数据值段
|
||||
|
bs := utils.Float32ToBytes(sensorData) |
||||
|
transBytes = append(transBytes, bs...) |
||||
|
} |
||||
|
result = append(result, NeedPush{ |
||||
|
Topic: topic, |
||||
|
Payload: transBytes, |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
return result |
||||
|
} |
||||
|
func (the Adaptor_ZD_JSNCGLQL) ZDSLtoJSNCGLQL(zd models.ZD) (result []NeedPush) { |
||||
|
Atime := time.UnixMilli(zd.Ticks) |
||||
|
|
||||
|
sensorDataList := zd.ThemeValue |
||||
|
//获取对方系统 id
|
||||
|
sensorCode := the.getSensorCode(strconv.Itoa(zd.SensorId)) |
||||
|
if sensorCode == "" { |
||||
|
log.Printf("振动索力 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) |
||||
|
|
||||
|
var transBytes []byte |
||||
|
//1.添加时间段
|
||||
|
transBytes = append(transBytes, the.getTimeBytes(Atime)...) |
||||
|
|
||||
|
//数据值段
|
||||
|
bs := utils.Float32ToBytes(sensorDataList[0]) |
||||
|
transBytes = append(transBytes, bs...) |
||||
|
|
||||
|
result = append(result, NeedPush{ |
||||
|
Topic: topic, |
||||
|
Payload: transBytes, |
||||
|
}) |
||||
|
|
||||
|
return result |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_ZD_JSNCGLQL) getSensorCode(rawSensorName string) string { |
||||
|
v, isValid := the.IdMap[rawSensorName] |
||||
|
if !isValid { |
||||
|
v = "" |
||||
|
} |
||||
|
return v |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_ZD_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { |
||||
|
|
||||
|
year := uint16(sensorTime.Year()) |
||||
|
month := int8(sensorTime.Month()) |
||||
|
day := int8(sensorTime.Day()) |
||||
|
hour := int8(sensorTime.Hour()) |
||||
|
minute := int8(sensorTime.Minute()) |
||||
|
second := int8(sensorTime.Second()) |
||||
|
bytesBuffer := bytes.NewBuffer([]byte{}) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, year) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, month) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, day) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, hour) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, minute) |
||||
|
binary.Write(bytesBuffer, binary.BigEndian, second) |
||||
|
return bytesBuffer.Bytes() |
||||
|
} |
@ -0,0 +1,4 @@ |
|||||
|
FROM registry.ngaiot.com/base-images/alpine_3.20_cst:7 |
||||
|
WORKDIR /app/ |
||||
|
COPY *.exe configFiles /app/ |
||||
|
CMD ["/app/app.exe"] |
@ -0,0 +1,24 @@ |
|||||
|
{ |
||||
|
"consumer": "consumerSinoGnssMySQL", |
||||
|
"ioConfig": { |
||||
|
"in": { |
||||
|
"db": { |
||||
|
"type": "mysql", |
||||
|
"connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true" |
||||
|
}, |
||||
|
"cronStr": "0/1 * * * *" |
||||
|
}, |
||||
|
"out": { |
||||
|
"mqtt": { |
||||
|
"host": "10.8.30.160", |
||||
|
"port": 30883, |
||||
|
"userName": "upload", |
||||
|
"password": "", |
||||
|
"clientId": "goInOut_SinoGnssMySQL", |
||||
|
"Topics": [] |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"info": { |
||||
|
} |
||||
|
} |
@ -0,0 +1,36 @@ |
|||||
|
{ |
||||
|
"consumer": "consumerAXYraw", |
||||
|
"ioConfig": { |
||||
|
"in": { |
||||
|
"kafka": { |
||||
|
"brokers": [ |
||||
|
"10.8.30.160:30992" |
||||
|
], |
||||
|
"groupId": "synchronizeRaw_50", |
||||
|
"topics": [ |
||||
|
"RawData" |
||||
|
] |
||||
|
} |
||||
|
}, |
||||
|
"out": { |
||||
|
"es": { |
||||
|
"address": ["http://10.8.30.160:30092"], |
||||
|
"index": "anxincloud_last_raw", |
||||
|
"auth": { |
||||
|
"userName": "post", |
||||
|
"password": "123" |
||||
|
}, |
||||
|
"interval": 30 |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"info": { |
||||
|
"common": { |
||||
|
}, |
||||
|
"queryComponent":{ |
||||
|
"redis": { |
||||
|
"address": "10.8.30.142:30379" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,30 @@ |
|||||
|
{ |
||||
|
"consumer": "consumerHttpProxy", |
||||
|
"ioConfig": { |
||||
|
"in": { |
||||
|
"apiServer": { |
||||
|
"port": 7000, |
||||
|
"userName": "usr", |
||||
|
"password": "123", |
||||
|
"routers": [ |
||||
|
{ |
||||
|
"router": "POST /upload/work/ABStatus", |
||||
|
"idTemplate": "{{.values.ucId}}" |
||||
|
}, |
||||
|
{ |
||||
|
"router": "POST /upload/work/ABSHeart", |
||||
|
"idTemplate": "{{.Values.WorkId}}" |
||||
|
} |
||||
|
] |
||||
|
} |
||||
|
}, |
||||
|
"out": { |
||||
|
"httpPost": { |
||||
|
"url": "http://218.3.126.49:60000", |
||||
|
"method": "post" |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"info": { |
||||
|
} |
||||
|
} |
@ -1 +0,0 @@ |
|||||
package constKey |
|
@ -0,0 +1,31 @@ |
|||||
|
package AXYraw |
||||
|
|
||||
|
import "goInOut/config" |
||||
|
|
||||
|
type ConfigFile struct { |
||||
|
config.Consumer |
||||
|
IoConfig ioConfig `json:"ioConfig"` |
||||
|
Info Info `json:"info"` |
||||
|
} |
||||
|
type ioConfig struct { |
||||
|
In in `json:"in"` |
||||
|
Out out `json:"out"` |
||||
|
} |
||||
|
type in struct { |
||||
|
Kafka config.KafkaConfig `json:"kafka"` |
||||
|
} |
||||
|
|
||||
|
type out struct { |
||||
|
Es config.EsConfig `json:"es"` |
||||
|
} |
||||
|
|
||||
|
type Info struct { |
||||
|
Common map[string]string `json:"common"` |
||||
|
QueryComponent queryComponent `json:"queryComponent"` |
||||
|
} |
||||
|
|
||||
|
type queryComponent struct { |
||||
|
Redis struct { |
||||
|
Address string `json:"address"` |
||||
|
} `json:"redis"` |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package HTTP_PRPXY |
||||
|
|
||||
|
import "goInOut/config" |
||||
|
|
||||
|
type ConfigFile struct { |
||||
|
config.Consumer |
||||
|
IoConfig ioConfig `json:"ioConfig"` |
||||
|
OtherInfo map[string]string `json:"info"` |
||||
|
} |
||||
|
type ioConfig struct { |
||||
|
In In `json:"in"` |
||||
|
Out OUT `json:"out"` |
||||
|
} |
||||
|
type In struct { |
||||
|
ApiServer config.ApiServerConfig `json:"apiServer"` |
||||
|
} |
||||
|
|
||||
|
type OUT struct { |
||||
|
HttpPost config.HttpConfig `json:"httpPost"` |
||||
|
} |
||||
|
|
||||
|
type SensorInfo struct { |
||||
|
Name string `json:"name"` //测点名称
|
||||
|
Code string `json:"code"` //测点编号 宜由“桥名简称-监测类别简称-构件类型编码-截面序号-构件序号-测点编号”组成
|
||||
|
} |
@ -0,0 +1,44 @@ |
|||||
|
package HTTP_PRPXY |
||||
|
|
||||
|
type ABStatus struct { |
||||
|
Values ABStatusValues `json:"values"` |
||||
|
Secret string `json:"secret"` |
||||
|
Type string `json:"type"` |
||||
|
} |
||||
|
|
||||
|
type ABStatusValues struct { |
||||
|
CreateTime int64 `json:"createTime"` |
||||
|
Updater string `json:"updater"` |
||||
|
Deleted bool `json:"deleted"` |
||||
|
Id string `json:"id"` |
||||
|
WorkId string `json:"workId"` |
||||
|
UcId string `json:"ucId"` |
||||
|
DeviceCode string `json:"deviceCode"` |
||||
|
UserId string `json:"userId"` |
||||
|
UserName string `json:"userName"` |
||||
|
DataTime int64 `json:"dataTime"` |
||||
|
BeltStatus string `json:"beltStatus"` |
||||
|
DeviceStatus string `json:"deviceStatus"` |
||||
|
StartDate int64 `json:"startDate"` |
||||
|
EndDate int64 `json:"endDate"` |
||||
|
MainVol int `json:"mainVol"` |
||||
|
AscendWorkCertificate string `json:"ascend_work_certificate"` |
||||
|
BeltPoseLeft string `json:"beltPoseLeft"` |
||||
|
BeltPoseRight string `json:"beltPoseRight"` |
||||
|
BeltStatusLeft string `json:"beltStatusLeft"` |
||||
|
BeltStatusRight string `json:"beltStatusRight"` |
||||
|
TickbVol int `json:"tickbVol"` |
||||
|
//Avatar interface{} `json:"avatar"`
|
||||
|
//AlarmType interface{} `json:"alarmType"`
|
||||
|
//Identity interface{} `json:"identity"`
|
||||
|
//IdentityImg interface{} `json:"identityImg"`
|
||||
|
//IdentityImgF interface{} `json:"identityImgF"`
|
||||
|
//Phone interface{} `json:"phone"`
|
||||
|
Versions int `json:"versions"` |
||||
|
//Team interface{} `json:"team"`
|
||||
|
Longitude string `json:"longitude"` |
||||
|
Latitude string `json:"latitude"` |
||||
|
Outsource string `json:"outsource"` |
||||
|
LocTime int64 `json:"locTime"` |
||||
|
//WorkLen interface{} `json:"workLen"`
|
||||
|
} |
@ -0,0 +1,27 @@ |
|||||
|
package SinoGnssMySQL |
||||
|
|
||||
|
import "goInOut/config" |
||||
|
|
||||
|
type ConfigFile struct { |
||||
|
config.Consumer |
||||
|
IoConfig ioConfig `json:"ioConfig"` |
||||
|
OtherInfo map[string]string `json:"info"` |
||||
|
} |
||||
|
type ioConfig struct { |
||||
|
In In `json:"in"` |
||||
|
Out OUT `json:"out"` |
||||
|
} |
||||
|
type In struct { |
||||
|
Db config.DbConfig `json:"db"` |
||||
|
CronStr string `json:"cronStr"` |
||||
|
} |
||||
|
|
||||
|
type OUT struct { |
||||
|
Mqtt config.MqttConfig `json:"mqtt"` |
||||
|
} |
||||
|
|
||||
|
// 缓存用
|
||||
|
type RecordInfo struct { |
||||
|
Id int64 `json:"id"` |
||||
|
TableName string `json:"table_name"` |
||||
|
} |
@ -0,0 +1,28 @@ |
|||||
|
package SinoGnssMySQL |
||||
|
|
||||
|
import "time" |
||||
|
|
||||
|
type GnssData struct { |
||||
|
Id int64 `json:"id" db:"id"` |
||||
|
StationName string `json:"station_name" db:"station_name"` |
||||
|
GroupName string `json:"group_name" db:"group_name"` |
||||
|
Time time.Time `json:"time" db:"time"` |
||||
|
X float64 `json:"x" db:"x"` |
||||
|
Y float64 `json:"y" db:"y"` |
||||
|
H float64 `json:"h" db:"h"` |
||||
|
} |
||||
|
|
||||
|
type DxFile struct { |
||||
|
SensorId int `json:"S"` |
||||
|
Module string `json:"M"` |
||||
|
Channel int `json:"C"` |
||||
|
Error int `json:"R"` |
||||
|
Round int64 `json:"N"` |
||||
|
Timespan int64 `json:"T"` |
||||
|
Req []string `json:"Q"` |
||||
|
Acq []string `json:"A"` |
||||
|
RawValue []float64 `json:"RV"` |
||||
|
LimitValue []float64 `json:"LV"` |
||||
|
PhyValue []float64 `json:"PV"` |
||||
|
ThemeValue []float64 `json:"TV"` |
||||
|
} |
@ -0,0 +1,162 @@ |
|||||
|
package consumers |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"goInOut/adaptors" |
||||
|
"goInOut/consumers/AXYraw" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/dbOperate/_kafka" |
||||
|
"goInOut/models" |
||||
|
"log" |
||||
|
"sync" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type consumerAXYraw struct { |
||||
|
//数据缓存管道
|
||||
|
dataCache chan *models.EsRaw |
||||
|
//具体配置
|
||||
|
ConfigInfo AXYraw.ConfigFile |
||||
|
InKafka _kafka.KafkaHelper |
||||
|
OutEs dbOperate.ESHelper |
||||
|
infoRedis *dbOperate.RedisHelper |
||||
|
sinkRawMap sync.Map |
||||
|
lock sync.Mutex |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { |
||||
|
// 将 JSON 格式的数据解析到结构体中
|
||||
|
err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo) |
||||
|
if err != nil { |
||||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYraw) Initial(cfg string) error { |
||||
|
the.sinkRawMap = sync.Map{} |
||||
|
the.dataCache = make(chan *models.EsRaw, 200) |
||||
|
|
||||
|
the.LoadConfigJson(cfg) |
||||
|
err := the.inputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.outputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.infoComponentInitial() |
||||
|
return err |
||||
|
} |
||||
|
func (the *consumerAXYraw) inputInitial() error { |
||||
|
//数据入口
|
||||
|
the.InKafka = _kafka.KafkaHelper{ |
||||
|
Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers, |
||||
|
GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId, |
||||
|
} |
||||
|
the.InKafka.Initial() |
||||
|
for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics { |
||||
|
the.InKafka.Subscribe(inTopic, the.onData) |
||||
|
} |
||||
|
|
||||
|
the.InKafka.Worker() |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerAXYraw) outputInitial() error { |
||||
|
//数据出口
|
||||
|
the.OutEs = *dbOperate.NewESHelper( |
||||
|
the.ConfigInfo.IoConfig.Out.Es.Address, |
||||
|
the.ConfigInfo.IoConfig.Out.Es.Auth.UserName, |
||||
|
the.ConfigInfo.IoConfig.Out.Es.Auth.Password, |
||||
|
) |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYraw) infoComponentInitial() error { |
||||
|
//数据出口
|
||||
|
addr := the.ConfigInfo.Info.QueryComponent.Redis.Address |
||||
|
the.infoRedis = dbOperate.NewRedisHelper("", addr) |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYraw) sinkTask() { |
||||
|
intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval |
||||
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) |
||||
|
defer ticker.Stop() |
||||
|
for { |
||||
|
<-ticker.C |
||||
|
the.toSink() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYraw) toSink() { |
||||
|
var raws []models.EsRaw |
||||
|
the.lock.Lock() |
||||
|
defer the.lock.Unlock() |
||||
|
the.sinkRawMap.Range(func(key, value any) bool { |
||||
|
if v, ok := value.(*models.EsRaw); ok { |
||||
|
raws = append(raws, *v) |
||||
|
//零时打日志用
|
||||
|
if v.IotaDevice == logTagDeviceId { |
||||
|
bs, _ := json.Marshal(v) |
||||
|
log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) |
||||
|
} |
||||
|
return ok |
||||
|
} else { |
||||
|
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) |
||||
|
} |
||||
|
return true |
||||
|
}) |
||||
|
if len(raws) > 0 { |
||||
|
log.Printf("准备写入es %d条", len(raws)) |
||||
|
index := the.ConfigInfo.IoConfig.Out.Es.Index |
||||
|
the.OutEs.BulkWriteRaws2Es(index, raws) |
||||
|
the.sinkRawMap.Clear() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" |
||||
|
|
||||
|
func (the *consumerAXYraw) Work() { |
||||
|
go the.sinkTask() |
||||
|
go func() { |
||||
|
for { |
||||
|
pushEsRaw := <-the.dataCache |
||||
|
|
||||
|
if pushEsRaw.IotaDevice == logTagDeviceId { |
||||
|
bs, _ := json.Marshal(pushEsRaw) |
||||
|
log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) |
||||
|
} |
||||
|
|
||||
|
//有效数据存入缓存
|
||||
|
the.lock.Lock() |
||||
|
the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw) |
||||
|
the.lock.Unlock() |
||||
|
} |
||||
|
|
||||
|
}() |
||||
|
} |
||||
|
func (the *consumerAXYraw) onData(topic string, msg string) bool { |
||||
|
//if len(msg) > 80 {
|
||||
|
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
||||
|
//}
|
||||
|
adaptor := adaptors.Adaptor_AXY_LastRAW{ |
||||
|
Redis: the.infoRedis, |
||||
|
} |
||||
|
|
||||
|
needPush := adaptor.Transform(topic, msg) |
||||
|
|
||||
|
if needPush != nil && len(needPush.Meta) > 0 && needPush.Data != nil { |
||||
|
//日志标记
|
||||
|
if needPush.IotaDevice == logTagDeviceId { |
||||
|
bs, _ := json.Marshal(needPush) |
||||
|
log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) |
||||
|
} |
||||
|
the.dataCache <- needPush |
||||
|
} |
||||
|
|
||||
|
return true |
||||
|
} |
@ -0,0 +1,102 @@ |
|||||
|
package consumers |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/config" |
||||
|
"goInOut/consumers/HTTP_PRPXY" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/utils" |
||||
|
"io" |
||||
|
"log" |
||||
|
"net/http" |
||||
|
) |
||||
|
|
||||
|
type consumerHttpProxy struct { |
||||
|
//数据缓存管道
|
||||
|
routes map[string]config.Router |
||||
|
//具体配置
|
||||
|
Info HTTP_PRPXY.ConfigFile |
||||
|
InApiServer *dbOperate.ApiServerHelper |
||||
|
outHttpPost *dbOperate.HttpHelper |
||||
|
} |
||||
|
|
||||
|
func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) { |
||||
|
// 将 JSON 格式的数据解析到结构体中
|
||||
|
err := json.Unmarshal([]byte(cfgStr), &the.Info) |
||||
|
if err != nil { |
||||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerHttpProxy) Initial(cfg string) error { |
||||
|
the.routes = map[string]config.Router{} |
||||
|
the.LoadConfigJson(cfg) |
||||
|
err := the.InputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.OutputInitial() |
||||
|
return err |
||||
|
} |
||||
|
func (the *consumerHttpProxy) InputInitial() error { |
||||
|
//数据入口
|
||||
|
the.InApiServer = dbOperate.NewApiServer( |
||||
|
the.Info.IoConfig.In.ApiServer.Port, |
||||
|
the.Info.IoConfig.In.ApiServer.Routers, |
||||
|
) |
||||
|
the.InApiServer.Initial() |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerHttpProxy) OutputInitial() error { |
||||
|
//数据出口
|
||||
|
the.outHttpPost = &dbOperate.HttpHelper{ |
||||
|
Url: the.Info.IoConfig.Out.HttpPost.Url, |
||||
|
} |
||||
|
the.outHttpPost.Initial() |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerHttpProxy) Work() { |
||||
|
go func() { |
||||
|
for _, router := range the.Info.IoConfig.In.ApiServer.Routers { |
||||
|
the.InApiServer.RouteRegister(router.Router, the.onData) |
||||
|
the.routes[router.Router] = router |
||||
|
} |
||||
|
the.InApiServer.Run() |
||||
|
|
||||
|
}() |
||||
|
} |
||||
|
func (the *consumerHttpProxy) onData(w http.ResponseWriter, r *http.Request) { |
||||
|
w.Header().Set("Content-Type", "application/json") |
||||
|
body, err := io.ReadAll(r.Body) |
||||
|
|
||||
|
log.Printf("收到 %s 请求 %s", r.RequestURI, body) |
||||
|
|
||||
|
bodyObj := map[string]any{} |
||||
|
err = json.Unmarshal(body, &bodyObj) |
||||
|
if err != nil { |
||||
|
log.Printf("body 解析失败,请检查 %s", err.Error()) |
||||
|
return |
||||
|
} |
||||
|
routePath := fmt.Sprintf("%s %s", r.Method, r.RequestURI) |
||||
|
idTemplate := the.routes[routePath] |
||||
|
|
||||
|
id, err := utils.TextTemplateMatch(bodyObj, idTemplate.IdTemplate) |
||||
|
if err != nil { |
||||
|
log.Printf("解析id 失败,请检查 %s", err.Error()) |
||||
|
} |
||||
|
params := map[string]string{ |
||||
|
"id": id, |
||||
|
} |
||||
|
resp := "" |
||||
|
//沿用请求路由
|
||||
|
newUrl := the.outHttpPost.Url + r.URL.String() |
||||
|
resp, err = the.outHttpPost.HttpPostWithParams(newUrl, string(body), params) |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
log.Printf("应答: %s", resp) |
||||
|
defer fmt.Fprintf(w, resp) |
||||
|
|
||||
|
} |
@ -0,0 +1,174 @@ |
|||||
|
package consumers |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/adaptors" |
||||
|
"goInOut/consumers/SinoGnssMySQL" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/monitors" |
||||
|
"goInOut/utils" |
||||
|
"log" |
||||
|
"os" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type consumerSinoGnssMySQL struct { |
||||
|
//数据缓存管道
|
||||
|
ch chan []adaptors.NeedPush |
||||
|
//具体配置
|
||||
|
Info SinoGnssMySQL.ConfigFile |
||||
|
InDB *dbOperate.DBHelper |
||||
|
outMqtt *dbOperate.MqttHelper |
||||
|
monitor *monitors.CommonMonitor |
||||
|
} |
||||
|
|
||||
|
func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { |
||||
|
// 将 JSON 格式的数据解析到结构体中
|
||||
|
err := json.Unmarshal([]byte(cfgStr), &the.Info) |
||||
|
if err != nil { |
||||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerSinoGnssMySQL) Initial(cfg string) error { |
||||
|
the.LoadConfigJson(cfg) |
||||
|
err := the.InputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.OutputInitial() |
||||
|
return err |
||||
|
} |
||||
|
func (the *consumerSinoGnssMySQL) InputInitial() error { |
||||
|
the.ch = make(chan []adaptors.NeedPush, 200) |
||||
|
//数据入口
|
||||
|
the.InDB = dbOperate.NewDBHelper( |
||||
|
the.Info.IoConfig.In.Db.Type, |
||||
|
the.Info.IoConfig.In.Db.ConnStr) |
||||
|
the.monitor = &monitors.CommonMonitor{ |
||||
|
MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, |
||||
|
} |
||||
|
the.monitor.Start() |
||||
|
the.monitor.RegisterFun(the.onData) |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerSinoGnssMySQL) OutputInitial() error { |
||||
|
//数据出口
|
||||
|
the.outMqtt = dbOperate.MqttInitial( |
||||
|
the.Info.IoConfig.Out.Mqtt.Host, |
||||
|
the.Info.IoConfig.Out.Mqtt.Port, |
||||
|
the.Info.IoConfig.Out.Mqtt.ClientId, |
||||
|
the.Info.IoConfig.Out.Mqtt.UserName, |
||||
|
the.Info.IoConfig.Out.Mqtt.Password, |
||||
|
false, //按照具体项目来
|
||||
|
"") |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerSinoGnssMySQL) Work() { |
||||
|
go func() { |
||||
|
for { |
||||
|
needPushList := <-the.ch |
||||
|
if len(the.ch) > 0 { |
||||
|
log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) |
||||
|
} |
||||
|
|
||||
|
for _, push := range needPushList { |
||||
|
if push.Topic != "" { |
||||
|
the.outMqtt.Publish(push.Topic, push.Payload) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
time.Sleep(100 * time.Millisecond) |
||||
|
} |
||||
|
}() |
||||
|
} |
||||
|
func (the *consumerSinoGnssMySQL) onData() { |
||||
|
recordInfo, err := readRecord() |
||||
|
if err != nil { |
||||
|
log.Printf("读取 缓存异常,err=%v", err.Error()) |
||||
|
return |
||||
|
} |
||||
|
sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d |
||||
|
LEFT JOIN datasolution as p |
||||
|
ON d.station_name=p.sn |
||||
|
where p.group_name is not null |
||||
|
and d.id > %d and d.id <= %d |
||||
|
ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+200) |
||||
|
var GnssDatas []SinoGnssMySQL.GnssData |
||||
|
err = the.InDB.Query(&GnssDatas, sql) |
||||
|
if err != nil || len(GnssDatas) == 0 { |
||||
|
log.Printf("当前批次无数据,跳过") |
||||
|
return |
||||
|
} |
||||
|
maxId := MaxId(GnssDatas) |
||||
|
log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId) |
||||
|
recordInfo.Id = maxId |
||||
|
adaptor := the.getAdaptor() |
||||
|
needPush := adaptor.Transform(GnssDatas) |
||||
|
if len(needPush) > 0 { |
||||
|
the.ch <- needPush |
||||
|
} |
||||
|
fileName := "cache.inout" |
||||
|
//发现新月新纪录
|
||||
|
newTableName := tableNameNow() |
||||
|
if recordInfo.TableName != newTableName { |
||||
|
recordInfo.TableName = newTableName |
||||
|
recordInfo.Id = 0 |
||||
|
} |
||||
|
cacheStr, _ := json.Marshal(recordInfo) |
||||
|
err = utils.SaveCache2File(string(cacheStr), fileName) |
||||
|
if err != nil { |
||||
|
log.Panicf("record id to file,error: %v", err.Error()) |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { |
||||
|
|
||||
|
return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} |
||||
|
} |
||||
|
|
||||
|
func readRecord() (SinoGnssMySQL.RecordInfo, error) { |
||||
|
fileName := "cache.inout" |
||||
|
//文件存在?
|
||||
|
isExist := utils.FileExists(fileName) |
||||
|
if !isExist { |
||||
|
// 文件不存在,创建文件
|
||||
|
file, err := os.Create(fileName) |
||||
|
if err != nil { |
||||
|
log.Panicf("Error creating file: %v", err) |
||||
|
} |
||||
|
defaultRecord := SinoGnssMySQL.RecordInfo{ |
||||
|
Id: 0, |
||||
|
TableName: tableNameNow(), |
||||
|
} |
||||
|
str, _ := json.Marshal(defaultRecord) |
||||
|
_, err = file.WriteString(string(str)) |
||||
|
if err != nil { |
||||
|
log.Panicf("file write error: %v", err.Error()) |
||||
|
return SinoGnssMySQL.RecordInfo{}, err |
||||
|
} |
||||
|
} |
||||
|
recordStr, err := utils.ReadCache2File(fileName) |
||||
|
if err != nil { |
||||
|
panic("") |
||||
|
} |
||||
|
record := SinoGnssMySQL.RecordInfo{} |
||||
|
err = json.Unmarshal([]byte(recordStr), &record) |
||||
|
return record, err |
||||
|
} |
||||
|
|
||||
|
func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { |
||||
|
maxId := GnssDatas[0].Id |
||||
|
for _, data := range GnssDatas { |
||||
|
if data.Id > maxId { |
||||
|
maxId = data.Id |
||||
|
} |
||||
|
} |
||||
|
return maxId |
||||
|
} |
||||
|
|
||||
|
func tableNameNow() string { |
||||
|
return "data_gnss_" + time.Now().Format("200601") |
||||
|
} |
@ -1,38 +0,0 @@ |
|||||
package dbHelper |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
"github.com/gin-gonic/gin" |
|
||||
"log" |
|
||||
"net/http" |
|
||||
"time" |
|
||||
) |
|
||||
|
|
||||
type RouterFunc struct { |
|
||||
relativePath string //相对路由 如/gzm/data/upload
|
|
||||
funcType string // 方法类型 如 post ,get
|
|
||||
fun func(c *gin.Context) //方法
|
|
||||
} |
|
||||
|
|
||||
type ApiServerHelper struct { |
|
||||
Port uint16 |
|
||||
RoutFun map[string]RouterFunc |
|
||||
} |
|
||||
|
|
||||
func (the *ApiServerHelper) Initial() { |
|
||||
router := gin.Default() |
|
||||
for name, routerFunc := range the.RoutFun { |
|
||||
switch routerFunc.funcType { |
|
||||
case http.MethodGet: |
|
||||
router.GET(routerFunc.relativePath, routerFunc.fun) |
|
||||
case http.MethodPost: |
|
||||
router.GET(routerFunc.relativePath, routerFunc.fun) |
|
||||
default: |
|
||||
log.Printf("不支持的 [%s]方法类型 %s", routerFunc.relativePath, routerFunc.funcType) |
|
||||
continue |
|
||||
} |
|
||||
log.Printf("注册路由 %s,监听地址=%s", name, routerFunc.relativePath) |
|
||||
} |
|
||||
router.Run(fmt.Sprintf("0.0.0.0:%d", the.Port)) |
|
||||
time.Sleep(time.Second * 1) |
|
||||
} |
|
@ -0,0 +1,58 @@ |
|||||
|
package dbOperate |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"goInOut/config" |
||||
|
"log" |
||||
|
"net/http" |
||||
|
) |
||||
|
|
||||
|
type ApiServerHelper struct { |
||||
|
mux *http.ServeMux |
||||
|
routes []config.Router |
||||
|
port uint |
||||
|
server http.Server |
||||
|
} |
||||
|
|
||||
|
func NewApiServer(port uint, routes []config.Router) *ApiServerHelper { |
||||
|
return &ApiServerHelper{ |
||||
|
mux: http.NewServeMux(), |
||||
|
routes: routes, |
||||
|
port: port, |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *ApiServerHelper) Initial() { |
||||
|
the.mux = http.NewServeMux() |
||||
|
|
||||
|
// 创建 HTTP 服务器
|
||||
|
the.server = http.Server{ |
||||
|
Handler: the.mux, |
||||
|
Addr: fmt.Sprintf(":%d", the.port), |
||||
|
} |
||||
|
|
||||
|
//the.routeRegister()
|
||||
|
|
||||
|
} |
||||
|
func (the *ApiServerHelper) Run() { |
||||
|
log.Printf("apiServer监听端口 %d", the.port) |
||||
|
the.server.ListenAndServe() |
||||
|
} |
||||
|
func (the *ApiServerHelper) routeRegister() { |
||||
|
|
||||
|
for _, route := range the.routes { |
||||
|
the.mux.HandleFunc(route.Router, func(w http.ResponseWriter, r *http.Request) { |
||||
|
w.Header().Set("Content-Type", "application/json") |
||||
|
s := fmt.Sprintf(`{"route":"%s","resp":"安全带状态应答"}`, route) |
||||
|
println("收到请求", route.Router) |
||||
|
fmt.Fprintf(w, s) |
||||
|
}) |
||||
|
log.Printf("注册路由 %s", route) |
||||
|
} |
||||
|
} |
||||
|
func (the *ApiServerHelper) RouteRegister(route string, handler func(w http.ResponseWriter, r *http.Request)) { |
||||
|
|
||||
|
the.mux.HandleFunc(route, handler) |
||||
|
log.Printf("注册路由 %s", route) |
||||
|
|
||||
|
} |
@ -0,0 +1,101 @@ |
|||||
|
package dbOperate |
||||
|
|
||||
|
import ( |
||||
|
_ "github.com/go-sql-driver/mysql" |
||||
|
"github.com/jmoiron/sqlx" |
||||
|
_ "github.com/lib/pq" |
||||
|
_ "github.com/mattn/go-sqlite3" |
||||
|
"log" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
const ( |
||||
|
postgres = iota + 1 |
||||
|
sqlite3 |
||||
|
) |
||||
|
|
||||
|
type DBHelper struct { |
||||
|
dbClient *sqlx.DB |
||||
|
dbType string |
||||
|
ConnectStr string |
||||
|
} |
||||
|
|
||||
|
func NewDBHelper(dbType string, connectStr string) *DBHelper { |
||||
|
the := &DBHelper{} |
||||
|
switch dbType { |
||||
|
case "postgres": |
||||
|
fallthrough |
||||
|
case "mysql": |
||||
|
fallthrough |
||||
|
case "sqlite3": |
||||
|
fallthrough |
||||
|
case "有效的数据库类型": |
||||
|
the.dbType = dbType |
||||
|
the.ConnectStr = connectStr |
||||
|
default: |
||||
|
log.Panicf("不支持的数据库类型=> %s", dbType) |
||||
|
|
||||
|
} |
||||
|
return the |
||||
|
} |
||||
|
|
||||
|
// sqlite3 => connectStr := os.Getwd() + "/db/cz.db"
|
||||
|
// mysql => "user:password@tcp(127.0.0.1:3306)/databaseName"
|
||||
|
// pg => "host=192.168.10.64 port=5432 user=postgres password=123456 dbname=headscale sslmode=disable"
|
||||
|
func (the *DBHelper) dbOpen() error { |
||||
|
var err error |
||||
|
tdb, err := sqlx.Open(the.dbType, the.ConnectStr) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
if err = tdb.Ping(); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
the.dbClient = tdb |
||||
|
return nil |
||||
|
} |
||||
|
func (the *DBHelper) Exec(dbRecordSQL string) error { |
||||
|
if the.dbClient == nil { |
||||
|
if openErr := the.dbOpen(); openErr != nil { |
||||
|
//logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr)
|
||||
|
return openErr |
||||
|
} |
||||
|
} |
||||
|
execResult, execErr := the.dbClient.Exec(dbRecordSQL) |
||||
|
defer the.dbClient.Close() |
||||
|
if execErr != nil { |
||||
|
return execErr |
||||
|
} |
||||
|
if n, err := execResult.RowsAffected(); n > 0 { |
||||
|
return nil |
||||
|
} else { |
||||
|
log.Printf("[%s]执行sql[%s]失败\n", time.Now().Format("2006-01-02 15:04:05.000"), dbRecordSQL) |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *DBHelper) Query(dest any, sql string) error { |
||||
|
|
||||
|
start := time.Now() |
||||
|
|
||||
|
if the.dbClient == nil { |
||||
|
if err := the.dbOpen(); err != nil { |
||||
|
log.Printf("数据库链接失败:%s", err.Error()) |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
err := the.dbClient.Select(dest, sql) |
||||
|
if err != nil { |
||||
|
log.Printf("数据库查询失败:%s,\n sql=%s", err.Error(), sql) |
||||
|
} |
||||
|
|
||||
|
durTime := time.Since(start).Seconds() |
||||
|
log.Printf("查询耗时:%v s", durTime) |
||||
|
return err |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *DBHelper) Close() error { |
||||
|
return the.Close() |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
package dbOperate |
||||
|
|
||||
|
import ( |
||||
|
"goInOut/models" |
||||
|
"testing" |
||||
|
) |
||||
|
|
||||
|
type res struct { |
||||
|
RLLYCJ string `json:"LLYCJ"` |
||||
|
RLLCacheMap string `json:"LLCacheMap"` |
||||
|
} |
||||
|
|
||||
|
func TestRedis(t *testing.T) { |
||||
|
addr := "10.8.30.160:30379" |
||||
|
redis := NewRedisHelper("", addr) |
||||
|
|
||||
|
key1 := "RLLYCJ" |
||||
|
//v := redis.Get(key1)
|
||||
|
//println(v)
|
||||
|
|
||||
|
key2 := "RLLCacheMap" |
||||
|
res1 := res{} |
||||
|
|
||||
|
v2 := redis.MGet(&res1, key1, key2) |
||||
|
println(v2) |
||||
|
} |
||||
|
|
||||
|
func TestPg(t *testing.T) { |
||||
|
dbType := "postgres" |
||||
|
connectStr := "host=10.8.30.32 port=5432 user=postgres password=123 dbname=NBJJ1215-T sslmode=disable" |
||||
|
db := NewDBHelper(dbType, connectStr) |
||||
|
sql := "select * from t_agg_way" |
||||
|
var r []models.AggWay |
||||
|
db.Query(&r, sql) |
||||
|
println("===") |
||||
|
|
||||
|
} |
@ -0,0 +1,269 @@ |
|||||
|
package dbOperate |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"context" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
elasticsearch6 "github.com/elastic/go-elasticsearch/v6" |
||||
|
"github.com/elastic/go-elasticsearch/v6/esapi" |
||||
|
"goInOut/models" |
||||
|
"io" |
||||
|
"log" |
||||
|
"strings" |
||||
|
) |
||||
|
|
||||
|
type ESHelper struct { |
||||
|
addresses []string |
||||
|
//org string
|
||||
|
esClient *elasticsearch6.Client |
||||
|
} |
||||
|
|
||||
|
func NewESHelper(addresses []string, user, pwd string) *ESHelper { |
||||
|
es, _ := elasticsearch6.NewClient(elasticsearch6.Config{ |
||||
|
Addresses: addresses, |
||||
|
Username: user, |
||||
|
Password: pwd, |
||||
|
}) |
||||
|
res, err := es.Info() |
||||
|
if err != nil { |
||||
|
log.Fatalf("Error getting response: %s", err) |
||||
|
} |
||||
|
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res) |
||||
|
return &ESHelper{ |
||||
|
addresses: addresses, |
||||
|
esClient: es, |
||||
|
} |
||||
|
} |
||||
|
func (the *ESHelper) SearchRaw(index, reqBody string) []models.HitRaw { |
||||
|
body := &bytes.Buffer{} |
||||
|
body.WriteString(reqBody) |
||||
|
response, err := the.esClient.Search( |
||||
|
the.esClient.Search.WithIndex(index), |
||||
|
the.esClient.Search.WithBody(body), |
||||
|
) |
||||
|
defer response.Body.Close() |
||||
|
if err != nil { |
||||
|
return nil |
||||
|
} |
||||
|
r := models.EsRawResp{} |
||||
|
// Deserialize the response into a map.
|
||||
|
if err := json.NewDecoder(response.Body).Decode(&r); err != nil { |
||||
|
log.Fatalf("Error parsing the response body: %s", err) |
||||
|
} |
||||
|
return r.Hits.Hits |
||||
|
} |
||||
|
func (the *ESHelper) Search(index, reqBody string) { |
||||
|
body := &bytes.Buffer{} |
||||
|
body.WriteString(reqBody) |
||||
|
response, err := the.esClient.Search( |
||||
|
the.esClient.Search.WithIndex(index), |
||||
|
the.esClient.Search.WithBody(body), |
||||
|
) |
||||
|
|
||||
|
if err != nil { |
||||
|
//return nil, err
|
||||
|
} |
||||
|
log.Println(response.Status()) |
||||
|
var r map[string]any |
||||
|
// Deserialize the response into a map.
|
||||
|
if err := json.NewDecoder(response.Body).Decode(&r); err != nil { |
||||
|
log.Fatalf("Error parsing the response body: %s", err) |
||||
|
} |
||||
|
// Print the response status, number of results, and request duration.
|
||||
|
log.Printf( |
||||
|
"[%s] %d hits; took: %dms", |
||||
|
response.Status(), |
||||
|
int(r["hits"].(map[string]any)["total"].(float64)), |
||||
|
int(r["took"].(float64)), |
||||
|
) |
||||
|
|
||||
|
for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { |
||||
|
|
||||
|
source := hit.(map[string]any)["_source"] |
||||
|
log.Printf(" * ID=%s, %s", hit.(map[string]any)["_id"], source) |
||||
|
} |
||||
|
log.Println(strings.Repeat("=", 37)) |
||||
|
} |
||||
|
func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { |
||||
|
// Set up the request object.
|
||||
|
req := esapi.IndexRequest{ |
||||
|
Index: index, |
||||
|
//DocumentID: strconv.Itoa(i + 1),
|
||||
|
Body: strings.NewReader(reqBody), |
||||
|
Refresh: "true", |
||||
|
} |
||||
|
// Perform the request with the client.
|
||||
|
res, err := req.Do(context.Background(), the.esClient) |
||||
|
if err != nil { |
||||
|
log.Fatalf("Error getting response: %s", err) |
||||
|
} |
||||
|
defer res.Body.Close() |
||||
|
var r map[string]any |
||||
|
if res.IsError() { |
||||
|
log.Printf("[%s] Error indexing document ID=%d", res.Status(), 0) |
||||
|
} else { |
||||
|
// Deserialize the response into a map.
|
||||
|
|
||||
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil { |
||||
|
log.Printf("Error parsing the response body: %s", err) |
||||
|
} else { |
||||
|
// Print the response status and indexed document version.
|
||||
|
log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) |
||||
|
} |
||||
|
} |
||||
|
return r, err |
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { |
||||
|
respmap, err := the.request(index, reqBody) |
||||
|
if respmap != nil { |
||||
|
|
||||
|
} |
||||
|
iotaDatas := models.IotaData{} |
||||
|
return iotaDatas, err |
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, error) { |
||||
|
body := &bytes.Buffer{} |
||||
|
body.WriteString(reqBody) |
||||
|
response, err := the.esClient.Search( |
||||
|
the.esClient.Search.WithIndex(index), |
||||
|
the.esClient.Search.WithBody(body), |
||||
|
) |
||||
|
defer response.Body.Close() |
||||
|
if err != nil { |
||||
|
//return nil, err
|
||||
|
} |
||||
|
log.Println(response.Status()) |
||||
|
r := models.EsThemeResp{} |
||||
|
// Deserialize the response into a map.
|
||||
|
if err := json.NewDecoder(response.Body).Decode(&r); err != nil { |
||||
|
log.Fatalf("Error parsing the response body: %s", err) |
||||
|
} |
||||
|
return r, err |
||||
|
} |
||||
|
func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { |
||||
|
//sensorId := 178
|
||||
|
queryBody := fmt.Sprintf(`{ |
||||
|
"size": 1, |
||||
|
"query": { |
||||
|
"term": { |
||||
|
"sensor": { |
||||
|
"value": %d |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"sort": [ |
||||
|
{ |
||||
|
"collect_time": { |
||||
|
"order": "desc" |
||||
|
} |
||||
|
} |
||||
|
] |
||||
|
}`, sensorId) |
||||
|
//index := "go_native_themes"
|
||||
|
themes, err := the.searchThemes(index, queryBody) |
||||
|
|
||||
|
var theme models.EsTheme |
||||
|
if len(themes.Hits.Hits) > 0 { |
||||
|
theme = themes.Hits.Hits[0].Source |
||||
|
} |
||||
|
|
||||
|
return theme, err |
||||
|
} |
||||
|
func (the *ESHelper) BulkWrite(index, reqBody string) { |
||||
|
|
||||
|
body := &bytes.Buffer{} |
||||
|
body.WriteString(reqBody) |
||||
|
bulkRequest := esapi.BulkRequest{ |
||||
|
Index: index, |
||||
|
Body: body, |
||||
|
DocumentType: "_doc", |
||||
|
} |
||||
|
res, err := bulkRequest.Do(context.Background(), the.esClient) |
||||
|
defer res.Body.Close() |
||||
|
if err != nil { |
||||
|
log.Panicf("es 写入[%s],err=%s", index, err.Error()) |
||||
|
return |
||||
|
} |
||||
|
respBody, _ := io.ReadAll(res.Body) |
||||
|
if res.StatusCode != 200 && res.StatusCode != 201 { |
||||
|
log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) |
||||
|
} |
||||
|
//log.Printf("es 写入[%s],完成,res=%s ", index, respBody)
|
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { |
||||
|
|
||||
|
body := &bytes.Buffer{} |
||||
|
body.WriteString(reqBody) |
||||
|
bulkRequest := esapi.BulkRequest{ |
||||
|
Index: index, |
||||
|
Body: body, |
||||
|
DocumentType: "_doc", |
||||
|
} |
||||
|
res, err := bulkRequest.Do(context.Background(), the.esClient) |
||||
|
defer res.Body.Close() |
||||
|
if err != nil { |
||||
|
log.Panicf("es 写入[%s],err=%s", index, err.Error()) |
||||
|
return |
||||
|
} |
||||
|
respBody, _ := io.ReadAll(res.Body) |
||||
|
if res.StatusCode != 200 && res.StatusCode != 201 { |
||||
|
log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) |
||||
|
} |
||||
|
log.Printf("es 写入[%s],完成,res=%s ", index, respBody) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { |
||||
|
|
||||
|
//log 测试用
|
||||
|
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" |
||||
|
|
||||
|
logTag := false |
||||
|
|
||||
|
body := strings.Builder{} |
||||
|
for _, raw := range raws { |
||||
|
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
|
||||
|
source, _ := json.Marshal(raw) |
||||
|
_id := raw.IotaDevice |
||||
|
s := fmt.Sprintf( |
||||
|
`{"index": {"_index": "%s","_id": "%s"}} |
||||
|
%s |
||||
|
`, index, _id, source) |
||||
|
body.WriteString(s) |
||||
|
|
||||
|
if raw.IotaDevice == logTagDeviceId { |
||||
|
log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", logTagDeviceId, string(s)) |
||||
|
logTag = true |
||||
|
} |
||||
|
} |
||||
|
if logTag { //追踪数据
|
||||
|
the.BulkWriteWithLog(index, body.String()) |
||||
|
} else { |
||||
|
the.BulkWrite(index, body.String()) |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) { |
||||
|
body := strings.Builder{} |
||||
|
for _, raw := range raws { |
||||
|
source, _ := json.Marshal(raw) |
||||
|
_id := raw.IotaDevice |
||||
|
s := fmt.Sprintf( |
||||
|
`{"index": {"_index": "%s","_id": "%s"}} |
||||
|
%s |
||||
|
`, index, _id, source) |
||||
|
body.WriteString(s) |
||||
|
} |
||||
|
the.BulkWrite(index, body.String()) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *ESHelper) Close() { |
||||
|
|
||||
|
} |
@ -1,4 +1,4 @@ |
|||||
package dbHelper |
package dbOperate |
||||
|
|
||||
import ( |
import ( |
||||
"io" |
"io" |
@ -0,0 +1,133 @@ |
|||||
|
package dbOperate |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"encoding/json" |
||||
|
"errors" |
||||
|
"github.com/redis/go-redis/v9" |
||||
|
"log" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type RedisHelper struct { |
||||
|
rdb redis.UniversalClient |
||||
|
isReady bool |
||||
|
ctx context.Context |
||||
|
} |
||||
|
|
||||
|
func NewRedisHelper(master string, address ...string) *RedisHelper { |
||||
|
r := &RedisHelper{ctx: context.Background()} |
||||
|
r.InitialCluster(master, address...) |
||||
|
return r |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (the *RedisHelper) InitialCluster(master string, address ...string) { |
||||
|
|
||||
|
the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{ |
||||
|
Addrs: address, |
||||
|
MasterName: master, |
||||
|
}) |
||||
|
log.Printf("redis 初始化完成 %s", address) |
||||
|
the.isReady = true |
||||
|
} |
||||
|
|
||||
|
func (the *RedisHelper) Get(key string) string { |
||||
|
val, err := the.rdb.Get(the.ctx, key).Result() |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", key) |
||||
|
} else if err != nil { |
||||
|
panic(err) |
||||
|
} else { |
||||
|
//log.Printf("get key => %s =%s", key, val)
|
||||
|
} |
||||
|
return val |
||||
|
} |
||||
|
|
||||
|
func (the *RedisHelper) GetObj(keys string, addr any) error { |
||||
|
err := the.rdb.Get(the.ctx, keys).Scan(addr) |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", keys) |
||||
|
} else if err != nil { |
||||
|
es := err.Error() |
||||
|
log.Printf("err=%s ", es) |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
func (the *RedisHelper) SetObj(keys string, obj any) error { |
||||
|
rs, err := the.rdb.Set(the.ctx, keys, obj, time.Minute*5).Result() |
||||
|
log.Printf("rs=%s ", rs) |
||||
|
if err != nil { |
||||
|
log.Printf("err=%s ", err.Error()) |
||||
|
} |
||||
|
return err |
||||
|
} |
||||
|
func (the *RedisHelper) GetLRange(keys string, addr any) error { |
||||
|
err := the.rdb.LRange(the.ctx, keys, 0, -1).ScanSlice(addr) |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", keys) |
||||
|
} else if err != nil { |
||||
|
log.Printf("err=%s ", err.Error()) |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
func (the *RedisHelper) MGet(addr any, keys ...string) error { |
||||
|
err := the.rdb.MGet(the.ctx, keys...).Scan(addr) |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", keys) |
||||
|
} else if err != nil { |
||||
|
log.Printf("err=%s ", err.Error()) |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
return err |
||||
|
} |
||||
|
func (the *RedisHelper) MGetObj(addr any, keys ...string) error { |
||||
|
err := the.rdb.MGet(the.ctx, keys...).Scan(addr) |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", keys) |
||||
|
} else if err != nil { |
||||
|
log.Printf("err=%s ", err.Error()) |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
func (the *RedisHelper) HMGetObj(addr any, key, field string) error { |
||||
|
rp, err := the.rdb.HMGet(the.ctx, key, field).Result() |
||||
|
if errors.Is(err, redis.Nil) { |
||||
|
log.Printf("%s does not exist", key) |
||||
|
return err |
||||
|
} else if err != nil { |
||||
|
log.Printf("err=%s ", err.Error()) |
||||
|
return err |
||||
|
} |
||||
|
for _, i := range rp { |
||||
|
if v, ok := i.(string); ok { |
||||
|
err := json.Unmarshal([]byte(v), addr) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
//todo scan有问题 后续待排查
|
||||
|
return nil |
||||
|
|
||||
|
//err := the.rdb.HMGet(the.ctx, key, field).Scan(addr)
|
||||
|
//if errors.Is(err, redis.Nil) {
|
||||
|
// log.Printf("%s does not exist", key)
|
||||
|
//} else if err != nil {
|
||||
|
// log.Printf("err=%s ", err.Error())
|
||||
|
// return err
|
||||
|
//}
|
||||
|
//return nil
|
||||
|
} |
||||
|
|
||||
|
func (the *RedisHelper) SRem(key string, members ...string) int64 { |
||||
|
return the.rdb.SRem(the.ctx, key, members).Val() |
||||
|
} |
||||
|
|
||||
|
func (the *RedisHelper) SAdd(key string, members ...string) int64 { |
||||
|
return the.rdb.SAdd(the.ctx, key, members).Val() |
||||
|
} |
@ -1,4 +1,4 @@ |
|||||
package dbHelper |
package dbOperate |
||||
|
|
||||
import ( |
import ( |
||||
"fmt" |
"fmt" |
@ -1,18 +0,0 @@ |
|||||
2024/10/17 11:33:20.528503 main.go:24: =================log start================= |
|
||||
2024/10/17 11:33:20.540075 main.go:25: ==> |
|
||||
2024/10/17 11:33:20.540075 main.go:30: 进程启动 |
|
||||
2024/10/17 11:33:20.540075 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json |
|
||||
2024/10/17 11:33:20.540592 init.go:20: 加载配置文件:弃用备份 |
|
||||
2024/10/17 11:33:20.540592 init.go:22: 非文件[弃用备份]跳过 |
|
||||
2024/10/17 11:33:22.546148 mqttHelper.go:48: mqtt连接状态异常 tcp://10.8.30.160:1883(u:wjhp-sjwy-upload,p:123456,cid:wjhp-sjwy-upload) [err=network Error : dial tcp 10.8.30.160:1883: connectex: No connection could be made because the target machine actively refused it.] |
|
||||
2024/10/17 11:33:22.546148 mqttHelper.go:49: mqtt重连,30s后尝试,剩余次数=3 |
|
||||
2024/10/17 11:33:53.265276 main.go:24: =================log start================= |
|
||||
2024/10/17 11:33:53.285624 main.go:25: ==> |
|
||||
2024/10/17 11:33:53.285624 main.go:30: 进程启动 |
|
||||
2024/10/17 11:33:53.286127 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json |
|
||||
2024/10/17 11:33:53.286698 init.go:20: 加载配置文件:弃用备份 |
|
||||
2024/10/17 11:33:53.286698 init.go:22: 非文件[弃用备份]跳过 |
|
||||
2024/10/17 11:33:53.289208 mqttHelper.go:28: mqtt触发重连后的重订阅 |
|
||||
2024/10/17 11:33:55.290027 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= |
|
||||
2024/10/17 11:34:12.251376 mqttHelper.go:28: mqtt触发重连后的重订阅 |
|
||||
2024/10/17 11:34:12.251376 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= |
|
@ -0,0 +1,7 @@ |
|||||
|
package models |
||||
|
|
||||
|
type GDND2luAn struct { |
||||
|
Id string `json:"id"` |
||||
|
Time string `json:"time"` |
||||
|
Value map[string][]float32 `json:"value"` |
||||
|
} |
@ -0,0 +1,40 @@ |
|||||
|
package models |
||||
|
|
||||
|
import ( |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type IotaData struct { |
||||
|
UserId string `json:"userId"` |
||||
|
ThingId string `json:"thingId"` |
||||
|
DimensionId string `json:"dimensionId"` |
||||
|
DimCapId string `json:"dimCapId"` |
||||
|
CapId string `json:"capId"` |
||||
|
DeviceId string `json:"deviceId"` |
||||
|
ScheduleId string `json:"scheduleId"` |
||||
|
TaskId string `json:"taskId"` |
||||
|
JobId int `json:"jobId"` |
||||
|
JobRepeatId int `json:"jobRepeatId"` |
||||
|
TriggerTime time.Time `json:"triggerTime"` |
||||
|
RealTime time.Time `json:"realTime"` |
||||
|
FinishTime time.Time `json:"finishTime"` |
||||
|
Seq int `json:"seq"` |
||||
|
Released bool `json:"released"` |
||||
|
Data Data `json:"data"` |
||||
|
} |
||||
|
|
||||
|
type Data struct { |
||||
|
Type int `json:"type"` |
||||
|
Data map[string]any `json:"data"` |
||||
|
Result struct { |
||||
|
Code int `json:"code"` |
||||
|
Msg string `json:"msg"` |
||||
|
Detail string `json:"detail"` |
||||
|
ErrTimes int `json:"errTimes"` |
||||
|
Dropped bool `json:"dropped"` |
||||
|
} `json:"result"` |
||||
|
} |
||||
|
|
||||
|
func (the *Data) Success() bool { |
||||
|
return the.Result.Code == 0 |
||||
|
} |
@ -0,0 +1,6 @@ |
|||||
|
package models |
||||
|
|
||||
|
type AggWay struct { |
||||
|
Id int64 `db:"id"` |
||||
|
Name string `db:"name"` |
||||
|
} |
@ -0,0 +1,6 @@ |
|||||
|
package models |
||||
|
|
||||
|
const ( |
||||
|
RawTypeVib = "vib" |
||||
|
RawTypeDiag = "diag" |
||||
|
) |
@ -0,0 +1,107 @@ |
|||||
|
package models |
||||
|
|
||||
|
import ( |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type DeviceData struct { |
||||
|
DeviceId string |
||||
|
Name string |
||||
|
ThingId string |
||||
|
StructId int |
||||
|
TaskId string |
||||
|
AcqTime time.Time |
||||
|
RealTime time.Time |
||||
|
ErrCode int |
||||
|
Raw map[string]any |
||||
|
RawUnit map[string]string |
||||
|
DeviceInfo DeviceInfo |
||||
|
DimensionId string |
||||
|
//数据类型 常见有 comm="" ,RawTypeVib="vib"
|
||||
|
DataType string |
||||
|
} |
||||
|
|
||||
|
func (d *DeviceData) GetVibrationData() VibrationData { |
||||
|
vibData := VibrationData{} |
||||
|
if d.DataType == RawTypeVib { |
||||
|
if v, ok := d.Raw["filterFreq"]; ok { |
||||
|
if vv, ok := v.(float64); ok { |
||||
|
vibData.FilterFreq = vv |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if v, ok := d.Raw["sampleFreq"]; ok { |
||||
|
if vv, ok := v.(float64); ok { |
||||
|
vibData.SampleFreq = vv |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if v, ok := d.Raw["gainAmplifier"]; ok { |
||||
|
if vv, ok := v.(float64); ok { |
||||
|
vibData.GainAmplifier = byte(vv) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if v, ok := d.Raw["version"]; ok { |
||||
|
if vv, ok := v.(float64); ok { |
||||
|
vibData.Version = byte(vv) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if v, ok := d.Raw["triggerType"]; ok { |
||||
|
if vv, ok := v.(float64); ok { |
||||
|
vibData.TriggerType = byte(vv) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if v, ok := d.Raw["physicalvalue"]; ok { |
||||
|
|
||||
|
if vSlice, ok := v.([]any); ok { |
||||
|
for _, vObj := range vSlice { |
||||
|
if vv, ok := vObj.(float64); ok { |
||||
|
vibData.Data = append(vibData.Data, vv) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
//去直流
|
||||
|
if len(vibData.Data) > 0 { |
||||
|
avg := func(dataArray []float64) float64 { |
||||
|
sum := 0.0 |
||||
|
for _, f := range dataArray { |
||||
|
sum += f |
||||
|
} |
||||
|
return sum / float64(len(dataArray)) |
||||
|
}(vibData.Data) //common_calc.GetAvg(vibData.Data)
|
||||
|
|
||||
|
for i := 0; i < len(vibData.Data); i++ { |
||||
|
vibData.Data[i] = vibData.Data[i] - avg |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
return vibData |
||||
|
} |
||||
|
|
||||
|
// VibrationData 振动数据
|
||||
|
type VibrationData struct { |
||||
|
Version byte |
||||
|
SampleFreq float64 |
||||
|
FilterFreq float64 |
||||
|
GainAmplifier byte |
||||
|
TriggerType byte |
||||
|
Data []float64 // 原始波形数据
|
||||
|
Unit string |
||||
|
} |
||||
|
|
||||
|
func (v *VibrationData) FormatParams() map[string]any { |
||||
|
return map[string]any{ |
||||
|
"sampleFreq": v.SampleFreq, |
||||
|
"version": v.Version, |
||||
|
"filterFreq": v.FilterFreq, |
||||
|
"gainAmplifier": v.GainAmplifier, |
||||
|
"triggerType": v.TriggerType, |
||||
|
} |
||||
|
} |
@ -0,0 +1,70 @@ |
|||||
|
package models |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type DeviceInfo struct { |
||||
|
Id string `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Structure Structure `json:"structure"` |
||||
|
DeviceMeta DeviceMeta `json:"device_meta"` |
||||
|
RefreshTime time.Time |
||||
|
} |
||||
|
|
||||
|
type DeviceMeta struct { |
||||
|
Id string `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Model string `json:"model"` |
||||
|
Properties []IotaProperty `json:"properties"` |
||||
|
Capabilities []IotaCapability `json:"capabilities"` |
||||
|
} |
||||
|
|
||||
|
func (m *DeviceMeta) GetOutputProps() (out map[string]string) { |
||||
|
out = make(map[string]string) |
||||
|
if len(m.Capabilities) == 0 { |
||||
|
return |
||||
|
} |
||||
|
for _, property := range m.Capabilities[0].Properties { |
||||
|
info := fmt.Sprintf("%s(%s)", property.ShowName, property.Unit) |
||||
|
out[property.Name] = info |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
func (m *DeviceMeta) GetOutputUnit() (out map[string]string) { |
||||
|
out = make(map[string]string) |
||||
|
if len(m.Capabilities) == 0 { |
||||
|
return |
||||
|
} |
||||
|
for _, property := range m.Capabilities[0].Properties { |
||||
|
out[property.Name] = property.Unit |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *DeviceMeta) MarshalBinary() (data []byte, err error) { |
||||
|
return json.Marshal(m) |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *DeviceMeta) UnmarshalBinary(data []byte) error { |
||||
|
return json.Unmarshal(data, m) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
type IotaCapability struct { |
||||
|
CapabilityCategoryId int `json:"capabilityCategoryId"` |
||||
|
Id string `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Properties []IotaProperty `json:"properties"` |
||||
|
} |
||||
|
|
||||
|
type IotaProperty struct { |
||||
|
Category string `json:"category"` |
||||
|
Name string `json:"name"` |
||||
|
ShowName string `json:"showName"` |
||||
|
Unit string `json:"unit"` |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
package models |
||||
|
|
||||
|
import "time" |
||||
|
|
||||
|
type EsRaw struct { |
||||
|
StructId int `json:"structId"` |
||||
|
IotaDeviceName string `json:"iota_device_name"` |
||||
|
Data map[string]any `json:"data"` |
||||
|
CollectTime string `json:"collect_time"` |
||||
|
Meta map[string]string `json:"meta"` |
||||
|
IotaDevice string `json:"iota_device"` |
||||
|
CreateTime time.Time `json:"create_time"` |
||||
|
} |
||||
|
|
||||
|
type EsRawResp struct { |
||||
|
Took int `json:"took"` |
||||
|
TimedOut bool `json:"timed_out"` |
||||
|
Shards struct { |
||||
|
Total int `json:"total"` |
||||
|
Successful int `json:"successful"` |
||||
|
Skipped int `json:"skipped"` |
||||
|
Failed int `json:"failed"` |
||||
|
} `json:"_shards"` |
||||
|
Hits struct { |
||||
|
Total int `json:"total"` |
||||
|
MaxScore float64 `json:"max_score"` |
||||
|
Hits []HitRaw `json:"hits"` |
||||
|
} `json:"hits"` |
||||
|
} |
||||
|
|
||||
|
type HitRaw struct { |
||||
|
Index string `json:"_index"` |
||||
|
Type string `json:"_type"` |
||||
|
Id string `json:"_id"` |
||||
|
Score float64 `json:"_score"` |
||||
|
Source EsRaw `json:"_source"` |
||||
|
} |
@ -0,0 +1,41 @@ |
|||||
|
package models |
||||
|
|
||||
|
import "time" |
||||
|
|
||||
|
type EsTheme struct { |
||||
|
SensorName string `json:"sensor_name"` |
||||
|
FactorName string `json:"factor_name"` |
||||
|
FactorProtoCode string `json:"factor_proto_code"` |
||||
|
Data map[string]any `json:"data"` |
||||
|
FactorProtoName string `json:"factor_proto_name"` |
||||
|
Factor int `json:"factor"` |
||||
|
CollectTime time.Time `json:"collect_time"` |
||||
|
Sensor int `json:"sensor"` |
||||
|
Structure int `json:"structure"` |
||||
|
IotaDevice []string `json:"iota_device"` |
||||
|
CreateTime time.Time `json:"create_time"` |
||||
|
} |
||||
|
|
||||
|
type EsThemeResp struct { |
||||
|
Took int `json:"took"` |
||||
|
TimedOut bool `json:"timed_out"` |
||||
|
Shards struct { |
||||
|
Total int `json:"total"` |
||||
|
Successful int `json:"successful"` |
||||
|
Skipped int `json:"skipped"` |
||||
|
Failed int `json:"failed"` |
||||
|
} `json:"_shards"` |
||||
|
Hits struct { |
||||
|
Total int `json:"total"` |
||||
|
MaxScore float64 `json:"max_score"` |
||||
|
Hits []HitTheme `json:"hits"` |
||||
|
} `json:"hits"` |
||||
|
} |
||||
|
|
||||
|
type HitTheme struct { |
||||
|
Index string `json:"_index"` |
||||
|
Type string `json:"_type"` |
||||
|
Id string `json:"_id"` |
||||
|
Score float64 `json:"_score"` |
||||
|
Source EsTheme `json:"_source"` |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package models |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
) |
||||
|
|
||||
|
type IotaDevice struct { |
||||
|
Id string `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Properties string `json:"properties"` |
||||
|
DeviceMetaId string `json:"deviceMetaId"` |
||||
|
ThingId string `json:"thingId"` |
||||
|
DeviceMeta DeviceMeta `json:"deviceMeta"` |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *IotaDevice) MarshalBinary() (data []byte, err error) { |
||||
|
return json.Marshal(m) |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *IotaDevice) UnmarshalBinary(data []byte) error { |
||||
|
return json.Unmarshal(data, m) |
||||
|
|
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
package models |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
) |
||||
|
|
||||
|
type Structure struct { |
||||
|
ThingId string `json:"thingId"` |
||||
|
Id int `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Type string `json:"type"` |
||||
|
OrgId int `json:"orgId"` |
||||
|
} |
||||
|
|
||||
|
type ThingStruct struct { |
||||
|
ThingId string `json:"thingId"` |
||||
|
Id int `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
Type string `json:"type"` |
||||
|
OrgId int `json:"orgId"` |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *ThingStruct) MarshalBinary() (data []byte, err error) { |
||||
|
return json.Marshal(m) |
||||
|
} |
||||
|
|
||||
|
// redis序列化
|
||||
|
func (m *ThingStruct) UnmarshalBinary(data []byte) error { |
||||
|
return json.Unmarshal(data, m) |
||||
|
} |
@ -0,0 +1,14 @@ |
|||||
|
package monitors |
||||
|
|
||||
|
type CommonMonitor struct { |
||||
|
*MonitorHelper |
||||
|
} |
||||
|
|
||||
|
func (the *CommonMonitor) RegisterFun(fun func()) { |
||||
|
the.registerFun(fun) |
||||
|
} |
||||
|
|
||||
|
func (the *CommonMonitor) Start() { |
||||
|
the.initial() |
||||
|
the.monitorStart() |
||||
|
} |
@ -0,0 +1,80 @@ |
|||||
|
package testUnit |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"goInOut/models" |
||||
|
"goInOut/utils" |
||||
|
"testing" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// 定义一个结构体,包含嵌套的结构体字段
|
||||
|
type Person struct { |
||||
|
Name Name `json:"name"` |
||||
|
} |
||||
|
|
||||
|
type Name struct { |
||||
|
First string `json:"first"` |
||||
|
Last string `json:"last"` |
||||
|
} |
||||
|
|
||||
|
func Test_httpProxy(t *testing.T) { |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func Test_template(t *testing.T) { |
||||
|
// 创建一个Person实例
|
||||
|
person := Person{ |
||||
|
Name: Name{ |
||||
|
First: "John", |
||||
|
Last: "Doe", |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
templateStr := "Hello, {{.Name.First}} !" |
||||
|
sbs, err := utils.TextTemplateMatch(person, templateStr) |
||||
|
println(sbs) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func Test_timeHandler(t *testing.T) { |
||||
|
rawMsg := `{ |
||||
|
"userId": "ce2d7eb2-e56e-422e-8bbe-95dfa18e32f8", |
||||
|
"thingId": "14862308-083e-46e1-a422-d7d6a1e2825d", |
||||
|
"dimensionId": "47386f69-c5aa-4ae7-b6cc-0490a1dc0b14", |
||||
|
"dimCapId": "0d561c0b-4dca-4104-abc0-1f0c40a71382", |
||||
|
"capId": "d4965875-354b-4294-87f4-c4ba9f9260ab", |
||||
|
"deviceId": "9c43a09c-3c65-42d3-9a54-42b87e0e5af2", |
||||
|
"scheduleId": "1cfebf18-81a2-489e-bcfb-efc294d8ce3d", |
||||
|
"taskId": "b58858ed-9e23-4ac9-9f9c-44f9e057aee9", |
||||
|
"jobId": 1, |
||||
|
"jobRepeatId": 1, |
||||
|
"triggerTime": "2024-12-04T18:23:04+16:00", |
||||
|
"realTime": "0001-01-01T00:00:00Z", |
||||
|
"finishTime": "2024-12-04T10:23:07.909922675+08:00", |
||||
|
"seq": 0, |
||||
|
"released": false, |
||||
|
"data": { |
||||
|
"type": 1, |
||||
|
"data": { |
||||
|
"physicalvalue": 0 |
||||
|
}, |
||||
|
"result": { |
||||
|
"code": 0, |
||||
|
"msg": "", |
||||
|
"detail": null, |
||||
|
"errTimes": 0, |
||||
|
"dropped": false |
||||
|
} |
||||
|
} |
||||
|
}` |
||||
|
|
||||
|
iotaData := models.IotaData{} |
||||
|
json.Unmarshal([]byte(rawMsg), &iotaData) |
||||
|
var cstZone = time.FixedZone("CST", 8*3600) // 东八区
|
||||
|
time := iotaData.TriggerTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800") |
||||
|
println(time) |
||||
|
} |
File diff suppressed because one or more lines are too long
@ -0,0 +1,53 @@ |
|||||
|
package utils |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"log" |
||||
|
"os" |
||||
|
) |
||||
|
|
||||
|
func SaveCache2File(cacheStr string, fileName string) error { |
||||
|
// 打开文件,如果文件不存在则创建
|
||||
|
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) |
||||
|
if err != nil { |
||||
|
log.Println("Error opening file:", err) |
||||
|
return err |
||||
|
} |
||||
|
defer file.Close() |
||||
|
|
||||
|
// 将变量写入文件
|
||||
|
_, err = file.WriteString(cacheStr) |
||||
|
if err != nil { |
||||
|
log.Println("Error writing to file:", err) |
||||
|
} |
||||
|
return err |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func ReadCache2File(fileName string) (string, error) { |
||||
|
// 打开文件
|
||||
|
file, err := os.Open(fileName) |
||||
|
if err != nil { |
||||
|
log.Println("Error opening file:", err) |
||||
|
return "", err |
||||
|
} |
||||
|
defer file.Close() |
||||
|
|
||||
|
// 读取文件内容
|
||||
|
var content string |
||||
|
_, err = fmt.Fscanf(file, "%s", &content) |
||||
|
if err != nil { |
||||
|
log.Println("Error reading from file:", err) |
||||
|
} |
||||
|
return content, err |
||||
|
} |
||||
|
|
||||
|
func FileExists(filePath string) bool { |
||||
|
_, err := os.Stat(filePath) |
||||
|
if err != nil { |
||||
|
if os.IsNotExist(err) { |
||||
|
return false |
||||
|
} |
||||
|
} |
||||
|
return true |
||||
|
} |
@ -0,0 +1,22 @@ |
|||||
|
package utils |
||||
|
|
||||
|
import ( |
||||
|
"strings" |
||||
|
"text/template" |
||||
|
) |
||||
|
|
||||
|
func TextTemplateMatch(obj any, textTemplate string) (string, error) { |
||||
|
// 定义模板字符串
|
||||
|
tmpl, err := template.New("template").Parse(textTemplate) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
|
||||
|
sb := strings.Builder{} |
||||
|
err = tmpl.Execute(&sb, obj) |
||||
|
if err != nil { |
||||
|
return "", err |
||||
|
} |
||||
|
sbs := sb.String() |
||||
|
return sbs, err |
||||
|
} |
Loading…
Reference in new issue