|
|
|
@ -19,18 +19,18 @@ type consumerAXYThemeToES struct { |
|
|
|
//数据缓存管道
|
|
|
|
dataCache chan *models.EsTheme |
|
|
|
//具体配置
|
|
|
|
Info AXYTheme.ConfigFile |
|
|
|
InKafka _kafka.KafkaHelper |
|
|
|
OutEs dbOperate.ESHelper |
|
|
|
infoPg *dbOperate.DBHelper |
|
|
|
sinkMap sync.Map |
|
|
|
lock sync.Mutex |
|
|
|
logTagId int |
|
|
|
monitor *monitors.CommonMonitor |
|
|
|
Info AXYTheme.ConfigFile |
|
|
|
InKafka _kafka.KafkaHelper |
|
|
|
OutEs dbOperate.ESHelper |
|
|
|
infoPg *dbOperate.DBHelper |
|
|
|
sinkMap sync.Map |
|
|
|
// 全局缓存:存储所有测点的最新数据
|
|
|
|
latestDataMap sync.Map |
|
|
|
lock sync.Mutex |
|
|
|
logTagId int |
|
|
|
monitor *monitors.CommonMonitor |
|
|
|
} |
|
|
|
|
|
|
|
var sensorIdArray map[int]models.EsTheme |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { |
|
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
|
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
|
|
|
@ -42,6 +42,7 @@ func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) Initial(cfg string) error { |
|
|
|
the.sinkMap = sync.Map{} |
|
|
|
the.latestDataMap = sync.Map{} // 初始化全局缓存
|
|
|
|
the.dataCache = make(chan *models.EsTheme, 1000) |
|
|
|
|
|
|
|
the.LoadConfigJson(cfg) |
|
|
|
@ -64,11 +65,6 @@ func (the *consumerAXYThemeToES) inputInitial() error { |
|
|
|
GroupId: the.Info.IoConfig.In.Kafka.GroupId, |
|
|
|
} |
|
|
|
the.InKafka.Initial() |
|
|
|
|
|
|
|
queryStr := the.getESTimeQueryStr() |
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
|
|
|
|
|
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { |
|
|
|
the.InKafka.Subscribe(inTopic, the.onData) |
|
|
|
} |
|
|
|
@ -76,6 +72,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { |
|
|
|
the.InKafka.Worker() |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) outputInitial() error { |
|
|
|
//数据出口
|
|
|
|
the.OutEs = *dbOperate.NewESHelper( |
|
|
|
@ -102,9 +99,14 @@ func (the *consumerAXYThemeToES) toSink() { |
|
|
|
var themes []models.EsTheme |
|
|
|
the.lock.Lock() |
|
|
|
defer the.lock.Unlock() |
|
|
|
|
|
|
|
the.sinkMap.Range(func(key, value any) bool { |
|
|
|
if v, ok := value.(*models.EsTheme); ok { |
|
|
|
themes = append(themes, *v) |
|
|
|
|
|
|
|
// 在写入ES的同时更新全局缓存
|
|
|
|
the.updateLatestData(v) |
|
|
|
|
|
|
|
//零时打日志用
|
|
|
|
if v.Sensor == the.logTagId { |
|
|
|
bs, _ := json.Marshal(v) |
|
|
|
@ -116,16 +118,98 @@ func (the *consumerAXYThemeToES) toSink() { |
|
|
|
} |
|
|
|
return true |
|
|
|
}) |
|
|
|
|
|
|
|
if len(themes) > 0 { |
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
log.Printf("写入es [%s] %d条,%s", index, len(themes), themes) |
|
|
|
log.Printf("写入es [%s] %d条", index, len(themes)) |
|
|
|
the.OutEs.BulkWriteThemes2Es(index, themes) |
|
|
|
the.sinkMap.Clear() |
|
|
|
the.sinkMap = sync.Map{} // 清空sinkMap
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// updateLatestData 更新全局缓存中的测点数据
|
|
|
|
func (the *consumerAXYThemeToES) updateLatestData(theme *models.EsTheme) { |
|
|
|
key := fmt.Sprintf("%d_%d", theme.Structure, theme.Sensor) |
|
|
|
the.latestDataMap.Store(key, theme) |
|
|
|
|
|
|
|
if theme.Sensor == the.logTagId { |
|
|
|
log.Printf("更新全局缓存: key=%s, collectTime=%s", key, theme.CollectTime) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// getFromLatestData 从全局缓存获取测点数据
|
|
|
|
func (the *consumerAXYThemeToES) getFromLatestData(structId, sensorId int) (*models.EsTheme, bool) { |
|
|
|
key := fmt.Sprintf("%d_%d", structId, sensorId) |
|
|
|
if value, ok := the.latestDataMap.Load(key); ok { |
|
|
|
if theme, ok := value.(*models.EsTheme); ok { |
|
|
|
if sensorId == the.logTagId { |
|
|
|
log.Printf("从全局缓存获取: key=%s, collectTime=%s", key, theme.CollectTime) |
|
|
|
} |
|
|
|
return theme, true |
|
|
|
} |
|
|
|
} |
|
|
|
return nil, false |
|
|
|
} |
|
|
|
|
|
|
|
// loadInitialDataFromES 程序启动时从ES加载所有测点的最新数据到全局缓存
|
|
|
|
func (the *consumerAXYThemeToES) loadInitialDataFromES() error { |
|
|
|
log.Println("开始从ES加载所有测点的最新数据到全局缓存...") |
|
|
|
|
|
|
|
// 构建查询所有测点最新数据的ES查询
|
|
|
|
// 这里使用聚合查询,按structure和sensor分组,获取每个组的最新collect_time
|
|
|
|
queryStr := ` |
|
|
|
{ |
|
|
|
"size": 0, |
|
|
|
"aggs": { |
|
|
|
"group_by_structure_sensor": { |
|
|
|
"composite": { |
|
|
|
"size": 1000, |
|
|
|
"sources": [ |
|
|
|
{ "structure": { "terms": { "field": "structure" } } }, |
|
|
|
{ "sensor": { "terms": { "field": "sensor" } } } |
|
|
|
] |
|
|
|
}, |
|
|
|
"aggs": { |
|
|
|
"latest_doc": { |
|
|
|
"top_hits": { |
|
|
|
"size": 1, |
|
|
|
"sort": [{ "collect_time": { "order": "desc" } }] |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}` |
|
|
|
|
|
|
|
// 执行查询
|
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
results, err := the.OutEs.SearchThemeData(index, queryStr) |
|
|
|
if err != nil { |
|
|
|
log.Printf("从ES加载初始数据失败: %v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// 将查询结果存入全局缓存
|
|
|
|
count := 0 |
|
|
|
for _, theme := range results { |
|
|
|
the.updateLatestData(&theme) |
|
|
|
count++ |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("从ES加载初始数据完成,共加载 %d 个测点的数据", count) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) Work() { |
|
|
|
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) |
|
|
|
|
|
|
|
// 程序启动时从ES加载所有测点的最新数据
|
|
|
|
go func() { |
|
|
|
if err := the.loadInitialDataFromES(); err != nil { |
|
|
|
log.Printf("加载初始数据失败,将使用懒加载模式: %v", err) |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
go the.sinkTask() |
|
|
|
go func() { |
|
|
|
for { |
|
|
|
@ -141,37 +225,22 @@ func (the *consumerAXYThemeToES) Work() { |
|
|
|
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) |
|
|
|
the.lock.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
}() |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
|
|
|
//if len(msg) > 80 {
|
|
|
|
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
|
|
|
//}
|
|
|
|
adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} |
|
|
|
|
|
|
|
esAtime, newAtime, judgeErr := the.judgeTime(msg) |
|
|
|
|
|
|
|
judgeBool := false |
|
|
|
|
|
|
|
/*esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000-0700", esTimeStr) |
|
|
|
if esErr != nil { |
|
|
|
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) |
|
|
|
} |
|
|
|
esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间
|
|
|
|
|
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000-0700", newTimeStr) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) |
|
|
|
}*/ |
|
|
|
|
|
|
|
// 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送
|
|
|
|
if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { |
|
|
|
judgeBool = true |
|
|
|
} |
|
|
|
|
|
|
|
if judgeBool { |
|
|
|
|
|
|
|
needPush := adaptor.Transform(topic, msg) |
|
|
|
|
|
|
|
if needPush != nil && needPush.Data != nil { |
|
|
|
@ -190,6 +259,7 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
|
|
|
|
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { |
|
|
|
theme := models.AXYSavoirTheme{} |
|
|
|
err := json.Unmarshal([]byte(rawMsg), &theme) |
|
|
|
@ -198,10 +268,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, |
|
|
|
return time.Time{}, time.Time{}, err |
|
|
|
} |
|
|
|
|
|
|
|
if _, exists := sensorIdArray[theme.Station.Id]; exists { |
|
|
|
cTime := sensorIdArray[theme.Station.Id].CollectTime |
|
|
|
structId := theme.Station.Structure.Id |
|
|
|
sensorId := theme.Station.Id |
|
|
|
|
|
|
|
// 1. 首先尝试从全局缓存获取数据
|
|
|
|
cachedTheme, found := the.getFromLatestData(structId, sensorId) |
|
|
|
if found { |
|
|
|
// 使用缓存中的数据时间
|
|
|
|
cTime := cachedTheme.CollectTime |
|
|
|
acqTime := theme.AcqTime |
|
|
|
log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) |
|
|
|
|
|
|
|
log.Printf("从缓存获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", |
|
|
|
structId, sensorId, cTime, acqTime) |
|
|
|
|
|
|
|
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) |
|
|
|
if esErr != nil { |
|
|
|
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) |
|
|
|
@ -215,14 +294,45 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, |
|
|
|
return time.Time{}, time.Time{}, newErr |
|
|
|
} |
|
|
|
return esAtime, newAtime, nil |
|
|
|
} else { |
|
|
|
} |
|
|
|
|
|
|
|
// 2. 如果缓存中没有,则从ES查询
|
|
|
|
log.Printf("缓存中未找到测点[%d_%d],从ES查询", structId, sensorId) |
|
|
|
queryStr := the.getESTimeQueryStr(structId, sensorId) |
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) |
|
|
|
|
|
|
|
// 如果es里面没有这个数据时间,就返回测点的时间
|
|
|
|
if len(TimeTheme) > 0 { |
|
|
|
cTime := TimeTheme[0].CollectTime |
|
|
|
acqTime := theme.AcqTime |
|
|
|
log.Printf("从ES获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", |
|
|
|
structId, sensorId, cTime, acqTime) |
|
|
|
|
|
|
|
queryStr := the.getESTimeQueryStr() |
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
|
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) |
|
|
|
if esErr != nil { |
|
|
|
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) |
|
|
|
return time.Time{}, time.Time{}, esErr |
|
|
|
} |
|
|
|
esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间
|
|
|
|
|
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) |
|
|
|
return time.Time{}, time.Time{}, newErr |
|
|
|
} |
|
|
|
|
|
|
|
// 将从ES查询到的数据存入缓存
|
|
|
|
if len(TimeTheme) > 0 { |
|
|
|
the.updateLatestData(&TimeTheme[0]) |
|
|
|
} |
|
|
|
|
|
|
|
return esAtime, newAtime, nil |
|
|
|
} else { |
|
|
|
// ES中没有数据,使用新数据的时间
|
|
|
|
acqTime := theme.AcqTime |
|
|
|
log.Printf("ES中无测点[%d_%d]数据,使用新时间:%s", structId, sensorId, acqTime) |
|
|
|
|
|
|
|
log.Printf("esTime 为空, 新时间:%s", acqTime) |
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) |
|
|
|
@ -232,22 +342,38 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) getESTimeQueryStr() string { |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { |
|
|
|
esQuery := fmt.Sprintf(` |
|
|
|
{ |
|
|
|
"size": 10000, |
|
|
|
"query": { |
|
|
|
"bool": { |
|
|
|
"must": [ |
|
|
|
{ |
|
|
|
"term": { |
|
|
|
"structure": { |
|
|
|
"value": %d |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
{ |
|
|
|
"term": { |
|
|
|
"sensor": { |
|
|
|
"value": %d |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
] |
|
|
|
} |
|
|
|
}, |
|
|
|
"sort": [ |
|
|
|
{ |
|
|
|
"create_time": { |
|
|
|
"collect_time": { |
|
|
|
"order": "desc" |
|
|
|
} |
|
|
|
} |
|
|
|
], |
|
|
|
"query": { |
|
|
|
"match_all": {} |
|
|
|
} |
|
|
|
"size": 1 |
|
|
|
} |
|
|
|
`) |
|
|
|
`, structId, sensorId) |
|
|
|
return esQuery |
|
|
|
} |
|
|
|
|