Compare commits

...

7 Commits

Author SHA1 Message Date
yfh c61ea64bc1 odeList改为sync.Map; 11 months ago
yfh a029dadf64 删除注释 11 months ago
yfh 6e8fea7023 expireSeconds小于3分钟,返回180秒 11 months ago
yfh b50c15a362 Staions.Data.PhyData赋值 11 months ago
yfh 7c8ceddcca Staions.Data.PhyData赋值 11 months ago
yfh d00f169f8f calcTasks改为sync.Map 11 months ago
yfh d18c3bc18c 添加注释 11 months ago
  1. 6
      dataSource/kafka/kafka_handler.go
  2. 22
      et_Info/InfoHandler.go
  3. 1
      et_cache/cacheHandler.go
  4. 6
      et_calc/dataCalc.go
  5. 15
      et_calc/group/calcTask.go
  6. 123
      et_calc/group/groupCalc.go
  7. 4
      et_calc/group/timeStrategy.go
  8. 279
      master/app/et_master.go
  9. 8
      node/app/app.go
  10. 29
      node/app/et_node.go
  11. 1
      node/stages/stage.go

6
dataSource/kafka/kafka_handler.go

@ -2,9 +2,9 @@ package kafka
import (
"dataSource"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
)
type KafkaDataSource struct {
@ -40,7 +40,7 @@ func (s *KafkaDataSource) Producer() {
// 创建消息处理器
handler := NewMessageHandler(cfgName)
if handler == nil {
fmt.Printf("No handler found for topic %s\n", cfgName)
log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
continue
}
// 订阅主题 和 消息处理
@ -60,8 +60,10 @@ type IMessageHandler interface {
func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName {
case "data_raw":
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
return IotaDataHandler{}
case "data_agg":
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
return NewAggDataHandler()
default:
return nil

22
et_Info/InfoHandler.go

@ -29,6 +29,11 @@ func (the *InfoHandler) GetStage() stages.Stage {
func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData {
// TODO 测试 DeviceId = 22c76344-1eb2-4508-8aa6-4550c010e8f7 ,sensorId=18
//if p.DeviceData.DeviceId != "22c76344-1eb2-4508-8aa6-4550c010e8f7" {
// return &common_models.ProcessData{}
//}
s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId)
if err == nil && s != nil {
p.Stations = s
@ -64,15 +69,24 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) {
if err != nil {
panic(err)
}
for i2, device := range p.Stations[i].Info.Devices {
formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId)
//for i2, device := range p.Stations[i].Info.Devices {
// formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId)
// if err == nil {
// p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
// p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
// }
//}
// TODO #TEST BEGIN 2024-10-01 测点设备没有公式信息,测试时先从设备监测原型中获取
for i2, _ := range p.Stations[i].Info.Devices {
p.Stations[i].Info.Devices[i2].FormulaId = deviceFactorProto.Formula
formulaInfo, err := the.configHelper.GetFormulaInfo(deviceFactorProto.Formula)
if err == nil {
p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
}
}
// #TEST END
}
}

1
et_cache/cacheHandler.go

@ -64,6 +64,7 @@ func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.Pr
needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
if isWinCalcValid {
station.Data.ThemeData[item.FieldName] = needItemCache.Data
station.Data.PhyData[item.FieldName] = needItemCache.Data
}
}
}

6
et_calc/dataCalc.go

@ -122,8 +122,14 @@ func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models
}
//todo 测点多设备特殊处理..
phyData := map[string]any{}
for k, v := range outMap {
phyData[k] = v
}
if len(p.Stations[i].Info.Devices) == 1 {
p.Stations[i].Data.ThemeData = outMap
p.Stations[i].Data.PhyData = phyData
p.Stations[i].Data.CollectTime = p.DeviceData.AcqTime
}
}

15
et_calc/group/calcTask.go

