You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
128 lines
3.1 KiB
128 lines
3.1 KiB
package node_manager
|
|
|
|
import (
|
|
"et_rpc"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
	// Node_Refresh_Interval is the maximum age of a node's cached load/status
	// (node information refresh interval). UpdateNode refreshes the cached
	// state once it is older than this, regardless of how much the load moved.
	Node_Refresh_Interval = 2 * time.Minute

	// Node_Load_Change_Threshold is the absolute change in reported load that
	// is large enough for UpdateNode to refresh the cached state immediately.
	Node_Load_Change_Threshold = 200

	// Node_Load_Threshold is the node data-backlog threshold.
	// NOTE(review): not referenced by the code visible in this file; presumably
	// consumed elsewhere — verify.
	Node_Load_Threshold = 200
)
|
|
|
|
type LoadBalancer struct {
|
|
nodes []*NodeConnection
|
|
nodeSelector INodeSelector // 节点选择器
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewLoadBalancer(selector INodeSelector) *LoadBalancer {
|
|
lb := &LoadBalancer{
|
|
nodes: make([]*NodeConnection, 0),
|
|
nodeSelector: selector,
|
|
mu: sync.RWMutex{},
|
|
}
|
|
|
|
// TODO 启动健康度检查
|
|
//go lb.HealthCheck()
|
|
|
|
return lb
|
|
}
|
|
|
|
func (b *LoadBalancer) AddNode(node *NodeConnection) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
b.nodes = append(b.nodes, node)
|
|
}
|
|
|
|
func (b *LoadBalancer) RemoveNode(addr string) bool {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
for i, node := range b.nodes {
|
|
if node.Addr == addr {
|
|
b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (b *LoadBalancer) SetSelector(selector INodeSelector) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
b.nodeSelector = selector
|
|
}
|
|
|
|
// SelectNode 通过节点选择器选择节点
|
|
func (b *LoadBalancer) SelectNode() (*NodeConnection, error) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
return b.nodeSelector.Select(b.nodes)
|
|
}
|
|
|
|
// UpdateNode 事件驱动更新节点,isError 立即更新
|
|
func (b *LoadBalancer) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
// 不健康:status = NodeState_Unhealthy 或者 Load 超阈值
|
|
for _, node := range b.nodes {
|
|
if node.Addr == nodeArgs.Addr {
|
|
isOverThreshold := abs(node.NArgs.Load-nodeArgs.Load) > Node_Load_Change_Threshold // 荷载变化超阈值
|
|
isTimeout := time.Since(node.lastUpdate) > Node_Refresh_Interval // 超刷新间隔
|
|
if isError || isOverThreshold || isTimeout {
|
|
node.NArgs.Load = nodeArgs.Load
|
|
node.NArgs.Status = et_rpc.NodeState_Healthy //TODO node.GetHealthStatus(nodeArgs)
|
|
node.lastUpdate = time.Now()
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("未注册的节点: %s", nodeArgs.Addr)
|
|
}
|
|
|
|
func (b *LoadBalancer) NodeExists(nodeAddr string) bool {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
for _, node := range b.nodes {
|
|
if node.Addr == nodeAddr {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// abs returns the absolute value of x.
//
// The magnitude of the most negative int cannot be represented as an int:
// negating it wraps back to the same negative value. For that single input,
// abs saturates to the largest int rather than returning a negative number,
// so callers comparing the result against a positive threshold still behave
// sensibly.
func abs(x int) int {
	if x >= 0 {
		return x
	}
	if neg := -x; neg >= 0 {
		return neg
	}
	// -x overflowed (x is the minimum int); saturate to the maximum int.
	return int(^uint(0) >> 1)
}
|
|
|
|
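// 定时健康检查、更新节点状态 (periodic health check that refreshes node state)
//
// NOTE(review): do not re-enable this code as written:
//   - HealthCheck holds b.mu.Lock() while calling b.UpdateNode, which itself
//     calls b.mu.Lock(); sync.RWMutex is not reentrant, so this would deadlock.
//   - checkNodeHealth returns true when the ping succeeds, but that value is
//     passed as UpdateNode's isError argument, which inverts its meaning.
//   - The loop has no way to stop; accept a context.Context or stop channel.
//func (b *LoadBalancer) HealthCheck() {
//	for {
//		b.mu.Lock()
//		reply := new(et_rpc.NodeArgs)
//		for _, node := range b.nodes {
//			result := b.checkNodeHealth(node, reply)
//			b.UpdateNode(reply, result)
//		}
//		b.mu.Unlock()
//
//		time.Sleep(5 * time.Minute)
//	}
//}
//
//// Concrete health-check implementation (健康检查的具体实现).
//func (b *LoadBalancer) checkNodeHealth(conn *NodeConnection, reply *et_rpc.NodeArgs) bool {
//	// Health check, e.g. send a heartbeat request (健康检查,例如发送心跳请求等).
//	err := conn.Call(context.Background(), et_rpc.RPCService_Node_Ping, &et_rpc.NodeArgs{}, reply)
//	return err == nil
//
//	// TODO: judge node health from the reply: CPU, memory, disk usage and data
//	// backlog (根据返回信息:节点的CPU、内存、硬盘使用情况、数据积压情况来判断节点的健康情况).
//}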
|
|
|