package storageDBs

import (
	//"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"

	"gitea.anxinyun.cn/container/common_calc"
	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/dbHelper"
	json "github.com/bytedance/sonic"
)

// esBulkActionFormat is the template for one bulk entry: an "index" action
// line followed by the document source line, each terminated by a newline.
// Arguments are, in order: index name, document id, JSON source.
const esBulkActionFormat = `{"index": {"_index": "%s","_id": "%s"}}` + "\n%s\n"

// Storage2Es writes batches of documents to Elasticsearch through an ESHelper.
// Each kind of data has its own target index.
type Storage2Es struct {
	esHelper   *dbHelper.ESHelper
	rawIndex   string
	themeIndex string
	vibIndex   string
	groupIndex string
}

// NewStorage2Es6ByDefaultConfig builds a Storage2Es from the default
// configuration: the ES connection comes from "es.addresses", "es.user" and
// "es.pwd", and the index names come from the "es.index.*" keys.
func NewStorage2Es6ByDefaultConfig() *Storage2Es {
	cfg := configLoad.LoadConfig()
	return &Storage2Es{
		esHelper:   newEsHelper(),
		rawIndex:   cfg.GetString("es.index.raw"),
		themeIndex: cfg.GetString("es.index.theme"),
		vibIndex:   cfg.GetString("es.index.vib"),
		groupIndex: cfg.GetString("es.index.group"),
	}
}

// newEsHelper creates an ESHelper from the "es.addresses", "es.user" and
// "es.pwd" configuration keys.
func newEsHelper() *dbHelper.ESHelper {
	cfg := configLoad.LoadConfig()
	return dbHelper.NewESHelper(
		cfg.GetStringSlice("es.addresses"),
		cfg.GetString("es.user"),
		cfg.GetString("es.pwd"),
	)
}

// formatEsBulkAction serializes doc and returns its bulk entry for index.
// The document id is the name-based UUID derived from key. A marshal error is
// returned to the caller so that the record can be skipped instead of
// producing an entry with an empty source line.
func formatEsBulkAction(index, key string, doc interface{}) (string, error) {
	source, err := json.Marshal(doc)
	if err != nil {
		return "", err
	}
	docID := common_calc.NameUUIDFromString(key)
	return fmt.Sprintf(esBulkActionFormat, index, docID, source), nil
}

// writeEsBulk sends body to index through helper. Nothing is sent when body
// is empty (no input records, or every record failed to serialize).
func writeEsBulk(helper *dbHelper.ESHelper, index string, body *strings.Builder) {
	if body.Len() == 0 {
		return
	}
	helper.BulkWrite(index, body.String())
}

// joinEsBulkActions concatenates the non-empty entries of actions, in order.
// Empty entries mark records that were skipped and are ignored. When nothing
// is written the returned builder is still the zero value.
func joinEsBulkActions(actions []string) strings.Builder {
	var body strings.Builder
	size := 0
	for _, action := range actions {
		size += len(action)
	}
	if size > 0 {
		body.Grow(size)
	}
	for _, action := range actions {
		if action == "" {
			continue
		}
		body.WriteString(action)
	}
	return body
}

// SaveRaw writes raw device data to the raw index in one bulk request.
// Records that fail to serialize are logged and skipped. A panic raised while
// building or sending the request is recovered and logged.
func (the *Storage2Es) SaveRaw(dataList []common_models.EsRaw) {
	defer func() {
		if err := recover(); err != nil {
			log.Printf("未知异常=> %v", err)
		}
	}()

	var body strings.Builder
	for _, raw := range dataList {
		// Same id scheme as the Scala implementation:
		// UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
		key := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
		action, err := formatEsBulkAction(the.rawIndex, key, raw)
		if err != nil {
			log.Printf("raw序列化异常,err=%s", err.Error())
			continue
		}
		body.WriteString(action)
	}
	writeEsBulk(the.esHelper, the.rawIndex, &body)
}

