package app

import (
	"dataSource"
	"encoding/gob"
	"errors"
	"et_prometheus_exporter"
	"fmt"
	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"log"
	"net"
	"net/rpc"
	"sort"
	"strings"
	"sync"
	"time"
)

// EtMaster accepts et_node registrations over net/rpc and dispatches incoming
// data to the node whose ThingIds contain the data's thing ID.
type EtMaster struct {
	// mu guards nodeList and the mutable fields of its entries
	// (args.ThingIds, client). nodeList is accessed concurrently by the RPC
	// handlers (one goroutine per connection via rpc.ServeConn), by
	// DistributeData and by the callNodeService goroutines.
	mu       sync.RWMutex
	nodeList []*NodeRpc
	exporter et_prometheus_exporter.PrometheusExporter
	// sleepCH carries pause requests from callNodeService to DistributeData.
	sleepCH chan bool
}

// NewEtMaster creates a master with an empty node list, a Prometheus exporter
// and a one-slot sleepCH.
func NewEtMaster() *EtMaster {
	return &EtMaster{
		nodeList: make([]*NodeRpc, 0),
		exporter: et_prometheus_exporter.NewPrometheusExporter(),
		sleepCH:  make(chan bool, 1),
	}
}

// NodeRpc is the master's view of one registered node.
type NodeRpc struct {
	args        *common_models.NodeArgs // input of the "master" RPC service method NodeRegister
	resultCH    chan bool               // reply channel for IotaData calls; callNodeService now uses a per-call channel instead
	aggResultCH chan bool               // reply channel for AggData calls; callNodeService now uses a per-call channel instead
	client      *rpc.Client             // client used to call the node's etNode RPC service
}

// RegisterListen registers this master as the "master" RPC service and serves
// node connections on the configured master.port. It blocks forever; every
// accepted connection is served on its own goroutine.
func (the *EtMaster) RegisterListen() {
	if err := rpc.RegisterName("master", the); err != nil {
		log.Println("master 提供注册服务异常")
		return
	}
	port := configLoad.LoadConfig().GetUint16("master.port")
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Panic("master 启动 node服务注册功能异常")
	}
	log.Printf("master 启动 node服务注册功能 :%d", port)
	for {
		conn, err := listener.Accept()
		if err != nil {
			// conn is nil on error; touching it would panic. Back off briefly so a
			// persistent error does not turn into a hot loop.
			log.Println("master rpc Accept异常", err)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
		go rpc.ServeConn(conn)
	}
}

// DistributeData distributes data to nodes.
// It listens on RawDataChan and AggDataChan and hands every received item to
// notifyData. While no node is registered it polls every 10 seconds, and it
// pauses for 10 seconds when callNodeService signals a failure on sleepCH.
func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
	// Register the concrete types that travel inside interface{} fields over gob.
	gob.Register([]interface{}{})
	for {
		if the.nodeCount() == 0 {
			log.Printf("nodeList is empty!")
			time.Sleep(time.Second * 10)
			continue
		}

		// Non-blocking check for a pause request from callNodeService.
		select {
		case stopEnable := <-the.sleepCH:
			if stopEnable {
				stopTime := time.Second * 10
				log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
				time.Sleep(stopTime)
			} else {
				log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
			}
		default:
		}

		select {
		case data := <-dataChannels.RawDataChan:
			the.notifyData(&data, the.callNodeService)
		case data := <-dataChannels.AggDataChan:
			the.notifyData(&data, the.callNodeService)
		}
	}
}

// nodeCount returns len(nodeList) under the read lock.
func (the *EtMaster) nodeCount() int {
	the.mu.RLock()
	defer the.mu.RUnlock()
	return len(the.nodeList)
}

// notifyData sends data to every node whose ThingIds contain its thing ID.
// When no node owns the thing ID, it is assigned to the node that currently
// owns the fewest. callNodeFunc runs on its own goroutine per target node,
// after the lock has been released.
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
	thingId := data.GetThingId()

	the.mu.Lock()
	targets := make([]*NodeRpc, 0, 1)
	for _, nodeRpc := range the.nodeList {
		if nodeRpc != nil && contains(nodeRpc.args.ThingIds, thingId) {
			targets = append(targets, nodeRpc)
		}
	}
	// No owner: rebalance by giving the thing ID to the least-loaded node.
	if len(targets) == 0 && len(the.nodeList) > 0 {
		the.sortNodeListByThingCountLocked()
		if first := the.nodeList[0]; first != nil {
			first.args.ThingIds = append(first.args.ThingIds, thingId)
			log.Printf("thingId:[%s] 分配到node:[%s]", thingId, first.args.Addr)
			targets = append(targets, first)
		}
	}
	the.mu.Unlock()

	for _, node := range targets {
		go callNodeFunc(node, data)
	}
}

