et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

282 lines
8.6 KiB

package et_sink
import (
"encoding/json"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/storage/storageDBs"
"log"
"node/stages"
"slices"
"sync"
"time"
)
type SinkHandler struct {
stage *stages.Stage
storageConsumers []storageDBs.IStorageConsumer
dataQueueRaw []common_models.EsRaw
dataQueueVib []common_models.EsVbRaw
dataQueueTheme []common_models.EsTheme
lock *sync.RWMutex
dataQueueGroup []common_models.EsGroupTheme // 分组主题数据队列
signBatch chan bool
batchCount int
}
const defaultBatchCount = 1000
func NewSinkThemeHandler() *SinkHandler {
the := &SinkHandler{
stage: stages.NewStage("Theme 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(),
dataQueueTheme: make([]common_models.EsTheme, 0, defaultBatchCount),
lock: &sync.RWMutex{},
signBatch: make(chan bool, 1),
batchCount: defaultBatchCount,
}
go the.dumpThemeBatchMonitor()
the.stage.AddProcess(the.sinkThemeToES)
return the
}
func NewSinkGroupHandler() *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
the := &SinkHandler{
stage: stages.NewStage("EsGroupTheme 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(),
dataQueueGroup: make([]common_models.EsGroupTheme, 0, defaultBatchCount),
batchCount: defaultBatchCount,
}
go the.onGroupData()
the.stage.AddProcess(the.sinkGroupDataToES)
return the
}
func NewSinkRawHandler() *SinkHandler {
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
log.Printf("es addresses: %v", esAddresses)
the := &SinkHandler{
stage: stages.NewStage("raws 数据存储"),
storageConsumers: storageDBs.LoadIStorageConsumer(),
dataQueueRaw: make([]common_models.EsRaw, 0, defaultBatchCount),
dataQueueVib: make([]common_models.EsVbRaw, 0, defaultBatchCount),
lock: &sync.RWMutex{},
batchCount: defaultBatchCount,
}
go the.dumpRawBatchMonitor()
the.stage.AddProcess(the.sinkRawData)
return the
}
func (the *SinkHandler) GetStage() stages.Stage {
return *the.stage
}
func (the *SinkHandler) sinkRawData(p *common_models.ProcessData) *common_models.ProcessData {
go the.sinkRawDataToES(p.DeviceData)
return p
}
func (the *SinkHandler) sinkRawDataToES(deviceData common_models.DeviceData) {
switch deviceData.DataType {
case common_models.RawTypeVib:
vibData := deviceData.GetVibrationData()
vbRaws := common_models.EsVbRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Param: vibData.FormatParams(),
Data: map[string]any{"raw": vibData.Data},
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
the.lock.Lock()
the.dataQueueVib = append(the.dataQueueVib, vbRaws)
the.lock.Unlock()
case common_models.RawTypeDiag:
default:
if deviceData.Raw == nil {
msg, _ := json.Marshal(deviceData)
log.Printf("异常空,raw数据 =%s", string(msg))
} else {
esRaws := common_models.EsRaw{
StructId: deviceData.StructId,
IotaDeviceName: deviceData.Name,
Data: deviceData.Raw,
CollectTime: deviceData.AcqTime.Truncate(time.Millisecond),
Meta: deviceData.DeviceInfo.DeviceMeta.GetOutputProps(),
IotaDevice: deviceData.DeviceId,
CreateTime: time.Now().Truncate(time.Millisecond),
}
the.lock.Lock()
the.dataQueueRaw = append(the.dataQueueRaw, esRaws)
the.lock.Unlock()
}
}
if len(the.dataQueueRaw) >= the.batchCount || len(the.dataQueueVib) >= the.batchCount {
the.signBatch <- true
}
}
func (the *SinkHandler) dumpRawBatchMonitor() {
for {
select {
case <-the.signBatch:
log.Printf("批存储信号raw,监控器收到")
case <-time.After(200 * time.Millisecond):
}
if len(the.dataQueueRaw) > 0 {
the.lock.RLock()
count := len(the.dataQueueRaw)
log.Printf("es写入dataQueueRaw数据 count====> %d", count)
needDump := make([]common_models.EsRaw, count)
//避免引用问题
copy(needDump, the.dataQueueRaw[:count])
the.dataQueueRaw = the.dataQueueRaw[count:]
the.lock.RUnlock()
go the.dumpRaws(needDump)
}
if len(the.dataQueueVib) > 0 {
the.lock.RLock()
count := len(the.dataQueueVib)
log.Printf("es写入dataQueueVib数据 count====> %d", count)
needDump := the.dataQueueVib[:count]
the.dataQueueVib = the.dataQueueVib[count:]
the.lock.RUnlock()
go the.dumpVibRaws(needDump)
}
}
}
func (the *SinkHandler) dumpRaws(esRaws []common_models.EsRaw) {
for i, consumer := range the.storageConsumers {
log.Printf("[consumer-%d]存储raw数据 %d 条", i, len(esRaws))
consumer.SaveRaw(esRaws)
}
}
func (the *SinkHandler) dumpVibRaws(esVbRaws []common_models.EsVbRaw) {
for i, consumer := range the.storageConsumers {
log.Printf("[consumer-%d]存储VbRaw数据 %d 条", i, len(esVbRaws))
consumer.SaveVib(esVbRaws)
}
}
func (the *SinkHandler) dumpThemeBatchMonitor() {
for {
select {
case <-the.signBatch:
log.Printf("批存储信号Theme,监控器收到")
case <-time.After(200 * time.Millisecond):
}
if len(the.dataQueueTheme) > 0 {
the.lock.RLock()
count := len(the.dataQueueTheme) //todo 避免临界操作
needDump := make([]common_models.EsTheme, count)
//避免引用问题
copy(needDump, the.dataQueueTheme[:count])
the.dataQueueTheme = the.dataQueueTheme[count:]
the.lock.RUnlock()
go the.dumpThemes(needDump)
}
}
}
func (the *SinkHandler) sinkThemeToES(p *common_models.ProcessData) *common_models.ProcessData {
go the.sinkThemeData(p.Stations)
return p
}
func (the *SinkHandler) sinkThemeData(stations []common_models.Station) {
var EsThemes []common_models.EsTheme
for _, station := range stations {
esTheme := common_models.EsTheme{
SensorName: station.Info.Name,
FactorName: station.Info.Factor.Name,
FactorProtoCode: station.Info.Proto.Code,
Data: station.Data.ThemeData,
FactorProtoName: station.Info.Proto.Name,
Factor: station.Info.FactorId,
CollectTime: station.Data.CollectTime.Truncate(time.Millisecond),
Sensor: station.Info.Id,
Structure: station.Info.StructureId,
IotaDevice: station.Info.GetDeviceIdArray(),
CreateTime: time.Now().Truncate(time.Millisecond),
}
EsThemes = append(EsThemes, esTheme)
}
the.lock.Lock()
the.dataQueueTheme = slices.Concat(the.dataQueueTheme, EsThemes)
the.lock.Unlock()
//the.dataQueueTheme = append(the.dataQueueTheme, EsThemes...)
if len(the.dataQueueTheme) >= the.batchCount {
the.signBatch <- true
}
}
func (the *SinkHandler) dumpThemes(esThemes []common_models.EsTheme) {
for i, consumer := range the.storageConsumers {
log.Printf("[consumer-%d]存储Theme数据 %d 条", i, len(esThemes))
consumer.SaveTheme(esThemes)
}
}
// ******************* 分组主题数据存储 *********************
// onGroupData 监听分组主题数据
func (the *SinkHandler) onGroupData() {
for {
select {
case <-the.signBatch:
case <-time.After(200 * time.Millisecond):
}
if len(the.dataQueueGroup) > 0 {
count := len(the.dataQueueGroup)
needDump := the.dataQueueGroup[:count]
the.dataQueueGroup = the.dataQueueGroup[count:]
the.dumpGroupThemes(needDump)
}
}
}
func (the *SinkHandler) sinkGroupDataToES(p *common_models.ProcessData) *common_models.ProcessData {
go the.sinkGroupData(p.Stations)
return p
}
func (the *SinkHandler) sinkGroupData(stations []common_models.Station) {
var EsThemes []common_models.EsGroupTheme
for _, station := range stations {
esTheme := common_models.EsGroupTheme{
Structure: station.Info.StructureId,
GroupId: station.Info.Group.Id,
GroupName: station.Info.Group.Name,
Factor: station.Info.FactorId,
FactorName: station.Info.Factor.Name,
FactorProtoCode: station.Info.Proto.Code,
FactorProtoName: station.Info.Proto.Name,
Data: station.Data.PyhData, // 分组下所有测点的主题数据
CollectTime: station.Data.CollectTime.Truncate(time.Millisecond),
CreateTime: time.Now().Truncate(time.Millisecond),
}
EsThemes = append(EsThemes, esTheme)
}
the.dataQueueGroup = append(the.dataQueueGroup, EsThemes...)
if len(the.dataQueueGroup) >= the.batchCount {
the.signBatch <- true
}
}
func (the *SinkHandler) dumpGroupThemes(esThemes []common_models.EsGroupTheme) {
for _, consumer := range the.storageConsumers {
consumer.SaveGroupTheme(esThemes)
}
}