Browse Source

update 添加 告警组合数据部分, 配置读取部分

dev
lucas 3 days ago
parent
commit
7f4a3e6ba2
  1. 2
      build/Dockerfile
  2. 27
      configFiles/config_知物云_组合告警.yaml
  3. 0
      configFiles/弃用备份/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml
  4. 31
      consumers/AlarmCombination/config.go
  5. 1
      consumers/SinoGnssMySQL/config.go
  6. 272
      consumers/consumerAlarmCombination.go
  7. 3
      consumers/consumerManage.go
  8. 21
      dbOperate/_kafka/kafkaHelper.go
  9. 4
      dbOperate/_kafka/producerHelper.go

2
build/Dockerfile

@ -1,4 +1,4 @@
FROM registry.ngaiot.com/base-images/golang:1.20-fs-10 FROM registry.ngaiot.com/base-images/golang-1.22:1.22-fs-3
WORKDIR /app/ WORKDIR /app/
COPY . . COPY . .

27
configFiles/config_知物云_组合告警.yaml

@ -0,0 +1,27 @@
consumer: consumerAlarmCombination
ioConfig:
in:
http:
url: https://esproxy.anxinyun.cn/savoir_alarms/_search
out:
kafka:
brokers: ["10.8.30.160:30992"]
groupId: "zhgj"
topics:
- zuhe_alarm
monitor:
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报
cron_pg: 32 0/1 * * * #6/10 * * * *
#普通类型 特征数据
cron_redis: 20 0/1 * * *
info:
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
queryComponent:
redis:
address: 10.8.30.160:30379
postgres:
connect: "host=10.8.30.165 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable"
#点位id对应信息
pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592(承德隧道专用)

0
configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml → configFiles/弃用备份/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml

31
consumers/AlarmCombination/config.go

@ -0,0 +1,31 @@
package AlarmCombination
import "goInOut/config"
// ConfigFile is the root structure of the consumerAlarmCombination YAML
// configuration (see configFiles/config_知物云_组合告警.yaml).
type ConfigFile struct {
	IoConfig ioConfig `yaml:"ioConfig"`
	// OtherInfo holds free-form key/value entries from the "info" section
	// (e.g. rc4key, structureId).
	OtherInfo map[string]string `yaml:"info"`
	// Monitor maps task name -> cron expression (e.g. cron_pg, cron_redis).
	Monitor map[string]string `yaml:"monitor"`
	QueryComponent queryComponent `yaml:"queryComponent"`
}

// ioConfig groups the data input and output endpoints.
type ioConfig struct {
	In In `yaml:"in"`
	Out Out `yaml:"out"`
}

// In is the data source: an HTTP endpoint (ES query proxy).
type In struct {
	Http config.HttpConfig `yaml:"http"`
}

// Out is the data sink: a Kafka cluster.
type Out struct {
	Kafka config.KafkaConfig `yaml:"kafka"`
}

// queryComponent configures the auxiliary stores used for lookups:
// Redis (point info) and Postgres (combination-alarm configuration).
type queryComponent struct {
	Redis struct {
		Address string `yaml:"address"`
	} `yaml:"redis"`
	Pg struct {
		Connect string `yaml:"connect"`
	} `yaml:"postgres"`
}

1
consumers/SinoGnssMySQL/config.go

