Compare commits

...

6 Commits

  1. 33
      config.yaml
  2. 2
      containerApp/go.mod
  3. 4
      et_calc/group/calcTask_test.go
  4. 21
      et_calc/group/groupCalc.go
  5. 8
      master/app/et_master.go
  6. 4
      master/go.mod
  7. 5
      node/app/app.go
  8. 8
      node/app/et_node.go
  9. 4
      node/go.mod
  10. 2
      node/stages/stage.go

33
config.yaml

@ -1,35 +1,34 @@
pprof:
enable: false
kafka:
groupId: consumer_group_et_go3
groupId: local_et_go2
brokers:
- 10.8.30.160:30992
- 10.8.30.142:30992
topics:
# 输入流
# data_theme: native_theme
data_raw: RawData
data_agg: native_agg
# 输出流
data_theme: native_theme
# data_agg: native_agg
alarm_iota: Alert
alarm_anxinyun: native_alarm
redis:
address: 10.8.30.160:30379
address: 10.8.30.142:30379
es:
enable: true
addresses:
- http://10.8.30.160:30092
- http://10.8.30.142:30092
# - http://10.8.30.160:30092
user: ""
pwd: ""
index:
raw: native_raws
vib: native_vbraws
theme: native_themes
raw: native_raws_new10
vib: native_vbraws_new10
theme: native_themes_new10
group: native_group_themes
influxDB:
enable: true
address: http://10.8.30.160:30086
token: EJcUCii65nAWBivWjV_7ro-V1yAiarPD41iuxJgRPZBRqXst4hSPt7L8VEhUoY_hdR0kvmPHFN5lv1wlX12t-A==
organization: dongjiang
address: http://10.8.30.142:30086
token: m-biJsdVzL0nxmz4fqYeE5p0mia_FiMFwGr8em09EjBJVr5pSFZ37G8FBDlx7uxDTyKtpMgfEvte9bZaOuMBLg==
organization: localtest
buckets:
raw: raw
vib: vib
@ -42,18 +41,18 @@ prometheus:
push:
mqtt:
enable: false
host: 10.8.30.160
host: 10.8.30.142
port: 30883
clientIdPrefix: push_et_go
kafka:
enable: false
groupId: push_et_go_lk
brokers:
- 10.8.30.160:30992
- 10.8.30.142:30992
# 节点信息 #
master:
port: 50000
hostNameTag: "0" #多状态副本 master 节点的 hostName 标记
hostNameTag: "0" #多状态副本 master 节点的 hostName 标记 DESKTOP-1QK809D
node:
remoteMasterHost: 10.8.30.104 #用于 node Register -> master
hostIpPrefix: 10.8. #多网卡筛选ip网段

2
containerApp/go.mod

