From eb6ef4493cb371b05a7685b27f7f0dfed5f33a8f Mon Sep 17 00:00:00 2001 From: 18209 Date: Wed, 7 Jan 2026 14:05:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B5=8B=E7=82=B9=E6=9C=80?= =?UTF-8?q?=E6=96=B0=E4=B8=80=E6=9D=A1=E6=95=B0=E6=8D=AE=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2ES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 224 +++++++++++++++++++++++------- 1 file changed, 175 insertions(+), 49 deletions(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index e7a1cf6..bfb292b 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -19,18 +19,18 @@ type consumerAXYThemeToES struct { //数据缓存管道 dataCache chan *models.EsTheme //具体配置 - Info AXYTheme.ConfigFile - InKafka _kafka.KafkaHelper - OutEs dbOperate.ESHelper - infoPg *dbOperate.DBHelper - sinkMap sync.Map - lock sync.Mutex - logTagId int - monitor *monitors.CommonMonitor + Info AXYTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + // 全局缓存:存储所有测点的最新数据 + latestDataMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor } -var sensorIdArray map[int]models.EsTheme - func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) @@ -42,6 +42,7 @@ func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { func (the *consumerAXYThemeToES) Initial(cfg string) error { the.sinkMap = sync.Map{} + the.latestDataMap = sync.Map{} // 初始化全局缓存 the.dataCache = make(chan *models.EsTheme, 1000) the.LoadConfigJson(cfg) @@ -64,11 +65,6 @@ func (the *consumerAXYThemeToES) inputInitial() error { GroupId: the.Info.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() - - queryStr := the.getESTimeQueryStr() - index := the.Info.IoConfig.Out.Es.Index - sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) - for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } @@ -76,6 +72,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { the.InKafka.Worker() return nil } + func (the *consumerAXYThemeToES) outputInitial() error { //数据出口 the.OutEs = *dbOperate.NewESHelper( @@ -102,9 +99,14 @@ func (the *consumerAXYThemeToES) toSink() { var themes []models.EsTheme the.lock.Lock() defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { if v, ok := value.(*models.EsTheme); ok { themes = append(themes, *v) + + // 在写入ES的同时更新全局缓存 + the.updateLatestData(v) + //零时打日志用 if v.Sensor == the.logTagId { bs, _ := json.Marshal(v) @@ -116,16 +118,98 @@ func (the *consumerAXYThemeToES) toSink() { } return true }) + if len(themes) > 0 { index := the.Info.IoConfig.Out.Es.Index - log.Printf("写入es [%s] %d条,%s", index, len(themes), themes) + log.Printf("写入es [%s] %d条", index, len(themes)) the.OutEs.BulkWriteThemes2Es(index, themes) - the.sinkMap.Clear() + the.sinkMap = sync.Map{} // 清空sinkMap + } +} + +// updateLatestData 更新全局缓存中的测点数据 +func (the *consumerAXYThemeToES) updateLatestData(theme *models.EsTheme) { + key := fmt.Sprintf("%d_%d", theme.Structure, theme.Sensor) + the.latestDataMap.Store(key, theme) + + if theme.Sensor == the.logTagId { + log.Printf("更新全局缓存: key=%s, collectTime=%s", key, theme.CollectTime) + } +} + +// getFromLatestData 从全局缓存获取测点数据 +func (the *consumerAXYThemeToES) getFromLatestData(structId, sensorId int) (*models.EsTheme, bool) { + key := fmt.Sprintf("%d_%d", structId, sensorId) + if value, ok := the.latestDataMap.Load(key); ok { + if theme, ok := value.(*models.EsTheme); ok { + if sensorId == the.logTagId { + log.Printf("从全局缓存获取: key=%s, collectTime=%s", key, theme.CollectTime) + } + return theme, true + } } + return nil, false +} + +// loadInitialDataFromES 程序启动时从ES加载所有测点的最新数据到全局缓存 +func (the *consumerAXYThemeToES) loadInitialDataFromES() error { + log.Println("开始从ES加载所有测点的最新数据到全局缓存...") + + // 构建查询所有测点最新数据的ES查询 + // 这里使用聚合查询,按structure和sensor分组,获取每个组的最新collect_time + queryStr := ` +{ + "size": 0, + "aggs": { + "group_by_structure_sensor": { + "composite": { + "size": 1000, + "sources": [ + { "structure": { "terms": { "field": "structure" } } }, + { "sensor": { "terms": { "field": "sensor" } } } + ] + }, + "aggs": { + "latest_doc": { + "top_hits": { + "size": 1, + "sort": [{ "collect_time": { "order": "desc" } }] + } + } + } + } + } +}` + + // 执行查询 + index := the.Info.IoConfig.Out.Es.Index + results, err := the.OutEs.SearchThemeData(index, queryStr) + if err != nil { + log.Printf("从ES加载初始数据失败: %v", err) + return err + } + + // 将查询结果存入全局缓存 + count := 0 + for _, theme := range results { + the.updateLatestData(&theme) + count++ + } + + log.Printf("从ES加载初始数据完成,共加载 %d 个测点的数据", count) + return nil } func (the *consumerAXYThemeToES) Work() { log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + + // 程序启动时从ES加载所有测点的最新数据 + go func() { + if err := the.loadInitialDataFromES(); err != nil { + log.Printf("加载初始数据失败,将使用懒加载模式: %v", err) + } + }() + go the.sinkTask() go func() { for { @@ -141,37 +225,22 @@ func (the *consumerAXYThemeToES) Work() { the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) the.lock.Unlock() } - }() } + func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { - //if len(msg) > 80 { - // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) - //} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false - /*esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000-0700", esTimeStr) - if esErr != nil { - log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) - } - esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 - - newAtime, newErr := time.Parse("2006-01-02T15:04:05.000-0700", newTimeStr) - if newErr != nil { - log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) - }*/ - // 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送 if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { judgeBool = true } if judgeBool { - needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { @@ -190,6 +259,7 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { return true } + func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme) @@ -198,10 +268,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, err } - if _, exists := sensorIdArray[theme.Station.Id]; exists { - cTime := sensorIdArray[theme.Station.Id].CollectTime + structId := theme.Station.Structure.Id + sensorId := theme.Station.Id + + // 1. 首先尝试从全局缓存获取数据 + cachedTheme, found := the.getFromLatestData(structId, sensorId) + if found { + // 使用缓存中的数据时间 + cTime := cachedTheme.CollectTime acqTime := theme.AcqTime - log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) + + log.Printf("从缓存获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) if esErr != nil { log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) @@ -215,14 +294,45 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, newErr } return esAtime, newAtime, nil - } else { + } + + // 2. 如果缓存中没有,则从ES查询 + log.Printf("缓存中未找到测点[%d_%d],从ES查询", structId, sensorId) + queryStr := the.getESTimeQueryStr(structId, sensorId) + index := the.Info.IoConfig.Out.Es.Index + TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) + + // 如果es里面没有这个数据时间,就返回测点的时间 + if len(TimeTheme) > 0 { + cTime := TimeTheme[0].CollectTime acqTime := theme.AcqTime + log.Printf("从ES获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) - queryStr := the.getESTimeQueryStr() - index := the.Info.IoConfig.Out.Es.Index - sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) + if esErr != nil { + log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) + return time.Time{}, time.Time{}, esErr + } + esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 + + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) + if newErr != nil { + log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) + return time.Time{}, time.Time{}, newErr + } + + // 将从ES查询到的数据存入缓存 + if len(TimeTheme) > 0 { + the.updateLatestData(&TimeTheme[0]) + } + + return esAtime, newAtime, nil + } else { + // ES中没有数据,使用新数据的时间 + acqTime := theme.AcqTime + log.Printf("ES中无测点[%d_%d]数据,使用新时间:%s", structId, sensorId, acqTime) - log.Printf("esTime 为空, 新时间:%s", acqTime) newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) @@ -232,22 +342,38 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } } -func (the *consumerAXYThemeToES) getESTimeQueryStr() string { - +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { esQuery := fmt.Sprintf(` { - "size": 10000, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "sensor": { + "value": %d + } + } + } + ] + } + }, "sort": [ { - "create_time": { + "collect_time": { "order": "desc" } } ], - "query": { - "match_all": {} - } + "size": 1 } -`) +`, structId, sensorId) return esQuery }