Compare commits

...

13 Commits
v0.0.7 ... dev

  1. 30
      README.md
  2. 4
      chainedCache.go
  3. 175
      configHelper.go
  4. 27
      dbHelper/elasticsearchHelper.go
  5. 11
      go.mod
  6. 22
      go.sum
  7. 2
      kafkaHelper/consumerGroupHandler.go
  8. 15
      redisHelper.go
  9. 39
      storage/storageDBs/storage2Es.go
  10. 54
      storage/storageDBs/storage2InfluxDB.go
  11. 5
      transform/transform.go
  12. 12
      util_test.go

30
README.md

@ -1,4 +1,28 @@
# common_utils
重建
common_utils
# common_utils 通用工具包
支持:Elasticsearch、InfluxDB、Kafka 的操作
### 开发语言和版本
golang ,版本 go1.23.1
### 平台支持
安心云4.0
### 使用方式:
1. 设置 go 环境
set GOPRIVATE=gitea.anxinyun.cn
2. 查询go 版本
go list -m -versions gitea.anxinyun.cn/container/common_utils
如若 GOPRIVATE=gitea.anxinyun.cn 生效,则会返回 common_utils 的版本信息。
如若无版本信息返回,有可能是没有 container 的权限,也有可能环境变量设置没有生效。
异常情况处理:
1. 无 https://gitea.anxinyun.cn/container 访问权限,可以联系下管理员。
2. 环境变量设置不生效
解决方法,在GoLand工具中进行设置:
1)打开 Settings
2)Go > GOROOT,Download Go SDK
3)Go > Go Modules,设置环境变量 GOPRIVATE=gitea.anxinyun.cn
### 依赖包管理
common_utils 如若有修改,需要同步升级 et-go

4
chainedCache.go

