Browse Source

重构et_master以支持多kafka主题数据处理

dev
yfh 1 month ago
parent
commit
15d2b0737a
  1. 4
      dataSource/kafka/kafka_handler.go
  2. 6
      et_Info/InfoHandler.go
  3. 320
      master/app/et_master.go
  4. 81
      node/app/et_node.go

4
dataSource/kafka/kafka_handler.go

@ -61,8 +61,8 @@ func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName {
case "data_raw":
return IotaDataHandler{}
case "alarm_agg":
return AggDataHandler{}
case "data_agg":
return NewAggDataHandler()
default:
return nil
}

6
et_Info/InfoHandler.go

@ -75,3 +75,9 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) {
}
}
//func (the *InfoHandler) getThresholdInfo(p *common_models.ProcessData) {
// for _, stationInfo := range p.Stations {
// the.configHelper.GetStationThreshold(stationInfo.Info.Id)
// }
//}

320
master/app/et_master.go

@ -15,24 +15,26 @@ import (
"time"
)
func NewEtMaster() EtMaster {
master := EtMaster{
exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1),
}
return master
}
type EtMaster struct {
nodeList []*NodeRpc
exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool
}
func NewEtMaster() *EtMaster {
master := EtMaster{
nodeList: make([]*NodeRpc, 0),
exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1),
}
return &master
}
type NodeRpc struct {
args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果
client *rpc.Client
args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数
resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果
aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数
client *rpc.Client
}
// RegisterListen 启动 master RPC服务
@ -57,17 +59,139 @@ func (the *EtMaster) RegisterListen() {
if err != nil {
log.Println("master rpc Accept异常")
}
log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
go rpc.ServeConn(conn)
}
}
// DistributeData 分发数据。
// 监听两个数据通道RawDataChan和AggDataChan,根据不同类型的数据通道接收到的数据,调用notifyData方法进行相应的处理操作。
func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册
gob.Register([]interface{}{})
for {
if len(the.nodeList) == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
continue
}
select {
case stopEnable := <-the.sleepCH:
if stopEnable {
stopTime := time.Second * 10
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
time.Sleep(stopTime)
} else {
log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
}
default:
}
select {
case data := <-dataChannels.RawDataChan:
the.notifyData(&data, the.callNodeService)
case data := <-dataChannels.AggDataChan:
the.notifyData(&data, the.callNodeService)
//default:
// time.Sleep(100 * time.Millisecond)
}
}
}
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId()
isMatch := false
for _, nodeRpc := range the.nodeList {
if nodeRpc != nil {
if contains(nodeRpc.args.ThingIds, thingId) {
isMatch = true
go callNodeFunc(nodeRpc, data)
}
}
}
//无匹配触发 reBalance
if !isMatch {
if len(the.nodeList) > 0 {
the.sortNodeListByThingCount()
if the.nodeList[0] != nil {
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId)
log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr)
go callNodeFunc(the.nodeList[0], data)
}
}
}
}
// callNodeService 调用 etNode 的RPC服务
func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) {
if node.client == nil {
log.Printf("node [%v] client=nil", node.args)
return
}
var serviceMethod = ""
var resultCH chan bool
var v interface{}
switch data.(type) {
case *common_models.IotaData:
v = data.(*common_models.IotaData)
the.exporter.OnIotaData2metricByPrometheus(data.(*common_models.IotaData))
serviceMethod = "etNode.IotaDataHandler"
resultCH = node.resultCH
case *common_models.AggData:
v = data.(*common_models.AggData)
serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH
default:
log.Printf("Unknown type:%v", v)
return
}
log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v)
go func() {
defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool
err := node.client.Call(serviceMethod, data, &reply)
if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error())
}
resultCH <- reply
}()
// RPC调用结果
var result bool
timeoutMills := 1000 * time.Millisecond
select {
case reply := <-resultCH:
result = reply
log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v)
case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v)
result = false
}
log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T())
if result == false {
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T())
time.Sleep(time.Second * 5)
node = nil
the.nodesTidy()
}
}
// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error {
node := &NodeRpc{
args: nodeArgs,
resultCH: make(chan bool, 1),
client: nil,
args: nodeArgs,
resultCH: make(chan bool, 1),
aggResultCH: make(chan bool, 1),
client: nil,
}
//master 初始化 node client
client, err := rpc.Dial("tcp", node.args.Addr)
@ -127,24 +251,15 @@ func (the *EtMaster) nodesTidy() {
printNodesInfo(the.nodeList)
}
func updateNodeList(nodes []*NodeRpc) []*NodeRpc {
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
if the.nodeList[i] != nil && the.nodeList[j] != nil {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
} else {
return false
}
}
return newNodes
}
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes))
for _, node := range nodes {
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
}
log.Println(info)
})
}
func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册")
for {
@ -168,105 +283,22 @@ func (the *EtMaster) ConnectNode() {
}
}
func (the *EtMaster) call_etNode(node *NodeRpc, args *common_models.IotaData) {
if node.client == nil {
log.Printf("node [%v] client=nil", node.args)
return
}
the.exporter.OnIotaData2metricByPrometheus(args)
resultCH := make(chan bool, 1)
go func() {
defer timeCost(node.args.ID, args.DeviceId, time.Now())
var rpcResult bool
err := node.client.Call("etNode.Handler", args, &rpcResult)
if err != nil {
log.Printf("rpc 调用node, err:%s", err.Error())
}
resultCH <- rpcResult
}()
//等result
var result bool
timeOut := 1000 * time.Millisecond
select {
case result = <-resultCH:
case <-time.After(timeOut):
log.Printf("node[%s]处理[%s|%s]超过 %v,超时", node.args.Addr, args.DeviceId, args.TriggerTime, timeOut)
result = false
}
//log.Printf("node[%s]处理[%s|%s]结果=%v", node.args.Addr, args.DeviceId, args.TriggerTime, result)
if result == false {
//发送 stop 信号
the.sleepCH <- true
log.Println("=============================================")
log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, args.DeviceId, args.TriggerTime)
time.Sleep(time.Second * 5)
node.client = nil
the.nodesTidy()
}
}
func (the *EtMaster) call_aggNode(node *NodeRpc, args *common_models.AggData) {
if node.client == nil {
log.Printf("et node [%v] client=nil", node.args)
return
}
go func() {
var response bool
err := node.client.Call("aggNode.Handler", args, &response)
if err != nil {
log.Printf("rpc err:%s", err.Error())
// app 包内公用方法
func updateNodeList(nodes []*NodeRpc) []*NodeRpc {
var newNodes []*NodeRpc
for _, node := range nodes {
if node != nil && node.client != nil {
newNodes = append(newNodes, node)
}
// 将 rpc 返回值写入 chan
node.resultCH <- response
}()
// 接收数据
var result bool
select {
case result = <-node.resultCH:
case <-time.After(2 * time.Second):
log.Println("rpc [aggHandler.Handler] 调用超时,退出。")
result = false
}
log.Printf("agg node[%s]处理结果:%v", node.args.Addr, result)
return newNodes
}
// DistributeData 分发源数据给各类型处理节点。
// 通过不断从不同类型的数据通道中读取数据,并根据节点类型分发数据进行处理,可以实现数据的采集和分发功能。
func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
//数据类型注册
gob.Register([]interface{}{})
for {
if len(the.nodeList) == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
continue
}
select {
case stopEnable := <-the.sleepCH:
if stopEnable {
stopTime := time.Second * 10
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
time.Sleep(stopTime)
} else {
log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
}
default:
}
select {
case args := <-dataChannels.RawDataChan:
the.notifyRawData("etNode", args)
case args := <-dataChannels.AggDataChan:
the.notifyAggData("etAgg", args)
}
func printNodesInfo(nodes []*NodeRpc) {
info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes))
for _, node := range nodes {
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
}
log.Println(info)
}
func contains(arr []string, target string) bool {
for _, value := range arr {
@ -276,42 +308,6 @@ func contains(arr []string, target string) bool {
}
return false
}
func (the *EtMaster) sortNodeListByThingCount() {
sort.Slice(the.nodeList, func(i, j int) bool {
return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds)
})
}
func (the *EtMaster) notifyRawData(nodeType string, d common_models.IotaData) {
isMatch := false
for _, nodeRpc := range the.nodeList {
if contains(nodeRpc.args.ThingIds, d.ThingId) {
isMatch = true
switch nodeRpc.args.NodeType {
case nodeType:
the.call_etNode(nodeRpc, &d)
}
}
}
//无匹配触发 reBalance
if !isMatch {
if len(the.nodeList) > 0 {
the.sortNodeListByThingCount()
the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, d.ThingId)
log.Printf("thingId:[%s] 分配到node:[%s]", d.ThingId, the.nodeList[0].args.Addr)
the.call_etNode(the.nodeList[0], &d)
}
}
}
func (the *EtMaster) notifyAggData(nodeType string, d common_models.AggData) {
for _, nodeRpc := range the.nodeList {
switch nodeRpc.args.NodeType {
case nodeType:
go the.call_aggNode(nodeRpc, &d)
}
}
}
func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start)
log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc)

