Browse Source

update 调整主题数据锁位置

dev
lucas 1 month ago
parent
commit
6780e88a46
  1. 14
      et_sink/sinkHandler.go

14
et_sink/sinkHandler.go

@ -82,7 +82,7 @@ func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models
return p return p
} }
func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
the.lock.Lock()
switch deviceData.DataType { switch deviceData.DataType {
case common_models.RawTypeVib: case common_models.RawTypeVib:
vibData := deviceData.GetVibrationData() vibData := deviceData.GetVibrationData()
@ -95,9 +95,7 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
IotaDevice: deviceData.DeviceId, IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond),
} }
the.lock.Lock()
the.dataQueueVib = append(the.dataQueueVib, vbRaws) the.dataQueueVib = append(the.dataQueueVib, vbRaws)
the.lock.Unlock()
case common_models.RawTypeDiag: case common_models.RawTypeDiag:
default: default:
@ -114,12 +112,10 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
IotaDevice: deviceData.DeviceId, IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond),
} }
the.lock.Lock()
the.dataQueueRaw = append(the.dataQueueRaw, esRaws) the.dataQueueRaw = append(the.dataQueueRaw, esRaws)
the.lock.Unlock()
} }
} }
the.lock.Unlock()
if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount { if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount {
the.signBatch <- true the.signBatch <- true
} }
@ -198,6 +194,7 @@ func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_mode
return p return p
} }
func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
the.lock.Lock()
for _, station := range stations { for _, station := range stations {
esTheme := common_models.EsTheme{ esTheme := common_models.EsTheme{
SensorName: station.Info.Name, SensorName: station.Info.Name,
@ -212,10 +209,11 @@ func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
IotaDevice: station.Info.GetDeviceIdArray(), IotaDevice: station.Info.GetDeviceIdArray(),
CreateTime: time.Now().Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond),
} }
the.lock.Lock()
the.dataQueueTheme = append(the.dataQueueTheme, esTheme) the.dataQueueTheme = append(the.dataQueueTheme, esTheme)
the.lock.Unlock()
} }
the.lock.Unlock()
if len(the.dataQueueTheme) >= the.batchCount { if len(the.dataQueueTheme) >= the.batchCount {
the.signBatch <- true the.signBatch <- true
} }

Loading…
Cancel
Save