From 2a1330e9b216efa54415bef76585a4dfd91abae5 Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 25 Dec 2025 09:51:34 +0800 Subject: [PATCH 1/8] =?UTF-8?q?update=20=20anxinyun=20=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E7=B4=A2=E5=BC=95=E6=94=B9=E7=94=A8=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E9=87=8C=E9=9D=A2=E7=9A=84=E7=B4=A2=E5=BC=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index c5f677d..02c8356 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -191,7 +191,8 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) - TimeTheme, err := the.OutEs.SearchThemeData("anxincloud_last_theme", queryStr) + index := the.Info.IoConfig.Out.Es.Index + TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) //如果es里面没有这个数据时间,呢就返回测点的时间 if len(TimeTheme) > 0 { cTime := TimeTheme[0].CollectTime From 55031a05b3b4ecd632beb0856070260c721eb074 Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 29 Dec 2025 20:06:15 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B5=8B=E7=82=B9?= =?UTF-8?q?=E6=9C=80=E5=90=8E=E4=B8=80=E6=9D=A1=E6=95=B0=E6=8D=AE=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2es=E7=9A=84=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 36 +++++++++++++++++-------------- dbOperate/elasticsearchHelper.go | 25 +++++++++++++++++++++ 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index 02c8356..2534f86 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -29,6 +29,8 @@ type consumerAXYThemeToES struct { monitor *monitors.CommonMonitor } +var sensorIdArray map[int]models.EsTheme + func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) @@ -54,6 +56,7 @@ func (the *consumerAXYThemeToES) Initial(cfg string) error { return err } + func (the *consumerAXYThemeToES) inputInitial() error { //数据入口 the.InKafka = _kafka.KafkaHelper{ @@ -61,6 +64,12 @@ func (the *consumerAXYThemeToES) inputInitial() error { GroupId: the.Info.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() + + structIds := the.Info.IoConfig.Out.Es.Index + queryStr := the.getESTimeQueryStr(structIds) + index := the.Info.IoConfig.Out.Es.Index + sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } @@ -190,12 +199,8 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, err } - queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) - index := the.Info.IoConfig.Out.Es.Index - TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) - //如果es里面没有这个数据时间,呢就返回测点的时间 - if len(TimeTheme) > 0 { - cTime := TimeTheme[0].CollectTime + if _, exists := sensorIdArray[theme.Station.Id]; exists { + cTime := sensorIdArray[theme.Station.Id].CollectTime acqTime := theme.AcqTime log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) @@ -213,6 +218,12 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return esAtime, newAtime, nil } else { acqTime := theme.AcqTime + + structIds := the.Info.IoConfig.Out.Es.Index + queryStr := the.getESTimeQueryStr(structIds) + index := the.Info.IoConfig.Out.Es.Index + sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + log.Printf("esTime 为空, 新时间:%s", acqTime) newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { @@ -223,7 +234,7 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } } -func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId string) string { esQuery := fmt.Sprintf(` { @@ -233,14 +244,7 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin { "term": { "structure": { - "value": %d - } - } - }, - { - "term": { - "sensor": { - "value": %d + "value": %s } } } @@ -248,6 +252,6 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin } } } -`, structId, sensorId) +`, structId) return esQuery } diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 99aa67c..75f413d 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -10,6 +10,7 @@ import ( "goInOut/models" "io" "log" + "strconv" "strings" ) @@ -218,6 +219,30 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E return themes, err } +func (the *ESHelper) SearchThemeDataArray(index string, queryBody string) (map[int]models.EsTheme, error) { + + result := make(map[int]models.EsTheme) + themesResp, err := the.searchThemes(index, queryBody) + if err != nil { + return result, err + } + + for _, hit := range themesResp.Hits.Hits { + // 将字符串ID转换为整数 + id, err := strconv.Atoi(hit.Id) + if err != nil { + // 如果ID不是数字,可以选择跳过或使用其他逻辑 + log.Printf("警告: 无法将ID %s 转换为整数: %v", hit.Id, err) + continue + } + + // 将Source存入map,以Id作为key + result[id] = hit.Source + } + + return result, nil +} + func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) { var sensors []models.EsAlarmTheme themesResp, err := the.searchAlarmThemes(index, queryBody) From 46e3329e400e1e7f35e181e2ab924b4f1afaa57c Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 31 Dec 2025 15:46:30 +0800 Subject: [PATCH 3/8] =?UTF-8?q?fix=20=20=E5=8F=96=E6=B6=88=E4=B8=BB?= =?UTF-8?q?=E9=A2=98=E6=95=B0=E6=8D=AE=20=E6=9F=A5=E8=AF=A2=E9=99=90?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index 2534f86..e7a1cf6 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -65,8 +65,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { } the.InKafka.Initial() - structIds := the.Info.IoConfig.Out.Es.Index - queryStr := the.getESTimeQueryStr(structIds) + queryStr := the.getESTimeQueryStr() index := the.Info.IoConfig.Out.Es.Index sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) @@ -219,8 +218,7 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } else { acqTime := theme.AcqTime - structIds := the.Info.IoConfig.Out.Es.Index - queryStr := the.getESTimeQueryStr(structIds) + queryStr := the.getESTimeQueryStr() index := the.Info.IoConfig.Out.Es.Index sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) @@ -234,24 +232,22 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } } -func (the *consumerAXYThemeToES) getESTimeQueryStr(structId string) string { +func (the *consumerAXYThemeToES) getESTimeQueryStr() string { esQuery := fmt.Sprintf(` { - "query": { - "bool": { - "must": [ - { - "term": { - "structure": { - "value": %s - } - } - } - ] + "size": 10000, + "sort": [ + { + "create_time": { + "order": "desc" + } } + ], + "query": { + "match_all": {} } } -`, structId) +`) return esQuery } From eb6ef4493cb371b05a7685b27f7f0dfed5f33a8f Mon Sep 17 00:00:00 2001 From: 18209 Date: Wed, 7 Jan 2026 14:05:02 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B5=8B=E7=82=B9?= =?UTF-8?q?=E6=9C=80=E6=96=B0=E4=B8=80=E6=9D=A1=E6=95=B0=E6=8D=AE=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E6=9F=A5=E8=AF=A2ES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 224 +++++++++++++++++++++++------- 1 file changed, 175 insertions(+), 49 deletions(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index e7a1cf6..bfb292b 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -19,18 +19,18 @@ type consumerAXYThemeToES struct { //数据缓存管道 dataCache chan *models.EsTheme //具体配置 - Info AXYTheme.ConfigFile - InKafka _kafka.KafkaHelper - OutEs dbOperate.ESHelper - infoPg *dbOperate.DBHelper - sinkMap sync.Map - lock sync.Mutex - logTagId int - monitor *monitors.CommonMonitor + Info AXYTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + // 全局缓存:存储所有测点的最新数据 + latestDataMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor } -var sensorIdArray map[int]models.EsTheme - func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) @@ -42,6 +42,7 @@ func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { func (the *consumerAXYThemeToES) Initial(cfg string) error { the.sinkMap = sync.Map{} + the.latestDataMap = sync.Map{} // 初始化全局缓存 the.dataCache = make(chan *models.EsTheme, 1000) the.LoadConfigJson(cfg) @@ -64,11 +65,6 @@ func (the *consumerAXYThemeToES) inputInitial() error { GroupId: the.Info.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() - - queryStr := the.getESTimeQueryStr() - index := the.Info.IoConfig.Out.Es.Index - sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) - for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } @@ -76,6 +72,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { the.InKafka.Worker() return nil } + func (the *consumerAXYThemeToES) outputInitial() error { //数据出口 the.OutEs = *dbOperate.NewESHelper( @@ -102,9 +99,14 @@ func (the *consumerAXYThemeToES) toSink() { var themes []models.EsTheme the.lock.Lock() defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { if v, ok := value.(*models.EsTheme); ok { themes = append(themes, *v) + + // 在写入ES的同时更新全局缓存 + the.updateLatestData(v) + //零时打日志用 if v.Sensor == the.logTagId { bs, _ := json.Marshal(v) @@ -116,16 +118,98 @@ func (the *consumerAXYThemeToES) toSink() { } return true }) + if len(themes) > 0 { index := the.Info.IoConfig.Out.Es.Index - log.Printf("写入es [%s] %d条,%s", index, len(themes), themes) + log.Printf("写入es [%s] %d条", index, len(themes)) the.OutEs.BulkWriteThemes2Es(index, themes) - the.sinkMap.Clear() + the.sinkMap = sync.Map{} // 清空sinkMap + } +} + +// updateLatestData 更新全局缓存中的测点数据 +func (the *consumerAXYThemeToES) updateLatestData(theme *models.EsTheme) { + key := fmt.Sprintf("%d_%d", theme.Structure, theme.Sensor) + the.latestDataMap.Store(key, theme) + + if theme.Sensor == the.logTagId { + log.Printf("更新全局缓存: key=%s, collectTime=%s", key, theme.CollectTime) + } +} + +// getFromLatestData 从全局缓存获取测点数据 +func (the *consumerAXYThemeToES) getFromLatestData(structId, sensorId int) (*models.EsTheme, bool) { + key := fmt.Sprintf("%d_%d", structId, sensorId) + if value, ok := the.latestDataMap.Load(key); ok { + if theme, ok := value.(*models.EsTheme); ok { + if sensorId == the.logTagId { + log.Printf("从全局缓存获取: key=%s, collectTime=%s", key, theme.CollectTime) + } + return theme, true + } } + return nil, false +} + +// loadInitialDataFromES 程序启动时从ES加载所有测点的最新数据到全局缓存 +func (the *consumerAXYThemeToES) loadInitialDataFromES() error { + log.Println("开始从ES加载所有测点的最新数据到全局缓存...") + + // 构建查询所有测点最新数据的ES查询 + // 这里使用聚合查询,按structure和sensor分组,获取每个组的最新collect_time + queryStr := ` +{ + "size": 0, + "aggs": { + "group_by_structure_sensor": { + "composite": { + "size": 1000, + "sources": [ + { "structure": { "terms": { "field": "structure" } } }, + { "sensor": { "terms": { "field": "sensor" } } } + ] + }, + "aggs": { + "latest_doc": { + "top_hits": { + "size": 1, + "sort": [{ "collect_time": { "order": "desc" } }] + } + } + } + } + } +}` + + // 执行查询 + index := the.Info.IoConfig.Out.Es.Index + results, err := the.OutEs.SearchThemeData(index, queryStr) + if err != nil { + log.Printf("从ES加载初始数据失败: %v", err) + return err + } + + // 将查询结果存入全局缓存 + count := 0 + for _, theme := range results { + the.updateLatestData(&theme) + count++ + } + + log.Printf("从ES加载初始数据完成,共加载 %d 个测点的数据", count) + return nil } func (the *consumerAXYThemeToES) Work() { log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + + // 程序启动时从ES加载所有测点的最新数据 + go func() { + if err := the.loadInitialDataFromES(); err != nil { + log.Printf("加载初始数据失败,将使用懒加载模式: %v", err) + } + }() + go the.sinkTask() go func() { for { @@ -141,37 +225,22 @@ func (the *consumerAXYThemeToES) Work() { the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) the.lock.Unlock() } - }() } + func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { - //if len(msg) > 80 { - // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) - //} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false - /*esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000-0700", esTimeStr) - if esErr != nil { - log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) - } - esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 - - newAtime, newErr := time.Parse("2006-01-02T15:04:05.000-0700", newTimeStr) - if newErr != nil { - log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) - }*/ - // 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送 if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { judgeBool = true } if judgeBool { - needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { @@ -190,6 +259,7 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { return true } + func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme) @@ -198,10 +268,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, err } - if _, exists := sensorIdArray[theme.Station.Id]; exists { - cTime := sensorIdArray[theme.Station.Id].CollectTime + structId := theme.Station.Structure.Id + sensorId := theme.Station.Id + + // 1. 首先尝试从全局缓存获取数据 + cachedTheme, found := the.getFromLatestData(structId, sensorId) + if found { + // 使用缓存中的数据时间 + cTime := cachedTheme.CollectTime acqTime := theme.AcqTime - log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) + + log.Printf("从缓存获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) if esErr != nil { log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) @@ -215,14 +294,45 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, newErr } return esAtime, newAtime, nil - } else { + } + + // 2. 如果缓存中没有,则从ES查询 + log.Printf("缓存中未找到测点[%d_%d],从ES查询", structId, sensorId) + queryStr := the.getESTimeQueryStr(structId, sensorId) + index := the.Info.IoConfig.Out.Es.Index + TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) + + // 如果es里面没有这个数据时间,就返回测点的时间 + if len(TimeTheme) > 0 { + cTime := TimeTheme[0].CollectTime acqTime := theme.AcqTime + log.Printf("从ES获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) - queryStr := the.getESTimeQueryStr() - index := the.Info.IoConfig.Out.Es.Index - sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) + if esErr != nil { + log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) + return time.Time{}, time.Time{}, esErr + } + esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 + + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) + if newErr != nil { + log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) + return time.Time{}, time.Time{}, newErr + } + + // 将从ES查询到的数据存入缓存 + if len(TimeTheme) > 0 { + the.updateLatestData(&TimeTheme[0]) + } + + return esAtime, newAtime, nil + } else { + // ES中没有数据,使用新数据的时间 + acqTime := theme.AcqTime + log.Printf("ES中无测点[%d_%d]数据,使用新时间:%s", structId, sensorId, acqTime) - log.Printf("esTime 为空, 新时间:%s", acqTime) newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) @@ -232,22 +342,38 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } } -func (the *consumerAXYThemeToES) getESTimeQueryStr() string { - +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { esQuery := fmt.Sprintf(` { - "size": 10000, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "sensor": { + "value": %d + } + } + } + ] + } + }, "sort": [ { - "create_time": { + "collect_time": { "order": "desc" } } ], - "query": { - "match_all": {} - } + "size": 1 } -`) +`, structId, sensorId) return esQuery } From 480700153a2767c7ba490a4cbf62d4aa1ad4b393 Mon Sep 17 00:00:00 2001 From: 18209 Date: Wed, 14 Jan 2026 14:03:31 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E6=9C=80=E5=90=8E=E4=B8=80=E6=9D=A1?= =?UTF-8?q?=E6=B5=8B=E7=82=B9=E6=95=B0=E6=8D=AE=E5=85=A8=E5=B1=80=E5=8F=98?= =?UTF-8?q?=E9=87=8F=E7=94=9F=E5=91=BD=E5=91=A8=E6=9C=9F=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 42 +++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index bfb292b..ca9046f 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -244,6 +244,10 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { + // === 修改:在数据通过时效性验证后,立即更新全局缓存 === + the.updateLatestData(needPush) + // === 结束 === + the.dataCache <- needPush } else { s, _ := json.Marshal(needPush) @@ -293,6 +297,20 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] 缓存时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + return esAtime, newAtime, nil } @@ -327,6 +345,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, the.updateLatestData(&TimeTheme[0]) } + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + return esAtime, newAtime, nil } else { // ES中没有数据,使用新数据的时间 @@ -338,6 +369,17 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // === 修改:ES中没有数据时,也要更新缓存 === + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES中无数据,已更新全局缓存时间为: %s", structId, sensorId, acqTime) + // === 结束 === + return newAtime, newAtime, nil } } From 62ded4e686643cce0d18ba3bbbffa73cfa58fa18 Mon Sep 17 00:00:00 2001 From: 18209 Date: Fri, 16 Jan 2026 15:42:55 +0800 Subject: [PATCH 6/8] =?UTF-8?q?kafka=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E4=BF=9D=E6=8A=A4=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index ca9046f..47ccdfa 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -231,6 +231,23 @@ func (the *consumerAXYThemeToES) Work() { func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} + // 首先解析消息检查时间格式 + theme := models.AXYSavoirTheme{} + if err := json.Unmarshal([]byte(msg), &theme); err == nil { + // 检查并转换时间为北京时间 + if len(theme.AcqTime) > 0 && theme.AcqTime[len(theme.AcqTime)-1] == 'Z' { + beijingTimeStr, err := convertUTCToBeijing(theme.AcqTime) + if err == nil { + // 更新消息中的时间为北京时间 + theme.AcqTime = beijingTimeStr + // 重新序列化消息 + if newMsg, err := json.Marshal(theme); err == nil { + msg = string(newMsg) + } + } + } + } + esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false @@ -264,6 +281,21 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { return true } +// convertUTCToBeijing 将UTC时间转换为北京时间(+8小时) +func convertUTCToBeijing(utcTimeStr string) (string, error) { + // 解析UTC时间 + utcTime, err := time.Parse("2006-01-02T15:04:05.000Z", utcTimeStr) + if err != nil { + return "", err + } + + // 加8小时转换为北京时间 + beijingTime := utcTime.Add(8 * time.Hour) + + // 格式化为北京时间字符串 + return beijingTime.Format("2006-01-02T15:04:05.000+0800"), nil +} + func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme) From c22229bdf71c880b468d6e0a356fff973c6d5eaf Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 19 Jan 2026 14:10:52 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- logSet.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logSet.go b/logSet.go index 6ef5b31..6fa57c2 100644 --- a/logSet.go +++ b/logSet.go @@ -12,8 +12,8 @@ func logInitial() { multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{ Filename: "./logs/logInfo.log", - MaxSize: 1, // megabytes - MaxBackups: 10, + MaxSize: 10, // megabytes + MaxBackups: 100, MaxAge: 30, //days //Compress: true, }) From 488d17078787bf50545a23c488fdbe4c75dfa32f Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 19 Jan 2026 15:37:56 +0800 Subject: [PATCH 8/8] =?UTF-8?q?main=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 4e7d747..1d5d3ca 100644 --- a/main.go +++ b/main.go @@ -14,8 +14,8 @@ import ( func init() { multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{ Filename: "./logs/logInfo.log", - MaxSize: 1, // megabytes - MaxBackups: 10, + MaxSize: 10, // megabytes + MaxBackups: 100, MaxAge: 30, //days //Compress: true, })