You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
136 lines
3.8 KiB
136 lines
3.8 KiB
package dbOperate
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"log"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
type InfluxDBHelper struct {
|
|
Url string
|
|
Token string
|
|
Bucket string
|
|
Org string
|
|
client influxdb2.Client
|
|
}
|
|
|
|
func (the *InfluxDBHelper) GetClient() influxdb2.Client {
|
|
return the.client
|
|
}
|
|
|
|
func (the *InfluxDBHelper) Initial() {
|
|
//the.Token = "t4YoDtUT1Hi3-1COJ2xxJbKnBGHyLYxt9SQ2MpZ_5EskMaKyKaU-IiSQRJnFm7y66t80M3AKH7TzF3tBksL5EA=="
|
|
//the.Bucket = "dynamic"
|
|
//the.Org = "shandong"
|
|
//the.Url = "http://172.26.103.10:30086"
|
|
// 创建HTTP客户端并设置请求超时
|
|
httpClient := &http.Client{
|
|
Timeout: 60 * time.Second, // 设置超时时间为60秒
|
|
}
|
|
the.client = influxdb2.NewClientWithOptions(the.Url, the.Token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))
|
|
|
|
// always close client at the end
|
|
//defer the.client.Close()
|
|
log.Println("influxDB 客户端初始化完成")
|
|
}
|
|
|
|
func (the *InfluxDBHelper) Close() {
|
|
defer the.client.Close()
|
|
log.Println("influxDB 客户端关闭")
|
|
}
|
|
|
|
func (the *InfluxDBHelper) Write() {
|
|
// get non-blocking write client
|
|
writeAPI := the.client.WriteAPI(the.Org, the.Bucket)
|
|
|
|
// write line protocol
|
|
writeAPI.WriteRecord(fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0))
|
|
writeAPI.WriteRecord(fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 22.5, 45.0))
|
|
// Flush writes
|
|
writeAPI.Flush()
|
|
}
|
|
|
|
func (the *InfluxDBHelper) QueryByOfflineGap(measurement string, offlineGap string) map[string]time.Time {
|
|
|
|
sh, _ := time.LoadLocation("Asia/Shanghai")
|
|
data := make(map[string]time.Time)
|
|
|
|
query := fmt.Sprintf(`
|
|
from(bucket:"%v")
|
|
|> range(start: -%sm)
|
|
|> filter(fn: (r) => r["_measurement"] == "%s")
|
|
|> group(columns: ["sensor_id"])
|
|
|> last()
|
|
`, the.Bucket, offlineGap, measurement)
|
|
// Get query client
|
|
queryAPI := the.client.QueryAPI(the.Org)
|
|
// get QueryTableResult
|
|
result, err := queryAPI.Query(context.Background(), query)
|
|
if err == nil {
|
|
// Iterate over query response
|
|
for result.Next() {
|
|
// Notice when group key has changed
|
|
if result.TableChanged() {
|
|
fmt.Printf("table: %s\n", result.TableMetadata().String())
|
|
}
|
|
// Access data
|
|
values := result.Record().Values()
|
|
sensorId := values["sensor_id"].(string)
|
|
recordTime := result.Record().Time().In(sh)
|
|
data[sensorId] = recordTime
|
|
fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values)
|
|
}
|
|
// check for an error
|
|
if result.Err() != nil {
|
|
fmt.Printf("query parsing error: %s \n", result.Err().Error())
|
|
}
|
|
} else {
|
|
log.Printf("influxDB 查询异常 %s", err.Error())
|
|
}
|
|
|
|
return data
|
|
}
|
|
func (the *InfluxDBHelper) Query() map[string]time.Time {
|
|
|
|
sh, _ := time.LoadLocation("Asia/Shanghai")
|
|
data := make(map[string]time.Time)
|
|
|
|
query := fmt.Sprintf(`
|
|
from(bucket:"%v")
|
|
|> range(start: -1)
|
|
|> filter(fn: (r) => r["_measurement"] == "factor_11" or r["_measurement"] == "factor_18" or r["_measurement"] == "factor_24")
|
|
|> group(columns: ["sensor_id"])
|
|
|> last()
|
|
`, the.Bucket)
|
|
// Get query client
|
|
queryAPI := the.client.QueryAPI(the.Org)
|
|
// get QueryTableResult
|
|
result, err := queryAPI.Query(context.Background(), query)
|
|
|
|
if err == nil {
|
|
// Iterate over query response
|
|
for result.Next() {
|
|
// Notice when group key has changed
|
|
if result.TableChanged() {
|
|
fmt.Printf("table: %s\n", result.TableMetadata().String())
|
|
}
|
|
// Access data
|
|
values := result.Record().Values()
|
|
sensorId := values["sensor_id"].(string)
|
|
recordTime := result.Record().Time().In(sh)
|
|
data[sensorId] = recordTime
|
|
//fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values)
|
|
}
|
|
// check for an error
|
|
if result.Err() != nil {
|
|
fmt.Printf("query parsing error: %v \n", result.Err().Error())
|
|
}
|
|
} else {
|
|
log.Printf("[QueryAllLast] influxDB 查询异常 %s", err.Error())
|
|
}
|
|
|
|
return data
|
|
}
|
|
|