diff --git a/config.yaml b/config.yaml index 0f55c57..64090da 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,5 @@ +pprof: + enable: false kafka: groupId: lucas_et_go3 brokers: diff --git a/containerApp/go.mod b/containerApp/go.mod index 9076376..020669a 100644 --- a/containerApp/go.mod +++ b/containerApp/go.mod @@ -1,6 +1,6 @@ module containerApp -go 1.22.0 +go 1.23.1 require gitea.anxinyun.cn/container/common_utils v0.0.7 diff --git a/containerApp/main.go b/containerApp/main.go index f09eb98..ef0fd8c 100644 --- a/containerApp/main.go +++ b/containerApp/main.go @@ -24,7 +24,11 @@ func main() { log.Printf("启动类型:master => hostName=[%s]", hostName) etMaster.Start() } else { - pprofRun() + pprofEnable := configLoad.LoadConfig().GetBool("pprof.enable") + if pprofEnable { + pprofRun() + } + log.Printf("启动类型:node => hostName=[%s]", hostName) etNode.Start() } @@ -36,6 +40,9 @@ func pprofRun() { pprofAddr := ":10000" log.Printf("性能分析 => pprofAddr=[%s]", pprofAddr) go func() { - http.ListenAndServe(pprofAddr, nil) //开启一个http服务,nil表示绑定默认路由器DefaultServeMux + err := http.ListenAndServe(pprofAddr, nil) + if err != nil { + log.Panicf("启动异常 => [%s]", err.Error()) + } //开启一个http服务,nil表示绑定默认路由器DefaultServeMux }() } diff --git a/dataSource/go.mod b/dataSource/go.mod index 6f1e119..668ad22 100644 --- a/dataSource/go.mod +++ b/dataSource/go.mod @@ -1,6 +1,6 @@ module dataSource -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_models v0.0.7 diff --git a/et_Info/InfoHandler.go b/et_Info/InfoHandler.go index 52a8259..41eead8 100644 --- a/et_Info/InfoHandler.go +++ b/et_Info/InfoHandler.go @@ -46,7 +46,7 @@ func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_mod //存储测点obj err = the.configHelper.SetDeviceStationObjs(p.DeviceData.DeviceId, p.Stations) if err != nil { - log.Printf("缓存异常 err=%s", err.Error()) + log.Printf("SetDeviceStationObjs缓存异常 err=%s", err.Error()) } } return p @@ -70,18 +70,3 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) { } } - -func (the *InfoHandler) getThresholdInfo(p *common_models.ProcessData) { - - for i, stationInfo := range p.Stations { - if stationInfo.Info.Id == 14 { - log.Println("==") - } - threshold, err := the.configHelper.GetStationThreshold(stationInfo.Info.Id) - if err == nil && threshold != nil && threshold.Items != nil { - //p.Stations[i].Threshold = threshold - log.Println("==", i) - } - } - log.Println("=") -} diff --git a/et_Info/go.mod b/et_Info/go.mod index e7bc296..c8a6ddb 100644 --- a/et_Info/go.mod +++ b/et_Info/go.mod @@ -1,6 +1,6 @@ module et_Info -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_models v0.0.7 diff --git a/et_analyze/aggThreshold.go b/et_analyze/aggThreshold.go index 1f6e619..fb01bbe 100644 --- a/et_analyze/aggThreshold.go +++ b/et_analyze/aggThreshold.go @@ -114,7 +114,7 @@ func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData // 超阈值告警 alarm := common_models.NewOverChangingRateThreshold(findMinLevel(ls), stringifyThresholds(ls)) if alarm == nil { - log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content) + //log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content) return nil } log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]", aggData, alarm.Level, alarm.Content) diff --git a/et_analyze/go.mod b/et_analyze/go.mod index a10cd25..fd45f54 100644 --- a/et_analyze/go.mod +++ b/et_analyze/go.mod @@ -14,7 +14,7 @@ module et_analyze -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_models v0.0.7 diff --git a/et_analyze/threshold.go b/et_analyze/threshold.go index 5457456..06886a3 100644 --- a/et_analyze/threshold.go +++ b/et_analyze/threshold.go @@ -38,8 +38,8 @@ func NewThresholdHandler() *ThresholdHandler { return model } -func (the *ThresholdHandler) GetStage() stages.Stage { - return *the.stage +func (t *ThresholdHandler) GetStage() stages.Stage { + return *t.stage } // 必须 @@ -118,8 +118,8 @@ func (t *ThresholdHandler) getAndCacheAlarmMsg(station *common_models.Station, l // 超阈值告警 alarm := common_models.NewAlarmOverThreshold(findMinLevel(ls), stringifyThresholds(ls)) if alarm == nil { - log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s ERROR: 未找到对应告警等级的告警码。\n", - station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime) + //log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s ERROR: 未找到对应告警等级的告警码。\n", + // station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime) return nil } diff --git a/et_cache/cacheHandler.go b/et_cache/cacheHandler.go index 1940ac1..afa70ad 100644 --- a/et_cache/cacheHandler.go +++ b/et_cache/cacheHandler.go @@ -30,7 +30,7 @@ func (the *CacheHandler) GetStage() stages.Stage { return *the.stage } func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData { - + for _, station := range p.Stations { for _, item := range station.Info.Proto.Items { //字符串类型不处理 @@ -95,7 +95,9 @@ func (the *CacheHandler) windowCalc(raw float64, window common_models.CacheWindo case common_models.Filter_ExtreAverage: result = filterForExtreAverage(raw, window) default: - log.Printf("不支持滑窗公式id:[%d]", window.MethodId) + if window.MethodId != 0 { + log.Printf("不支持滑窗公式id:[%d]", window.MethodId) + } return raw, false } return result, true diff --git a/et_cache/go.mod b/et_cache/go.mod index 00b9fbe..11426c6 100644 --- a/et_cache/go.mod +++ b/et_cache/go.mod @@ -1,6 +1,6 @@ module et_cache -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_calc v0.0.1 diff --git a/et_calc/go.mod b/et_calc/go.mod index 382028d..eacdbac 100644 --- a/et_calc/go.mod +++ b/et_calc/go.mod @@ -1,6 +1,6 @@ module et_calc -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_calc v0.0.1 diff --git a/et_print/go.mod b/et_print/go.mod index 0ba0217..70abaf9 100644 --- a/et_print/go.mod +++ b/et_print/go.mod @@ -1,2 +1,2 @@ module et_print -go 1.22.0 \ No newline at end of file +go 1.23.1 \ No newline at end of file diff --git a/et_push/go.mod b/et_push/go.mod index 6ca7ec0..9f8b597 100644 --- a/et_push/go.mod +++ b/et_push/go.mod @@ -1,6 +1,6 @@ module et_push -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_models v0.0.7 diff --git a/et_sink/go.mod b/et_sink/go.mod index c59e50a..3878c06 100644 --- a/et_sink/go.mod +++ b/et_sink/go.mod @@ -1,6 +1,6 @@ module et_sink -go 1.22.0 +go 1.23.1 require ( gitea.anxinyun.cn/container/common_models v0.0.7 diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index 5304685..94efc79 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -1,6 +1,7 @@ package et_sink import ( + "encoding/json" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/storage/storageDBs" @@ -92,16 +93,22 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { the.dataQueueVib = append(the.dataQueueVib, vbRaws) case common_models.RawTypeDiag: default: - esRaws := common_models.EsRaw{ - StructId: deviceData.StructId, - IotaDeviceName: deviceData.Name, - Data: deviceData.Raw, - CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), - Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(), - IotaDevice: deviceData.DeviceId, - CreateTime: time.Now().Truncate(time.Millisecond), + + if deviceData.Raw == nil { + msg, _ := json.Marshal(deviceData) + log.Printf("异常空,raw数据 =%s", string(msg)) + } else { + esRaws := common_models.EsRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Data: deviceData.Raw, + CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), + Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(), + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now().Truncate(time.Millisecond), + } + the.dataQueueRaw = append(the.dataQueueRaw, esRaws) } - the.dataQueueRaw = append(the.dataQueueRaw, esRaws) } if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount { @@ -119,9 +126,9 @@ func (the *SinkHandler) dumpRawBatchMonitor() { if len(the.dataQueueRaw) > 0 { count := len(the.dataQueueRaw) log.Printf("es写入dataQueueRaw数据 count====> %d", count) - needDump := the.dataQueueRaw[:count] + needDump := the.dataQueueRaw[:count] //make([]common_models.EsRaw, count) the.dataQueueRaw = the.dataQueueRaw[count:] - the.dumpRaws(needDump) + go the.dumpRaws(needDump) } if len(the.dataQueueVib) > 0 { @@ -129,7 +136,7 @@ func (the *SinkHandler) dumpRawBatchMonitor() { log.Printf("es写入dataQueueVib数据 count====> %d", count) needDump := the.dataQueueVib[:count] the.dataQueueVib = the.dataQueueVib[count:] - the.dumpVibRaws(needDump) + go the.dumpVibRaws(needDump) } } @@ -138,13 +145,13 @@ func (the *SinkHandler) dumpRawBatchMonitor() { func (the *SinkHandler) dumpRaws(esRaws []common_models.EsRaw) { for i, consumer := range the.storageConsumers { log.Printf("[consumer-%d]存储raw数据 %d 条", i, len(esRaws)) - go consumer.SaveRaw(esRaws) + consumer.SaveRaw(esRaws) } } func (the *SinkHandler) dumpVibRaws(esVbRaws []common_models.EsVbRaw) { for i, consumer := range the.storageConsumers { log.Printf("[consumer-%d]存储VbRaw数据 %d 条", i, len(esVbRaws)) - go consumer.SaveVib(esVbRaws) + consumer.SaveVib(esVbRaws) } } @@ -159,7 +166,7 @@ func (the *SinkHandler) dumpThemeBatchMonitor() { count := len(the.dataQueueTheme) needDump := the.dataQueueTheme[:count] the.dataQueueTheme = the.dataQueueTheme[count:] - the.dumpThemes(needDump) + go the.dumpThemes(needDump) } } @@ -195,7 +202,7 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) { for i, consumer := range the.storageConsumers { log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes)) - go consumer.SaveTheme(esThemes) + consumer.SaveTheme(esThemes) } } diff --git a/master/app/et_master.go b/master/app/et_master.go index c2db4f9..7f84a02 100644 --- a/master/app/et_master.go +++ b/master/app/et_master.go @@ -195,7 +195,7 @@ func (the *EtMaster) call_etNode(node *NodeRpc, args *common_models.IotaData) { log.Printf("node[%s]处理[%s|%s]超过 %v,超时", node.args.Addr, args.DeviceId, args.TriggerTime, timeOut) result = false } - log.Printf("node[%s]处理[%s|%s]结果=%v", node.args.Addr, args.DeviceId, args.TriggerTime, result) + //log.Printf("node[%s]处理[%s|%s]结果=%v", node.args.Addr, args.DeviceId, args.TriggerTime, result) if result == false { //发送 stop 信号 diff --git a/node/app/et_node.go b/node/app/et_node.go index c9feaf4..f047638 100644 --- a/node/app/et_node.go +++ b/node/app/et_node.go @@ -60,7 +60,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { return nil } - log.Printf("rpc处理设备数据[%s]-time[%s]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) + log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) //log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData) the.ch <- &common_models.ProcessData{ DeviceData: *deviceData, @@ -179,10 +179,10 @@ func (the *EtNode) exitMonitor() { c := make(chan os.Signal, 1) // 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上 // 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL) // 阻塞等待接收信号 s := <-c - log.Printf("接收到退出信号:%v,进行清理工作\n", s) + log.Printf("接收到退出信号:%v,进行清理工作", s) the.UnRegisterToMaster() time.Sleep(3 * time.Second) os.Exit(0) diff --git a/node/go.mod b/node/go.mod index ec92bbf..3ca2afd 100644 --- a/node/go.mod +++ b/node/go.mod @@ -14,7 +14,11 @@ require ( github.com/allegro/bigcache v1.2.1 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bytedance/sonic v1.12.2 // indirect + github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.6.0 // indirect @@ -42,6 +46,7 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect @@ -61,8 +66,10 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.18.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect + golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.21.0 // indirect diff --git a/node/stages/stage.go b/node/stages/stage.go index 1b4455a..608f353 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -52,7 +52,7 @@ func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessD defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now()) data = processFunc(data) }() - //log.Printf("stage[%s][%s]流程处理 over=> %s", s.Name, tag, data.DeviceData.Name) + //log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name) } return data