// callNodeService forwards data to the node's etNode RPC service and waits up
// to one second for the reply.
//
// If the reply is false, or the call times out, it does the following:
//   - asks DistributeData to pause;
//   - waits 5 seconds;
//   - removes the node from nodeList, closing its client.
func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) {
	the.mu.RLock()
	client := node.client
	the.mu.RUnlock()
	if client == nil {
		log.Printf("node [%v] client=nil", node.args)
		return
	}

	var serviceMethod string
	var v interface{}
	switch d := data.(type) {
	case *common_models.IotaData:
		v = d
		the.exporter.OnIotaData2metricByPrometheus(d)
		serviceMethod = "etNode.IotaDataHandler"
	case *common_models.AggData:
		v = d
		serviceMethod = "etNode.AggDataHandler"
	default:
		log.Printf("Unknown type:%v", data)
		return
	}
	log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v)

	// The reply channel is created per call and buffered: concurrent calls to
	// the same node cannot receive each other's replies, and a reply arriving
	// after the timeout does not block the sending goroutine.
	resultCH := make(chan bool, 1)
	go func() {
		defer timeCost(node.args.ID, data.Q(), time.Now())
		var reply bool
		err := client.Call(serviceMethod, data, &reply)
		if err != nil {
			log.Printf("rpc 调用node, err:%s", err.Error())
		}
		resultCH <- reply
	}()

	var result bool
	timeoutMills := 1000 * time.Millisecond
	select {
	case reply := <-resultCH:
		result = reply
		log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v)
	case <-time.After(timeoutMills):
		log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v)
		result = false
	}

	log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
	if result {
		return
	}

	// Ask DistributeData to pause. The send is non-blocking: sleepCH holds a
	// single pending request, and DistributeData stops draining it while
	// nodeList is empty, so a blocking send could hang this goroutine forever.
	select {
	case the.sleepCH <- true:
	default:
	}
	log.Println("=============================================")
	log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T())
	time.Sleep(time.Second * 5)
	the.removeNode(node)
}

// removeNode drops node (compared by pointer) from nodeList, closes its
// client, and compacts the list. It is a no-op for a node that was already
// removed, apart from logging the current node list.
func (the *EtMaster) removeNode(node *NodeRpc) {
	the.mu.Lock()
	defer the.mu.Unlock()
	for i, n := range the.nodeList {
		if n != node {
			continue
		}
		if n.client != nil {
			if err := n.client.Close(); err != nil {
				log.Printf("节点[%s] client关闭异常 %s", n.args.Addr, err.Error())
			}
		}
		the.nodeList[i] = nil
	}
	the.nodesTidyLocked()
}

// NodeRegister is an RPC method called remotely by et_node. It dials back to
// nodeArgs.Addr and, on success, appends the node to nodeList and sets
// *replay to true.
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
	node := &NodeRpc{
		args:        nodeArgs,
		resultCH:    make(chan bool, 1),
		aggResultCH: make(chan bool, 1),
		client:      nil,
	}
	// Dial outside the lock: it is network I/O.
	client, err := rpc.Dial("tcp", node.args.Addr)
	if err != nil {
		log.Printf("链接node失败-> node[%v]", node.args.Addr)
		return err
	}
	node.client = client

	the.mu.Lock()
	the.nodeList = append(the.nodeList, node)
	log.Printf("node服务[%v] 注册成功", nodeArgs)
	printNodesInfo(the.nodeList)
	the.mu.Unlock()

	*replay = true
	return nil
}

// NodeHeart is an RPC heartbeat method. It returns an error and sets *replay
// to false when no registered node has nodeArgs.Addr.
func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error {
	the.mu.RLock()
	isRegister := false
	for _, nodeRpc := range the.nodeList {
		if nodeRpc != nil && nodeRpc.args.Addr == nodeArgs.Addr {
			isRegister = true
			break
		}
	}
	the.mu.RUnlock()

	if !isRegister {
		log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
		*replay = false
		return errors.New("未注册的node")
	}
	log.Printf("收到-node[%v] 心跳", nodeArgs)
	*replay = true
	return nil
}

