Compare commits

...

7 Commits

Author SHA1 Message Date
yfh c61ea64bc1 nodeList改为sync.Map; 11 months ago
yfh a029dadf64 删除注释 11 months ago
yfh 6e8fea7023 expireSeconds小于3分钟,返回180秒 11 months ago
yfh b50c15a362 Stations.Data.PhyData赋值 11 months ago
yfh 7c8ceddcca Stations.Data.PhyData赋值 11 months ago
yfh d00f169f8f calcTasks改为sync.Map 11 months ago
yfh d18c3bc18c 添加注释 11 months ago
  1. 6
      dataSource/kafka/kafka_handler.go
  2. 22
      et_Info/InfoHandler.go
  3. 1
      et_cache/cacheHandler.go
  4. 6
      et_calc/dataCalc.go
  5. 7
      et_calc/group/calcTask.go
  6. 123
      et_calc/group/groupCalc.go
  7. 4
      et_calc/group/timeStrategy.go
  8. 267
      master/app/et_master.go
  9. 8
      node/app/app.go
  10. 29
      node/app/et_node.go
  11. 1
      node/stages/stage.go

6
dataSource/kafka/kafka_handler.go

@ -2,9 +2,9 @@ package kafka
import ( import (
"dataSource" "dataSource"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper" "gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
) )
type KafkaDataSource struct { type KafkaDataSource struct {
@ -40,7 +40,7 @@ func (s *KafkaDataSource) Producer() {
// 创建消息处理器 // 创建消息处理器
handler := NewMessageHandler(cfgName) handler := NewMessageHandler(cfgName)
if handler == nil { if handler == nil {
fmt.Printf("No handler found for topic %s\n", cfgName) log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
continue continue
} }
// 订阅主题 和 消息处理 // 订阅主题 和 消息处理
@ -60,8 +60,10 @@ type IMessageHandler interface {
func NewMessageHandler(cfgName string) IMessageHandler { func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName { switch cfgName {
case "data_raw": case "data_raw":
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
return IotaDataHandler{} return IotaDataHandler{}
case "data_agg": case "data_agg":
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
return NewAggDataHandler() return NewAggDataHandler()
default: default:
return nil return nil

22
et_Info/InfoHandler.go

@ -29,6 +29,11 @@ func (the *InfoHandler) GetStage() stages.Stage {
func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData { func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData {
// TODO 测试 DeviceId = 22c76344-1eb2-4508-8aa6-4550c010e8f7 ,sensorId=18
//if p.DeviceData.DeviceId != "22c76344-1eb2-4508-8aa6-4550c010e8f7" {
// return &common_models.ProcessData{}
//}
s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId) s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId)
if err == nil && s != nil { if err == nil && s != nil {
p.Stations = s p.Stations = s
@ -64,15 +69,24 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
for i2, device := range p.Stations[i].Info.Devices { //for i2, device := range p.Stations[i].Info.Devices {
formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId) // formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId)
// if err == nil {
// p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
// p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
// }
//}
// TODO #TEST BEGIN 2024-10-01 测点设备没有公式信息,测试时先从设备监测原型中获取
for i2, _ := range p.Stations[i].Info.Devices {
p.Stations[i].Info.Devices[i2].FormulaId = deviceFactorProto.Formula
formulaInfo, err := the.configHelper.GetFormulaInfo(deviceFactorProto.Formula)
if err == nil { if err == nil {
p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
} }
} }
// #TEST END
} }
} }

1
et_cache/cacheHandler.go

@ -64,6 +64,7 @@ func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.Pr
needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow) needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
if isWinCalcValid { if isWinCalcValid {
station.Data.ThemeData[item.FieldName] = needItemCache.Data station.Data.ThemeData[item.FieldName] = needItemCache.Data
station.Data.PhyData[item.FieldName] = needItemCache.Data
} }
} }
} }

6
et_calc/dataCalc.go