@ -11,11 +11,11 @@ import (
"time"
)
// 沉降分组业务处理说明:
// dimensionId 对应WEB端配置:组网配置/采集策略
// taskId是维度下的schema每次调用的时候,生成一个唯一的
// 分组配置要求:分组计算中的测点的【采集策略】同一个周期采集
// 特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致
//沉降分组业务处理说明:
//dimensionId 对应WEB端配置:组网配置/采集策略
//taskId是维度下的schema每次调用的时候,生成的一个编码
//分组配置要求:分组计算中的测点的【采集策略】同一个周期采集
//特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致
type CalcTask struct {
*BaseDueTask
@ -44,6 +44,7 @@ func (t *CalcTask) AddStationData(data common_models.Station) {
return
}
t.stationMap[data.Info.Id] = data
log.Println(t.R())
}
// CheckIntegrity 检查计算项是否完整
@ -70,6 +71,10 @@ func (t *CalcTask) SetTimeout() int {
expireSeconds = FromDimension(t.dimensionId)
}
// 小于3分钟,返回180秒
if expireSeconds < 60*3 {
expireSeconds = 60 * 3
}
t.SetDeadLineTime(expireSeconds)
return expireSeconds
}

123
et_calc/group/groupCalc.go

@ -16,7 +16,7 @@ import (
var (
configHelperInstance *common_utils.ConfigHelper
once sync.Once
mu sync.Mutex
calcTasks sync.Map
)
func GetConfigHelper() *common_utils.ConfigHelper {
@ -57,7 +57,6 @@ type GroupCalc struct {
stage *stages.Stage
configHelper *common_utils.ConfigHelper
signCalc chan bool
calcTasks map[GroupCalcTaskKey]CalcTask
esHandler *et_sink.SinkHandler
}
@ -66,11 +65,10 @@ func NewGroupCalc() *GroupCalc {
stage: stages.NewStage("测点分组计算"),
configHelper: GetConfigHelper(),
signCalc: make(chan bool),
calcTasks: map[GroupCalcTaskKey]CalcTask{},
esHandler: et_sink.NewSinkGroupHandler(),
}
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据)
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES)
calcTaskManager.stage.AddProcess(calcTaskManager.processData)
return calcTaskManager
}
@ -81,47 +79,77 @@ func (gc *GroupCalc) GetStage() stages.Stage {
// processData 的 stations 被改变了
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))
resultStations := make([]common_models.Station, 0)
// 分组超时任务
for key, task := range gc.calcTasks {
if task.IsTimeout() {
calcedStations := task.Calc()
resultStations = append(resultStations, calcedStations...)
// ES存储分组主题数据
//dumpStr, groupTheme := task.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr)
gc.dumpGroupTheme(&task, calcedStations)
delete(gc.calcTasks, key)
}
}
resultStations = append(resultStations, gc.processTimeoutTasks()...)
// 常规分组计算
for _, station := range inData.Stations {
if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement {
//log.Printf("非【液压传感器测量沉降】分组。")
resultStations = append(resultStations, station)
} else {
calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err != nil {
resultStations = append(resultStations, station)
} else {
resultStations = append(resultStations, calcedStations...)
calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err == nil {
log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
resultStations = append(resultStations, calcRet...)
}
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
}
}
// 返回处理后的数据,计算后的resultStations可能为空
inData.Stations = resultStations
// 计算失败的测点不返回
result := make([]common_models.Station, 0)
for _, station := range resultStations {
if len(station.Data.ThemeData) > 0 {
////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。
//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
//for _, number := range filterSensorIds {
// if number == station.Info.Id {
// log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// break
// }
//}
//// #TEST END
// 需要返回的测点
result = append(result, station)
} else {
log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
}
}
// 计算后的 result 可能为空
log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
inData.Stations = result
return inData
}
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station {
resultStations := make([]common_models.Station, 0)
calcTasks.Range(func(key, value interface{}) bool {
//log.Println("Key:", key, "Value:", value)
task := value.(CalcTask)
if task.IsTimeout() {
calcRet := task.Calc()
resultStations = append(resultStations, calcRet...)
// ES存储分组主题数据
gc.dumpGroupTheme(&task, calcRet)
calcTasks.Delete(key)
}
return true // 返回 true 继续遍历,返回 false 停止遍历
})
return resultStations
}
// ES存储分组主题
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) {
dumpStr, groupTheme := task.ToDump(calcedStations)
dumpStr, esDoc := task.ToDump(calcedStations)
log.Println(dumpStr)
if len(groupTheme.Data) > 0 {
gc.esHandler.SinkGroupDataToES(groupTheme)
if len(esDoc.Data) > 0 {
gc.esHandler.SinkGroupDataToES(esDoc)
}
}
@ -140,48 +168,39 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st
}
key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId}
if calcTask, ok := gc.calcTasks[key]; ok {
if value, ok := calcTasks.Load(key); ok {
calcTask := value.(CalcTask)
calcTask.AddStationData(*station)
log.Println(calcTask.R())
// 分组计算
if calcTask.CheckIntegrity() {
secs := calcTask.ElapsedSecs()
if secs > 10 {
log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R())
log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R())
}
calcedStations := calcTask.Calc()
// 缓存已计算数据
if calcedStations != nil && len(calcedStations) > 0 {
resultStations = append(resultStations, calcedStations...)
calcRet := calcTask.Calc()
if calcRet != nil && len(calcRet) > 0 {
resultStations = append(resultStations, calcRet...)
}
// ES存储分组主题数据
//dumpStr, groupTheme := calcTask.ToDump(calcedStations)
//gc.esHandler.SinkGroupDataToES(groupTheme)
//log.Println(dumpStr)
gc.dumpGroupTheme(&calcTask, calcedStations)
delete(gc.calcTasks, key)
// ES存储分组主题数据
gc.dumpGroupTheme(&calcTask, calcRet)
calcTasks.Delete(key)
}
} else {
// 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长
// 新计算任务:取到第一个的采集策略(维度),以便后面获得过期时长
task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
task.AddStationData(*station)
log.Println(task.R())
if task.CheckIntegrity() {
calcedStations := task.Calc()
// 缓存已计算数据
if calcedStations != nil && len(calcedStations) > 0 {
resultStations = append(resultStations, calcedStations...)
calcRet := task.Calc()
if calcRet != nil && len(calcRet) > 0 {
resultStations = append(resultStations, calcRet...)
}
// ES存储分组主题数据
gc.dumpGroupTheme(task, calcedStations)
gc.dumpGroupTheme(task, calcRet)
} else {
task.SetTimeout()
gc.calcTasks[key] = *task
log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
calcTasks.Store(key, *task)
}
}
}

