Browse Source

分组ES存储

dev
yfh 1 month ago
parent
commit
719928dce6
  1. 81
      et_sink/sinkHandler.go

81
et_sink/sinkHandler.go

@ -40,23 +40,6 @@ func NewSinkThemeHandler() *SinkHandler {
return the
}
func NewSinkGroupHandler() *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
the := &SinkHandler{
stage: stages.NewStage("EsGroupTheme 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(),
dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount),
batchCount: defaultBatchCount,
}
go the.onGroupData()
the.stage.AddProcess(the.sinkGroupDataToES)
return the
}
func NewSinkRawHandler() *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
@ -226,52 +209,66 @@ func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) {
}
// ******************* 分组主题数据存储 *********************
// onGroupData 监听分组主题数据
func (the *SinkHandler) onGroupData() {
func NewSinkGroupHandler() *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
the := &SinkHandler{
stage: stages.NewStage("EsGroupTheme 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(),
dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount),
lock: &sync.RWMutex{},
signBatch: make(chan bool, 1),
batchCount: defaultBatchCount,
}
go the.dumpGroupMonitor()
//the.stage.AddProcess(the.sinkGroupDataToES)
return the
}
// dumpGroupMonitor 监听分组主题数据
func (the *SinkHandler) dumpGroupMonitor() {
for {
select {
case <-the.signBatch:
log.Printf("【dumpGroupMonitor】收到批存储信号。")
case <-time.After(200 * time.Millisecond):
}
if len(the.dataQueueGroup) > 0 {
the.lock.RLock()
count := len(the.dataQueueGroup)
log.Printf("es写入 dataQueueGroup 数据 count====> %d", count)
//needDump := make([]common_models.EsGroupTheme, count)
////copy(needDump, the.dataQueueGroup[:count])
needDump := the.dataQueueGroup[:count]
the.dataQueueGroup = the.dataQueueGroup[count:]
the.dumpGroupThemes(needDump)
the.lock.RUnlock()
go the.dumpGroupThemes(needDump)
}
}
}
func (the *SinkHandler) sinkGroupDataToES(p *common_models.ProcessData) *common_models.ProcessData {
go the.sinkGroupData(p.Stations)
return p
func (the *SinkHandler) SinkGroupDataToES(groupTheme common_models.EsGroupTheme) {
go the.sinkGroupData(groupTheme)
}
func (the *SinkHandler) sinkGroupData(stations []common_models.Station) {
var EsThemes []common_models.EsGroupTheme
for _, station := range stations {
esTheme := common_models.EsGroupTheme{
Structure: station.Info.StructureId,
GroupId: station.Info.Group.Id,
GroupName: station.Info.Group.Name,
Factor: station.Info.FactorId,
FactorName: station.Info.Factor.Name,
FactorProtoCode: station.Info.Proto.Code,
FactorProtoName: station.Info.Proto.Name,
Data: station.Data.PyhData, // 分组下所有测点的主题数据
CollectTime: station.Data.CollectTime.Truncate(time.Millisecond),
CreateTime: time.Now().Truncate(time.Millisecond),
}
EsThemes = append(EsThemes, esTheme)
}
the.dataQueueGroup = append(the.dataQueueGroup, EsThemes...)
func (the *SinkHandler) sinkGroupData(groupTheme common_models.EsGroupTheme) {
the.lock.Lock()
the.dataQueueGroup = append(the.dataQueueGroup, groupTheme)
the.lock.Unlock()
if len(the.dataQueueGroup) >= the.batchCount {
the.signBatch <- true
}
}
func (the *SinkHandler) dumpGroupThemes(esThemes []common_models.EsGroupTheme) {
for _, consumer := range the.storageConsumers {
for _, c := range the.storageConsumers {
if consumer, ok := c.(*storageDBs.Storage2Es); ok {
consumer.SaveGroupTheme(esThemes)
log.Printf("[consumer-Storage2Es]存储 GroupTheme 数据 %d 条", len(esThemes))
}
}
}

Loading…
Cancel
Save