// Package dbHelper provides InfluxDBHelper, a thin wrapper around the InfluxDB
// v2 Go client for writing line-protocol records and for querying the time of
// the latest record per sensor.
package dbHelper

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// InfluxDBHelper bundles an InfluxDB client with the connection settings it
// was created from.
//
// NOTE(review): the bucket field is read by Query and QueryByOfflineGap, but
// nothing in this file assigns it (NewInfluxDBHelper takes no bucket). Unless
// other code in the package sets it, those queries run against bucket "" —
// confirm.
type InfluxDBHelper struct {
	url    string
	token  string
	bucket string
	org    string
	client influxdb2.Client
}

// getClient returns the underlying InfluxDB client.
func (the *InfluxDBHelper) getClient() influxdb2.Client {
	return the.client
}

// NewInfluxDBHelper creates a helper whose client talks to the InfluxDB server
// at url, authenticates with token, and uses org for reads and writes. HTTP
// requests issued by the client time out after 60 seconds.
//
// The client is not closed automatically; call Close when the helper is no
// longer needed.
func NewInfluxDBHelper(url, token, org string) *InfluxDBHelper {
	// Use a dedicated HTTP client so that every request has a timeout.
	httpClient := &http.Client{Timeout: 60 * time.Second}
	options := influxdb2.DefaultOptions().SetHTTPClient(httpClient)

	helper := &InfluxDBHelper{
		url:   url,
		token: token,
		org:   org,
	}
	helper.client = influxdb2.NewClientWithOptions(helper.url, helper.token, options)

	log.Println("influxDB 客户端初始化完成")
	return helper
}

// Close logs the shutdown message and then closes the underlying client.
func (the *InfluxDBHelper) Close() {
	log.Println("influxDB 客户端关闭")
	the.client.Close()
}

// Write sends every line-protocol record in lines to bucket (the explicit
// argument, not the helper's bucket field) through the client's non-blocking
// write API, then flushes so that buffered records are sent before returning.
func (the *InfluxDBHelper) Write(lines []string, bucket string) {
	writeAPI := the.client.WriteAPI(the.org, bucket)
	for _, line := range lines {
		writeAPI.WriteRecord(line)
	}
	writeAPI.Flush()
}

// influxDisplayLocation returns the Asia/Shanghai location used to present
// record times. If the location cannot be loaded it falls back to a fixed
// UTC+8 zone, because Time.In panics when given a nil location.
func influxDisplayLocation() *time.Location {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		log.Printf("influxDB: loading time zone Asia/Shanghai failed, falling back to fixed UTC+8: %v", err)
		return time.FixedZone("CST", 8*60*60)
	}
	return loc
}

// influxSensorID extracts the "sensor_id" column from a record's values. ok is
// false when the column is absent or does not hold a string.
func influxSensorID(values map[string]interface{}) (id string, ok bool) {
	id, ok = values["sensor_id"].(string)
	return id, ok
}

// QueryByOfflineGap queries the helper's bucket for records of measurement
// within the last offlineGap minutes, groups them by sensor_id and keeps the
// last record of each group. It returns a map from sensor_id to that record's
// time, converted to Asia/Shanghai.
//
// measurement and offlineGap are interpolated into the Flux query without
// escaping, so callers must pass trusted values only.
//
// Query failures are logged and yield an empty (non-nil) map. Records without
// a string sensor_id are logged and skipped.
func (the *InfluxDBHelper) QueryByOfflineGap(measurement string, offlineGap string) map[string]time.Time {
	loc := influxDisplayLocation()
	data := make(map[string]time.Time)
	query := fmt.Sprintf(` from(bucket:"%v") |> range(start: -%sm) |> filter(fn: (r) => r["_measurement"] == "%s") |> group(columns: ["sensor_id"]) |> last() `, the.bucket, offlineGap, measurement)

	result, err := the.client.QueryAPI(the.org).Query(context.Background(), query)
	if err != nil {
		log.Printf("influxDB 查询异常 %s", err.Error())
		return data
	}

	for result.Next() {
		// Report each new table (group key change) as it is encountered.
		if result.TableChanged() {
			fmt.Printf("table: %s\n", result.TableMetadata().String())
		}

		record := result.Record()
		values := record.Values()
		sensorID, ok := influxSensorID(values)
		if !ok {
			// Skip instead of panicking on an unexpected row shape.
			log.Printf("influxDB: skipping record without a string sensor_id: %v", values)
			continue
		}

		recordTime := record.Time().In(loc)
		data[sensorID] = recordTime
		fmt.Printf("station:[%v] value:[%v] %v \n", sensorID, recordTime, values)
	}
	// Next stops on both end-of-data and parse errors; Err tells them apart.
	if parseErr := result.Err(); parseErr != nil {
		fmt.Printf("query parsing error: %s\n", parseErr.Error())
	}
	return data
}

// Query queries the helper's bucket for records of the measurements
// factor_11, factor_18 and factor_24, groups them by sensor_id and keeps the
// last record of each group. It returns a map from sensor_id to that record's
// time, converted to Asia/Shanghai.
//
// NOTE(review): the query uses range(start: -1). Flux documents integer start
// values as Unix seconds, so this presumably means "from just before the
// epoch", i.e. effectively all data — confirm that this is intended.
//
// Query failures are logged and yield an empty (non-nil) map. Records without
// a string sensor_id are logged and skipped.
func (the *InfluxDBHelper) Query() map[string]time.Time {
	loc := influxDisplayLocation()
	data := make(map[string]time.Time)
	query := fmt.Sprintf(` from(bucket:"%v") |> range(start: -1) |> filter(fn: (r) => r["_measurement"] == "factor_11" or r["_measurement"] == "factor_18" or r["_measurement"] == "factor_24") |> group(columns: ["sensor_id"]) |> last() `, the.bucket)

	result, err := the.client.QueryAPI(the.org).Query(context.Background(), query)
	if err != nil {
		log.Printf("[QueryAllLast] influxDB 查询异常 %s", err.Error())
		return data
	}

	for result.Next() {
		// Report each new table (group key change) as it is encountered.
		if result.TableChanged() {
			fmt.Printf("table: %s\n", result.TableMetadata().String())
		}

		record := result.Record()
		values := record.Values()
		sensorID, ok := influxSensorID(values)
		if !ok {
			// Skip instead of panicking on an unexpected row shape.
			log.Printf("influxDB: skipping record without a string sensor_id: %v", values)
			continue
		}

		data[sensorID] = record.Time().In(loc)
	}
	// Next stops on both end-of-data and parse errors; Err tells them apart.
	if parseErr := result.Err(); parseErr != nil {
		fmt.Printf("query parsing error: %v \n", parseErr.Error())
	}
	return data
}