You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							250 lines
						
					
					
						
							5.6 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							250 lines
						
					
					
						
							5.6 KiB
						
					
					
				| package consumers | |
| 
 | |
| import ( | |
| 	"crypto/rc4" | |
| 	"encoding/hex" | |
| 	"fmt" | |
| 	"goInOut/adaptors" | |
| 	"goInOut/consumers/HBJCAS" | |
| 	"goInOut/dbOperate" | |
| 	"goInOut/monitors" | |
| 	"goInOut/utils" | |
| 	"gopkg.in/yaml.v3" | |
| 	"log" | |
| 	"time" | |
| ) | |
| 
 | |
| type consumerHBJCAS struct { | |
| 	//数据缓存管道 | |
| 	ch chan []adaptors.NeedPush | |
| 	//具体配置 | |
| 	Info    HBJCAS.ConfigFile | |
| 	InHttp  *dbOperate.HttpHelper | |
| 	outMqtt *dbOperate.MqttHelper | |
| 	monitor *monitors.CommonMonitor | |
| } | |
| 
 | |
| func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) { | |
| 	// 将 yaml 格式的数据解析到结构体中 | |
| 	err := yaml.Unmarshal([]byte(cfgStr), &the.Info) | |
| 	if err != nil { | |
| 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | |
| 		panic(err) | |
| 	} | |
| } | |
| 
 | |
| func (the *consumerHBJCAS) Initial(cfg string) error { | |
| 	the.LoadConfigJson(cfg) | |
| 	err := the.InputInitial() | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 	err = the.OutputInitial() | |
| 	return err | |
| } | |
| func (the *consumerHBJCAS) InputInitial() error { | |
| 	the.ch = make(chan []adaptors.NeedPush, 200) | |
| 	//数据入口 | |
| 	the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} | |
| 	the.monitor = &monitors.CommonMonitor{ | |
| 		MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, | |
| 	} | |
| 	the.monitor.Start() | |
| 	the.monitor.RegisterFun(the.getEsData) | |
| 	return nil | |
| } | |
| func (the *consumerHBJCAS) OutputInitial() error { | |
| 	//数据出口 | |
| 	the.outMqtt = dbOperate.MqttInitial( | |
| 		the.Info.IoConfig.Out.Mqtt.Host, | |
| 		the.Info.IoConfig.Out.Mqtt.Port, | |
| 		the.Info.IoConfig.Out.Mqtt.ClientId, | |
| 		the.Info.IoConfig.Out.Mqtt.UserName, | |
| 		the.Info.IoConfig.Out.Mqtt.Password, | |
| 		false, //按照具体项目来 | |
| 		"") | |
| 	return nil | |
| } | |
| func (the *consumerHBJCAS) Work() { | |
| 	go func() { | |
| 		for { | |
| 			needPushList := <-the.ch | |
| 			if len(the.ch) > 0 { | |
| 				log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) | |
| 			} | |
| 
 | |
| 			for _, push := range needPushList { | |
| 				if push.Topic != "" { | |
| 					the.outMqtt.Publish(push.Topic, push.Payload) | |
| 					continue | |
| 				} | |
| 
 | |
| 				//没有标记topic 的  按照配置文件里面的推送 | |
| 				for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { | |
| 					the.outMqtt.Publish(topic, push.Payload) | |
| 				} | |
| 
 | |
| 			} | |
| 
 | |
| 			time.Sleep(100 * time.Millisecond) | |
| 		} | |
| 	}() | |
| } | |
| 
 | |
| func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { | |
| 
 | |
| 	return adaptors.Adaptor_AXYES_HBGL{} | |
| } | |
| 
 | |
