数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

225 lines
5.4 KiB

package consumers
import (
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/AXYTheme"
"goInOut/dbOperate"
"goInOut/dbOperate/_kafka"
"goInOut/models"
"goInOut/monitors"
"gopkg.in/yaml.v3"
"log"
"sync"
"time"
)
type consumerAXYThemeToES struct {
//数据缓存管道
dataCache chan *models.EsTheme
//具体配置
Info AXYTheme.ConfigFile
InKafka _kafka.KafkaHelper
OutEs dbOperate.ESHelper
infoPg *dbOperate.DBHelper
sinkMap sync.Map
lock sync.Mutex
logTagId int
monitor *monitors.CommonMonitor
}
func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerAXYThemeToES) Initial(cfg string) error {
the.sinkMap = sync.Map{}
the.dataCache = make(chan *models.EsTheme, 1000)
the.LoadConfigJson(cfg)
err := the.inputInitial()
if err != nil {
return err
}
err = the.outputInitial()
if err != nil {
return err
}
return err
}
func (the *consumerAXYThemeToES) inputInitial() error {
//数据入口
the.InKafka = _kafka.KafkaHelper{
Brokers: the.Info.IoConfig.In.Kafka.Brokers,
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
}
the.InKafka.Initial()
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
the.InKafka.Subscribe(inTopic, the.onData)
}
the.InKafka.Worker()
return nil
}
func (the *consumerAXYThemeToES) outputInitial() error {
//数据出口
the.OutEs = *dbOperate.NewESHelper(
the.Info.IoConfig.Out.Es.Address,
the.Info.IoConfig.Out.Es.Auth.UserName,
the.Info.IoConfig.Out.Es.Auth.Password,
)
return nil
}
func (the *consumerAXYThemeToES) sinkTask() {
intervalSec := the.Info.IoConfig.Out.Es.Interval
log.Printf("读取配置文件intervalSec[%d]", intervalSec)
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop()
for {
<-ticker.C
the.toSink()
}
}
func (the *consumerAXYThemeToES) toSink() {
var themes []models.EsTheme
the.lock.Lock()
defer the.lock.Unlock()
the.sinkMap.Range(func(key, value any) bool {
if v, ok := value.(*models.EsTheme); ok {
themes = append(themes, *v)
//零时打日志用
if v.Sensor == the.logTagId {
bs, _ := json.Marshal(v)
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
}
return ok
} else {
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
}
return true
})
if len(themes) > 0 {
index := the.Info.IoConfig.Out.Es.Index
log.Printf("写入es [%s] %d条,%s", index, len(themes), themes)
the.OutEs.BulkWriteThemes2Es(index, themes)
the.sinkMap.Clear()
}
}
func (the *consumerAXYThemeToES) Work() {
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
go the.sinkTask()
go func() {
for {
pushEsTheme := <-the.dataCache
if pushEsTheme.Sensor == the.logTagId {
bs, _ := json.Marshal(pushEsTheme)
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
}
//有效数据存入缓存
the.lock.Lock()
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
the.lock.Unlock()
}
}()
}
func (the *consumerAXYThemeToES) onData(topic string, msg string) bool {
//if len(msg) > 80 {
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
//}
adaptor := adaptors.Adaptor_Anxinyun_LastTheme{}
esTimeStr, newTimeStr := the.judgeTime(msg)
judgeBool := false
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", esTimeStr)
if esErr != nil {
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr)
}
esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", newTimeStr)
if newErr != nil {
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr)
}
// 只有在两个时间解析成功时才进行比较
if esErr == nil && newErr == nil && newAtime.After(esAtime) {
judgeBool = true
}
if judgeBool {
needPush := adaptor.Transform(topic, msg)
if needPush != nil && needPush.Data != nil {
the.dataCache <- needPush
} else {
s, _ := json.Marshal(needPush)
if needPush != nil {
log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s)
if needPush.Sensor == the.logTagId {
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
}
}
}
}
return true
}
func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (string, string) {
theme := models.AXYSavoirTheme{}
err := json.Unmarshal([]byte(rawMsg), &theme)
if err != nil {
log.Printf("反序列化 异常 dev数据=%s", rawMsg)
}
queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id)
TimeTheme, err := the.OutEs.SearchThemeData("anxincloud_last_theme", queryStr)
log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime)
return TimeTheme[0].CollectTime, theme.AcqTime
}
func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string {
esQuery := fmt.Sprintf(`
{
"query": {
"bool": {
"must": [
{
"term": {
"structure": {
"value": %d
}
}
},
{
"term": {
"sensor": {
"value": %d
}
}
}
]
}
}
}
`, structId, sensorId)
return esQuery
}