数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
6.7 KiB

package dbHelper
import (
"bytes"
"context"
"encoding/json"
"fmt"
elasticsearch6 "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"goInOut/models"
"io"
"log"
"strings"
)
// ESHelper pairs an Elasticsearch v6 client with the node addresses it was
// configured with. Instances are created by NewESHelper.
type ESHelper struct {
	// addresses holds the node URLs that were passed to NewESHelper.
	addresses []string
	//org string

	// esClient is the go-elasticsearch v6 client every method sends requests through.
	esClient *elasticsearch6.Client
}
// NewESHelper builds an Elasticsearch v6 client for the given node addresses
// and basic-auth credentials, then issues one Info request and logs the
// client version together with the response.
//
// The process is terminated via log.Fatalf when the client cannot be created
// or when the Info request returns an error.
func NewESHelper(addresses []string, user, pwd string) *ESHelper {
	es, err := elasticsearch6.NewClient(elasticsearch6.Config{
		Addresses: addresses,
		Username:  user,
		Password:  pwd,
	})
	if err != nil {
		// This error used to be discarded, which let a nil client reach
		// es.Info() and panic there with no useful message.
		log.Fatalf("Error creating the client: %s", err)
	}

	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	// Make sure the response body is released; closing it again is harmless
	// if formatting the response below has already consumed it.
	defer res.Body.Close()

	log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res)
	return &ESHelper{
		addresses: addresses,
		esClient:  es,
	}
}
// SearchRaw sends reqBody as the body of a search request against index and
// returns the hits decoded into models.HitRaw values.
//
// It returns nil when the request fails, when Elasticsearch answers with an
// error status, or when the response body cannot be decoded. The cause is
// logged in each case.
func (the *ESHelper) SearchRaw(index, reqBody string) []models.HitRaw {
	response, err := the.esClient.Search(
		the.esClient.Search.WithIndex(index),
		the.esClient.Search.WithBody(bytes.NewBufferString(reqBody)),
	)
	if err != nil {
		log.Printf("Error getting response: %s", err)
		return nil
	}
	// Only defer the close once the response is known to be non-nil;
	// deferring before the error check dereferenced a nil response.
	defer response.Body.Close()

	if response.IsError() {
		log.Printf("[%s] Error searching index %s", response.Status(), index)
		return nil
	}

	r := models.EsRawResp{}
	if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
		// A malformed response is reported like any other failure (nil
		// result) instead of terminating the whole process.
		log.Printf("Error parsing the response body: %s", err)
		return nil
	}
	return r.Hits.Hits
}
// Search sends reqBody as the body of a search request against index and
// logs the outcome: the HTTP status, the hit total, the "took" time and one
// line per hit with its _id and _source.
//
// Nothing is returned. A failed request, an error status or an undecodable
// body is logged and ends the call early.
func (the *ESHelper) Search(index, reqBody string) {
	response, err := the.esClient.Search(
		the.esClient.Search.WithIndex(index),
		the.esClient.Search.WithBody(bytes.NewBufferString(reqBody)),
	)
	if err != nil {
		// The response is nil here; continuing would dereference it.
		log.Printf("Error getting response: %s", err)
		return
	}
	defer response.Body.Close()

	log.Println(response.Status())
	if response.IsError() {
		// An error payload carries no "hits"/"took" fields to report.
		log.Printf("[%s] Error searching index %s", response.Status(), index)
		return
	}

	var r map[string]any
	if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
		log.Printf("Error parsing the response body: %s", err)
		return
	}

	// Checked assertions: a missing or differently-typed field yields the
	// zero value instead of a panic. Reading from a nil map is safe.
	hits, _ := r["hits"].(map[string]any)
	total, _ := hits["total"].(float64)
	took, _ := r["took"].(float64)
	log.Printf(
		"[%s] %d hits; took: %dms",
		response.Status(),
		int(total),
		int(took),
	)

	entries, _ := hits["hits"].([]any)
	for _, entry := range entries {
		hit, ok := entry.(map[string]any)
		if !ok {
			continue
		}
		// %v instead of %s: _source is a decoded JSON object, and %s renders
		// its non-string values as %!s(...) noise.
		log.Printf(" * ID=%v, %v", hit["_id"], hit["_source"])
	}
	log.Println(strings.Repeat("=", 37))
}
// request sends reqBody as an esapi.IndexRequest to index (no DocumentID is
// set, Refresh is "true") and returns the response decoded into a map.
//
// A non-nil error is returned when the request cannot be performed, when
// Elasticsearch answers with an error status, or when the response body
// cannot be decoded; the returned map is nil in those cases.
func (the *ESHelper) request(index, reqBody string) (map[string]any, error) {
	// Set up the request object.
	req := esapi.IndexRequest{
		Index: index,
		//DocumentID: strconv.Itoa(i + 1),
		Body:    strings.NewReader(reqBody),
		Refresh: "true",
	}

	// Perform the request with the client.
	res, err := req.Do(context.Background(), the.esClient)
	if err != nil {
		// Hand the failure to the caller instead of terminating the process.
		return nil, fmt.Errorf("indexing into %q: %w", index, err)
	}
	defer res.Body.Close()

	if res.IsError() {
		// Previously this was only logged and (nil, nil) was returned.
		return nil, fmt.Errorf("indexing into %q: unexpected status %s", index, res.Status())
	}

	// Deserialize the response into a map.
	var r map[string]any
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		// The inner err used to shadow the outer one, so this failure was lost.
		return nil, fmt.Errorf("parsing index response for %q: %w", index, err)
	}

	// Log the response status and indexed document version; the version is
	// only printed when it is present with the expected numeric type.
	if version, ok := r["_version"].(float64); ok {
		log.Printf("[%s] %v; version=%d", res.Status(), r["result"], int(version))
	} else {
		log.Printf("[%s] %v", res.Status(), r["result"])
	}
	return r, nil
}
// searchRaw forwards index and reqBody to request and returns an empty
// models.IotaData together with whatever error request reported. The
// response map itself is not inspected.
func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) {
	_, err := the.request(index, reqBody)
	return models.IotaData{}, err
}
// searchThemes sends reqBody as the body of a search request against index
// and decodes the response into a models.EsThemeResp.
//
// A non-nil error is returned, with a zero-value response, when the request
// cannot be performed or the body cannot be decoded. The HTTP status line is
// logged; an error status is not turned into an error here, the body is
// decoded as-is.
func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, error) {
	response, err := the.esClient.Search(
		the.esClient.Search.WithIndex(index),
		the.esClient.Search.WithBody(bytes.NewBufferString(reqBody)),
	)
	if err != nil {
		// The response is nil on error, so return before touching it.
		return models.EsThemeResp{}, fmt.Errorf("searching index %q: %w", index, err)
	}
	defer response.Body.Close()

	log.Println(response.Status())

	r := models.EsThemeResp{}
	if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
		// Report through the error return rather than exiting the process.
		return models.EsThemeResp{}, fmt.Errorf("parsing search response from %q: %w", index, err)
	}
	return r, nil
}
// SearchLatestStationData queries index for the single document whose
// "sensor" term equals sensorId, sorted by "collect_time" descending, and
// returns the _source of the first hit.
//
// When there is no hit the zero models.EsTheme is returned. The error is
// whatever searchThemes reported. Earlier notes in this function mentioned
// sensor 178 and index "go_native_themes" as sample arguments.
func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) {
	// Query template; the only placeholder is the sensor id.
	const latestBySensorQuery = `{
"size": 1,
"query": {
"term": {
"sensor": {
"value": %d
}
}
},
"sort": [
{
"collect_time": {
"order": "desc"
}
}
]
}`

	resp, err := the.searchThemes(index, fmt.Sprintf(latestBySensorQuery, sensorId))

	var latest models.EsTheme
	if hits := resp.Hits.Hits; len(hits) > 0 {
		latest = hits[0].Source
	}
	return latest, err
}
// BulkWrite sends reqBody as an esapi.BulkRequest against index with
// document type "_doc".
//
// It panics (log.Panicf) when the request cannot be performed or when the
// HTTP status is neither 200 nor 201. A response whose "errors" flag is set
// is logged but does not panic. The response body is not logged on success.
func (the *ESHelper) BulkWrite(index, reqBody string) {
	bulkRequest := esapi.BulkRequest{
		Index:        index,
		Body:         bytes.NewBufferString(reqBody),
		DocumentType: "_doc",
	}

	res, err := bulkRequest.Do(context.Background(), the.esClient)
	if err != nil {
		log.Panicf("es 写入[%s],err=%s", index, err.Error())
	}
	// Deferred only after the error check: res is nil when err is non-nil,
	// and deferring earlier replaced the message above with a nil-pointer panic.
	defer res.Body.Close()

	respBody, err := io.ReadAll(res.Body)
	if err != nil {
		// Keep going with whatever was read; the status check below still applies.
		log.Printf("es 写入[%s],读取响应失败,err=%s", index, err.Error())
	}
	if res.StatusCode != 200 && res.StatusCode != 201 {
		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody)
	}

	// The bulk endpoint reports per-item failures through an "errors" flag in
	// the body while still answering 200, so surface those in the log.
	var summary struct {
		Errors bool `json:"errors"`
	}
	if json.Unmarshal(respBody, &summary) == nil && summary.Errors {
		log.Printf("es 写入[%s],部分失败,res=%s", index, respBody)
	}
}
// BulkWriteWithLog sends reqBody as an esapi.BulkRequest against index with
// document type "_doc" and logs the full response body on success.
//
// It panics (log.Panicf) when the request cannot be performed or when the
// HTTP status is neither 200 nor 201.
func (the *ESHelper) BulkWriteWithLog(index, reqBody string) {
	bulkRequest := esapi.BulkRequest{
		Index:        index,
		Body:         bytes.NewBufferString(reqBody),
		DocumentType: "_doc",
	}

	res, err := bulkRequest.Do(context.Background(), the.esClient)
	if err != nil {
		log.Panicf("es 写入[%s],err=%s", index, err.Error())
	}
	// Deferred only after the error check: res is nil when err is non-nil,
	// and deferring earlier replaced the message above with a nil-pointer panic.
	defer res.Body.Close()

	respBody, err := io.ReadAll(res.Body)
	if err != nil {
		// Keep going with whatever was read; the status check below still applies.
		log.Printf("es 写入[%s],读取响应失败,err=%s", index, err.Error())
	}
	if res.StatusCode != 200 && res.StatusCode != 201 {
		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody)
	}
	log.Printf("es 写入[%s],完成,res=%s ", index, respBody)
}
// BulkWriteRaws2Es serializes each element of raws into a bulk "index"
// action for index, using the record's IotaDevice as the document _id, and
// submits the whole batch in one bulk request.
//
// When the batch contains the traced device id the request goes through
// BulkWriteWithLog (which logs the response); otherwise through BulkWrite.
// An empty batch sends nothing. Records that fail to marshal are logged and
// skipped.
//
// NOTE(review): the Scala implementation derived the id from device id plus
// acquisition time:
//
//	val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
//
// Here only the device id is used — confirm that is intended.
func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) {
	// Device id used for tracing during testing.
	const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0"
	// One action line followed by one source line, each newline-terminated.
	const bulkEntryFormat = `{"index": {"_index": "%s","_id": "%s"}}` + "\n%s\n"

	if len(raws) == 0 {
		// An empty bulk body is rejected by the server and would end in a panic.
		return
	}

	logTag := false
	var body strings.Builder
	for _, raw := range raws {
		source, err := json.Marshal(raw)
		if err != nil {
			// An entry without a source line would corrupt the bulk body.
			log.Printf("BulkWriteRaws2Es: skipping device [%s], marshal error: %s", raw.IotaDevice, err)
			continue
		}
		entry := fmt.Sprintf(bulkEntryFormat, index, raw.IotaDevice, source)
		body.WriteString(entry)
		if raw.IotaDevice == logTagDeviceId {
			log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", logTagDeviceId, entry)
			logTag = true
		}
	}
	if body.Len() == 0 {
		return
	}

	if logTag { // trace the data
		the.BulkWriteWithLog(index, body.String())
	} else {
		the.BulkWrite(index, body.String())
	}
}
// BulkWriteRaws2EsLast serializes each element of raws into a bulk "index"
// action for index, using the record's IotaDevice as the document _id, and
// submits the whole batch through BulkWrite.
//
// An empty batch sends nothing. Records that fail to marshal are logged and
// skipped.
func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) {
	// One action line followed by one source line, each newline-terminated.
	const bulkEntryFormat = `{"index": {"_index": "%s","_id": "%s"}}` + "\n%s\n"

	if len(raws) == 0 {
		// An empty bulk body is rejected by the server and would end in a panic.
		return
	}

	var body strings.Builder
	for _, raw := range raws {
		source, err := json.Marshal(raw)
		if err != nil {
			// An entry without a source line would corrupt the bulk body.
			log.Printf("BulkWriteRaws2EsLast: skipping device [%s], marshal error: %s", raw.IotaDevice, err)
			continue
		}
		body.WriteString(fmt.Sprintf(bulkEntryFormat, index, raw.IotaDevice, source))
	}
	if body.Len() == 0 {
		return
	}
	the.BulkWrite(index, body.String())
}
// Close is currently a no-op: the method body is empty and nothing held by
// the helper is released here.
func (the *ESHelper) Close() {
}