Browse Source

迁移从 net.rpc 到 gRPC

dev
yfh 1 month ago
parent
commit
35d6142e16
  1. 85
      node/agg_worker/agg_node.go
  2. 108
      node/app/app.go
  3. 445
      node/app/et_node.go
  4. 242
      node/app/master_connection_grpc.go
  5. 112
      node/app/master_connection_pool_grpc.go
  6. 68
      node/app/node_process_stage.go
  7. 459
      node/app/node_server.go

85
node/agg_worker/agg_node.go

@ -1,85 +0,0 @@
package agg_worker
import (
"et_analyze"
"gitea.anxinyun.cn/container/common_models"
"github.com/google/uuid"
"log"
"net/rpc"
"node/et_worker/et_recv"
"os"
"time"
)
// AggNode is the aggregation worker node. It carries the receive-data handler
// that NewAggWorker creates for it.
type AggNode struct {
recvDataHandler *et_recv.RecvDataHanler
}
// NewAggWorker builds an AggNode with a freshly created receive-data handler.
func NewAggWorker() *AggNode {
	worker := new(AggNode)
	worker.recvDataHandler = et_recv.NewRecvDataHanler()
	return worker
}
// Handler is the RPC entry point that the master calls remotely.
//
// It sets *replay to true before processing and returns whatever error
// ConsumerProcess reports.
func (the *AggNode) Handler(aggData common_models.AggData, replay *bool) error {
	*replay = true
	return the.ConsumerProcess(&aggData)
}
// ConsumerProcess runs the threshold-analysis step on one aggregation record
// using a newly created AggThresholdHandler, logs the record, and always
// returns nil.
func (the *AggNode) ConsumerProcess(aggData *common_models.AggData) error {
	et_analyze.NewAggThresholdHandler().ProcessData(aggData)
	log.Printf("rpc聚集阈值分析[%d]-time[%s]-[%v]", aggData.SensorId, aggData.Date, aggData.Agg)
	return nil
}
// RegisterToMaster dials the master and calls its published RPC method
// master.NodeRegister to register this node.
//
// The master address comes from the masterAddr environment variable and
// defaults to 127.0.0.1:50000. Each attempt is preceded by a one-second pause.
// After three failed attempts the function logs, waits ten seconds and exits
// the process with status 1.
func (the *AggNode) RegisterToMaster() {
	const maxAttempts = 3

	for attempt := 1; ; attempt++ {
		if attempt > maxAttempts {
			log.Printf("RegisterToMaster 失败 超过%d次,准备退出", maxAttempts)
			time.Sleep(time.Second * 10)
			os.Exit(1)
		}

		masterAddr := os.Getenv("masterAddr")
		if masterAddr == "" {
			masterAddr = "127.0.0.1:50000"
		}

		time.Sleep(time.Second * 1)
		master, err := rpc.Dial("tcp", masterAddr)
		if err != nil {
			// The address dialed here is the master's, so label it as such
			// and include the reason for the failure.
			log.Printf("链接失败-> master[%s]: %v", masterAddr, err)
			continue
		}

		// TODO: discover the node's own address instead of hard-coding it.
		nodeAddr := "127.0.0.1:40001"
		status := `{"health_status":"healthy","load_average":{"1_min":0.75,"5_min":1.2,"15_min":0.9},"availability":"available","last_check_time":"2022-01-01T12:00:00Z"}`
		resources := `{"cpu":{"cores":4,"usage":"50%","temperature":"60°C"},"memory":{"total":"8GB","used":"4GB","available":"4GB"},"storage":{"total":"256GB","used":"100GB","available":"156GB"}}`
		nodeArgs := &common_models.NodeArgs{
			ID:        uuid.New().String(),
			NodeType:  "aggNode",
			Status:    status,
			Resources: resources,
			Addr:      nodeAddr,
			ThingIds:  []string{},
		}

		var result bool
		err = master.Call("master.NodeRegister", nodeArgs, &result)

		// The client is only used for this single call and is unreachable
		// afterwards, so close it on every path to avoid leaking a connection
		// per attempt.
		if closeErr := master.Close(); closeErr != nil {
			log.Printf("关闭 master[%s] 连接异常:%v", masterAddr, closeErr)
		}

		if err != nil {
			// Report the actual error; the arguments follow the order of the
			// node[...] / master[...] placeholders.
			log.Printf("node[%s]注册到master[%s]异常:%v", nodeAddr, masterAddr, err)
			continue
		}
		return
	}
}

108
node/app/app.go