@ -24,4 +24,5 @@ type OUT struct {
type RecordInfo struct { type RecordInfo struct {
Id int64 `json:"id"` Id int64 `json:"id"`
TableName string `json:"table_name"` TableName string `json:"table_name"`
Counts int64 `json:"counts"`
} }

272
consumers/consumerAlarmCombination.go

@ -0,0 +1,272 @@
package consumers
import (
"fmt"
"goInOut/adaptors"
"goInOut/consumers/AlarmCombination"
"goInOut/dbOperate"
"goInOut/dbOperate/_kafka"
"goInOut/monitors"
"goInOut/utils"
"gopkg.in/yaml.v3"
"log"
"time"
)
// consumerAlarmCombination pulls aggregated alarm data from an ES HTTP
// endpoint on a schedule, transforms it, and publishes it to Kafka.
type consumerAlarmCombination struct {
	// buffered pipeline of pending pushes (filled by cron tasks, drained by Work)
	ch chan []adaptors.NeedPush
	// parsed YAML configuration for this consumer
	Info AlarmCombination.ConfigFile
	// data input: HTTP client for the ES search proxy
	InHttp *dbOperate.HttpHelper
	// data output: Kafka publisher
	outKafka *_kafka.KafkaHelper
	// cron scheduler driving the periodic tasks
	monitor *monitors.CommonMonitor
	// Redis lookup component (see infoComponentInitial)
	infoRedis *dbOperate.RedisHelper
	// NOTE(review): infoPg is never assigned in this file even though the
	// config carries a postgres connect string — confirm intended.
	infoPg *dbOperate.DBHelper
}
// LoadConfigJson parses the configuration string into the.Info and panics on
// malformed input (startup-time failure is intentional).
// NOTE(review): despite the name, the payload is YAML, not JSON; the name is
// kept unchanged for IConsumer interface compatibility.
func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) {
	// 将 yaml 格式的数据解析到结构体中
	err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		// pass err directly to %v instead of calling err.Error()
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err)
		panic(err)
	}
}
// Initial parses the config string and wires up the input, output and
// auxiliary query components, returning the first setup error encountered.
func (the *consumerAlarmCombination) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	if err := the.OutputInitial(); err != nil {
		return err
	}
	return the.infoComponentInitial()
}
// InputInitial prepares the push channel, the HTTP input client, and the cron
// monitor, registering one task per recognized entry in the monitor config.
func (the *consumerAlarmCombination) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)
	// data entry point
	the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
	the.monitor = &monitors.CommonMonitor{MonitorHelper: &monitors.MonitorHelper{}}
	the.monitor.Start()
	for name, spec := range the.Info.Monitor {
		switch name {
		case "cron_pg":
			the.monitor.RegisterTask(spec, the.updateCombinationInfo)
		case "cron_redis":
			the.monitor.RegisterTask(spec, the.getEs1HourAggData)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", name, spec)
		}
	}
	return nil
}
// OutputInitial connects the Kafka publisher used as the data outlet.
func (the *consumerAlarmCombination) OutputInitial() error {
	kafkaCfg := the.Info.IoConfig.Out.Kafka
	the.outKafka = _kafka.KafkaInitial(kafkaCfg.Brokers, kafkaCfg.GroupId)
	return nil
}
// infoComponentInitial wires up the auxiliary query components. Currently only
// the Redis helper is created; the Postgres connect string from the config is
// not used here (original comment said "data outlet", which was a copy-paste).
func (the *consumerAlarmCombination) infoComponentInitial() error {
	addr := the.Info.QueryComponent.Redis.Address
	the.infoRedis = dbOperate.NewRedisHelper("", addr)
	return nil
}
// Work drains the push channel in a background goroutine and publishes each
// payload to Kafka: to the item's own Topic when set, otherwise fanned out to
// every topic configured under out.kafka.topics.
// NOTE(review): the goroutine has no shutdown path — confirm the consumer is
// expected to live for the whole process lifetime.
func (the *consumerAlarmCombination) Work() {
	go func() {
		for {
			needPushList := <-the.ch
			// report backlog only when more batches are still queued
			if len(the.ch) > 0 {
				log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			}
			for _, push := range needPushList {
				if push.Topic != "" {
					the.outKafka.Publish(push.Topic, push.Payload)
					continue
				}
				// no explicit topic on the item: push per the config file's topic list
				for _, topic := range the.Info.IoConfig.Out.Kafka.Topics {
					the.outKafka.Publish(topic, push.Payload)
				}
			}
			// throttle the drain loop
			time.Sleep(100 * time.Millisecond)
		}
	}()
}
// getAdaptor builds the adaptor that transforms ES aggregation results,
// backed by the consumer's Redis lookup helper.
func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
	adaptor = adaptors.Adaptor_AXYES_HBGL{Redis: the.infoRedis}
	return
}
// getEs1HourAggData queries ES for the previous hour's aggregated data for a
// fixed set of structures and factor types, transforms the results, and
// queues anything non-empty onto the push channel.
func (the *consumerAlarmCombination) getEs1HourAggData() {
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询数据时间范围 %s - %s", start, end)
	// hourly factor ids; per the config comment: 15 倾斜, 18 裂缝, 20 支座位移
	hourFactorIds := []int{15, 18, 20}
	// NOTE(review): structure ids are hard-coded — confirm they should not
	// come from configuration.
	structIds := []int64{1, 2}
	for _, structId := range structIds {
		for _, factorId := range hourFactorIds {
			esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
			// NOTE(review): hard-coded credential embedded in source; scheme
			// reads "Bear", not "Bearer" — confirm the proxy expects this.
			auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
			esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
			adaptor := the.getAdaptor()
			needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
			if len(needPushes) > 0 {
				the.ch <- needPushes
			}
		}
	}
}
// updateCombinationInfo refreshes the combination-alarm configuration from
// the database.
// NOTE(review): still a stub — it only logs the hard-coded structure ids.
func (the *consumerAlarmCombination) updateCombinationInfo() {
	log.Printf("更新 数据库 组合配置信息")
	structIds := []int64{1, 2}
	for _, structId := range structIds {
		// use the log package instead of the builtin println, which writes
		// unformatted output to stderr and bypasses the app's logging
		log.Printf("structId=%d", structId)
	}
}
// buildCombinationEsQuery renders the ES aggregation query shared by the
// hourly and 10-minute variants (previously two near-identical 50-line
// templates). rangeOp is the upper-bound operator of the collect_time range
// ("lt" or "lte"), interval the date_histogram bucket size ("1h" or "10m"),
// and aggSubSql the per-factor metric aggregation body.
func buildCombinationEsQuery(structureId int64, factorId int, start, end, rangeOp, interval, aggSubSql string) string {
	return fmt.Sprintf(`
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "term": { "structure": { "value": %d } } },
        { "term": { "factor": { "value": %d } } },
        { "range": { "collect_time": { "gte": "%s", "%s": "%s" } } }
      ]
    }
  },
  "aggs": {
    "groupSensor": {
      "terms": { "field": "sensor" },
      "aggs": {
        "groupDate": {
          "date_histogram": {
            "field": "collect_time",
            "interval": "%s",
            "time_zone": "Asia/Shanghai",
            "min_doc_count": 1
          },
          "aggs": %s
        }
      }
    }
  }
}
`, structureId, factorId, start, rangeOp, end, interval, aggSubSql)
}

