diff --git a/et_Info/go.mod b/et_Info/go.mod index c8a6ddb..0c1cd86 100644 --- a/et_Info/go.mod +++ b/et_Info/go.mod @@ -22,7 +22,6 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index 066b8e6..a4bdc61 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -7,7 +7,6 @@ import ( "gitea.anxinyun.cn/container/common_utils/storage/storageDBs" "log" "node/stages" - "slices" "sync" "time" ) @@ -181,7 +180,8 @@ func (the *SinkHandler) dumpThemeBatchMonitor() { if len(the.dataQueueTheme) > 0 { the.lock.RLock() - count := len(the.dataQueueTheme) //todo 避免临界操作 + count := len(the.dataQueueTheme) + log.Printf("es写入dataQueueTheme数据 count====> %d", count) needDump := make([]common_models.EsTheme, count) //避免引用问题 copy(needDump, the.dataQueueTheme[:count]) @@ -198,7 +198,6 @@ func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_mode return p } func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { - var EsThemes []common_models.EsTheme for _, station := range stations { esTheme := common_models.EsTheme{ SensorName: station.Info.Name, @@ -213,20 +212,18 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { IotaDevice: station.Info.GetDeviceIdArray(), CreateTime: time.Now().Truncate(time.Millisecond), } - EsThemes = append(EsThemes, esTheme) + the.lock.Lock() + the.dataQueueTheme = append(the.dataQueueTheme, esTheme) + the.lock.Unlock() } - the.lock.Lock() - the.dataQueueTheme = slices.Concat(the.dataQueueTheme, EsThemes) - the.lock.Unlock() - //the.dataQueueTheme = append(the.dataQueueTheme, EsThemes...) if len(the.dataQueueTheme) >= the.batchCount { the.signBatch <- true } } func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) { for i, consumer := range the.storageConsumers { - log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes)) consumer.SaveTheme(esThemes) + log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes)) } } diff --git a/node/app/app.go b/node/app/app.go index 179754d..c598faf 100644 --- a/node/app/app.go +++ b/node/app/app.go @@ -11,15 +11,27 @@ import ( "et_sink" "fmt" "gitea.anxinyun.cn/container/common_utils/configLoad" + "gopkg.in/natefinch/lumberjack.v2" + "io" "log" "net" "net/rpc" "node/stages" + "os" "time" ) func init() { + multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{ + Filename: "./logs/logInfo.log", + MaxSize: 10, // megabytes + MaxBackups: 20, + MaxAge: 30, //days + //Compress: true, + }) log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds) + log.SetOutput(multiWriter) + log.Println("=================log start=================") } func Start() { diff --git a/node/go.mod b/node/go.mod index 3ca2afd..8d2eb7a 100644 --- a/node/go.mod +++ b/node/go.mod @@ -6,6 +6,7 @@ require ( gitea.anxinyun.cn/container/common_models v0.0.7 gitea.anxinyun.cn/container/common_utils v0.0.7 github.com/google/uuid v1.6.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) require ( @@ -14,11 +15,7 @@ require ( github.com/allegro/bigcache v1.2.1 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bytedance/sonic v1.12.2 // indirect - github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cloudwego/base64x v0.1.4 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.6.0 // indirect @@ -46,8 +43,8 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.17.7 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/magiconair/properties v1.8.7 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect @@ -66,10 +63,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.18.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect - github.com/twitchyliquid64/golang-asm v0.15.1 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.21.0 // indirect diff --git a/node/stages/stage.go b/node/stages/stage.go index 608f353..dc421ef 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -12,6 +12,7 @@ type Stage struct { In <-chan *common_models.ProcessData processFuncs []func(*common_models.ProcessData) *common_models.ProcessData Out chan *common_models.ProcessData + execOver chan bool //阶段执行完毕,用以排查超时 } func NewStage(name string) *Stage { @@ -20,6 +21,7 @@ func NewStage(name string) *Stage { processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0), In: make(<-chan *common_models.ProcessData, 1), Out: make(chan *common_models.ProcessData, 1), + execOver: make(chan bool, 1), } } @@ -45,18 +47,28 @@ func (s *Stage) AddProcess(fun func(*common_models.ProcessData) *common_models.P } func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData { + go s.handlerTimeOutCheck(s.Name, data.DeviceData.DeviceId) for _, processFunc := range s.processFuncs { //tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs)) - //log.Printf("stage[%s][%s]流程处理 start=> %s", s.Name, tag, data.DeviceData.Name) + //log.Printf("stage[%s]流程处理 start=> %s", s.Name, data.DeviceData.Name) func() { defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now()) data = processFunc(data) }() //log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name) } - + s.execOver <- true return data } +func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) { + defaultTimeout := 10 * time.Second + select { + case <-s.execOver: + case <-time.After(defaultTimeout): + log.Printf("=====================") + log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout) + } +} func timeCost(nodeId, deviceId string, start time.Time) { tc := time.Since(start) log.Printf("stage[%s] ->[%s] 耗时 = %v", nodeId, deviceId, tc)