package dbOperate import ( "context" "fmt" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "log" "net/http" "time" ) type InfluxDBHelper struct { Url string Token string Bucket string Org string client influxdb2.Client } func (the *InfluxDBHelper) GetClient() influxdb2.Client { return the.client } func (the *InfluxDBHelper) Initial() { //the.Token = "t4YoDtUT1Hi3-1COJ2xxJbKnBGHyLYxt9SQ2MpZ_5EskMaKyKaU-IiSQRJnFm7y66t80M3AKH7TzF3tBksL5EA==" //the.Bucket = "dynamic" //the.Org = "shandong" //the.Url = "http://172.26.103.10:30086" // 创建HTTP客户端并设置请求超时 httpClient := &http.Client{ Timeout: 60 * time.Second, // 设置超时时间为60秒 } the.client = influxdb2.NewClientWithOptions(the.Url, the.Token, influxdb2.DefaultOptions().SetHTTPClient(httpClient)) // always close client at the end //defer the.client.Close() log.Println("influxDB 客户端初始化完成") } func (the *InfluxDBHelper) Close() { defer the.client.Close() log.Println("influxDB 客户端关闭") } func (the *InfluxDBHelper) Write() { // get non-blocking write client writeAPI := the.client.WriteAPI(the.Org, the.Bucket) // write line protocol writeAPI.WriteRecord(fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)) writeAPI.WriteRecord(fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 22.5, 45.0)) // Flush writes writeAPI.Flush() } func (the *InfluxDBHelper) QueryByOfflineGap(measurement string, offlineGap string) map[string]time.Time { sh, _ := time.LoadLocation("Asia/Shanghai") data := make(map[string]time.Time) query := fmt.Sprintf(` from(bucket:"%v") |> range(start: -%sm) |> filter(fn: (r) => r["_measurement"] == "%s") |> group(columns: ["sensor_id"]) |> last() `, the.Bucket, offlineGap, measurement) // Get query client queryAPI := the.client.QueryAPI(the.Org) // get QueryTableResult result, err := queryAPI.Query(context.Background(), query) if err == nil { // Iterate over query response for result.Next() { // Notice when group key has changed if result.TableChanged() { fmt.Printf("table: %s\n", result.TableMetadata().String()) } // Access data values := result.Record().Values() sensorId := values["sensor_id"].(string) recordTime := result.Record().Time().In(sh) data[sensorId] = recordTime fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values) } // check for an error if result.Err() != nil { fmt.Printf("query parsing error: %s \n", result.Err().Error()) } } else { log.Printf("influxDB 查询异常 %s", err.Error()) } return data } func (the *InfluxDBHelper) Query() map[string]time.Time { sh, _ := time.LoadLocation("Asia/Shanghai") data := make(map[string]time.Time) query := fmt.Sprintf(` from(bucket:"%v") |> range(start: -1) |> filter(fn: (r) => r["_measurement"] == "factor_11" or r["_measurement"] == "factor_18" or r["_measurement"] == "factor_24") |> group(columns: ["sensor_id"]) |> last() `, the.Bucket) // Get query client queryAPI := the.client.QueryAPI(the.Org) // get QueryTableResult result, err := queryAPI.Query(context.Background(), query) if err == nil { // Iterate over query response for result.Next() { // Notice when group key has changed if result.TableChanged() { fmt.Printf("table: %s\n", result.TableMetadata().String()) } // Access data values := result.Record().Values() sensorId := values["sensor_id"].(string) recordTime := result.Record().Time().In(sh) data[sensorId] = recordTime //fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values) } // check for an error if result.Err() != nil { fmt.Printf("query parsing error: %v \n", result.Err().Error()) } } else { log.Printf("[QueryAllLast] influxDB 查询异常 %s", err.Error()) } return data }