From 6780e88a46b5bdf8b7630dad464b1f639354127e Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 25 Sep 2024 13:06:26 +0800 Subject: [PATCH] =?UTF-8?q?update=20=20=E8=B0=83=E6=95=B4=E4=B8=BB?= =?UTF-8?q?=E9=A2=98=E6=95=B0=E6=8D=AE=E9=94=81=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_sink/sinkHandler.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/et_sink/sinkHandler.go b/et_sink/sinkHandler.go index a4bdc61..9bb336e 100644 --- a/et_sink/sinkHandler.go +++ b/et_sink/sinkHandler.go @@ -82,7 +82,7 @@ func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models return p } func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { - + the.lock.Lock() switch deviceData.DataType { case common_models.RawTypeVib: vibData := deviceData.GetVibrationData() @@ -95,9 +95,7 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } - the.lock.Lock() the.dataQueueVib = append(the.dataQueueVib, vbRaws) - the.lock.Unlock() case common_models.RawTypeDiag: default: @@ -114,12 +112,10 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } - the.lock.Lock() the.dataQueueRaw = append(the.dataQueueRaw, esRaws) - the.lock.Unlock() } } - + the.lock.Unlock() if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount { the.signBatch <- true } @@ -198,6 +194,7 @@ func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_mode return p } func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { + the.lock.Lock() for _, station := range stations { esTheme := common_models.EsTheme{ SensorName: station.Info.Name, @@ -212,10 +209,11 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { IotaDevice: station.Info.GetDeviceIdArray(), CreateTime: time.Now().Truncate(time.Millisecond), } - the.lock.Lock() + the.dataQueueTheme = append(the.dataQueueTheme, esTheme) - the.lock.Unlock() + } + the.lock.Unlock() if len(the.dataQueueTheme) >= the.batchCount { the.signBatch <- true }