@ -18,11 +18,11 @@ type ChainedCache struct {
func NewChainedCache(redisAddr string) *ChainedCache {
bigCacheClient, _ := bigcache.NewBigCache(bigcache.DefaultConfig(2 * time.Minute))
bigCacheClient, _ := bigcache.NewBigCache(bigcache.DefaultConfig(1 * time.Minute))
bigCacheStore := bigcache_store.NewBigcache(bigCacheClient)
redisStore := redis_store.NewRedis(redis.NewClient(&redis.Options{
Addr: redisAddr,
}), store.WithExpiration(2*time.Minute))
}), store.WithExpiration(1*time.Minute+20*time.Second))
cacheManager := cache.NewChain[any](
cache.New[any](bigCacheStore),
cache.New[any](redisStore),

175
configHelper.go

@ -2,11 +2,15 @@ package common_utils
import (
"context"
"encoding/json"
"gitea.anxinyun.cn/container/common_models/constant/settlementParam"
"strings"
//"encoding/json"
"errors"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
json "github.com/bytedance/sonic"
"github.com/eko/gocache/lib/v4/store"
"log"
"time"
@ -23,6 +27,7 @@ var IotaDeviceCache map[string]common_models.IotaDevice
var ThingStructCache map[string]common_models.ThingStruct
var DeviceNodeCache map[string]common_models.IotaInstances
var AlarmCodeCache map[string]common_models.AlarmCode
var StructureCache map[int]common_models.Structure
// var stationThreshold map[int]common_models.Threshold
// var aggThreshold map[string]common_models.AggThreshold
@ -62,6 +67,31 @@ func initDeviceInfoMapCache() {
DeviceMetaCache = make(map[string]common_models.DeviceMeta)
}
if deviceStationIdsCache == nil {
deviceStationIdsCache = make(map[string][]int)
}
if stationCache == nil {
stationCache = make(map[int]common_models.Station)
}
if deviceFactorProtoMap == nil {
deviceFactorProtoMap = make(map[string]common_models.DeviceFactorProto)
}
if FormulaCache == nil {
FormulaCache = make(map[int]common_models.Formula)
}
if ProtoCache == nil {
ProtoCache = make(map[string]common_models.Proto)
}
if AlarmCodeCache == nil {
AlarmCodeCache = make(map[string]common_models.AlarmCode)
}
if StructureCache == nil {
StructureCache = make(map[int]common_models.Structure)
}
}
// GetDeviceInfo 通过
@ -137,6 +167,9 @@ func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
k := fmt.Sprintf("%s:%s", redisKey.Device_stationIds, deviceId)
//var deviceMeta common_models.DeviceMeta
s := the.redisHelper.Get(k)
if s == "" {
return result, errors.New(fmt.Sprintf("redis 中无key=[%s] 缓存", k))
}
err = json.Unmarshal([]byte(s), &result)
//err = the.redisHelper.GetObj(k, &result)
}
@ -144,16 +177,21 @@ func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
}
func (the *ConfigHelper) SetChainedCacheObj(k string, obj any) error {
var value string
if v, ok := obj.(string); !ok {
v, err := json.Marshal(obj)
var err error
switch obj.(type) {
case string:
value = obj.(string)
case []byte:
value = string(obj.([]byte))
default:
bs, err := json.Marshal(obj)
if err != nil {
return err
}
value = string(v)
} else {
value = v
value = string(bs)
}
err := the.chainedCache.LoadableChinCache.Set(the.ctx, k, value)
err = the.chainedCache.LoadableChinCache.Set(the.ctx, k, value)
return err
}
func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, duration time.Duration) error {
@ -170,6 +208,10 @@ func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, dur
err := the.chainedCache.LoadableChinCache.Set(the.ctx, k, value, store.WithExpiration(duration))
return err
}
func (the *ConfigHelper) DeleteChainedCacheObj(k string) error {
err := the.chainedCache.LoadableChinCache.Delete(the.ctx, k)
return err
}
func (the *ConfigHelper) GetCacheWindowObj(key_cacheWindow string) (common_models.CacheWindow, error) {
var redisCacheWin common_models.CacheWinSave
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key_cacheWindow)
@ -205,9 +247,13 @@ func (the *ConfigHelper) GetDeviceStationObjs(deviceId string) ([]common_models.
//err = the.redisHelper.GetObj(k, &result)
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
if v == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, err
}
err = json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("json unmarshal error:%s", err.Error())
}
}
return result, err
@ -216,7 +262,11 @@ func (the *ConfigHelper) SetDeviceStationObjs(deviceId string, stations common_m
var err error
k := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)
err = the.SetChainedCacheObj(k, &stations)
bytes, err := json.Marshal(stations)
if err != nil {
return err
}
go the.SetChainedCacheObj(k, bytes)
return err
}
@ -291,23 +341,78 @@ func (the *ConfigHelper) SetStationGroup(groupId int, group common_models.Statio
k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &group)
}
// 最大级联层级
const max_corr_depth int = 15
// 关联分组(e.g 沉降级联)
// depth: 递归查询深度(防止错误配置导致基点互相参考;亦限制级联的最大层级)
func (the *ConfigHelper) corrGroups(gp common_models.StationGroup, depth int) {
if gp.GroupType == "202" && gp.Params != nil && len(gp.Params) == 2 {
ref_base_stationId, ok := gp.Params[settlementParam.Ref_base].(float64)
if !ok {
log.Println("ref_base 转换异常。", gp.Params[settlementParam.Ref_base])
return
}
ref_point_stationId, ok := gp.Params[settlementParam.Ref_point].(float64)
if !ok {
log.Println("ref_base 转换异常。", gp.Params[settlementParam.Ref_point])
return
}
basePtr := gp.GetSettlementBaseItem()
if basePtr == nil {
log.Println("无基点。", gp.R())
return
}
sg, err := the.GetStationGroupInfo(int(ref_base_stationId))
subBase := common_models.GroupItem{
StationId: int(ref_base_stationId),
ParamsValue: map[string]interface{}{"base": true},
SubItems: nil,
}
if err == nil {
refGroup, err := the.GetStationGroup(sg.GroupId)
if err == nil {
if depth+1 <= max_corr_depth {
the.corrGroups(refGroup, depth+1)
}
}
subBase = *refGroup.GetItem(int(ref_base_stationId))
}
subPoint := common_models.GroupItem{
StationId: int(ref_point_stationId),
ParamsValue: map[string]interface{}{"base": false},
SubItems: nil,
}
basePtr.SubItems = map[string]common_models.GroupItem{
settlementParam.Ref_base: subBase,
settlementParam.Ref_point: subPoint,
}
//log.Println(basePtr)
}
}
func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGroup, error) {
var result common_models.StationGroup
var group common_models.StationGroup
// group:35
k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
if v == "" {
err = errors.New("无测点group 数据")
return result, err
return group, err
}
err = json.Unmarshal([]byte(v), &result)
err = json.Unmarshal([]byte(v), &group)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("err => v=%s", v)
}
}
return result, err
the.corrGroups(group, 0)
return group, err
}
// RedisKey.station_group
@ -315,6 +420,7 @@ func (the *ConfigHelper) SetStationGroupInfo(stationId int, info common_models.S
k := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)
return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &info)
}
func (the *ConfigHelper) GetStationGroupInfo(stationId int) (common_models.StationGroupInfo, error) {
// sg:193
k := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)
@ -473,15 +579,29 @@ func (the *ConfigHelper) GetFilter(stationId int) (common_models.Filter, error)
return result, err
}
func (the *ConfigHelper) GetFilterItem(stationId int, item string) (common_models.FilterItem, error) {
result, err := the.GetFilter(stationId)
if err == nil {
for _, filterItem := range result.Items {
if filterItem.FieldName == item {
return filterItem, err
}
}
} else {
log.Printf("获取GetFilterItem err=%s", err.Error())
}
return common_models.FilterItem{}, err
}
func (the *ConfigHelper) GetAlarmCode(alarmCode string) (common_models.AlarmCode, error) {
//var iotaDevice common_models.IotaDevice
var err error
result, ok := AlarmCodeCache[alarmCode]
if !ok { //去redis查询
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Alarm_code, alarmCode)
err = the.redisHelper.GetObj(k, &result)
AlarmCodeCache[alarmCode] = result
}
return result, err
}
@ -497,9 +617,10 @@ func (the *ConfigHelper) GetIotaScheme(dimensionId string) (common_models.IotaSc
var scheme common_models.IotaScheme
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &scheme)
formattedStr := strings.Replace(v, "+0800", "+08:00", -1)
err = json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
return scheme, err
@ -578,3 +699,17 @@ func (the *ConfigHelper) mapToTree(source map[string]common_models.IotaInstance,
}
return deviceNode
}
func (the *ConfigHelper) GetStructure(structId int) (common_models.Structure, error) {
var result common_models.Structure
// structure:1
k := fmt.Sprintf("%s:%d", redisKey.Structure, structId)
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
}
}
return result, err
}

