数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
3.8 KiB

package consumers
import (
"encoding/json"
"goInOut/adaptors"
"goInOut/consumers/AXYraw"
"goInOut/dbOperate"
"goInOut/dbOperate/_kafka"
"goInOut/models"
"log"
"sync"
"time"
)
// consumerAXYraw takes messages delivered by the Kafka helper, keeps the most
// recent record per device in memory, and periodically hands the pending
// records to the ES helper in one bulk call.
type consumerAXYraw struct {
	// dataCache is the buffered channel that carries records accepted by
	// onData to the goroutine started in Work.
	dataCache chan *models.EsRaw

	// ConfigInfo is the configuration decoded by LoadConfigJson.
	ConfigInfo AXYraw.ConfigFile

	// InKafka is the input side, built in inputInitial.
	InKafka _kafka.KafkaHelper

	// OutEs is the output side, built in outputInitial.
	OutEs dbOperate.ESHelper

	// infoRedis is built in infoComponentInitial and passed to the adaptor
	// that onData creates for every message.
	infoRedis *dbOperate.RedisHelper

	// sinkRawMap holds the pending records keyed by IotaDevice; a newer record
	// for the same device replaces the older one.
	sinkRawMap sync.Map

	// lock serializes the Store in Work against the Range/Clear in toSink.
	lock sync.Mutex
}
// LoadConfigJson decodes the JSON text cfgStr into the.ConfigInfo.
//
// A decode failure is logged together with the offending text and then raised
// as a panic carrying the decode error.
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
	raw := []byte(cfgStr)
	if err := json.Unmarshal(raw, &the.ConfigInfo); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}
