// et-go: rebuilt 2024-09-19 (note carried over from the repository page header).

package app
import (
"context"
"et_analyze"
1 month ago
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"net/rpc"
"node/et_worker/et_recv"
"os"
"os/signal"
"syscall"
"time"
)
type EtNode struct {
nodeInfo *common_models.NodeArgs
master *rpcMaster
ch chan *common_models.ProcessData
recvDataHandler *et_recv.RecvDataHanler
aggAnalyzeHandler *et_analyze.AggThresholdHandler
1 month ago
}
// rpcMaster bundles the RPC client connection to the master with the address it was dialed at.
type rpcMaster struct {
// conn is the net/rpc client used for every call made to the master.
conn *rpc.Client
// addr is the "host:port" string the connection was dialed with; it is also used in log messages.
addr string
}
// chSize is the buffer capacity of the node's ProcessData channel and of the
// channel returned by Process.
const chSize = 1
1 month ago
func NewEtWorker() *EtNode {
node := &EtNode{
ch: make(chan *common_models.ProcessData, chSize),
recvDataHandler: et_recv.NewRecvDataHanler(),
aggAnalyzeHandler: et_analyze.NewAggThresholdHandler(),
1 month ago
}
node.exitMonitor()
node.heartMonitor()
return node
}
// IotaDataHandler 是 RPC 服务方法,由 master 远程调用
func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) error {
*reply = true
1 month ago
err := the.ConsumerProcess(&iotaData)
if err != nil {
*reply = false
1 month ago
}
return err
}
4 weeks ago
// isSettleData reports whether data looks like a settlement test sample, i.e.
// its key set is exactly {"pressure", "temperature", "ssagee"}.
// Example payload: {"pressure":23.09,"temperature":24.93,"ssagee":16.44}.
// Only the keys are inspected; values are ignored.
func isSettleData(data map[string]interface{}) bool {
	const wantFields = 3
	if len(data) != wantFields {
		return false
	}
	// Map keys are unique, so three keys that are all recognised means all
	// three expected fields are present.
	for field := range data {
		switch field {
		case "pressure", "temperature", "ssagee":
			// recognised field
		default:
			return false
		}
	}
	return true
}
1 month ago
// ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
4 weeks ago
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
1 month ago
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
if err != nil {
return err
}
if deviceData == nil {
return nil
}
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
4 weeks ago
1 month ago
the.ch <- &common_models.ProcessData{
DeviceData: *deviceData,
Stations: []common_models.Station{},
}
return nil
}
// AggDataHandler is the aggregated-data threshold handler, invoked remotely by
// the master over RPC.
//
// It passes aggData to the aggregation analyze handler. On success *reply is
// true and nil is returned. On failure the error is logged, *reply is set to
// false (matching IotaDataHandler) and the error is returned.
func (the *EtNode) AggDataHandler(aggData common_models.AggData, reply *bool) error {
	*reply = true
	if err := the.aggAnalyzeHandler.ProcessData(&aggData); err != nil {
		// Do not leave a stale "true" behind for callers that inspect reply
		// alongside the error.
		*reply = false
		log.Printf("[etNode.AggDataHandler]变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err)
		return err
	}
	log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed)
	return nil
}
1 month ago
// Process implements the source interface (per the original author's note).
//
// It starts a goroutine that forwards every item queued on the node's channel
// to the returned channel until ctx is cancelled, then closes the returned
// channel. The returned error is always nil.
func (the *EtNode) Process(ctx context.Context) (<-chan any, error) {
	source := make(chan any, chSize)
	go func() {
		defer close(source)
		for {
			select {
			case a := <-the.ch:
				// Forward with a cancellation check: a plain blocking send
				// would leak this goroutine if the consumer stopped reading
				// before ctx was cancelled. An item already taken from the.ch
				// is dropped when cancellation wins.
				select {
				case source <- a:
					log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch))
				case <-ctx.Done():
					log.Println("退出[source] EtNode.Process")
					return
				}
			case <-ctx.Done():
				log.Println("退出[source] EtNode.Process")
				return
			}
		}
	}()
	return source, nil
}
1 month ago
// RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister
func (the *EtNode) RegisterToMaster() {
maxCount := 3
connectCount := 0
for {
connectCount++
if connectCount > maxCount {
log.Printf("RegisterToMaster 失败 超过%d次,准备退出", maxCount)
time.Sleep(time.Second * 10)
os.Exit(1)
}
masterAddr := loadMasterAddr()
masterConn, err := rpc.Dial("tcp", masterAddr)
if err != nil {
log.Printf("链接失败-> node[%s]", masterAddr)
time.Sleep(time.Second * 5)
continue
}
the.master = &rpcMaster{
conn: masterConn,
addr: masterAddr,
}
time.Sleep(time.Millisecond * 200)
//获取node自己地址
ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix")
ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix)
hostName, err := os.Hostname()
log.Printf("node [%s] ip=%s\n", hostName, ip4)
1 month ago
port := configLoad.LoadConfig().GetUint16("node.port")
callNodeAddr := fmt.Sprintf("%s:%d", ip4, port)
if the.nodeInfo == nil {
the.nodeInfo = &common_models.NodeArgs{
ID: hostName + time.Now().Format("_20060102_150405"),
NodeType: "etNode",
Status: "",
Resources: "",
Addr: callNodeAddr,
ThingIds: []string{},
}
}
var result bool
err = the.master.conn.Call("master.NodeRegister", the.nodeInfo, &result)
if err != nil {
log.Printf("node[%s] 注册到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, result)
continue
}
break
}
}
// heartToMaster sends a heartbeat to the master via the RPC method
// master.NodeHeart.
//
// It tries up to maxCount times, sleeping 5 seconds after each failure. If
// every attempt fails, it triggers a fresh registration through
// RegisterToMaster. It assumes the.master and the.nodeInfo are already set.
func (the *EtNode) heartToMaster() {
	const maxCount = 3
	for attempt := 1; attempt <= maxCount; attempt++ {
		var result bool
		err := the.master.conn.Call("master.NodeHeart", the.nodeInfo, &result)
		if err == nil {
			return
		}
		// Log the error itself; the reply value carries no information when
		// the call fails.
		log.Printf("node[%s] 心跳到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, err)
		time.Sleep(time.Second * 5)
	}

	// All attempts failed: fall back to re-registering with the master.
	log.Printf("heartToMaster 失败 超过%d次", maxCount)
	log.Printf("node[%s] 心跳失败触发-重新注册到 master[%s]", the.nodeInfo.Addr, the.master.addr)
	the.RegisterToMaster()
}
func (the *EtNode) UnRegisterToMaster() {
var result bool
if err := the.master.conn.Call("master.NodeUnRegister", the.nodeInfo, &result); err != nil {
log.Printf("node[%s] 从master注销,异常:%v", the.nodeInfo.Addr, err.Error())
1 month ago
} else {
log.Printf("node[%s] 从master注销,结果:%v", the.nodeInfo.Addr, result)
1 month ago
}
}
func (the *EtNode) exitMonitor() {
go func() {
c := make(chan os.Signal, 1)
// 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上
// 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL)
1 month ago
// 阻塞等待接收信号
s := <-c
log.Printf("接收到退出信号:%v,进行清理工作", s)
1 month ago
the.UnRegisterToMaster()
time.Sleep(3 * time.Second)
os.Exit(0)
}()
}
// heartMonitor starts a background goroutine that fires once per minute and,
// if a master connection exists, sends a heartbeat via heartToMaster. The
// goroutine runs for the lifetime of the process.
func (the *EtNode) heartMonitor() {
	beat := func() {
		// Nothing to do until the node has dialed a master.
		if the.master == nil {
			return
		}
		log.Printf("node[%s] 心跳触发-> master[%s]", the.nodeInfo.Addr, the.master.addr)
		the.heartToMaster()
	}

	go func() {
		tick := time.NewTicker(time.Minute)
		defer tick.Stop()
		for {
			<-tick.C
			beat()
		}
	}()
}
// LoadCh returns the node's internal ProcessData channel.
// The original note marks it as intended for test use.
func (the *EtNode) LoadCh() chan *common_models.ProcessData {
return the.ch
}
// loadMasterAddr builds the master's "host:port" address from configuration.
// The host comes from "node.remoteMasterHost" and the port from "master.port";
// an empty host falls back to 127.0.0.1 and a zero port to 50000.
func loadMasterAddr() string {
	const (
		fallbackHost = "127.0.0.1"
		fallbackPort = 50000
	)

	host := configLoad.LoadConfig().GetString("node.remoteMasterHost")
	port := configLoad.LoadConfig().GetUint16("master.port")

	if host == "" {
		host = fallbackHost
	}
	if port == 0 {
		port = fallbackPort
	}
	return fmt.Sprintf("%s:%d", host, port)
}