Compare commits

...

7 Commits

Author SHA1 Message Date
yfh c61ea64bc1 odeList改为sync.Map; 4 weeks ago
yfh a029dadf64 删除注释 4 weeks ago
yfh 6e8fea7023 expireSeconds小于3分钟,返回180秒 4 weeks ago
yfh b50c15a362 Staions.Data.PhyData赋值 4 weeks ago
yfh 7c8ceddcca Staions.Data.PhyData赋值 4 weeks ago
yfh d00f169f8f calcTasks改为sync.Map 4 weeks ago
yfh d18c3bc18c 添加注释 4 weeks ago
  1. 6
      dataSource/kafka/kafka_handler.go
  2. 22
      et_Info/InfoHandler.go
  3. 1
      et_cache/cacheHandler.go
  4. 6
      et_calc/dataCalc.go
  5. 7
      et_calc/group/calcTask.go
  6. 123
      et_calc/group/groupCalc.go
  7. 4
      et_calc/group/timeStrategy.go
  8. 267
      master/app/et_master.go
  9. 8
      node/app/app.go
  10. 29
      node/app/et_node.go
  11. 1
      node/stages/stage.go

6
dataSource/kafka/kafka_handler.go

@ -2,9 +2,9 @@ package kafka
import ( import (
"dataSource" "dataSource"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper" "gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
) )
type KafkaDataSource struct { type KafkaDataSource struct {
@ -40,7 +40,7 @@ func (s *KafkaDataSource) Producer() {
// 创建消息处理器 // 创建消息处理器
handler := NewMessageHandler(cfgName) handler := NewMessageHandler(cfgName)
if handler == nil { if handler == nil {
fmt.Printf("No handler found for topic %s\n", cfgName) log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
continue continue
} }
// 订阅主题 和 消息处理 // 订阅主题 和 消息处理
@ -60,8 +60,10 @@ type IMessageHandler interface {
func NewMessageHandler(cfgName string) IMessageHandler { func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName { switch cfgName {
case "data_raw": case "data_raw":
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
return IotaDataHandler{} return IotaDataHandler{}
case "data_agg": case "data_agg":
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
return NewAggDataHandler() return NewAggDataHandler()
default: default:
return nil return nil

22
et_Info/InfoHandler.go

@ -29,6 +29,11 @@ func (the *InfoHandler) GetStage() stages.Stage {
func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData { func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData {
// TODO 测试 DeviceId = 22c76344-1eb2-4508-8aa6-4550c010e8f7 ,sensorId=18
//if p.DeviceData.DeviceId != "22c76344-1eb2-4508-8aa6-4550c010e8f7" {
// return &common_models.ProcessData{}
//}
s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId) s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId)
if err == nil && s != nil { if err == nil && s != nil {
p.Stations = s p.Stations = s
@ -64,15 +69,24 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
for i2, device := range p.Stations[i].Info.Devices { //for i2, device := range p.Stations[i].Info.Devices {
formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId) // formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId)
// if err == nil {
// p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
// p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
// }
//}
// TODO #TEST BEGIN 2024-10-01 测点设备没有公式信息,测试时先从设备监测原型中获取
for i2, _ := range p.Stations[i].Info.Devices {
p.Stations[i].Info.Devices[i2].FormulaId = deviceFactorProto.Formula
formulaInfo, err := the.configHelper.GetFormulaInfo(deviceFactorProto.Formula)
if err == nil { if err == nil {
p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
} }
} }
// #TEST END
} }
} }

1
et_cache/cacheHandler.go

@ -64,6 +64,7 @@ func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.Pr
needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow) needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
if isWinCalcValid { if isWinCalcValid {
station.Data.ThemeData[item.FieldName] = needItemCache.Data station.Data.ThemeData[item.FieldName] = needItemCache.Data
station.Data.PhyData[item.FieldName] = needItemCache.Data
} }
} }
} }

6
et_calc/dataCalc.go

