// Package storageDBs contains storage writers; this file holds the writer
// that hands formatted records to an InfluxDB helper.
package storageDBs

import (
	"fmt"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/dbHelper"
	"gitea.anxinyun.cn/container/common_utils/transform"
)

// Storage2InfluxDB formats records as text lines and writes them through an
// InfluxDBHelper into one of three buckets (raw, theme, vibration).
type Storage2InfluxDB struct {
	influxDBHelper *dbHelper.InfluxDBHelper
	rawBucket      string
	themeBucket    string
	vibBucket      string
}

// NewStorage2InfluxDB2ByDefaultConfig builds a Storage2InfluxDB from the
// default configuration. It reads the keys influxDB.address, influxDB.token,
// influxDB.organization and influxDB.buckets.{raw,theme,vib}.
func NewStorage2InfluxDB2ByDefaultConfig() *Storage2InfluxDB {
	// Load the configuration once so every value comes from the same
	// snapshot instead of calling LoadConfig for each key.
	cfg := configLoad.LoadConfig()

	influxHelper := dbHelper.NewInfluxDBHelper(
		cfg.GetString("influxDB.address"),
		cfg.GetString("influxDB.token"),
		cfg.GetString("influxDB.organization"),
	)

	return &Storage2InfluxDB{
		influxDBHelper: influxHelper,
		rawBucket:      cfg.GetString("influxDB.buckets.raw"),
		themeBucket:    cfg.GetString("influxDB.buckets.theme"),
		vibBucket:      cfg.GetString("influxDB.buckets.vib"),
	}
}

// NewStorage2InfluxDB2 wraps an existing InfluxDBHelper.
//
// NOTE(review): the bucket names are left empty here, so every Save* method on
// the returned value passes "" as the bucket to Write — confirm this is
// intended.
func NewStorage2InfluxDB2(influx *dbHelper.InfluxDBHelper) *Storage2InfluxDB {
	return &Storage2InfluxDB{
		influxDBHelper: influx,
	}
}

// writeLines sends lines to bucket through the helper. An empty batch is
// skipped, so a data list whose records were all filtered out does not cause
// a write call.
//
// NOTE(review): any value returned by Write is not inspected here, matching
// the previous behaviour — confirm whether Write reports errors.
func (the *Storage2InfluxDB) writeLines(lines []string, bucket string) {
	if len(lines) == 0 {
		return
	}
	the.influxDBHelper.Write(lines, bucket)
}

// SaveDeviceData writes device data to the raw bucket. Each record becomes
// one line "<DeviceId> <fields> <AcqTime in ns>"; records with a nil Raw
// payload are skipped.
func (the *Storage2InfluxDB) SaveDeviceData(dataList []common_models.DeviceData) {
	if len(dataList) == 0 {
		return
	}
	lines := make([]string, 0, len(dataList))
	for _, d := range dataList {
		if d.Raw == nil {
			continue
		}
		fields := transform.Obj2mapStr(d.Raw)
		lines = append(lines, fmt.Sprintf("%s %s %d", d.DeviceId, fields, d.AcqTime.UnixNano()))
	}
	the.writeLines(lines, the.rawBucket)
}

// SaveRaw writes raw records to the raw bucket. Each record becomes one line
// "<IotaDevice> <fields> <CollectTime in ns>"; records with nil Data are
// skipped.
func (the *Storage2InfluxDB) SaveRaw(dataList []common_models.EsRaw) {
	if len(dataList) == 0 {
		return
	}
	lines := make([]string, 0, len(dataList))
	for _, d := range dataList {
		if d.Data == nil {
			continue
		}
		// cloneMap := copyMap(d.Data) // strange error
		fields := transform.Obj2mapStr(d.Data)
		lines = append(lines, fmt.Sprintf("%s %s %d", d.IotaDevice, fields, d.CollectTime.UnixNano()))
	}
	the.writeLines(lines, the.rawBucket)
}

// copyMap returns a shallow copy of raw. The result is always a non-nil map,
// even when raw is nil. Its only call site in this file is commented out.
func copyMap(raw map[string]any) map[string]any {
	m := make(map[string]any, len(raw))
	for k, v := range raw {
		m[k] = v
	}
	return m
}

// SaveTheme writes theme records to the theme bucket. Each record becomes one
// line "<Sensor> <fields> <CollectTime in ns>"; records with nil Data are
// skipped.
func (the *Storage2InfluxDB) SaveTheme(dataList []common_models.EsTheme) {
	if len(dataList) == 0 {
		return
	}
	lines := make([]string, 0, len(dataList))
	for _, d := range dataList {
		if d.Data == nil {
			continue
		}
		fields := transform.Obj2mapStr(d.Data)
		lines = append(lines, fmt.Sprintf("%d %s %d", d.Sensor, fields, d.CollectTime.UnixNano()))
	}
	the.writeLines(lines, the.themeBucket)
}

// SaveVib writes vibration records to the vibration bucket. Every input record
// is expanded with FlatMapDynamicVib, and each expanded item becomes one line
// "<IotaDevice> <fields> <CollectTime in ns>"; items with nil Data are
// skipped.
func (the *Storage2InfluxDB) SaveVib(dataList []common_models.EsVbRaw) {
	if len(dataList) == 0 {
		return
	}
	// The expanded item count is not known up front, so the slice is grown
	// on demand.
	var lines []string
	for _, data := range dataList {
		for _, onceD := range data.FlatMapDynamicVib() {
			if onceD.Data == nil {
				continue
			}
			fields := transform.Obj2mapStr(onceD.Data)
			lines = append(lines, fmt.Sprintf("%s %s %d", onceD.IotaDevice, fields, onceD.CollectTime.UnixNano()))
		}
	}
	the.writeLines(lines, the.themeBucketOrVib())
}

// themeBucketOrVib returns the bucket used for vibration data.
func (the *Storage2InfluxDB) themeBucketOrVib() string {
	return the.vibBucket
}

// SaveThemeData writes station theme data to the theme bucket. Each record
// becomes one line "<sensorId> <fields> <CollectTime in ns>"; records with nil
// ThemeData are skipped.
func (the *Storage2InfluxDB) SaveThemeData(dataList []common_models.StationData) {
	if len(dataList) == 0 {
		return
	}
	// NOTE(review): the sensor id is hard-coded, so every record is written
	// under the same identifier "123". This looks like a placeholder — it
	// should presumably come from the station record; confirm which field.
	sensorId := 123

	lines := make([]string, 0, len(dataList))
	for _, d := range dataList {
		if d.ThemeData == nil {
			continue
		}
		fields := transform.Obj2mapStr(d.ThemeData)
		lines = append(lines, fmt.Sprintf("%d %s %d", sensorId, fields, d.CollectTime.UnixNano()))
	}
	the.writeLines(lines, the.themeBucket)
}

// SaveGroupTheme is not implemented yet; it currently does nothing with
// dataList.
func (the *Storage2InfluxDB) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
}