重建 common_utils
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
3.4 KiB

package storageDBs
import (
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/dbHelper"
"gitea.anxinyun.cn/container/common_utils/transform"
"strings"
)
type Storage2InfluxDB struct {
influxDBHelper *dbHelper.InfluxDBHelper
rawBucket string
themeBucket string
vibBucket string
}
// NewStorage2InfluxDB2ByDefaultConfig
// 读取默认配置-初始化
func NewStorage2InfluxDB2ByDefaultConfig() *Storage2InfluxDB {
influxDBAddresses := configLoad.LoadConfig().GetString("influxDB.address")
influxDBToken := configLoad.LoadConfig().GetString("influxDB.token")
influxDBOrg := configLoad.LoadConfig().GetString("influxDB.organization")
influxHelper := dbHelper.NewInfluxDBHelper(influxDBAddresses, influxDBToken, influxDBOrg)
s := Storage2InfluxDB{
influxDBHelper: influxHelper,
rawBucket: configLoad.LoadConfig().GetString("influxDB.buckets.raw"),
themeBucket: configLoad.LoadConfig().GetString("influxDB.buckets.theme"),
vibBucket: configLoad.LoadConfig().GetString("influxDB.buckets.vib"),
}
return &s
}
func NewStorage2InfluxDB2(influx *dbHelper.InfluxDBHelper) *Storage2InfluxDB {
s := Storage2InfluxDB{
influxDBHelper: influx,
}
return &s
}
func (the *Storage2InfluxDB) SaveDeviceData(dataList []common_models.DeviceData) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Raw)
line := fmt.Sprintf("%s %s %d", d.DeviceId, fields.String(), d.AcqTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.rawBucket)
}
func (the *Storage2InfluxDB) SaveRaw(dataList []common_models.EsRaw) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Data)
line := fmt.Sprintf("%s %s %d", d.IotaDevice, fields.String(), d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.rawBucket)
}
func (the *Storage2InfluxDB) SaveTheme(dataList []common_models.EsTheme) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Data)
line := fmt.Sprintf("%d %s %d", d.Sensor, fields.String(), d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.themeBucket)
}
func (the *Storage2InfluxDB) SaveVib(dataList []common_models.EsVbRaw) {
if len(dataList) == 0 {
return
}
var sb []string
for _, data := range dataList {
onceList := data.FlatMapDynamicVib()
for _, onceD := range onceList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, onceD.Data)
line := fmt.Sprintf("%s %s %d", onceD.IotaDevice, fields.String(), onceD.CollectTime.UnixNano())
sb = append(sb, line)
}
}
the.influxDBHelper.Write(sb, the.vibBucket)
}
func (the *Storage2InfluxDB) SaveThemeData(dataList []common_models.StationData) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.ThemeData)
sensorId := 123
line := fmt.Sprintf("%d %s %d", sensorId, fields.String(), d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.themeBucket)
}
func (the *Storage2InfluxDB) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
}