From 04e3f32654f242728068cf4077a2eabcb0c9ea5f Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 10 Dec 2024 17:13:53 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=E6=8C=87=E5=AE=9A?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=97=A5=E5=BF=97=20=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adaptors/安心云最新设备数据toES.go | 2 +- dbHelper/elasticsearchHelper.go | 32 ++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index f6ead4a..1d3dba0 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -27,7 +27,7 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { if !iotaData.Data.Success() { return nil } - log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) + //log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) //查不到信息的数据 diff --git a/dbHelper/elasticsearchHelper.go b/dbHelper/elasticsearchHelper.go index 1a64afc..523cade 100644 --- a/dbHelper/elasticsearchHelper.go +++ b/dbHelper/elasticsearchHelper.go @@ -174,6 +174,29 @@ func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models } func (the *ESHelper) BulkWrite(index, reqBody string) { + body := &bytes.Buffer{} + body.WriteString(reqBody) + bulkRequest := esapi.BulkRequest{ + Index: index, + Body: body, + DocumentType: "_doc", + } + res, err := bulkRequest.Do(context.Background(), the.esClient) + defer res.Body.Close() + if err != nil { + log.Panicf("es 写入[%s],err=%s", index, err.Error()) + return + } + respBody, _ := io.ReadAll(res.Body) + if res.StatusCode != 200 && res.StatusCode != 201 { + log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) + } + //log.Printf("es 写入[%s],完成,res=%s ", index, respBody) + +} + +func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { + body := &bytes.Buffer{} body.WriteString(reqBody) bulkRequest := esapi.BulkRequest{ @@ -200,6 +223,8 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { //log 测试用 const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" + logTag := false + body := strings.Builder{} for _, raw := range raws { // scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString @@ -213,9 +238,14 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { if raw.IotaDevice == logTagDeviceId { log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", logTagDeviceId, string(s)) + logTag = true } } - the.BulkWrite(index, body.String()) + if logTag { //追踪数据 + the.BulkWriteWithLog(index, body.String()) + } else { + the.BulkWrite(index, body.String()) + } }