diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index c5f677d..47ccdfa 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -19,14 +19,16 @@ type consumerAXYThemeToES struct { //数据缓存管道 dataCache chan *models.EsTheme //具体配置 - Info AXYTheme.ConfigFile - InKafka _kafka.KafkaHelper - OutEs dbOperate.ESHelper - infoPg *dbOperate.DBHelper - sinkMap sync.Map - lock sync.Mutex - logTagId int - monitor *monitors.CommonMonitor + Info AXYTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + // 全局缓存:存储所有测点的最新数据 + latestDataMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor } func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { @@ -40,6 +42,7 @@ func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { func (the *consumerAXYThemeToES) Initial(cfg string) error { the.sinkMap = sync.Map{} + the.latestDataMap = sync.Map{} // 初始化全局缓存 the.dataCache = make(chan *models.EsTheme, 1000) the.LoadConfigJson(cfg) @@ -54,6 +57,7 @@ func (the *consumerAXYThemeToES) Initial(cfg string) error { return err } + func (the *consumerAXYThemeToES) inputInitial() error { //数据入口 the.InKafka = _kafka.KafkaHelper{ @@ -68,6 +72,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { the.InKafka.Worker() return nil } + func (the *consumerAXYThemeToES) outputInitial() error { //数据出口 the.OutEs = *dbOperate.NewESHelper( @@ -94,9 +99,14 @@ func (the *consumerAXYThemeToES) toSink() { var themes []models.EsTheme the.lock.Lock() defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { if v, ok := value.(*models.EsTheme); ok { themes = append(themes, *v) + + // 在写入ES的同时更新全局缓存 + the.updateLatestData(v) + //零时打日志用 if v.Sensor == the.logTagId { bs, _ := json.Marshal(v) @@ -108,16 +118,98 @@ func (the *consumerAXYThemeToES) toSink() { } return true }) + if len(themes) > 0 { index := the.Info.IoConfig.Out.Es.Index - log.Printf("写入es [%s] %d条,%s", index, len(themes), themes) + log.Printf("写入es [%s] %d条", index, len(themes)) the.OutEs.BulkWriteThemes2Es(index, themes) - the.sinkMap.Clear() + the.sinkMap = sync.Map{} // 清空sinkMap + } +} + +// updateLatestData 更新全局缓存中的测点数据 +func (the *consumerAXYThemeToES) updateLatestData(theme *models.EsTheme) { + key := fmt.Sprintf("%d_%d", theme.Structure, theme.Sensor) + the.latestDataMap.Store(key, theme) + + if theme.Sensor == the.logTagId { + log.Printf("更新全局缓存: key=%s, collectTime=%s", key, theme.CollectTime) + } +} + +// getFromLatestData 从全局缓存获取测点数据 +func (the *consumerAXYThemeToES) getFromLatestData(structId, sensorId int) (*models.EsTheme, bool) { + key := fmt.Sprintf("%d_%d", structId, sensorId) + if value, ok := the.latestDataMap.Load(key); ok { + if theme, ok := value.(*models.EsTheme); ok { + if sensorId == the.logTagId { + log.Printf("从全局缓存获取: key=%s, collectTime=%s", key, theme.CollectTime) + } + return theme, true + } } + return nil, false +} + +// loadInitialDataFromES 程序启动时从ES加载所有测点的最新数据到全局缓存 +func (the *consumerAXYThemeToES) loadInitialDataFromES() error { + log.Println("开始从ES加载所有测点的最新数据到全局缓存...") + + // 构建查询所有测点最新数据的ES查询 + // 这里使用聚合查询,按structure和sensor分组,获取每个组的最新collect_time + queryStr := ` +{ + "size": 0, + "aggs": { + "group_by_structure_sensor": { + "composite": { + "size": 1000, + "sources": [ + { "structure": { "terms": { "field": "structure" } } }, + { "sensor": { "terms": { "field": "sensor" } } } + ] + }, + "aggs": { + "latest_doc": { + "top_hits": { + "size": 1, + "sort": [{ 
"collect_time": { "order": "desc" } }] + } + } + } + } + } +}` + + // 执行查询 + index := the.Info.IoConfig.Out.Es.Index + results, err := the.OutEs.SearchThemeData(index, queryStr) + if err != nil { + log.Printf("从ES加载初始数据失败: %v", err) + return err + } + + // 将查询结果存入全局缓存 + count := 0 + for _, theme := range results { + the.updateLatestData(&theme) + count++ + } + + log.Printf("从ES加载初始数据完成,共加载 %d 个测点的数据", count) + return nil } func (the *consumerAXYThemeToES) Work() { log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + + // 程序启动时从ES加载所有测点的最新数据 + go func() { + if err := the.loadInitialDataFromES(); err != nil { + log.Printf("加载初始数据失败,将使用懒加载模式: %v", err) + } + }() + go the.sinkTask() go func() { for { @@ -133,40 +225,46 @@ func (the *consumerAXYThemeToES) Work() { the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) the.lock.Unlock() } - }() } + func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { - //if len(msg) > 80 { - // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) - //} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} + // 首先解析消息检查时间格式 + theme := models.AXYSavoirTheme{} + if err := json.Unmarshal([]byte(msg), &theme); err == nil { + // 检查并转换时间为北京时间 + if len(theme.AcqTime) > 0 && theme.AcqTime[len(theme.AcqTime)-1] == 'Z' { + beijingTimeStr, err := convertUTCToBeijing(theme.AcqTime) + if err == nil { + // 更新消息中的时间为北京时间 + theme.AcqTime = beijingTimeStr + // 重新序列化消息 + if newMsg, err := json.Marshal(theme); err == nil { + msg = string(newMsg) + } + } + } + } + esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false - /*esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000-0700", esTimeStr) - if esErr != nil { - log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) - } - esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 - - newAtime, newErr := time.Parse("2006-01-02T15:04:05.000-0700", newTimeStr) - if newErr != nil { - log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) - }*/ - // 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送 if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { judgeBool = true } if judgeBool { - needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { + // === 修改:在数据通过时效性验证后,立即更新全局缓存 === + the.updateLatestData(needPush) + // === 结束 === + the.dataCache <- needPush } else { s, _ := json.Marshal(needPush) @@ -182,6 +280,22 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { return true } + +// convertUTCToBeijing 将UTC时间转换为北京时间(+8小时) +func convertUTCToBeijing(utcTimeStr string) (string, error) { + // 解析UTC时间 + utcTime, err := time.Parse("2006-01-02T15:04:05.000Z", utcTimeStr) + if err != nil { + return "", err + } + + // 加8小时转换为北京时间 + beijingTime := utcTime.Add(8 * time.Hour) + + // 格式化为北京时间字符串 + return beijingTime.Format("2006-01-02T15:04:05.000+0800"), nil +} + func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme) @@ -190,13 +304,61 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, err } - queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) - TimeTheme, err := the.OutEs.SearchThemeData("anxincloud_last_theme", queryStr) - //如果es里面没有这个数据时间,呢就返回测点的时间 + structId := theme.Station.Structure.Id + sensorId := theme.Station.Id + + // 1. 
首先尝试从全局缓存获取数据 + cachedTheme, found := the.getFromLatestData(structId, sensorId) + if found { + // 使用缓存中的数据时间 + cTime := cachedTheme.CollectTime + acqTime := theme.AcqTime + + log.Printf("从缓存获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) + + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) + if esErr != nil { + log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) + return time.Time{}, time.Time{}, esErr + } + esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 + + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) + if newErr != nil { + log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) + return time.Time{}, time.Time{}, newErr + } + + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] 缓存时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + + return esAtime, newAtime, nil + } + + // 2. 如果缓存中没有,则从ES查询 + log.Printf("缓存中未找到测点[%d_%d],从ES查询", structId, sensorId) + queryStr := the.getESTimeQueryStr(structId, sensorId) + index := the.Info.IoConfig.Out.Es.Index + TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) + + // 如果es里面没有这个数据时间,就返回测点的时间 if len(TimeTheme) > 0 { cTime := TimeTheme[0].CollectTime acqTime := theme.AcqTime - log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) + log.Printf("从ES获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", + structId, sensorId, cTime, acqTime) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) if esErr != nil { log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) @@ -209,21 +371,52 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // 将从ES查询到的数据存入缓存 + if len(TimeTheme) > 0 { + the.updateLatestData(&TimeTheme[0]) + } + + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + return esAtime, newAtime, nil } else { + // ES中没有数据,使用新数据的时间 acqTime := theme.AcqTime - log.Printf("esTime 为空, 新时间:%s", acqTime) + log.Printf("ES中无测点[%d_%d]数据,使用新时间:%s", structId, sensorId, acqTime) + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // === 修改:ES中没有数据时,也要更新缓存 === + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES中无数据,已更新全局缓存时间为: %s", structId, sensorId, acqTime) + // === 结束 === + return newAtime, newAtime, nil } } func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { - esQuery := fmt.Sprintf(` { "query": { @@ -245,7 +438,15 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin } ] } - } + }, + "sort": [ + { + "collect_time": { + "order": "desc" + } + } + ], + "size": 1 } `, structId, sensorId) return 
esQuery
 }
diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go
index 99aa67c..75f413d 100644
--- a/dbOperate/elasticsearchHelper.go
+++ b/dbOperate/elasticsearchHelper.go
@@ -10,6 +10,7 @@ import (
 	"goInOut/models"
 	"io"
 	"log"
+	"strconv"
 	"strings"
 )
@@ -218,6 +219,30 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E
 	return themes, err
 }
 
+func (the *ESHelper) SearchThemeDataArray(index string, queryBody string) (map[int]models.EsTheme, error) {
+
+	result := make(map[int]models.EsTheme)
+	themesResp, err := the.searchThemes(index, queryBody)
+	if err != nil {
+		return result, err
+	}
+
+	for _, hit := range themesResp.Hits.Hits {
+		// 将字符串ID转换为整数
+		id, err := strconv.Atoi(hit.Id)
+		if err != nil {
+			// 如果ID不是数字,可以选择跳过或使用其他逻辑
+			log.Printf("警告: 无法将ID %s 转换为整数: %v", hit.Id, err)
+			continue
+		}
+
+		// 将Source存入map,以Id作为key
+		result[id] = hit.Source
+	}
+
+	return result, nil
+}
+
 func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) {
 	var sensors []models.EsAlarmTheme
 	themesResp, err := the.searchAlarmThemes(index, queryBody)
diff --git a/logSet.go b/logSet.go
index 6ef5b31..6fa57c2 100644
--- a/logSet.go
+++ b/logSet.go
@@ -12,8 +12,8 @@ func logInitial() {
 
 	multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{
 		Filename:   "./logs/logInfo.log",
-		MaxSize:    1, // megabytes
-		MaxBackups: 10,
+		MaxSize:    10, // megabytes
+		MaxBackups: 100,
 		MaxAge:     30, //days
 		//Compress:   true,
 	})
diff --git a/main.go b/main.go
index 4e7d747..1d5d3ca 100644
--- a/main.go
+++ b/main.go
@@ -14,8 +14,8 @@ func init() {
 
 	multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{
 		Filename:   "./logs/logInfo.log",
-		MaxSize:    1, // megabytes
-		MaxBackups: 10,
+		MaxSize:    10, // megabytes
+		MaxBackups: 100,
 		MaxAge:     30, //days
 		//Compress:   true,
 	})
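Note on the latest-data cache introduced above: latestDataMap keeps one record per measuring point in a sync.Map keyed by "structure_sensor" (see updateLatestData / getFromLatestData). The following is a minimal, self-contained sketch of that Store/Load/type-assert pattern; point and latestCache are illustrative stand-ins, not the project's models.EsTheme, and the sketch only demonstrates the keying scheme and the fact that a zero-value sync.Map is ready to use without explicit initialization.

package main

import (
	"fmt"
	"sync"
)

// point mirrors the few fields the cache actually keys on
// (illustrative struct, not the project's models.EsTheme).
type point struct {
	Structure   int
	Sensor      int
	CollectTime string
}

type latestCache struct {
	m sync.Map // key: "structure_sensor", value: *point
}

func (c *latestCache) update(p *point) {
	c.m.Store(fmt.Sprintf("%d_%d", p.Structure, p.Sensor), p)
}

func (c *latestCache) get(structID, sensorID int) (*point, bool) {
	v, ok := c.m.Load(fmt.Sprintf("%d_%d", structID, sensorID))
	if !ok {
		return nil, false
	}
	p, ok := v.(*point) // values come back as interface{}, so assert before use
	return p, ok
}

func main() {
	var c latestCache // zero value is usable, like the struct field in the diff
	c.update(&point{Structure: 1, Sensor: 42, CollectTime: "2024-01-01T00:00:00.000+0800"})
	if p, ok := c.get(1, 42); ok {
		fmt.Println(p.CollectTime)
	}
}

Storing pointers keeps updates cheap, but readers should treat the returned value as shared and avoid mutating it in place.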
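Note on the time handling: convertUTCToBeijing and judgeTime each assume one fixed layout ("...000Z" for ES times, "...000+0800" for converted acq_time), and the in-code comments already warn that the cached collect_time string must keep matching whatever layout the reader parses with. Below is a hedged sketch of one way to accept either layout and normalize to UTC+8; the layout list and the parseAcqTime name are assumptions for illustration, not part of this PR.

package main

import (
	"fmt"
	"time"
)

// parseAcqTime accepts the two layouts seen in the diff: a literal "Z"
// suffix (parsed as UTC) and an explicit numeric offset such as +0800.
// It returns the parsed instant expressed in UTC+8.
func parseAcqTime(s string) (time.Time, error) {
	layouts := []string{
		"2006-01-02T15:04:05.000Z",     // literal Z suffix; no zone info, defaults to UTC
		"2006-01-02T15:04:05.000-0700", // numeric offset, e.g. +0800
	}
	cst := time.FixedZone("CST", 8*3600) // Beijing time, UTC+8
	var lastErr error
	for _, layout := range layouts {
		t, err := time.Parse(layout, s)
		if err == nil {
			return t.In(cst), nil
		}
		lastErr = err
	}
	return time.Time{}, fmt.Errorf("unrecognized time %q: %w", s, lastErr)
}

func main() {
	for _, s := range []string{
		"2024-05-01T02:00:00.000Z",
		"2024-05-01T10:00:00.000+0800",
	} {
		t, err := parseAcqTime(s)
		fmt.Println(t, err) // both inputs print the same instant in UTC+8
	}
}

This variant re-expresses the true instant in UTC+8 rather than shifting the clock by 8 hours as the diff does; either convention works as long as cache writes and cache reads agree on it.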
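Note on the startup loader: because loadInitialDataFromES sends a composite aggregation with "size": 0, the per-point documents come back under aggregations.group_by_structure_sensor.buckets[*].latest_doc.hits.hits rather than the top-level hits.hits a plain search returns. The sketch below shows how such a response body can be decoded; the structs and decodeLatestPerPoint are assumptions written against the standard Elasticsearch composite/top_hits response shape, not against the project's SearchThemeData helper.

package main

import (
	"encoding/json"
	"fmt"
)

// themeSource holds just the fields needed to seed the cache
// (illustrative subset, not the project's models.EsTheme).
type themeSource struct {
	Structure   int    `json:"structure"`
	Sensor      int    `json:"sensor"`
	CollectTime string `json:"collect_time"`
}

// compositeResp mirrors a composite aggregation whose buckets each
// carry a top_hits sub-aggregation named "latest_doc".
type compositeResp struct {
	Aggregations struct {
		GroupByStructureSensor struct {
			AfterKey map[string]any `json:"after_key"`
			Buckets  []struct {
				LatestDoc struct {
					Hits struct {
						Hits []struct {
							Source themeSource `json:"_source"`
						} `json:"hits"`
					} `json:"hits"`
				} `json:"latest_doc"`
			} `json:"buckets"`
		} `json:"group_by_structure_sensor"`
	} `json:"aggregations"`
}

// decodeLatestPerPoint extracts the newest document of every bucket.
func decodeLatestPerPoint(body []byte) ([]themeSource, error) {
	var resp compositeResp
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	var out []themeSource
	for _, b := range resp.Aggregations.GroupByStructureSensor.Buckets {
		if hits := b.LatestDoc.Hits.Hits; len(hits) > 0 {
			out = append(out, hits[0].Source)
		}
	}
	return out, nil
}

func main() {
	body := []byte(`{
		"aggregations": {
			"group_by_structure_sensor": {
				"buckets": [
					{"latest_doc": {"hits": {"hits": [
						{"_source": {"structure": 1, "sensor": 42, "collect_time": "2024-05-01T10:00:00.000+0800"}}
					]}}}
				]
			}
		}
	}`)
	themes, err := decodeLatestPerPoint(body)
	fmt.Println(themes, err)
}

A production loader would also page by resubmitting the query with "after": <after_key> until the bucket list comes back empty, since the "size": 1000 in the composite source only covers the first page of structure/sensor combinations.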