From 5a28b87a724f56b20e18f01abc6f0c0f7783aaa0 Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 24 Sep 2024 18:00:24 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=E8=AF=BB=E5=86=99?= =?UTF-8?q?=E9=94=81=20=20=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_sink/sinkHandler.go | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index c5ed2dc..066b8e6 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -7,6 +7,8 @@ import ( "gitea.anxinyun.cn/container/common_utils/storage/storageDBs" "log" "node/stages" + "slices" + "sync" "time" ) @@ -16,6 +18,7 @@ type SinkHandler struct { dataQueueRaw []common_models.EsRaw dataQueueVib []common_models.EsVbRaw dataQueueTheme []common_models.EsTheme + lock *sync.RWMutex dataQueueGroup []common_models.EsGroupTheme // 分组主题数据队列 signBatch chan bool batchCount int @@ -27,7 +30,8 @@ func NewSinkThemeHandler() *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("Theme 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), - dataQueueTheme: make([]common_models.EsTheme, 0), + dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount), + lock: &sync.RWMutex{}, signBatch: make(chan bool, 1), batchCount: defaultBatchCount, } @@ -44,7 +48,7 @@ func NewSinkGroupHandler() *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("EsGroupTheme 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), - dataQueueGroup: make([]common_models.EsGroupTheme, 0), + dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount), batchCount: defaultBatchCount, } @@ -60,7 +64,9 @@ func NewSinkRawHandler() *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("raws 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), - dataQueueRaw: make([]common_models.EsRaw, 0), + dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount), + dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount), + lock: &sync.RWMutex{}, batchCount: defaultBatchCount, } go the.dumpRawBatchMonitor() @@ -90,7 +96,9 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } + the.lock.Lock() the.dataQueueVib = append(the.dataQueueVib, vbRaws) + the.lock.Unlock() case common_models.RawTypeDiag: default: @@ -107,7 +115,9 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } + the.lock.Lock() the.dataQueueRaw = append(the.dataQueueRaw, esRaws) + the.lock.Unlock() } } @@ -124,20 +134,24 @@ func (the *SinkHandler) dumpRawBatchMonitor() { case <-time.After(200 * time.Millisecond): } if len(the.dataQueueRaw) > 0 { + the.lock.RLock() count := len(the.dataQueueRaw) log.Printf("es写入dataQueueRaw数据 count====> %d", count) needDump := make([]common_models.EsRaw, count) //避免引用问题 copy(needDump, the.dataQueueRaw[:count]) the.dataQueueRaw = the.dataQueueRaw[count:] + the.lock.RUnlock() go the.dumpRaws(needDump) } if len(the.dataQueueVib) > 0 { + the.lock.RLock() count := len(the.dataQueueVib) log.Printf("es写入dataQueueVib数据 count====> %d", count) needDump := the.dataQueueVib[:count] the.dataQueueVib = the.dataQueueVib[count:] + the.lock.RUnlock() go the.dumpVibRaws(needDump) } } @@ -164,12 +178,15 @@ func (the *SinkHandler) dumpThemeBatchMonitor() { log.Printf("批存储信号Theme,监控器收到") case <-time.After(200 * time.Millisecond): } + if len(the.dataQueueTheme) > 0 { - count := len(the.dataQueueTheme) + the.lock.RLock() + count := len(the.dataQueueTheme) //todo 避免临界操作 needDump := make([]common_models.EsTheme, count) //避免引用问题 copy(needDump, the.dataQueueTheme[:count]) the.dataQueueTheme = the.dataQueueTheme[count:] + the.lock.RUnlock() go the.dumpThemes(needDump) } } @@ -198,7 +215,10 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { } EsThemes = append(EsThemes, esTheme) } - the.dataQueueTheme = append(the.dataQueueTheme, EsThemes...) + the.lock.Lock() + the.dataQueueTheme = slices.Concat(the.dataQueueTheme, EsThemes) + the.lock.Unlock() + //the.dataQueueTheme = append(the.dataQueueTheme, EsThemes...) if len(the.dataQueueTheme) >= the.batchCount { the.signBatch <- true }