From f00ecb6cf0c078522702a39ba5dd4fa9c3f5b07f Mon Sep 17 00:00:00 2001 From: 18209 Date: Thu, 17 Jul 2025 17:06:20 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9F=A5=E7=89=A9=E4=BA=91=E6=B5=8B=E7=82=B9?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=AD=E6=96=AD=E5=91=8A=E8=AD=A6=E5=B9=B6?= =?UTF-8?q?=E4=B8=94=E8=87=AA=E5=8A=A8=E6=81=A2=E5=A4=8D=E8=BF=9B=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/configStruct.go | 7 +- ...ig_知物云测点数据_最新同步.yaml | 3 +- consumers/consumerSavoirTheme.go | 83 ++++++++++++++++++- dbOperate/elasticsearchHelper.go | 34 ++++++++ models/esTheme.go | 38 +++++++++ 5 files changed, 160 insertions(+), 5 deletions(-) diff --git a/config/configStruct.go b/config/configStruct.go index c67d577..211bb78 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -18,9 +18,10 @@ type MqttConfig struct { Topics []string `json:"topics" yaml:"topics"` } type KafkaConfig struct { - Brokers []string `json:"brokers" yaml:"brokers"` - GroupId string `json:"groupId" yaml:"groupId"` - Topics []string `json:"topics" yaml:"topics"` + Brokers []string `json:"brokers" yaml:"brokers"` + GroupId string `json:"groupId" yaml:"groupId"` + AlarmTopic string `json:"alarmTopic" yaml:"alarmTopic"` + Topics []string `json:"topics" yaml:"topics"` } type EsConfig struct { diff --git a/configFiles/config_知物云测点数据_最新同步.yaml b/configFiles/config_知物云测点数据_最新同步.yaml index 4a9ac23..3d0fd0b 100644 --- a/configFiles/config_知物云测点数据_最新同步.yaml +++ b/configFiles/config_知物云测点数据_最新同步.yaml @@ -5,6 +5,7 @@ ioConfig: brokers: - 10.8.30.160:30992 groupId: savoir_last_theme_inout + alarmTopic: "savoir_alarm" topics: - savoir_theme out: @@ -18,7 +19,7 @@ ioConfig: interval: 30 #多久写一次es monitor: - cron: 4/10 * * * * + cron: 1/5 * * * * queryComponent: postgres: diff --git a/consumers/consumerSavoirTheme.go b/consumers/consumerSavoirTheme.go index 6f6ca02..d0f5f77 100644 --- a/consumers/consumerSavoirTheme.go +++ b/consumers/consumerSavoirTheme.go @@ -11,6 +11,7 @@ import ( "goInOut/monitors" "gopkg.in/yaml.v3" "log" + "regexp" "strings" "sync" "time" @@ -145,6 +146,42 @@ func (the *consumerSavoirTheme) judgeOffline() { for _, theme := range allThemes { offlineMin := now.Sub(theme.CollectTime).Minutes() log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin) + + //拿到当前es最后一条数据和当前数据库的配置之后去查是否产生告警 + StrValue := "80040003" + alarmQueryStr := the.getEsAlarmValueStr(gap.StructId, StrValue) + allAlarmThemes, err := the.OutEs.SearchAlarmThemeData("native_alarms", alarmQueryStr) + log.Printf("allAlarmThemes----- > %s", allAlarmThemes) + if err != nil { + log.Printf("查询es 异常") + } + for _, alarmTheme := range allAlarmThemes { + detailPoint := extractSensorName(alarmTheme.Detail) + if theme.SensorName == detailPoint { + if offlineMin < float64(gap.OfflineGap) { + prefix := "offline-" + sourceId := prefix + fmt.Sprintf("%d-%d", gap.StructId, gap.FactorId) + alarmMsg := models.KafkaAlarm{ + MessageMode: "AlarmAutoElimination", + StructureId: gap.StructId, + StructureName: gap.StructName, + SourceId: sourceId, + SourceName: gap.StructName, + AlarmTypeCode: "3004", + AlarmCode: "********", + Content: "", + Time: time.Now().Format("2006-01-02T15:04:05+0800"), + SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点 + Sponsor: "goInOut_savoirTheme", + Extras: nil, + SubDevices: nil, + } + log.Printf("----- > 恢复告警已准备发送") + payload, _ := json.Marshal(alarmMsg) + the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) + } + } + } if offlineMin > float64(gap.OfflineGap) { msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap) log.Printf("----- > %s", msg) @@ -171,13 +208,57 @@ func (the *consumerSavoirTheme) judgeOffline() { } payload, _ := json.Marshal(alarmMsg) - the.InKafka.Publish("savoir_alarm", payload) + the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) } } } +// 提取方括号 [] 内的内容 +func extractSensorName(alert string) string { + // 正则表达式匹配方括号内的内容 + re := regexp.MustCompile(`\[(.*?)\]`) + match := re.FindStringSubmatch(alert) + + // 如果匹配成功,返回匹配的内容(即方括号内的字符串) + if len(match) > 1 { + log.Printf("----- > %s", match[1]) + return match[1] + } + + return "" // 如果没有匹配到,返回空字符串 +} + +func (the *consumerSavoirTheme) getEsAlarmValueStr(structId int, alarmCode string) string { + + esQuery := fmt.Sprintf(` +{ + "query": { + "bool": { + "must": [ + { + "term": { + "structure_id": { + "value": %d + } + } + }, + { + "term": { + "alarm_code": { + "value": %s + } + } + } + ] + } + } +} +`, structId, alarmCode) + return esQuery +} + func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string { esQuery := fmt.Sprintf(` diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index adc19bc..088df2c 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -143,6 +143,27 @@ func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, er } return r, err } + +func (the *ESHelper) searchAlarmThemes(index, reqBody string) (models.EsAlarmThemeResp, error) { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + //return nil, err + } + log.Println(response.Status()) + r := models.EsAlarmThemeResp{} + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + return r, err +} + func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { //sensorId := 178 queryBody := fmt.Sprintf(`{ @@ -186,6 +207,19 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E return themes, err } +func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) { + var themes []models.EsAlarmTheme + themesResp, err := the.searchAlarmThemes(index, queryBody) + + var theme models.EsAlarmTheme + if len(themesResp.Hits.Hits) > 0 { + theme = models.EsAlarmTheme(themesResp.Hits.Hits[0].Source) + themes = append(themes, theme) + } + + return themes, err +} + func (the *ESHelper) BulkWrite(index, reqBody string) { body := &bytes.Buffer{} diff --git a/models/esTheme.go b/models/esTheme.go index ae31dbb..13cf737 100644 --- a/models/esTheme.go +++ b/models/esTheme.go @@ -16,6 +16,20 @@ type EsTheme struct { CreateTime time.Time `json:"create_time"` } +type EsAlarmTheme struct { + SensorName string `json:"sensor_name"` + FactorName string `json:"factor_name"` + FactorProtoCode string `json:"factor_proto_code"` + Data map[string]any `json:"data"` + Detail string `json:"detail"` + Factor int `json:"factor"` + CollectTime time.Time `json:"collect_time"` + Sensor int `json:"sensor"` + Structure int `json:"structure"` + IotaDevice []string `json:"iota_device"` + CreateTime time.Time `json:"create_time"` +} + type EsThemeResp struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` @@ -39,3 +53,27 @@ type HitTheme struct { Score float64 `json:"_score"` Source EsTheme `json:"_source"` } + +type EsAlarmThemeResp struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []HitAlarmTheme `json:"hits"` + } `json:"hits"` +} + +type HitAlarmTheme struct { + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Score float64 `json:"_score"` + Source EsAlarmTheme `json:"_source"` +}