From d93a31ec95161715552d8692741ae65716f220f0 Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 15 Jul 2025 17:35:13 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E6=9C=80=E6=96=B0?= =?UTF-8?q?=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ig_知物云测点数据_最新同步.yaml | 11 +- consumers/SavoirTheme/config.go | 6 +- consumers/SavoirTheme/dataModel.go | 14 ++ consumers/consumerSavoirTheme.go | 143 ++++++++++++++++-- dbOperate/_kafka/producerHelper.go | 7 +- dbOperate/elasticsearchHelper.go | 14 ++ 6 files changed, 175 insertions(+), 20 deletions(-) create mode 100644 consumers/SavoirTheme/dataModel.go diff --git a/configFiles/config_知物云测点数据_最新同步.yaml b/configFiles/config_知物云测点数据_最新同步.yaml index 16e3794..4a9ac23 100644 --- a/configFiles/config_知物云测点数据_最新同步.yaml +++ b/configFiles/config_知物云测点数据_最新同步.yaml @@ -15,13 +15,12 @@ ioConfig: auth: userName: post password: 123 - interval: 30 + interval: 30 #多久写一次es + monitor: - - cron10min: 6/10 * * * * - cron1hour: 45 0/1 * * * + cron: 4/10 * * * * queryComponent: - redis: - address: 10.8.30.160:30379 + postgres: + connect: "host=10.8.30.166 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable" diff --git a/consumers/SavoirTheme/config.go b/consumers/SavoirTheme/config.go index 397c49e..70b7965 100644 --- a/consumers/SavoirTheme/config.go +++ b/consumers/SavoirTheme/config.go @@ -25,7 +25,7 @@ type Info struct { } type queryComponent struct { - Redis struct { - Address string `json:"address"` - } `json:"redis"` + Pg struct { + Connect string `yaml:"connect"` + } `yaml:"postgres"` } diff --git a/consumers/SavoirTheme/dataModel.go b/consumers/SavoirTheme/dataModel.go new file mode 100644 index 0000000..ed5e6bd --- /dev/null +++ b/consumers/SavoirTheme/dataModel.go @@ -0,0 +1,14 @@ +package SavoirTheme + +import ( + "time" +) + +type OffLineGap struct { + StructId int `json:"struct_id" db:"struct_id"` + FactorId int `json:"factor_id" db:"factor_id"` + OfflineGap int `json:"offline_gap" db:"offline_gap"` + IsOpen bool `json:"is_open" db:"is_open"` + UpdateAt time.Time `json:"update_at" db:"update_at"` + StructName string `json:"name" db:"name"` +} diff --git a/consumers/consumerSavoirTheme.go b/consumers/consumerSavoirTheme.go index c9097ce..8ab740a 100644 --- a/consumers/consumerSavoirTheme.go +++ b/consumers/consumerSavoirTheme.go @@ -2,13 +2,16 @@ package consumers import ( "encoding/json" + "fmt" "goInOut/adaptors" "goInOut/consumers/SavoirTheme" "goInOut/dbOperate" "goInOut/dbOperate/_kafka" "goInOut/models" + "goInOut/monitors" "gopkg.in/yaml.v3" "log" + "strings" "sync" "time" ) @@ -17,13 +20,16 @@ type consumerSavoirTheme struct { //数据缓存管道 dataCache chan *models.EsTheme //具体配置 - Info SavoirTheme.ConfigFile - InKafka _kafka.KafkaHelper - OutEs dbOperate.ESHelper - infoRedis *dbOperate.RedisHelper - sinkMap sync.Map - lock sync.Mutex - logTagId int + Info SavoirTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor + //数据库配置信息 + pgOffLineGaps []SavoirTheme.OffLineGap } func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) { @@ -37,7 +43,7 @@ func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) { func (the *consumerSavoirTheme) Initial(cfg string) error { the.sinkMap = sync.Map{} - the.dataCache = make(chan *models.EsTheme, 500) + the.dataCache = make(chan *models.EsTheme, 1000) the.LoadConfigJson(cfg) err := the.inputInitial() @@ -49,6 +55,11 @@ func (the *consumerSavoirTheme) Initial(cfg string) error { return err } err = the.infoComponentInitial() + if err != nil { + return err + } + + err = the.monitorInitial() return err } func (the *consumerSavoirTheme) inputInitial() error { @@ -78,12 +89,124 @@ func (the *consumerSavoirTheme) outputInitial() error { func (the *consumerSavoirTheme) infoComponentInitial() error { //数据出口 - //addr := the.Info.OtherInfo..Redis.Address - //the.infoRedis = dbOperate.NewRedisHelper("", addr) + pgConnStr := the.Info.QueryComponent.Pg.Connect + the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr) + return nil +} + +func (the *consumerSavoirTheme) monitorInitial() error { + the.monitor = &monitors.CommonMonitor{ + MonitorHelper: &monitors.MonitorHelper{}, + } + + the.monitor.Start() + for taskName, cron := range the.Info.Monitor { + switch taskName { + case "cron": + the.monitor.RegisterTask(cron, the.statisticsOffline) + default: + log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) + } + } return nil } +func (the *consumerSavoirTheme) statisticsOffline() { + log.Printf("--> 定时任务 更新数据库 配置信息") + sql := `SELECT off.*,s.name FROM "t_struct_factor_offlinegap" as off left join t_structure as s +ON off.struct_id=s.id +where off.is_open=true order by off.struct_id` + err := the.infoPg.Query(&the.pgOffLineGaps, sql) + if err != nil { + log.Printf("查询数据库异常:err-> %s", err.Error()) + return + } + log.Printf("当前共 %d条 启用配置", len(the.pgOffLineGaps)) + + //立即触发 + the.judgeOffline() + +} +func (the *consumerSavoirTheme) judgeOffline() { + now := time.Now() + for _, gap := range the.pgOffLineGaps { + var alarmDetails []string + if !gap.IsOpen { + continue + } + log.Printf("判断 s:%d,f:%d,durMin:%d", gap.StructId, gap.FactorId, gap.OfflineGap) + queryStr := the.getESOfflineAlarmQueryStr(gap.StructId, gap.FactorId) + allThemes, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr) + if err != nil { + log.Printf("查询es 异常") + } + log.Printf("查询相关测点数=%d", len(allThemes)) + for _, theme := range allThemes { + offlineMin := now.Sub(theme.CollectTime).Minutes() + log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin) + if offlineMin > float64(gap.OfflineGap) { + msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap) + log.Printf("----- > %s", msg) + alarmDetails = append(alarmDetails, msg) + } + } + prefix := "offline-" + sourceId := prefix + fmt.Sprintf("%d-%d", gap.StructId, gap.FactorId) + if len(alarmDetails) > 0 { + alarmMsg := models.KafkaAlarm{ + MessageMode: "AlarmGeneration", + StructureId: gap.StructId, + StructureName: gap.StructName, + SourceId: sourceId, + SourceName: gap.StructName, + AlarmTypeCode: "8004", + AlarmCode: "80040001", + Content: strings.Join(alarmDetails, ","), + Time: time.Now().Format("2006-01-02T15:04:05+0800"), + SourceTypeId: 1, // 0:DTU, 1:传感器, 2:测点 + Sponsor: "goInOut_savoirTheme", + Extras: nil, + SubDevices: nil, + } + + payload, _ := json.Marshal(alarmMsg) + the.InKafka.Publish("savoir_alarm", payload) + } + + } + +} + +func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string { + + esQuery := fmt.Sprintf(` +{ + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + } + ] + } + } +} +`, structId, factorId) + return esQuery +} + func (the *consumerSavoirTheme) sinkTask() { intervalSec := the.Info.IoConfig.Out.Es.Interval ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) diff --git a/dbOperate/_kafka/producerHelper.go b/dbOperate/_kafka/producerHelper.go index 2943576..2e1ef46 100644 --- a/dbOperate/_kafka/producerHelper.go +++ b/dbOperate/_kafka/producerHelper.go @@ -85,7 +85,11 @@ type KafkaAsyncProducer struct { func (p *KafkaAsyncProducer) Publish(topic string, messageBytes []byte) { //由于kafka topic 无法分级 需要特殊处理 topic = strings.Split(topic, "/")[0] - msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(messageBytes)} + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(messageBytes), + Timestamp: time.Now(), + } p.producer.Input() <- msg } @@ -97,6 +101,7 @@ func (p *KafkaAsyncProducer) Close() { func NewKafkaAsyncProducer(brokers []string, clientID string) *KafkaAsyncProducer { config := sarama.NewConfig() + config.Version = sarama.V2_0_0_0 config.ClientID = clientID config.Net.DialTimeout = time.Second * 30 config.Net.ReadTimeout = time.Second * 30 diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 3dc9e14..adc19bc 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -172,6 +172,20 @@ func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models return theme, err } + +func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.EsTheme, error) { + var themes []models.EsTheme + themesResp, err := the.searchThemes(index, queryBody) + + var theme models.EsTheme + if len(themesResp.Hits.Hits) > 0 { + theme = themesResp.Hits.Hits[0].Source + themes = append(themes, theme) + } + + return themes, err +} + func (the *ESHelper) BulkWrite(index, reqBody string) { body := &bytes.Buffer{}