From 4f00694cea96899764b18f7cb653652d565755f0 Mon Sep 17 00:00:00 2001 From: yfh Date: Mon, 24 Feb 2025 22:17:21 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8E=AF=E8=8A=82=E5=A4=84=E7=90=86=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_sink/sinkHandler.go | 161 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 146 insertions(+), 15 deletions(-) diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index bfc716c..fa3608b 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -25,10 +25,10 @@ type SinkHandler struct { const defaultBatchCount = 1000 -func NewSinkThemeHandler() *SinkHandler { +func NewSinkThemeHandler(storageConsumers []storageDBs.IStorageConsumer) *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("Theme 数据存储"), - storageConsumers: storageDBs.LoadIStorageConsumer(), + storageConsumers: storageConsumers, dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount), lock: &sync.RWMutex{}, signBatch: make(chan bool, 1), @@ -40,15 +40,14 @@ func NewSinkThemeHandler() *SinkHandler { return the } -func NewSinkRawHandler() *SinkHandler { - esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") - log.Printf("es addresses: %v", esAddresses) +func NewSinkRawHandler(storageConsumers []storageDBs.IStorageConsumer) *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("raws 数据存储"), - storageConsumers: storageDBs.LoadIStorageConsumer(), + storageConsumers: storageConsumers, dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount), dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount), lock: &sync.RWMutex{}, + signBatch: make(chan bool, 1), batchCount: defaultBatchCount, } go the.dumpRawBatchMonitor() @@ -60,10 +59,114 @@ func (the *SinkHandler) GetStage() stages.Stage { return *the.stage } -func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData { - go the.sinkRawDataToES(p.DeviceData) - return p +//func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData { +// go the.sinkRawDataToES(p.DeviceData) +// return p +//} + +func (the *SinkHandler) sinkRawData(data []*common_models.ProcessData) []*common_models.ProcessData { + go func() { + dataQueueRaw := make([]common_models.EsRaw, 0, len(data)) + dataQueueVib := make([]common_models.EsVbRaw, 0, len(data)) + + for _, p := range data { + deviceData := p.DeviceData + switch deviceData.DataType { + case common_models.RawTypeVib: + vibData := deviceData.GetVibrationData() + vbRaws := common_models.EsVbRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Param: vibData.FormatParams(), + Data: map[string]any{"raw": vibData.Data}, + CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now().Truncate(time.Millisecond), + } + dataQueueVib = append(dataQueueVib, vbRaws) + case common_models.RawTypeDiag: + // 处理诊断数据的逻辑 + default: + if deviceData.Raw == nil { + msg, _ := json.Marshal(deviceData) + log.Printf("异常空,raw数据 =%s", string(msg)) + } else { + esRaws := common_models.EsRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Data: deviceData.Raw, + CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), + Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(), + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now().Truncate(time.Millisecond), + } + dataQueueRaw = append(dataQueueRaw, esRaws) + } + } + } + + if len(dataQueueRaw) > 0 { + count := len(dataQueueRaw) + log.Printf("es写入dataQueueRaw数据 count====> %d", count) + go the.dumpRaws(dataQueueRaw) + } + + if len(the.dataQueueVib) > 0 { + log.Printf("es写入dataQueueVib数据 count====> %d", len(dataQueueVib)) + go the.dumpVibRaws(dataQueueVib) + } + }() + + return data } + +func (the *SinkHandler) sinkRawData2(data []*common_models.ProcessData) []*common_models.ProcessData { + go func() { + the.lock.Lock() + for _, p := range data { + deviceData := p.DeviceData + switch deviceData.DataType { + case common_models.RawTypeVib: + vibData := deviceData.GetVibrationData() + vbRaws := common_models.EsVbRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Param: vibData.FormatParams(), + Data: map[string]any{"raw": vibData.Data}, + CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now().Truncate(time.Millisecond), + } + the.dataQueueVib = append(the.dataQueueVib, vbRaws) + case common_models.RawTypeDiag: + default: + if deviceData.Raw == nil { + msg, _ := json.Marshal(deviceData) + log.Printf("异常空,raw数据 =%s", string(msg)) + } else { + esRaws := common_models.EsRaw{ + StructId: deviceData.StructId, + IotaDeviceName: deviceData.Name, + Data: deviceData.Raw, + CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), + Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(), + IotaDevice: deviceData.DeviceId, + CreateTime: time.Now().Truncate(time.Millisecond), + } + the.dataQueueRaw = append(the.dataQueueRaw, esRaws) + } + } + } + + the.lock.Unlock() + if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount { + the.signBatch <- true + } + }() + + return data +} + func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { the.lock.Lock() switch deviceData.DataType { @@ -81,7 +184,6 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { the.dataQueueVib = append(the.dataQueueVib, vbRaws) case common_models.RawTypeDiag: default: - if deviceData.Raw == nil { msg, _ := json.Marshal(deviceData) log.Printf("异常空,raw数据 =%s", string(msg)) @@ -109,7 +211,7 @@ func (the *SinkHandler) dumpRawBatchMonitor() { select { case <-the.signBatch: log.Printf("批存储信号raw,监控器收到") - case <-time.After(200 * time.Millisecond): + case <-time.After(500 * time.Millisecond): } if len(the.dataQueueRaw) > 0 { the.lock.RLock() @@ -172,10 +274,40 @@ func (the *SinkHandler) dumpThemeBatchMonitor() { } -func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_models.ProcessData { - go the.sinkThemeData(p.Stations) - return p +func (the *SinkHandler) sinkThemeToES(data []*common_models.ProcessData) []*common_models.ProcessData { + go func() { + dataQueueTheme := make([]common_models.EsTheme, 0, len(data)) + for _, p := range data { + stations := p.Stations + for _, station := range stations { + esTheme := common_models.EsTheme{ + SensorName: station.Info.Name, + FactorName: station.Info.Factor.Name, + FactorProtoCode: station.Info.Proto.Code, + Data: station.Data.ThemeData, + FactorProtoName: station.Info.Proto.Name, + Factor: station.Info.FactorId, + CollectTime: station.Data.CollectTime.Truncate(time.Millisecond), + Sensor: station.Info.Id, + Structure: station.Info.StructureId, + IotaDevice: station.Info.GetDeviceIdArray(), + CreateTime: time.Now().Truncate(time.Millisecond), + } + + dataQueueTheme = append(dataQueueTheme, esTheme) + } + } + + if len(dataQueueTheme) > 0 { + count := len(dataQueueTheme) + log.Printf("es写入 dataQueueTheme 数据 count====> %d", count) + go the.dumpThemes(dataQueueTheme) + } + }() + + return data } + func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { the.lock.Lock() for _, station := range stations { @@ -223,7 +355,6 @@ func NewSinkGroupHandler() *SinkHandler { } go the.dumpGroupMonitor() - //the.stage.AddProcess(the.sinkGroupDataToES) return the }