81
node/app/et_node.go

@ -1,6 +1,8 @@
package app
import (
"context"
"et_analyze"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
@ -15,10 +17,11 @@ import (
)
type EtNode struct {
nodeInfo *common_models.NodeArgs
master *rpcMaster
ch chan *common_models.ProcessData
recvDataHandler *et_recv.RecvDataHanler
nodeInfo *common_models.NodeArgs
master *rpcMaster
ch chan *common_models.ProcessData
recvDataHandler *et_recv.RecvDataHanler
aggAnalyzeHandler *et_analyze.AggThresholdHandler
}
type rpcMaster struct {
@ -30,22 +33,22 @@ const chSize = 1
func NewEtWorker() *EtNode {
node := &EtNode{
ch: make(chan *common_models.ProcessData, chSize),
recvDataHandler: et_recv.NewRecvDataHanler(),
ch: make(chan *common_models.ProcessData, chSize),
recvDataHandler: et_recv.NewRecvDataHanler(),
aggAnalyzeHandler: et_analyze.NewAggThresholdHandler(),
}
node.exitMonitor()
node.heartMonitor()
return node
}
// Handler 是 RPC 服务方法,由 master 远程调用
func (the *EtNode) Handler(iotaData common_models.IotaData, replay *bool) error {
*replay = true
// IotaDataHandler 是 RPC 服务方法,由 master 远程调用
func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) error {
*reply = true
err := the.ConsumerProcess(&iotaData)
if err != nil {
*replay = false
*reply = false
}
return err
}
@ -69,25 +72,39 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
return nil
}
// AggDataHandler 聚集阈值处理者,被 master 远程调用
func (the *EtNode) AggDataHandler(aggData common_models.AggData, reply *bool) error {
*reply = true
err := the.aggAnalyzeHandler.ProcessData(&aggData)
if err != nil {
errmsg := fmt.Sprintf("[etNode.AggDataHandler]变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err)
log.Println(errmsg)
return err
}
log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed)
return nil
}
// 实现源接口
//func (the *EtNode) Process(ctx context.Context) (<-chan any, error) {
// source := make(chan any, chSize)
// go func() {
// defer close(source)
// for {
// select {
// case a := <-the.ch:
// source <- a
// log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch))
// case <-ctx.Done():
// log.Println("退出[source] EtNode.Process")
// return
// }
//
// }
// }()
// return source, nil
//}
func (the *EtNode) Process(ctx context.Context) (<-chan any, error) {
source := make(chan any, chSize)
go func() {
defer close(source)
for {
select {
case a := <-the.ch:
source <- a
log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch))
case <-ctx.Done():
log.Println("退出[source] EtNode.Process")
return
}
}
}()
return source, nil
}
// RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister
func (the *EtNode) RegisterToMaster() {
@ -116,7 +133,7 @@ func (the *EtNode) RegisterToMaster() {
ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix")
ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix)
hostName, err := os.Hostname()
log.Printf("node [%s] ip=%s", hostName, ip4)
log.Printf("node [%s] ip=%s\n", hostName, ip4)
port := configLoad.LoadConfig().GetUint16("node.port")
callNodeAddr := fmt.Sprintf("%s:%d", ip4, port)
@ -169,9 +186,9 @@ func (the *EtNode) heartToMaster() {
func (the *EtNode) UnRegisterToMaster() {
var result bool
if err := the.master.conn.Call("master.NodeUnRegister", the.nodeInfo, &result); err != nil {
log.Printf("node[%s] 注销到 master,异常:%v", the.nodeInfo.Addr, err.Error())
log.Printf("node[%s] 从master注销,异常:%v", the.nodeInfo.Addr, err.Error())
} else {
log.Printf("node[%s] 注销到 master,结果:%v", the.nodeInfo.Addr, result)
log.Printf("node[%s] 从master注销,结果:%v", the.nodeInfo.Addr, result)
}
}
func (the *EtNode) exitMonitor() {

Loading…
Cancel
Save