Browse Source

环节处理改为批处理

dev
yfh 3 months ago
parent
commit
4f00694cea
  1. 161
      et_sink/sinkHandler.go

161
et_sink/sinkHandler.go

@ -25,10 +25,10 @@ type SinkHandler struct {
const defaultBatchCount = 1000 const defaultBatchCount = 1000
func NewSinkThemeHandler() *SinkHandler { func NewSinkThemeHandler(storageConsumers []storageDBs.IStorageConsumer) *SinkHandler {
the := &SinkHandler{ the := &SinkHandler{
stage: stages.NewStage("Theme 数据存储"), stage: stages.NewStage("Theme 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(), storageConsumers: storageConsumers,
dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount), dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount),
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
signBatch: make(chan bool, 1), signBatch: make(chan bool, 1),
@ -40,15 +40,14 @@ func NewSinkThemeHandler() *SinkHandler {
return the return the
} }
func NewSinkRawHandler() *SinkHandler { func NewSinkRawHandler(storageConsumers []storageDBs.IStorageConsumer) *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
the := &SinkHandler{ the := &SinkHandler{
stage: stages.NewStage("raws 数据存储"), stage: stages.NewStage("raws 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(), storageConsumers: storageConsumers,
dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount), dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount),
dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount), dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount),
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
signBatch: make(chan bool, 1),
batchCount: defaultBatchCount, batchCount: defaultBatchCount,
} }
go the.dumpRawBatchMonitor() go the.dumpRawBatchMonitor()
@ -60,12 +59,72 @@ func (the *SinkHandler) GetStage() stages.Stage {
return *the.stage return *the.stage
} }
func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData { //func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData {
go the.sinkRawDataToES(p.DeviceData) // go the.sinkRawDataToES(p.DeviceData)
return p // return p
//}
func (the *SinkHandler) sinkRawData(data []*common_models.ProcessData) []*common_models.ProcessData {
go func() {
dataQueueRaw := make([]common_models.EsRaw, 0, len(data))
dataQueueVib := make([]common_models.EsVbRaw, 0, len(data))
for _, p := range data {
deviceData := p.DeviceData
switch deviceData.DataType {
case common_models.RawTypeVib:
vibData := deviceData.GetVibrationData()
vbRaws := common_models.EsVbRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Param: vibData.FormatParams(),
Data: map[string]any{"raw": vibData.Data},
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
dataQueueVib = append(dataQueueVib, vbRaws)
case common_models.RawTypeDiag:
// 处理诊断数据的逻辑
default:
if deviceData.Raw == nil {
msg, _ := json.Marshal(deviceData)
log.Printf("异常空,raw数据 =%s", string(msg))
} else {
esRaws := common_models.EsRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Data: deviceData.Raw,
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
dataQueueRaw = append(dataQueueRaw, esRaws)
}
}
}
if len(dataQueueRaw) > 0 {
count := len(dataQueueRaw)
log.Printf("es写入dataQueueRaw数据 count====> %d", count)
go the.dumpRaws(dataQueueRaw)
}
if len(the.dataQueueVib) > 0 {
log.Printf("es写入dataQueueVib数据 count====> %d", len(dataQueueVib))
go the.dumpVibRaws(dataQueueVib)
}
}()
return data
} }
func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
func (the *SinkHandler) sinkRawData2(data []*common_models.ProcessData) []*common_models.ProcessData {
go func() {
the.lock.Lock() the.lock.Lock()
for _, p := range data {
deviceData := p.DeviceData
switch deviceData.DataType { switch deviceData.DataType {
case common_models.RawTypeVib: case common_models.RawTypeVib:
vibData := deviceData.GetVibrationData() vibData := deviceData.GetVibrationData()
@ -81,7 +140,50 @@ func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
the.dataQueueVib = append(the.dataQueueVib, vbRaws) the.dataQueueVib = append(the.dataQueueVib, vbRaws)
case common_models.RawTypeDiag: case common_models.RawTypeDiag:
default: default:
if deviceData.Raw == nil {
msg, _ := json.Marshal(deviceData)
log.Printf("异常空,raw数据 =%s", string(msg))
} else {
esRaws := common_models.EsRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Data: deviceData.Raw,
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
the.dataQueueRaw = append(the.dataQueueRaw, esRaws)
}
}
}
the.lock.Unlock()
if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount {
the.signBatch <- true
}
}()
return data
}
func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
the.lock.Lock()
switch deviceData.DataType {
case common_models.RawTypeVib:
vibData := deviceData.GetVibrationData()
vbRaws := common_models.EsVbRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Param: vibData.FormatParams(),
Data: map[string]any{"raw": vibData.Data},
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
the.dataQueueVib = append(the.dataQueueVib, vbRaws)
case common_models.RawTypeDiag:
default:
if deviceData.Raw == nil { if deviceData.Raw == nil {
msg, _ := json.Marshal(deviceData) msg, _ := json.Marshal(deviceData)
log.Printf("异常空,raw数据 =%s", string(msg)) log.Printf("异常空,raw数据 =%s", string(msg))
@ -109,7 +211,7 @@ func (the *SinkHandler) dumpRawBatchMonitor() {
select { select {
case <-the.signBatch: case <-the.signBatch:
log.Printf("批存储信号raw,监控器收到") log.Printf("批存储信号raw,监控器收到")
case <-time.After(200 * time.Millisecond): case <-time.After(500 * time.Millisecond):
} }
if len(the.dataQueueRaw) > 0 { if len(the.dataQueueRaw) > 0 {
the.lock.RLock() the.lock.RLock()
@ -172,10 +274,40 @@ func (the *SinkHandler) dumpThemeBatchMonitor() {
} }
func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_models.ProcessData { func (the *SinkHandler) sinkThemeToES(data []*common_models.ProcessData) []*common_models.ProcessData {
go the.sinkThemeData(p.Stations) go func() {
return p dataQueueTheme := make([]common_models.EsTheme, 0, len(data))
for _, p := range data {
stations := p.Stations
for _, station := range stations {
esTheme := common_models.EsTheme{
SensorName: station.Info.Name,
FactorName: station.Info.Factor.Name,
FactorProtoCode: station.Info.Proto.Code,
Data: station.Data.ThemeData,
FactorProtoName: station.Info.Proto.Name,
Factor: station.Info.FactorId,
CollectTime: station.Data.CollectTime.Truncate(time.Millisecond),
Sensor: station.Info.Id,
Structure: station.Info.StructureId,
IotaDevice: station.Info.GetDeviceIdArray(),
CreateTime: time.Now().Truncate(time.Millisecond),
}
dataQueueTheme = append(dataQueueTheme, esTheme)
}
}
if len(dataQueueTheme) > 0 {
count := len(dataQueueTheme)
log.Printf("es写入 dataQueueTheme 数据 count====> %d", count)
go the.dumpThemes(dataQueueTheme)
}
}()
return data
} }
func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
the.lock.Lock() the.lock.Lock()
for _, station := range stations { for _, station := range stations {
@ -223,7 +355,6 @@ func NewSinkGroupHandler() *SinkHandler {
} }
go the.dumpGroupMonitor() go the.dumpGroupMonitor()
//the.stage.AddProcess(the.sinkGroupDataToES)
return the return the
} }

Loading…
Cancel
Save