|
|
@ -65,8 +65,7 @@ func (the *consumerAXYThemeToES) inputInitial() error { |
|
|
} |
|
|
} |
|
|
the.InKafka.Initial() |
|
|
the.InKafka.Initial() |
|
|
|
|
|
|
|
|
structIds := the.Info.IoConfig.Out.Es.Index |
|
|
queryStr := the.getESTimeQueryStr() |
|
|
queryStr := the.getESTimeQueryStr(structIds) |
|
|
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
|
|
|
|
|
|
@ -219,8 +218,7 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, |
|
|
} else { |
|
|
} else { |
|
|
acqTime := theme.AcqTime |
|
|
acqTime := theme.AcqTime |
|
|
|
|
|
|
|
|
structIds := the.Info.IoConfig.Out.Es.Index |
|
|
queryStr := the.getESTimeQueryStr() |
|
|
queryStr := the.getESTimeQueryStr(structIds) |
|
|
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
sensorIdArray, _ = the.OutEs.SearchThemeDataArray(index, queryStr) |
|
|
|
|
|
|
|
|
@ -234,24 +232,22 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) getESTimeQueryStr(structId string) string { |
|
|
func (the *consumerAXYThemeToES) getESTimeQueryStr() string { |
|
|
|
|
|
|
|
|
esQuery := fmt.Sprintf(` |
|
|
esQuery := fmt.Sprintf(` |
|
|
{ |
|
|
{ |
|
|
"query": { |
|
|
"size": 10000, |
|
|
"bool": { |
|
|
"sort": [ |
|
|
"must": [ |
|
|
{ |
|
|
{ |
|
|
"create_time": { |
|
|
"term": { |
|
|
"order": "desc" |
|
|
"structure": { |
|
|
} |
|
|
"value": %s |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
] |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
], |
|
|
|
|
|
"query": { |
|
|
|
|
|
"match_all": {} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
`, structId) |
|
|
`) |
|
|
return esQuery |
|
|
return esQuery |
|
|
} |
|
|
} |
|
|
|