Browse Source

知物云测点数据中断告警并且自动恢复进程

dev
18209 3 months ago
parent
commit
f00ecb6cf0
  1. 7
      config/configStruct.go
  2. 3
      configFiles/config_知物云测点数据_最新同步.yaml
  3. 83
      consumers/consumerSavoirTheme.go
  4. 34
      dbOperate/elasticsearchHelper.go
  5. 38
      models/esTheme.go

7
config/configStruct.go

@ -18,9 +18,10 @@ type MqttConfig struct {
Topics []string `json:"topics" yaml:"topics"` Topics []string `json:"topics" yaml:"topics"`
} }
type KafkaConfig struct { type KafkaConfig struct {
Brokers []string `json:"brokers" yaml:"brokers"` Brokers []string `json:"brokers" yaml:"brokers"`
GroupId string `json:"groupId" yaml:"groupId"` GroupId string `json:"groupId" yaml:"groupId"`
Topics []string `json:"topics" yaml:"topics"` AlarmTopic string `json:"alarmTopic" yaml:"alarmTopic"`
Topics []string `json:"topics" yaml:"topics"`
} }
type EsConfig struct { type EsConfig struct {

3
configFiles/config_知物云测点数据_最新同步.yaml

@ -5,6 +5,7 @@ ioConfig:
brokers: brokers:
- 10.8.30.160:30992 - 10.8.30.160:30992
groupId: savoir_last_theme_inout groupId: savoir_last_theme_inout
alarmTopic: "savoir_alarm"
topics: topics:
- savoir_theme - savoir_theme
out: out:
@ -18,7 +19,7 @@ ioConfig:
interval: 30 #多久写一次es interval: 30 #多久写一次es
monitor: monitor:
cron: 4/10 * * * * cron: 1/5 * * * *
queryComponent: queryComponent:
postgres: postgres:

83
consumers/consumerSavoirTheme.go

@ -11,6 +11,7 @@ import (
"goInOut/monitors" "goInOut/monitors"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"log" "log"
"regexp"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -145,6 +146,42 @@ func (the *consumerSavoirTheme) judgeOffline() {
for _, theme := range allThemes { for _, theme := range allThemes {
offlineMin := now.Sub(theme.CollectTime).Minutes() offlineMin := now.Sub(theme.CollectTime).Minutes()
log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin) log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin)
//拿到当前es最后一条数据和当前数据库的配置之后去查是否产生告警
StrValue := "80040003"
alarmQueryStr := the.getEsAlarmValueStr(gap.StructId, StrValue)
allAlarmThemes, err := the.OutEs.SearchAlarmThemeData("native_alarms", alarmQueryStr)
log.Printf("allAlarmThemes----- > %s", allAlarmThemes)
if err != nil {
log.Printf("查询es 异常")
}
for _, alarmTheme := range allAlarmThemes {
detailPoint := extractSensorName(alarmTheme.Detail)
if theme.SensorName == detailPoint {
if offlineMin < float64(gap.OfflineGap) {
prefix := "offline-"
sourceId := prefix + fmt.Sprintf("%d-%d", gap.StructId, gap.FactorId)
alarmMsg := models.KafkaAlarm{
MessageMode: "AlarmAutoElimination",
StructureId: gap.StructId,
StructureName: gap.StructName,
SourceId: sourceId,
SourceName: gap.StructName,
AlarmTypeCode: "3004",
AlarmCode: "********",
Content: "",
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点
Sponsor: "goInOut_savoirTheme",
Extras: nil,
SubDevices: nil,
}
log.Printf("----- > 恢复告警已准备发送")
payload, _ := json.Marshal(alarmMsg)
the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
}
}
}
if offlineMin > float64(gap.OfflineGap) { if offlineMin > float64(gap.OfflineGap) {
msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap) msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap)
log.Printf("----- > %s", msg) log.Printf("----- > %s", msg)
@ -171,13 +208,57 @@ func (the *consumerSavoirTheme) judgeOffline() {
} }
payload, _ := json.Marshal(alarmMsg) payload, _ := json.Marshal(alarmMsg)
the.InKafka.Publish("savoir_alarm", payload) the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
} }
} }
} }
// 提取方括号 [] 内的内容
func extractSensorName(alert string) string {
// 正则表达式匹配方括号内的内容
re := regexp.MustCompile(`\[(.*?)\]`)
match := re.FindStringSubmatch(alert)
// 如果匹配成功,返回匹配的内容(即方括号内的字符串)
if len(match) > 1 {
log.Printf("----- > %s", match[1])
return match[1]
}
return "" // 如果没有匹配到,返回空字符串
}
func (the *consumerSavoirTheme) getEsAlarmValueStr(structId int, alarmCode string) string {
esQuery := fmt.Sprintf(`
{
"query": {
"bool": {
"must": [
{
"term": {
"structure_id": {
"value": %d
}
}
},
{
"term": {
"alarm_code": {
"value": %s
}
}
}
]
}
}
}
`, structId, alarmCode)
return esQuery
}
func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string { func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string {
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`

34
dbOperate/elasticsearchHelper.go

@ -143,6 +143,27 @@ func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, er
} }
return r, err return r, err
} }
func (the *ESHelper) searchAlarmThemes(index, reqBody string) (models.EsAlarmThemeResp, error) {
body := &bytes.Buffer{}
body.WriteString(reqBody)
response, err := the.esClient.Search(
the.esClient.Search.WithIndex(index),
the.esClient.Search.WithBody(body),
)
defer response.Body.Close()
if err != nil {
//return nil, err
}
log.Println(response.Status())
r := models.EsAlarmThemeResp{}
// Deserialize the response into a map.
if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
return r, err
}
func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) {
//sensorId := 178 //sensorId := 178
queryBody := fmt.Sprintf(`{ queryBody := fmt.Sprintf(`{
@ -186,6 +207,19 @@ func (the *ESHelper) SearchThemeData(index string, queryBody string) ([]models.E
return themes, err return themes, err
} }
func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]models.EsAlarmTheme, error) {
var themes []models.EsAlarmTheme
themesResp, err := the.searchAlarmThemes(index, queryBody)
var theme models.EsAlarmTheme
if len(themesResp.Hits.Hits) > 0 {
theme = models.EsAlarmTheme(themesResp.Hits.Hits[0].Source)
themes = append(themes, theme)
}
return themes, err
}
func (the *ESHelper) BulkWrite(index, reqBody string) { func (the *ESHelper) BulkWrite(index, reqBody string) {
body := &bytes.Buffer{} body := &bytes.Buffer{}

38
models/esTheme.go

@ -16,6 +16,20 @@ type EsTheme struct {
CreateTime time.Time `json:"create_time"` CreateTime time.Time `json:"create_time"`
} }
type EsAlarmTheme struct {
SensorName string `json:"sensor_name"`
FactorName string `json:"factor_name"`
FactorProtoCode string `json:"factor_proto_code"`
Data map[string]any `json:"data"`
Detail string `json:"detail"`
Factor int `json:"factor"`
CollectTime time.Time `json:"collect_time"`
Sensor int `json:"sensor"`
Structure int `json:"structure"`
IotaDevice []string `json:"iota_device"`
CreateTime time.Time `json:"create_time"`
}
type EsThemeResp struct { type EsThemeResp struct {
Took int `json:"took"` Took int `json:"took"`
TimedOut bool `json:"timed_out"` TimedOut bool `json:"timed_out"`
@ -39,3 +53,27 @@ type HitTheme struct {
Score float64 `json:"_score"` Score float64 `json:"_score"`
Source EsTheme `json:"_source"` Source EsTheme `json:"_source"`
} }
type EsAlarmThemeResp struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []HitAlarmTheme `json:"hits"`
} `json:"hits"`
}
type HitAlarmTheme struct {
Index string `json:"_index"`
Type string `json:"_type"`
Id string `json:"_id"`
Score float64 `json:"_score"`
Source EsAlarmTheme `json:"_source"`
}

Loading…
Cancel
Save