From 7f4a3e6ba2712c32f9ff72d54918170636a03578 Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 1 Jul 2025 17:53:28 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E7=BB=84=E5=90=88=E6=95=B0=E6=8D=AE=E9=83=A8=E5=88=86?= =?UTF-8?q?,=20=E9=85=8D=E7=BD=AE=E8=AF=BB=E5=8F=96=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build/Dockerfile | 2 +- .../config_知物云_组合告警.yaml | 27 ++ ...测_知物云_轻量化特征数据.yaml | 0 consumers/AlarmCombination/config.go | 31 ++ consumers/SinoGnssMySQL/config.go | 1 + consumers/consumerAlarmCombination.go | 272 ++++++++++++++++++ consumers/consumerManage.go | 3 + dbOperate/_kafka/kafkaHelper.go | 27 +- dbOperate/_kafka/producerHelper.go | 4 +- 9 files changed, 361 insertions(+), 6 deletions(-) create mode 100644 configFiles/config_知物云_组合告警.yaml rename configFiles/{ => 弃用备份}/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml (100%) create mode 100644 consumers/AlarmCombination/config.go create mode 100644 consumers/consumerAlarmCombination.go diff --git a/build/Dockerfile b/build/Dockerfile index 72d634a..a0aa9e0 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -1,4 +1,4 @@ -FROM registry.ngaiot.com/base-images/golang:1.20-fs-10 +FROM registry.ngaiot.com/base-images/golang-1.22:1.22-fs-3 WORKDIR /app/ COPY . . diff --git a/configFiles/config_知物云_组合告警.yaml b/configFiles/config_知物云_组合告警.yaml new file mode 100644 index 0000000..5c67b5f --- /dev/null +++ b/configFiles/config_知物云_组合告警.yaml @@ -0,0 +1,27 @@ +consumer: consumerAlarmCombination +ioConfig: + in: + http: + url: https://esproxy.anxinyun.cn/savoir_alarms/_search + out: + kafka: + brokers: ["10.8.30.160:30992"] + groupId: "zhgj" + topics: + - zuhe_alarm +monitor: + #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 + cron_pg: 32 0/1 * * * #6/10 * * * * + #普通类型 特征数据 + cron_redis: 20 0/1 * * * +info: + rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 +queryComponent: + redis: + address: 10.8.30.160:30379 + postgres: + connect: "host=10.8.30.165 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable" +#点位id对应信息 +pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592(承德隧道专用) + + diff --git a/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml b/configFiles/弃用备份/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml similarity index 100% rename from configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml rename to configFiles/弃用备份/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml diff --git a/consumers/AlarmCombination/config.go b/consumers/AlarmCombination/config.go new file mode 100644 index 0000000..f630d57 --- /dev/null +++ b/consumers/AlarmCombination/config.go @@ -0,0 +1,31 @@ +package AlarmCombination + +import "goInOut/config" + +type ConfigFile struct { + IoConfig ioConfig `yaml:"ioConfig"` + OtherInfo map[string]string `yaml:"info"` + + Monitor map[string]string `yaml:"monitor"` + QueryComponent queryComponent `yaml:"queryComponent"` +} +type ioConfig struct { + In In `yaml:"in"` + Out Out `yaml:"out"` +} +type In struct { + Http config.HttpConfig `yaml:"http"` +} + +type Out struct { + Kafka config.KafkaConfig `yaml:"kafka"` +} + +type queryComponent struct { + Redis struct { + Address string `yaml:"address"` + } `yaml:"redis"` + Pg struct { + Connect string `yaml:"connect"` + } `yaml:"postgres"` +} diff --git a/consumers/SinoGnssMySQL/config.go b/consumers/SinoGnssMySQL/config.go index 2f0ec37..0636f73 100644 --- a/consumers/SinoGnssMySQL/config.go +++ b/consumers/SinoGnssMySQL/config.go @@ -24,4 +24,5 @@ type OUT struct { type RecordInfo struct { Id int64 `json:"id"` TableName string `json:"table_name"` + Counts int64 `json:"counts"` } diff --git a/consumers/consumerAlarmCombination.go b/consumers/consumerAlarmCombination.go new file mode 100644 index 0000000..33c8d72 --- /dev/null +++ b/consumers/consumerAlarmCombination.go @@ -0,0 +1,272 @@ +package consumers + +import ( + "fmt" + "goInOut/adaptors" + "goInOut/consumers/AlarmCombination" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" + "goInOut/monitors" + "goInOut/utils" + "gopkg.in/yaml.v3" + "log" + "time" +) + +type consumerAlarmCombination struct { + //数据缓存管道 + ch chan []adaptors.NeedPush + //具体配置 + Info AlarmCombination.ConfigFile + InHttp *dbOperate.HttpHelper + outKafka *_kafka.KafkaHelper + monitor *monitors.CommonMonitor + infoRedis *dbOperate.RedisHelper + infoPg *dbOperate.DBHelper +} + +func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) { + // 将 yaml 格式的数据解析到结构体中 + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerAlarmCombination) Initial(cfg string) error { + the.LoadConfigJson(cfg) + err := the.InputInitial() + if err != nil { + return err + } + err = the.OutputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() + return err +} +func (the *consumerAlarmCombination) InputInitial() error { + the.ch = make(chan []adaptors.NeedPush, 200) + //数据入口 + the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} + the.monitor = &monitors.CommonMonitor{ + MonitorHelper: &monitors.MonitorHelper{}, + } + + the.monitor.Start() + for taskName, cron := range the.Info.Monitor { + switch taskName { + case "cron_pg": + the.monitor.RegisterTask(cron, the.updateCombinationInfo) + case "cron_redis": + the.monitor.RegisterTask(cron, the.getEs1HourAggData) + default: + log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) + } + } + + return nil +} +func (the *consumerAlarmCombination) OutputInitial() error { + //数据出口 + the.outKafka = _kafka.KafkaInitial( + the.Info.IoConfig.Out.Kafka.Brokers, + the.Info.IoConfig.Out.Kafka.GroupId) + return nil +} + +func (the *consumerAlarmCombination) infoComponentInitial() error { + //数据出口 + addr := the.Info.QueryComponent.Redis.Address + the.infoRedis = dbOperate.NewRedisHelper("", addr) + return nil +} +func (the *consumerAlarmCombination) Work() { + go func() { + for { + needPushList := <-the.ch + if len(the.ch) > 0 { + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + } + + for _, push := range needPushList { + if push.Topic != "" { + the.outKafka.Publish(push.Topic, push.Payload) + continue + } + + //没有标记topic 的 按照配置文件里面的推送 + for _, topic := range the.Info.IoConfig.Out.Kafka.Topics { + the.outKafka.Publish(topic, push.Payload) + } + + } + + time.Sleep(100 * time.Millisecond) + } + }() +} + +func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { + + return adaptors.Adaptor_AXYES_HBGL{ + Redis: the.infoRedis, + } +} + +func (the *consumerAlarmCombination) getEs1HourAggData() { + start, end := utils.GetTimeRangeByHour(-1) + log.Printf("查询数据时间范围 %s - %s", start, end) + hourFactorIds := []int{15, 18, 20} + structIds := []int64{1, 2} + for _, structId := range structIds { + for _, factorId := range hourFactorIds { + esQuery := the.getESQueryStrByHour(structId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + needPushes := adaptor.Transform(structId, factorId, esAggResultStr) + + if len(needPushes) > 0 { + the.ch <- needPushes + } + } + } + +} + +func (the *consumerAlarmCombination) updateCombinationInfo() { + log.Printf("更新 数据库 组合配置信息") + + structIds := []int64{1, 2} + for _, structId := range structIds { + + println(structId) + } + +} + +func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { + aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lt": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupSensor": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "1h", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func (the *consumerAlarmCombination) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { + aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lte": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupSensor": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "10m", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func (the *consumerAlarmCombination) getStructureId() string { + structureId, ok := the.Info.OtherInfo["structureId"] + if !ok { + log.Panicf("无法识别有效的structureId") + } + return structureId +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index b569ecf..ed11339 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -46,6 +46,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerZWYHBJCAS": consumer = new(consumerZWYHBJCAS) + + case "consumerAlarmCombination": + consumer = new(consumerAlarmCombination) default: consumer = nil } diff --git a/dbOperate/_kafka/kafkaHelper.go b/dbOperate/_kafka/kafkaHelper.go index 4bdf3f4..192de56 100644 --- a/dbOperate/_kafka/kafkaHelper.go +++ b/dbOperate/_kafka/kafkaHelper.go @@ -2,12 +2,14 @@ package _kafka import ( "log" + "time" ) type KafkaHelper struct { - Brokers []string - GroupId string - client *ConsumerGroupHandler + Brokers []string + GroupId string + client *ConsumerGroupHandler + producer *KafkaAsyncProducer } func (the *KafkaHelper) initialClient() *ConsumerGroupHandler { @@ -22,6 +24,25 @@ func (the *KafkaHelper) Subscribe(topic string, callback func(topic string, msg log.Printf("=================开始订阅 %s [%s]=================", the.Brokers, topic) the.client.Subscribe(topic, callback) } + +func (the *KafkaHelper) Publish(topic string, bytes []byte) { + if the.producer == nil { + the.producer = NewKafkaAsyncProducer(the.Brokers, the.GroupId+"_p") + } + the.producer.Publish(topic, bytes) +} func (the *KafkaHelper) Worker() { go the.client.Worker() } + +func KafkaInitial(Brokers []string, GroupId string) *KafkaHelper { + kafkaHelper := KafkaHelper{ + Brokers: Brokers, + GroupId: GroupId, + } + + kafkaHelper.Initial() + + time.Sleep(time.Second * 1) + return &kafkaHelper +} diff --git a/dbOperate/_kafka/producerHelper.go b/dbOperate/_kafka/producerHelper.go index 0653cfc..9844c95 100644 --- a/dbOperate/_kafka/producerHelper.go +++ b/dbOperate/_kafka/producerHelper.go @@ -95,9 +95,9 @@ func (p *KafkaAsyncProducer) Close() { } } -func NewKafkaAsyncProducer(brokers []string) *KafkaAsyncProducer { +func NewKafkaAsyncProducer(brokers []string, clientID string) *KafkaAsyncProducer { config := sarama.NewConfig() - config.ClientID = "et-go-push" + config.ClientID = clientID config.Net.DialTimeout = time.Second * 10 config.Net.ReadTimeout = time.Second * 10 config.Net.WriteTimeout = time.Second * 10