// Initial prepares the consumer from the JSON configuration text cfg.
//
// It resets the pending-record map, creates the 200-slot record channel,
// decodes cfg (LoadConfigJson panics on malformed JSON) and then runs the
// input, output and info-component initialisers in that order. The first
// non-nil error stops the sequence and is returned; nil means every step
// succeeded.
func (the *consumerAXYraw) Initial(cfg string) error {
	the.sinkRawMap = sync.Map{}
	the.dataCache = make(chan *models.EsRaw, 200)
	the.LoadConfigJson(cfg)

	steps := []func() error{
		the.inputInitial,
		the.outputInitial,
		the.infoComponentInitial,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// inputInitial sets up the data entry point: it builds the Kafka helper from
// the configured brokers and group id, initialises it, subscribes onData to
// every configured topic and finally calls the helper's Worker.
//
// It always returns nil.
func (the *consumerAXYraw) inputInitial() error {
	kafkaCfg := the.ConfigInfo.IoConfig.In.Kafka

	the.InKafka = _kafka.KafkaHelper{
		Brokers: kafkaCfg.Brokers,
		GroupId: kafkaCfg.GroupId,
	}
	the.InKafka.Initial()

	for _, topic := range kafkaCfg.Topics {
		the.InKafka.Subscribe(topic, the.onData)
	}

	the.InKafka.Worker()
	return nil
}
// outputInitial sets up the data exit point: it creates an ES helper from the
// configured address, user name and password and stores a copy of it in
// the.OutEs.
//
// It always returns nil.
func (the *consumerAXYraw) outputInitial() error {
	esCfg := the.ConfigInfo.IoConfig.Out.Es
	helper := dbOperate.NewESHelper(esCfg.Address, esCfg.Auth.UserName, esCfg.Auth.Password)
	the.OutEs = *helper
	return nil
}
// infoComponentInitial creates the Redis helper for the configured
// info-query address and stores it in the.infoRedis, from where onData hands
// it to the adaptor.
//
// It always returns nil.
func (the *consumerAXYraw) infoComponentInitial() error {
	the.infoRedis = dbOperate.NewRedisHelper("", the.ConfigInfo.Info.QueryComponent.Redis.Address)
	return nil
}
// sinkTask calls toSink once per tick, forever. The tick period is
// ConfigInfo.IoConfig.Out.Es.Interval, interpreted as seconds.
//
// time.NewTicker panics when given a non-positive duration; because this
// function runs in its own goroutine, that panic would terminate the whole
// process whenever the interval is absent from (or zero in) the
// configuration. Such values are therefore logged and replaced by a default
// period.
func (the *consumerAXYraw) sinkTask() {
	const fallbackInterval = 10 * time.Second

	configured := the.ConfigInfo.IoConfig.Out.Es.Interval
	interval := time.Duration(configured) * time.Second
	if interval <= 0 {
		log.Printf("es写入间隔配置无效[%v], 使用默认值 %v", configured, fallbackInterval)
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The ticker channel is never closed, so this loop only ends with the
	// process.
	for range ticker.C {
		the.toSink()
	}
}
// toSink writes every pending record to ES in one bulk call and empties the
// pending map. When nothing is pending it does nothing.
//
// The snapshot-and-clear step runs under the.lock so no Store from Work can
// slip in between reading and clearing. The bulk write itself runs after the
// lock has been released: holding the lock across the write would stall the
// Work goroutine, let dataCache fill up and in turn block onData for as long
// as the write takes.
func (the *consumerAXYraw) toSink() {
	raws := the.takePending()
	if len(raws) == 0 {
		return
	}

	log.Printf("准备写入es %d条", len(raws))
	index := the.ConfigInfo.IoConfig.Out.Es.Index
	the.OutEs.BulkWriteRaws2Es(index, raws)
}

// takePending copies all records out of sinkRawMap and, if at least one
// record was copied, clears the map. Both steps happen under the.lock.
func (the *consumerAXYraw) takePending() []models.EsRaw {
	the.lock.Lock()
	defer the.lock.Unlock()

	var raws []models.EsRaw
	the.sinkRawMap.Range(func(key, value any) bool {
		raw, ok := value.(*models.EsRaw)
		if !ok {
			// Unexpected value type: report it and keep iterating.
			log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
			return true
		}
		raws = append(raws, *raw)

		// Temporary trace for the tagged device; the Marshal error is ignored
		// because the output is only used for this log line.
		if raw.IotaDevice == logTagDeviceId {
			bs, _ := json.Marshal(raw)
			log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
		}
		return true
	})

	if len(raws) > 0 {
		the.sinkRawMap.Clear()
	}
	return raws
}
// logTagDeviceId is the device id that gets extra trace log lines: onData,
// Work and toSink each compare a record's IotaDevice against it and, on a
// match, log the record as JSON. The original inline comment in toSink marks
// this logging as temporary.
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0"
// Work starts the two background goroutines of the consumer and returns
// immediately:
//
//   - sinkTask, which periodically flushes the pending records;
//   - a loop that receives records from dataCache and stores each one in
//     sinkRawMap under its IotaDevice, replacing any earlier record for the
//     same device.
func (the *consumerAXYraw) Work() {
	cacheLoop := func() {
		for {
			raw := <-the.dataCache

			// Extra trace for the tagged device.
			if raw.IotaDevice == logTagDeviceId {
				bs, _ := json.Marshal(raw)
				log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
			}

			// Keep the accepted record as the pending one for its device.
			the.lock.Lock()
			the.sinkRawMap.Store(raw.IotaDevice, raw)
			the.lock.Unlock()
		}
	}

	go the.sinkTask()
	go cacheLoop()
}
// onData is the callback subscribed to every input topic in inputInitial.
//
// It runs the message through an Adaptor_AXY_LastRAW backed by the.infoRedis.
// A result is forwarded to dataCache only when it is non-nil, has a non-empty
// Meta and a non-nil Data; anything else is dropped silently. The send blocks
// while dataCache is full.
//
// The function returns true in every case.
func (the *consumerAXYraw) onData(topic string, msg string) bool {
	//if len(msg) > 80 {
	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80])
	//}
	adaptor := adaptors.Adaptor_AXY_LastRAW{Redis: the.infoRedis}

	raw := adaptor.Transform(topic, msg)
	if raw == nil || len(raw.Meta) == 0 || raw.Data == nil {
		return true
	}

	// Extra trace for the tagged device.
	if raw.IotaDevice == logTagDeviceId {
		bs, _ := json.Marshal(raw)
		log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
	}

	the.dataCache <- raw
	return true
}