Browse Source

优化测点最后一条数据查询es的操作

dev
18209 3 days ago
parent
commit
55031a05b3
  1. 36
      consumers/consumerAXYThemeToES.go
  2. 25
      dbOperate/elasticsearchHelper.go

36
consumers/consumerAXYThemeToES.go

@ -29,6 +29,8 @@ type consumerAXYThemeToES struct {
monitor *monitors.CommonMonitor
}
var sensorIdArray map[int]models.EsTheme
func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
@ -54,6 +56,7 @@ func (the *consumerAXYThemeToES) Initial(cfg string) error {
return err
}
func (the *consumerAXYThemeToES) inputInitial() error {
//数据入口
the.InKafka = _kafka.KafkaHelper{
@ -61,6 +64,12 @@ func (the *consumerAXYThemeToES) inputInitial() error {
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
}
the.InKafka.Initial()
structIds := the.Info.IoConfig.Out.Es.Index
queryStr := the.getESTimeQueryStr(structIds)
index := the.Info.IoConfig.Out.Es.Index
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr)
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
the.InKafka.Subscribe(inTopic, the.onData)
}
@ -190,12 +199,8 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
return time.Time{}, time.Time{}, err
}
queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id)
index := the.Info.IoConfig.Out.Es.Index
TimeTheme, err := the.OutEs.SearchThemeData(index, queryStr)
//如果es里面没有这个数据时间,呢就返回测点的时间
if len(TimeTheme) > 0 {
cTime := TimeTheme[0].CollectTime
if _, exists := sensorIdArray[theme.Station.Id]; exists {
cTime := sensorIdArray[theme.Station.Id].CollectTime
acqTime := theme.AcqTime
log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime)
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime)
@ -213,6 +218,12 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
return esAtime, newAtime, nil
} else {
acqTime := theme.AcqTime
structIds := the.Info.IoConfig.Out.Es.Index
queryStr := the.getESTimeQueryStr(structIds)
index := the.Info.IoConfig.Out.Es.Index
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr)
log.Printf("esTime 为空, 新时间:%s", acqTime)
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime)
if newErr != nil {
@ -223,7 +234,7 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
}
}
func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string {
func (the *consumerAXYThemeToES) getESTimeQueryStr(structId string) string {
esQuery := fmt.Sprintf(`
{
@ -233,14 +244,7 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin
{
"term": {
"structure": {
"value": %d
}
}
},
{
"term": {
"sensor": {
"value": %d
"value": %s
}
}
}
@ -248,6 +252,6 @@ func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) strin
}
}
}
`, structId, sensorId)
`, structId)
return esQuery
}

25
dbOperate/elasticsearchHelper.go

@ -10,6 +10,7 @@ import (
"goInOut/models"
"io"
"log"
"strconv"
"strings"
)
@ -218,6 +219,30 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E
return themes, err
}
func (the *ESHelper) SearchThemeDataArray(index string, queryBody string) (map[int]models.EsTheme, error) {
result := make(map[int]models.EsTheme)
themesResp, err := the.searchThemes(index, queryBody)
if err != nil {
return result, err
}
for _, hit := range themesResp.Hits.Hits {
// 将字符串ID转换为整数
id, err := strconv.Atoi(hit.Id)
if err != nil {
// 如果ID不是数字,可以选择跳过或使用其他逻辑
log.Printf("警告: 无法将ID %s 转换为整数: %v", hit.Id, err)
continue
}
// 将Source存入map,以Id作为key
result[id] = hit.Source
}
return result, nil
}
func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) {
var sensors []models.EsAlarmTheme
themesResp, err := the.searchAlarmThemes(index, queryBody)

Loading…
Cancel
Save