package app import ( "encoding/gob" "et_Info" "et_analyze" "et_cache" "et_calc" "et_calc/group" "et_print" "et_push" "et_sink" "fmt" "gitea.anxinyun.cn/container/common_utils/configLoad" "gopkg.in/natefinch/lumberjack.v2" "io" "log" "net" "net/rpc" "node/stages" "os" "time" ) func init() { multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{ Filename: "./logs/logInfo.log", MaxSize: 10, // megabytes MaxBackups: 20, MaxAge: 30, //days //Compress: true, }) log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds) log.SetOutput(multiWriter) log.Println("=================log start=================") } func Start() { // etNode 注册 nodeWorker := NewEtWorker() // etNode 数据后处理环节 nodeStageManage := stages.NewStageManager() nodeStageManage.AddSource(nodeWorker.ch) //add 业务环节 nodeStageManage = addWorkStages(nodeStageManage) //add 测试环节 //nodeStageManage = addTestPrintStages(nodeStageManage) // 启动 etNode 处理 nodeStageManage.Run() gob.Register([]interface{}{}) err := rpc.RegisterName("etNode", nodeWorker) if err != nil { log.Panicf("注册 etNode rpc 异常") } go nodeSerRpcListen() //后移注册流程,避免node启动异常的无效注册 nodeWorker.RegisterToMaster() for { time.Sleep(time.Hour) } } func nodeSerRpcListen() { port := configLoad.LoadConfig().GetUint16("node.port") listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { log.Panicf("服务启动rpc 异常=%s", err.Error()) } log.Printf("服务监听=> :%d", port) for { conn, err := listener.Accept() if err != nil { log.Println("rpc Accept异常") } log.Printf("node 建立链接 from master[%s]", conn.RemoteAddr()) go rpc.ServeConn(conn) } } func addWorkStages(nodeStageManage *stages.StageManager) *stages.StageManager { // raws 数据存储 sinkRawHandler := et_sink.NewSinkRawHandler() nodeStageManage.AddStages(sinkRawHandler.GetStage()) // 测点信息获取 infoHandler := et_Info.NewInfoHandler() nodeStageManage.AddStages(infoHandler.GetStage()) // 单测点计算 calcHandler := et_calc.NewCalcHandler() nodeStageManage.AddStages(calcHandler.GetStage()) // 滑窗过滤 cacheHandler := et_cache.NewCacheHandler() nodeStageManage.AddStages(cacheHandler.GetStage()) // 测点分组计算 groupCalcHandler := group.NewGroupCalc() nodeStageManage.AddStages(groupCalcHandler.GetStage()) // Theme 数据存储 sinkThemeHandler := et_sink.NewSinkThemeHandler() nodeStageManage.AddStages(sinkThemeHandler.GetStage()) // 测点阈值分析 stationAnalyzeHandler := et_analyze.NewThresholdHandler() nodeStageManage.AddStages(stationAnalyzeHandler.GetStage()) // 数据推送 publishHandler := et_push.NewPushHandler() nodeStageManage.AddStages(publishHandler.GetStage()) return nodeStageManage } func addTestPrintStages(nodeStageManage *stages.StageManager) *stages.StageManager { printHandler := et_print.NewPrintHandler() nodeStageManage.AddStages(printHandler.GetStage()) return nodeStageManage }