| func (the *consumerHBJCAS) getStructIds() []int64 { | |
| 	var structIds []int64 | |
| 	for strutId, _ := range the.Info.PointInfo { | |
| 		structIds = append(structIds, strutId) | |
| 	} | |
| 	return structIds | |
| } | |
| func (the *consumerHBJCAS) getEsData() { | |
| 	start, end := utils.GetTimeRangeByHour(-1) | |
| 	log.Printf("查询数据时间范围 %s - %s", start, end) | |
| 	hourFactorIds := []int{20} //, 15, 20 , 28 | |
| 	structIds := the.getStructIds() | |
| 	for _, structId := range structIds { | |
| 		for _, factorId := range hourFactorIds { | |
| 			esQuery := the.getESQueryStrByHour(structId, factorId, start, end) | |
| 			auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} | |
| 			esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) | |
| 
 | |
| 			adaptor := the.getAdaptor() | |
| 			adaptor.PointInfo = the.Info.PointInfo | |
| 			adaptor.StructInfo = the.Info.StructInfo | |
| 			needPushes := adaptor.Transform(structId, factorId, esAggResult) | |
| 			for i := range needPushes { | |
| 				needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) | |
| 			} | |
| 
 | |
| 			if len(needPushes) > 0 { | |
| 				the.ch <- needPushes | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| } | |
| func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { | |
| 	resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) | |
| 	needRC4 := append(transBytes, resultByCrc16...) | |
| 	rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] | |
| 	if !ok { | |
| 		log.Panicf("未配置 rc4key") | |
| 	} | |
| 	rc4Key := []byte(rc4KeyStr) //the.RC4Key | |
| 	// 加密操作 | |
| 	dest1 := make([]byte, len(needRC4)) | |
| 	rc4.NewCipher(rc4Key) | |
| 	cipher1, _ := rc4.NewCipher(rc4Key) | |
| 	cipher1.XORKeyStream(dest1, needRC4) | |
| 	log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) | |
| 	return dest1 | |
| } | |
| func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { | |
| 	aggSubSql := getEsAggSubSqlByFactorId(factorId) | |
| 	esQuery := fmt.Sprintf(` | |
| { | |
|   "size": 0, | |
|   "query": { | |
|     "bool": { | |
|       "must": [ | |
|         { | |
|           "term": { | |
|             "structure": { | |
|               "value": %d | |
|             } | |
|           } | |
|         }, | |
|         { | |
|           "term": { | |
|             "factor": { | |
|               "value": %d | |
|             } | |
|           } | |
|         }, | |
|         { | |
|           "range": { | |
|             "collect_time": { | |
|               "gte": "%s", | |
|               "lte": "%s" | |
|             } | |
|           } | |
|         } | |
|       ] | |
|     } | |
|   }, | |
|   "aggs": { | |
|     "groupSensor": { | |
|       "terms": { | |
|         "field": "sensor" | |
|       }, | |
|       "aggs": { | |
|         "groupDate": { | |
|           "date_histogram": { | |
|             "field": "collect_time", | |
|             "interval": "hour", | |
|             "time_zone": "Asia/Shanghai" | |
|           }, | |
|           "aggs": %s | |
|         } | |
|       } | |
|     } | |
|   } | |
| } | |
| `, structureId, factorId, start, end, aggSubSql) | |
| 
 | |
| 	return esQuery | |
| } | |
| 
 | |
// getEsAggSubSqlByFactorId returns the extended_stats sub-aggregation text
// for the given factor, or the empty string for any other factor id.
//
// Factor ids: 15 = pier tilt (x and y), 20 = bearing displacement,
// 28 = deck vibration (trms).
func getEsAggSubSqlByFactorId(factorId int) string {
	switch factorId {
	case 15:
		return `
{
    "x": {
        "extended_stats": {
            "field": "data.x"
        }
    },
    "y": {
        "extended_stats": {
            "field": "data.y"
        }
    }
}`
	case 20:
		return `
{
    "displacement": {
        "extended_stats": {
            "field": "data.displacement"
        }
    }
}`
	case 28:
		return `
{
    "trms": {
        "extended_stats": {
            "field": "data.trms"
        }
    }
}`
	default:
		return ""
	}
}
| 
 | |
| func (the *consumerHBJCAS) getStructureId() string { | |
| 	structureId, ok := the.Info.OtherInfo["structureId"] | |
| 	if !ok { | |
| 		log.Panicf("无法识别有效的structureId") | |
| 	} | |
| 	return structureId | |
| }
 | |
| 
 |