diff --git a/adaptors/知物云es主题特征to河北公路设施监测.go b/adaptors/知物云es主题特征to河北公路设施监测.go index 760bb2d..9e51cbe 100644 --- a/adaptors/知物云es主题特征to河北公路设施监测.go +++ b/adaptors/知物云es主题特征to河北公路设施监测.go @@ -50,7 +50,7 @@ func (the Adaptor_ZWYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA return } //设施唯一编码(省平台) - uniqueCode := the.getUniqueCode(structId) + uniqueCode := the.GetUniqueCode(structId) if uniqueCode == 0 { log.Printf("structId=%d,无匹配省平台uniqueCode", structId) return @@ -68,7 +68,7 @@ func (the Adaptor_ZWYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) continue } - monitorCode := the.getPointCodeFromLabel(station.Labels) + monitorCode := the.GetPointCodeFromLabel(station.Labels) if monitorCode == 0 { log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) continue @@ -161,14 +161,14 @@ func (the Adaptor_ZWYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int6 return dataDefinitionStatisticData } -func (the Adaptor_ZWYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) { +func (the Adaptor_ZWYES_HBGL) GetUniqueCode(structId int64) (uniqueCode int64) { if v, ok := the.StructInfo[structId]; ok { uniqueCode = v } return uniqueCode } -func (the Adaptor_ZWYES_HBGL) getPointCodeFromLabel(label string) int64 { +func (the Adaptor_ZWYES_HBGL) GetPointCodeFromLabel(label string) int64 { //解析label {13010600001} pointUniqueCode := int64(0) if len(label) > 2 { diff --git a/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml index 4e73687..7ba9716 100644 --- a/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml +++ b/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml @@ -3,6 +3,12 @@ ioConfig: in: http: url: https://esproxy.anxinyun.cn/savoir_themes/_search + kafka: + brokers: + - 10.8.30.160:30992 + groupId: anxinHebeiGL_01 + topics: + - savoir_alarm out: mqtt: host: 123.249.81.52 diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index e505bde..6ca316c 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -19,7 +19,8 @@ type ioConfig struct { Out Out `yaml:"out"` } type In struct { - Http config.HttpConfig `yaml:"http"` + Http config.HttpConfig `yaml:"http"` + Kafka config.KafkaConfig `json:"kafka"` } type Out struct { diff --git a/consumers/HBJCAS/interfaceBody.go b/consumers/HBJCAS/interfaceBody.go index dba6a46..3b5c80b 100644 --- a/consumers/HBJCAS/interfaceBody.go +++ b/consumers/HBJCAS/interfaceBody.go @@ -22,7 +22,7 @@ type HealthInfo struct { //报警信息内容 type WarningInfo struct { AlarmId string `json:"alarmId"` - UniqueCode int `json:"uniqueCode"` + UniqueCode int64 `json:"uniqueCode"` PointUniqueCode int64 `json:"pointUniqueCode"` AlarmLevel int `json:"alarmLevel"` MonitorValue string `json:"monitorValue"` diff --git a/consumers/consumerZWYHBJCAS.go b/consumers/consumerZWYHBJCAS.go index c810fa9..f50b30f 100644 --- a/consumers/consumerZWYHBJCAS.go +++ b/consumers/consumerZWYHBJCAS.go @@ -7,10 +7,13 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/google/uuid" "github.com/tjfoc/gmsm/sm3" "goInOut/adaptors" "goInOut/consumers/HBJCAS" "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" + "goInOut/models" "goInOut/monitors" "goInOut/utils" "gopkg.in/yaml.v3" @@ -18,6 +21,7 @@ import ( "io/ioutil" "log" "net/http" + "regexp" "strings" "time" ) @@ -26,13 +30,15 @@ type consumerZWYHBJCAS struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 - Info HBJCAS.ConfigFile - Seq int64 - SeqDate string - InHttp *dbOperate.HttpHelper - outMqtt *dbOperate.MqttHelper - monitor *monitors.CommonMonitor - infoRedis *dbOperate.RedisHelper + Info HBJCAS.ConfigFile + Seq int64 + SeqDate string + InHttp *dbOperate.HttpHelper + InKafka _kafka.KafkaHelper + outMqtt *dbOperate.MqttHelper + monitor *monitors.CommonMonitor + infoRedis *dbOperate.RedisHelper + structIdArr []int } func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) { @@ -42,6 +48,12 @@ func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } + the.structIdArr = make([]int, 0) + if len(the.Info.StructInfo) > 0 { + for structId, _ := range the.Info.StructInfo { + the.structIdArr = append(the.structIdArr, int(structId)) + } + } } func (the *consumerZWYHBJCAS) Initial(cfg string) error { @@ -61,6 +73,16 @@ func (the *consumerZWYHBJCAS) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} + the.InKafka = _kafka.KafkaHelper{ + Brokers: the.Info.IoConfig.In.Kafka.Brokers, + GroupId: the.Info.IoConfig.In.Kafka.GroupId, + } + the.InKafka.Initial() + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { + the.InKafka.Subscribe(inTopic, the.onData) + } + + the.InKafka.Worker() the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } @@ -377,6 +399,71 @@ func (the *consumerZWYHBJCAS) getCodeStatus() []interface{} { return res } +func (the *consumerZWYHBJCAS) getAlarmInfo(am models.AlarmMsg) []interface{} { + alarms := make([]interface{}, 0) + content := am.Content + sourceId := am.SourceId + structureId := am.StructureId + level := GetLevel(am.AlarmCode) + if level == 0 { + //进行告警推送 + return alarms + } + msgArr := strings.Split(content, ";") + valArr := make([]string, 0) + unitArr := make([]string, 0) + re := regexp.MustCompile(`:([\d\.]+)(\D+),`) + for _, msg := range msgArr { + match := re.FindStringSubmatch(msg) + if match != nil { + // 提取数字和单位 + number := match[1] // 第一个捕获组,数字部分 + valArr = append(valArr, number) + unit := match[2] // 第二个捕获组,单位部分 + unitArr = append(unitArr, unit) + } + } + + id := uuid.New().String() + station := models.Station{} + k1 := fmt.Sprintf("station:%s", sourceId) + adaptor := the.getAdaptor() + errRedis := adaptor.Redis.GetObj(k1, &station) + if errRedis != nil { + log.Printf("redis 获取[s:%d]测点[%s]标签异常", structureId, sourceId) + return alarms + } + monitorCode := adaptor.GetPointCodeFromLabel(station.Labels) + uniqueCode := adaptor.GetUniqueCode(int64(structureId)) + if uniqueCode == 0 { + log.Printf("structId=%d,无匹配省平台uniqueCode", structureId) + return alarms + } + alTime := am.Time + tsp := GetTime(alTime) + nTsp := time.Now().UnixMilli() + al := HBJCAS.WarningInfo{ + AlarmId: id, + UniqueCode: uniqueCode, + PointUniqueCode: monitorCode, + AlarmLevel: level, + MonitorValue: strings.Join(valArr, ","), + Unit: strings.Join(unitArr, ","), + AlarmStartTime: tsp, + ReportToProvinceTime: nTsp, + ReportToProvinceUser: "王俊伟", + ReportToProvinceUserTel: "15933033309", + AlarmStatus: "Unhandled", + HandleTime: nTsp, + HandleUser: "王俊伟", + HandleUserTel: "15933033309", + HandleContent: "立即处理", + Test: false, + } + alarms = append(alarms, al) + return alarms +} + // JWT头部 type Header struct { Typ string `json:"typ"` @@ -439,7 +526,7 @@ func (the *consumerZWYHBJCAS) GenerateJWT() (string, error) { return jwt, nil } -func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { +func (the *consumerZWYHBJCAS) UploadInfo(uploadType string, vv []interface{}) { urlIndex, ok := the.Info.OtherInfo["urlIndex"] if !ok { log.Println("未配置省平台业务数据接口=============") @@ -460,6 +547,9 @@ func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { if len(bodyInfo) == 0 { return } + case "warningInfo": + url = urlIndex + "warningInfo/sync" + bodyInfo = vv default: return } @@ -480,11 +570,15 @@ func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { } } func (the *consumerZWYHBJCAS) UploadCamInfo() { - the.UploadInfo("cameraInfo") + the.UploadInfo("cameraInfo", nil) } func (the *consumerZWYHBJCAS) UploadHeaInfo() { - the.UploadInfo("healthInfo") + the.UploadInfo("healthInfo", nil) +} + +func (the *consumerZWYHBJCAS) UploadAlarmInfo(vv []interface{}) { + the.UploadInfo("warningInfo", vv) } func (the *consumerZWYHBJCAS) postInfo(url, payloadStr string) error { @@ -560,3 +654,57 @@ func downloadFile(url string) ([]byte, error) { return fileContent, nil } + +func (the *consumerZWYHBJCAS) onData(topic string, msg string) bool { + //if len(msg) > 80 { + // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //} + switch topic { + case "savoir_alarm": + alarmData := models.AlarmMsg{} + err := json.Unmarshal([]byte(msg), &alarmData) + if err != nil { + log.Printf("反序列化 异常 alarm数据=%s", msg) + } + if alarmData.AlarmTypeCode == "3007" && the.JudgeExist(alarmData.StructureId) { + //收到配置结构物产生的超阈值告警 + alarms := the.getAlarmInfo(alarmData) + if len(alarms) > 0 { + //生成了告警,进行推送 + the.UploadAlarmInfo(alarms) + } + } + } + return true +} + +func (the *consumerZWYHBJCAS) JudgeExist(structId int) bool { + for _, sId := range the.structIdArr { + if sId == structId { + return true + } + } + return false +} + +func GetLevel(alarmCode string) int { + switch alarmCode { + case "30070001": + return 1 + case "30070002": + return 2 + case "30070003": + return 3 + default: + return 0 + } +} + +func GetTime(timeStr string) int64 { + parsedTime, err := time.Parse("2006-01-02T15:04:05.000Z0700", timeStr) + if err != nil { + return 0 + } + unixMilli := parsedTime.UnixMilli() + return unixMilli +} diff --git a/models/IotaData.go b/models/IotaData.go index 5439cc5..bf43d99 100644 --- a/models/IotaData.go +++ b/models/IotaData.go @@ -38,3 +38,18 @@ type Data struct { func (the *Data) Success() bool { return the.Result.Code == 0 } + +type AlarmMsg struct { + MessageMode string `json:"messageMode"` + StructureId int `json:"structureId"` + StructureName string `json:"structureName"` + SourceId string `json:"sourceId"` + SourceName string `json:"sourceName"` + AlarmTypeCode string `json:"alarmTypeCode"` + AlarmCode string `json:"alarmCode"` + Content string `json:"content"` + Time string `json:"time"` + SourceTypeId int `json:"sourceTypeId"` + Sponsor interface{} `json:"sponsor"` + Extras interface{} `json:"extras"` +}