4
et_calc/group/timeStrategy.go

@ -8,8 +8,8 @@ import (
)
const (
data_active_expire_sec int = 900
data_report_expire_sec int = 3600
data_active_expire_sec int = 900 // 15分钟
data_report_expire_sec int = 3600 // 1小时
)
// TimeStrategy 测点组合计算 超时时间判断策略

279
master/app/et_master.go

@ -9,21 +9,21 @@ import (
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"math"
"net"
"net/rpc"
"sort"
"sync"
"time"
)
type EtMaster struct {
nodeList []*NodeRpc
nodeMap sync.Map
exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool
}
func NewEtMaster() *EtMaster {
master := EtMaster{
nodeList: make([]*NodeRpc, 0),
exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1),
}
@ -31,9 +31,9 @@ func NewEtMaster() *EtMaster {
}
type NodeRpc struct {
args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
client *rpc.Client
}
@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册
gob.Register([]interface{}{})
for {
if len(the.nodeList) == 0 {
if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
continue
@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId()
isMatch := false
for _, nodeRpc := range the.nodeList {
if nodeRpc != nil {
if contains(nodeRpc.args.ThingIds, thingId) {
isMatch = true
go callNodeFunc(nodeRpc, data)
the.nodeMap.Range(func(address, value interface{}) bool {
if nodePtr, ok := value.(*NodeRpc); ok {
if nodePtr != nil {
if contains(nodePtr.args.ThingIds, thingId) {
isMatch = true
go callNodeFunc(nodePtr, data)
return false
}
}
}
}
return true
})
//无匹配触发 reBalance
if !isMatch {
if len(the.nodeList) > 0 {
the.sortNodeListByThingCount()
if the.nodeList[0] != nil {
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId)
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr)
go callNodeFunc(the.nodeList[0], data)
}
nodePtr := the.getNodeWithMinThings()
if nodePtr != nil {
nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
go callNodeFunc(nodePtr, data)
}
}
}
@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
}
var serviceMethod = ""
var resultCH chan bool
var resultCH chan int
var v interface{}
switch data.(type) {
@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH
default:
log.Printf("Unknown type:%v", v)
log.Printf("Unknown kafka data type:%v", v)
return
}
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v)
log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
go func() {
defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool
err := node.client.Call(serviceMethod, data, &reply)
result := boolToInt(reply)
if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error())
// rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
log.Printf("master调用node异常。Error:%s", err.Error())
result = 2
}
resultCH <- reply
resultCH <- result
}()
// RPC调用结果
var result bool
timeoutMills := 1000 * time.Millisecond
errorCode := 0
timeoutMills := 5 * 1000 * time.Millisecond
select {
case reply := <-resultCH:
result = reply
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v)
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
if reply == 2 {
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 200
} else if reply == 0 {
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 100
}
case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v)
result = false
log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
errorCode = 300
}
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
if result == false {
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T())
time.Sleep(time.Second * 5)
node = nil
the.nodesTidy()
// 100 故障:程序内部问题
// 200 故障:网络通信问题
// 300 故障:处理超时
if errorCode >= 200 {
the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
} else {
//log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
}
}
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
node := &NodeRpc{
args: nodeArgs,
resultCH: make(chan bool, 1),
aggResultCH: make(chan bool, 1),
resultCH: make(chan int, 1),
aggResultCH: make(chan int, 1),
client: nil,
}
//master 初始化 node client
client, err := rpc.Dial("tcp", node.args.Addr)
client, err := rpc.Dial("tcp", nodeArgs.Addr)
if err != nil {
log.Printf("链接node失败-> node[%v]", node.args.Addr)
log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
return err
}
node.client = client
the.nodeList = append(the.nodeList, node)
node.client = client
the.addOrUpdate(nodeArgs.Addr, node)
log.Printf("node服务[%v] 注册成功", nodeArgs)
printNodesInfo(the.nodeList)
*replay = true
the.printNodes()
*reply = true
return nil
}
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error {
isRegister := false
for _, nodeRpc := range the.nodeList {
if nodeRpc.args.Addr == nodeArgs.Addr {
isRegister = true
}
}
if !isRegister {
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
if !the.clientIsValid(nodeArgs.Addr) {
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
*replay = false
*reply = false
return errors.New("未注册的node")
}
log.Printf("收到-node[%v] 心跳", nodeArgs)
*replay = true
*reply = true
return nil
}
// NodeUnRegister 节点RPC 注销
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
for i, node := range the.nodeList {
log.Printf("节点[%s] 注销", node.args.Addr)
if node.args.Addr == nodeArgs.Addr {
err := node.client.Close()
if err != nil {
log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error())
}
the.nodeList[i] = nil
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
value, ok := the.nodeMap.Load(nodeArgs.Addr)
node := value.(*NodeRpc)
if ok && node.client != nil {
err := node.client.Close()
if err != nil {
log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
}
the.nodeMap.Delete(nodeArgs.Addr)
}
the.nodesTidy()
log.Printf("node服务[%v] 注销成功", nodeArgs)
*replay = true
*reply = true
return nil
}
func (the *EtMaster) nodesTidy() {
the.nodeList = updateNodeList(the.nodeList)
printNodesInfo(the.nodeList)
}
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
if the.nodeList[i] != nil && the.nodeList[j] != nil {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
} else {
return false
}
})
}
func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册")
for {
if len(the.nodeList) > 0 {
if the.nodeMapCount() > 0 {
break
}
time.Sleep(time.Second * 10)
}
}
func (the *EtMaster) ConnectNode() {
for i := range the.nodeList {
nodeAddr := the.nodeList[i].args.Addr
if the.nodeList[i].client == nil {
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
nodeAddr := key.(string)
if node.client == nil {
client, err := rpc.Dial("tcp", nodeAddr)
if err != nil {
log.Printf("链接node失败-> node[%v]", nodeAddr)
continue
return true
}
the.nodeList[i].client = client
node.client = client
the.nodeMap.Store(nodeAddr, node)
}
return true
})
}
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
if val, ok := the.nodeMap.Load(key); ok {
hisNode := val.(*NodeRpc)
hisNode.client = newNode.client
the.nodeMap.Store(key, hisNode)
} else {
the.nodeMap.Store(key, newNode)
}
}
func (the *EtMaster) nodeMapCount() int {
count := 0
the.nodeMap.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
func (the *EtMaster) clientIsValid(address string) bool {
val, ok := the.nodeMap.Load(address)
if !ok {
return false
}
// app 包内公用方法
func updateNodeList(nodes []*NodeRpc) []*NodeRpc {
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
}
if val.(*NodeRpc).client == nil {
return false
}
return newNodes
return true
}
// 获取最少things的节点
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
var minNode *NodeRpc
minThings := math.MaxInt64 // 初始化为最大值
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
if len(node.args.ThingIds) < minThings {
minThings = len(node.args.ThingIds)
minNode = node
}
return true
})
return minNode
}
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes))
for _, node := range nodes {
func (the *EtMaster) printNodes() {
count := 0
info := ""
the.nodeMap.Range(func(key, value interface{}) bool {
count++
node := value.(*NodeRpc)
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)
if !ok {
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
return
}
node := val.(*NodeRpc)
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
// 100 故障:程序内部错误
// 200 故障:网络通信问题
// 300 故障:处理超时
if errCode == 200 {
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} else if errCode == 300 {
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
}
log.Println(info)
the.printNodes()
}
func contains(arr []string, target string) bool {
for _, value := range arr {
if value == target {
@ -310,5 +375,11 @@ func contains(arr []string, target string) bool {
}
func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start)
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc)
log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc)
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
}

8
node/app/app.go

@ -57,14 +57,6 @@ func Start() {
}
go nodeSerRpcListen()
// aggNode 注册,无数据后处理环节
//aggWorker := agg_worker.NewAggWorker()
//err1 := rpc.RegisterName("aggNode", aggWorker)
//if err1 != nil {
// log.Fatal("注册 aggNode rpc 异常", err1)
//}
//aggWorker.RegisterToMaster()
//后移注册流程,避免node启动异常的无效注册
nodeWorker.RegisterToMaster()

29
node/app/et_node.go

@ -52,8 +52,35 @@ func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool)
return err
}
// 是沉降测试数据
func isSettleData(data map[string]interface{}) bool {
// {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
validKeys := map[string]bool{
"pressure": true,
"temperature": true,
"ssagee": true,
}
if len(data) != 3 {
return false
}
for key := range data {
if !validKeys[key] {
return false
}
}
return true
}
// ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
if err != nil {
return err
@ -64,7 +91,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
}
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
//log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData)
the.ch <- &common_models.ProcessData{
DeviceData: *deviceData,
Stations: []common_models.Station{},

1
node/stages/stage.go

@ -66,6 +66,7 @@ func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
case <-s.execOver:
case <-time.After(defaultTimeout):
log.Printf("=====================")
//TODO #TEST BEGIN 测试时可以注释掉下面这行,否则调试时超时,会引发一个 panic,导致程序中断。
log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout)
}
}

Loading…
Cancel
Save