27
dbHelper/elasticsearchHelper.go

@ -30,7 +30,7 @@ func NewESHelper(addresses []string, user, pwd string) *ESHelper {
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res)
log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res.StatusCode)
return &ESHelper{
addresses: addresses,
esClient: es,
@ -45,10 +45,8 @@ func (the *ESHelper) SearchRaw(index, reqBody string) []common_models.HitRaw {
)
defer response.Body.Close()
if err != nil {
//return nil, err
return nil
}
log.Println(response.Status())
r := common_models.EsRawResp{}
// Deserialize the response into a map.
if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
@ -192,7 +190,7 @@ func (the *ESHelper) BulkWrite(index, reqBody string) {
if res.StatusCode != 200 && res.StatusCode != 201 {
respBody, _ := io.ReadAll(res.Body)
log.Panicf("es 写入[%s]失败,err=%s", string(respBody))
log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody)
}
//log.Printf("es 写入[%s],字符长度=%d,完成", index, len(reqBody))
@ -213,3 +211,22 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []common_models.EsRaw)
the.BulkWrite(index, body.String())
}
func (the *ESHelper) Close() {
}
// BulkWriteGroup2Es 分组主题数据写入ES
func (the *ESHelper) BulkWriteGroup2Es(index string, themes []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range themes {
source, _ := theme.MarshalJSON() //json.Marshal(theme)
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}
%s
`, index, _id, source)
body.WriteString(s)
}
the.BulkWrite(index, body.String())
}

11
go.mod

@ -1,12 +1,13 @@
module gitea.anxinyun.cn/container/common_utils
go 1.22.0
go 1.23.1
require (
gitea.anxinyun.cn/container/common_calc v0.0.1
gitea.anxinyun.cn/container/common_models v0.0.7
gitea.anxinyun.cn/container/common_models v0.0.11
github.com/IBM/sarama v1.43.0
github.com/allegro/bigcache v1.2.1
github.com/bytedance/sonic v1.12.2
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/eko/gocache/lib/v4 v4.1.5
github.com/eko/gocache/store/bigcache/v4 v4.2.1
@ -21,7 +22,10 @@ require (
require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
@ -44,6 +48,7 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
@ -64,8 +69,10 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.4.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect

22
go.sum

@ -35,6 +35,9 @@ gitea.anxinyun.cn/container/common_calc v0.0.1 h1:tGHq1jfdMCPg9nuoYLDBKrj0TCj8+9
gitea.anxinyun.cn/container/common_calc v0.0.1/go.mod h1:KAde7EMcFemVuEryjMiGGJDcA3bJA+0a8q5Ql+KU5sA=
gitea.anxinyun.cn/container/common_models v0.0.7 h1:zlHYJy7zFwqrH4q8KqbVjoMVL9TKNfkAZVauEljc8rk=
gitea.anxinyun.cn/container/common_models v0.0.7/go.mod h1:RXbYCDiXQGGeon1+9q/lWCSx7bXdXyX58PVJkZsPIGA=
gitea.anxinyun.cn/container/common_models v0.0.8/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
gitea.anxinyun.cn/container/common_models v0.0.11 h1:wsisXrdkngN55AIeKWlhczADO36O4kg9OyVL64NtS7o=
gitea.anxinyun.cn/container/common_models v0.0.11/go.mod h1:uiuu9XJajjULCFfhC9Sx3EPy0yLUcbQ8swpIjnMX4OU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc=
@ -60,6 +63,11 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg=
github.com/bytedance/sonic v1.12.2/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM=
github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@ -69,6 +77,10 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -221,6 +233,10 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@ -314,6 +330,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
@ -322,6 +339,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -336,6 +355,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/arch v0.4.0 h1:A8WCeEWhLwPBKNbFi5Wv5UTCBx5zzubnXDlMOFAzFMc=
golang.org/x/arch v0.4.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -647,6 +668,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

2
kafkaHelper/consumerGroupHandler.go

@ -48,7 +48,7 @@ func (h *ConsumerGroupHandler) SubscribeRaw(topic string, fun func(*sarama.Consu
func (h *ConsumerGroupHandler) decorateSubscribeString(handler func(string) bool) func(*sarama.ConsumerMessage) bool {
f := func(cm *sarama.ConsumerMessage) bool {
msg := string(cm.Value)
log.Printf("处理topic[%s]数据 offset=[%d]", cm.Topic, cm.Offset)
log.Printf("处理topic[%s]消息 offset=[%d]", cm.Topic, cm.Offset)
return handler(msg)
}
return f

15
redisHelper.go

@ -19,7 +19,15 @@ func NewRedisHelper(master string, address ...string) *RedisHelper {
r := &RedisHelper{ctx: context.Background()}
r.InitialCluster(master, address...)
return r
//r := &RedisHelper{ctx: context.Background()}
//var wg sync.WaitGroup
//wg.Add(1)
//go func() {
// defer wg.Done()
// r.InitialCluster(master, address...)
//}()
//wg.Wait()
//return r
}
func (the *RedisHelper) InitialCluster(master string, address ...string) {
@ -39,6 +47,11 @@ func (the *RedisHelper) InitialCluster(master string, address ...string) {
}
func (the *RedisHelper) Get(key string) string {
if the.rdb == nil {
log.Println("Redis client is not initialized")
return ""
}
val, err := the.rdb.Get(the.ctx, key).Result()
if errors.Is(err, redis.Nil) {
log.Printf("%s does not exist", key)

39
storage/storageDBs/storage2Es.go

@ -1,12 +1,14 @@
package storageDBs
import (
"encoding/json"
//"encoding/json"
"fmt"
"gitea.anxinyun.cn/container/common_calc"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/dbHelper"
json "github.com/bytedance/sonic"
"log"
"strings"
"sync"
)
@ -45,10 +47,20 @@ func newEsHelper() *dbHelper.ESHelper {
}
func (the *Storage2Es) SaveRaw(dataList []common_models.EsRaw) {
defer func() {
if err := recover(); err != nil {
log.Printf("未知异常=> %v", err)
}
}()
body := strings.Builder{}
for _, raw := range dataList {
// scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
source, _ := json.Marshal(raw)
source, err := json.Marshal(raw)
if err != nil {
log.Printf("raw序列化异常,err=%s", err.Error())
continue
}
docId := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli())
_id := common_calc.NameUUIDFromString(docId)
s := fmt.Sprintf(
@ -57,8 +69,7 @@ func (the *Storage2Es) SaveRaw(dataList []common_models.EsRaw) {
`, the.rawIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.rawIndex, body.String())
newEsHelper().BulkWrite(the.rawIndex, body.String())
the.esHelper.BulkWrite(the.rawIndex, body.String())
}
func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
@ -66,7 +77,11 @@ func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
for _, theme := range dataList {
// scala => val id = UUID.nameUUIDFromBytes(s"${sd.station.id}-${sd.acqTime.getMillis}".getBytes("UTF-8")).toString
source, _ := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("theme序列化异常,err=%s", err.Error())
continue
}
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.Sensor, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}
@ -74,8 +89,7 @@ func (the *Storage2Es) SaveTheme(dataList []common_models.EsTheme) {
`, the.themeIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.themeIndex, body.String())
newEsHelper().BulkWrite(the.themeIndex, body.String())
the.esHelper.BulkWrite(the.themeIndex, body.String())
}
func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
body := strings.Builder{}
@ -89,15 +103,18 @@ func (the *Storage2Es) SaveVib(dataList []common_models.EsVbRaw) {
`, the.vibIndex, _id, source)
body.WriteString(s)
}
//the.esHelper.BulkWrite(the.vibIndex, body.String())
newEsHelper().BulkWrite(the.vibIndex, body.String())
the.esHelper.BulkWrite(the.vibIndex, body.String())
}
// SaveGroupTheme 分组主题数据写入ES
func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
body := strings.Builder{}
for _, theme := range dataList {
source, _ := json.Marshal(theme)
source, err := theme.MarshalJSON() //json.Marshal(theme)
if err != nil {
log.Printf("group theme 序列化异常,err=%s", err.Error())
continue
}
_id := common_calc.NameUUIDFromString(fmt.Sprintf("%d-%d", theme.GroupId, theme.CollectTime.UnixMilli()))
s := fmt.Sprintf(
`{"index": {"_index": "%s","_id": "%s"}}
@ -105,7 +122,7 @@ func (the *Storage2Es) SaveGroupTheme(dataList []common_models.EsGroupTheme) {
`, the.groupIndex, _id, source)
body.WriteString(s)
}
the.esHelper.BulkWrite(the.groupIndex, body.String())
newEsHelper().BulkWrite(the.groupIndex, body.String())
}
func bathRawMarshal(index string, arrays []common_models.EsRaw) strings.Builder {
body := strings.Builder{}

54
storage/storageDBs/storage2InfluxDB.go

@ -6,7 +6,6 @@ import (
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/dbHelper"
"gitea.anxinyun.cn/container/common_utils/transform"
"strings"
)
type Storage2InfluxDB struct {
@ -44,35 +43,54 @@ func (the *Storage2InfluxDB) SaveDeviceData(dataList []common_models.DeviceData)
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Raw)
line := fmt.Sprintf("%s %s %d", d.DeviceId, fields.String(), d.AcqTime.UnixNano())
if d.Raw == nil {
continue
}
fields := transform.Obj2mapStr(d.Raw)
line := fmt.Sprintf("%s %s %d", d.DeviceId, fields, d.AcqTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.rawBucket)
}
func (the *Storage2InfluxDB) SaveRaw(dataList []common_models.EsRaw) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Data)
line := fmt.Sprintf("%s %s %d", d.IotaDevice, fields.String(), d.CollectTime.UnixNano())
if d.Data == nil {
continue
}
//cloneMap := copyMap(d.Data)
//奇怪的错误
fields := transform.Obj2mapStr(d.Data)
line := fmt.Sprintf("%s %s %d", d.IotaDevice, fields, d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.rawBucket)
}
func copyMap(raw map[string]any) map[string]any {
m := make(map[string]any)
for k, v := range raw {
m[k] = v
}
return m
}
func (the *Storage2InfluxDB) SaveTheme(dataList []common_models.EsTheme) {
if len(dataList) == 0 {
return
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.Data)
line := fmt.Sprintf("%d %s %d", d.Sensor, fields.String(), d.CollectTime.UnixNano())
if d.Data == nil {
continue
}
fields := transform.Obj2mapStr(d.Data)
line := fmt.Sprintf("factor_%d,sensor_id=%d %s %d", d.Factor, d.Sensor, fields, d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.themeBucket)
@ -85,9 +103,11 @@ func (the *Storage2InfluxDB) SaveVib(dataList []common_models.EsVbRaw) {
for _, data := range dataList {
onceList := data.FlatMapDynamicVib()
for _, onceD := range onceList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, onceD.Data)
line := fmt.Sprintf("%s %s %d", onceD.IotaDevice, fields.String(), onceD.CollectTime.UnixNano())
if onceD.Data == nil {
continue
}
fields := transform.Obj2mapStr(onceD.Data)
line := fmt.Sprintf("%s %s %d", onceD.IotaDevice, fields, onceD.CollectTime.UnixNano())
sb = append(sb, line)
}
}
@ -99,10 +119,12 @@ func (the *Storage2InfluxDB) SaveThemeData(dataList []common_models.StationData)
}
var sb []string
for _, d := range dataList {
fields := strings.Builder{}
transform.Obj2mapStr(&fields, d.ThemeData)
if d.ThemeData == nil {
continue
}
fields := transform.Obj2mapStr(d.ThemeData)
sensorId := 123
line := fmt.Sprintf("%d %s %d", sensorId, fields.String(), d.CollectTime.UnixNano())
line := fmt.Sprintf("%d %s %d", sensorId, fields, d.CollectTime.UnixNano())
sb = append(sb, line)
}
the.influxDBHelper.Write(sb, the.themeBucket)

