You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
286 lines
7.0 KiB
286 lines
7.0 KiB
package consumers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/SavoirTheme"
|
|
"goInOut/dbOperate"
|
|
"goInOut/dbOperate/_kafka"
|
|
"goInOut/models"
|
|
"goInOut/monitors"
|
|
"gopkg.in/yaml.v3"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// consumerSavoirTheme receives theme records through Kafka callbacks, keeps
// one record per sensor in memory, periodically bulk-writes those records to
// Elasticsearch, and runs a scheduled offline check that publishes alarms.
type consumerSavoirTheme struct {
	// dataCache is the data buffering channel: onData sends transformed
	// records into it and the goroutine started by Work receives from it.
	dataCache chan *models.EsTheme
	// Info is the concrete configuration, filled in by LoadConfigJson.
	Info SavoirTheme.ConfigFile
	// InKafka is used both to subscribe to the input topics and to publish
	// offline alarms.
	InKafka _kafka.KafkaHelper
	// OutEs is used to search the latest theme data and to bulk-write themes.
	OutEs dbOperate.ESHelper
	// infoPg is the PostgreSQL helper used to query the offline-gap settings.
	infoPg *dbOperate.DBHelper
	// sinkMap stores the pending record for each sensor, keyed by the
	// record's Sensor value; toSink flushes and clears it.
	sinkMap sync.Map
	// lock is held around sinkMap.Store in Work and for the whole of toSink.
	lock sync.Mutex
	// logTagId is the sensor id for which extra diagnostic log lines are
	// printed. It is never assigned in the code visible here.
	logTagId int
	// monitor runs the scheduled tasks registered in monitorInitial.
	monitor *monitors.CommonMonitor
	// pgOffLineGaps holds the configuration rows read from the database by
	// statisticsOffline.
	pgOffLineGaps []SavoirTheme.OffLineGap
}
|
|
|
|
func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) Initial(cfg string) error {
|
|
the.sinkMap = sync.Map{}
|
|
the.dataCache = make(chan *models.EsTheme, 1000)
|
|
|
|
the.LoadConfigJson(cfg)
|
|
err := the.inputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.outputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.infoComponentInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = the.monitorInitial()
|
|
return err
|
|
}
|
|
func (the *consumerSavoirTheme) inputInitial() error {
|
|
//数据入口
|
|
the.InKafka = _kafka.KafkaHelper{
|
|
Brokers: the.Info.IoConfig.In.Kafka.Brokers,
|
|
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
|
|
}
|
|
the.InKafka.Initial()
|
|
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
|
|
the.InKafka.Subscribe(inTopic, the.onData)
|
|
}
|
|
|
|
the.InKafka.Worker()
|
|
return nil
|
|
}
|
|
func (the *consumerSavoirTheme) outputInitial() error {
|
|
//数据出口
|
|
the.OutEs = *dbOperate.NewESHelper(
|
|
the.Info.IoConfig.Out.Es.Address,
|
|
the.Info.IoConfig.Out.Es.Auth.UserName,
|
|
the.Info.IoConfig.Out.Es.Auth.Password,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) infoComponentInitial() error {
|
|
//数据出口
|
|
pgConnStr := the.Info.QueryComponent.Pg.Connect
|
|
the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) monitorInitial() error {
|
|
the.monitor = &monitors.CommonMonitor{
|
|
MonitorHelper: &monitors.MonitorHelper{},
|
|
}
|
|
|
|
the.monitor.Start()
|
|
for taskName, cron := range the.Info.Monitor {
|
|
switch taskName {
|
|
case "cron":
|
|
the.monitor.RegisterTask(cron, the.statisticsOffline)
|
|
default:
|
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) statisticsOffline() {
|
|
log.Printf("--> 定时任务 更新数据库 配置信息")
|
|
sql := `SELECT off.*,s.name FROM "t_struct_factor_offlinegap" as off left join t_structure as s
|
|
ON off.struct_id=s.id
|
|
where off.is_open=true order by off.struct_id`
|
|
err := the.infoPg.Query(&the.pgOffLineGaps, sql)
|
|
if err != nil {
|
|
log.Printf("查询数据库异常:err-> %s", err.Error())
|
|
return
|
|
}
|
|
log.Printf("当前共 %d条 启用配置", len(the.pgOffLineGaps))
|
|
|
|
//立即触发
|
|
the.judgeOffline()
|
|
|
|
}
|
|
func (the *consumerSavoirTheme) judgeOffline() {
|
|
now := time.Now()
|
|
for _, gap := range the.pgOffLineGaps {
|
|
var alarmDetails []string
|
|
if !gap.IsOpen {
|
|
continue
|
|
}
|
|
log.Printf("判断 s:%d,f:%d,durMin:%d", gap.StructId, gap.FactorId, gap.OfflineGap)
|
|
queryStr := the.getESOfflineAlarmQueryStr(gap.StructId, gap.FactorId)
|
|
allThemes, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr)
|
|
if err != nil {
|
|
log.Printf("查询es 异常")
|
|
}
|
|
log.Printf("查询相关测点数=%d", len(allThemes))
|
|
for _, theme := range allThemes {
|
|
offlineMin := now.Sub(theme.CollectTime).Minutes()
|
|
log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin)
|
|
if offlineMin > float64(gap.OfflineGap) {
|
|
msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap)
|
|
log.Printf("----- > %s", msg)
|
|
alarmDetails = append(alarmDetails, msg)
|
|
}
|
|
}
|
|
prefix := "offline-"
|
|
sourceId := prefix + fmt.Sprintf("%d-%d", gap.StructId, gap.FactorId)
|
|
if len(alarmDetails) > 0 {
|
|
alarmMsg := models.KafkaAlarm{
|
|
MessageMode: "AlarmGeneration",
|
|
StructureId: gap.StructId,
|
|
StructureName: gap.StructName,
|
|
SourceId: sourceId,
|
|
SourceName: gap.StructName,
|
|
AlarmTypeCode: "8004",
|
|
AlarmCode: "80040001",
|
|
Content: strings.Join(alarmDetails, ","),
|
|
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
|
|
SourceTypeId: 1, // 0:DTU, 1:传感器, 2:测点
|
|
Sponsor: "goInOut_savoirTheme",
|
|
Extras: nil,
|
|
SubDevices: nil,
|
|
}
|
|
|
|
payload, _ := json.Marshal(alarmMsg)
|
|
the.InKafka.Publish("savoir_alarm", payload)
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string {
|
|
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"factor": {
|
|
"value": %d
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
`, structId, factorId)
|
|
return esQuery
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) sinkTask() {
|
|
intervalSec := the.Info.IoConfig.Out.Es.Interval
|
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
<-ticker.C
|
|
the.toSink()
|
|
}
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) toSink() {
|
|
var themes []models.EsTheme
|
|
the.lock.Lock()
|
|
defer the.lock.Unlock()
|
|
the.sinkMap.Range(func(key, value any) bool {
|
|
if v, ok := value.(*models.EsTheme); ok {
|
|
themes = append(themes, *v)
|
|
//零时打日志用
|
|
if v.Sensor == the.logTagId {
|
|
bs, _ := json.Marshal(v)
|
|
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
|
|
}
|
|
return ok
|
|
} else {
|
|
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
|
|
}
|
|
return true
|
|
})
|
|
if len(themes) > 0 {
|
|
index := the.Info.IoConfig.Out.Es.Index
|
|
log.Printf("写入es [%s] %d条", index, len(themes))
|
|
the.OutEs.BulkWriteThemes2Es(index, themes)
|
|
the.sinkMap.Clear()
|
|
}
|
|
}
|
|
|
|
func (the *consumerSavoirTheme) Work() {
|
|
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
|
|
go the.sinkTask()
|
|
go func() {
|
|
for {
|
|
pushEsTheme := <-the.dataCache
|
|
|
|
if pushEsTheme.Sensor == the.logTagId {
|
|
bs, _ := json.Marshal(pushEsTheme)
|
|
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
|
|
}
|
|
|
|
//有效数据存入缓存
|
|
the.lock.Lock()
|
|
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
|
|
the.lock.Unlock()
|
|
}
|
|
|
|
}()
|
|
}
|
|
func (the *consumerSavoirTheme) onData(topic string, msg string) bool {
|
|
//if len(msg) > 80 {
|
|
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
|
//}
|
|
adaptor := adaptors.Adaptor_Savoir_LastTheme{}
|
|
|
|
needPush := adaptor.Transform(topic, msg)
|
|
|
|
if needPush != nil && needPush.Data != nil {
|
|
the.dataCache <- needPush
|
|
} else {
|
|
s, _ := json.Marshal(needPush)
|
|
if needPush != nil {
|
|
if needPush.Sensor == the.logTagId {
|
|
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|