From 719928dce6369141b4fe8bbd7249f74755327e3b Mon Sep 17 00:00:00 2001 From: yfh Date: Fri, 27 Sep 2024 17:17:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E7=BB=84ES=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_sink/sinkHandler.go | 83 ++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 43 deletions(-) diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index 9bb336e..bfc716c 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -40,23 +40,6 @@ func NewSinkThemeHandler() *SinkHandler { return the } -func NewSinkGroupHandler() *SinkHandler { - esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") - log.Printf("es addresses: %v", esAddresses) - - the := &SinkHandler{ - stage: stages.NewStage("EsGroupTheme 数据存储"), - storageConsumers: storageDBs.LoadIStorageConsumer(), - dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount), - batchCount: defaultBatchCount, - } - - go the.onGroupData() - the.stage.AddProcess(the.sinkGroupDataToES) - - return the -} - func NewSinkRawHandler() *SinkHandler { esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") log.Printf("es addresses: %v", esAddresses) @@ -226,52 +209,66 @@ func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) { } // ******************* 分组主题数据存储 ********************* -// onGroupData 监听分组主题数据 -func (the *SinkHandler) onGroupData() { +func NewSinkGroupHandler() *SinkHandler { + esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") + log.Printf("es addresses: %v", esAddresses) + + the := &SinkHandler{ + stage: stages.NewStage("EsGroupTheme 数据存储"), + storageConsumers: storageDBs.LoadIStorageConsumer(), + dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount), + lock: &sync.RWMutex{}, + signBatch: make(chan bool, 1), + batchCount: defaultBatchCount, + } + + go the.dumpGroupMonitor() + //the.stage.AddProcess(the.sinkGroupDataToES) + return the +} + +// dumpGroupMonitor 监听分组主题数据 +func (the *SinkHandler) dumpGroupMonitor() { for { select { case <-the.signBatch: + log.Printf("【dumpGroupMonitor】收到批存储信号。") case <-time.After(200 * time.Millisecond): } if len(the.dataQueueGroup) > 0 { + the.lock.RLock() count := len(the.dataQueueGroup) + log.Printf("es写入 dataQueueGroup 数据 count====> %d", count) + //needDump := make([]common_models.EsGroupTheme, count) + ////copy(needDump, the.dataQueueGroup[:count]) needDump := the.dataQueueGroup[:count] the.dataQueueGroup = the.dataQueueGroup[count:] - the.dumpGroupThemes(needDump) + the.lock.RUnlock() + + go the.dumpGroupThemes(needDump) } } } -func (the *SinkHandler) sinkGroupDataToES(p *common_models.ProcessData) *common_models.ProcessData { - go the.sinkGroupData(p.Stations) - return p +func (the *SinkHandler) SinkGroupDataToES(groupTheme common_models.EsGroupTheme) { + go the.sinkGroupData(groupTheme) } -func (the *SinkHandler) sinkGroupData(stations []common_models.Station) { - var EsThemes []common_models.EsGroupTheme - for _, station := range stations { - esTheme := common_models.EsGroupTheme{ - Structure: station.Info.StructureId, - GroupId: station.Info.Group.Id, - GroupName: station.Info.Group.Name, - Factor: station.Info.FactorId, - FactorName: station.Info.Factor.Name, - FactorProtoCode: station.Info.Proto.Code, - FactorProtoName: station.Info.Proto.Name, - Data: station.Data.PyhData, // 分组下所有测点的主题数据 - CollectTime: station.Data.CollectTime.Truncate(time.Millisecond), - CreateTime: time.Now().Truncate(time.Millisecond), - } - EsThemes = append(EsThemes, esTheme) - } - the.dataQueueGroup = append(the.dataQueueGroup, EsThemes...) +func (the *SinkHandler) sinkGroupData(groupTheme common_models.EsGroupTheme) { + the.lock.Lock() + the.dataQueueGroup = append(the.dataQueueGroup, groupTheme) + the.lock.Unlock() + if len(the.dataQueueGroup) >= the.batchCount { the.signBatch <- true } } func (the *SinkHandler) dumpGroupThemes(esThemes []common_models.EsGroupTheme) { - for _, consumer := range the.storageConsumers { - consumer.SaveGroupTheme(esThemes) + for _, c := range the.storageConsumers { + if consumer, ok := c.(*storageDBs.Storage2Es); ok { + consumer.SaveGroupTheme(esThemes) + log.Printf("[consumer-Storage2Es]存储 GroupTheme 数据 %d 条", len(esThemes)) + } } }