5
transform/transform.go

@ -10,8 +10,9 @@ import (
// Obj2mapStr
// 将map[string]any 转为字符串 a="123",b=234 这种格式
func Obj2mapStr(fields *strings.Builder, dataMap map[string]any) *strings.Builder {
func Obj2mapStr(dataMap map[string]any) string {
ItemCount := 0
fields := strings.Builder{}
for k, v := range dataMap {
sValue := ""
switch v.(type) {
@ -36,7 +37,7 @@ func Obj2mapStr(fields *strings.Builder, dataMap map[string]any) *strings.Builde
}
}
return fields
return fields.String()
}
func Numerical(raw map[string]any) map[string]any {

12
util_test.go

@ -1,16 +1,19 @@
package common_utils
import (
"encoding/json"
"fmt"
"gitea.anxinyun.cn/container/common_calc"
"gitea.anxinyun.cn/container/common_models"
"github.com/stretchr/testify/assert"
"log"
//"strings"
"testing"
"time"
)
func Test_maxMin(t *testing.T) {
d := []float64{2.1, 3.1, 1.1, 5.1, 8.1, 7.1, 6.1}
_min, _max := common_calc.MinMax(d)
_basMax := common_calc.AbsMax(d)
@ -153,6 +156,15 @@ func TestConfigHelper_GetIotaScheme(t *testing.T) {
}
}
func TestConfigHelper_GetIotaScheme2(t *testing.T) {
formattedStr := `{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","name":"G07下游-静力","mode":"R","interval":1,"unit":"second","beginTime":"2023-08-10T16:45:35.244+08:00","endTime":null,"dimension":{"id":"f65e5990-540d-40ff-9056-af775e5e4c56","thingId":"8e3eec71-c924-47fd-ac8b-2f28c49ad4e9","name":"G07下游-静力"}}`
var scheme common_models.IotaScheme
err := json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
func TestConfigHelper_GetStationFilter(t *testing.T) {
cf := NewConfigHelper("10.8.30.160:30379")

Loading…
Cancel
Save