You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
170 lines
5.2 KiB
170 lines
5.2 KiB
1 month ago
|
package storageDBs
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"gitea.anxinyun.cn/container/common_calc"
|
||
|
"gitea.anxinyun.cn/container/common_models"
|
||
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
||
|
"gitea.anxinyun.cn/container/common_utils/dbHelper"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
type Storage2Es struct {
|
||
|
esHelper *dbHelper.ESHelper
|
||
|
rawIndex string
|
||
|
themeIndex string
|
||
|
vibIndex string
|
||
|
groupIndex string
|
||
|
}
|
||
|
|
||
|
// NewStorage2Es6ByDefaultConfig
|
||
|
// 读取默认配置-初始化
|
||
|
func NewStorage2Es6ByDefaultConfig() *Storage2Es {
|
||
|
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
|
||
|
user := configLoad.LoadConfig().GetString("es.user")
|
||
|
pwd := configLoad.LoadConfig().GetString("es.pwd")
|
||
|
esHelper := dbHelper.NewESHelper(esAddresses, user, pwd)
|
||
|
s := Storage2Es{
|
||
|
esHelper: esHelper,
|
||
|
rawIndex: configLoad.LoadConfig().GetString("es.index.raw"),
|
||
|
themeIndex: configLoad.LoadConfig().GetString("es.index.theme"),
|
||
|
vibIndex: configLoad.LoadConfig().GetString("es.index.vib"),
|
||
|
groupIndex: configLoad.LoadConfig().GetString("es.index.group"),
|
||
|
}
|
||
|
|
||
|
return &s
|
||
|
}
|
||
|
|
||
|
func newEsHelper() *dbHelper.ESHelper {
|
||
|
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
|
||
|
user := configLoad.LoadConfig().GetString("es.user")
|
||
|
pwd := configLoad.LoadConfig().GetString("es.pwd")
|
||
|
return dbHelper.NewESHelper(esAddresses, user, pwd)
|
||
|
}
|
||
|
|
||
|
func (the *Storage2Es) SaveRaw(dataList []common_models.EsRaw) {
|
||
|
body := strings.Builder{}
|
||
|
for _, raw := range dataList {
|
||
|
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
|
||
|
source, _ := json.Marshal(raw)
|
||
|
docId := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
|
||
|
_id := common_calc.NameUUIDFromString(docId)
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, the.rawIndex, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}
|
||
|
//the.esHelper.BulkWrite(the.rawIndex, body.String())
|
||
|
newEsHelper().BulkWrite(the.rawIndex, body.String())
|
||
|
}
|
||
|
|
||
|
func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
|
||
|
body := strings.Builder{}
|
||
|
|
||
|
for _, theme := range dataList {
|
||
|
// scala => val id = UUID.nameUUIDFromBytes(s"${sd.station.id}-${sd.acqTime.getMillis}".getBytes("UTF-8")).toString
|
||
|
source, _ := json.Marshal(theme)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.Sensor, theme.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, the.themeIndex, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}
|
||
|
//the.esHelper.BulkWrite(the.themeIndex, body.String())
|
||
|
newEsHelper().BulkWrite(the.themeIndex, body.String())
|
||
|
}
|
||
|
func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
|
||
|
body := strings.Builder{}
|
||
|
for _, raw := range dataList {
|
||
|
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
|
||
|
source, _ := json.Marshal(raw)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, the.vibIndex, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}
|
||
|
//the.esHelper.BulkWrite(the.vibIndex, body.String())
|
||
|
newEsHelper().BulkWrite(the.vibIndex, body.String())
|
||
|
}
|
||
|
|
||
|
// SaveGroupTheme 分组主题数据写入ES
|
||
|
func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
|
||
|
body := strings.Builder{}
|
||
|
for _, theme := range dataList {
|
||
|
source, _ := json.Marshal(theme)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, the.groupIndex, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}
|
||
|
the.esHelper.BulkWrite(the.groupIndex, body.String())
|
||
|
}
|
||
|
func bathRawMarshal(index string, arrays []common_models.EsRaw) strings.Builder {
|
||
|
body := strings.Builder{}
|
||
|
|
||
|
wg := sync.WaitGroup{}
|
||
|
for _, array := range arrays {
|
||
|
wg.Add(1)
|
||
|
go func(raw common_models.EsRaw) {
|
||
|
defer wg.Done()
|
||
|
source, _ := json.Marshal(raw)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, index, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}(array)
|
||
|
}
|
||
|
wg.Wait()
|
||
|
return body
|
||
|
}
|
||
|
func bathVbRawMarshal(index string, arrays []common_models.EsVbRaw) strings.Builder {
|
||
|
body := strings.Builder{}
|
||
|
|
||
|
wg := sync.WaitGroup{}
|
||
|
for _, array := range arrays {
|
||
|
wg.Add(1)
|
||
|
go func(raw common_models.EsVbRaw) {
|
||
|
defer wg.Done()
|
||
|
source, _ := json.Marshal(raw)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, index, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}(array)
|
||
|
}
|
||
|
wg.Wait()
|
||
|
return body
|
||
|
}
|
||
|
func bathThemeMarshal(index string, arrays []common_models.EsTheme) strings.Builder {
|
||
|
body := strings.Builder{}
|
||
|
|
||
|
wg := sync.WaitGroup{}
|
||
|
for _, array := range arrays {
|
||
|
wg.Add(1)
|
||
|
go func(theme common_models.EsTheme) {
|
||
|
defer wg.Done()
|
||
|
source, _ := json.Marshal(theme)
|
||
|
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.Sensor, theme.CollectTime.UnixMilli()))
|
||
|
s := fmt.Sprintf(
|
||
|
`{"index": {"_index": "%s","_id": "%s"}}
|
||
|
%s
|
||
|
`, index, _id, source)
|
||
|
body.WriteString(s)
|
||
|
}(array)
|
||
|
}
|
||
|
wg.Wait()
|
||
|
return body
|
||
|
}
|