Compare commits

...

3 Commits

  1. 2
      configHelper.go
  2. 4
      dbHelper/elasticsearchHelper.go
  3. 2
      go.mod
  4. 3
      go.sum
  5. 2
      kafkaHelper/consumerGroupHandler.go
  6. 15
      redisHelper.go
  7. 8
      storage/storageDBs/storage2Es.go
  8. 10
      util_test.go

2
configHelper.go

@ -600,7 +600,7 @@ func (the *ConfigHelper) GetIotaScheme(dimensionId string) (common_models.IotaSc
formattedStr := strings.Replace(v, "+0800", "+08:00", -1)
err = json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
return scheme, err

4
dbHelper/elasticsearchHelper.go

@ -30,7 +30,7 @@ func NewESHelper(addresses []string, user, pwd string) *ESHelper {
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res)
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res.StatusCode)
return &ESHelper{
addresses: addresses,
esClient: es,
@ -220,7 +220,7 @@ func (the *ESHelper) Close() {
func (the *ESHelper) BulkWriteGroup2Es(index string, themes []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range themes {
source, _ := json.Marshal(theme)
source, _ := theme.MarshalJSON() //json.Marshal(theme)
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}

2
go.mod

@ -4,7 +4,7 @@ go 1.23.1
require (
gitea.anxinyun.cn/container/common_calc v0.0.1
gitea.anxinyun.cn/container/common_models v0.0.8
gitea.anxinyun.cn/container/common_models v0.0.11
github.com/IBM/sarama v1.43.0
github.com/allegro/bigcache v1.2.1
github.com/bytedance/sonic v1.12.2

3
go.sum

@ -35,6 +35,9 @@ gitea.anxinyun.cn/container/common_calc v0.0.1 h1:tGHq1jfdMCPg9nuoYLDBKrj0TCj8+9
gitea.anxinyun.cn/container/common_calc v0.0.1/go.mod h1:KAde7EMcFemVuEryjMiGGJDcA3bJA+0a8q5Ql+KU5sA=
gitea.anxinyun.cn/container/common_models v0.0.7 h1:zlHYJy7zFwqrH4q8KqbVjoMVL9TKNfkAZVauEljc8rk=
gitea.anxinyun.cn/container/common_models v0.0.7/go.mod h1:RXbYCDiXQGGeon1+9q/lWCSx7bXdXyX58PVJkZsPIGA=
gitea.anxinyun.cn/container/common_models v0.0.8/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
gitea.anxinyun.cn/container/common_models v0.0.11 h1:wsisXrdkngN55AIeKWlhczADO36O4kg9OyVL64NtS7o=
gitea.anxinyun.cn/container/common_models v0.0.11/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc=

2
kafkaHelper/consumerGroupHandler.go

@ -48,7 +48,7 @@ func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.Consu
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool {
f := func(cm *sarama.ConsumerMessage) bool {
msg := string(cm.Value)
log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
log.Printf("处理topic[%s]消息 offset=[%d]", cm.Topic, cm.Offset)
return handler(msg)
}
return f

15
redisHelper.go

@ -19,7 +19,15 @@ func NewRedisHelper(master string, address ...string) *RedisHelper {
r := &RedisHelper{ctx: context.Background()}
r.InitialCluster(master, address...)
return r
//r := &RedisHelper{ctx: context.Background()}
//var wg sync.WaitGroup
//wg.Add(1)
//go func() {
// defer wg.Done()
// r.InitialCluster(master, address...)
//}()
//wg.Wait()
//return r
}
func (the *RedisHelper) InitialCluster(master string, address ...string) {
@ -39,6 +47,11 @@ func (the *RedisHelper) InitialCluster(master string, address ...string) {
}
func (the *RedisHelper) Get(key string) string {
if the.rdb == nil {
log.Println("Redis client is not initialized")
return ""
}
val, err := the.rdb.Get(the.ctx, key).Result()
if errors.Is(err, redis.Nil) {
log.Printf("%s does not exist", key)

8
storage/storageDBs/storage2Es.go

@ -77,7 +77,7 @@ func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
for _, theme := range dataList {
// scala => val id = UUID.nameUUIDFromBytes(s"${sd.station.id}-${sd.acqTime.getMillis}".getBytes("UTF-8")).toString
source, err := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("theme序列化异常,err=%s", err.Error())
continue
@ -110,7 +110,11 @@ func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range dataList {
source, _ := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("group theme 序列化异常,err=%s", err.Error())
continue
}
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}

10
util_test.go

@ -1,6 +1,7 @@
package common_utils
import (
"encoding/json"
"fmt"
"gitea.anxinyun.cn/container/common_calc"
"gitea.anxinyun.cn/container/common_models"
@ -155,6 +156,15 @@ func TestConfigHelper_GetIotaScheme(t *testing.T) {
}
}
func TestConfigHelper_GetIotaScheme2(t *testing.T) {
formattedStr := `{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","name":"G07下游-静力","mode":"R","interval":1,"unit":"second","beginTime":"2023-08-10T16:45:35.244+08:00","endTime":null,"dimension":{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","thingId":"8e3eec71-c924-47fd-ac8b-2f28c49ad4e9","name":"G07下游-静力"}}`
var scheme common_models.IotaScheme
err := json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
func TestConfigHelper_GetStationFilter(t *testing.T) {
cf := NewConfigHelper("10.8.30.160:30379")

Loading…
Cancel
Save