Compare commits

...

8 Commits
v0.0.8 ... dev

  1. 51
      configHelper.go
  2. 4
      dbHelper/elasticsearchHelper.go
  3. 2
      go.mod
  4. 3
      go.sum
  5. 2
      kafkaHelper/consumerGroupHandler.go
  6. 15
      redisHelper.go
  7. 17
      storage/storageDBs/storage2Es.go
  8. 2
      storage/storageDBs/storage2InfluxDB.go
  9. 10
      util_test.go

51
configHelper.go

@ -167,6 +167,9 @@ func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
k := fmt.Sprintf("%s:%s", redisKey.Device_stationIds, deviceId)
//var deviceMeta common_models.DeviceMeta
s := the.redisHelper.Get(k)
if s == "" {
return result, errors.New(fmt.Sprintf("redis 中无key=[%s] 缓存", k))
}
err = json.Unmarshal([]byte(s), &result)
//err = the.redisHelper.GetObj(k, &result)
}
@ -174,16 +177,21 @@ func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
}
func (the *ConfigHelper) SetChainedCacheObj(k string, obj any) error {
var value string
if v, ok := obj.(string); !ok {
v, err := json.Marshal(obj)
var err error
switch obj.(type) {
case string:
value = obj.(string)
case []byte:
value = string(obj.([]byte))
default:
bs, err := json.Marshal(obj)
if err != nil {
return err
}
value = string(v)
} else {
value = v
value = string(bs)
}
err := the.chainedCache.LoadableChinCache.Set(the.ctx, k, value)
err = the.chainedCache.LoadableChinCache.Set(the.ctx, k, value)
return err
}
func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, duration time.Duration) error {
@ -200,6 +208,10 @@ func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, dur
err := the.chainedCache.LoadableChinCache.Set(the.ctx, k, value, store.WithExpiration(duration))
return err
}
func (the *ConfigHelper) DeleteChainedCacheObj(k string) error {
err := the.chainedCache.LoadableChinCache.Delete(the.ctx, k)
return err
}
func (the *ConfigHelper) GetCacheWindowObj(key_cacheWindow string) (common_models.CacheWindow, error) {
var redisCacheWin common_models.CacheWinSave
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key_cacheWindow)
@ -250,7 +262,11 @@ func (the *ConfigHelper) SetDeviceStationObjs(deviceId string, stations common_m
var err error
k := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)
err = the.SetChainedCacheObj(k, &stations)
bytes, err := json.Marshal(stations)
if err != nil {
return err
}
go the.SetChainedCacheObj(k, bytes)
return err
}
@ -386,6 +402,9 @@ func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGrou
k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
if v == "" {
return group, err
}
err = json.Unmarshal([]byte(v), &group)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
@ -560,6 +579,22 @@ func (the *ConfigHelper) GetFilter(stationId int) (common_models.Filter, error)
return result, err
}
func (the *ConfigHelper) GetFilterItem(stationId int, item string) (common_models.FilterItem, error) {
result, err := the.GetFilter(stationId)
if err == nil {
for _, filterItem := range result.Items {
if filterItem.FieldName == item {
return filterItem, err
}
}
} else {
log.Printf("获取GetFilterItem err=%s", err.Error())
}
return common_models.FilterItem{}, err
}
func (the *ConfigHelper) GetAlarmCode(alarmCode string) (common_models.AlarmCode, error) {
var err error
result, ok := AlarmCodeCache[alarmCode]
@ -585,7 +620,7 @@ func (the *ConfigHelper) GetIotaScheme(dimensionId string) (common_models.IotaSc
formattedStr := strings.Replace(v, "+0800", "+08:00", -1)
err = json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
return scheme, err

4
dbHelper/elasticsearchHelper.go

@ -30,7 +30,7 @@ func NewESHelper(addresses []string, user, pwd string) *ESHelper {
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res)
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res.StatusCode)
return &ESHelper{
addresses: addresses,
esClient: es,
@ -220,7 +220,7 @@ func (the *ESHelper) Close() {
func (the *ESHelper) BulkWriteGroup2Es(index string, themes []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range themes {
source, _ := json.Marshal(theme)
source, _ := theme.MarshalJSON() //json.Marshal(theme)
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}

2
go.mod

@ -4,7 +4,7 @@ go 1.23.1
require (
gitea.anxinyun.cn/container/common_calc v0.0.1
gitea.anxinyun.cn/container/common_models v0.0.8
gitea.anxinyun.cn/container/common_models v0.0.11
github.com/IBM/sarama v1.43.0
github.com/allegro/bigcache v1.2.1
github.com/bytedance/sonic v1.12.2

3
go.sum

@ -35,6 +35,9 @@ gitea.anxinyun.cn/container/common_calc v0.0.1 h1:tGHq1jfdMCPg9nuoYLDBKrj0TCj8+9
gitea.anxinyun.cn/container/common_calc v0.0.1/go.mod h1:KAde7EMcFemVuEryjMiGGJDcA3bJA+0a8q5Ql+KU5sA=
gitea.anxinyun.cn/container/common_models v0.0.7 h1:zlHYJy7zFwqrH4q8KqbVjoMVL9TKNfkAZVauEljc8rk=
gitea.anxinyun.cn/container/common_models v0.0.7/go.mod h1:RXbYCDiXQGGeon1+9q/lWCSx7bXdXyX58PVJkZsPIGA=
gitea.anxinyun.cn/container/common_models v0.0.8/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
gitea.anxinyun.cn/container/common_models v0.0.11 h1:wsisXrdkngN55AIeKWlhczADO36O4kg9OyVL64NtS7o=
gitea.anxinyun.cn/container/common_models v0.0.11/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc=

2
kafkaHelper/consumerGroupHandler.go

@ -48,7 +48,7 @@ func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.Consu
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool {
f := func(cm *sarama.ConsumerMessage) bool {
msg := string(cm.Value)
log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
log.Printf("处理topic[%s]消息 offset=[%d]", cm.Topic, cm.Offset)
return handler(msg)
}
return f

15
redisHelper.go

@ -19,7 +19,15 @@ func NewRedisHelper(master string, address ...string) *RedisHelper {
r := &RedisHelper{ctx: context.Background()}
r.InitialCluster(master, address...)
return r
//r := &RedisHelper{ctx: context.Background()}
//var wg sync.WaitGroup
//wg.Add(1)
//go func() {
// defer wg.Done()
// r.InitialCluster(master, address...)
//}()
//wg.Wait()
//return r
}
func (the *RedisHelper) InitialCluster(master string, address ...string) {
@ -39,6 +47,11 @@ func (the *RedisHelper) InitialCluster(master string, address ...string) {
}
func (the *RedisHelper) Get(key string) string {
if the.rdb == nil {
log.Println("Redis client is not initialized")
return ""
}
val, err := the.rdb.Get(the.ctx, key).Result()
if errors.Is(err, redis.Nil) {
log.Printf("%s does not exist", key)

17
storage/storageDBs/storage2Es.go

@ -69,8 +69,7 @@ func (the *Storage2Es) SaveRaw(dataList []common_models.EsRaw) {
`, the.rawIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.rawIndex, body.String())
newEsHelper().BulkWrite(the.rawIndex, body.String())
the.esHelper.BulkWrite(the.rawIndex, body.String())
}
func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
@ -78,7 +77,7 @@ func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
for _, theme := range dataList {
// scala => val id = UUID.nameUUIDFromBytes(s"${sd.station.id}-${sd.acqTime.getMillis}".getBytes("UTF-8")).toString
source, err := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("theme序列化异常,err=%s", err.Error())
continue
@ -90,8 +89,7 @@ func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
`, the.themeIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.themeIndex, body.String())
newEsHelper().BulkWrite(the.themeIndex, body.String())
the.esHelper.BulkWrite(the.themeIndex, body.String())
}
func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
body := strings.Builder{}
@ -105,15 +103,18 @@ func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
`, the.vibIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.vibIndex, body.String())
newEsHelper().BulkWrite(the.vibIndex, body.String())
the.esHelper.BulkWrite(the.vibIndex, body.String())
}
// SaveGroupTheme 分组主题数据写入ES
func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range dataList {
source, _ := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("group theme 序列化异常,err=%s", err.Error())
continue
}
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}

2
storage/storageDBs/storage2InfluxDB.go

@ -90,7 +90,7 @@ func (the *Storage2InfluxDB) SaveTheme(dataList []common_models.EsTheme) {
}
fields := transform.Obj2mapStr(d.Data)
line := fmt.Sprintf("%d %s %d", d.Sensor, fields, d.CollectTime.UnixNano())
line := fmt.Sprintf("factor_%d,sensor_id=%d %s %d", d.Factor, d.Sensor, fields, d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.themeBucket)

10
util_test.go

@ -1,6 +1,7 @@
package common_utils
import (
"encoding/json"
"fmt"
"gitea.anxinyun.cn/container/common_calc"
"gitea.anxinyun.cn/container/common_models"
@ -155,6 +156,15 @@ func TestConfigHelper_GetIotaScheme(t *testing.T) {
}
}
func TestConfigHelper_GetIotaScheme2(t *testing.T) {
formattedStr := `{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","name":"G07下游-静力","mode":"R","interval":1,"unit":"second","beginTime":"2023-08-10T16:45:35.244+08:00","endTime":null,"dimension":{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","thingId":"8e3eec71-c924-47fd-ac8b-2f28c49ad4e9","name":"G07下游-静力"}}`
var scheme common_models.IotaScheme
err := json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
func TestConfigHelper_GetStationFilter(t *testing.T) {
cf := NewConfigHelper("10.8.30.160:30379")

Loading…
Cancel
Save