Compare commits

...

4 Commits

  1. 4
      containerApp/main.go
  2. 30
      dataSource/channels.go
  3. 69
      dataSource/go.mod
  4. 60
      dataSource/kafka/aggData.go
  5. 23
      dataSource/kafka/iotaData.go
  6. 71
      dataSource/kafka/kafka_handler.go
  7. 5
      et_rpc/rpc.go
  8. 2
      go.work
  9. 29
      master/go.mod
  10. 2
      master/main.go
  11. 2
      node/go.mod

4
containerApp/main.go

@ -40,7 +40,7 @@ func main() {
}
masterTag := configLoad.LoadConfig().GetString("master.hostNameTag")
log.Printf("hostName =[%s],masterTag=[%s]", hostName, masterTag)
log.Printf("hostName=[%s],masterTag=[%s]", hostName, masterTag)
if hostName == masterTag {
log.Printf("启动类型:master => hostName=[%s]", hostName)
etMaster.Start()
@ -53,11 +53,9 @@ func main() {
log.Printf("启动类型:node => hostName=[%s]", hostName)
etNode.Start()
}
}
func pprofRun() {
pprofAddr := ":10000"
log.Printf("性能分析 => pprofAddr=[%s]", pprofAddr)
go func() {

30
dataSource/channels.go

@ -1,30 +0,0 @@
package dataSource
import (
"gitea.anxinyun.cn/container/common_models"
"sync"
)
type DataChannels struct {
RawDataChan chan common_models.IotaData
AggDataChan chan common_models.AggData
}
var (
once sync.Once
dataChannels *DataChannels
)
func InitChannels() *DataChannels {
once.Do(func() {
dataChannels = &DataChannels{
RawDataChan: make(chan common_models.IotaData, 1),
AggDataChan: make(chan common_models.AggData, 1),
}
})
return dataChannels
}
func GetChannels() *DataChannels {
return dataChannels
}

69
dataSource/go.mod

@ -1,69 +0,0 @@
module dataSource
go 1.23.1
require (
gitea.anxinyun.cn/container/common_models v0.0.10
gitea.anxinyun.cn/container/common_utils v0.0.8
)
require (
github.com/IBM/sarama v1.43.0 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/eko/gocache/lib/v4 v4.1.5 // indirect
github.com/eko/gocache/store/bigcache/v4 v4.2.1 // indirect
github.com/eko/gocache/store/redis/v4 v4.2.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/v9 v9.5.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

60
dataSource/kafka/aggData.go

@ -1,60 +0,0 @@
package kafka
import (
"dataSource"
"encoding/json"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"strings"
"time"
)
type AggDataHandler struct {
configHelper *common_utils.ConfigHelper
}
func NewAggDataHandler() *AggDataHandler {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
return &AggDataHandler{
configHelper: common_utils.NewConfigHelper(redisAddr),
}
}
func (h AggDataHandler) HandleMessage(message string) bool {
// aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
// aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000,
// 在进行 json.Unmarshal() 时报错
// 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai")
// 2024-09-28T23:59:59.999+0800
// 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z
utcTimeStr := strings.Replace(message, "+0800", "+08:00", 1)
utcTimeStr = strings.Replace(utcTimeStr, "+0000", "Z", 1)
aggData := common_models.AggData{}
err := json.Unmarshal([]byte(utcTimeStr), &aggData)
if err != nil {
log.Printf("json parse error: %v", err)
return false
}
// 转换为中国时区时间("Asia/Shanghai")
loc, _ := time.LoadLocation("Asia/Shanghai")
chinaTime := aggData.Date.In(loc)
aggData.Date = chinaTime
//log.Printf("message:%v\n, cvt: %+v", message, aggData)
if aggData.ThingId == "" {
structure, err := h.configHelper.GetStructure(aggData.StructId)
if err != nil {
log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId)
return false
}
aggData.ThingId = structure.ThingId
}
log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId)
dataSource.GetChannels().AggDataChan <- aggData
return true
}

23
dataSource/kafka/iotaData.go

@ -1,23 +0,0 @@
package kafka
import (
"dataSource"
"encoding/json"
"gitea.anxinyun.cn/container/common_models"
"log"
)
type IotaDataHandler struct{}
func (h IotaDataHandler) HandleMessage(message string) bool {
// 处理 alarm 消息
rawData := common_models.IotaData{}
err := json.Unmarshal([]byte(message), &rawData)
if err != nil {
log.Printf("[dataSource/kafka/iotaData/IotaHandler] Parse msg error: %v", err)
return false
}
log.Printf("handler 处理[%s|%s]消息", rawData.DeviceId, rawData.TriggerTime)
dataSource.GetChannels().RawDataChan <- rawData
return true
}

71
dataSource/kafka/kafka_handler.go

@ -1,71 +0,0 @@
package kafka
import (
"dataSource"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
)
type KafkaDataSource struct {
groupId string
brokers []string
topics map[string]string
DataChannels *dataSource.DataChannels
}
func NewKafkaDataSource() *KafkaDataSource {
// 初始化所有通道
dataSource.InitChannels()
// 读取配置
config := configLoad.LoadConfig()
groupId := config.GetString("kafka.groupId")
brokers := config.GetStringSlice("kafka.brokers")
topics := config.GetStringMapString("kafka.topics")
return &KafkaDataSource{
groupId: groupId,
brokers: brokers,
topics: topics,
DataChannels: dataSource.GetChannels(),
}
}
// Producer 将 kafka message -> 各数据模型 -> 各数据通道
func (s *KafkaDataSource) Producer() {
// 消费数据
kafkaConsumer := kafkaHelper.NewConsumerGroupHandler(s.brokers, s.groupId)
for cfgName, topic := range s.topics {
// 创建消息处理器
handler := NewMessageHandler(cfgName)
if handler == nil {
log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
continue
}
// 订阅主题 和 消息处理
kafkaConsumer.Subscribe(topic, handler.HandleMessage)
}
kafkaConsumer.Worker()
}
// IMessageHandler 是 kafka 消息处理者接口
type IMessageHandler interface {
HandleMessage(message string) bool
}
// NewMessageHandler 是 MessageHandler 构造函数
// cfgName: config.yaml 中 kafka.topics 的配置名
func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName {
case "data_raw":
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
return IotaDataHandler{}
case "data_agg":
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
return NewAggDataHandler()
default:
return nil
}
}