@ -122,8 +122,14 @@ func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models
} }
//todo 测点多设备特殊处理.. //todo 测点多设备特殊处理..
phyData := map[string]any{}
for k, v := range outMap {
phyData[k] = v
}
if len(p.Stations[i].Info.Devices) == 1 { if len(p.Stations[i].Info.Devices) == 1 {
p.Stations[i].Data.ThemeData = outMap p.Stations[i].Data.ThemeData = outMap
p.Stations[i].Data.PhyData = phyData
p.Stations[i].Data.CollectTime = p.DeviceData.AcqTime p.Stations[i].Data.CollectTime = p.DeviceData.AcqTime
} }
} }

7
et_calc/group/calcTask.go

@ -13,7 +13,7 @@ import (
//沉降分组业务处理说明: //沉降分组业务处理说明:
//dimensionId 对应WEB端配置:组网配置/采集策略 //dimensionId 对应WEB端配置:组网配置/采集策略
// taskId是维度下的schema每次调用的时候,生成一个唯一的 //taskId是维度下的schema每次调用的时候,生成的一个编码
//分组配置要求:分组计算中的测点的【采集策略】同一个周期采集 //分组配置要求:分组计算中的测点的【采集策略】同一个周期采集
//特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致 //特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致
@ -44,6 +44,7 @@ func (t *CalcTask) AddStationData(data common_models.Station) {
return return
} }
t.stationMap[data.Info.Id] = data t.stationMap[data.Info.Id] = data
log.Println(t.R())
} }
// CheckIntegrity 检查计算项是否完整 // CheckIntegrity 检查计算项是否完整
@ -70,6 +71,10 @@ func (t *CalcTask) SetTimeout() int {
expireSeconds = FromDimension(t.dimensionId) expireSeconds = FromDimension(t.dimensionId)
} }
// 小于3分钟,返回180秒
if expireSeconds < 60*3 {
expireSeconds = 60 * 3
}
t.SetDeadLineTime(expireSeconds) t.SetDeadLineTime(expireSeconds)
return expireSeconds return expireSeconds
} }

123
et_calc/group/groupCalc.go

