package consumers

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/SavoirTheme"
	"goInOut/dbOperate"
	"goInOut/dbOperate/_kafka"
	"goInOut/models"
	"goInOut/monitors"

	"gopkg.in/yaml.v3"
)

// consumerSavoirTheme consumes theme data from Kafka, keeps the most recent
// record per sensor in memory, periodically bulk-writes those records to
// Elasticsearch, and runs a scheduled offline check that publishes alarms
// back to Kafka.
type consumerSavoirTheme struct {
	// dataCache is the buffered pipe between the Kafka callback (onData)
	// and the goroutine started by Work.
	dataCache chan *models.EsTheme
	// Info is the parsed consumer configuration.
	Info    SavoirTheme.ConfigFile
	InKafka _kafka.KafkaHelper
	OutEs   dbOperate.ESHelper
	infoPg  *dbOperate.DBHelper
	// sinkMap holds the pending *models.EsTheme per sensor, keyed by the
	// theme's Sensor field.
	sinkMap sync.Map
	// lock serializes Work's store into sinkMap against toSink's
	// range-and-clear pass.
	lock sync.Mutex
	// logTagId is the sensor id that gets extra diagnostic logging.
	// NOTE(review): nothing in this file assigns it, so it stays 0 unless
	// it is set elsewhere in the package.
	logTagId int
	monitor  *monitors.CommonMonitor
	// pgOffLineGaps is the offline-gap configuration loaded from the
	// database by statisticsOffline.
	pgOffLineGaps []SavoirTheme.OffLineGap
}

// LoadConfigJson decodes cfgStr into the.Info.
//
// Despite the name, decoding is done with the YAML decoder.
// It logs and panics when the configuration cannot be decoded.
func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) {
	if err := yaml.Unmarshal([]byte(cfgStr), &the.Info); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial resets the in-memory state, loads the configuration from cfg and
// then initializes, in order, the Kafka input, the Elasticsearch output, the
// PostgreSQL query helper and the scheduled monitor. It returns the first
// error reported by any of those steps.
func (the *consumerSavoirTheme) Initial(cfg string) error {
	the.sinkMap = sync.Map{}
	the.dataCache = make(chan *models.EsTheme, 1000)

	the.LoadConfigJson(cfg)

	if err := the.inputInitial(); err != nil {
		return err
	}
	if err := the.outputInitial(); err != nil {
		return err
	}
	if err := the.infoComponentInitial(); err != nil {
		return err
	}
	return the.monitorInitial()
}

// inputInitial builds the Kafka helper from the configured brokers and group
// id, subscribes onData to every configured input topic and starts the
// helper's worker. It always returns nil.
func (the *consumerSavoirTheme) inputInitial() error {
	kafkaCfg := the.Info.IoConfig.In.Kafka
	the.InKafka = _kafka.KafkaHelper{
		Brokers: kafkaCfg.Brokers,
		GroupId: kafkaCfg.GroupId,
	}
	the.InKafka.Initial()
	for _, inTopic := range kafkaCfg.Topics {
		the.InKafka.Subscribe(inTopic, the.onData)
	}
	the.InKafka.Worker()
	return nil
}

// outputInitial creates the Elasticsearch helper from the configured address
// and credentials. It always returns nil.
func (the *consumerSavoirTheme) outputInitial() error {
	esCfg := the.Info.IoConfig.Out.Es
	the.OutEs = *dbOperate.NewESHelper(
		esCfg.Address,
		esCfg.Auth.UserName,
		esCfg.Auth.Password,
	)
	return nil
}

// infoComponentInitial creates the PostgreSQL helper used to query the
// offline-gap configuration. It always returns nil.
func (the *consumerSavoirTheme) infoComponentInitial() error {
	pgConnStr := the.Info.QueryComponent.Pg.Connect
	the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
	return nil
}

// monitorInitial starts the monitor and registers the configured scheduled
// tasks. Only the task named "cron" is recognized (it runs
// statisticsOffline); any other entry is logged and ignored. It always
// returns nil.
func (the *consumerSavoirTheme) monitorInitial() error {
	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{},
	}
	the.monitor.Start()

	for taskName, cron := range the.Info.Monitor {
		switch taskName {
		case "cron":
			the.monitor.RegisterTask(cron, the.statisticsOffline)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
		}
	}
	return nil
}