@ -1,23 +1,9 @@
package app
import (
"encoding/gob"
"et_Info"
"et_analyze"
"et_cache"
"et_calc"
"et_calc/group"
"et_print"
"et_push"
"et_sink"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gopkg.in/natefinch/lumberjack.v2"
"io"
"log"
"net"
"net/rpc"
"node/stages"
"os"
"time"
)
@ -25,7 +11,7 @@ import (
func init() {
multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{
Filename: "./logs/logInfo.log",
MaxSize: 10, // megabytes
MaxSize: 30, // megabytes
MaxBackups: 20,
MaxAge: 30, //days
//Compress: true,
@ -36,85 +22,25 @@ func init() {
}
func Start() {
// etNode 注册
nodeWorker := NewEtWorker()
// etNode 数据后处理环节
nodeStageManage := stages.NewStageManager()
nodeStageManage.AddSource(nodeWorker.ch)
//add 业务环节
nodeStageManage = addWorkStages(nodeStageManage)
// 启动 Node RPC 服务
nodeManager := NewETNode()
go nodeManager.startRPCServer()
<-nodeManager.grpcServerStarted
//add 测试环节
//nodeStageManage = addTestPrintStages(nodeStageManage)
// 初始化与 master 的连接
nodeManager.connectAndRegisterToMaster()
// 启动 etNode 处理
nodeStageManage.Run()
// 每 60 秒向 master 发送一次心跳
go nodeManager.heartbeat(60 * time.Second)
gob.Register([]interface{}{})
err := rpc.RegisterName("etNode", nodeWorker)
if err != nil {
log.Panicf("注册 etNode rpc 异常")
}
go nodeSerRpcListen()
// 启动退出监听的协程
nodeManager.startMonitorExit()
//后移注册流程,避免node启动异常的无效注册
nodeWorker.RegisterToMaster()
//go func() {
// for g := range chGroupData {
// log.Printf("groupItem: %v", g.Stations[0].Info.Name)
// log.Printf("chGroupData=%p,通道数据:%d/%d", chGroupData, len(chGroupData), cap(chGroupData))
// }
//}()
for {
time.Sleep(time.Hour)
}
}
// nodeSerRpcListen listens on the TCP port configured as node.port and serves
// every accepted connection with net/rpc in its own goroutine.
//
// It panics if the listener cannot be created and otherwise never returns.
func nodeSerRpcListen() {
	port := configLoad.LoadConfig().GetUint16("node.port")
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Panicf("服务启动rpc 异常=%s", err.Error())
	}
	log.Printf("服务监听=> :%d", port)
	for {
		conn, err := listener.Accept()
		if err != nil {
			// conn is nil when Accept fails; skip this iteration instead of
			// dereferencing it below.
			log.Printf("rpc Accept异常: %v", err)
			continue
		}
		log.Printf("node 建立链接 from master[%s]", conn.RemoteAddr())
		go rpc.ServeConn(conn)
	}
}
// addWorkStages appends the business processing stages to nodeStageManage in
// the order listed below and returns the same manager.
func addWorkStages(nodeStageManage *stages.StageManager) *stages.StageManager {
// raw data storage
sinkRawHandler := et_sink.NewSinkRawHandler()
nodeStageManage.AddStages(sinkRawHandler.GetStage())
// station (measurement point) info lookup
infoHandler := et_Info.NewInfoHandler()
nodeStageManage.AddStages(infoHandler.GetStage())
// single-station calculation
calcHandler := et_calc.NewCalcHandler()
nodeStageManage.AddStages(calcHandler.GetStage())
// sliding-window filtering
cacheHandler := et_cache.NewCacheHandler()
nodeStageManage.AddStages(cacheHandler.GetStage())
// station group calculation
groupCalcHandler := group.NewGroupCalc()
nodeStageManage.AddStages(groupCalcHandler.GetStage())
// theme data storage
sinkThemeHandler := et_sink.NewSinkThemeHandler()
nodeStageManage.AddStages(sinkThemeHandler.GetStage())
// station threshold analysis
stationAnalyzeHandler := et_analyze.NewThresholdHandler()
nodeStageManage.AddStages(stationAnalyzeHandler.GetStage())
// data push
publishHandler := et_push.NewPushHandler()
nodeStageManage.AddStages(publishHandler.GetStage())
return nodeStageManage
}
// addTestPrintStages appends the print handler's stage to nodeStageManage and
// returns the same manager.
func addTestPrintStages(nodeStageManage *stages.StageManager) *stages.StageManager {
	nodeStageManage.AddStages(et_print.NewPrintHandler().GetStage())
	return nodeStageManage
}

445
node/app/et_node.go

@ -1,273 +1,306 @@
package app
import (
"context"
"et_analyze"
"et_rpc"
"et_rpc/pb"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"net/rpc"
"node/et_worker/et_recv"
"net"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
type EtNode struct {
nodeInfo *common_models.NodeArgs
master *rpcMaster
ch chan *common_models.ProcessData
recvDataHandler *et_recv.RecvDataHanler
aggAnalyzeHandler *et_analyze.AggThresholdHandler
type ETNode struct {
// node rpc server 相关信息
grpcServer *grpc.Server
nodeServer *NodeServiceServer
grpcServerStarted chan struct{} // 通知主程序RPC已经启动
processChannels []chan []*common_models.ProcessData
groupDataChan chan []*common_models.ProcessData // 分组数据
// node 信息
nodeInfo *et_rpc.NodeArgs
Addr string
// master 信息
masterAddr string
masterConn *MasterConnection
}
// rpcMaster pairs a net/rpc client with the address it was dialed on.
type rpcMaster struct {
conn *rpc.Client
addr string
}
func NewETNode() *ETNode {
const processChannelsCount = 1
processBufSize := configLoad.LoadConfig().GetInt("performance.node.processBufSize")
processChannels := make([]chan []*common_models.ProcessData, processChannelsCount)
for i := 0; i < processChannelsCount; i++ {
processChannels[i] = make(chan []*common_models.ProcessData, processBufSize)
}
nodeServer := NewNodeServer(processChannels)
grpcServer := grpc.NewServer()
pb.RegisterNodeServiceServer(grpcServer, nodeServer)
const chSize = 1
// 创建grpc健康检查服务
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
// 设置初始健康状态
healthServer.SetServingStatus("NodeService", grpc_health_v1.HealthCheckResponse_SERVING)
func NewEtWorker() *EtNode {
node := &EtNode{
ch: make(chan *common_models.ProcessData, chSize),
recvDataHandler: et_recv.NewRecvDataHanler(),
aggAnalyzeHandler: et_analyze.NewAggThresholdHandler(),
m := &ETNode{
grpcServer: grpcServer,
nodeServer: nodeServer,
grpcServerStarted: make(chan struct{}),
processChannels: processChannels,
//groupDataChan: make(chan []*common_models.ProcessData, 500),
}
node.exitMonitor()
node.heartMonitor()
return node
// 初始化 Node 信息
m.nodeInfo = m.initNodeInfo()
return m
}
// IotaDataHandler 是 RPC 服务方法,由 master 远程调用
func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) error {
*reply = true
err := the.ConsumerProcess(&iotaData)
func (n *ETNode) startRPCServer() {
port := configLoad.LoadConfig().GetUint16("node.port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
*reply = false
log.Panicf("启动 Node RPC 服务失败:%v", err)
}
return err
}
defer listener.Close()
log.Printf("启动 Node RPC 服务成功,服务端口:%d", port)
// 是沉降测试数据
func isSettleData(data map[string]interface{}) bool {
// {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
validKeys := map[string]bool{
"pressure": true,
"temperature": true,
"ssagee": true,
time.Sleep(100 * time.Millisecond)
close(n.grpcServerStarted)
// 启动 gRPC 服务器
if err := n.grpcServer.Serve(listener); err != nil {
log.Panicf("gRPC 服务器服务失败:%v", err)
}
}
if len(data) != 3 {
return false
func (n *ETNode) initNodeInfo() *et_rpc.NodeArgs {
// 获取主机的 IP 地址前缀
ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix")
ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix)
// 获取主机名
hostName, err := os.Hostname()
if err != nil {
log.Fatalf("获取主机名失败: %v", err)
}
// 获取配置的端口号
log.Printf("node [%s] ip=%s\n", hostName, ip4)
port := configLoad.LoadConfig().GetUint16("node.port")
if port == 0 {
log.Fatalf("未配置有效的端口号")
}
// 构造 Node 的地址
nodeAddr := fmt.Sprintf("%s:%d", ip4, port)
log.Printf("node 的地址为 %s", nodeAddr)
n.Addr = nodeAddr
for key := range data {
if !validKeys[key] {
return false
}
// 初始化 Node 信息
return &et_rpc.NodeArgs{
ID: hostName + time.Now().Format("_20060102_150405"),
Status: et_rpc.NodeState_Healthy,
ErrCode: et_rpc.RPCReply_Success,
ResourceJson: "",
Addr: nodeAddr,
}
return true
}
// ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
// 记录方法开始时间
startTime := time.Now()
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
func (n *ETNode) connectAndRegisterToMaster() {
// 获取master配置
masterHost := configLoad.LoadConfig().GetString("node.remoteMasterHost")
masterPort := configLoad.LoadConfig().GetUint16("master.port")
if masterHost == "" {
masterHost = "127.0.0.1"
}
if masterPort == 0 {
masterPort = 50000
}
masterAddr := fmt.Sprintf("%s:%d", masterHost, masterPort)
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
// node 建立与 master 的连接
masterConn, err := NewMasterConnection(masterAddr)
if err != nil {
return err
log.Printf("ERROR: 建立与 Master[%s] 的连接失败!!\n", masterAddr)
} else {
n.masterConn = masterConn
log.Printf("建立与 Master[%s] 的连接成功!\n", masterAddr)
}
if deviceData == nil {
return nil
}
n.masterAddr = masterAddr
n.masterConn = masterConn
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
time.Sleep(500 * time.Millisecond)
the.ch <- &common_models.ProcessData{
DeviceData: *deviceData,
Stations: []common_models.Station{},
// node 向 master 发送注册请求(node 调用 master 的注册服务)
err = n.register() //尝试3次
if err != nil {
// 3次尝试失败后退出程序
log.Fatalf("node[%s]->master[%s] 注册失败,Error: %v", n.nodeInfo.Addr, masterAddr, err)
//log.Println(err)
}
defer func() {
duration := time.Since(startTime)
log.Printf("ConsumerProcess(iotaData *common_models.IotaData)执行时长: %v", duration)
}()
return nil
}
// AggDataHandler 聚集阈值处理者,被 master 远程调用
func (the *EtNode) AggDataHandler(aggData common_models.AggData, reply *bool) error {
*reply = true
err := the.aggAnalyzeHandler.ProcessData(&aggData)
if err != nil {
errmsg := fmt.Sprintf("[etNode.AggDataHandler]变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err)
log.Println(errmsg)
return err
func (n *ETNode) register() error {
if n.masterConn == nil {
return fmt.Errorf("n.masterConn is nil")
}
log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed)
return nil
}
//n.nodeInfo.Load = len(n.chIotaData)
const maxRetries = 3
retries := 0
// 实现源接口
func (the *EtNode) Process(ctx context.Context) (<-chan any, error) {
source := make(chan any, chSize)
go func() {
defer close(source)
for {
select {
case a := <-the.ch:
source <- a
log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch))
case <-ctx.Done():
log.Println("退出[source] EtNode.Process")
return
}
for {
err := n.masterConn.CallRegister(n.nodeInfo)
if err == nil {
log.Println("注册成功")
return nil // 注册成功,返回 nil
} else {
log.Printf("注册失败: %v", err)
retries++
}
if retries >= maxRetries {
log.Println("达到最大重试次数,停止注册尝试")
return fmt.Errorf("注册失败,达到最大重试次数: %v", err) // 返回错误
}
}()
return source, nil
// 每5秒发送一次注册消息
time.Sleep(5 * time.Second)
}
}
// unregister asks the master to remove this node and logs the outcome.
//
// It is a no-op (apart from a log line) when no master connection exists, so
// it can be called safely during shutdown even if the connection was never
// established.
func (n *ETNode) unregister() {
	if n.masterConn == nil {
		log.Printf("node[%s] 未建立与master的连接,跳过注销", n.nodeInfo.Addr)
		return
	}

	if err := n.masterConn.CallUnregister(); err != nil {
		log.Printf("node[%s] 从master注销异常,err=%v", n.nodeInfo.Addr, err.Error())
		return
	}
	// CallUnregister returns only an error, so there is no reply payload to
	// report here.
	log.Printf("node[%s] 从master注销成功。", n.nodeInfo.Addr)
}
// RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister
func (the *EtNode) RegisterToMaster() {
maxCount := 3
connectCount := 0
func (n *ETNode) heartbeat(interval time.Duration) {
// 心跳3次失败后,发送注册消息
const maxRetries = 3
retries := 0
var err error
for {
connectCount++
if connectCount > maxCount {
log.Printf("RegisterToMaster 失败 超过%d次,准备退出", maxCount)
time.Sleep(time.Second * 10)
os.Exit(1)
}
masterAddr := loadMasterAddr()
masterConn, err := rpc.Dial("tcp", masterAddr)
if err != nil {
log.Printf("链接失败-> node[%s]", masterAddr)
time.Sleep(time.Second * 5)
if n.masterConn == nil {
log.Println("ERROR: masterConn is nil")
time.Sleep(1 * time.Second)
continue
}
the.master = &rpcMaster{
conn: masterConn,
addr: masterAddr,
}
time.Sleep(time.Millisecond * 200)
//获取node自己地址
ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix")
ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix)
hostName, err := os.Hostname()
log.Printf("node [%s] ip=%s\n", hostName, ip4)
port := configLoad.LoadConfig().GetUint16("node.port")
callNodeAddr := fmt.Sprintf("%s:%d", ip4, port)
if the.nodeInfo == nil {
the.nodeInfo = &common_models.NodeArgs{
ID: hostName + time.Now().Format("_20060102_150405"),
NodeType: "etNode",
Status: "",
Resources: "",
Addr: callNodeAddr,
ThingIds: []string{},
err = n.masterConn.CallHeartbeatNode(n.nodeInfo.Addr)
if err == nil {
log.Println("心跳成功!!")
retries = 0
} else {
log.Printf("心跳消息发送失败: %v", err)
if strings.Contains(err.Error(), "未注册的节点") {
retries = 3
} else {
retries++
}
}
var result bool
err = the.master.conn.Call("master.NodeRegister", the.nodeInfo, &result)
if err != nil {
log.Printf("node[%s] 注册到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, result)
continue
}
break
}
}
func (the *EtNode) heartToMaster() {
maxCount := 3
connectCount := 0
reRegister := false
for {
connectCount++
if connectCount > maxCount {
log.Printf("heartToMaster 失败 超过%d次", maxCount)
reRegister = true
break
}
var result bool
err := the.master.conn.Call("master.NodeHeart", the.nodeInfo, &result)
if err != nil {
log.Printf("node[%s] 心跳到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, result)
time.Sleep(time.Second * 5)
continue
if retries >= maxRetries {
// 发送注注册、重置重试计数
err = n.register()
if err == nil {
log.Println("重新注册成功", n.Addr)
} else {
log.Println("重新注册失败", n.Addr)
}
retries = 0
}
break
}
if reRegister { //触发重新注册
log.Printf("node[%s] 心跳失败触发-重新注册到 master[%s]", the.nodeInfo.Addr, the.master.addr)
the.RegisterToMaster()
}
}
func (the *EtNode) UnRegisterToMaster() {
var result bool
if err := the.master.conn.Call("master.NodeUnRegister", the.nodeInfo, &result); err != nil {
log.Printf("node[%s] 从master注销,异常:%v", the.nodeInfo.Addr, err.Error())
} else {
log.Printf("node[%s] 从master注销,结果:%v", the.nodeInfo.Addr, result)
// 每60秒发送一次心跳
time.Sleep(interval)
}
}
func (the *EtNode) exitMonitor() {
go func() {
c := make(chan os.Signal, 1)
// 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上
// 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL)
// 阻塞等待接收信号
s := <-c
log.Printf("接收到退出信号:%v,进行清理工作", s)
the.UnRegisterToMaster()
time.Sleep(3 * time.Second)
os.Exit(0)
}()
// startMonitorExit blocks until the process receives a termination signal,
// then unregisters from the master, waits for the processing channels to
// drain and exits with status 0.
func (n *ETNode) startMonitorExit() {
	c := make(chan os.Signal, 1)
	// Subscribe to the signals that can actually be delivered to the program.
	// SIGKILL is deliberately not listed: it cannot be caught, so registering
	// for it has no effect.
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)

	// Block until a signal arrives.
	s := <-c
	log.Printf("接收到退出信号:%v,进行清理工作", s)

	// Unregister from the master.
	n.unregister()

	// Wait until all queued data has been processed.
	n.waitForDataProcessing()
	log.Printf("退出前,通道中数据已经处理完毕!!")
	os.Exit(0)
}
func (the *EtNode) heartMonitor() {
func (n *ETNode) waitForDataProcessing() {
const waitInterval = 10 * time.Second
var wg sync.WaitGroup
// 处理 IotaData 通道
wg.Add(1)
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
if the.master != nil {
log.Printf("node[%s] 心跳触发-> master[%s]", the.nodeInfo.Addr, the.master.addr)
the.heartToMaster()
defer wg.Done()
for _, ch := range n.processChannels {
for len(ch) > 0 {
time.Sleep(waitInterval)
}
}
}()
// 处理 groupDataChan 通道
wg.Add(1)
go func() {
defer wg.Done()
for len(n.groupDataChan) > 0 {
time.Sleep(waitInterval)
}
}()
// 等待所有 goroutine 完成
wg.Wait()
}
// LoadCh returns the node's ProcessData channel. Intended for tests.
func (the *EtNode) LoadCh() chan *common_models.ProcessData {
return the.ch
}
//func LogProcessDataTimeCost(nodeId, deviceId string, start time.Time) {
// tc := time.Since(start)
// log.Printf("******** [%s][%s]装载设备信息耗时: %v", nodeId, deviceId, tc)
//}
// loadMasterAddr builds the master's "host:port" address from configuration.
//
// The host is read from node.remoteMasterHost and the port from master.port;
// an empty host falls back to 127.0.0.1 and a zero port to 50000.
func loadMasterAddr() string {
	const (
		defaultHost = "127.0.0.1"
		defaultPort = 50000
	)

	host := configLoad.LoadConfig().GetString("node.remoteMasterHost")
	port := configLoad.LoadConfig().GetUint16("master.port")

	if len(host) == 0 {
		host = defaultHost
	}
	if port == 0 {
		port = defaultPort
	}
	return fmt.Sprintf("%s:%d", host, port)
}
// 是沉降测试数据
//func isSettleData(data map[string]interface{}) bool {
// // {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
// validKeys := map[string]bool{
// "pressure": true,
// "temperature": true,
// "ssagee": true,
// }
//
// if len(data) != 3 {
// return false
// }
//
// for key := range data {
// if !validKeys[key] {
// return false
// }
// }
// return true
//}

242
node/app/master_connection_grpc.go

@ -0,0 +1,242 @@
package app
import (
"context"
"et_rpc"
"et_rpc/pb"
"fmt"
pool "github.com/jolestar/go-commons-pool"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"sync"
"time"
)
// MasterConnection wraps a pool of gRPC client objects used by the node to
// call the master.
type MasterConnection struct {
MasterAddr string
Addr string
NArgs *et_rpc.NodeArgs
rpcPool *pool.ObjectPool
lastUpdate time.Time // time the node info was last updated
ctx context.Context
mu sync.Mutex
factory *MasterGRPCClientFactory
}
// NewMasterConnection creates a pooled gRPC connection to the master at
// masterAddr and verifies it with a gRPC health check for "MasterService".
//
// It returns an error when no object can be borrowed from the pool, when the
// pooled object has an unexpected type, or when the health check fails or
// reports a status other than SERVING.
//
// TODO: read the pool parameters from the configuration file.
func NewMasterConnection(masterAddr string) (*MasterConnection, error) {
	ctx := context.Background()
	factory := NewMasterGRPCClientFactory(masterAddr)
	p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
	p.Config.MaxTotal = 10 // maximum number of connections
	p.Config.MaxIdle = 5   // maximum number of idle connections
	p.Config.MinIdle = 1   // minimum number of idle connections
	p.Config.TestOnBorrow = true
	p.Config.TestOnReturn = false
	p.Config.TestWhileIdle = true                    // validate connections while idle
	p.Config.MinEvictableIdleTime = 30 * time.Minute // minimum idle time before eviction
	//p.Config.SoftMinEvictableIdleTime = 15 * time.Minute

	conn := &MasterConnection{
		ctx:        ctx,
		MasterAddr: masterAddr,
		rpcPool:    p,
		factory:    factory,
	}

	// Borrow one object for a simple connectivity test.
	obj, err := conn.rpcPool.BorrowObject(ctx)
	if err != nil {
		return nil, fmt.Errorf("建立RPC连接失败:%w", err)
	}
	defer func() {
		if retErr := conn.rpcPool.ReturnObject(ctx, obj); retErr != nil {
			log.Printf("归还对象到连接池失败: %v", retErr)
		}
	}()

	grpcPoolObj, ok := obj.(*MasterGRPCPoolObject)
	if !ok {
		// Report the problem to the caller instead of terminating the
		// process from inside a constructor.
		return nil, fmt.Errorf("类型断言失败,obj 不是 *MasterGRPCPoolObject 类型: %T", obj)
	}

	// Health check. resp is nil when the call itself fails, so the error and
	// the status are examined separately.
	healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn)
	resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
		Service: "MasterService",
	})
	if err != nil {
		return nil, fmt.Errorf("健康检查失败: %w", err)
	}
	if status := resp.GetStatus(); status != grpc_health_v1.HealthCheckResponse_SERVING {
		return nil, fmt.Errorf("健康检查失败: 状态: %v", status)
	}

	return conn, nil
}
// BorrowValidConnection borrows an object from the pool, retrying up to three
// times with a one-second pause after each failure, and validates it with the
// factory. An object that fails validation is destroyed and replaced by one
// newly made by the factory.
//
// It returns an error when borrowing keeps failing, when the borrowed object
// is not a *pool.PooledObject, or when destroying/recreating the object fails.
//
// NOTE(review): the other callers in this file assert the borrowed value to
// *MasterGRPCPoolObject, which suggests BorrowObject hands back the wrapped
// object rather than a *pool.PooledObject — confirm against the pool library;
// if so, the assertion below can never succeed.
func (n *MasterConnection) BorrowValidConnection(ctx context.Context) (*pool.PooledObject, error) {
	const maxAttempts = 3

	var (
		obj1 interface{}
		err  error
	)
	for attempts := 0; attempts < maxAttempts; attempts++ {
		obj1, err = n.rpcPool.BorrowObject(ctx)
		if err == nil {
			break
		}
		log.Printf("Attempt %d: Failed to borrow object from pool: %v", attempts+1, err)
		time.Sleep(1 * time.Second)
	}
	if err != nil {
		return nil, fmt.Errorf("borrow object error after 3 attempts: %w", err)
	}

	pooledObject, ok := obj1.(*pool.PooledObject)
	if !ok {
		// Hand the object back so it is not leaked, and return a real error:
		// log.Output returns the log-write error (normally nil), which made
		// the original return (nil, nil) here.
		if retErr := n.rpcPool.ReturnObject(ctx, obj1); retErr != nil {
			log.Printf("Failed to return object to pool: %v", retErr)
		}
		return nil, fmt.Errorf("invalid object type from pool: %T", obj1)
	}

	if n.factory.ValidateObject(ctx, pooledObject) {
		return pooledObject, nil
	}

	// The borrowed object is unusable: destroy it and create a replacement.
	if err = n.factory.DestroyObject(ctx, pooledObject); err != nil {
		return nil, err
	}
	pooledObject, err = n.factory.MakeObject(ctx)
	if err != nil {
		return nil, err
	}
	return pooledObject, nil
}
// CallRegister records nodeInfo on the connection and sends a RegisterNode
// request to the master through a pooled gRPC client.
//
// The call runs under a five-minute timeout. The borrowed object is handed
// back to the pool when the function returns. An error is returned when
// borrowing fails or when the RPC itself fails.
func (n *MasterConnection) CallRegister(nodeInfo *et_rpc.NodeArgs) error {
	n.NArgs = nodeInfo

	callCtx, stop := context.WithTimeout(context.Background(), 5*time.Minute)
	defer stop()

	borrowed, borrowErr := n.rpcPool.BorrowObject(callCtx)
	if borrowErr != nil {
		return fmt.Errorf("gRPC[CallRegister] 借用对象错误: %w", borrowErr)
	}

	pooled := borrowed.(*MasterGRPCPoolObject)
	defer func() {
		if retErr := n.rpcPool.ReturnObject(callCtx, borrowed); retErr != nil {
			log.Printf("gRPC[CallRegister] 归还对象到连接池失败: %v", retErr)
		}
	}()

	resp, callErr := pooled.Client.RegisterNode(callCtx, &pb.NodeRequest{
		Id:       fmt.Sprintf("master-%s", n.MasterAddr),
		Address:  nodeInfo.Addr,
		ThingIds: make([]string, 0),
	})
	if callErr != nil {
		return fmt.Errorf("调用 gRPC[CallRegister] 错误: %w", callErr)
	}

	log.Printf("调用 gRPC[CallRegister] resp=%+v, err=%+v\n", resp, callErr)
	return nil
}
// CallUnregister sends an UnregisterNode request for this node to the master
// through a pooled gRPC client.
//
// The call runs under a five-minute timeout. The borrowed object is handed
// back to the pool exactly once, when the function returns. An error is
// returned when borrowing fails or when the RPC itself fails.
func (n *MasterConnection) CallUnregister() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	obj1, err := n.rpcPool.BorrowObject(ctx)
	if err != nil {
		return fmt.Errorf("gRPC[CallUnregister] 借用对象错误: %w", err)
	}

	rpcPoolObj := obj1.(*MasterGRPCPoolObject)
	// This deferred call is the only place the object is returned; returning
	// it a second time explicitly would hand the pool an object it already
	// owns.
	defer func() {
		if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil {
			log.Printf("gRPC[CallUnregister] 归还对象到连接池失败: %v", err)
		}
	}()

	// Addr is not assigned by the code visible in this file, so fall back to
	// the address remembered by CallRegister to avoid sending an empty one.
	address := n.Addr
	if address == "" && n.NArgs != nil {
		address = n.NArgs.Addr
	}

	request := &pb.NodeRequest{
		Id:       "",
		Address:  address,
		ThingIds: make([]string, 0),
	}

	resp, err := rpcPoolObj.Client.UnregisterNode(ctx, request)
	if err != nil {
		return fmt.Errorf("调用 gRPC[CallUnregister] 错误: %w", err)
	}
	log.Printf("调用 gRPC[CallUnregister] resp=%+v, err=%+v\n", resp, err)

	return nil
}
// CallHeartbeatNode sends a HeartbeatNode request carrying nodeAddr to the
// master through a pooled gRPC client.
//
// The call runs under a five-minute timeout. The borrowed object is handed
// back to the pool when the function returns. An error is returned when
// borrowing fails or when the RPC itself fails.
func (n *MasterConnection) CallHeartbeatNode(nodeAddr string) error {
	callCtx, stop := context.WithTimeout(context.Background(), 5*time.Minute)
	defer stop()

	borrowed, borrowErr := n.rpcPool.BorrowObject(callCtx)
	if borrowErr != nil {
		return fmt.Errorf("gRPC[CallHeartbeatNode] 借用对象错误: %w", borrowErr)
	}

	pooled := borrowed.(*MasterGRPCPoolObject)
	defer func() {
		if retErr := n.rpcPool.ReturnObject(callCtx, borrowed); retErr != nil {
			log.Printf("gRPC[CallHeartbeatNode] 归还对象到连接池失败: %v", retErr)
		}
		log.Printf("gRPC[CallHeartbeatNode] 已归还对象 obj1 。")
	}()

	resp, callErr := pooled.Client.HeartbeatNode(callCtx, &pb.NodeRequest{
		Id:       "",
		Address:  nodeAddr,
		ThingIds: make([]string, 0),
	})
	if callErr != nil {
		return fmt.Errorf("调用 gRPC[CallHeartbeatNode] 错误: %w", callErr)
	}

	log.Printf("调用 gRPC[CallHeartbeatNode] resp=%+v, err=%+v\n", resp, callErr)
	return nil
}

112
node/app/master_connection_pool_grpc.go

@ -0,0 +1,112 @@
package app
import (
"context"
"et_rpc/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"time"
pool "github.com/jolestar/go-commons-pool"
)
// MasterGRPCPoolObject is the value stored in the connection pool: a gRPC
// client connection together with the MasterService client built on it.
type MasterGRPCPoolObject struct {
Conn *grpc.ClientConn // underlying gRPC connection
Client pb.MasterServiceClient // gRPC client
}
// MasterGRPCClientFactory creates, validates and destroys MasterGRPCPoolObject
// values for the address it was constructed with.
type MasterGRPCClientFactory struct {
address string
}
// NewMasterGRPCClientFactory returns a factory that builds gRPC connections
// targeting address.
func NewMasterGRPCClientFactory(address string) *MasterGRPCClientFactory {
	factory := new(MasterGRPCClientFactory)
	factory.address = address
	return factory
}
// MakeObject creates a gRPC client for the factory's address, configured with
// insecure transport credentials and the retry service config below, and wraps
// it in a pool.PooledObject.
//
// It returns the error from grpc.NewClient unchanged when client creation
// fails.
func (f *MasterGRPCClientFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
// NOTE(review): the incoming ctx is replaced by a 5-second timeout context
// that is not passed to any call below, so it has no visible effect here.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Retry policy applied through the default service config.
// NOTE(review): the policy names the service as "MasterService" — confirm
// this matches the service name the generated client actually uses.
serviceConfig := `{
"methodConfig": [{
"name": [{"service": "MasterService", "method": "*"}],
"retryPolicy": {
"maxAttempts": 2,
"initialBackoff": "1s",
"maxBackoff": "10s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
}
}]
}`
conn, err := grpc.NewClient(
f.address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(serviceConfig),
)
if err != nil {
return nil, err // NOTE(review): the original comment spoke of three failed attempts, but no retry loop is visible here
}
client := pb.NewMasterServiceClient(conn)
return pool.NewPooledObject(
&MasterGRPCPoolObject{
Conn: conn,
Client: client,
},
), nil
}
// DestroyObject closes the gRPC connection held by a pooled object.
//
// Objects of an unexpected type, or without a connection, are skipped and nil
// is returned. Otherwise the error from closing the connection is returned.
func (f *MasterGRPCClientFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
	grpcPoolObj, ok := object.Object.(*MasterGRPCPoolObject)
	if !ok {
		log.Printf("DestroyObject: unexpected pooled object type %T", object.Object)
		return nil
	}
	// Guard on the field that is actually closed (Conn, not Client).
	if grpcPoolObj == nil || grpcPoolObj.Conn == nil {
		return nil
	}
	return grpcPoolObj.Conn.Close()
}
// ValidateObject reports whether a pooled gRPC connection is usable.
//
// It returns false when ctx is already done. Otherwise it runs a gRPC health
// check for "MasterService" on the connection and returns true only when the
// check succeeds with status SERVING; failures are logged.
func (f *MasterGRPCClientFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
	pooled := object.Object.(*MasterGRPCPoolObject)

	// A finished context makes the object count as invalid.
	if ctx.Err() != nil {
		return false
	}

	request := &grpc_health_v1.HealthCheckRequest{Service: "MasterService"}
	resp, err := grpc_health_v1.NewHealthClient(pooled.Conn).Check(ctx, request)

	healthy := err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
	if !healthy {
		log.Println("ValidateObject failed:", err)
	}
	return healthy
}
// ActivateObject is the factory's activation hook. It currently does nothing
// and always returns nil.
func (f *MasterGRPCClientFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
// A heartbeat request could be sent here to make sure the connection is usable.
return nil
}
// PassivateObject is the factory's passivation hook. It currently does nothing
// and always returns nil.
func (f *MasterGRPCClientFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
// The connection could be reset here, e.g. by clearing state or caches.
return nil
}

68
node/app/node_process_stage.go

@ -0,0 +1,68 @@
package app
import (
"et_Info"
"et_analyze"
"et_cache"
"et_cache/cacheSer"
"et_calc"
"et_calc/group"
"et_push"
"et_sink"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/dbHelper"
"gitea.anxinyun.cn/container/common_utils/storage/storageDBs"
"node/stages"
)
// CreateStages builds the node's processing pipeline: a StageManager created
// with outChan, fed from inChan, with the stages below added in order.
//
// Shared helpers are built from the redis.address and es.addresses config
// entries. The push stage is added only when push.enable is true.
func CreateStages(inChan chan []*common_models.ProcessData, outChan chan []*common_models.ProcessData) *stages.StageManager {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
configHelper := common_utils.NewConfigHelper(redisAddr)
cacheServer := cacheSer.NewCacheServer(configHelper)
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
esESHelper := dbHelper.NewESHelper(esAddresses, "", "")
storageConsumers := storageDBs.LoadIStorageConsumer()
// etNode post-processing pipeline
nodeStageManage := stages.NewStageManager(outChan)
nodeStageManage.AddSource(inChan)
// raw data storage
sinkRawHandler := et_sink.NewSinkRawHandler(storageConsumers)
nodeStageManage.AddStages(sinkRawHandler.GetStage())
// station (measurement point) info lookup
infoHandler := et_Info.NewInfoHandler(configHelper)
nodeStageManage.AddStages(infoHandler.GetStage())
// single-station calculation
calcHandler := et_calc.NewCalcHandler(configHelper, cacheServer, esESHelper)
nodeStageManage.AddStages(calcHandler.GetStage())
// station data cache (sliding-window filtering)
cacheHandler := et_cache.NewCacheHandler(cacheServer)
nodeStageManage.AddStages(cacheHandler.GetStage())
// station group calculation
groupCalcHandler := group.NewGroupCalc(configHelper)
nodeStageManage.AddStages(groupCalcHandler.GetStage())
// theme data storage
sinkThemeHandler := et_sink.NewSinkThemeHandler(storageConsumers)
nodeStageManage.AddStages(sinkThemeHandler.GetStage())
// station threshold analysis
stationAnalyzeHandler := et_analyze.NewThresholdHandler()
nodeStageManage.AddStages(stationAnalyzeHandler.GetStage())
// data push (optional, controlled by push.enable)
pushEnable := configLoad.LoadConfig().GetBool("push.enable")
if pushEnable {
publishHandler := et_push.NewPushHandler()
nodeStageManage.AddStages(publishHandler.GetStage())
}
return nodeStageManage
}

459
node/app/node_server.go

@ -0,0 +1,459 @@
package app
import (
"context"
"encoding/json"
"et_analyze"
"et_rpc/pb"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"golang.org/x/time/rate"
"log"
"math"
"node/et_worker/et_recv"
"strings"
"sync"
"time"
)
// UTCTime is a time.Time that decodes from JSON strings written in the
// layout "2006-01-02T15:04:05.999-0700", e.g. "2024-09-19T09:39:59.999+0000".
type UTCTime time.Time

// UnmarshalJSON implements json.Unmarshaler with the custom timestamp layout.
//
// A JSON null leaves the receiver unchanged, following the encoding/json
// convention for unmarshalers. Any other input must be a double-quoted string
// in the expected layout; otherwise an error is returned. Parse failures wrap
// the underlying time.Parse error.
func (ut *UTCTime) UnmarshalJSON(data []byte) error {
	const layout = "2006-01-02T15:04:05.999-0700"

	str := string(data)
	if str == "null" {
		return nil
	}

	// Strip the surrounding JSON quotes.
	if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
		return fmt.Errorf("invalid time format: %s", str)
	}
	str = str[1 : len(str)-1]

	t, err := time.Parse(layout, str)
	if err != nil {
		return fmt.Errorf("failed to parse time: %w", err)
	}

	*ut = UTCTime(t)
	return nil
}
// AggDataJson is the Go shape of an aggregation message. Example payload:
// aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
type AggDataJson struct {
Date UTCTime
SensorId int
StructId int
FactorId int
AggTypeId int // aggregation type: 10-minute / 30-minute / 3-hour / 6-hour / 12-hour / hourly / daily / weekly / monthly
AggMethodId int // aggregation method: average / maximum / minimum
Agg map[string]float64 // aggregated values
Changed map[string]float64 // change amounts
}
// NodeServiceServer implements the pb NodeServiceServer interface and owns the
// channels that carry incoming IotaData and the ProcessData derived from it.
type NodeServiceServer struct {
pb.UnimplementedNodeServiceServer // embedded for forward compatibility with new service methods
Addr string
Load int32
iotaDataHandler *et_recv.RecvDataHandler
aggDataHandler *et_analyze.AggThresholdHandler
iotaChannels []chan []common_models.IotaData
processChannels []chan []*common_models.ProcessData
outProcessChannel chan []*common_models.ProcessData
nextProcessChannel int // index of the next process channel to try
nextChannel int // index of the next channel to try
mu sync.Mutex // intended to guard concurrent access to nextChannel
}
// NewNodeServer creates the node's gRPC service implementation around the
// given process channels and starts its background processing.
//
// Buffer sizes come from performance.node.stageResultBufSize and
// performance.node.iotaBufSize. One IotaData channel is created per process
// channel.
func NewNodeServer(processChannels []chan []*common_models.ProcessData) *NodeServiceServer {
stageResultBufSize := configLoad.LoadConfig().GetInt("performance.node.stageResultBufSize")
iotaBufSize := configLoad.LoadConfig().GetInt("performance.node.iotaBufSize")
s := &NodeServiceServer{
iotaDataHandler: et_recv.NewRecvDataHandler(),
aggDataHandler: et_analyze.NewAggThresholdHandler(),
processChannels: processChannels,
outProcessChannel: make(chan []*common_models.ProcessData, stageResultBufSize),
}
s.iotaChannels = s.NewIotaChannels(len(processChannels), iotaBufSize)
// Start the DeviceInfo cache refresh with a 10-minute interval.
// NOTE(review): the meaning of the second argument (30) is not visible here — confirm.
s.iotaDataHandler.RecvConfigHelper.StartUpdateDeviceInfo(10*time.Minute, 30)
// Start the stage managers that consume process data.
s.RunStageManager()
// Short pause before the IotaData workers start.
time.Sleep(500 * time.Millisecond)
// Convert IotaData to DeviceData and forward the result to s.processChannels.
go s.HandleIotaChannels()
return s
}
// NewIotaChannels allocates count buffered IotaData channels, each with a
// capacity of bufferSize, and returns them as a slice.
func (s *NodeServiceServer) NewIotaChannels(count, bufferSize int) []chan []common_models.IotaData {
	result := make([]chan []common_models.IotaData, count)
	for slot := range result {
		result[slot] = make(chan []common_models.IotaData, bufferSize)
	}
	return result
}
// HandleIotaChannels starts the IotaData workers: for every IotaData channel
// it launches performance.node.iotaWorkerCount goroutines running
// HandleIotaChan on that channel.
func (s *NodeServiceServer) HandleIotaChannels() {
	iotaWorkerCount := configLoad.LoadConfig().GetInt("performance.node.iotaWorkerCount")
	for index, ch := range s.iotaChannels {
		for w := 0; w < iotaWorkerCount; w++ {
			// Arguments of a go statement are evaluated immediately, so each
			// worker gets the channel and index of its own iteration rather
			// than sharing the loop variable through a closure.
			go s.HandleIotaChan(ch, index)
		}
	}
}
// RunStageManager launches one goroutine per process channel. Each goroutine
// builds a stage pipeline that reads from its channel, writes to
// s.outProcessChannel, and then runs the stages.
func (s *NodeServiceServer) RunStageManager() {
	for i := range s.processChannels {
		in := s.processChannels[i]
		go func() {
			CreateStages(in, s.outProcessChannel).RunStages()
		}()
	}
}
// sendToIotaChannels enqueues data on the iota channel that currently holds
// the fewest pending batches.
//
// It returns the chosen channel and true on success. If the send cannot
// complete within 100ms, or no iota channel is configured, it returns nil and
// false. The elapsed time is logged on every exit path.
func (s *NodeServiceServer) sendToIotaChannels(data []common_models.IotaData) (chan []common_models.IotaData, bool) {
	startTime := time.Now()
	defer func() {
		log.Printf("sendToIotaChannels elapsedTime= %s\n", time.Since(startTime))
	}()

	// Without channels there is nothing to send to; fail right away instead of
	// waiting out the timeout on a nil channel.
	if len(s.iotaChannels) == 0 {
		log.Println("No iotaData channel available.")
		return nil, false
	}

	// Pick the least loaded channel. The lengths are only a snapshot because
	// other goroutines may send and receive concurrently.
	var selectedChannel chan []common_models.IotaData
	minLength := math.MaxInt32
	for _, ch := range s.iotaChannels {
		if n := len(ch); n < minLength {
			minLength = n
			selectedChannel = ch
		}
	}

	// Try to send, giving up after the timeout.
	select {
	case selectedChannel <- data:
		return selectedChannel, true
	case <-time.After(100 * time.Millisecond):
		log.Println("Timeout while trying to send iotaData.")
		return nil, false
	}
}
// HandleIotaChan consumes IotaData batches from ch, converts every record to
// DeviceData through s.iotaDataHandler, and forwards the converted batch to
// s.processChannels[index] via sendToProcessChannels.
//
// Batches are admitted by a limiter local to this call (10 per second, burst
// of 1). Each admitted batch is converted in its own goroutine, which recovers
// from panics and logs its total processing time. The method returns when ch
// is closed.
func (s *NodeServiceServer) HandleIotaChan(ch chan []common_models.IotaData, index int) {
	// Admit at most 10 batches per second with a burst of 1.
	limiter := rate.NewLimiter(10, 1)
	for data := range ch {
		// Block until the limiter lets the next batch through.
		if err := limiter.Wait(context.Background()); err != nil {
			log.Printf("处理速率限制错误: %v", err)
			continue
		}
		go func(iotaDataArray []common_models.IotaData) {
			dataHandleTime := time.Now() // start of processing for this batch
			formattedTime := dataHandleTime.Format("2006-01-02 15:04:05.999999999")
			defer func() {
				if r := recover(); r != nil {
					log.Printf("Recovered from panic: %v", r)
				}
				log.Printf("4.iotaDataArray[%v] 处理耗时:%v", formattedTime, time.Since(dataHandleTime))
			}()
			log.Printf("1.iotaDataArray[%v] 准备处理。processChannel[%p]数据量:%d/%d", formattedTime, ch, len(ch), cap(ch))

			processDataArray := make([]*common_models.ProcessData, 0, len(iotaDataArray))
			for _, item := range iotaDataArray {
				ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
				iotaHandleTime := time.Now() // start of conversion for this record
				deviceData, err := s.iotaDataHandler.OnDataHandler(ctx, item)
				// Release the context as soon as the conversion is done. A
				// defer here would keep every record's context and timer alive
				// until the whole batch goroutine returns.
				cancel()
				iotaHandleElapse := time.Since(iotaHandleTime)

				// NOTE(review): a single failed record abandons the whole
				// batch, including records already converted — confirm this
				// is intended rather than skipping the bad record.
				if err != nil {
					log.Printf("IotaData->DeviceData[%s] 转换错误:%s. 耗时:%v。", item.DeviceId, err.Error(), iotaHandleElapse)
					return
				}
				if deviceData == nil {
					log.Printf("IotaData->DeviceData[%s] 转换错误:deviceData is nil. 耗时:%v。", item.DeviceId, iotaHandleElapse)
					return
				}

				// Wrap the device data for the process pipeline.
				processDataArray = append(processDataArray, &common_models.ProcessData{
					DeviceData: *deviceData,
					Stations:   []common_models.Station{},
				})
			}
			log.Printf("2.iotaDataArray[%v] 已处理完 IotaData->DeviceData ", formattedTime)

			sendTime := time.Now()
			// Blocks until the target process channel accepts the batch.
			processChannel, ok := s.sendToProcessChannels(processDataArray, index)
			if !ok {
				// TODO: decide what to do when the process channels are full.
				log.Printf("3.iotaDataArray[%v] s.processChannels %d个通道都已满,被阻塞。", formattedTime, len(s.processChannels))
				return
			}
			log.Printf("3.iotaDataArray[%v] 已发送至s.processChannels。processChannel[%p]数据量:%d/%d, \n发送耗时:%v ,iotaDataArray处理耗时:%v",
				formattedTime, processChannel, len(processChannel), cap(processChannel), time.Since(sendTime), time.Since(dataHandleTime))
		}(data)
	}
}
// sendToProcessChannels delivers data to s.processChannels[index].
//
// The send is attempted without blocking; while the channel is full the
// method sleeps 200ms and retries, with no upper time limit. After a
// successful send it sleeps a further 50ms and returns the channel and true.
// The elapsed time is logged on exit.
func (s *NodeServiceServer) sendToProcessChannels(data []*common_models.ProcessData, index int) (chan []*common_models.ProcessData, bool) {
	begin := time.Now()
	defer func() {
		took := time.Since(begin)
		log.Printf("[sendToProcessChannels] elapsedTime= %s\n", took)
	}()

	target := s.processChannels[index]
	log.Printf("[sendToProcessChannels] 尝试发送。 channel[%p]: %d/%d\n", target, len(target), cap(target))

	for {
		select {
		case target <- data:
			// Sent; fall through to the success handling below.
		default:
			// Channel full: back off before the next attempt.
			time.Sleep(200 * time.Millisecond)
			continue
		}

		log.Printf("[sendToProcessChannels] 发送成功。channel[%p]: %d/%d\n", target, len(target), cap(target))
		time.Sleep(50 * time.Millisecond)
		return target, true
	}
}
// limiter is the package-level rate limiter that HandleIotaData and
// HandleAggData wait on before doing any work: 500 events per second with a
// burst of 1.
var limiter = rate.NewLimiter(rate.Limit(500), 1)
// HandleIotaData is the gRPC handler for raw Iota messages. It waits on the
// package-level limiter, decodes req.Messages into IotaData, and enqueues the
// batch on one of the iota channels.
//
// It returns an error only when waiting on the limiter fails (for example
// because ctx is cancelled or its deadline cannot be met); the underlying
// cause is wrapped so callers can inspect it. Otherwise it returns a response
// carrying the node's Addr and Load.
//
// NOTE(review): when the iota channels are full the response still reports
// HandleDataResponse_SUCCESS with a non-empty ErrorMessage — confirm whether a
// failure status should be returned instead.
func (s *NodeServiceServer) HandleIotaData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) {
	if err := limiter.Wait(ctx); err != nil {
		return nil, fmt.Errorf("请求速率过高,请稍后重试: %w", err)
	}
	startTime := time.Now()

	// 1. Decode the messages.
	conversionStart := time.Now()
	iotaDataList := s.convertMessages2IotaData(req.Messages)
	log.Printf("[INFO][HandleIotaData] 1.数据转换耗时: %v, 共 %d 条数据。", time.Since(conversionStart), len(req.Messages))

	// 2. Enqueue the batch on an iota channel.
	sendStart := time.Now()
	_, ok := s.sendToIotaChannels(iotaDataList)
	log.Printf("[INFO] [HandleIotaData] 2.sendToIotaChannels耗时: %v。", time.Since(sendStart))
	if !ok {
		log.Printf("[WARN] [HandleIotaData] 2.所有 Iota 通道已满,无法发送数据,通道数量: %d", len(s.iotaChannels))
		return &pb.HandleDataResponse{
			Addr:         s.Addr,
			Load:         s.Load,
			Status:       pb.HandleDataResponse_SUCCESS,
			ErrorMessage: "s.iotaChannels 通道已满",
		}, nil
	}

	// 3. Report the total handling time.
	log.Printf("[INFO] [HandleIotaData] 3.总处理时长: %v", time.Since(startTime))
	return &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       pb.HandleDataResponse_SUCCESS,
		ErrorMessage: "",
	}, nil
}
// HandleAggData is the gRPC handler for aggregated data. It waits on the
// package-level limiter, decodes req.Messages into AggData, and runs every
// item through s.aggDataHandler.ProcessData.
//
// It returns an error only when waiting on the limiter fails; the underlying
// cause is wrapped so callers can inspect it. Errors from ProcessData are
// logged per item and do not change the response, which always reports
// HandleDataResponse_SUCCESS together with the node's Addr and Load.
func (s *NodeServiceServer) HandleAggData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) {
	if err := limiter.Wait(ctx); err != nil {
		return nil, fmt.Errorf("请求速率过高,请稍后重试: %w", err)
	}
	startTime := time.Now()

	// 1. Decode the messages.
	conversionStart := time.Now()
	aggDataList := s.convertMessages2AggData(req.Messages)
	log.Printf("[INFO][HandleAggData] 1.数据转换耗时: %v, 共 %d 条数据。", time.Since(conversionStart), len(req.Messages))

	// 2. Run the threshold analysis on every item. Index into the slice so the
	// handler gets a pointer to the element itself rather than to the range
	// loop variable, which is reused across iterations before Go 1.22.
	analyzeStart := time.Now()
	for i := range aggDataList {
		aggData := &aggDataList[i]
		if err := s.aggDataHandler.ProcessData(aggData); err != nil {
			errmsg := fmt.Sprintf("[etNode.AggDataHandler] 2.变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err)
			log.Println(errmsg)
		}
	}
	log.Printf("[INFO][HandleAggData] 2. 变化速率阈值分析耗时: %v。", time.Since(analyzeStart))

	// 3. Report the total handling time.
	log.Printf("[INFO][HandleAggData] 3.总处理时长: %v", time.Since(startTime))

	return &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       pb.HandleDataResponse_SUCCESS,
		ErrorMessage: "",
	}, nil
}
// mustEmbedUnimplementedNodeServiceServer is an intentionally empty marker
// method.
// NOTE(review): presumably it satisfies the unexported method required by the
// generated pb NodeServiceServer interface — confirm against the generated
// gRPC code.
func (s *NodeServiceServer) mustEmbedUnimplementedNodeServiceServer() {}
// createErrorResponse builds a HandleDataResponse carrying the node's Addr
// and Load together with the given status and message, logs the message, and
// returns the response along with an error whose text is the message.
//
// The message is treated as plain text, never as a format string, so any '%'
// characters it contains are logged and returned verbatim.
func (s *NodeServiceServer) createErrorResponse(status pb.HandleDataResponse_Status, message string) (*pb.HandleDataResponse, error) {
	response := &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       status,
		ErrorMessage: message,
	}
	log.Print(message) // record the error message
	return response, fmt.Errorf("%s", message)
}
// convertMessages2IotaData decodes JSON messages into IotaData values.
//
// It first joins all messages into a single JSON array and decodes them in
// one pass. If that fails, it decodes the messages one at a time, logging and
// skipping those that cannot be parsed. The result therefore contains at most
// one entry per message, in message order.
func (s *NodeServiceServer) convertMessages2IotaData(messages []string) []common_models.IotaData {
	st := time.Now()
	dataArray := make([]common_models.IotaData, 0, len(messages))

	// Fast path: decode everything as one JSON array.
	jsonArray := "[" + strings.Join(messages, ",") + "]"
	if err := json.Unmarshal([]byte(jsonArray), &dataArray); err != nil {
		// A failed batch decode can leave dataArray partially filled (for
		// example on a type mismatch json.Unmarshal keeps decoding the rest).
		// Drop those entries so the fallback below does not append duplicates.
		dataArray = dataArray[:0]

		// Slow path: decode one message at a time.
		for _, msg := range messages {
			var data common_models.IotaData
			if err := json.Unmarshal([]byte(msg), &data); err != nil {
				log.Printf("逐个 JSON 反序列化失败:%v", err)
				continue
			}
			dataArray = append(dataArray, data)
		}
	}

	log.Printf("[convertMessages2IotaData] 序列化耗时:%v ,共解析出 %d 个 IotaData。", time.Since(st), len(dataArray))
	return dataArray
}
// convertMessages2AggData decodes JSON messages into AggData values.
//
// It first joins all messages into a single JSON array and decodes them in
// one pass. If that fails, it decodes the messages one at a time, logging and
// skipping those that cannot be parsed. Either way the result follows the
// order of messages, and ThingId is left empty on every item.
//
// The fallback decodes sequentially: a goroutine per message gives an
// unbounded goroutine count and a nondeterministic result order.
func (s *NodeServiceServer) convertMessages2AggData(messages []string) []common_models.AggData {
	st := time.Now()

	// toAggData maps the wire representation onto the domain model.
	toAggData := func(data AggDataJson) common_models.AggData {
		return common_models.AggData{
			Date:        time.Time(data.Date),
			SensorId:    data.SensorId,
			StructId:    data.StructId,
			FactorId:    data.FactorId,
			AggTypeId:   data.AggTypeId,
			AggMethodId: data.AggMethodId,
			Agg:         data.Agg,
			Changed:     data.Changed,
			ThingId:     "",
		}
	}

	aggDatas := make([]common_models.AggData, 0, len(messages))

	// Fast path: decode everything as one JSON array.
	jsonArray := "[" + strings.Join(messages, ",") + "]"
	var tmpDatas []AggDataJson
	if err := json.Unmarshal([]byte(jsonArray), &tmpDatas); err != nil {
		log.Printf("JSON 数组反序列化失败,尝试逐个解析:%v", err)
		// Slow path: decode one message at a time, skipping bad ones.
		for _, msg := range messages {
			var data AggDataJson
			if err := json.Unmarshal([]byte(msg), &data); err != nil {
				log.Printf("逐个 JSON 反序列化失败:%v", err)
				continue
			}
			aggDatas = append(aggDatas, toAggData(data))
		}
	} else {
		for _, data := range tmpDatas {
			aggDatas = append(aggDatas, toAggData(data))
		}
	}

	log.Printf("[convertMessages2AggData] 序列化耗时:%v ,共解析出 %d 个 AggData。", time.Since(st), len(aggDatas))
	return aggDatas
}
Loading…
Cancel
Save