Browse Source

odeList改为sync.Map;

细化node访问异常情况:0正常|100内部错误|200网络异常|300访问超时5s;
修复相同地址Node注册,client不更新问题;
dev
yfh 4 weeks ago
parent
commit
c61ea64bc1
  1. 267
      master/app/et_master.go

267
master/app/et_master.go

@ -9,21 +9,21 @@ import (
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"math"
"net"
"net/rpc"
"sort"
"sync"
"time"
)
type EtMaster struct {
nodeList []*NodeRpc
nodeMap sync.Map
exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool
}
func NewEtMaster() *EtMaster {
master := EtMaster{
nodeList: make([]*NodeRpc, 0),
exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1),
}
@ -32,8 +32,8 @@ func NewEtMaster() *EtMaster {
type NodeRpc struct {
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数
resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
client *rpc.Client
}
@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册
gob.Register([]interface{}{})
for {
if len(the.nodeList) == 0 {
if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
continue
@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId()
isMatch := false
for _, nodeRpc := range the.nodeList {
if nodeRpc != nil {
if contains(nodeRpc.args.ThingIds, thingId) {
the.nodeMap.Range(func(address, value interface{}) bool {
if nodePtr, ok := value.(*NodeRpc); ok {
if nodePtr != nil {
if contains(nodePtr.args.ThingIds, thingId) {
isMatch = true
go callNodeFunc(nodeRpc, data)
go callNodeFunc(nodePtr, data)
return false
}
}
}
return true
})
//无匹配触发 reBalance
if !isMatch {
if len(the.nodeList) > 0 {
the.sortNodeListByThingCount()
if the.nodeList[0] != nil {
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId)
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr)
go callNodeFunc(the.nodeList[0], data)
}
nodePtr := the.getNodeWithMinThings()
if nodePtr != nil {
nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
go callNodeFunc(nodePtr, data)
}
}
}
@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
}
var serviceMethod = ""
var resultCH chan bool
var resultCH chan int
var v interface{}
switch data.(type) {
@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac
serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH
default:
log.Printf("Unknown type:%v", v)
log.Printf("Unknown kafka data type:%v", v)
return
}
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v)
log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
go func() {
defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool
err := node.client.Call(serviceMethod, data, &reply)
result := boolToInt(reply)
if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error())
// rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
log.Printf("master调用node异常。Error:%s", err.Error())
result = 2
}
resultCH <- reply
resultCH <- result
}()
// RPC调用结果
var result bool
timeoutMills := 1000 * time.Millisecond
errorCode := 0
timeoutMills := 5 * 1000 * time.Millisecond
select {
case reply := <-resultCH:
result = reply
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v)
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
if reply == 2 {
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 200
} else if reply == 0 {
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 100
}
case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v)
result = false
log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
errorCode = 300
}
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
if result == false {
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T())
time.Sleep(time.Second * 5)
node = nil
the.nodesTidy()
// 100 故障:程序内部问题
// 200 故障:网络通信问题
// 300 故障:处理超时
if errorCode >= 200 {
the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
} else {
//log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
}
}
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
node := &NodeRpc{
args: nodeArgs,
resultCH: make(chan bool, 1),
aggResultCH: make(chan bool, 1),
resultCH: make(chan int, 1),
aggResultCH: make(chan int, 1),
client: nil,
}
//master 初始化 node client
client, err := rpc.Dial("tcp", node.args.Addr)
client, err := rpc.Dial("tcp", nodeArgs.Addr)
if err != nil {
log.Printf("链接node失败-> node[%v]", node.args.Addr)
log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
return err
}
node.client = client
the.nodeList = append(the.nodeList, node)
node.client = client
the.addOrUpdate(nodeArgs.Addr, node)
log.Printf("node服务[%v] 注册成功", nodeArgs)
printNodesInfo(the.nodeList)
*replay = true
the.printNodes()
*reply = true
return nil
}
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error {
isRegister := false
for _, nodeRpc := range the.nodeList {
if nodeRpc.args.Addr == nodeArgs.Addr {
isRegister = true
}
}
if !isRegister {
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
if !the.clientIsValid(nodeArgs.Addr) {
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
*replay = false
*reply = false
return errors.New("未注册的node")
}
log.Printf("收到-node[%v] 心跳", nodeArgs)
*replay = true
*reply = true
return nil
}
// NodeUnRegister 节点RPC 注销
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
for i, node := range the.nodeList {
log.Printf("节点[%s] 注销", node.args.Addr)
if node.args.Addr == nodeArgs.Addr {
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
value, ok := the.nodeMap.Load(nodeArgs.Addr)
node := value.(*NodeRpc)
if ok && node.client != nil {
err := node.client.Close()
if err != nil {
log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error())
}
the.nodeList[i] = nil
log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
}
the.nodeMap.Delete(nodeArgs.Addr)
}
the.nodesTidy()
log.Printf("node服务[%v] 注销成功", nodeArgs)
*replay = true
*reply = true
return nil
}
func (the *EtMaster) nodesTidy() {
the.nodeList = updateNodeList(the.nodeList)
printNodesInfo(the.nodeList)
}
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
if the.nodeList[i] != nil && the.nodeList[j] != nil {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
} else {
return false
}
})
}
func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册")
for {
if len(the.nodeList) > 0 {
if the.nodeMapCount() > 0 {
break
}
time.Sleep(time.Second * 10)
}
}
func (the *EtMaster) ConnectNode() {
for i := range the.nodeList {
nodeAddr := the.nodeList[i].args.Addr
if the.nodeList[i].client == nil {
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
nodeAddr := key.(string)
if node.client == nil {
client, err := rpc.Dial("tcp", nodeAddr)
if err != nil {
log.Printf("链接node失败-> node[%v]", nodeAddr)
continue
return true
}
the.nodeList[i].client = client
node.client = client
the.nodeMap.Store(nodeAddr, node)
}
return true
})
}
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
if val, ok := the.nodeMap.Load(key); ok {
hisNode := val.(*NodeRpc)
hisNode.client = newNode.client
the.nodeMap.Store(key, hisNode)
} else {
the.nodeMap.Store(key, newNode)
}
}
func (the *EtMaster) nodeMapCount() int {
count := 0
the.nodeMap.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
func (the *EtMaster) clientIsValid(address string) bool {
val, ok := the.nodeMap.Load(address)
if !ok {
return false
}
// app 包内公用方法
func updateNodeList(nodes []*NodeRpc) []*NodeRpc {
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
if val.(*NodeRpc).client == nil {
return false
}
return true
}
return newNodes
// 获取最少things的节点
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
var minNode *NodeRpc
minThings := math.MaxInt64 // 初始化为最大值
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
if len(node.args.ThingIds) < minThings {
minThings = len(node.args.ThingIds)
minNode = node
}
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes))
for _, node := range nodes {
return true
})
return minNode
}
func (the *EtMaster) printNodes() {
count := 0
info := ""
the.nodeMap.Range(func(key, value interface{}) bool {
count++
node := value.(*NodeRpc)
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)
if !ok {
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
return
}
node := val.(*NodeRpc)
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
// 100 故障:程序内部错误
// 200 故障:网络通信问题
// 300 故障:处理超时
if errCode == 200 {
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} else if errCode == 300 {
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
}
log.Println(info)
the.printNodes()
}
func contains(arr []string, target string) bool {
for _, value := range arr {
if value == target {
@ -310,5 +375,11 @@ func contains(arr []string, target string) bool {
}
func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start)
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc)
log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc)
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
}

Loading…
Cancel
Save