You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
144 lines
3.1 KiB
144 lines
3.1 KiB
package consumers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/AXYraw"
|
|
"goInOut/dbHelper"
|
|
"goInOut/dbHelper/_kafka"
|
|
"goInOut/models"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type consumerAXYraw struct {
|
|
//数据缓存管道
|
|
dataCache chan *models.EsRaw
|
|
//具体配置
|
|
ConfigInfo AXYraw.ConfigFile
|
|
InKafka _kafka.KafkaHelper
|
|
OutEs dbHelper.ESHelper
|
|
infoRedis *dbHelper.RedisHelper
|
|
sinkRawMap sync.Map
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYraw) Initial(cfg string) error {
|
|
the.sinkRawMap = sync.Map{}
|
|
the.dataCache = make(chan *models.EsRaw, 200)
|
|
|
|
the.LoadConfigJson(cfg)
|
|
err := the.inputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.outputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.infoComponentInitial()
|
|
return err
|
|
}
|
|
func (the *consumerAXYraw) inputInitial() error {
|
|
//数据入口
|
|
the.InKafka = _kafka.KafkaHelper{
|
|
Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers,
|
|
GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId,
|
|
}
|
|
the.InKafka.Initial()
|
|
for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics {
|
|
the.InKafka.Subscribe(inTopic, the.onData)
|
|
}
|
|
|
|
the.InKafka.Worker()
|
|
return nil
|
|
}
|
|
func (the *consumerAXYraw) outputInitial() error {
|
|
//数据出口
|
|
the.OutEs = *dbHelper.NewESHelper(
|
|
the.ConfigInfo.IoConfig.Out.Es.Address,
|
|
the.ConfigInfo.IoConfig.Out.Es.Auth.UserName,
|
|
the.ConfigInfo.IoConfig.Out.Es.Auth.Password,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerAXYraw) infoComponentInitial() error {
|
|
//数据出口
|
|
addr := the.ConfigInfo.Info.QueryComponent.Redis.Address
|
|
the.infoRedis = dbHelper.NewRedisHelper("", addr)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerAXYraw) sinkTask() {
|
|
intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval
|
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
for true {
|
|
<-ticker.C
|
|
the.toSink()
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYraw) toSink() {
|
|
var raws []models.EsRaw
|
|
the.lock.Lock()
|
|
defer the.lock.Unlock()
|
|
the.sinkRawMap.Range(func(key, value any) bool {
|
|
if v, ok := value.(*models.EsRaw); ok {
|
|
raws = append(raws, *v)
|
|
return ok
|
|
}
|
|
return false
|
|
})
|
|
if len(raws) > 0 {
|
|
log.Printf("准备写入es %d条", len(raws))
|
|
index := the.ConfigInfo.IoConfig.Out.Es.Index
|
|
the.OutEs.BulkWriteRaws2Es(index, raws)
|
|
the.sinkRawMap.Clear()
|
|
}
|
|
}
|
|
|
|
func (the *consumerAXYraw) Work() {
|
|
go the.sinkTask()
|
|
go func() {
|
|
for {
|
|
pushEsRaw := <-the.dataCache
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
|
|
|
|
//有效数据存入缓存
|
|
the.lock.Lock()
|
|
the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw)
|
|
the.lock.Unlock()
|
|
}
|
|
|
|
}()
|
|
}
|
|
func (the *consumerAXYraw) onData(topic string, msg string) bool {
|
|
//if len(msg) > 80 {
|
|
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
|
//}
|
|
adaptor := adaptors.Adaptor_AXY_LastRAW{
|
|
Redis: the.infoRedis,
|
|
}
|
|
|
|
needPush := adaptor.Transform(topic, msg)
|
|
|
|
if needPush != nil {
|
|
the.dataCache <- needPush
|
|
}
|
|
|
|
return true
|
|
}
|
|
|