@ -16,7 +16,7 @@ import (
var ( var (
configHelperInstance *common_utils.ConfigHelper configHelperInstance *common_utils.ConfigHelper
once sync.Once once sync.Once
mu sync.Mutex calcTasks sync.Map
) )
func GetConfigHelper() *common_utils.ConfigHelper { func GetConfigHelper() *common_utils.ConfigHelper {
@ -57,7 +57,6 @@ type GroupCalc struct {
stage *stages.Stage stage *stages.Stage
configHelper *common_utils.ConfigHelper configHelper *common_utils.ConfigHelper
signCalc chan bool signCalc chan bool
calcTasks map[GroupCalcTaskKey]CalcTask
esHandler *et_sink.SinkHandler esHandler *et_sink.SinkHandler
} }
@ -66,11 +65,10 @@ func NewGroupCalc() *GroupCalc {
stage: stages.NewStage("测点分组计算"), stage: stages.NewStage("测点分组计算"),
configHelper: GetConfigHelper(), configHelper: GetConfigHelper(),
signCalc: make(chan bool), signCalc: make(chan bool),
calcTasks: map[GroupCalcTaskKey]CalcTask{},
esHandler: et_sink.NewSinkGroupHandler(), esHandler: et_sink.NewSinkGroupHandler(),
} }
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据) // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES)
calcTaskManager.stage.AddProcess(calcTaskManager.processData) calcTaskManager.stage.AddProcess(calcTaskManager.processData)
return calcTaskManager return calcTaskManager
} }
@ -81,47 +79,77 @@ func (gc *GroupCalc) GetStage() stages.Stage {
// processData 的 stations 被改变了 // processData 的 stations 被改变了
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))
resultStations := make([]common_models.Station, 0) resultStations := make([]common_models.Station, 0)
// 分组超时任务 // 分组超时任务
for key, task := range gc.calcTasks { resultStations = append(resultStations, gc.processTimeoutTasks()...)
if task.IsTimeout() {
calcedStations := task.Calc()
resultStations = append(resultStations, calcedStations...)
// ES存储分组主题数据
//dumpStr, groupTheme := task.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr)
gc.dumpGroupTheme(&task, calcedStations)
delete(gc.calcTasks, key)
}
}
// 常规分组计算
for _, station := range inData.Stations { for _, station := range inData.Stations {
if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement {
//log.Printf("非【液压传感器测量沉降】分组。") //log.Printf("非【液压传感器测量沉降】分组。")
resultStations = append(resultStations, station) resultStations = append(resultStations, station)
} else { } else {
calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err != nil { if err == nil {
resultStations = append(resultStations, station) log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
resultStations = append(resultStations, calcRet...)
}
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
}
}
// 计算失败的测点不返回
result := make([]common_models.Station, 0)
for _, station := range resultStations {
if len(station.Data.ThemeData) > 0 {
////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。
//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
//for _, number := range filterSensorIds {
// if number == station.Info.Id {
// log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// break
// }
//}
//// #TEST END
// 需要返回的测点
result = append(result, station)
} else { } else {
resultStations = append(resultStations, calcedStations...) log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
}
} }
} }
// 返回处理后的数据,计算后的resultStations可能为空 // 计算后的 result 可能为空
inData.Stations = resultStations log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
inData.Stations = result
return inData return inData
} }
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station {
resultStations := make([]common_models.Station, 0)
calcTasks.Range(func(key, value interface{}) bool {
//log.Println("Key:", key, "Value:", value)
task := value.(CalcTask)
if task.IsTimeout() {
calcRet := task.Calc()
resultStations = append(resultStations, calcRet...)
// ES存储分组主题数据
gc.dumpGroupTheme(&task, calcRet)
calcTasks.Delete(key)
}
return true // 返回 true 继续遍历,返回 false 停止遍历
})
return resultStations
}
// ES存储分组主题 // ES存储分组主题
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) {
dumpStr, groupTheme := task.ToDump(calcedStations) dumpStr, esDoc := task.ToDump(calcedStations)
log.Println(dumpStr) log.Println(dumpStr)
if len(groupTheme.Data) > 0 { if len(esDoc.Data) > 0 {
gc.esHandler.SinkGroupDataToES(groupTheme) gc.esHandler.SinkGroupDataToES(esDoc)
} }
} }
@ -140,48 +168,39 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st
} }
key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId}
if calcTask, ok := gc.calcTasks[key]; ok { if value, ok := calcTasks.Load(key); ok {
calcTask := value.(CalcTask)
calcTask.AddStationData(*station) calcTask.AddStationData(*station)
log.Println(calcTask.R())
// 分组计算 // 分组计算
if calcTask.CheckIntegrity() { if calcTask.CheckIntegrity() {
secs := calcTask.ElapsedSecs() secs := calcTask.ElapsedSecs()
if secs > 10 { if secs > 10 {
log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R()) log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R())
} }
calcRet := calcTask.Calc()
calcedStations := calcTask.Calc() if calcRet != nil && len(calcRet) > 0 {
// 缓存已计算数据 resultStations = append(resultStations, calcRet...)
if calcedStations != nil && len(calcedStations) > 0 {
resultStations = append(resultStations, calcedStations...)
} }
// ES存储分组主题数据
//dumpStr, groupTheme := calcTask.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Println(dumpStr)
gc.dumpGroupTheme(&calcTask, calcedStations)
delete(gc.calcTasks, key) // ES存储分组主题数据
gc.dumpGroupTheme(&calcTask, calcRet)
calcTasks.Delete(key)
} }
} else { } else {
// 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长 // 新计算任务:取到第一个的采集策略(维度),以便后面获得过期时长
task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
task.AddStationData(*station) task.AddStationData(*station)
log.Println(task.R())
if task.CheckIntegrity() { if task.CheckIntegrity() {
calcedStations := task.Calc() calcRet := task.Calc()
// 缓存已计算数据 if calcRet != nil && len(calcRet) > 0 {
if calcedStations != nil && len(calcedStations) > 0 { resultStations = append(resultStations, calcRet...)
resultStations = append(resultStations, calcedStations...)
} }
// ES存储分组主题数据 // ES存储分组主题数据
gc.dumpGroupTheme(task, calcedStations) gc.dumpGroupTheme(task, calcRet)
} else { } else {
task.SetTimeout() task.SetTimeout()
gc.calcTasks[key] = *task log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
calcTasks.Store(key, *task)
} }
} }
} }