// NodeUnRegister is an RPC method that removes every node with
// nodeArgs.Addr, closing its client.
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
	the.mu.Lock()
	for i, node := range the.nodeList {
		if node == nil || node.args.Addr != nodeArgs.Addr {
			continue
		}
		log.Printf("节点[%s] 注销", node.args.Addr)
		if node.client != nil {
			if err := node.client.Close(); err != nil {
				log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error())
			}
		}
		the.nodeList[i] = nil
	}
	the.nodesTidyLocked()
	the.mu.Unlock()

	log.Printf("node服务[%v] 注销成功", nodeArgs)
	*replay = true
	return nil
}

// nodesTidy drops nil entries and entries without a client from nodeList and
// logs the remaining nodes.
func (the *EtMaster) nodesTidy() {
	the.mu.Lock()
	defer the.mu.Unlock()
	the.nodesTidyLocked()
}

// nodesTidyLocked is nodesTidy for callers that already hold mu.
func (the *EtMaster) nodesTidyLocked() {
	the.nodeList = updateNodeList(the.nodeList)
	printNodesInfo(the.nodeList)
}

// sortNodeListByThingCount orders nodeList by ascending len(args.ThingIds),
// with nil entries last.
func (the *EtMaster) sortNodeListByThingCount() {
	the.mu.Lock()
	defer the.mu.Unlock()
	the.sortNodeListByThingCountLocked()
}

// sortNodeListByThingCountLocked is sortNodeListByThingCount for callers that
// already hold mu.
func (the *EtMaster) sortNodeListByThingCountLocked() {
	sort.SliceStable(the.nodeList, func(i, j int) bool {
		a, b := the.nodeList[i], the.nodeList[j]
		if a == nil || b == nil {
			// nil entries sort last, so nodeList[0] is a live node whenever one exists.
			return a != nil && b == nil
		}
		return len(a.args.ThingIds) < len(b.args.ThingIds)
	})
}

// WaitNodeRegister blocks until at least one node is registered, checking
// every 10 seconds.
func (the *EtMaster) WaitNodeRegister() {
	log.Println("等待 node进行注册")
	for the.nodeCount() == 0 {
		time.Sleep(time.Second * 10)
	}
}

// ConnectNode dials every node that has no client yet. Dialing happens outside
// the lock; a dial failure is logged and the node is skipped.
func (the *EtMaster) ConnectNode() {
	the.mu.RLock()
	pending := make([]*NodeRpc, 0, len(the.nodeList))
	for _, node := range the.nodeList {
		if node != nil && node.client == nil {
			pending = append(pending, node)
		}
	}
	the.mu.RUnlock()

	for _, node := range pending {
		nodeAddr := node.args.Addr
		client, err := rpc.Dial("tcp", nodeAddr)
		if err != nil {
			log.Printf("链接node失败-> node[%v]", nodeAddr)
			continue
		}
		the.mu.Lock()
		if node.client == nil {
			node.client = client
			client = nil
		}
		the.mu.Unlock()
		if client != nil {
			// Another goroutine connected this node meanwhile; the duplicate
			// client was never used, so a Close error is irrelevant.
			_ = client.Close()
		}
	}
}

// Helpers shared within package app.

// updateNodeList returns the entries of nodes that are non-nil and have a client.
func updateNodeList(nodes []*NodeRpc) []*NodeRpc {
	newNodes := make([]*NodeRpc, 0, len(nodes))
	for _, node := range nodes {
		if node != nil && node.client != nil {
			newNodes = append(newNodes, node)
		}
	}
	return newNodes
}

// printNodesInfo logs the node count followed by "ID,Addr" for each non-nil node.
func printNodesInfo(nodes []*NodeRpc) {
	var b strings.Builder
	fmt.Fprintf(&b, "共[%d]个节点:\n ", len(nodes))
	for _, node := range nodes {
		if node == nil {
			continue
		}
		fmt.Fprintf(&b, "%s,%s\n", node.args.ID, node.args.Addr)
	}
	log.Println(b.String())
}

// contains reports whether target is an element of arr.
func contains(arr []string, target string) bool {
	for _, value := range arr {
		if value == target {
			return true
		}
	}
	return false
}

// timeCost logs the time elapsed since start for a call to node nodeId about deviceId.
func timeCost(nodeId, deviceId string, start time.Time) {
	tc := time.Since(start)
	log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc)
}