diff --git a/dataSource/kafka/kafka_handler.go b/dataSource/kafka/kafka_handler.go index 281b298..bd2eccc 100644 --- a/dataSource/kafka/kafka_handler.go +++ b/dataSource/kafka/kafka_handler.go @@ -61,8 +61,8 @@ func NewMessageHandler(cfgName string) IMessageHandler { switch cfgName { case "data_raw": return IotaDataHandler{} - case "alarm_agg": - return AggDataHandler{} + case "data_agg": + return NewAggDataHandler() default: return nil } diff --git a/et_Info/InfoHandler.go b/et_Info/InfoHandler.go index 91e81f4..7a5e18f 100644 --- a/et_Info/InfoHandler.go +++ b/et_Info/InfoHandler.go @@ -75,3 +75,9 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) { } } + +//func (the *InfoHandler) getThresholdInfo(p *common_models.ProcessData) { +// for _, stationInfo := range p.Stations { +// the.configHelper.GetStationThreshold(stationInfo.Info.Id) +// } +//} diff --git a/master/app/et_master.go b/master/app/et_master.go index 7f84a02..38a266e 100644 --- a/master/app/et_master.go +++ b/master/app/et_master.go @@ -15,24 +15,26 @@ import ( "time" ) -func NewEtMaster() EtMaster { - master := EtMaster{ - exporter: et_prometheus_exporter.NewPrometheusExporter(), - sleepCH: make(chan bool, 1), - } - return master -} - type EtMaster struct { nodeList []*NodeRpc exporter et_prometheus_exporter.PrometheusExporter sleepCH chan bool } +func NewEtMaster() *EtMaster { + master := EtMaster{ + nodeList: make([]*NodeRpc, 0), + exporter: et_prometheus_exporter.NewPrometheusExporter(), + sleepCH: make(chan bool, 1), + } + return &master +} + type NodeRpc struct { - args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数 - resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果 - client *rpc.Client + args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数 + resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果 + aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数 + client *rpc.Client } // RegisterListen 启动 master RPC服务 @@ -57,17 +59,139 @@ func (the *EtMaster) RegisterListen() { if err != nil { log.Println("master rpc Accept异常") } - log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr()) + log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr()) go rpc.ServeConn(conn) } } +// DistributeData 分发数据。 +// 监听两个数据通道RawDataChan和AggDataChan,根据不同类型的数据通道接收到的数据,调用notifyData方法进行相应的处理操作。 +func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { + //数据类型注册 + gob.Register([]interface{}{}) + for { + if len(the.nodeList) == 0 { + log.Printf("nodeList is empty!") + time.Sleep(time.Second * 10) + continue + } + + select { + case stopEnable := <-the.sleepCH: + if stopEnable { + stopTime := time.Second * 10 + log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime) + time.Sleep(stopTime) + } else { + log.Printf("node 处理积压,%v,不正常空数据", stopEnable) + } + default: + } + + select { + case data := <-dataChannels.RawDataChan: + the.notifyData(&data, the.callNodeService) + case data := <-dataChannels.AggDataChan: + the.notifyData(&data, the.callNodeService) + //default: + // time.Sleep(100 * time.Millisecond) + } + } +} +func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) { + thingId := data.GetThingId() + isMatch := false + for _, nodeRpc := range the.nodeList { + if nodeRpc != nil { + if contains(nodeRpc.args.ThingIds, thingId) { + isMatch = true + go callNodeFunc(nodeRpc, data) + } + } + } + + //无匹配触发 reBalance + if !isMatch { + if len(the.nodeList) > 0 { + the.sortNodeListByThingCount() + if the.nodeList[0] != nil { + the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId) + log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr) + go callNodeFunc(the.nodeList[0], data) + } + } + } +} + +// callNodeService 调用 etNode 的RPC服务 +func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) { + if node.client == nil { + log.Printf("node [%v] client=nil", node.args) + return + } + + var serviceMethod = "" + var resultCH chan bool + var v interface{} + + switch data.(type) { + case *common_models.IotaData: + v = data.(*common_models.IotaData) + the.exporter.OnIotaData2metricByPrometheus(data.(*common_models.IotaData)) + serviceMethod = "etNode.IotaDataHandler" + resultCH = node.resultCH + case *common_models.AggData: + v = data.(*common_models.AggData) + serviceMethod = "etNode.AggDataHandler" + resultCH = node.aggResultCH + default: + log.Printf("Unknown type:%v", v) + return + } + + log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v) + + go func() { + defer timeCost(node.args.ID, data.Q(), time.Now()) + var reply bool + err := node.client.Call(serviceMethod, data, &reply) + if err != nil { + log.Printf("rpc 调用node, err:%s", err.Error()) + } + resultCH <- reply + }() + + // RPC调用结果 + var result bool + timeoutMills := 1000 * time.Millisecond + select { + case reply := <-resultCH: + result = reply + log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v) + case <-time.After(timeoutMills): + log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v) + result = false + } + log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T()) + + if result == false { + //发送 stop 信号 + the.sleepCH <- true + log.Println("=============================================") + log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T()) + time.Sleep(time.Second * 5) + node = nil + the.nodesTidy() + } +} + // NodeRegister 是 RPC 服务方法,由 et_node 远程调用 func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { node := &NodeRpc{ - args: nodeArgs, - resultCH: make(chan bool, 1), - client: nil, + args: nodeArgs, + resultCH: make(chan bool, 1), + aggResultCH: make(chan bool, 1), + client: nil, } //master 初始化 node client client, err := rpc.Dial("tcp", node.args.Addr) @@ -127,24 +251,15 @@ func (the *EtMaster) nodesTidy() { printNodesInfo(the.nodeList) } -func updateNodeList(nodes []*NodeRpc) []*NodeRpc { - var newNodes []*NodeRpc - for _, node := range nodes { - if node != nil && node.client != nil { - newNodes = append(newNodes, node) +func (the *EtMaster) sortNodeListByThingCount() { + sort.Slice(the.nodeList, func(i, j int) bool { + if the.nodeList[i] != nil && the.nodeList[j] != nil { + return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds) + } else { + return false } - } - return newNodes -} -func printNodesInfo(nodes []*NodeRpc) { - info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) - for _, node := range nodes { - info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) - } - log.Println(info) - + }) } - func (the *EtMaster) WaitNodeRegister() { log.Println("等待 node进行注册") for { @@ -168,105 +283,22 @@ func (the *EtMaster) ConnectNode() { } } -func (the *EtMaster) call_etNode(node *NodeRpc, args *common_models.IotaData) { - if node.client == nil { - log.Printf("node [%v] client=nil", node.args) - return - } - - the.exporter.OnIotaData2metricByPrometheus(args) - resultCH := make(chan bool, 1) - go func() { - defer timeCost(node.args.ID, args.DeviceId, time.Now()) - var rpcResult bool - err := node.client.Call("etNode.Handler", args, &rpcResult) - if err != nil { - log.Printf("rpc 调用node, err:%s", err.Error()) - } - resultCH <- rpcResult - }() - - //等result - var result bool - timeOut := 1000 * time.Millisecond - select { - case result = <-resultCH: - case <-time.After(timeOut): - log.Printf("node[%s]处理[%s|%s]超过 %v,超时", node.args.Addr, args.DeviceId, args.TriggerTime, timeOut) - result = false - } - //log.Printf("node[%s]处理[%s|%s]结果=%v", node.args.Addr, args.DeviceId, args.TriggerTime, result) - if result == false { - //发送 stop 信号 - - the.sleepCH <- true - log.Println("=============================================") - log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, args.DeviceId, args.TriggerTime) - time.Sleep(time.Second * 5) - node.client = nil - the.nodesTidy() - } -} - -func (the *EtMaster) call_aggNode(node *NodeRpc, args *common_models.AggData) { - if node.client == nil { - log.Printf("et node [%v] client=nil", node.args) - return - } - go func() { - var response bool - err := node.client.Call("aggNode.Handler", args, &response) - if err != nil { - log.Printf("rpc err:%s", err.Error()) +// app 包内公用方法 +func updateNodeList(nodes []*NodeRpc) []*NodeRpc { + var newNodes []*NodeRpc + for _, node := range nodes { + if node != nil && node.client != nil { + newNodes = append(newNodes, node) } - // 将 rpc 返回值写入 chan - node.resultCH <- response - }() - - // 接收数据 - var result bool - select { - case result = <-node.resultCH: - case <-time.After(2 * time.Second): - log.Println("rpc [aggHandler.Handler] 调用超时,退出。") - result = false } - log.Printf("agg node[%s]处理结果:%v", node.args.Addr, result) + return newNodes } - -// DistributeData 分发源数据给各类型处理节点。 -// 通过不断从不同类型的数据通道中读取数据,并根据节点类型分发数据进行处理,可以实现数据的采集和分发功能。 -func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { - - //数据类型注册 - gob.Register([]interface{}{}) - - for { - if len(the.nodeList) == 0 { - log.Printf("nodeList is empty!") - time.Sleep(time.Second * 10) - continue - } - - select { - case stopEnable := <-the.sleepCH: - if stopEnable { - stopTime := time.Second * 10 - log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime) - time.Sleep(stopTime) - } else { - log.Printf("node 处理积压,%v,不正常空数据", stopEnable) - } - default: - } - - select { - case args := <-dataChannels.RawDataChan: - the.notifyRawData("etNode", args) - case args := <-dataChannels.AggDataChan: - the.notifyAggData("etAgg", args) - } +func printNodesInfo(nodes []*NodeRpc) { + info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) + for _, node := range nodes { + info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) } + log.Println(info) } func contains(arr []string, target string) bool { for _, value := range arr { @@ -276,42 +308,6 @@ func contains(arr []string, target string) bool { } return false } -func (the *EtMaster) sortNodeListByThingCount() { - sort.Slice(the.nodeList, func(i, j int) bool { - return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds) - }) -} -func (the *EtMaster) notifyRawData(nodeType string, d common_models.IotaData) { - - isMatch := false - for _, nodeRpc := range the.nodeList { - if contains(nodeRpc.args.ThingIds, d.ThingId) { - isMatch = true - switch nodeRpc.args.NodeType { - case nodeType: - the.call_etNode(nodeRpc, &d) - } - } - } - //无匹配触发 reBalance - if !isMatch { - if len(the.nodeList) > 0 { - the.sortNodeListByThingCount() - the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, d.ThingId) - log.Printf("thingId:[%s] 分配到node:[%s]", d.ThingId, the.nodeList[0].args.Addr) - the.call_etNode(the.nodeList[0], &d) - } - } -} -func (the *EtMaster) notifyAggData(nodeType string, d common_models.AggData) { - for _, nodeRpc := range the.nodeList { - switch nodeRpc.args.NodeType { - case nodeType: - go the.call_aggNode(nodeRpc, &d) - } - } -} - func timeCost(nodeId, deviceId string, start time.Time) { tc := time.Since(start) log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc) diff --git a/node/app/et_node.go b/node/app/et_node.go index f047638..5d71579 100644 --- a/node/app/et_node.go +++ b/node/app/et_node.go @@ -1,6 +1,8 @@ package app import ( + "context" + "et_analyze" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" @@ -15,10 +17,11 @@ import ( ) type EtNode struct { - nodeInfo *common_models.NodeArgs - master *rpcMaster - ch chan *common_models.ProcessData - recvDataHandler *et_recv.RecvDataHanler + nodeInfo *common_models.NodeArgs + master *rpcMaster + ch chan *common_models.ProcessData + recvDataHandler *et_recv.RecvDataHanler + aggAnalyzeHandler *et_analyze.AggThresholdHandler } type rpcMaster struct { @@ -30,22 +33,22 @@ const chSize = 1 func NewEtWorker() *EtNode { node := &EtNode{ - ch: make(chan *common_models.ProcessData, chSize), - recvDataHandler: et_recv.NewRecvDataHanler(), + ch: make(chan *common_models.ProcessData, chSize), + recvDataHandler: et_recv.NewRecvDataHanler(), + aggAnalyzeHandler: et_analyze.NewAggThresholdHandler(), } node.exitMonitor() node.heartMonitor() return node } -// Handler 是 RPC 服务方法,由 master 远程调用 -func (the *EtNode) Handler(iotaData common_models.IotaData, replay *bool) error { - *replay = true +// IotaDataHandler 是 RPC 服务方法,由 master 远程调用 +func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) error { + *reply = true err := the.ConsumerProcess(&iotaData) if err != nil { - *replay = false + *reply = false } - return err } @@ -69,25 +72,39 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { return nil } +// AggDataHandler 聚集阈值处理者,被 master 远程调用 +func (the *EtNode) AggDataHandler(aggData common_models.AggData, reply *bool) error { + *reply = true + err := the.aggAnalyzeHandler.ProcessData(&aggData) + if err != nil { + errmsg := fmt.Sprintf("[etNode.AggDataHandler]变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err) + log.Println(errmsg) + return err + } + + log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed) + return nil +} + // 实现源接口 -//func (the *EtNode) Process(ctx context.Context) (<-chan any, error) { -// source := make(chan any, chSize) -// go func() { -// defer close(source) -// for { -// select { -// case a := <-the.ch: -// source <- a -// log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch)) -// case <-ctx.Done(): -// log.Println("退出[source] EtNode.Process") -// return -// } -// -// } -// }() -// return source, nil -//} +func (the *EtNode) Process(ctx context.Context) (<-chan any, error) { + source := make(chan any, chSize) + go func() { + defer close(source) + for { + select { + case a := <-the.ch: + source <- a + log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch)) + case <-ctx.Done(): + log.Println("退出[source] EtNode.Process") + return + } + + } + }() + return source, nil +} // RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister func (the *EtNode) RegisterToMaster() { @@ -116,7 +133,7 @@ func (the *EtNode) RegisterToMaster() { ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix") ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix) hostName, err := os.Hostname() - log.Printf("node [%s] ip=%s", hostName, ip4) + log.Printf("node [%s] ip=%s\n", hostName, ip4) port := configLoad.LoadConfig().GetUint16("node.port") callNodeAddr := fmt.Sprintf("%s:%d", ip4, port) @@ -169,9 +186,9 @@ func (the *EtNode) heartToMaster() { func (the *EtNode) UnRegisterToMaster() { var result bool if err := the.master.conn.Call("master.NodeUnRegister", the.nodeInfo, &result); err != nil { - log.Printf("node[%s] 注销到 master,异常:%v", the.nodeInfo.Addr, err.Error()) + log.Printf("node[%s] 从master注销,异常:%v", the.nodeInfo.Addr, err.Error()) } else { - log.Printf("node[%s] 注销到 master,结果:%v", the.nodeInfo.Addr, result) + log.Printf("node[%s] 从master注销,结果:%v", the.nodeInfo.Addr, result) } } func (the *EtNode) exitMonitor() {