You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
398 lines
10 KiB
398 lines
10 KiB
package app
|
|
|
|
import (
|
|
"dataSource"
|
|
"encoding/gob"
|
|
"errors"
|
|
"et_prometheus_exporter"
|
|
"fmt"
|
|
"gitea.anxinyun.cn/container/common_models"
|
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
|
"log"
|
|
"math"
|
|
"net"
|
|
"net/rpc"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type EtMaster struct {
|
|
nodeMap sync.Map
|
|
exporter et_prometheus_exporter.PrometheusExporter
|
|
sleepCH chan bool
|
|
}
|
|
|
|
func NewEtMaster() *EtMaster {
|
|
master := EtMaster{
|
|
exporter: et_prometheus_exporter.NewPrometheusExporter(),
|
|
sleepCH: make(chan bool, 1),
|
|
}
|
|
return &master
|
|
}
|
|
|
|
type NodeRpc struct {
|
|
args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
|
|
resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
|
|
aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数
|
|
client *rpc.Client
|
|
}
|
|
|
|
// RegisterListen 启动 master RPC服务
|
|
func (the *EtMaster) RegisterListen() {
|
|
//监听
|
|
err := rpc.RegisterName("master", the)
|
|
if err != nil {
|
|
log.Println("master 提供注册服务异常")
|
|
return
|
|
}
|
|
|
|
port := configLoad.LoadConfig().GetUint16("master.port")
|
|
|
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
|
if err != nil {
|
|
log.Panic("master 启动 node服务注册功能异常")
|
|
}
|
|
log.Printf("master 启动 node服务注册功能 :%d", port)
|
|
for {
|
|
//log.Println("master 监听新注册链接")
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
log.Println("master rpc Accept异常")
|
|
}
|
|
log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
|
|
go rpc.ServeConn(conn)
|
|
}
|
|
}
|
|
|
|
// DistributeData 分发数据。
|
|
// 监听两个数据通道RawDataChan和AggDataChan,根据不同类型的数据通道接收到的数据,调用notifyData方法进行相应的处理操作。
|
|
func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
|
|
//数据类型注册
|
|
gob.Register([]interface{}{})
|
|
for {
|
|
if the.nodeMapCount() == 0 {
|
|
log.Printf("nodeList is empty!")
|
|
time.Sleep(time.Second * 10)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case stopEnable := <-the.sleepCH:
|
|
if stopEnable {
|
|
stopTime := time.Second * 10
|
|
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
|
|
time.Sleep(stopTime)
|
|
} else {
|
|
log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
|
|
}
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case data := <-dataChannels.RawDataChan:
|
|
the.notifyData(&data, the.callNodeService)
|
|
case data := <-dataChannels.AggDataChan:
|
|
the.notifyData(&data, the.callNodeService)
|
|
//default:
|
|
// time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
|
|
thingId := data.GetThingId()
|
|
isMatch := false
|
|
the.nodeMap.Range(func(address, value interface{}) bool {
|
|
if nodePtr, ok := value.(*NodeRpc); ok {
|
|
if nodePtr != nil {
|
|
if contains(nodePtr.args.ThingIds, thingId) {
|
|
isMatch = true
|
|
go callNodeFunc(nodePtr, data)
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
//无匹配触发 reBalance
|
|
if !isMatch {
|
|
nodePtr := the.getNodeWithMinThings()
|
|
if nodePtr != nil {
|
|
nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
|
|
log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
|
|
go callNodeFunc(nodePtr, data)
|
|
}
|
|
}
|
|
}
|
|
|
|
// callNodeService 调用 etNode 的RPC服务
|
|
func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) {
|
|
if node.client == nil {
|
|
log.Printf("node [%v] client=nil", node.args)
|
|
return
|
|
}
|
|
|
|
var serviceMethod = ""
|
|
var resultCH chan int
|
|
var v interface{}
|
|
|
|
switch data.(type) {
|
|
case *common_models.IotaData:
|
|
v = data.(*common_models.IotaData)
|
|
the.exporter.OnIotaData2metricByPrometheus(data.(*common_models.IotaData))
|
|
serviceMethod = "etNode.IotaDataHandler"
|
|
resultCH = node.resultCH
|
|
case *common_models.AggData:
|
|
v = data.(*common_models.AggData)
|
|
serviceMethod = "etNode.AggDataHandler"
|
|
resultCH = node.aggResultCH
|
|
default:
|
|
log.Printf("Unknown kafka data type:%v", v)
|
|
return
|
|
}
|
|
|
|
log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
|
|
|
|
go func() {
|
|
defer timeCost(node.args.ID, data.Q(), time.Now())
|
|
var reply bool
|
|
err := node.client.Call(serviceMethod, data, &reply)
|
|
|
|
result := boolToInt(reply)
|
|
|
|
if err != nil {
|
|
isAggParseErr := strings.Contains(err.Error(), "aggData非法数据")
|
|
log.Printf("master调用node异常。Error:%s", err.Error())
|
|
if !isAggParseErr {
|
|
// rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
|
|
result = 2
|
|
}
|
|
}
|
|
resultCH <- result
|
|
}()
|
|
|
|
// RPC调用结果
|
|
errorCode := 0
|
|
timeoutMills := 5 * 1000 * time.Millisecond
|
|
select {
|
|
case reply := <-resultCH:
|
|
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
|
|
if reply == 2 {
|
|
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
|
|
errorCode = 200
|
|
} else if reply == 0 {
|
|
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
|
|
errorCode = 100
|
|
}
|
|
case <-time.After(timeoutMills):
|
|
log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
|
|
errorCode = 300
|
|
}
|
|
|
|
// 100 故障:程序内部问题
|
|
// 200 故障:网络通信问题
|
|
// 300 故障:处理超时
|
|
if errorCode >= 200 {
|
|
the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
|
|
} else {
|
|
//log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
|
|
log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
|
|
}
|
|
}
|
|
|
|
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
|
|
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
|
|
node := &NodeRpc{
|
|
args: nodeArgs,
|
|
resultCH: make(chan int, 1),
|
|
aggResultCH: make(chan int, 1),
|
|
client: nil,
|
|
}
|
|
//master 初始化 node client
|
|
client, err := rpc.Dial("tcp", nodeArgs.Addr)
|
|
if err != nil {
|
|
log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
|
|
return err
|
|
}
|
|
|
|
node.client = client
|
|
the.addOrUpdate(nodeArgs.Addr, node)
|
|
log.Printf("node服务[%v] 注册成功", nodeArgs)
|
|
the.printNodes()
|
|
*reply = true
|
|
return nil
|
|
}
|
|
|
|
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
|
|
if !the.clientIsValid(nodeArgs.Addr) {
|
|
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
|
|
*reply = false
|
|
err := the.NodeRegister(nodeArgs, reply)
|
|
if err != nil {
|
|
return errors.New("未注册的node")
|
|
} else {
|
|
*reply = true
|
|
log.Printf("收到未注册的node[%v]心跳,master已将node重新注册。", nodeArgs)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
log.Printf("收到-node[%v] 心跳", nodeArgs)
|
|
*reply = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// NodeUnRegister 节点RPC 注销
|
|
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
|
|
value, ok := the.nodeMap.Load(nodeArgs.Addr)
|
|
node := value.(*NodeRpc)
|
|
if ok && node.client != nil {
|
|
err := node.client.Close()
|
|
if err != nil {
|
|
log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
|
|
}
|
|
the.nodeMap.Delete(nodeArgs.Addr)
|
|
}
|
|
|
|
log.Printf("node服务[%v] 注销成功", nodeArgs)
|
|
*reply = true
|
|
return nil
|
|
}
|
|
|
|
func (the *EtMaster) WaitNodeRegister() {
|
|
log.Println("等待 node进行注册")
|
|
for {
|
|
if the.nodeMapCount() > 0 {
|
|
break
|
|
}
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
}
|
|
|
|
func (the *EtMaster) ConnectNode() {
|
|
the.nodeMap.Range(func(key, value interface{}) bool {
|
|
node := value.(*NodeRpc)
|
|
nodeAddr := key.(string)
|
|
|
|
if node.client == nil {
|
|
client, err := rpc.Dial("tcp", nodeAddr)
|
|
if err != nil {
|
|
log.Printf("链接node失败-> node[%v]", nodeAddr)
|
|
return true
|
|
}
|
|
|
|
node.client = client
|
|
the.nodeMap.Store(nodeAddr, node)
|
|
}
|
|
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
|
|
if val, ok := the.nodeMap.Load(key); ok {
|
|
hisNode := val.(*NodeRpc)
|
|
hisNode.client = newNode.client
|
|
the.nodeMap.Store(key, hisNode)
|
|
} else {
|
|
the.nodeMap.Store(key, newNode)
|
|
}
|
|
}
|
|
func (the *EtMaster) nodeMapCount() int {
|
|
count := 0
|
|
the.nodeMap.Range(func(key, value interface{}) bool {
|
|
count++
|
|
return true
|
|
})
|
|
return count
|
|
}
|
|
func (the *EtMaster) clientIsValid(address string) bool {
|
|
val, ok := the.nodeMap.Load(address)
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
if val.(*NodeRpc).client == nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// 获取最少things的节点
|
|
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
|
|
var minNode *NodeRpc
|
|
minThings := math.MaxInt64 // 初始化为最大值
|
|
|
|
the.nodeMap.Range(func(key, value interface{}) bool {
|
|
node := value.(*NodeRpc)
|
|
if len(node.args.ThingIds) < minThings {
|
|
minThings = len(node.args.ThingIds)
|
|
minNode = node
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
return minNode
|
|
}
|
|
func (the *EtMaster) printNodes() {
|
|
count := 0
|
|
info := ""
|
|
the.nodeMap.Range(func(key, value interface{}) bool {
|
|
count++
|
|
node := value.(*NodeRpc)
|
|
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
|
|
return true
|
|
})
|
|
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
|
|
log.Printf("%s %s", countInfo, info)
|
|
}
|
|
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
|
|
val, ok := the.nodeMap.Load(address)
|
|
if !ok {
|
|
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
|
|
return
|
|
}
|
|
node := val.(*NodeRpc)
|
|
|
|
//发送 stop 信号
|
|
the.sleepCH <- true
|
|
log.Println("=============================================")
|
|
|
|
// 100 故障:程序内部错误
|
|
// 200 故障:网络通信问题
|
|
// 300 故障:处理超时
|
|
if errCode == 200 {
|
|
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
|
|
time.Sleep(time.Second * 5)
|
|
the.nodeMap.Delete(address)
|
|
} else if errCode == 300 {
|
|
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
|
|
time.Sleep(time.Second * 5)
|
|
the.nodeMap.Delete(address)
|
|
}
|
|
|
|
the.printNodes()
|
|
}
|
|
|
|
// contains reports whether target occurs in arr. A nil or empty arr yields false.
func contains(arr []string, target string) bool {
	for i := range arr {
		if arr[i] == target {
			return true
		}
	}
	return false
}
|
|
// timeCost logs the time elapsed since start for master's call to node nodeId
// handling item deviceId (callers pass data.Q() here). It is meant to be deferred
// with start = time.Now().
func timeCost(nodeId, deviceId string, start time.Time) {
	log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, time.Since(start))
}
|
|
// boolToInt maps true to 1 and false to 0.
func boolToInt(b bool) int {
	n := 0
	if b {
		n = 1
	}
	return n
}
|
|
|