// statisticsOffline is the scheduled task body: it reloads the enabled
// offline-gap configuration from the database into the.pgOffLineGaps and
// then immediately runs judgeOffline. When the query fails the error is
// logged and the offline check is skipped for this run.
func (the *consumerSavoirTheme) statisticsOffline() {
	log.Printf("--> 定时任务 更新数据库 配置信息")
	sql := `SELECT off.*,s.name FROM "t_struct_factor_offlinegap" as off left join t_structure as s ON off.struct_id=s.id where off.is_open=true order by off.struct_id`
	if err := the.infoPg.Query(&the.pgOffLineGaps, sql); err != nil {
		log.Printf("查询数据库异常:err-> %s", err.Error())
		return
	}
	log.Printf("当前共 %d条 启用配置", len(the.pgOffLineGaps))

	// Trigger the offline check right away with the fresh configuration.
	the.judgeOffline()
}

// judgeOffline walks every enabled offline-gap configuration, reads the
// themes of that structure/factor from the "savoir_last_theme" index and
// collects every sensor whose last collect time is older than the configured
// gap (in minutes). When at least one sensor is overdue, a single alarm for
// that structure/factor is published.
//
// A failed Elasticsearch query is logged together with its error and the
// configuration is skipped, instead of continuing with a result that is
// meaningless on error.
func (the *consumerSavoirTheme) judgeOffline() {
	now := time.Now()
	for _, gap := range the.pgOffLineGaps {
		if !gap.IsOpen {
			continue
		}
		log.Printf("判断 s:%d,f:%d,durMin:%d", gap.StructId, gap.FactorId, gap.OfflineGap)

		queryStr := the.getESOfflineAlarmQueryStr(gap.StructId, gap.FactorId)
		allThemes, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr)
		if err != nil {
			log.Printf("查询es 异常 s:%d,f:%d err=%v", gap.StructId, gap.FactorId, err)
			continue
		}
		log.Printf("查询相关测点数=%d", len(allThemes))

		var alarmDetails []string
		limitMin := float64(gap.OfflineGap)
		for _, theme := range allThemes {
			offlineMin := now.Sub(theme.CollectTime).Minutes()
			log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin)
			if offlineMin <= limitMin {
				continue
			}
			msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap)
			log.Printf("----- > %s", msg)
			alarmDetails = append(alarmDetails, msg)
		}

		if len(alarmDetails) > 0 {
			the.publishOfflineAlarm(gap, alarmDetails)
		}
	}
}

// publishOfflineAlarm builds the offline alarm for one structure/factor
// configuration from the collected per-sensor details and publishes it to
// the "savoir_alarm" topic. A payload that cannot be encoded is logged and
// not published.
func (the *consumerSavoirTheme) publishOfflineAlarm(gap SavoirTheme.OffLineGap, details []string) {
	// The alarm timestamp is always labelled +0800. Converting to a fixed
	// +08:00 zone before formatting keeps that label truthful even when the
	// host's local zone is something else; on a UTC+8 host the output is
	// unchanged.
	alarmZone := time.FixedZone("CST", 8*60*60)

	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmGeneration",
		StructureId:   gap.StructId,
		StructureName: gap.StructName,
		SourceId:      fmt.Sprintf("offline-%d-%d", gap.StructId, gap.FactorId),
		SourceName:    gap.StructName,
		AlarmTypeCode: "8004",
		AlarmCode:     "80040003",
		Content:       strings.Join(details, ","),
		Time:          time.Now().In(alarmZone).Format("2006-01-02T15:04:05-0700"),
		SourceTypeId:  1, // 0: DTU, 1: sensor, 2: measuring point
		Sponsor:       "goInOut_savoirTheme",
		Extras:        nil,
		SubDevices:    nil,
	}

	payload, err := json.Marshal(alarmMsg)
	if err != nil {
		log.Printf("离线告警序列化异常 s:%d,f:%d err=%v", gap.StructId, gap.FactorId, err)
		return
	}
	the.InKafka.Publish("savoir_alarm", payload)
}

