Browse Source

update

1. 添加 log 日志纪录
2.添加stage 超时 check
3.调整部分日志位置
dev
lucas 1 month ago
parent
commit
695923a540
  1. 1
      et_Info/go.mod
  2. 15
      et_sink/sinkHandler.go
  3. 12
      node/app/app.go
  4. 9
      node/go.mod
  5. 16
      node/stages/stage.go

1
et_Info/go.mod

@ -22,7 +22,6 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect

15
et_sink/sinkHandler.go

@ -7,7 +7,6 @@ import (
"gitea.anxinyun.cn/container/common_utils/storage/storageDBs" "gitea.anxinyun.cn/container/common_utils/storage/storageDBs"
"log" "log"
"node/stages" "node/stages"
"slices"
"sync" "sync"
"time" "time"
) )
@ -181,7 +180,8 @@ func (the *SinkHandler) dumpThemeBatchMonitor() {
if len(the.dataQueueTheme) > 0 { if len(the.dataQueueTheme) > 0 {
the.lock.RLock() the.lock.RLock()
count := len(the.dataQueueTheme) //todo 避免临界操作 count := len(the.dataQueueTheme)
log.Printf("es写入dataQueueTheme数据 count====> %d", count)
needDump := make([]common_models.EsTheme, count) needDump := make([]common_models.EsTheme, count)
//避免引用问题 //避免引用问题
copy(needDump, the.dataQueueTheme[:count]) copy(needDump, the.dataQueueTheme[:count])
@ -198,7 +198,6 @@ func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_mode
return p return p
} }
func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
var EsThemes []common_models.EsTheme
for _, station := range stations { for _, station := range stations {
esTheme := common_models.EsTheme{ esTheme := common_models.EsTheme{
SensorName: station.Info.Name, SensorName: station.Info.Name,
@ -213,20 +212,18 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
IotaDevice: station.Info.GetDeviceIdArray(), IotaDevice: station.Info.GetDeviceIdArray(),
CreateTime: time.Now().Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond),
} }
EsThemes = append(EsThemes, esTheme) the.lock.Lock()
the.dataQueueTheme = append(the.dataQueueTheme, esTheme)
the.lock.Unlock()
} }
the.lock.Lock()
the.dataQueueTheme = slices.Concat(the.dataQueueTheme, EsThemes)
the.lock.Unlock()
//the.dataQueueTheme = append(the.dataQueueTheme, EsThemes...)
if len(the.dataQueueTheme) >= the.batchCount { if len(the.dataQueueTheme) >= the.batchCount {
the.signBatch <- true the.signBatch <- true
} }
} }
func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) { func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) {
for i, consumer := range the.storageConsumers { for i, consumer := range the.storageConsumers {
log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes))
consumer.SaveTheme(esThemes) consumer.SaveTheme(esThemes)
log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes))
} }
} }

12
node/app/app.go

@ -11,15 +11,27 @@ import (
"et_sink" "et_sink"
"fmt" "fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"gopkg.in/natefinch/lumberjack.v2"
"io"
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
"node/stages" "node/stages"
"os"
"time" "time"
) )
func init() { func init() {
multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{
Filename: "./logs/logInfo.log",
MaxSize: 10, // megabytes
MaxBackups: 20,
MaxAge: 30, //days
//Compress: true,
})
log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds) log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds)
log.SetOutput(multiWriter)
log.Println("=================log start=================")
} }
func Start() { func Start() {

9
node/go.mod

@ -6,6 +6,7 @@ require (
gitea.anxinyun.cn/container/common_models v0.0.7 gitea.anxinyun.cn/container/common_models v0.0.7
gitea.anxinyun.cn/container/common_utils v0.0.7 gitea.anxinyun.cn/container/common_utils v0.0.7
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
) )
require ( require (
@ -14,11 +15,7 @@ require (
github.com/allegro/bigcache v1.2.1 // indirect github.com/allegro/bigcache v1.2.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.12.2 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-resiliency v1.6.0 // indirect
@ -46,8 +43,8 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect
@ -66,10 +63,8 @@ require (
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.18.2 // indirect github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.4.0 // indirect
golang.org/x/crypto v0.19.0 // indirect golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect golang.org/x/net v0.21.0 // indirect

16
node/stages/stage.go

@ -12,6 +12,7 @@ type Stage struct {
In <-chan *common_models.ProcessData In <-chan *common_models.ProcessData
processFuncs []func(*common_models.ProcessData) *common_models.ProcessData processFuncs []func(*common_models.ProcessData) *common_models.ProcessData
Out chan *common_models.ProcessData Out chan *common_models.ProcessData
execOver chan bool //阶段执行完毕,用以排查超时
} }
func NewStage(name string) *Stage { func NewStage(name string) *Stage {
@ -20,6 +21,7 @@ func NewStage(name string) *Stage {
processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0), processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0),
In: make(<-chan *common_models.ProcessData, 1), In: make(<-chan *common_models.ProcessData, 1),
Out: make(chan *common_models.ProcessData, 1), Out: make(chan *common_models.ProcessData, 1),
execOver: make(chan bool, 1),
} }
} }
@ -45,18 +47,28 @@ func (s *Stage) AddProcess(fun func(*common_models.ProcessData) *common_models.P
} }
func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData { func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData {
go s.handlerTimeOutCheck(s.Name, data.DeviceData.DeviceId)
for _, processFunc := range s.processFuncs { for _, processFunc := range s.processFuncs {
//tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs)) //tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs))
//log.Printf("stage[%s][%s]流程处理 start=> %s", s.Name, tag, data.DeviceData.Name) //log.Printf("stage[%s]流程处理 start=> %s", s.Name, data.DeviceData.Name)
func() { func() {
defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now()) defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now())
data = processFunc(data) data = processFunc(data)
}() }()
//log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name) //log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name)
} }
s.execOver <- true
return data return data
} }
func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
defaultTimeout := 10 * time.Second
select {
case <-s.execOver:
case <-time.After(defaultTimeout):
log.Printf("=====================")
log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout)
}
}
func timeCost(nodeId, deviceId string, start time.Time) { func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start) tc := time.Since(start)
log.Printf("stage[%s] ->[%s] 耗时 = %v", nodeId, deviceId, tc) log.Printf("stage[%s] ->[%s] 耗时 = %v", nodeId, deviceId, tc)

Loading…
Cancel
Save