You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							370 lines
						
					
					
						
							9.6 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							370 lines
						
					
					
						
							9.6 KiB
						
					
					
				| package consumers | |
| 
 | |
| import ( | |
| 	"encoding/json" | |
| 	"fmt" | |
| 	"goInOut/adaptors" | |
| 	"goInOut/consumers/SavoirTheme" | |
| 	"goInOut/dbOperate" | |
| 	"goInOut/dbOperate/_kafka" | |
| 	"goInOut/models" | |
| 	"goInOut/monitors" | |
| 	"gopkg.in/yaml.v3" | |
| 	"log" | |
| 	"regexp" | |
| 	"sync" | |
| 	"time" | |
| ) | |
| 
 | |
| type consumerSavoirTheme struct { | |
| 	//数据缓存管道 | |
| 	dataCache chan *models.EsTheme | |
| 	//具体配置 | |
| 	Info     SavoirTheme.ConfigFile | |
| 	InKafka  _kafka.KafkaHelper | |
| 	OutEs    dbOperate.ESHelper | |
| 	infoPg   *dbOperate.DBHelper | |
| 	sinkMap  sync.Map | |
| 	lock     sync.Mutex | |
| 	logTagId int | |
| 	monitor  *monitors.CommonMonitor | |
| 	//数据库配置信息 | |
| 	pgOffLineGaps []SavoirTheme.OffLineGap | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) { | |
| 	// 将 JSON 格式的数据解析到结构体中 | |
| 	err := yaml.Unmarshal([]byte(cfgStr), &the.Info) | |
| 	if err != nil { | |
| 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | |
| 		panic(err) | |
| 	} | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) Initial(cfg string) error { | |
| 	the.sinkMap = sync.Map{} | |
| 	the.dataCache = make(chan *models.EsTheme, 1000) | |
| 
 | |
| 	the.LoadConfigJson(cfg) | |
| 	err := the.inputInitial() | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 	err = the.outputInitial() | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 	err = the.infoComponentInitial() | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	err = the.monitorInitial() | |
| 	return err | |
| } | |
| func (the *consumerSavoirTheme) inputInitial() error { | |
| 	//数据入口 | |
| 	the.InKafka = _kafka.KafkaHelper{ | |
| 		Brokers: the.Info.IoConfig.In.Kafka.Brokers, | |
| 		GroupId: the.Info.IoConfig.In.Kafka.GroupId, | |
| 	} | |
| 	the.InKafka.Initial() | |
| 	for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { | |
| 		the.InKafka.Subscribe(inTopic, the.onData) | |
| 	} | |
| 
 | |
| 	the.InKafka.Worker() | |
| 	return nil | |
| } | |
| func (the *consumerSavoirTheme) outputInitial() error { | |
| 	//数据出口 | |
| 	the.OutEs = *dbOperate.NewESHelper( | |
| 		the.Info.IoConfig.Out.Es.Address, | |
| 		the.Info.IoConfig.Out.Es.Auth.UserName, | |
| 		the.Info.IoConfig.Out.Es.Auth.Password, | |
| 	) | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) infoComponentInitial() error { | |
| 	//数据出口 | |
| 	pgConnStr := the.Info.QueryComponent.Pg.Connect | |
| 	the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr) | |
| 	return nil | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) monitorInitial() error { | |
| 	the.monitor = &monitors.CommonMonitor{ | |
| 		MonitorHelper: &monitors.MonitorHelper{}, | |
| 	} | |
| 
 | |
| 	the.monitor.Start() | |
| 	for taskName, cron := range the.Info.Monitor { | |
| 		switch taskName { | |
| 		case "cron": | |
| 			the.monitor.RegisterTask(cron, the.statisticsOffline) | |
| 		default: | |
| 			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) | |
| 		} | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) statisticsOffline() { | |
| 	log.Printf("-->  定时任务 更新数据库 配置信息") | |
| 	sql := `SELECT off.*,s.name FROM "t_struct_factor_offlinegap"  as off   left join t_structure as s | |
| ON off.struct_id=s.id | |
| where off.is_open=true order by off.struct_id` | |
| 	err := the.infoPg.Query(&the.pgOffLineGaps, sql) | |
| 	if err != nil { | |
| 		log.Printf("查询数据库异常:err-> %s", err.Error()) | |
| 		return | |
| 	} | |
| 	log.Printf("当前共 %d条 启用配置", len(the.pgOffLineGaps)) | |
| 
 | |
| 	//立即触发 | |
| 	the.judgeOffline() | |
| 
 | |
| } | |
| func (the *consumerSavoirTheme) judgeOffline() { | |
| 	now := time.Now() | |
| 	for _, gap := range the.pgOffLineGaps { | |
| 		var alarmDetails string | |
| 		if !gap.IsOpen { | |
| 			continue | |
| 		} | |
| 		log.Printf("判断 s:%d,f:%d,durMin:%d", gap.StructId, gap.FactorId, gap.OfflineGap) | |
| 		queryStr := the.getESOfflineAlarmQueryStr(gap.StructId, gap.FactorId) | |
| 		allThemes, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr) | |
| 		if err != nil { | |
| 			log.Printf("查询es 异常") | |
| 		} | |
| 		log.Printf("查询相关测点数=%d", len(allThemes)) | |
| 		for _, theme := range allThemes { | |
| 			offlineMin := now.Sub(theme.CreateTime).Minutes() | |
| 			log.Printf("s:%d,f:%d,sensor:%d 离线%f min", gap.StructId, gap.FactorId, theme.Sensor, offlineMin) | |
| 			log.Printf("s:%d,f:%d,sensor:%d 采集时间%s 当前时间%s", gap.StructId, gap.FactorId, theme.Sensor, theme.CreateTime, now) | |
| 
 | |
| 			//拿到当前es最后一条数据和当前数据库的配置之后去查是否产生告警 | |
| 			StrValue := "8004" | |
| 			alarmQueryStr := the.getEsAlarmValueStr(gap.StructId, StrValue) | |
| 			allAlarmThemes, err := the.OutEs.SearchAlarmThemeData(the.Info.IoConfig.Out.Es.AlarmIndex, alarmQueryStr) | |
| 			log.Printf("allAlarmThemes----- >  %s", allAlarmThemes) | |
| 			if err != nil { | |
| 				log.Printf("查询es 异常") | |
| 			} | |
| 			for _, alarmThemeName := range allAlarmThemes { | |
| 				StateValue := true | |
| 				if alarmThemeName.State == 0 || alarmThemeName.State == 1 || alarmThemeName.State == 2 { | |
| 					StateValue = false | |
| 				} | |
| 				if theme.SensorName == alarmThemeName.SensorName && StateValue == false { | |
| 					if offlineMin < float64(gap.OfflineGap) { | |
| 						alarmMsg := models.KafkaAlarm{ | |
| 							MessageMode:   "AlarmAutoElimination", | |
| 							StructureId:   gap.StructId, | |
| 							StructureName: gap.StructName, | |
| 							SourceId:      fmt.Sprintf("%d", theme.Sensor), | |
| 							SourceName:    theme.SensorName, | |
| 							AlarmTypeCode: "8004", | |
| 							AlarmCode:     "80040003", | |
| 							Content:       "", | |
| 							Time:          time.Now().Format("2006-01-02T15:04:05+0800"), | |
| 							SourceTypeId:  2, //   0:DTU, 1:传感器, 2:测点 | |
| 							Sponsor:       "goInOut_savoirTheme", | |
| 							Extras:        nil, | |
| 							SubDevices:    nil, | |
| 						} | |
| 						log.Printf("----- >  恢复告警已准备发送") | |
| 						payload, _ := json.Marshal(alarmMsg) | |
| 						the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) | |
| 					} | |
| 				} | |
| 			} | |
| 			log.Printf("%f,%f", offlineMin, float64(gap.OfflineGap)) | |
| 			if offlineMin > float64(gap.OfflineGap) { | |
| 				msg := fmt.Sprintf("测点[%s]离线%f min > %d min", theme.SensorName, offlineMin, gap.OfflineGap) | |
| 				log.Printf("----- >  %s", msg) | |
| 				alarmDetails = msg | |
| 			} | |
| 
 | |
| 			log.Printf("len(alarmDetails) > 0 %d,%f", len(alarmDetails), float64(gap.OfflineGap)) | |
| 			if alarmDetails != "" { | |
| 				alarmMsg := models.KafkaAlarm{ | |
| 					MessageMode:   "AlarmGeneration", | |
| 					StructureId:   gap.StructId, | |
| 					StructureName: gap.StructName, | |
| 					SourceId:      fmt.Sprintf("%d", theme.Sensor), | |
| 					SourceName:    theme.SensorName, | |
| 					AlarmTypeCode: "8004", | |
| 					AlarmCode:     "80040003", | |
| 					Content:       alarmDetails, | |
| 					Time:          time.Now().Format("2006-01-02T15:04:05+0800"), | |
| 					SourceTypeId:  1, //   0:DTU, 1:传感器, 2:测点 | |
| 					Sponsor:       "goInOut_savoirTheme", | |
| 					Extras:        nil, | |
| 					SubDevices:    nil, | |
| 				} | |
| 
 | |
| 				payload, _ := json.Marshal(alarmMsg) | |
| 				the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) | |
| 			} | |
| 
 | |