@ -122,8 +122,14 @@ func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models
} }
//todo 测点多设备特殊处理.. //todo 测点多设备特殊处理..
phyData := map[string]any{}
for k, v := range outMap {
phyData[k] = v
}
if len(p.Stations[i].Info.Devices) == 1 { if len(p.Stations[i].Info.Devices) == 1 {
p.Stations[i].Data.ThemeData = outMap p.Stations[i].Data.ThemeData = outMap
p.Stations[i].Data.PhyData = phyData
p.Stations[i].Data.CollectTime = p.DeviceData.AcqTime p.Stations[i].Data.CollectTime = p.DeviceData.AcqTime
} }
} }

7
et_calc/group/calcTask.go

@ -13,7 +13,7 @@ import (
//沉降分组业务处理说明: //沉降分组业务处理说明:
//dimensionId 对应WEB端配置:组网配置/采集策略 //dimensionId 对应WEB端配置:组网配置/采集策略
// taskId是维度下的schema每次调用的时候,生成一个唯一的 //taskId是维度下的schema每次调用的时候,生成的一个编码
//分组配置要求:分组计算中的测点的【采集策略】同一个周期采集 //分组配置要求:分组计算中的测点的【采集策略】同一个周期采集
//特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致 //特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致
@ -44,6 +44,7 @@ func (t *CalcTask) AddStationData(data common_models.Station) {
return return
} }
t.stationMap[data.Info.Id] = data t.stationMap[data.Info.Id] = data
log.Println(t.R())
} }
// CheckIntegrity 检查计算项是否完整 // CheckIntegrity 检查计算项是否完整
@ -70,6 +71,10 @@ func (t *CalcTask) SetTimeout() int {
expireSeconds = FromDimension(t.dimensionId) expireSeconds = FromDimension(t.dimensionId)
} }
// 小于3分钟,返回180秒
if expireSeconds < 60*3 {
expireSeconds = 60 * 3
}
t.SetDeadLineTime(expireSeconds) t.SetDeadLineTime(expireSeconds)
return expireSeconds return expireSeconds
} }

123
et_calc/group/groupCalc.go

