et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

133 lines
3.6 KiB

package app
import (
"encoding/gob"
"et_Info"
"et_analyze"
"et_cache"
"et_calc"
"et_print"
"et_push"
"et_sink"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gopkg.in/natefinch/lumberjack.v2"
"io"
"log"
"net"
"net/rpc"
"node/stages"
"os"
"time"
)
func init() {
multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{
Filename: "./logs/logInfo.log",
MaxSize: 10, // megabytes
MaxBackups: 20,
MaxAge: 30, //days
//Compress: true,
})
log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds)
log.SetOutput(multiWriter)
log.Println("=================log start=================")
}
func Start() {
// etNode 注册
nodeWorker := NewEtWorker()
// etNode 数据后处理环节
nodeStageManage := stages.NewStageManager()
nodeStageManage.AddSource(nodeWorker.ch)
//add 业务环节
nodeStageManage = addWorkStages(nodeStageManage)
//add 测试环节
//nodeStageManage = addTestPrintStages(nodeStageManage)
// 启动 etNode 处理
nodeStageManage.Run()
gob.Register([]interface{}{})
err := rpc.RegisterName("etNode", nodeWorker)
if err != nil {
log.Panicf("注册 etNode rpc 异常")
}
go nodeSerRpcListen()
// aggNode 注册,无数据后处理环节
//aggWorker := agg_worker.NewAggWorker()
//err1 := rpc.RegisterName("aggNode", aggWorker)
//if err1 != nil {
// log.Fatal("注册 aggNode rpc 异常", err1)
//}
//aggWorker.RegisterToMaster()
//后移注册流程,避免node启动异常的无效注册
nodeWorker.RegisterToMaster()
for {
time.Sleep(time.Hour)
}
}
func nodeSerRpcListen() {
port := configLoad.LoadConfig().GetUint16("node.port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Panicf("服务启动rpc 异常=%s", err.Error())
}
log.Printf("服务监听=> :%d", port)
for {
conn, err := listener.Accept()
if err != nil {
log.Println("rpc Accept异常")
}
log.Printf("node 建立链接 from master[%s]", conn.RemoteAddr())
go rpc.ServeConn(conn)
}
}
func addWorkStages(nodeStageManage *stages.StageManager) *stages.StageManager {
// raws 数据存储
sinkRawHandler := et_sink.NewSinkRawHandler()
nodeStageManage.AddStages(sinkRawHandler.GetStage())
// 测点信息获取
infoHandler := et_Info.NewInfoHandler()
nodeStageManage.AddStages(infoHandler.GetStage())
// 单测点计算
calcHandler := et_calc.NewCalcHandler()
nodeStageManage.AddStages(calcHandler.GetStage())
cacheHandler := et_cache.NewCacheHandler()
nodeStageManage.AddStages(cacheHandler.GetStage())
// Theme 数据存储
sinkThemeHandler := et_sink.NewSinkThemeHandler()
nodeStageManage.AddStages(sinkThemeHandler.GetStage())
// 测点主题数据的阈值分析
stationAnalyzeHandler := et_analyze.NewThresholdHandler()
nodeStageManage.AddStages(stationAnalyzeHandler.GetStage())
// 测点分组计算
//groupCalcHandler := group.NewGroupCalc()
//nodeStageManage.AddStages(groupCalcHandler.GetStage())
// EsGroupTheme 数据存储
//sinkGroupHandler := et_sink.NewSinkGroupHandler()
//nodeStageManage.AddStages(sinkGroupHandler.GetStage())
//// (阈值分析) 测点主题数据的阈值分析 + 分组计算后数据的阈值分析
//groupAnalyzeHandler := et_analyze.NewThresholdHandler()
//nodeStageManage.AddStages(groupAnalyzeHandler.GetStage())
publishHandler := et_push.NewPushHandler()
nodeStageManage.AddStages(publishHandler.GetStage())
return nodeStageManage
}
func addTestPrintStages(nodeStageManage *stages.StageManager) *stages.StageManager {
printHandler := et_print.NewPrintHandler()
nodeStageManage.AddStages(printHandler.GetStage())
return nodeStageManage
}