| 		} | |
| 
 | |
| 	} | |
| 
 | |
| } | |
| 
 | |
// bracketContentPattern matches the shortest run of characters (newlines
// excluded) enclosed in square brackets. It is compiled once at package
// initialisation instead of on every call.
var bracketContentPattern = regexp.MustCompile(`\[(.*?)\]`)

// extractSensorName returns the text inside the first pair of square brackets
// found in alert, e.g. "S-01" for "测点[S-01]离线". It returns the empty string
// when alert contains no bracketed section. A successful match is also written
// to the log.
func extractSensorName(alert string) string {
	match := bracketContentPattern.FindStringSubmatch(alert)
	if len(match) < 2 {
		return ""
	}
	log.Printf("----- >  %s", match[1])
	return match[1]
}
| 
 | |
| func (the *consumerSavoirTheme) getEsAlarmValueStr(structId int, alarmCode string) string { | |
| 
 | |
| 	esQuery := fmt.Sprintf(` | |
| { | |
|   "query": { | |
|     "bool": { | |
|       "must": [ | |
|         { | |
|           "term": { | |
|             "structure_id": { | |
|               "value": %d | |
|             } | |
|           } | |
|         }, | |
|         { | |
|           "term": { | |
|             "alarm_type_code": { | |
|               "value": %s | |
|             } | |
|           } | |
|         } | |
|       ] | |
|     } | |
|   } | |
| } | |
| `, structId, alarmCode) | |
| 	return esQuery | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) getESOfflineAlarmQueryStr(structId, factorId int) string { | |
| 
 | |
| 	esQuery := fmt.Sprintf(` | |
| { | |
|   "query": { | |
|     "bool": { | |
|       "must": [ | |
|         { | |
|           "term": { | |
|             "structure": { | |
|               "value": %d | |
|             } | |
|           } | |
|         }, | |
|         { | |
|           "term": { | |
|             "factor": { | |
|               "value": %d | |
|             } | |
|           } | |
|         } | |
|       ] | |
|     } | |
|   } | |
| } | |
| `, structId, factorId) | |
| 	return esQuery | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) sinkTask() { | |
| 	intervalSec := the.Info.IoConfig.Out.Es.Interval | |
| 	ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) | |
| 	defer ticker.Stop() | |
| 	for { | |
| 		<-ticker.C | |
| 		the.toSink() | |
| 	} | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) toSink() { | |
| 	var themes []models.EsTheme | |
| 	the.lock.Lock() | |
| 	defer the.lock.Unlock() | |
| 	the.sinkMap.Range(func(key, value any) bool { | |
| 		if v, ok := value.(*models.EsTheme); ok { | |
| 			themes = append(themes, *v) | |
| 			//零时打日志用 | |
| 			if v.Sensor == the.logTagId { | |
| 				bs, _ := json.Marshal(v) | |
| 				log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) | |
| 			} | |
| 			return ok | |
| 		} else { | |
| 			log.Printf("!!!  toSink -> Range  类型转换异常 [%v]", key) | |
| 		} | |
| 		return true | |
| 	}) | |
| 	if len(themes) > 0 { | |
| 		index := the.Info.IoConfig.Out.Es.Index | |
| 		log.Printf("写入es [%s] %d条", index, len(themes)) | |
| 		the.OutEs.BulkWriteThemes2Es(index, themes) | |
| 		the.sinkMap.Clear() | |
| 	} | |
| } | |
| 
 | |
