|
|
@ -9,21 +9,21 @@ import ( |
|
|
|
"gitea.anxinyun.cn/container/common_models" |
|
|
|
"gitea.anxinyun.cn/container/common_utils/configLoad" |
|
|
|
"log" |
|
|
|
"math" |
|
|
|
"net" |
|
|
|
"net/rpc" |
|
|
|
"sort" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
type EtMaster struct { |
|
|
|
nodeList []*NodeRpc |
|
|
|
nodeMap sync.Map |
|
|
|
exporter et_prometheus_exporter.PrometheusExporter |
|
|
|
sleepCH chan bool |
|
|
|
} |
|
|
|
|
|
|
|
func NewEtMaster() *EtMaster { |
|
|
|
master := EtMaster{ |
|
|
|
nodeList: make([]*NodeRpc, 0), |
|
|
|
exporter: et_prometheus_exporter.NewPrometheusExporter(), |
|
|
|
sleepCH: make(chan bool, 1), |
|
|
|
} |
|
|
@ -31,9 +31,9 @@ func NewEtMaster() *EtMaster { |
|
|
|
} |
|
|
|
|
|
|
|
type NodeRpc struct { |
|
|
|
args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数
|
|
|
|
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果
|
|
|
|
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数
|
|
|
|
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
|
|
|
|
resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
|
|
|
|
aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
|
|
|
|
client *rpc.Client |
|
|
|
} |
|
|
|
|
|
|
@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { |
|
|
|
//数据类型注册
|
|
|
|
gob.Register([]interface{}{}) |
|
|
|
for { |
|
|
|
if len(the.nodeList) == 0 { |
|
|
|
if the.nodeMapCount() == 0 { |
|
|
|
log.Printf("nodeList is empty!") |
|
|
|
time.Sleep(time.Second * 10) |
|
|
|
continue |
|
|
@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { |
|
|
|
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) { |
|
|
|
thingId := data.GetThingId() |
|
|
|
isMatch := false |
|
|
|
for _, nodeRpc := range the.nodeList { |
|
|
|
if nodeRpc != nil { |
|
|
|
if contains(nodeRpc.args.ThingIds, thingId) { |
|
|
|
the.nodeMap.Range(func(address, value interface{}) bool { |
|
|
|
if nodePtr, ok := value.(*NodeRpc); ok { |
|
|
|
if nodePtr != nil { |
|
|
|
if contains(nodePtr.args.ThingIds, thingId) { |
|
|
|
isMatch = true |
|
|
|
go callNodeFunc(nodeRpc, data) |
|
|
|
go callNodeFunc(nodePtr, data) |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return true |
|
|
|
}) |
|
|
|
|
|
|
|
//无匹配触发 reBalance
|
|
|
|
if !isMatch { |
|
|
|
if len(the.nodeList) > 0 { |
|
|
|
the.sortNodeListByThingCount() |
|
|
|
if the.nodeList[0] != nil { |
|
|
|
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId) |
|
|
|
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr) |
|
|
|
go callNodeFunc(the.nodeList[0], data) |
|
|
|
} |
|
|
|
nodePtr := the.getNodeWithMinThings() |
|
|
|
if nodePtr != nil { |
|
|
|
nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId) |
|
|
|
log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr) |
|
|
|
go callNodeFunc(nodePtr, data) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac |
|
|
|
} |
|
|
|
|
|
|
|
var serviceMethod = "" |
|
|
|
var resultCH chan bool |
|
|
|
var resultCH chan int |
|
|
|
var v interface{} |
|
|
|
|
|
|
|
switch data.(type) { |
|
|
@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac |
|
|
|
serviceMethod = "etNode.AggDataHandler" |
|
|
|
resultCH = node.aggResultCH |
|
|
|
default: |
|
|
|
log.Printf("Unknown type:%v", v) |
|
|
|
log.Printf("Unknown kafka data type:%v", v) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v) |
|
|
|
log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v) |
|
|
|
|
|
|
|
go func() { |
|
|
|
defer timeCost(node.args.ID, data.Q(), time.Now()) |
|
|
|
var reply bool |
|
|
|
err := node.client.Call(serviceMethod, data, &reply) |
|
|
|
result := boolToInt(reply) |
|
|
|
if err != nil { |
|
|
|
log.Printf("rpc 调用node, err:%s", err.Error()) |
|
|
|
// rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
|
|
|
|
log.Printf("master调用node异常。Error:%s", err.Error()) |
|
|
|
result = 2 |
|
|
|
} |
|
|
|
resultCH <- reply |
|
|
|
resultCH <- result |
|
|
|
}() |
|
|
|
|
|
|
|
// RPC调用结果
|
|
|
|
var result bool |
|
|
|
timeoutMills := 1000 * time.Millisecond |
|
|
|
errorCode := 0 |
|
|
|
timeoutMills := 5 * 1000 * time.Millisecond |
|
|
|
select { |
|
|
|
case reply := <-resultCH: |
|
|
|
result = reply |
|
|
|
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v) |
|
|
|
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
|
|
|
|
if reply == 2 { |
|
|
|
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v) |
|
|
|
errorCode = 200 |
|
|
|
} else if reply == 0 { |
|
|
|
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
|
|
|
|
errorCode = 100 |
|
|
|
} |
|
|
|
case <-time.After(timeoutMills): |
|
|
|
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v) |
|
|
|
result = false |
|
|
|
log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v) |
|
|
|
errorCode = 300 |
|
|
|
} |
|
|
|
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T()) |
|
|
|
|
|
|
|
if result == false { |
|
|
|
//发送 stop 信号
|
|
|
|
the.sleepCH <- true |
|
|
|
log.Println("=============================================") |
|
|
|
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T()) |
|
|
|
time.Sleep(time.Second * 5) |
|
|
|
node = nil |
|
|
|
the.nodesTidy() |
|
|
|
// 100 故障:程序内部问题
|
|
|
|
// 200 故障:网络通信问题
|
|
|
|
// 300 故障:处理超时
|
|
|
|
if errorCode >= 200 { |
|
|
|
the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T())) |
|
|
|
} else { |
|
|
|
//log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
|
|
|
|
log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
|
|
|
|
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { |
|
|
|
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error { |
|
|
|
node := &NodeRpc{ |
|
|
|
args: nodeArgs, |
|
|
|
resultCH: make(chan bool, 1), |
|
|
|
aggResultCH: make(chan bool, 1), |
|
|
|
resultCH: make(chan int, 1), |
|
|
|
aggResultCH: make(chan int, 1), |
|
|
|
client: nil, |
|
|
|
} |
|
|
|
//master 初始化 node client
|
|
|
|
client, err := rpc.Dial("tcp", node.args.Addr) |
|
|
|
client, err := rpc.Dial("tcp", nodeArgs.Addr) |
|
|
|
if err != nil { |
|
|
|
log.Printf("链接node失败-> node[%v]", node.args.Addr) |
|
|
|
log.Printf("链接node失败-> node[%v]", nodeArgs.Addr) |
|
|
|
return err |
|
|
|
} |
|
|
|
node.client = client |
|
|
|
|
|
|
|
the.nodeList = append(the.nodeList, node) |
|
|
|
node.client = client |
|
|
|
the.addOrUpdate(nodeArgs.Addr, node) |
|
|
|
log.Printf("node服务[%v] 注册成功", nodeArgs) |
|
|
|
printNodesInfo(the.nodeList) |
|
|
|
*replay = true |
|
|
|
the.printNodes() |
|
|
|
*reply = true |
|
|
|
return nil |
|
|
|
} |
|
|
|
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error { |
|
|
|
|
|
|
|
isRegister := false |
|
|
|
for _, nodeRpc := range the.nodeList { |
|
|
|
if nodeRpc.args.Addr == nodeArgs.Addr { |
|
|
|
isRegister = true |
|
|
|
} |
|
|
|
} |
|
|
|
if !isRegister { |
|
|
|
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error { |
|
|
|
if !the.clientIsValid(nodeArgs.Addr) { |
|
|
|
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs) |
|
|
|
*replay = false |
|
|
|
*reply = false |
|
|
|
return errors.New("未注册的node") |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("收到-node[%v] 心跳", nodeArgs) |
|
|
|
*replay = true |
|
|
|
*reply = true |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// NodeUnRegister 节点RPC 注销
|
|
|
|
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { |
|
|
|
|
|
|
|
for i, node := range the.nodeList { |
|
|
|
log.Printf("节点[%s] 注销", node.args.Addr) |
|
|
|
if node.args.Addr == nodeArgs.Addr { |
|
|
|
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error { |
|
|
|
value, ok := the.nodeMap.Load(nodeArgs.Addr) |
|
|
|
node := value.(*NodeRpc) |
|
|
|
if ok && node.client != nil { |
|
|
|
err := node.client.Close() |
|
|
|
if err != nil { |
|
|
|
log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error()) |
|
|
|
} |
|
|
|
the.nodeList[i] = nil |
|
|
|
log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error()) |
|
|
|
} |
|
|
|
the.nodeMap.Delete(nodeArgs.Addr) |
|
|
|
} |
|
|
|
the.nodesTidy() |
|
|
|
|
|
|
|
log.Printf("node服务[%v] 注销成功", nodeArgs) |
|
|
|
*replay = true |
|
|
|
*reply = true |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (the *EtMaster) nodesTidy() { |
|
|
|
the.nodeList = updateNodeList(the.nodeList) |
|
|
|
printNodesInfo(the.nodeList) |
|
|
|
} |
|
|
|
|
|
|
|
func (the *EtMaster) sortNodeListByThingCount() { |
|
|
|
sort.Slice(the.nodeList, func(i, j int) bool { |
|
|
|
if the.nodeList[i] != nil && the.nodeList[j] != nil { |
|
|
|
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds) |
|
|
|
} else { |
|
|
|
return false |
|
|
|
} |
|
|
|
}) |
|
|
|
} |
|
|
|
func (the *EtMaster) WaitNodeRegister() { |
|
|
|
log.Println("等待 node进行注册") |
|
|
|
for { |
|
|
|
if len(the.nodeList) > 0 { |
|
|
|
if the.nodeMapCount() > 0 { |
|
|
|
break |
|
|
|
} |
|
|
|
time.Sleep(time.Second * 10) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (the *EtMaster) ConnectNode() { |
|
|
|
for i := range the.nodeList { |
|
|
|
nodeAddr := the.nodeList[i].args.Addr |
|
|
|
if the.nodeList[i].client == nil { |
|
|
|
the.nodeMap.Range(func(key, value interface{}) bool { |
|
|
|
node := value.(*NodeRpc) |
|
|
|
nodeAddr := key.(string) |
|
|
|
|
|
|
|
if node.client == nil { |
|
|
|
client, err := rpc.Dial("tcp", nodeAddr) |
|
|
|
if err != nil { |
|
|
|
log.Printf("链接node失败-> node[%v]", nodeAddr) |
|
|
|
continue |
|
|
|
return true |
|
|
|
} |
|
|
|
the.nodeList[i].client = client |
|
|
|
|
|
|
|
node.client = client |
|
|
|
the.nodeMap.Store(nodeAddr, node) |
|
|
|
} |
|
|
|
|
|
|
|
return true |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) { |
|
|
|
if val, ok := the.nodeMap.Load(key); ok { |
|
|
|
hisNode := val.(*NodeRpc) |
|
|
|
hisNode.client = newNode.client |
|
|
|
the.nodeMap.Store(key, hisNode) |
|
|
|
} else { |
|
|
|
the.nodeMap.Store(key, newNode) |
|
|
|
} |
|
|
|
} |
|
|
|
func (the *EtMaster) nodeMapCount() int { |
|
|
|
count := 0 |
|
|
|
the.nodeMap.Range(func(key, value interface{}) bool { |
|
|
|
count++ |
|
|
|
return true |
|
|
|
}) |
|
|
|
return count |
|
|
|
} |
|
|
|
func (the *EtMaster) clientIsValid(address string) bool { |
|
|
|
val, ok := the.nodeMap.Load(address) |
|
|
|
if !ok { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// app 包内公用方法
|
|
|
|
func updateNodeList(nodes []*NodeRpc) []*NodeRpc { |
|
|
|
var newNodes []*NodeRpc |
|
|
|
for _, node := range nodes { |
|
|
|
if node != nil && node.client != nil { |
|
|
|
newNodes = append(newNodes, node) |
|
|
|
if val.(*NodeRpc).client == nil { |
|
|
|
return false |
|
|
|
} |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
// 获取最少things的节点
|
|
|
|
func (the *EtMaster) getNodeWithMinThings() *NodeRpc { |
|
|
|
var minNode *NodeRpc |
|
|
|
minThings := math.MaxInt64 // 初始化为最大值
|
|
|
|
|
|
|
|
the.nodeMap.Range(func(key, value interface{}) bool { |
|
|
|
node := value.(*NodeRpc) |
|
|
|
if len(node.args.ThingIds) < minThings { |
|
|
|
minThings = len(node.args.ThingIds) |
|
|
|
minNode = node |
|
|
|
} |
|
|
|
return newNodes |
|
|
|
|
|
|
|
return true |
|
|
|
}) |
|
|
|
|
|
|
|
return minNode |
|
|
|
} |
|
|
|
func printNodesInfo(nodes []*NodeRpc) { |
|
|
|
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) |
|
|
|
for _, node := range nodes { |
|
|
|
func (the *EtMaster) printNodes() { |
|
|
|
count := 0 |
|
|
|
info := "" |
|
|
|
the.nodeMap.Range(func(key, value interface{}) bool { |
|
|
|
count++ |
|
|
|
node := value.(*NodeRpc) |
|
|
|
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) |
|
|
|
return true |
|
|
|
}) |
|
|
|
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count) |
|
|
|
log.Printf("%s %s", countInfo, info) |
|
|
|
} |
|
|
|
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) { |
|
|
|
val, ok := the.nodeMap.Load(address) |
|
|
|
if !ok { |
|
|
|
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address) |
|
|
|
return |
|
|
|
} |
|
|
|
log.Println(info) |
|
|
|
node := val.(*NodeRpc) |
|
|
|
|
|
|
|
//发送 stop 信号
|
|
|
|
the.sleepCH <- true |
|
|
|
log.Println("=============================================") |
|
|
|
|
|
|
|
// 100 故障:程序内部错误
|
|
|
|
// 200 故障:网络通信问题
|
|
|
|
// 300 故障:处理超时
|
|
|
|
if errCode == 200 { |
|
|
|
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc) |
|
|
|
time.Sleep(time.Second * 5) |
|
|
|
the.nodeMap.Delete(address) |
|
|
|
} else if errCode == 300 { |
|
|
|
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc) |
|
|
|
time.Sleep(time.Second * 5) |
|
|
|
the.nodeMap.Delete(address) |
|
|
|
} |
|
|
|
|
|
|
|
the.printNodes() |
|
|
|
} |
|
|
|
|
|
|
|
func contains(arr []string, target string) bool { |
|
|
|
for _, value := range arr { |
|
|
|
if value == target { |
|
|
@ -310,5 +375,11 @@ func contains(arr []string, target string) bool { |
|
|
|
} |
|
|
|
func timeCost(nodeId, deviceId string, start time.Time) { |
|
|
|
tc := time.Since(start) |
|
|
|
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc) |
|
|
|
log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc) |
|
|
|
} |
|
|
|
func boolToInt(b bool) int { |
|
|
|
if b { |
|
|
|
return 1 |
|
|
|
} |
|
|
|
return 0 |
|
|
|
} |
|
|
|