package et_sink import ( "encoding/json" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/storage/storageDBs" "log" "node/stages" "sync" "time" ) type SinkHandler struct { stage *stages.Stage storageConsumers []storageDBs.IStorageConsumer dataQueueRaw []common_models.EsRaw dataQueueVib []common_models.EsVbRaw dataQueueTheme []common_models.EsTheme lock *sync.RWMutex dataQueueGroup []common_models.EsGroupTheme // 分组主题数据队列 signBatch chan bool batchCount int } const defaultBatchCount = 1000 func NewSinkThemeHandler() *SinkHandler { the := &SinkHandler{ stage: stages.NewStage("Theme 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount), lock: &sync.RWMutex{}, signBatch: make(chan bool, 1), batchCount: defaultBatchCount, } go the.dumpThemeBatchMonitor() the.stage.AddProcess(the.sinkThemeToES) return the } func NewSinkGroupHandler() *SinkHandler { esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") log.Printf("es addresses: %v", esAddresses) the := &SinkHandler{ stage: stages.NewStage("EsGroupTheme 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount), batchCount: defaultBatchCount, } go the.onGroupData() the.stage.AddProcess(the.sinkGroupDataToES) return the } func NewSinkRawHandler() *SinkHandler { esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") log.Printf("es addresses: %v", esAddresses) the := &SinkHandler{ stage: stages.NewStage("raws 数据存储"), storageConsumers: storageDBs.LoadIStorageConsumer(), dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount), dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount), lock: &sync.RWMutex{}, batchCount: defaultBatchCount, } go the.dumpRawBatchMonitor() the.stage.AddProcess(the.sinkRawData) return the } func (the *SinkHandler) GetStage() stages.Stage { return *the.stage } func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData { go the.sinkRawDataToES(p.DeviceData) return p } func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) { the.lock.Lock() switch deviceData.DataType { case common_models.RawTypeVib: vibData := deviceData.GetVibrationData() vbRaws := common_models.EsVbRaw{ StructId: deviceData.StructId, IotaDeviceName: deviceData.Name, Param: vibData.FormatParams(), Data: map[string]any{"raw": vibData.Data}, CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } the.dataQueueVib = append(the.dataQueueVib, vbRaws) case common_models.RawTypeDiag: default: if deviceData.Raw == nil { msg, _ := json.Marshal(deviceData) log.Printf("异常空,raw数据 =%s", string(msg)) } else { esRaws := common_models.EsRaw{ StructId: deviceData.StructId, IotaDeviceName: deviceData.Name, Data: deviceData.Raw, CollectTime: deviceData.AcqTime.Truncate(time.Millisecond), Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(), IotaDevice: deviceData.DeviceId, CreateTime: time.Now().Truncate(time.Millisecond), } the.dataQueueRaw = append(the.dataQueueRaw, esRaws) } } the.lock.Unlock() if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount { the.signBatch <- true } } func (the *SinkHandler) dumpRawBatchMonitor() { for { select { case <-the.signBatch: log.Printf("批存储信号raw,监控器收到") case <-time.After(200 * time.Millisecond): } if len(the.dataQueueRaw) > 0 { the.lock.RLock() count := len(the.dataQueueRaw) log.Printf("es写入dataQueueRaw数据 count====> %d", count) needDump := make([]common_models.EsRaw, count) //避免引用问题 copy(needDump, the.dataQueueRaw[:count]) the.dataQueueRaw = the.dataQueueRaw[count:] the.lock.RUnlock() go the.dumpRaws(needDump) } if len(the.dataQueueVib) > 0 { the.lock.RLock() count := len(the.dataQueueVib) log.Printf("es写入dataQueueVib数据 count====> %d", count) needDump := the.dataQueueVib[:count] the.dataQueueVib = the.dataQueueVib[count:] the.lock.RUnlock() go the.dumpVibRaws(needDump) } } } func (the *SinkHandler) dumpRaws(esRaws []common_models.EsRaw) { for i, consumer := range the.storageConsumers { log.Printf("[consumer-%d]存储raw数据 %d 条", i, len(esRaws)) consumer.SaveRaw(esRaws) } } func (the *SinkHandler) dumpVibRaws(esVbRaws []common_models.EsVbRaw) { for i, consumer := range the.storageConsumers { log.Printf("[consumer-%d]存储VbRaw数据 %d 条", i, len(esVbRaws)) consumer.SaveVib(esVbRaws) } } func (the *SinkHandler) dumpThemeBatchMonitor() { for { select { case <-the.signBatch: log.Printf("批存储信号Theme,监控器收到") case <-time.After(200 * time.Millisecond): } if len(the.dataQueueTheme) > 0 { the.lock.RLock() count := len(the.dataQueueTheme) log.Printf("es写入dataQueueTheme数据 count====> %d", count) needDump := make([]common_models.EsTheme, count) //避免引用问题 copy(needDump, the.dataQueueTheme[:count]) the.dataQueueTheme = the.dataQueueTheme[count:] the.lock.RUnlock() go the.dumpThemes(needDump) } } } func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_models.ProcessData { go the.sinkThemeData(p.Stations) return p } func (the *SinkHandler) sinkThemeData(stations []common_models.Station) { the.lock.Lock() for _, station := range stations { esTheme := common_models.EsTheme{ SensorName: station.Info.Name, FactorName: station.Info.Factor.Name, FactorProtoCode: station.Info.Proto.Code, Data: station.Data.ThemeData, FactorProtoName: station.Info.Proto.Name, Factor: station.Info.FactorId, CollectTime: station.Data.CollectTime.Truncate(time.Millisecond), Sensor: station.Info.Id, Structure: station.Info.StructureId, IotaDevice: station.Info.GetDeviceIdArray(), CreateTime: time.Now().Truncate(time.Millisecond), } the.dataQueueTheme = append(the.dataQueueTheme, esTheme) } the.lock.Unlock() if len(the.dataQueueTheme) >= the.batchCount { the.signBatch <- true } } func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) { for i, consumer := range the.storageConsumers { consumer.SaveTheme(esThemes) log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes)) } } // ******************* 分组主题数据存储 ********************* // onGroupData 监听分组主题数据 func (the *SinkHandler) onGroupData() { for { select { case <-the.signBatch: case <-time.After(200 * time.Millisecond): } if len(the.dataQueueGroup) > 0 { count := len(the.dataQueueGroup) needDump := the.dataQueueGroup[:count] the.dataQueueGroup = the.dataQueueGroup[count:] the.dumpGroupThemes(needDump) } } } func (the *SinkHandler) sinkGroupDataToES(p *common_models.ProcessData) *common_models.ProcessData { go the.sinkGroupData(p.Stations) return p } func (the *SinkHandler) sinkGroupData(stations []common_models.Station) { var EsThemes []common_models.EsGroupTheme for _, station := range stations { esTheme := common_models.EsGroupTheme{ Structure: station.Info.StructureId, GroupId: station.Info.Group.Id, GroupName: station.Info.Group.Name, Factor: station.Info.FactorId, FactorName: station.Info.Factor.Name, FactorProtoCode: station.Info.Proto.Code, FactorProtoName: station.Info.Proto.Name, Data: station.Data.PyhData, // 分组下所有测点的主题数据 CollectTime: station.Data.CollectTime.Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond), } EsThemes = append(EsThemes, esTheme) } the.dataQueueGroup = append(the.dataQueueGroup, EsThemes...) if len(the.dataQueueGroup) >= the.batchCount { the.signBatch <- true } } func (the *SinkHandler) dumpGroupThemes(esThemes []common_models.EsGroupTheme) { for _, consumer := range the.storageConsumers { consumer.SaveGroupTheme(esThemes) } }