5
et_rpc/rpc.go

@ -9,6 +9,11 @@ const (
NodeState_Unhealthy
)
const (
RPCReply_Success RPCReplyCode = iota
RPCReply_Failure
)
type NodeArgs struct {
ID string
Addr string

2
go.work

@ -11,7 +11,7 @@ use (
node
et_analyze
et_cache
dataSource
et_print
containerApp
et_rpc
)

29
master/go.mod

@ -4,28 +4,15 @@ go 1.23.1
require (
gitea.anxinyun.cn/container/common_models v0.0.12
gitea.anxinyun.cn/container/common_utils v0.0.13
github.com/IBM/sarama v1.43.0
github.com/jolestar/go-commons-pool v2.0.0+incompatible
)
require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/panjf2000/ants/v2 v2.11.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
golang.org/x/sync v0.11.0 // indirect
golang.org/x/time v0.10.0 // indirect
)

2
master/main.go

@ -6,5 +6,5 @@ import (
func main() {
app.Start()
println("over=======")
println("======= master main over=======")
}

2
node/go.mod

@ -4,7 +4,7 @@ go 1.23.1
require (
gitea.anxinyun.cn/container/common_models v0.0.12
gitea.anxinyun.cn/container/common_utils v0.0.13
gitea.anxinyun.cn/container/common_utils v0.0.14
github.com/google/uuid v1.6.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

Loading…
Cancel
Save