9 changed files with 340 additions and 18 deletions
@ -0,0 +1,65 @@ |
|||
package adaptors |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"goInOut/consumers/AXYraw" |
|||
"goInOut/dbOperate" |
|||
"goInOut/models" |
|||
"log" |
|||
"time" |
|||
) |
|||
|
|||
// Adaptor_Savoir_LastTheme 知物云 kafka theme数据 转换 es设备数据
|
|||
type Adaptor_Savoir_LastTheme struct { |
|||
AXYraw.Info |
|||
Redis *dbOperate.RedisHelper |
|||
} |
|||
|
|||
func (the Adaptor_Savoir_LastTheme) Transform(topic, rawMsg string) *models.EsTheme { |
|||
theme := models.SavoirTheme{} |
|||
err := json.Unmarshal([]byte(rawMsg), &theme) |
|||
if err != nil { |
|||
log.Printf("反序列化 异常 dev数据=%s", rawMsg) |
|||
return nil |
|||
} |
|||
|
|||
return the.Theme2Es(theme) |
|||
} |
|||
|
|||
func (the Adaptor_Savoir_LastTheme) Theme2Es(theme models.SavoirTheme) *models.EsTheme { |
|||
if theme.DataEmpty || len(theme.Data) == 0 { |
|||
bs, _ := json.Marshal(theme) |
|||
log.Printf("测点[%s]数据为空 => %s", theme.Station.Name, bs) |
|||
return nil |
|||
} |
|||
|
|||
////log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
|||
//deviceInfo := the.GetDeviceInfo(iotaData.DeviceId)
|
|||
//
|
|||
////查不到信息的数据
|
|||
//if deviceInfo.Name == "" {
|
|||
// log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
|||
// return nil
|
|||
//}
|
|||
//
|
|||
Atime, err := time.Parse("2006-01-02T15:04:05.000+0800", theme.AcqTime) |
|||
if err != nil { |
|||
log.Printf("知物云 测点[%s] 数据时间 %s 解析错误", theme.Station.Name, theme.AcqTime) |
|||
return nil |
|||
} |
|||
|
|||
themeData := &models.EsTheme{ |
|||
SensorName: theme.Station.Name, |
|||
FactorName: theme.Station.Factor.Name, |
|||
FactorProtoCode: theme.Station.Factor.ProtoCode, |
|||
Data: theme.Data, |
|||
FactorProtoName: theme.Station.Factor.ProtoName, |
|||
Factor: theme.Station.Factor.Id, |
|||
CollectTime: Atime, |
|||
Sensor: theme.Station.Id, |
|||
Structure: theme.Station.Structure.Id, |
|||
IotaDevice: []string{}, |
|||
CreateTime: time.Now(), |
|||
} |
|||
return themeData |
|||
} |
@ -0,0 +1,27 @@ |
|||
consumer: consumerSavoirTheme |
|||
ioConfig: |
|||
in: |
|||
kafka: |
|||
brokers: |
|||
- 10.8.30.160:30992 |
|||
groupId: savoir_last_theme_inout |
|||
topics: |
|||
- savoir_theme |
|||
out: |
|||
es: |
|||
address: |
|||
- "http://10.8.30.160:30092" |
|||
index: "savoir_last_theme" |
|||
auth: |
|||
userName: post |
|||
password: 123 |
|||
interval: 30 |
|||
monitor: |
|||
|
|||
cron10min: 6/10 * * * * |
|||
cron1hour: 45 0/1 * * * |
|||
|
|||
queryComponent: |
|||
redis: |
|||
address: 10.8.30.160:30379 |
|||
|
@ -0,0 +1,31 @@ |
|||
package SavoirTheme |
|||
|
|||
import "goInOut/config" |
|||
|
|||
type ConfigFile struct { |
|||
IoConfig ioConfig `yaml:"ioConfig"` |
|||
Monitor map[string]string `yaml:"monitor"` |
|||
QueryComponent queryComponent `yaml:"queryComponent"` |
|||
} |
|||
type ioConfig struct { |
|||
In in `json:"in"` |
|||
Out out `json:"out"` |
|||
} |
|||
type in struct { |
|||
Kafka config.KafkaConfig `json:"kafka"` |
|||
} |
|||
|
|||
type out struct { |
|||
Es config.EsConfig `json:"es"` |
|||
} |
|||
|
|||
type Info struct { |
|||
//Common map[string]string `json:"common"`
|
|||
QueryComponent queryComponent `json:"queryComponent"` |
|||
} |
|||
|
|||
type queryComponent struct { |
|||
Redis struct { |
|||
Address string `json:"address"` |
|||
} `json:"redis"` |
|||
} |
@ -0,0 +1,163 @@ |
|||
package consumers |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"goInOut/adaptors" |
|||
"goInOut/consumers/SavoirTheme" |
|||
"goInOut/dbOperate" |
|||
"goInOut/dbOperate/_kafka" |
|||
"goInOut/models" |
|||
"gopkg.in/yaml.v3" |
|||
"log" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
type consumerSavoirTheme struct { |
|||
//数据缓存管道
|
|||
dataCache chan *models.EsTheme |
|||
//具体配置
|
|||
Info SavoirTheme.ConfigFile |
|||
InKafka _kafka.KafkaHelper |
|||
OutEs dbOperate.ESHelper |
|||
infoRedis *dbOperate.RedisHelper |
|||
sinkMap sync.Map |
|||
lock sync.Mutex |
|||
logTagId int |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) { |
|||
// 将 JSON 格式的数据解析到结构体中
|
|||
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
|||
if err != nil { |
|||
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
|||
panic(err) |
|||
} |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) Initial(cfg string) error { |
|||
the.sinkMap = sync.Map{} |
|||
the.dataCache = make(chan *models.EsTheme, 500) |
|||
|
|||
the.LoadConfigJson(cfg) |
|||
err := the.inputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.outputInitial() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = the.infoComponentInitial() |
|||
return err |
|||
} |
|||
func (the *consumerSavoirTheme) inputInitial() error { |
|||
//数据入口
|
|||
the.InKafka = _kafka.KafkaHelper{ |
|||
Brokers: the.Info.IoConfig.In.Kafka.Brokers, |
|||
GroupId: the.Info.IoConfig.In.Kafka.GroupId, |
|||
} |
|||
the.InKafka.Initial() |
|||
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { |
|||
the.InKafka.Subscribe(inTopic, the.onData) |
|||
} |
|||
|
|||
the.InKafka.Worker() |
|||
return nil |
|||
} |
|||
func (the *consumerSavoirTheme) outputInitial() error { |
|||
//数据出口
|
|||
the.OutEs = *dbOperate.NewESHelper( |
|||
the.Info.IoConfig.Out.Es.Address, |
|||
the.Info.IoConfig.Out.Es.Auth.UserName, |
|||
the.Info.IoConfig.Out.Es.Auth.Password, |
|||
) |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) infoComponentInitial() error { |
|||
//数据出口
|
|||
//addr := the.Info.OtherInfo..Redis.Address
|
|||
//the.infoRedis = dbOperate.NewRedisHelper("", addr)
|
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) sinkTask() { |
|||
intervalSec := the.Info.IoConfig.Out.Es.Interval |
|||
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) |
|||
defer ticker.Stop() |
|||
for { |
|||
<-ticker.C |
|||
the.toSink() |
|||
} |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) toSink() { |
|||
var themes []models.EsTheme |
|||
the.lock.Lock() |
|||
defer the.lock.Unlock() |
|||
the.sinkMap.Range(func(key, value any) bool { |
|||
if v, ok := value.(*models.EsTheme); ok { |
|||
themes = append(themes, *v) |
|||
//零时打日志用
|
|||
if v.Sensor == the.logTagId { |
|||
bs, _ := json.Marshal(v) |
|||
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) |
|||
} |
|||
return ok |
|||
} else { |
|||
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) |
|||
} |
|||
return true |
|||
}) |
|||
if len(themes) > 0 { |
|||
index := the.Info.IoConfig.Out.Es.Index |
|||
log.Printf("写入es [%s] %d条", index, len(themes)) |
|||
the.OutEs.BulkWriteThemes2Es(index, themes) |
|||
the.sinkMap.Clear() |
|||
} |
|||
} |
|||
|
|||
func (the *consumerSavoirTheme) Work() { |
|||
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) |
|||
go the.sinkTask() |
|||
go func() { |
|||
for { |
|||
pushEsTheme := <-the.dataCache |
|||
|
|||
if pushEsTheme.Sensor == the.logTagId { |
|||
bs, _ := json.Marshal(pushEsTheme) |
|||
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) |
|||
} |
|||
|
|||
//有效数据存入缓存
|
|||
the.lock.Lock() |
|||
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) |
|||
the.lock.Unlock() |
|||
} |
|||
|
|||
}() |
|||
} |
|||
func (the *consumerSavoirTheme) onData(topic string, msg string) bool { |
|||
//if len(msg) > 80 {
|
|||
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
|||
//}
|
|||
adaptor := adaptors.Adaptor_Savoir_LastTheme{} |
|||
|
|||
needPush := adaptor.Transform(topic, msg) |
|||
|
|||
if needPush != nil && needPush.Data != nil { |
|||
the.dataCache <- needPush |
|||
} else { |
|||
s, _ := json.Marshal(needPush) |
|||
if needPush != nil { |
|||
if needPush.Sensor == the.logTagId { |
|||
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) |
|||
} |
|||
} |
|||
} |
|||
|
|||
return true |
|||
} |
Loading…
Reference in new issue