// getESOfflineAlarmQueryStr returns the Elasticsearch query body that
// matches documents whose "structure" equals structId and whose "factor"
// equals factorId.
func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string {
	return fmt.Sprintf(` { "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } } ] } } } `, structId, factorId)
}

// sinkTask calls toSink once per configured interval (in seconds) and never
// returns.
//
// time.NewTicker panics on a non-positive duration, so a missing or invalid
// interval falls back to one second instead of crashing the process.
func (the *consumerSavoirTheme) sinkTask() {
	const fallbackInterval = time.Second

	interval := time.Duration(the.Info.IoConfig.Out.Es.Interval) * time.Second
	if interval <= 0 {
		log.Printf("es 写入间隔配置无效[%v], 使用默认值 %v", the.Info.IoConfig.Out.Es.Interval, fallbackInterval)
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		the.toSink()
	}
}

// toSink copies every pending theme out of sinkMap, bulk-writes the batch to
// the configured Elasticsearch index and then clears sinkMap. Nothing is
// written or cleared when no theme is pending.
//
// NOTE(review): the mutex is held for the whole call, including the bulk
// write, so Work's store into sinkMap waits while the write is in flight.
func (the *consumerSavoirTheme) toSink() {
	var themes []models.EsTheme

	the.lock.Lock()
	defer the.lock.Unlock()

	the.sinkMap.Range(func(key, value any) bool {
		v, ok := value.(*models.EsTheme)
		if !ok {
			log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
			return true
		}
		themes = append(themes, *v)
		// Temporary diagnostic logging for the tagged sensor.
		if v.Sensor == the.logTagId {
			// Best-effort debug output; an encoding error only costs the log text.
			bs, _ := json.Marshal(v)
			log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
		}
		return true
	})

	if len(themes) == 0 {
		return
	}
	index := the.Info.IoConfig.Out.Es.Index
	log.Printf("写入es [%s] %d条", index, len(themes))
	the.OutEs.BulkWriteThemes2Es(index, themes)
	the.sinkMap.Clear()
}

// Work starts the two background goroutines of the consumer: the periodic
// sink (sinkTask) and the loop that moves themes from dataCache into
// sinkMap, keeping only the latest theme per sensor. It returns immediately.
func (the *consumerSavoirTheme) Work() {
	log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
	go the.sinkTask()
	go func() {
		for pushEsTheme := range the.dataCache {
			if pushEsTheme.Sensor == the.logTagId {
				// Best-effort debug output; an encoding error only costs the log text.
				bs, _ := json.Marshal(pushEsTheme)
				log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
			}
			// Store the valid record, replacing any older one for the same sensor.
			the.lock.Lock()
			the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
			the.lock.Unlock()
		}
	}()
}

// onData is the Kafka subscription callback. It converts the raw message
// with the Savoir last-theme adaptor and queues the result on dataCache when
// it carries data. A result without data is dropped, and logged only for the
// tagged sensor. It always returns true.
func (the *consumerSavoirTheme) onData(topic string, msg string) bool {
	adaptor := adaptors.Adaptor_Savoir_LastTheme{}
	needPush := adaptor.Transform(topic, msg)
	if needPush == nil {
		return true
	}
	if needPush.Data != nil {
		the.dataCache <- needPush
		return true
	}
	// Encode only when the line is actually logged, not for every rejected message.
	if needPush.Sensor == the.logTagId {
		// Best-effort debug output; an encoding error only costs the log text.
		s, _ := json.Marshal(needPush)
		log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
	}
	return true
}