4
et_calc/group/timeStrategy.go

@ -8,8 +8,8 @@ import (
) )
const ( const (
data_active_expire_sec int = 900 data_active_expire_sec int = 900 // 15分钟
data_report_expire_sec int = 3600 data_report_expire_sec int = 3600 // 1小时
) )
// TimeStrategy 测点组合计算 超时时间判断策略 // TimeStrategy 测点组合计算 超时时间判断策略

267
master/app/et_master.go

@ -9,21 +9,21 @@ import (
"gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"log" "log"
"math"
"net" "net"
"net/rpc" "net/rpc"
"sort" "sync"
"time" "time"
) )
type EtMaster struct { type EtMaster struct {
nodeList []*NodeRpc nodeMap sync.Map
exporter et_prometheus_exporter.PrometheusExporter exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool sleepCH chan bool
} }
func NewEtMaster() *EtMaster { func NewEtMaster() *EtMaster {
master := EtMaster{ master := EtMaster{
nodeList: make([]*NodeRpc, 0),
exporter: et_prometheus_exporter.NewPrometheusExporter(), exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1), sleepCH: make(chan bool, 1),
} }
@ -32,8 +32,8 @@ func NewEtMaster() *EtMaster {
type NodeRpc struct { type NodeRpc struct {
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数 args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果 resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数 aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
client *rpc.Client client *rpc.Client
} }
@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册 //数据类型注册
gob.Register([]interface{}{}) gob.Register([]interface{}{})
for { for {
if len(the.nodeList) == 0 { if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!") log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
continue continue
@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) { func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId() thingId := data.GetThingId()
isMatch := false isMatch := false
for _, nodeRpc := range the.nodeList { the.nodeMap.Range(func(address, value interface{}) bool {
if nodeRpc != nil { if nodePtr, ok := value.(*NodeRpc); ok {
if contains(nodeRpc.args.ThingIds, thingId) { if nodePtr != nil {
if contains(nodePtr.args.ThingIds, thingId) {
isMatch = true isMatch = true
go callNodeFunc(nodeRpc, data) go callNodeFunc(nodePtr, data)
return false
} }
} }
} }
return true
})
//无匹配触发 reBalance //无匹配触发 reBalance
if !isMatch { if !isMatch {
if len(the.nodeList) > 0 { nodePtr := the.getNodeWithMinThings()
the.sortNodeListByThingCount() if nodePtr != nil {
if the.nodeList[0] != nil { nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId) log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr) go callNodeFunc(nodePtr, data)
go callNodeFunc(the.nodeList[0], data)
}
} }
} }
} }
@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
} }
var serviceMethod = "" var serviceMethod = ""
var resultCH chan bool var resultCH chan int
var v interface{} var v interface{}
switch data.(type) { switch data.(type) {
@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
serviceMethod = "etNode.AggDataHandler" serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH resultCH = node.aggResultCH
default: default:
log.Printf("Unknown type:%v", v) log.Printf("Unknown kafka data type:%v", v)
return return
} }
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v) log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
go func() { go func() {
defer timeCost(node.args.ID, data.Q(), time.Now()) defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool var reply bool
err := node.client.Call(serviceMethod, data, &reply) err := node.client.Call(serviceMethod, data, &reply)
result := boolToInt(reply)
if err != nil { if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error()) // rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
log.Printf("master调用node异常。Error:%s", err.Error())
result = 2
} }
resultCH <- reply resultCH <- result
}() }()
// RPC调用结果 // RPC调用结果
var result bool errorCode := 0
timeoutMills := 1000 * time.Millisecond timeoutMills := 5 * 1000 * time.Millisecond
select { select {
case reply := <-resultCH: case reply := <-resultCH:
result = reply // reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v) if reply == 2 {
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 200
} else if reply == 0 {
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 100
}
case <-time.After(timeoutMills): case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v) log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
result = false errorCode = 300
} }
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
if result == false { // 100 故障:程序内部问题
//发送 stop 信号 // 200 故障:网络通信问题
the.sleepCH <- true // 300 故障:处理超时
log.Println("=============================================") if errorCode >= 200 {
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T()) the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
time.Sleep(time.Second * 5) } else {
node = nil //log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
the.nodesTidy() log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
} }
} }
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用 // NodeRegister 是 RPC 服务方法,由 et_node 远程调用
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
node := &NodeRpc{ node := &NodeRpc{
args: nodeArgs, args: nodeArgs,
resultCH: make(chan bool, 1), resultCH: make(chan int, 1),
aggResultCH: make(chan bool, 1), aggResultCH: make(chan int, 1),
client: nil, client: nil,
} }
//master 初始化 node client //master 初始化 node client
client, err := rpc.Dial("tcp", node.args.Addr) client, err := rpc.Dial("tcp", nodeArgs.Addr)
if err != nil { if err != nil {
log.Printf("链接node失败-> node[%v]", node.args.Addr) log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
return err return err
} }
node.client = client
the.nodeList = append(the.nodeList, node) node.client = client
the.addOrUpdate(nodeArgs.Addr, node)
log.Printf("node服务[%v] 注册成功", nodeArgs) log.Printf("node服务[%v] 注册成功", nodeArgs)
printNodesInfo(the.nodeList) the.printNodes()
*replay = true *reply = true
return nil return nil
} }
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error {
isRegister := false func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
for _, nodeRpc := range the.nodeList { if !the.clientIsValid(nodeArgs.Addr) {
if nodeRpc.args.Addr == nodeArgs.Addr {
isRegister = true
}
}
if !isRegister {
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs) log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
*replay = false *reply = false
return errors.New("未注册的node") return errors.New("未注册的node")
} }
log.Printf("收到-node[%v] 心跳", nodeArgs) log.Printf("收到-node[%v] 心跳", nodeArgs)
*replay = true *reply = true
return nil return nil
} }
// NodeUnRegister 节点RPC 注销 // NodeUnRegister 节点RPC 注销
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
value, ok := the.nodeMap.Load(nodeArgs.Addr)
for i, node := range the.nodeList { node := value.(*NodeRpc)
log.Printf("节点[%s] 注销", node.args.Addr) if ok && node.client != nil {
if node.args.Addr == nodeArgs.Addr {
err := node.client.Close() err := node.client.Close()
if err != nil { if err != nil {
log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error()) log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
}
the.nodeList[i] = nil
} }
the.nodeMap.Delete(nodeArgs.Addr)
} }
the.nodesTidy()
log.Printf("node服务[%v] 注销成功", nodeArgs) log.Printf("node服务[%v] 注销成功", nodeArgs)
*replay = true *reply = true
return nil return nil
} }
func (the *EtMaster) nodesTidy() {
the.nodeList = updateNodeList(the.nodeList)
printNodesInfo(the.nodeList)
}
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
if the.nodeList[i] != nil && the.nodeList[j] != nil {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
} else {
return false
}
})
}
func (the *EtMaster) WaitNodeRegister() { func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册") log.Println("等待 node进行注册")
for { for {
if len(the.nodeList) > 0 { if the.nodeMapCount() > 0 {
break break
} }
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }
} }
func (the *EtMaster) ConnectNode() { func (the *EtMaster) ConnectNode() {
for i := range the.nodeList { the.nodeMap.Range(func(key, value interface{}) bool {
nodeAddr := the.nodeList[i].args.Addr node := value.(*NodeRpc)
if the.nodeList[i].client == nil { nodeAddr := key.(string)
if node.client == nil {
client, err := rpc.Dial("tcp", nodeAddr) client, err := rpc.Dial("tcp", nodeAddr)
if err != nil { if err != nil {
log.Printf("链接node失败-> node[%v]", nodeAddr) log.Printf("链接node失败-> node[%v]", nodeAddr)
continue return true
} }
the.nodeList[i].client = client
node.client = client
the.nodeMap.Store(nodeAddr, node)
}
return true
})
}
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
if val, ok := the.nodeMap.Load(key); ok {
hisNode := val.(*NodeRpc)
hisNode.client = newNode.client
the.nodeMap.Store(key, hisNode)
} else {
the.nodeMap.Store(key, newNode)
} }
} }
func (the *EtMaster) nodeMapCount() int {
count := 0
the.nodeMap.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
func (the *EtMaster) clientIsValid(address string) bool {
val, ok := the.nodeMap.Load(address)
if !ok {
return false
} }
// app 包内公用方法 if val.(*NodeRpc).client == nil {
func updateNodeList(nodes []*NodeRpc) []*NodeRpc { return false
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
} }
return true
} }
return newNodes
// 获取最少things的节点
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
var minNode *NodeRpc
minThings := math.MaxInt64 // 初始化为最大值
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
if len(node.args.ThingIds) < minThings {
minThings = len(node.args.ThingIds)
minNode = node
} }
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) return true
for _, node := range nodes { })
return minNode
}
func (the *EtMaster) printNodes() {
count := 0
info := ""
the.nodeMap.Range(func(key, value interface{}) bool {
count++
node := value.(*NodeRpc)
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)
if !ok {
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
return
}
node := val.(*NodeRpc)
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
// 100 故障:程序内部错误
// 200 故障:网络通信问题
// 300 故障:处理超时
if errCode == 200 {
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} else if errCode == 300 {
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} }
log.Println(info)
the.printNodes()
} }
func contains(arr []string, target string) bool { func contains(arr []string, target string) bool {
for _, value := range arr { for _, value := range arr {
if value == target { if value == target {
@ -310,5 +375,11 @@ func contains(arr []string, target string) bool {
} }
func timeCost(nodeId, deviceId string, start time.Time) { func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start) tc := time.Since(start)
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc) log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc)
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
} }

8
node/app/app.go

@ -57,14 +57,6 @@ func Start() {
} }
go nodeSerRpcListen() go nodeSerRpcListen()
// aggNode 注册,无数据后处理环节
//aggWorker := agg_worker.NewAggWorker()
//err1 := rpc.RegisterName("aggNode", aggWorker)
//if err1 != nil {
// log.Fatal("注册 aggNode rpc 异常", err1)
//}
//aggWorker.RegisterToMaster()
//后移注册流程,避免node启动异常的无效注册 //后移注册流程,避免node启动异常的无效注册
nodeWorker.RegisterToMaster() nodeWorker.RegisterToMaster()

29
node/app/et_node.go

@ -52,8 +52,35 @@ func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool)
return err return err
} }
// 是沉降测试数据
func isSettleData(data map[string]interface{}) bool {
// {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
validKeys := map[string]bool{
"pressure": true,
"temperature": true,
"ssagee": true,
}
if len(data) != 3 {
return false
}
for key := range data {
if !validKeys[key] {
return false
}
}
return true
}
// ConsumerProcess 将 IotaData -> ProcessData // ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData) deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
if err != nil { if err != nil {
return err return err
@ -64,7 +91,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
} }
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
//log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData)
the.ch <- &common_models.ProcessData{ the.ch <- &common_models.ProcessData{
DeviceData: *deviceData, DeviceData: *deviceData,
Stations: []common_models.Station{}, Stations: []common_models.Station{},

1
node/stages/stage.go

@ -66,6 +66,7 @@ func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
case <-s.execOver: case <-s.execOver:
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):
log.Printf("=====================") log.Printf("=====================")
//TODO #TEST BEGIN 测试时可以注释掉下面这行,否则调试时超时,会引发一个 panic,导致程序中断。
log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout) log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout)
} }
} }

Loading…
Cancel
Save