重建 common_utils
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
3.6 KiB

package dbHelper
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"log"
"net/http"
"time"
)
// InfluxDBHelper bundles an InfluxDB v2 client with the connection settings
// used to create it.
type InfluxDBHelper struct {
	// url is the server address and token the API token; both are handed to
	// the client when it is constructed.
	url, token string

	// bucket is the bucket name interpolated into the Flux text built by
	// QueryByOfflineGap and Query.
	// NOTE(review): NewInfluxDBHelper does not populate this field; confirm it
	// is assigned somewhere else in the package before those queries run.
	bucket string

	// org is the organization passed to the client's write and query APIs.
	org string

	// client is the underlying InfluxDB client; Close shuts it down.
	client influxdb2.Client
}
// getClient returns the InfluxDB client stored in the helper.
func (h *InfluxDBHelper) getClient() influxdb2.Client {
	return h.client
}
// NewInfluxDBHelper builds a helper whose client talks to the InfluxDB server
// at url, authenticates with token, and records org for later write and query
// calls.
//
// The client is given an http.Client with a 60-second timeout. The helper's
// bucket field is left empty by this constructor. The client is not closed
// here; call Close on the returned helper when it is no longer needed.
func NewInfluxDBHelper(url, token, org string) *InfluxDBHelper {
	// Cap every HTTP request made by the client at 60 seconds.
	options := influxdb2.DefaultOptions().SetHTTPClient(&http.Client{
		Timeout: 60 * time.Second,
	})

	helper := &InfluxDBHelper{
		url:    url,
		token:  token,
		org:    org,
		client: influxdb2.NewClientWithOptions(url, token, options),
	}

	log.Println("influxDB 客户端初始化完成")
	return helper
}
// Close writes the shutdown log line and then closes the underlying client.
func (h *InfluxDBHelper) Close() {
	// Deferred, so the client is closed only after the log line below has
	// been emitted.
	defer h.client.Close()
	log.Println("influxDB 客户端关闭")
}
// Write hands every line-protocol record in lines to the client's
// non-blocking write API for the helper's org and the given bucket, then
// flushes that API. It returns nothing, so write failures are not reported to
// the caller.
func (h *InfluxDBHelper) Write(lines []string, bucket string) {
	api := h.client.WriteAPI(h.org, bucket)

	// Queue the records one by one in their original order.
	for i := 0; i < len(lines); i++ {
		api.WriteRecord(lines[i])
	}

	// Force out whatever is still buffered.
	api.Flush()
}
// QueryByOfflineGap returns, for each sensor that has data in measurement
// within the last offlineGap minutes, the timestamp of its latest record.
//
// Parameters:
//   - measurement: value matched against the "_measurement" column.
//   - offlineGap: number of minutes, as text; it is spliced into the Flux
//     range as "-<offlineGap>m", so it must be a plain number.
//
// The returned map is keyed by the record's "sensor_id" column and holds the
// record time converted to Asia/Shanghai. The map is never nil. If the query
// cannot be started, the error is logged and an empty map is returned; an
// error hit while reading rows is printed and the rows read so far are
// returned.
//
// Both inputs are inserted into the Flux text verbatim, so callers must not
// pass untrusted strings.
func (h *InfluxDBHelper) QueryByOfflineGap(measurement string, offlineGap string) map[string]time.Time {
	sh, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// LoadLocation fails when no time zone database is available. The
		// location would then be nil and Time.In(nil) panics, so fall back to
		// a fixed UTC+8 zone, which is Shanghai's offset.
		log.Printf("influxDB 时区 Asia/Shanghai 加载失败, 改用固定 UTC+8: %s", err.Error())
		sh = time.FixedZone("CST", 8*60*60)
	}

	data := make(map[string]time.Time)
	query := fmt.Sprintf(`
from(bucket:"%v")
|> range(start: -%sm)
|> filter(fn: (r) => r["_measurement"] == "%s")
|> group(columns: ["sensor_id"])
|> last()
`, h.bucket, offlineGap, measurement)

	result, err := h.client.QueryAPI(h.org).Query(context.Background(), query)
	if err != nil {
		log.Printf("influxDB 查询异常 %s", err.Error())
		return data
	}

	for result.Next() {
		// Report every switch to a new result table (a new group key).
		if result.TableChanged() {
			fmt.Printf("table: %s\n", result.TableMetadata().String())
		}

		record := result.Record()
		values := record.Values()

		// A record whose sensor_id is missing or not a string cannot be
		// keyed; skip it instead of panicking on the type assertion.
		sensorID, ok := values["sensor_id"].(string)
		if !ok {
			log.Printf("influxDB 记录缺少字符串类型的 sensor_id, 已跳过: %v", values)
			continue
		}

		recordTime := record.Time().In(sh)
		data[sensorID] = recordTime
		fmt.Printf("station:[%v] value:[%v] %v \n", sensorID, recordTime, values)
	}

	// Surface any error that stopped the iteration early.
	if err := result.Err(); err != nil {
		fmt.Printf("query parsing error: %s\n", err.Error())
	}
	return data
}
// Query returns the timestamp of the latest record per sensor across the
// measurements "factor_11", "factor_18" and "factor_24".
//
// The returned map is keyed by the record's "sensor_id" column and holds the
// record time converted to Asia/Shanghai. The map is never nil. If the query
// cannot be started, the error is logged and an empty map is returned; an
// error hit while reading rows is printed and the rows read so far are
// returned.
//
// NOTE(review): the Flux range uses the integer start -1. The Flux range()
// documentation describes integer bounds as Unix timestamps in seconds, so
// this presumably selects effectively all stored data; confirm that is the
// intent.
func (h *InfluxDBHelper) Query() map[string]time.Time {
	sh, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// LoadLocation fails when no time zone database is available. The
		// location would then be nil and Time.In(nil) panics, so fall back to
		// a fixed UTC+8 zone, which is Shanghai's offset.
		log.Printf("[QueryAllLast] influxDB 时区 Asia/Shanghai 加载失败, 改用固定 UTC+8: %s", err.Error())
		sh = time.FixedZone("CST", 8*60*60)
	}

	data := make(map[string]time.Time)
	query := fmt.Sprintf(`
from(bucket:"%v")
|> range(start: -1)
|> filter(fn: (r) => r["_measurement"] == "factor_11" or r["_measurement"] == "factor_18" or r["_measurement"] == "factor_24")
|> group(columns: ["sensor_id"])
|> last()
`, h.bucket)

	result, err := h.client.QueryAPI(h.org).Query(context.Background(), query)
	if err != nil {
		log.Printf("[QueryAllLast] influxDB 查询异常 %s", err.Error())
		return data
	}

	for result.Next() {
		// Report every switch to a new result table (a new group key).
		if result.TableChanged() {
			fmt.Printf("table: %s\n", result.TableMetadata().String())
		}

		record := result.Record()
		values := record.Values()

		// A record whose sensor_id is missing or not a string cannot be
		// keyed; skip it instead of panicking on the type assertion.
		sensorID, ok := values["sensor_id"].(string)
		if !ok {
			log.Printf("[QueryAllLast] influxDB 记录缺少字符串类型的 sensor_id, 已跳过: %v", values)
			continue
		}

		data[sensorID] = record.Time().In(sh)
	}

	// Surface any error that stopped the iteration early.
	if err := result.Err(); err != nil {
		fmt.Printf("query parsing error: %v \n", err.Error())
	}
	return data
}