| func (the *consumerSavoirTheme) Work() { | |
| 	log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) | |
| 	go the.sinkTask() | |
| 	go func() { | |
| 		for { | |
| 			pushEsTheme := <-the.dataCache | |
| 
 | |
| 			if pushEsTheme.Sensor == the.logTagId { | |
| 				bs, _ := json.Marshal(pushEsTheme) | |
| 				log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) | |
| 			} | |
| 
 | |
| 			//有效数据存入缓存 | |
| 			the.lock.Lock() | |
| 			the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) | |
| 			the.lock.Unlock() | |
| 		} | |
| 
 | |
| 	}() | |
| } | |
| func (the *consumerSavoirTheme) onData(topic string, msg string) bool { | |
| 	//if len(msg) > 80 { | |
| 	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80]) | |
| 	//} | |
| 	adaptor := adaptors.Adaptor_Savoir_LastTheme{} | |
| 
 | |
| 	needPush := adaptor.Transform(topic, msg) | |
| 
 | |
| 	if needPush != nil && needPush.Data != nil { | |
| 		the.dataCache <- needPush | |
| 	} else { | |
| 		s, _ := json.Marshal(needPush) | |
| 		if needPush != nil { | |
| 			if needPush.Sensor == the.logTagId { | |
| 				log.Printf("onData 测点[%d] 异常needPush=  %s", needPush.Sensor, s) | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	return true | |
| }
 | |
| 
 |