|
|
@ -174,6 +174,29 @@ func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models |
|
|
|
} |
|
|
|
func (the *ESHelper) BulkWrite(index, reqBody string) { |
|
|
|
|
|
|
|
body := &bytes.Buffer{} |
|
|
|
body.WriteString(reqBody) |
|
|
|
bulkRequest := esapi.BulkRequest{ |
|
|
|
Index: index, |
|
|
|
Body: body, |
|
|
|
DocumentType: "_doc", |
|
|
|
} |
|
|
|
res, err := bulkRequest.Do(context.Background(), the.esClient) |
|
|
|
defer res.Body.Close() |
|
|
|
if err != nil { |
|
|
|
log.Panicf("es 写入[%s],err=%s", index, err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
respBody, _ := io.ReadAll(res.Body) |
|
|
|
if res.StatusCode != 200 && res.StatusCode != 201 { |
|
|
|
log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) |
|
|
|
} |
|
|
|
//log.Printf("es 写入[%s],完成,res=%s ", index, respBody)
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { |
|
|
|
|
|
|
|
body := &bytes.Buffer{} |
|
|
|
body.WriteString(reqBody) |
|
|
|
bulkRequest := esapi.BulkRequest{ |
|
|
@ -200,6 +223,8 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { |
|
|
|
//log 测试用
|
|
|
|
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" |
|
|
|
|
|
|
|
logTag := false |
|
|
|
|
|
|
|
body := strings.Builder{} |
|
|
|
for _, raw := range raws { |
|
|
|
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
|
|
|
@ -213,9 +238,14 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { |
|
|
|
|
|
|
|
if raw.IotaDevice == logTagDeviceId { |
|
|
|
log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", logTagDeviceId, string(s)) |
|
|
|
logTag = true |
|
|
|
} |
|
|
|
} |
|
|
|
the.BulkWrite(index, body.String()) |
|
|
|
if logTag { //追踪数据
|
|
|
|
the.BulkWriteWithLog(index, body.String()) |
|
|
|
} else { |
|
|
|
the.BulkWrite(index, body.String()) |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|