// getESQueryStrByHour builds the 1-hour-bucket ES query; the upper time bound
// is exclusive ("lt").
func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
	aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
	return buildCombinationEsQuery(structureId, factorId, start, end, "lt", "1h", aggSubSql)
}

// getESQueryStrBy10min builds the 10-minute-bucket ES query; the upper time
// bound is inclusive ("lte").
func (the *consumerAlarmCombination) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
	aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
	return buildCombinationEsQuery(structureId, factorId, start, end, "lte", "10m", aggSubSql)
}
// getStructureId returns the structureId entry from the "info" config
// section, panicking (via log.Panicf) when it is absent.
func (the *consumerAlarmCombination) getStructureId() string {
	if structureId, ok := the.Info.OtherInfo["structureId"]; ok {
		return structureId
	}
	log.Panicf("无法识别有效的structureId")
	return ""
}

3
consumers/consumerManage.go

@ -46,6 +46,9 @@ func GetConsumer(name string) (consumer IConsumer) {
case "consumerZWYHBJCAS": case "consumerZWYHBJCAS":
consumer = new(consumerZWYHBJCAS) consumer = new(consumerZWYHBJCAS)
case "consumerAlarmCombination":
consumer = new(consumerAlarmCombination)
default: default:
consumer = nil consumer = nil
} }

21
dbOperate/_kafka/kafkaHelper.go

@ -2,12 +2,14 @@ package _kafka
import ( import (
"log" "log"
"time"
) )
type KafkaHelper struct { type KafkaHelper struct {
Brokers []string Brokers []string
GroupId string GroupId string
client *ConsumerGroupHandler client *ConsumerGroupHandler
producer *KafkaAsyncProducer
} }
func (the *KafkaHelper) initialClient() *ConsumerGroupHandler { func (the *KafkaHelper) initialClient() *ConsumerGroupHandler {
@ -22,6 +24,25 @@ func (the *KafkaHelper) Subscribe(topic string, callback func(topic string, msg
log.Printf("=================开始订阅 %s [%s]=================", the.Brokers, topic) log.Printf("=================开始订阅 %s [%s]=================", the.Brokers, topic)
the.client.Subscribe(topic, callback) the.client.Subscribe(topic, callback)
} }
// Publish sends bytes to the given topic, lazily creating the async producer
// on first use (producer client id is the consumer group id + "_p").
// NOTE(review): the lazy init is not goroutine-safe — concurrent first calls
// could race on the.producer; confirm all publishers share one goroutine.
func (the *KafkaHelper) Publish(topic string, bytes []byte) {
	if the.producer == nil {
		the.producer = NewKafkaAsyncProducer(the.Brokers, the.GroupId+"_p")
	}
	the.producer.Publish(topic, bytes)
}
func (the *KafkaHelper) Worker() { func (the *KafkaHelper) Worker() {
go the.client.Worker() go the.client.Worker()
} }
// KafkaInitial builds a KafkaHelper for the given brokers and consumer group
// and runs its Initial setup.
// NOTE(review): the fixed one-second sleep appears to wait for the client to
// finish connecting — a heuristic, not a guarantee; confirm whether Initial
// exposes a readiness signal instead.
func KafkaInitial(Brokers []string, GroupId string) *KafkaHelper {
	kafkaHelper := KafkaHelper{
		Brokers: Brokers,
		GroupId: GroupId,
	}
	kafkaHelper.Initial()
	time.Sleep(time.Second * 1)
	return &kafkaHelper
}

4
dbOperate/_kafka/producerHelper.go

@ -95,9 +95,9 @@ func (p *KafkaAsyncProducer) Close() {
} }
} }
func NewKafkaAsyncProducer(brokers []string) *KafkaAsyncProducer { func NewKafkaAsyncProducer(brokers []string, clientID string) *KafkaAsyncProducer {
config := sarama.NewConfig() config := sarama.NewConfig()
config.ClientID = "et-go-push" config.ClientID = clientID
config.Net.DialTimeout = time.Second * 10 config.Net.DialTimeout = time.Second * 10
config.Net.ReadTimeout = time.Second * 10 config.Net.ReadTimeout = time.Second * 10
config.Net.WriteTimeout = time.Second * 10 config.Net.WriteTimeout = time.Second * 10

Loading…
Cancel
Save