package dbHelper import ( "bytes" "context" "encoding/json" "fmt" "gitea.anxinyun.cn/container/common_calc" "gitea.anxinyun.cn/container/common_models" elasticsearch6 "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" "io" "log" "strings" ) type ESHelper struct { addresses []string //org string esClient *elasticsearch6.Client } func NewESHelper(addresses []string, user, pwd string) *ESHelper { es, _ := elasticsearch6.NewClient(elasticsearch6.Config{ Addresses: addresses, Username: user, Password: pwd, }) res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res) return &ESHelper{ addresses: addresses, esClient: es, } } func (the *ESHelper) SearchRaw(index, reqBody string) []common_models.HitRaw { body := &bytes.Buffer{} body.WriteString(reqBody) response, err := the.esClient.Search( the.esClient.Search.WithIndex(index), the.esClient.Search.WithBody(body), ) defer response.Body.Close() if err != nil { return nil } r := common_models.EsRawResp{} // Deserialize the response into a map. if err := json.NewDecoder(response.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } return r.Hits.Hits } func (the *ESHelper) Search(index, reqBody string) { body := &bytes.Buffer{} body.WriteString(reqBody) response, err := the.esClient.Search( the.esClient.Search.WithIndex(index), the.esClient.Search.WithBody(body), ) if err != nil { //return nil, err } log.Println(response.Status()) var r map[string]any // Deserialize the response into a map. if err := json.NewDecoder(response.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Print the response status, number of results, and request duration. log.Printf( "[%s] %d hits; took: %dms", response.Status(), int(r["hits"].(map[string]any)["total"].(float64)), int(r["took"].(float64)), ) for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { source := hit.(map[string]any)["_source"] log.Printf(" * ID=%s, %s", hit.(map[string]any)["_id"], source) } log.Println(strings.Repeat("=", 37)) } func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { // Set up the request object. req := esapi.IndexRequest{ Index: index, //DocumentID: strconv.Itoa(i + 1), Body: strings.NewReader(reqBody), Refresh: "true", } // Perform the request with the client. res, err := req.Do(context.Background(), the.esClient) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() var r map[string]any if res.IsError() { log.Printf("[%s] Error indexing document ID=%d", res.Status(), 0) } else { // Deserialize the response into a map. if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Printf("Error parsing the response body: %s", err) } else { // Print the response status and indexed document version. log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) } } return r, err } func (the *ESHelper) searchRaw(index, reqBody string) (common_models.IotaData, error) { respmap, err := the.request(index, reqBody) if respmap != nil { } iotaDatas := common_models.IotaData{} return iotaDatas, err } func (the *ESHelper) searchThemes(index, reqBody string) (common_models.EsThemeResp, error) { body := &bytes.Buffer{} body.WriteString(reqBody) response, err := the.esClient.Search( the.esClient.Search.WithIndex(index), the.esClient.Search.WithBody(body), ) defer response.Body.Close() if err != nil { //return nil, err } log.Println(response.Status()) r := common_models.EsThemeResp{} // Deserialize the response into a map. if err := json.NewDecoder(response.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } return r, err } func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (common_models.EsTheme, error) { //sensorId := 178 queryBody := fmt.Sprintf(`{ "size": 1, "query": { "term": { "sensor": { "value": %d } } }, "sort": [ { "collect_time": { "order": "desc" } } ] }`, sensorId) //index := "go_native_themes" themes, err := the.searchThemes(index, queryBody) var theme common_models.EsTheme if len(themes.Hits.Hits) > 0 { theme = themes.Hits.Hits[0].Source } return theme, err } func (the *ESHelper) BulkWrite(index, reqBody string) { body := &bytes.Buffer{} body.WriteString(reqBody) bulkRequest := esapi.BulkRequest{ Index: index, Body: body, DocumentType: "_doc", } res, err := bulkRequest.Do(context.Background(), the.esClient) defer res.Body.Close() if err != nil { log.Panicf("es 写入[%s],err=%s", index, err.Error()) } if res.StatusCode != 200 && res.StatusCode != 201 { respBody, _ := io.ReadAll(res.Body) log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) } //log.Printf("es 写入[%s],字符长度=%d,完成", index, len(reqBody)) } func (the *ESHelper) BulkWriteRaws2Es(index string, raws []common_models.EsRaw) { body := strings.Builder{} for _, raw := range raws { // scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString source, _ := json.Marshal(raw) _id := common_calc.NameUUIDFromString(fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())) s := fmt.Sprintf( `{"index": {"_index": "%s","_id": "%s"}} %s `, index, _id, source) body.WriteString(s) } the.BulkWrite(index, body.String()) } func (the *ESHelper) Close() { } // BulkWriteGroup2Es 分组主题数据写入ES func (the *ESHelper) BulkWriteGroup2Es(index string, themes []common_models.EsGroupTheme) { body := strings.Builder{} for _, theme := range themes { source, _ := json.Marshal(theme) _id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli())) s := fmt.Sprintf( `{"index": {"_index": "%s","_id": "%s"}} %s `, index, _id, source) body.WriteString(s) } the.BulkWrite(index, body.String()) }