@ -2,7 +2,7 @@ module containerApp
go 1.23.1
require gitea.anxinyun.cn/container/common_utils v0.0.9
require gitea.anxinyun.cn/container/common_utils v0.0.13
require (
github.com/fsnotify/fsnotify v1.7.0 // indirect

4
et_calc/group/calcTask_test.go

@ -10,8 +10,8 @@ import (
"time"
)
var configHelper = common_utils.NewConfigHelper("10.8.30.160:30379")
var group, _ = configHelper.GetStationGroup(36)
var configHelper = common_utils.NewConfigHelper("10.8.30.142:30379")
var group, _ = configHelper.GetStationGroup(54)
var processDataStrWSD_noFormula = `
{"DeviceData":{"DeviceId":"ed0f1d94-49a9-415b-9336-f965a2b0a985","Name":"温湿度传感器m1","ThingId":"5da9aa1b-05b7-4943-be57-dedb34f7a1bd","StructId":3,"TaskId":"0e1c7d3d-257d-4763-a335-198aef0fc625","AcqTime":"2024-03-20T04:20:48.000125336+08:00","RealTime":"2024-03-20T04:20:48.000251942+08:00","ErrCode":0,"Raw":{"Temp":27.9,"humidy":13.5},"RawUnit":{"Temp":"℃","humidy":"%"},"DeviceInfo":{"id":"ed0f1d94-49a9-415b-9336-f965a2b0a985","name":"温湿度传感器m1","structure":{"thingId":"5da9aa1b-05b7-4943-be57-dedb34f7a1bd","id":3,"name":"添加边坡","type":"边坡","orgId":1,"latitude":0,"longitude":0},"device_meta":{"id":"d5bf6f22-3d7a-4ab0-9043-1020e9516bcc","name":"温湿度传感器","model":"FS-BDS-WSD","properties":[],"capabilities":[{"capabilityCategoryId":3,"id":"d2add1b3-b21c-420b-82a4-e0c55ee3a019","name":"采集","properties":[{"category":"Output","name":"Temp","showName":"温度","unit":"℃"},{"category":"Output","name":"humidy","showName":"湿度","unit":"%"}]}]}},"DimensionId":"76c75371-bb9a-4f71-a25d-58adf7938296","DataType":""},"Stations":[{"Info":{"id":197,"name":"温湿度测点1","structure":3,"thingId":"5da9aa1b-05b7-4943-be57-dedb34f7a1bd","struct_name":"添加边坡","factor":2,"manual_data":false,"formula":0,"params_value":null,"Factor":{"id":2,"name":"环境温湿度","protoCode":"1002","protoName":"温湿度","items":[{"id":3,"name":"温度","field_name":"temperature","unit_name":"℃","precision":0},{"id":4,"name":"湿度","field_name":"humidity","unit_name":"%RH","precision":0}],"units":{"humidity":"%RH","temperature":"℃"}},"proto":"1002","Proto":{"code":"1002","name":"温湿度","items":[{"id":3,"name":"温度","field_name":"temperature","unit_name":"℃","precision":0},{"id":4,"name":"湿度","field_name":"humidity","unit_name":"%RH","precision":0}]},"Devices":[{"formula_id":0,"params":{},"iota_device_id":"ed0f1d94-49a9-415b-9336-f965a2b0a985","iota_device_serial":0,"FormulaInfo":{"id":0,"expression":"","params":null,"ioFields":{"input":null,"output":null},"type":""},"DeviceFactorProto":{"formula":0,"field_val":{"Temp":"temperature","humidy":"humidity"},"FieldValUnitK":null}}],"Labels":"{}","CombineInfo":""},"Data":{"DeviceCalcData":null,"ThemeData":null,"CollectTime":"0001-01-01T00:00:00Z","AlarmLevel":0},"Threshold":{"Items":[{"item":3,"field_name":"temperature","name":"温度","level":1,"lower":20,"upper":100000,"begin":null,"end":null},{"item":3,"field_name":"temperature","name":"温度","level":2,"lower":10,"upper":20,"begin":null,"end":null},{"item":4,"field_name":"humidity","name":"湿度","level":1,"lower":6,"upper":16,"begin":null,"end":null}]}}]}`

21
et_calc/group/groupCalc.go

@ -43,10 +43,12 @@ func GetGroupInfo(station common_models.Station) []GroupStation {
result := make([]GroupStation, 0)
if gps != nil {
for i := 0; i < len(gps); i++ {
result = append(result, GroupStation{
Group: gps[i],
Station: station,
})
if gps[i].Id != 0 {
result = append(result, GroupStation{
Group: gps[i],
Station: station,
})
}
}
}
@ -91,9 +93,12 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode
resultStations = append(resultStations, station)
} else {
calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err == nil {
log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
resultStations = append(resultStations, calcRet...)
} else {
println(err)
}
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
}
@ -104,15 +109,16 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode
for _, station := range resultStations {
if len(station.Data.ThemeData) > 0 {
////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。
//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
//filterSensorIds := []int{18, 20, 21}
//for _, number := range filterSensorIds {
// if number == station.Info.Id {
// log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// log.Printf("*************** 110行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// break
// }
//}
//// #TEST END
// 需要返回的测点
//log.Printf("*************** 121行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
result = append(result, station)
} else {
@ -199,7 +205,8 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st
gc.dumpGroupTheme(task, calcRet)
} else {
task.SetTimeout()
log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
log.Printf("****[%d][%s]沉降分组计算任务超期策略: dimensionId:%v\n taskId:%v\n acqTime:%v\n injectTime:%v\n deadlineTime:%v\n",
task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
calcTasks.Store(key, *task)
}
}

8
master/app/et_master.go

@ -71,6 +71,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册
gob.Register([]interface{}{})
for {
log.Println("L74 nodeCount: %d", the.nodeMapCount())
if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
@ -79,6 +80,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
select {
case stopEnable := <-the.sleepCH:
log.Println("L83 nodeCount: %d", the.nodeMapCount())
if stopEnable {
stopTime := time.Second * 10
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
@ -91,8 +93,10 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
select {
case data := <-dataChannels.RawDataChan:
log.Println("L96 nodeCount: %d", the.nodeMapCount())
the.notifyData(&data, the.callNodeService)
case data := <-dataChannels.AggDataChan:
log.Println("L99 nodeCount: %d", the.nodeMapCount())
the.notifyData(&data, the.callNodeService)
//default:
// time.Sleep(100 * time.Millisecond)
@ -175,7 +179,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
// RPC调用结果
errorCode := 0
timeoutMills := 5 * 1000 * time.Millisecond
timeoutMills := 300 * 1000 * time.Millisecond // 5分组
select {
case reply := <-resultCH:
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
@ -349,7 +353,7 @@ func (the *EtMaster) printNodes() {
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s", countInfo, info)
log.Printf("%s %s\n", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)

4
master/go.mod

@ -3,8 +3,8 @@ module master
go 1.23.1
require (
gitea.anxinyun.cn/container/common_models v0.0.10
gitea.anxinyun.cn/container/common_utils v0.0.8
gitea.anxinyun.cn/container/common_models v0.0.12
gitea.anxinyun.cn/container/common_utils v0.0.13
)
require (

5
node/app/app.go

@ -8,7 +8,6 @@ import (
"et_calc"
"et_calc/group"
"et_print"
"et_push"
"et_sink"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad"
@ -107,8 +106,8 @@ func addWorkStages(nodeStageManage *stages.StageManager) *stages.StageManager {
nodeStageManage.AddStages(stationAnalyzeHandler.GetStage())
// 数据推送
publishHandler := et_push.NewPushHandler()
nodeStageManage.AddStages(publishHandler.GetStage())
//publishHandler := et_push.NewPushHandler()
//nodeStageManage.AddStages(publishHandler.GetStage())
return nodeStageManage
}

8
node/app/et_node.go

@ -75,6 +75,9 @@ func isSettleData(data map[string]interface{}) bool {
// ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
// 记录方法开始时间
startTime := time.Now()
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
@ -96,6 +99,11 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
DeviceData: *deviceData,
Stations: []common_models.Station{},
}
defer func() {
duration := time.Since(startTime)
log.Printf("ConsumerProcess(iotaData *common_models.IotaData)执行时长: %v", duration)
}()
return nil
}

4
node/go.mod

@ -3,8 +3,8 @@ module node
go 1.23.1
require (
gitea.anxinyun.cn/container/common_models v0.0.11
gitea.anxinyun.cn/container/common_utils v0.0.12
gitea.anxinyun.cn/container/common_models v0.0.12
gitea.anxinyun.cn/container/common_utils v0.0.13
github.com/google/uuid v1.6.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

2
node/stages/stage.go

@ -61,7 +61,7 @@ func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessD
return data
}
func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
defaultTimeout := 10 * time.Second
defaultTimeout := 240 * time.Second // 4分钟
select {
case <-s.execOver:
case <-time.After(defaultTimeout):

Loading…
Cancel
Save