Browse Source

安信河北项目增加告警内容推送

dev
zhangyuxiang 2 weeks ago
parent
commit
cc3958ccf4
  1. 8
      adaptors/知物云es主题特征to河北公路设施监测.go
  2. 6
      configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml
  3. 3
      consumers/HBJCAS/config.go
  4. 2
      consumers/HBJCAS/interfaceBody.go
  5. 168
      consumers/consumerZWYHBJCAS.go
  6. 15
      models/IotaData.go

8
adaptors/知物云es主题特征to河北公路设施监测.go

@ -50,7 +50,7 @@ func (the Adaptor_ZWYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA
return return
} }
//设施唯一编码(省平台) //设施唯一编码(省平台)
uniqueCode := the.getUniqueCode(structId) uniqueCode := the.GetUniqueCode(structId)
if uniqueCode == 0 { if uniqueCode == 0 {
log.Printf("structId=%d,无匹配省平台uniqueCode", structId) log.Printf("structId=%d,无匹配省平台uniqueCode", structId)
return return
@ -68,7 +68,7 @@ func (the Adaptor_ZWYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
continue continue
} }
monitorCode := the.getPointCodeFromLabel(station.Labels) monitorCode := the.GetPointCodeFromLabel(station.Labels)
if monitorCode == 0 { if monitorCode == 0 {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId)
continue continue
@ -161,14 +161,14 @@ func (the Adaptor_ZWYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int6
return dataDefinitionStatisticData return dataDefinitionStatisticData
} }
func (the Adaptor_ZWYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) { func (the Adaptor_ZWYES_HBGL) GetUniqueCode(structId int64) (uniqueCode int64) {
if v, ok := the.StructInfo[structId]; ok { if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v uniqueCode = v
} }
return uniqueCode return uniqueCode
} }
func (the Adaptor_ZWYES_HBGL) getPointCodeFromLabel(label string) int64 { func (the Adaptor_ZWYES_HBGL) GetPointCodeFromLabel(label string) int64 {
//解析label {13010600001} //解析label {13010600001}
pointUniqueCode := int64(0) pointUniqueCode := int64(0)
if len(label) > 2 { if len(label) > 2 {

6
configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml

@ -3,6 +3,12 @@ ioConfig:
in: in:
http: http:
url: https://esproxy.anxinyun.cn/savoir_themes/_search url: https://esproxy.anxinyun.cn/savoir_themes/_search
kafka:
brokers:
- 10.8.30.160:30992
groupId: anxinHebeiGL_01
topics:
- savoir_alarm
out: out:
mqtt: mqtt:
host: 123.249.81.52 host: 123.249.81.52

3
consumers/HBJCAS/config.go

@ -19,7 +19,8 @@ type ioConfig struct {
Out Out `yaml:"out"` Out Out `yaml:"out"`
} }
type In struct { type In struct {
Http config.HttpConfig `yaml:"http"` Http config.HttpConfig `yaml:"http"`
Kafka config.KafkaConfig `json:"kafka"`
} }
type Out struct { type Out struct {

2
consumers/HBJCAS/interfaceBody.go

@ -22,7 +22,7 @@ type HealthInfo struct {
//报警信息内容 //报警信息内容
type WarningInfo struct { type WarningInfo struct {
AlarmId string `json:"alarmId"` AlarmId string `json:"alarmId"`
UniqueCode int `json:"uniqueCode"` UniqueCode int64 `json:"uniqueCode"`
PointUniqueCode int64 `json:"pointUniqueCode"` PointUniqueCode int64 `json:"pointUniqueCode"`
AlarmLevel int `json:"alarmLevel"` AlarmLevel int `json:"alarmLevel"`
MonitorValue string `json:"monitorValue"` MonitorValue string `json:"monitorValue"`

168
consumers/consumerZWYHBJCAS.go

@ -7,10 +7,13 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/google/uuid"
"github.com/tjfoc/gmsm/sm3" "github.com/tjfoc/gmsm/sm3"
"goInOut/adaptors" "goInOut/adaptors"
"goInOut/consumers/HBJCAS" "goInOut/consumers/HBJCAS"
"goInOut/dbOperate" "goInOut/dbOperate"
"goInOut/dbOperate/_kafka"
"goInOut/models"
"goInOut/monitors" "goInOut/monitors"
"goInOut/utils" "goInOut/utils"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@ -18,6 +21,7 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"regexp"
"strings" "strings"
"time" "time"
) )
@ -26,13 +30,15 @@ type consumerZWYHBJCAS struct {
//数据缓存管道 //数据缓存管道
ch chan []adaptors.NeedPush ch chan []adaptors.NeedPush
//具体配置 //具体配置
Info HBJCAS.ConfigFile Info HBJCAS.ConfigFile
Seq int64 Seq int64
SeqDate string SeqDate string
InHttp *dbOperate.HttpHelper InHttp *dbOperate.HttpHelper
outMqtt *dbOperate.MqttHelper InKafka _kafka.KafkaHelper
monitor *monitors.CommonMonitor outMqtt *dbOperate.MqttHelper
infoRedis *dbOperate.RedisHelper monitor *monitors.CommonMonitor
infoRedis *dbOperate.RedisHelper
structIdArr []int
} }
func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) { func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) {
@ -42,6 +48,12 @@ func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err) panic(err)
} }
the.structIdArr = make([]int, 0)
if len(the.Info.StructInfo) > 0 {
for structId, _ := range the.Info.StructInfo {
the.structIdArr = append(the.structIdArr, int(structId))
}
}
} }
func (the *consumerZWYHBJCAS) Initial(cfg string) error { func (the *consumerZWYHBJCAS) Initial(cfg string) error {
@ -61,6 +73,16 @@ func (the *consumerZWYHBJCAS) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200) the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口 //数据入口
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
the.InKafka = _kafka.KafkaHelper{
Brokers: the.Info.IoConfig.In.Kafka.Brokers,
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
}
the.InKafka.Initial()
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
the.InKafka.Subscribe(inTopic, the.onData)
}
the.InKafka.Worker()
the.monitor = &monitors.CommonMonitor{ the.monitor = &monitors.CommonMonitor{
MonitorHelper: &monitors.MonitorHelper{}, MonitorHelper: &monitors.MonitorHelper{},
} }
@ -377,6 +399,71 @@ func (the *consumerZWYHBJCAS) getCodeStatus() []interface{} {
return res return res
} }
func (the *consumerZWYHBJCAS) getAlarmInfo(am models.AlarmMsg) []interface{} {
alarms := make([]interface{}, 0)
content := am.Content
sourceId := am.SourceId
structureId := am.StructureId
level := GetLevel(am.AlarmCode)
if level == 0 {
//进行告警推送
return alarms
}
msgArr := strings.Split(content, ";")
valArr := make([]string, 0)
unitArr := make([]string, 0)
re := regexp.MustCompile(`:([\d\.]+)(\D+),`)
for _, msg := range msgArr {
match := re.FindStringSubmatch(msg)
if match != nil {
// 提取数字和单位
number := match[1] // 第一个捕获组,数字部分
valArr = append(valArr, number)
unit := match[2] // 第二个捕获组,单位部分
unitArr = append(unitArr, unit)
}
}
id := uuid.New().String()
station := models.Station{}
k1 := fmt.Sprintf("station:%s", sourceId)
adaptor := the.getAdaptor()
errRedis := adaptor.Redis.GetObj(k1, &station)
if errRedis != nil {
log.Printf("redis 获取[s:%d]测点[%s]标签异常", structureId, sourceId)
return alarms
}
monitorCode := adaptor.GetPointCodeFromLabel(station.Labels)
uniqueCode := adaptor.GetUniqueCode(int64(structureId))
if uniqueCode == 0 {
log.Printf("structId=%d,无匹配省平台uniqueCode", structureId)
return alarms
}
alTime := am.Time
tsp := GetTime(alTime)
nTsp := time.Now().UnixMilli()
al := HBJCAS.WarningInfo{
AlarmId: id,
UniqueCode: uniqueCode,
PointUniqueCode: monitorCode,
AlarmLevel: level,
MonitorValue: strings.Join(valArr, ","),
Unit: strings.Join(unitArr, ","),
AlarmStartTime: tsp,
ReportToProvinceTime: nTsp,
ReportToProvinceUser: "王俊伟",
ReportToProvinceUserTel: "15933033309",
AlarmStatus: "Unhandled",
HandleTime: nTsp,
HandleUser: "王俊伟",
HandleUserTel: "15933033309",
HandleContent: "立即处理",
Test: false,
}
alarms = append(alarms, al)
return alarms
}
// JWT头部 // JWT头部
type Header struct { type Header struct {
Typ string `json:"typ"` Typ string `json:"typ"`
@ -439,7 +526,7 @@ func (the *consumerZWYHBJCAS) GenerateJWT() (string, error) {
return jwt, nil return jwt, nil
} }
func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { func (the *consumerZWYHBJCAS) UploadInfo(uploadType string, vv []interface{}) {
urlIndex, ok := the.Info.OtherInfo["urlIndex"] urlIndex, ok := the.Info.OtherInfo["urlIndex"]
if !ok { if !ok {
log.Println("未配置省平台业务数据接口=============") log.Println("未配置省平台业务数据接口=============")
@ -460,6 +547,9 @@ func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) {
if len(bodyInfo) == 0 { if len(bodyInfo) == 0 {
return return
} }
case "warningInfo":
url = urlIndex + "warningInfo/sync"
bodyInfo = vv
default: default:
return return
} }
@ -480,11 +570,15 @@ func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) {
} }
} }
func (the *consumerZWYHBJCAS) UploadCamInfo() { func (the *consumerZWYHBJCAS) UploadCamInfo() {
the.UploadInfo("cameraInfo") the.UploadInfo("cameraInfo", nil)
} }
func (the *consumerZWYHBJCAS) UploadHeaInfo() { func (the *consumerZWYHBJCAS) UploadHeaInfo() {
the.UploadInfo("healthInfo") the.UploadInfo("healthInfo", nil)
}
func (the *consumerZWYHBJCAS) UploadAlarmInfo(vv []interface{}) {
the.UploadInfo("warningInfo", vv)
} }
func (the *consumerZWYHBJCAS) postInfo(url, payloadStr string) error { func (the *consumerZWYHBJCAS) postInfo(url, payloadStr string) error {
@ -560,3 +654,57 @@ func downloadFile(url string) ([]byte, error) {
return fileContent, nil return fileContent, nil
} }
func (the *consumerZWYHBJCAS) onData(topic string, msg string) bool {
//if len(msg) > 80 {
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
//}
switch topic {
case "savoir_alarm":
alarmData := models.AlarmMsg{}
err := json.Unmarshal([]byte(msg), &alarmData)
if err != nil {
log.Printf("反序列化 异常 alarm数据=%s", msg)
}
if alarmData.AlarmTypeCode == "3007" && the.JudgeExist(alarmData.StructureId) {
//收到配置结构物产生的超阈值告警
alarms := the.getAlarmInfo(alarmData)
if len(alarms) > 0 {
//生成了告警,进行推送
the.UploadAlarmInfo(alarms)
}
}
}
return true
}
func (the *consumerZWYHBJCAS) JudgeExist(structId int) bool {
for _, sId := range the.structIdArr {
if sId == structId {
return true
}
}
return false
}
func GetLevel(alarmCode string) int {
switch alarmCode {
case "30070001":
return 1
case "30070002":
return 2
case "30070003":
return 3
default:
return 0
}
}
func GetTime(timeStr string) int64 {
parsedTime, err := time.Parse("2006-01-02T15:04:05.000Z0700", timeStr)
if err != nil {
return 0
}
unixMilli := parsedTime.UnixMilli()
return unixMilli
}

15
models/IotaData.go

@ -38,3 +38,18 @@ type Data struct {
func (the *Data) Success() bool { func (the *Data) Success() bool {
return the.Result.Code == 0 return the.Result.Code == 0
} }
type AlarmMsg struct {
MessageMode string `json:"messageMode"`
StructureId int `json:"structureId"`
StructureName string `json:"structureName"`
SourceId string `json:"sourceId"`
SourceName string `json:"sourceName"`
AlarmTypeCode string `json:"alarmTypeCode"`
AlarmCode string `json:"alarmCode"`
Content string `json:"content"`
Time string `json:"time"`
SourceTypeId int `json:"sourceTypeId"`
Sponsor interface{} `json:"sponsor"`
Extras interface{} `json:"extras"`
}

Loading…
Cancel
Save