You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
139 lines
3.6 KiB
139 lines
3.6 KiB
4 months ago
|
package dbHelper
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type InfluxDBHelper struct {
|
||
|
url string
|
||
|
token string
|
||
|
bucket string
|
||
|
org string
|
||
|
client influxdb2.Client
|
||
|
}
|
||
|
|
||
|
func (the *InfluxDBHelper) getClient() influxdb2.Client {
|
||
|
return the.client
|
||
|
}
|
||
|
|
||
|
func NewInfluxDBHelper(url, token, org string) *InfluxDBHelper {
|
||
|
// 创建HTTP客户端并设置请求超时
|
||
|
httpClient := &http.Client{
|
||
|
Timeout: 60 * time.Second, // 设置超时时间为60秒
|
||
|
}
|
||
|
influxDBHelper := &InfluxDBHelper{
|
||
|
url: url,
|
||
|
token: token,
|
||
|
org: org,
|
||
|
}
|
||
|
influxDBHelper.client = influxdb2.NewClientWithOptions(influxDBHelper.url, influxDBHelper.token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))
|
||
|
|
||
|
// always close client at the end
|
||
|
//defer the.client.Close()
|
||
|
log.Println("influxDB 客户端初始化完成")
|
||
|
return influxDBHelper
|
||
|
}
|
||
|
|
||
|
func (the *InfluxDBHelper) Close() {
|
||
|
defer the.client.Close()
|
||
|
log.Println("influxDB 客户端关闭")
|
||
|
}
|
||
|
|
||
|
func (the *InfluxDBHelper) Write(lines []string, bucket string) {
|
||
|
// get non-blocking write client
|
||
|
writeAPI := the.client.WriteAPI(the.org, bucket)
|
||
|
|
||
|
// write line protocol
|
||
|
for _, line := range lines {
|
||
|
writeAPI.WriteRecord(line)
|
||
|
}
|
||
|
writeAPI.Flush()
|
||
|
}
|
||
|
|
||
|
func (the *InfluxDBHelper) QueryByOfflineGap(measurement string, offlineGap string) map[string]time.Time {
|
||
|
|
||
|
sh, _ := time.LoadLocation("Asia/Shanghai")
|
||
|
data := make(map[string]time.Time)
|
||
|
|
||
|
query := fmt.Sprintf(`
|
||
|
from(bucket:"%v")
|
||
|
|> range(start: -%sm)
|
||
|
|> filter(fn: (r) => r["_measurement"] == "%s")
|
||
|
|> group(columns: ["sensor_id"])
|
||
|
|> last()
|
||
|
`, the.bucket, offlineGap, measurement)
|
||
|
// Get query client
|
||
|
queryAPI := the.client.QueryAPI(the.org)
|
||
|
// get QueryTableResult
|
||
|
result, err := queryAPI.Query(context.Background(), query)
|
||
|
if err == nil {
|
||
|
// Iterate over query response
|
||
|
for result.Next() {
|
||
|
// Notice when group key has changed
|
||
|
if result.TableChanged() {
|
||
|
fmt.Printf("table: %s\n", result.TableMetadata().String())
|
||
|
}
|
||
|
// Access data
|
||
|
values := result.Record().Values()
|
||
|
sensorId := values["sensor_id"].(string)
|
||
|
recordTime := result.Record().Time().In(sh)
|
||
|
data[sensorId] = recordTime
|
||
|
fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values)
|
||
|
}
|
||
|
// check for an error
|
||
|
if result.Err() != nil {
|
||
|
fmt.Printf("query parsing error: %s\n", result.Err().Error())
|
||
|
}
|
||
|
} else {
|
||
|
log.Printf("influxDB 查询异常 %s", err.Error())
|
||
|
}
|
||
|
|
||
|
return data
|
||
|
}
|
||
|
func (the *InfluxDBHelper) Query() map[string]time.Time {
|
||
|
|
||
|
sh, _ := time.LoadLocation("Asia/Shanghai")
|
||
|
data := make(map[string]time.Time)
|
||
|
|
||
|
query := fmt.Sprintf(`
|
||
|
from(bucket:"%v")
|
||
|
|> range(start: -1)
|
||
|
|> filter(fn: (r) => r["_measurement"] == "factor_11" or r["_measurement"] == "factor_18" or r["_measurement"] == "factor_24")
|
||
|
|> group(columns: ["sensor_id"])
|
||
|
|> last()
|
||
|
`, the.bucket)
|
||
|
// Get query client
|
||
|
queryAPI := the.client.QueryAPI(the.org)
|
||
|
// get QueryTableResult
|
||
|
result, err := queryAPI.Query(context.Background(), query)
|
||
|
|
||
|
if err == nil {
|
||
|
// Iterate over query response
|
||
|
for result.Next() {
|
||
|
// Notice when group key has changed
|
||
|
if result.TableChanged() {
|
||
|
fmt.Printf("table: %s\n", result.TableMetadata().String())
|
||
|
}
|
||
|
// Access data
|
||
|
values := result.Record().Values()
|
||
|
sensorId := values["sensor_id"].(string)
|
||
|
recordTime := result.Record().Time().In(sh)
|
||
|
data[sensorId] = recordTime
|
||
|
//fmt.Printf("station:[%v] value:[%v] %v \n", sensorId, recordTime, values)
|
||
|
}
|
||
|
// check for an error
|
||
|
if result.Err() != nil {
|
||
|
fmt.Printf("query parsing error: %v \n", result.Err().Error())
|
||
|
}
|
||
|
} else {
|
||
|
log.Printf("[QueryAllLast] influxDB 查询异常 %s", err.Error())
|
||
|
}
|
||
|
|
||
|
return data
|
||
|
}
|