@ -16,7 +16,7 @@ import (
var ( var (
configHelperInstance *common_utils.ConfigHelper configHelperInstance *common_utils.ConfigHelper
once sync.Once once sync.Once
mu sync.Mutex calcTasks sync.Map
) )
func GetConfigHelper() *common_utils.ConfigHelper { func GetConfigHelper() *common_utils.ConfigHelper {
@ -57,7 +57,6 @@ type GroupCalc struct {
stage *stages.Stage stage *stages.Stage
configHelper *common_utils.ConfigHelper configHelper *common_utils.ConfigHelper
signCalc chan bool signCalc chan bool
calcTasks map[GroupCalcTaskKey]CalcTask
esHandler *et_sink.SinkHandler esHandler *et_sink.SinkHandler
} }
@ -66,11 +65,10 @@ func NewGroupCalc() *GroupCalc {
stage: stages.NewStage("测点分组计算"), stage: stages.NewStage("测点分组计算"),
configHelper: GetConfigHelper(), configHelper: GetConfigHelper(),
signCalc: make(chan bool), signCalc: make(chan bool),
calcTasks: map[GroupCalcTaskKey]CalcTask{},
esHandler: et_sink.NewSinkGroupHandler(), esHandler: et_sink.NewSinkGroupHandler(),
} }
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据) // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES)
calcTaskManager.stage.AddProcess(calcTaskManager.processData) calcTaskManager.stage.AddProcess(calcTaskManager.processData)
return calcTaskManager return calcTaskManager
} }
@ -81,47 +79,77 @@ func (gc *GroupCalc) GetStage() stages.Stage {
// processData 的 stations 被改变了 // processData 的 stations 被改变了
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))
resultStations := make([]common_models.Station, 0) resultStations := make([]common_models.Station, 0)
// 分组超时任务 // 分组超时任务
for key, task := range gc.calcTasks { resultStations = append(resultStations, gc.processTimeoutTasks()...)
if task.IsTimeout() {
calcedStations := task.Calc()
resultStations = append(resultStations, calcedStations...)
// ES存储分组主题数据
//dumpStr, groupTheme := task.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr)
gc.dumpGroupTheme(&task, calcedStations)
delete(gc.calcTasks, key)
}
}
// 常规分组计算
for _, station := range inData.Stations { for _, station := range inData.Stations {
if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement {
//log.Printf("非【液压传感器测量沉降】分组。") //log.Printf("非【液压传感器测量沉降】分组。")
resultStations = append(resultStations, station) resultStations = append(resultStations, station)
} else { } else {
calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err != nil { if err == nil {
resultStations = append(resultStations, station) log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
resultStations = append(resultStations, calcRet...)
}
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
}
}
// 计算失败的测点不返回
result := make([]common_models.Station, 0)
for _, station := range resultStations {
if len(station.Data.ThemeData) > 0 {
////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。
//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
//for _, number := range filterSensorIds {
// if number == station.Info.Id {
// log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// break
// }
//}
//// #TEST END
// 需要返回的测点
result = append(result, station)
} else { } else {
resultStations = append(resultStations, calcedStations...) log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
}
} }
} }
// 返回处理后的数据,计算后的resultStations可能为空 // 计算后的 result 可能为空
inData.Stations = resultStations log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
inData.Stations = result
return inData return inData
} }
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station {
resultStations := make([]common_models.Station, 0)
calcTasks.Range(func(key, value interface{}) bool {
//log.Println("Key:", key, "Value:", value)
task := value.(CalcTask)
if task.IsTimeout() {
calcRet := task.Calc()
resultStations = append(resultStations, calcRet...)
// ES存储分组主题数据
gc.dumpGroupTheme(&task, calcRet)
calcTasks.Delete(key)
}
return true // 返回 true 继续遍历,返回 false 停止遍历
})
return resultStations
}
// ES存储分组主题 // ES存储分组主题
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) {
dumpStr, groupTheme := task.ToDump(calcedStations) dumpStr, esDoc := task.ToDump(calcedStations)
log.Println(dumpStr) log.Println(dumpStr)
if len(groupTheme.Data) > 0 { if len(esDoc.Data) > 0 {
gc.esHandler.SinkGroupDataToES(groupTheme) gc.esHandler.SinkGroupDataToES(esDoc)
} }
} }
@ -140,48 +168,39 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st
} }
key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId}
if calcTask, ok := gc.calcTasks[key]; ok { if value, ok := calcTasks.Load(key); ok {
calcTask := value.(CalcTask)
calcTask.AddStationData(*station) calcTask.AddStationData(*station)
log.Println(calcTask.R())
// 分组计算 // 分组计算
if calcTask.CheckIntegrity() { if calcTask.CheckIntegrity() {
secs := calcTask.ElapsedSecs() secs := calcTask.ElapsedSecs()
if secs > 10 { if secs > 10 {
log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R()) log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R())
} }
calcRet := calcTask.Calc()
calcedStations := calcTask.Calc() if calcRet != nil && len(calcRet) > 0 {
// 缓存已计算数据 resultStations = append(resultStations, calcRet...)
if calcedStations != nil && len(calcedStations) > 0 {
resultStations = append(resultStations, calcedStations...)
} }
// ES存储分组主题数据
//dumpStr, groupTheme := calcTask.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Println(dumpStr)
gc.dumpGroupTheme(&calcTask, calcedStations)
delete(gc.calcTasks, key) // ES存储分组主题数据
gc.dumpGroupTheme(&calcTask, calcRet)
calcTasks.Delete(key)
} }
} else { } else {
// 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长 // 新计算任务:取到第一个的采集策略(维度),以便后面获得过期时长
task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
task.AddStationData(*station) task.AddStationData(*station)
log.Println(task.R())
if task.CheckIntegrity() { if task.CheckIntegrity() {
calcedStations := task.Calc() calcRet := task.Calc()
// 缓存已计算数据 if calcRet != nil && len(calcRet) > 0 {
if calcedStations != nil && len(calcedStations) > 0 { resultStations = append(resultStations, calcRet...)
resultStations = append(resultStations, calcedStations...)
} }
// ES存储分组主题数据 // ES存储分组主题数据
gc.dumpGroupTheme(task, calcedStations) gc.dumpGroupTheme(task, calcRet)
} else { } else {
task.SetTimeout() task.SetTimeout()
gc.calcTasks[key] = *task log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
calcTasks.Store(key, *task)
} }
} }
} }