// SaveTheme writes theme data to the theme index in one bulk request.
// Records that fail to serialize are logged and skipped.
func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
	var body strings.Builder
	for _, theme := range dataList {
		// Same id scheme as the Scala implementation:
		// UUID.nameUUIDFromBytes(s"${sd.station.id}-${sd.acqTime.getMillis}".getBytes("UTF-8")).toString
		key := fmt.Sprintf("%d-%d", theme.Sensor, theme.CollectTime.UnixMilli())
		action, err := formatEsBulkAction(the.themeIndex, key, theme)
		if err != nil {
			log.Printf("theme序列化异常,err=%s", err.Error())
			continue
		}
		body.WriteString(action)
	}
	writeEsBulk(the.esHelper, the.themeIndex, &body)
}

// SaveVib writes vibration raw data to the vib index in one bulk request.
// Records that fail to serialize are logged and skipped.
func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
	var body strings.Builder
	for _, raw := range dataList {
		// Same id scheme as the Scala implementation:
		// UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
		key := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
		action, err := formatEsBulkAction(the.vibIndex, key, raw)
		if err != nil {
			log.Printf("vib序列化异常,err=%s", err.Error())
			continue
		}
		body.WriteString(action)
	}
	writeEsBulk(the.esHelper, the.vibIndex, &body)
}

// SaveGroupTheme writes group theme data to the group index in one bulk
// request. Records that fail to serialize are logged and skipped.
func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
	var body strings.Builder
	for _, theme := range dataList {
		key := fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli())
		action, err := formatEsBulkAction(the.groupIndex, key, theme)
		if err != nil {
			log.Printf("group theme序列化异常,err=%s", err.Error())
			continue
		}
		body.WriteString(action)
	}

	// Reuse the helper owned by this instance; a new one is only created when
	// the instance was built without one.
	helper := the.esHelper
	if helper == nil {
		helper = newEsHelper()
	}
	writeEsBulk(helper, the.groupIndex, &body)
}

// bathRawMarshal serializes arrays concurrently and returns the combined bulk
// body for index. Every goroutine fills only its own slot of the result
// slice, so no builder is shared between goroutines; entries are joined in
// input order after all goroutines finish. Records that fail to serialize
// are logged and skipped.
func bathRawMarshal(index string, arrays []common_models.EsRaw) strings.Builder {
	actions := make([]string, len(arrays))
	var wg sync.WaitGroup
	for i, item := range arrays {
		wg.Add(1)
		go func(slot int, raw common_models.EsRaw) {
			defer wg.Done()
			key := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
			action, err := formatEsBulkAction(index, key, raw)
			if err != nil {
				log.Printf("raw序列化异常,err=%s", err.Error())
				return
			}
			actions[slot] = action
		}(i, item)
	}
	wg.Wait()
	return joinEsBulkActions(actions)
}

// bathVbRawMarshal serializes arrays concurrently and returns the combined
// bulk body for index. Every goroutine fills only its own slot of the result
// slice, so no builder is shared between goroutines; entries are joined in
// input order after all goroutines finish. Records that fail to serialize
// are logged and skipped.
func bathVbRawMarshal(index string, arrays []common_models.EsVbRaw) strings.Builder {
	actions := make([]string, len(arrays))
	var wg sync.WaitGroup
	for i, item := range arrays {
		wg.Add(1)
		go func(slot int, raw common_models.EsVbRaw) {
			defer wg.Done()
			key := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
			action, err := formatEsBulkAction(index, key, raw)
			if err != nil {
				log.Printf("vib序列化异常,err=%s", err.Error())
				return
			}
			actions[slot] = action
		}(i, item)
	}
	wg.Wait()
	return joinEsBulkActions(actions)
}

// bathThemeMarshal serializes arrays concurrently and returns the combined
// bulk body for index. Every goroutine fills only its own slot of the result
// slice, so no builder is shared between goroutines; entries are joined in
// input order after all goroutines finish. Records that fail to serialize
// are logged and skipped.
func bathThemeMarshal(index string, arrays []common_models.EsTheme) strings.Builder {
	actions := make([]string, len(arrays))
	var wg sync.WaitGroup
	for i, item := range arrays {
		wg.Add(1)
		go func(slot int, theme common_models.EsTheme) {
			defer wg.Done()
			key := fmt.Sprintf("%d-%d", theme.Sensor, theme.CollectTime.UnixMilli())
			action, err := formatEsBulkAction(index, key, theme)
			if err != nil {
				log.Printf("theme序列化异常,err=%s", err.Error())
				return
			}
			actions[slot] = action
		}(i, item)
	}
	wg.Wait()
	return joinEsBulkActions(actions)
}