diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index 02c8356..2534f86 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -29,6 +29,8 @@ type consumerAXYThemeToES struct { monitor *monitors.CommonMonitor } +var sensorIdArray map[int]models.EsTheme + func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) @@ -54,6 +56,7 @@ func (the *consumerAXYThemeToES) Initial(cfg string) error { return err } + func (the *consumerAXYThemeToES) inputInitial() error { //数据入口 the.InKafka = _kafka.KafkaHelper{ @@ -61,6 +64,12 @@ func (the *consumerAXYThemeToES) inputInitial() error { GroupId: the.Info.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() + + structIds := the.Info.IoConfig.Out.Es.Index + queryStr := the.getESTimeQueryStr(structIds) + index := the.Info.IoConfig.Out.Es.Index + sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } @@ -190,12 +199,8 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return time.Time{}, time.Time{}, err } - queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) - index := the.Info.IoConfig.Out.Es.Index - TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr) - //如果es里面没有这个数据时间,呢就返回测点的时间 - if len(TimeTheme) > 0 { - cTime := TimeTheme[0].CollectTime + if _, exists := sensorIdArray[theme.Station.Id]; exists { + cTime := sensorIdArray[theme.Station.Id].CollectTime acqTime := theme.AcqTime log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) @@ -213,6 +218,12 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, return esAtime, newAtime, nil } else { acqTime := theme.AcqTime + + structIds := the.Info.IoConfig.Out.Es.Index + queryStr := the.getESTimeQueryStr(structIds) + index := the.Info.IoConfig.Out.Es.Index + sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) + log.Printf("esTime 为空, 新时间:%s", acqTime) newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) if newErr != nil { @@ -223,7 +234,7 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, } } -func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId string) string { esQuery := fmt.Sprintf(` { @@ -233,14 +244,7 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin { "term": { "structure": { - "value": %d - } - } - }, - { - "term": { - "sensor": { - "value": %d + "value": %s } } } @@ -248,6 +252,6 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin } } } -`, structId, sensorId) +`, structId) return esQuery } diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 99aa67c..75f413d 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -10,6 +10,7 @@ import ( "goInOut/models" "io" "log" + "strconv" "strings" ) @@ -218,6 +219,30 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E return themes, err } +func (the *ESHelper) SearchThemeDataArray(index string, queryBody string) (map[int]models.EsTheme, error) { + + result := make(map[int]models.EsTheme) + themesResp, err := the.searchThemes(index, queryBody) + if err != nil { + return result, err + } + + for _, hit := range themesResp.Hits.Hits { + // 将字符串ID转换为整数 + id, err := strconv.Atoi(hit.Id) + if err != nil { + // 如果ID不是数字,可以选择跳过或使用其他逻辑 + log.Printf("警告: 无法将ID %s 转换为整数: %v", hit.Id, err) + continue + } + + // 将Source存入map,以Id作为key + result[id] = hit.Source + } + + return result, nil +} + func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) { var sensors []models.EsAlarmTheme themesResp, err := the.searchAlarmThemes(index, queryBody)