4
et_calc/group/timeStrategy.go

@ -8,8 +8,8 @@ import (
) )
const ( const (
data_active_expire_sec int = 900 data_active_expire_sec int = 900 // 15分钟
data_report_expire_sec int = 3600 data_report_expire_sec int = 3600 // 1小时
) )
// TimeStrategy 测点组合计算 超时时间判断策略 // TimeStrategy 测点组合计算 超时时间判断策略

267
master/app/et_master.go

@ -9,21 +9,21 @@ import (
"gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"log" "log"
"math"
"net" "net"
"net/rpc" "net/rpc"
"sort" "sync"
"time" "time"
) )
type EtMaster struct { type EtMaster struct {
nodeList []*NodeRpc nodeMap sync.Map
exporter et_prometheus_exporter.PrometheusExporter exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool sleepCH chan bool
} }
func NewEtMaster() *EtMaster { func NewEtMaster() *EtMaster {
master := EtMaster{ master := EtMaster{
nodeList: make([]*NodeRpc, 0),
exporter: et_prometheus_exporter.NewPrometheusExporter(), exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1), sleepCH: make(chan bool, 1),
} }
@ -32,8 +32,8 @@ func NewEtMaster() *EtMaster {
type NodeRpc struct { type NodeRpc struct {
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数 args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果 resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数 aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
client *rpc.Client client *rpc.Client
} }
@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册 //数据类型注册
gob.Register([]interface{}{}) gob.Register([]interface{}{})
for { for {
if len(the.nodeList) == 0 { if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!") log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
continue continue
@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) { func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId() thingId := data.GetThingId()
isMatch := false isMatch := false
for _, nodeRpc := range the.nodeList { the.nodeMap.Range(func(address, value interface{}) bool {
if nodeRpc != nil { if nodePtr, ok := value.(*NodeRpc); ok {
if contains(nodeRpc.args.ThingIds, thingId) { if nodePtr != nil {
if contains(nodePtr.args.ThingIds, thingId) {
isMatch = true isMatch = true
go callNodeFunc(nodeRpc, data) go callNodeFunc(nodePtr, data)
return false
} }
} }
} }
return true
})
//无匹配触发 reBalance //无匹配触发 reBalance
if !isMatch { if !isMatch {
if len(the.nodeList) > 0 { nodePtr := the.getNodeWithMinThings()
the.sortNodeListByThingCount() if nodePtr != nil {
if the.nodeList[0] != nil { nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId) log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr) go callNodeFunc(nodePtr, data)
go callNodeFunc(the.nodeList[0], data)
}
} }
} }
} }
@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
} }
var serviceMethod = "" var serviceMethod = ""
var resultCH chan bool var resultCH chan int
var v interface{} var v interface{}
switch data.(type) { switch data.(type) {
@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
serviceMethod = "etNode.AggDataHandler" serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH resultCH = node.aggResultCH
default: default:
log.Printf("Unknown type:%v", v) log.Printf("Unknown kafka data type:%v", v)
return return
} }
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v) log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
go func() { go func() {
defer timeCost(node.args.ID, data.Q(), time.Now()) defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool var reply bool
err := node.client.Call(serviceMethod, data, &reply) err := node.client.Call(serviceMethod, data, &reply)
result := boolToInt(reply)
if err != nil { if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error()) // rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
log.Printf("master调用node异常。Error:%s", err.Error())
result = 2
} }
resultCH <- reply resultCH <- result
}() }()
// RPC调用结果 // RPC调用结果
var result bool errorCode := 0
timeoutMills := 1000 * time.Millisecond timeoutMills := 5 * 1000 * time.Millisecond
select { select {
case reply := <-resultCH: case reply := <-resultCH:
result = reply // reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v) if reply == 2 {
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 200
} else if reply == 0 {
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 100
}
case <-time.After(timeoutMills): case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v) log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
result = false errorCode = 300
} }
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
if result == false { // 100 故障:程序内部问题
//发送 stop 信号 // 200 故障:网络通信问题
the.sleepCH <- true // 300 故障:处理超时
log.Println("=============================================") if errorCode >= 200 {
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T()) the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
time.Sleep(time.Second * 5) } else {
node = nil //log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
the.nodesTidy() log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
} }
} }
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用 // NodeRegister 是 RPC 服务方法,由 et_node 远程调用
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
node := &NodeRpc{ node := &NodeRpc{
args: nodeArgs, args: nodeArgs,
resultCH: make(chan bool, 1), resultCH: make(chan int, 1),
aggResultCH: make(chan bool, 1), aggResultCH: make(chan int, 1),
client: nil, client: nil,
} }
//master 初始化 node client //master 初始化 node client
client, err := rpc.Dial("tcp", node.args.Addr) client, err := rpc.Dial("tcp", nodeArgs.Addr)
if err != nil { if err != nil {
log.Printf("链接node失败-> node[%v]", node.args.Addr) log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
return err return err
} }
node.client = client
the.nodeList = append(the.nodeList, node) node.client = client
the.addOrUpdate(nodeArgs.Addr, node)
log.Printf("node服务[%v] 注册成功", nodeArgs) log.Printf("node服务[%v] 注册成功", nodeArgs)
printNodesInfo(the.nodeList) the.printNodes()
*replay = true *reply = true
return nil return nil
} }
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error {
isRegister := false func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
for _, nodeRpc := range the.nodeList { if !the.clientIsValid(nodeArgs.Addr) {
if nodeRpc.args.Addr == nodeArgs.Addr {
isRegister = true
}
}
if !isRegister {
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs) log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
*replay = false *reply = false
return errors.New("未注册的node") return errors.New("未注册的node")
} }
log.Printf("收到-node[%v] 心跳", nodeArgs) log.Printf("收到-node[%v] 心跳", nodeArgs)
*replay = true *reply = true
return nil return nil
} }
// NodeUnRegister 节点RPC 注销 // NodeUnRegister 节点RPC 注销
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
value, ok := the.nodeMap.Load(nodeArgs.Addr)
for i, node := range the.nodeList { node := value.(*NodeRpc)
log.Printf("节点[%s] 注销", node.args.Addr) if ok && node.client != nil {
if node.args.Addr == nodeArgs.Addr {
err := node.client.Close() err := node.client.Close()
if err != nil { if err != nil {
log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error()) log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
}
the.nodeList[i] = nil
} }
the.nodeMap.Delete(nodeArgs.Addr)
} }
the.nodesTidy()
log.Printf("node服务[%v] 注销成功", nodeArgs) log.Printf("node服务[%v] 注销成功", nodeArgs)
*replay = true *reply = true
return nil return nil
} }
func (the *EtMaster) nodesTidy() {
the.nodeList = updateNodeList(the.nodeList)
printNodesInfo(the.nodeList)
}
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
if the.nodeList[i] != nil && the.nodeList[j] != nil {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
} else {
return false
}
})
}
func (the *EtMaster) WaitNodeRegister() { func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册") log.Println("等待 node进行注册")
for { for {
if len(the.nodeList) > 0 { if the.nodeMapCount() > 0 {
break break
} }
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }
} }
func (the *EtMaster) ConnectNode() { func (the *EtMaster) ConnectNode() {
for i := range the.nodeList { the.nodeMap.Range(func(key, value interface{}) bool {
nodeAddr := the.nodeList[i].args.Addr node := value.(*NodeRpc)
if the.nodeList[i].client == nil { nodeAddr := key.(string)
if node.client == nil {
client, err := rpc.Dial("tcp", nodeAddr) client, err := rpc.Dial("tcp", nodeAddr)
if err != nil { if err != nil {
log.Printf("链接node失败-> node[%v]", nodeAddr) log.Printf("链接node失败-> node[%v]", nodeAddr)
continue return true
} }
the.nodeList[i].client = client
node.client = client
the.nodeMap.Store(nodeAddr, node)
}
return true
})
}
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
if val, ok := the.nodeMap.Load(key); ok {
hisNode := val.(*NodeRpc)
hisNode.client = newNode.client
the.nodeMap.Store(key, hisNode)
} else {
the.nodeMap.Store(key, newNode)
} }
} }
func (the *EtMaster) nodeMapCount() int {
count := 0
the.nodeMap.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
func (the *EtMaster) clientIsValid(address string) bool {
val, ok := the.nodeMap.Load(address)
if !ok {
return false
} }
// app 包内公用方法 if val.(*NodeRpc).client == nil {
func updateNodeList(nodes []*NodeRpc) []*NodeRpc { return false
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
} }
return true
} }
return newNodes
// 获取最少things的节点
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
var minNode *NodeRpc
minThings := math.MaxInt64 // 初始化为最大值
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
if len(node.args.ThingIds) < minThings {
minThings = len(node.args.ThingIds)
minNode = node
} }
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) return true
for _, node := range nodes { })
return minNode
}
func (the *EtMaster) printNodes() {
count := 0
info := ""
the.nodeMap.Range(func(key, value interface{}) bool {
count++
node := value.(*NodeRpc)
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)
if !ok {
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
return
}
node := val.(*NodeRpc)
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
// 100 故障:程序内部错误
// 200 故障:网络通信问题
// 300 故障:处理超时
if errCode == 200 {
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} else if errCode == 300 {
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} }
log.Println(info)
the.printNodes()
} }
func contains(arr []string, target string) bool { func contains(arr []string, target string) bool {
for _, value := range arr { for _, value := range arr {
if value == target { if value == target {
@ -310,5 +375,11 @@ func contains(arr []string, target string) bool {
} }
func timeCost(nodeId, deviceId string, start time.Time) { func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start) tc := time.Since(start)
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc) log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc)
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
} }

8
node/app/app.go

@ -57,14 +57,6 @@ func Start() {
} }
go nodeSerRpcListen() go nodeSerRpcListen()
// aggNode 注册,无数据后处理环节
//aggWorker := agg_worker.NewAggWorker()
//err1 := rpc.RegisterName("aggNode", aggWorker)
//if err1 != nil {
// log.Fatal("注册 aggNode rpc 异常", err1)
//}
//aggWorker.RegisterToMaster()
//后移注册流程,避免node启动异常的无效注册 //后移注册流程,避免node启动异常的无效注册
nodeWorker.RegisterToMaster() nodeWorker.RegisterToMaster()

29
node/app/et_node.go

@ -52,8 +52,35 @@ func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool)
return err return err
} }
// 是沉降测试数据
func isSettleData(data map[string]interface{}) bool {
// {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
validKeys := map[string]bool{
"pressure": true,
"temperature": true,
"ssagee": true,
}
if len(data) != 3 {
return false
}
for key := range data {
if !validKeys[key] {
return false
}
}
return true
}
// ConsumerProcess 将 IotaData -> ProcessData // ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData) deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
if err != nil { if err != nil {
return err return err
@ -64,7 +91,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
} }
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
//log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData)
the.ch <- &common_models.ProcessData{ the.ch <- &common_models.ProcessData{
DeviceData: *deviceData, DeviceData: *deviceData,
Stations: []common_models.Station{}, Stations: []common_models.Station{},

1
node/stages/stage.go

@ -66,6 +66,7 @@ func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
case <-s.execOver: case <-s.execOver:
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):
log.Printf("=====================") log.Printf("=====================")
//TODO #TEST BEGIN 测试时可以注释掉下面这行,否则调试时超时,会引发一个 panic,导致程序中断。
log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout) log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout)
} }
} }

Loading…
Cancel
Save