9 changed files with 315 additions and 0 deletions
@ -0,0 +1,65 @@ |
|||||
|
package adaptors |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"goInOut/consumers/AXYraw" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/models" |
||||
|
"log" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Adaptor_Anxinyun_LastTheme 安心云 kafka theme数据 转换 es数据
|
||||
|
type Adaptor_Anxinyun_LastTheme struct { |
||||
|
AXYraw.Info |
||||
|
Redis *dbOperate.RedisHelper |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_Anxinyun_LastTheme) Transform(topic, rawMsg string) *models.EsTheme { |
||||
|
theme := models.AXYSavoirTheme{} |
||||
|
err := json.Unmarshal([]byte(rawMsg), &theme) |
||||
|
if err != nil { |
||||
|
log.Printf("反序列化 异常 dev数据=%s", rawMsg) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
return the.Theme2Es(theme) |
||||
|
} |
||||
|
|
||||
|
func (the Adaptor_Anxinyun_LastTheme) Theme2Es(theme models.AXYSavoirTheme) *models.EsTheme { |
||||
|
if theme.DataEmpty || len(theme.Data) == 0 { |
||||
|
bs, _ := json.Marshal(theme) |
||||
|
log.Printf("测点[%s]数据为空 => %s", theme.Station.Name, bs) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
////log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
||||
|
//deviceInfo := the.GetDeviceInfo(iotaData.DeviceId)
|
||||
|
//
|
||||
|
////查不到信息的数据
|
||||
|
//if deviceInfo.Name == "" {
|
||||
|
// log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime)
|
||||
|
// return nil
|
||||
|
//}
|
||||
|
//
|
||||
|
Atime, err := time.Parse("2006-01-02T15:04:05.000+0800", theme.AcqTime) |
||||
|
if err != nil { |
||||
|
log.Printf("安心云 测点[%s] 数据时间 %s 解析错误", theme.Station.Name, theme.AcqTime) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
themeData := &models.EsTheme{ |
||||
|
SensorName: theme.Station.Name, |
||||
|
FactorName: theme.Station.Factor.Name, |
||||
|
FactorProtoCode: theme.Station.Factor.ProtoCode, |
||||
|
Data: theme.Data, |
||||
|
FactorProtoName: theme.Station.Factor.ProtoName, |
||||
|
Factor: theme.Station.Factor.Id, |
||||
|
CollectTime: Atime, |
||||
|
Sensor: theme.Station.Id, |
||||
|
Structure: theme.Station.Structure.Id, |
||||
|
IotaDevice: []string{}, |
||||
|
CreateTime: time.Now(), |
||||
|
} |
||||
|
return themeData |
||||
|
} |
@ -0,0 +1,20 @@ |
|||||
|
consumer: consumerAXYThemeToES |
||||
|
ioConfig: |
||||
|
in: |
||||
|
kafka: |
||||
|
brokers: |
||||
|
- 10.8.30.160:30992 |
||||
|
groupId: axysavoir_last |
||||
|
topics: |
||||
|
- anxinyun_theme #监听数据的的主题 |
||||
|
out: |
||||
|
es: |
||||
|
address: |
||||
|
- "http://10.8.30.150:30847" |
||||
|
# - "http://10.8.30.155:5601" |
||||
|
# - "http://10.8.30.160:30092" |
||||
|
index: "anxincloud_last_theme" #es存测点最后一条数据的新索引 |
||||
|
auth: |
||||
|
userName: post |
||||
|
password: 123 |
||||
|
interval: 30 #多久写一次es(秒) |
@ -0,0 +1,19 @@ |
|||||
|
package AXYTheme |
||||
|
|
||||
|
import "goInOut/config" |
||||
|
|
||||
|
type ConfigFile struct { |
||||
|
IoConfig ioConfig `yaml:"ioConfig"` |
||||
|
Monitor map[string]string `yaml:"monitor"` |
||||
|
} |
||||
|
type ioConfig struct { |
||||
|
In in `json:"in"` |
||||
|
Out out `json:"out"` |
||||
|
} |
||||
|
type in struct { |
||||
|
Kafka config.KafkaConfig `json:"kafka"` |
||||
|
} |
||||
|
|
||||
|
type out struct { |
||||
|
Es config.EsConfig `json:"es"` |
||||
|
} |
@ -0,0 +1,14 @@ |
|||||
|
package AXYTheme |
||||
|
|
||||
|
import ( |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type OffLineGap struct { |
||||
|
StructId int `json:"struct_id" db:"struct_id"` |
||||
|
FactorId int `json:"factor_id" db:"factor_id"` |
||||
|
OfflineGap int `json:"offline_gap" db:"offline_gap"` |
||||
|
IsOpen bool `json:"is_open" db:"is_open"` |
||||
|
UpdateAt time.Time `json:"update_at" db:"update_at"` |
||||
|
StructName string `json:"name" db:"name"` |
||||
|
} |
@ -0,0 +1,157 @@ |
|||||
|
package consumers |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"goInOut/adaptors" |
||||
|
"goInOut/consumers/AXYTheme" |
||||
|
"goInOut/dbOperate" |
||||
|
"goInOut/dbOperate/_kafka" |
||||
|
"goInOut/models" |
||||
|
"goInOut/monitors" |
||||
|
"gopkg.in/yaml.v3" |
||||
|
"log" |
||||
|
"sync" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type consumerAXYThemeToES struct { |
||||
|
//数据缓存管道
|
||||
|
dataCache chan *models.EsTheme |
||||
|
//具体配置
|
||||
|
Info AXYTheme.ConfigFile |
||||
|
InKafka _kafka.KafkaHelper |
||||
|
OutEs dbOperate.ESHelper |
||||
|
infoPg *dbOperate.DBHelper |
||||
|
sinkMap sync.Map |
||||
|
lock sync.Mutex |
||||
|
logTagId int |
||||
|
monitor *monitors.CommonMonitor |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { |
||||
|
// 将 JSON 格式的数据解析到结构体中
|
||||
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info) |
||||
|
if err != nil { |
||||
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYThemeToES) Initial(cfg string) error { |
||||
|
the.sinkMap = sync.Map{} |
||||
|
the.dataCache = make(chan *models.EsTheme, 1000) |
||||
|
|
||||
|
the.LoadConfigJson(cfg) |
||||
|
err := the.inputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
err = the.outputInitial() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
return err |
||||
|
} |
||||
|
func (the *consumerAXYThemeToES) inputInitial() error { |
||||
|
//数据入口
|
||||
|
the.InKafka = _kafka.KafkaHelper{ |
||||
|
Brokers: the.Info.IoConfig.In.Kafka.Brokers, |
||||
|
GroupId: the.Info.IoConfig.In.Kafka.GroupId, |
||||
|
} |
||||
|
the.InKafka.Initial() |
||||
|
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { |
||||
|
the.InKafka.Subscribe(inTopic, the.onData) |
||||
|
} |
||||
|
|
||||
|
the.InKafka.Worker() |
||||
|
return nil |
||||
|
} |
||||
|
func (the *consumerAXYThemeToES) outputInitial() error { |
||||
|
//数据出口
|
||||
|
the.OutEs = *dbOperate.NewESHelper( |
||||
|
the.Info.IoConfig.Out.Es.Address, |
||||
|
the.Info.IoConfig.Out.Es.Auth.UserName, |
||||
|
the.Info.IoConfig.Out.Es.Auth.Password, |
||||
|
) |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYThemeToES) sinkTask() { |
||||
|
intervalSec := the.Info.IoConfig.Out.Es.Interval |
||||
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) |
||||
|
defer ticker.Stop() |
||||
|
for { |
||||
|
<-ticker.C |
||||
|
the.toSink() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYThemeToES) toSink() { |
||||
|
var themes []models.EsTheme |
||||
|
the.lock.Lock() |
||||
|
defer the.lock.Unlock() |
||||
|
the.sinkMap.Range(func(key, value any) bool { |
||||
|
if v, ok := value.(*models.EsTheme); ok { |
||||
|
themes = append(themes, *v) |
||||
|
//零时打日志用
|
||||
|
if v.Sensor == the.logTagId { |
||||
|
bs, _ := json.Marshal(v) |
||||
|
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) |
||||
|
} |
||||
|
return ok |
||||
|
} else { |
||||
|
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) |
||||
|
} |
||||
|
return true |
||||
|
}) |
||||
|
if len(themes) > 0 { |
||||
|
index := the.Info.IoConfig.Out.Es.Index |
||||
|
log.Printf("写入es [%s] %d条", index, len(themes)) |
||||
|
the.OutEs.BulkWriteThemes2Es(index, themes) |
||||
|
the.sinkMap.Clear() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (the *consumerAXYThemeToES) Work() { |
||||
|
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) |
||||
|
go the.sinkTask() |
||||
|
go func() { |
||||
|
for { |
||||
|
pushEsTheme := <-the.dataCache |
||||
|
|
||||
|
if pushEsTheme.Sensor == the.logTagId { |
||||
|
bs, _ := json.Marshal(pushEsTheme) |
||||
|
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) |
||||
|
} |
||||
|
|
||||
|
//有效数据存入缓存
|
||||
|
the.lock.Lock() |
||||
|
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) |
||||
|
the.lock.Unlock() |
||||
|
} |
||||
|
|
||||
|
}() |
||||
|
} |
||||
|
func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
||||
|
//if len(msg) > 80 {
|
||||
|
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
||||
|
//}
|
||||
|
adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} |
||||
|
|
||||
|
needPush := adaptor.Transform(topic, msg) |
||||
|
|
||||
|
if needPush != nil && needPush.Data != nil { |
||||
|
the.dataCache <- needPush |
||||
|
} else { |
||||
|
s, _ := json.Marshal(needPush) |
||||
|
if needPush != nil { |
||||
|
if needPush.Sensor == the.logTagId { |
||||
|
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return true |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
package models |
||||
|
|
||||
|
type AXYSavoirTheme struct { |
||||
|
Station struct { |
||||
|
Name string `json:"name"` |
||||
|
Id int `json:"id"` |
||||
|
Structure Structure `json:"structure"` |
||||
|
Factor struct { |
||||
|
Id int `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
ProtoCode string `json:"protoCode"` |
||||
|
ProtoName string `json:"protoName"` |
||||
|
Items []struct { |
||||
|
Id int `json:"id"` |
||||
|
Name string `json:"name"` |
||||
|
FieldName string `json:"field_name"` |
||||
|
UnitName string `json:"unit_name"` |
||||
|
Precision interface{} `json:"precision"` |
||||
|
} `json:"items"` |
||||
|
Units struct { |
||||
|
Strain string `json:"strain"` |
||||
|
} `json:"units"` |
||||
|
} `json:"factor"` |
||||
|
ManualData bool `json:"manual_data"` |
||||
|
Formula interface{} `json:"formula"` |
||||
|
Params interface{} `json:"params"` |
||||
|
Group interface{} `json:"group"` |
||||
|
Labels string `json:"labels"` |
||||
|
GroupParam interface{} `json:"groupParam"` |
||||
|
} `json:"station"` |
||||
|
AcqTime string `json:"acqTime"` |
||||
|
TaskId string `json:"taskId"` |
||||
|
Data map[string]any `json:"data"` |
||||
|
State int `json:"state"` |
||||
|
DataEmpty bool `json:"dataEmpty"` |
||||
|
RawAgg bool `json:"rawAgg"` |
||||
|
} |
Loading…
Reference in new issue