package consumers import ( "encoding/json" "fmt" "goInOut/adaptors" "goInOut/consumers/AXYTheme" "goInOut/dbOperate" "goInOut/dbOperate/_kafka" "goInOut/models" "goInOut/monitors" "gopkg.in/yaml.v3" "log" "sync" "time" ) type consumerAXYThemeToES struct { //数据缓存管道 dataCache chan *models.EsTheme //具体配置 Info AXYTheme.ConfigFile InKafka _kafka.KafkaHelper OutEs dbOperate.ESHelper infoPg *dbOperate.DBHelper sinkMap sync.Map // 全局缓存:存储所有测点的最新数据 latestDataMap sync.Map lock sync.Mutex logTagId int monitor *monitors.CommonMonitor } func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerAXYThemeToES) Initial(cfg string) error { the.sinkMap = sync.Map{} the.latestDataMap = sync.Map{} // 初始化全局缓存 the.dataCache = make(chan *models.EsTheme, 1000) the.LoadConfigJson(cfg) err := the.inputInitial() if err != nil { return err } err = the.outputInitial() if err != nil { return err } return err } func (the *consumerAXYThemeToES) inputInitial() error { //数据入口 the.InKafka = _kafka.KafkaHelper{ Brokers: the.Info.IoConfig.In.Kafka.Brokers, GroupId: the.Info.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } the.InKafka.Worker() return nil } func (the *consumerAXYThemeToES) outputInitial() error { //数据出口 the.OutEs = *dbOperate.NewESHelper( the.Info.IoConfig.Out.Es.Address, the.Info.IoConfig.Out.Es.Auth.UserName, the.Info.IoConfig.Out.Es.Auth.Password, ) return nil } func (the *consumerAXYThemeToES) sinkTask() { intervalSec := the.Info.IoConfig.Out.Es.Interval log.Printf("读取配置文件intervalSec[%d]", intervalSec) ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) defer ticker.Stop() for { <-ticker.C the.toSink() } } func (the *consumerAXYThemeToES) toSink() { var themes []models.EsTheme the.lock.Lock() defer the.lock.Unlock() the.sinkMap.Range(func(key, value any) bool { if v, ok := value.(*models.EsTheme); ok { themes = append(themes, *v) // 在写入ES的同时更新全局缓存 the.updateLatestData(v) //零时打日志用 if v.Sensor == the.logTagId { bs, _ := json.Marshal(v) log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) } return ok } else { log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) } return true }) if len(themes) > 0 { index := the.Info.IoConfig.Out.Es.Index log.Printf("写入es [%s] %d条", index, len(themes)) the.OutEs.BulkWriteThemes2Es(index, themes) the.sinkMap = sync.Map{} // 清空sinkMap } } // updateLatestData 更新全局缓存中的测点数据 func (the *consumerAXYThemeToES) updateLatestData(theme *models.EsTheme) { key := fmt.Sprintf("%d_%d", theme.Structure, theme.Sensor) the.latestDataMap.Store(key, theme) if theme.Sensor == the.logTagId { log.Printf("更新全局缓存: key=%s, collectTime=%s", key, theme.CollectTime) } } // getFromLatestData 从全局缓存获取测点数据 func (the *consumerAXYThemeToES) getFromLatestData(structId, sensorId int) (*models.EsTheme, bool) { key := fmt.Sprintf("%d_%d", structId, sensorId) if value, ok := the.latestDataMap.Load(key); ok { if theme, ok := value.(*models.EsTheme); ok { if sensorId == the.logTagId { log.Printf("从全局缓存获取: key=%s, collectTime=%s", key, theme.CollectTime) } return theme, true } } return nil, false } // loadInitialDataFromES 程序启动时从ES加载所有测点的最新数据到全局缓存 func (the *consumerAXYThemeToES) loadInitialDataFromES() error { log.Println("开始从ES加载所有测点的最新数据到全局缓存...") // 构建查询所有测点最新数据的ES查询 // 这里使用聚合查询,按structure和sensor分组,获取每个组的最新collect_time queryStr := ` { "size": 0, "aggs": { "group_by_structure_sensor": { "composite": { "size": 1000, "sources": [ { "structure": { "terms": { "field": "structure" } } }, { "sensor": { "terms": { "field": "sensor" } } } ] }, "aggs": { "latest_doc": { "top_hits": { "size": 1, "sort": [{ "collect_time": { "order": "desc" } }] } } } } } }` // 执行查询 index := the.Info.IoConfig.Out.Es.Index results, err := the.OutEs.SearchThemeData(index, queryStr) if err != nil { log.Printf("从ES加载初始数据失败: %v", err) return err } // 将查询结果存入全局缓存 count := 0 for _, theme := range results { the.updateLatestData(&theme) count++ } log.Printf("从ES加载初始数据完成,共加载 %d 个测点的数据", count) return nil } func (the *consumerAXYThemeToES) Work() { log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) // 程序启动时从ES加载所有测点的最新数据 go func() { if err := the.loadInitialDataFromES(); err != nil { log.Printf("加载初始数据失败,将使用懒加载模式: %v", err) } }() go the.sinkTask() go func() { for { pushEsTheme := <-the.dataCache if pushEsTheme.Sensor == the.logTagId { bs, _ := json.Marshal(pushEsTheme) log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) } //有效数据存入缓存 the.lock.Lock() the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) the.lock.Unlock() } }() } func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} // 首先解析消息检查时间格式 theme := models.AXYSavoirTheme{} if err := json.Unmarshal([]byte(msg), &theme); err == nil { // 检查并转换时间为北京时间 if len(theme.AcqTime) > 0 && theme.AcqTime[len(theme.AcqTime)-1] == 'Z' { beijingTimeStr, err := convertUTCToBeijing(theme.AcqTime) if err == nil { // 更新消息中的时间为北京时间 theme.AcqTime = beijingTimeStr // 重新序列化消息 if newMsg, err := json.Marshal(theme); err == nil { msg = string(newMsg) } } } } esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false // 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送 if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { judgeBool = true } if judgeBool { needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { // === 修改:在数据通过时效性验证后,立即更新全局缓存 === the.updateLatestData(needPush) // === 结束 === the.dataCache <- needPush } else { s, _ := json.Marshal(needPush) if needPush != nil { log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) if needPush.Sensor == the.logTagId { log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) } } } } return true } // convertUTCToBeijing 将UTC时间转换为北京时间(+8小时) func convertUTCToBeijing(utcTimeStr string) (string, error) { // 解析UTC时间 utcTime, err := time.Parse("2006-01-02T15:04:05.000Z", utcTimeStr) if err != nil { return "", err } // 加8小时转换为北京时间 beijingTime := utcTime.Add(8 * time.Hour) // 格式化为北京时间字符串 return beijingTime.Format("2006-01-02T15:04:05.000+0800"), nil } func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme) if err != nil { log.Printf("反序列化 异常 dev数据=%s", rawMsg) return time.Time{}, time.Time{}, err } structId := theme.Station.Structure.Id sensorId := theme.Station.Id // 1. 首先尝试从全局缓存获取数据 cachedTheme, found := the.getFromLatestData(structId, sensorId) if found { // 使用缓存中的数据时间 cTime := cachedTheme.CollectTime acqTime := theme.AcqTime log.Printf("从缓存获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", structId, sensorId, cTime, acqTime) esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) if esErr != nil { log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) return time.Time{}, time.Time{}, esErr } esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== if newAtime.After(esAtime) || newAtime.Equal(esAtime) { // 构建一个临时对象用于缓存时间信息 tempTheme := &models.EsTheme{ Sensor: sensorId, Structure: structId, CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 } the.updateLatestData(tempTheme) log.Printf("测点[%d_%d] 缓存时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) } // === 结束 === return esAtime, newAtime, nil } // 2. 如果缓存中没有,则从ES查询 log.Printf("缓存中未找到测点[%d_%d],从ES查询", structId, sensorId) queryStr := the.getESTimeQueryStr(structId, sensorId) index := the.Info.IoConfig.Out.Es.Index TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) // 如果es里面没有这个数据时间,就返回测点的时间 if len(TimeTheme) > 0 { cTime := TimeTheme[0].CollectTime acqTime := theme.AcqTime log.Printf("从ES获取测点[%d_%d]数据时间: esTime=%s, newTime=%s", structId, sensorId, cTime, acqTime) esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) if esErr != nil { log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) return time.Time{}, time.Time{}, esErr } esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间 newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } // 将从ES查询到的数据存入缓存 if len(TimeTheme) > 0 { the.updateLatestData(&TimeTheme[0]) } // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== if newAtime.After(esAtime) || newAtime.Equal(esAtime) { // 构建一个临时对象用于缓存时间信息 tempTheme := &models.EsTheme{ Sensor: sensorId, Structure: structId, CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 } the.updateLatestData(tempTheme) log.Printf("测点[%d_%d] ES时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) } // === 结束 === return esAtime, newAtime, nil } else { // ES中没有数据,使用新数据的时间 acqTime := theme.AcqTime log.Printf("ES中无测点[%d_%d]数据,使用新时间:%s", structId, sensorId, acqTime) newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } // === 修改:ES中没有数据时,也要更新缓存 === tempTheme := &models.EsTheme{ Sensor: sensorId, Structure: structId, CollectTime: acqTime, } the.updateLatestData(tempTheme) log.Printf("测点[%d_%d] ES中无数据,已更新全局缓存时间为: %s", structId, sensorId, acqTime) // === 结束 === return newAtime, newAtime, nil } } func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { esQuery := fmt.Sprintf(` { "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "sensor": { "value": %d } } } ] } }, "sort": [ { "collect_time": { "order": "desc" } } ], "size": 1 } `, structId, sensorId) return esQuery }