97 changed files with 2672 additions and 354 deletions
			
			
		| @ -0,0 +1 @@ | |||||
|  | /logs/ | ||||
| @ -1,19 +1,21 @@ | |||||
| # goUpload | # goInOut | ||||
| ## 功能说明: | ## 功能说明: | ||||
| 功能:用于设备采集数据上报第三方。   | 功能:用于数据的输入输出处理,可应对各中数据处理场景,典型的设备采集数据上报第三方。   | ||||
| 数据入口->各类本地采集软件 | 数据入口->各类本地采集软件 | ||||
| 数据出口->各类第三方数据接口 | 数据出口->各类第三方数据接口 | ||||
| 部署环境:用于windows环境 | 部署环境:用于windows环境或k8s 镜像部署 | ||||
| 
 | 
 | ||||
| ## 启用配置: | ## 启用配置: | ||||
| goUpload.exe + configFiles目录下的json配置 | app.exe + configFiles目录下的json配置 | ||||
| (configFiles目录下有几个配置就会生效几个功能,实际部署时,不相干的项目配置不要放进去) | (configFiles目录下有几个配置就会生效几个功能,实际部署时,不相干的项目配置不要放进去) | ||||
| 
 | 
 | ||||
| 支持-软件列表: | 支持-软件列表: | ||||
| -------------------- | -------------------- | ||||
| **统一采集软件** | **统一采集软件_mqtt** | ||||
| 
 | 
 | ||||
| **DAAS振动软件** | **DAAS振动软件_mqtt** | ||||
| 
 | 
 | ||||
| **称重软件** | **称重软件_mqtt** | ||||
| **中科-光电txt** | **中科-光电txt** | ||||
|  | 
 | ||||
|  | **kafka** | ||||
|  | |||||
| @ -0,0 +1,84 @@ | |||||
|  | package adaptors | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"bytes" | ||||
|  | 	"encoding/binary" | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"goInOut/utils" | ||||
|  | 	"log" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | // Adaptor_GDND2LA_JSNCGLQL   光电挠度2路安数据 转换 江苏农村公路桥梁监测系统
 | ||||
|  | type Adaptor_GDND2LA_JSNCGLQL struct { | ||||
|  | 	IdMap      map[string]string | ||||
|  | 	BridgeCode string | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_GDND2LA_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { | ||||
|  | 	gdnd := models.GDND2luAn{} | ||||
|  | 	json.Unmarshal([]byte(rawMsg), &gdnd) | ||||
|  | 	return the.RawToJSNCGLQL(gdnd) | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_GDND2LA_JSNCGLQL) RawToJSNCGLQL(raw models.GDND2luAn) (result []NeedPush) { | ||||
|  | 	Atime, err := time.Parse("2006-01-02 15:04:05", raw.Time) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("光电扰度 设备[%s] 数据时间 %s 解析错误", raw.Id, raw.Time) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	sensorDataList := raw.Value[raw.Id] | ||||
|  | 	//获取对方系统 id
 | ||||
|  | 	sensorCode := the.getSensorCode(raw.Id) | ||||
|  | 	if sensorCode == "" { | ||||
|  | 		log.Printf("光电扰度 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", raw.Id) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | ||||
|  | 
 | ||||
|  | 	var transBytes []byte | ||||
|  | 	//添加时间段
 | ||||
|  | 	transBytes = append(transBytes, the.getTimeBytes(Atime)...) | ||||
|  | 
 | ||||
|  | 	for _, sensorData := range sensorDataList { | ||||
|  | 
 | ||||
|  | 		//添加数据值段
 | ||||
|  | 		bs := utils.Float32ToBytes(sensorData) | ||||
|  | 		transBytes = append(transBytes, bs...) | ||||
|  | 	} | ||||
|  | 	result = append(result, NeedPush{ | ||||
|  | 		Topic:   topic, | ||||
|  | 		Payload: transBytes, | ||||
|  | 	}) | ||||
|  | 
 | ||||
|  | 	return result | ||||
|  | } | ||||
|  | func (the Adaptor_GDND2LA_JSNCGLQL) getSensorCode(rawSensorName string) string { | ||||
|  | 	v, isValid := the.IdMap[rawSensorName] | ||||
|  | 	if !isValid { | ||||
|  | 		v = "" | ||||
|  | 	} | ||||
|  | 	return v | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_GDND2LA_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { | ||||
|  | 
 | ||||
|  | 	year := uint16(sensorTime.Year()) | ||||
|  | 	month := int8(sensorTime.Month()) | ||||
|  | 	day := int8(sensorTime.Day()) | ||||
|  | 	hour := int8(sensorTime.Hour()) | ||||
|  | 	minute := int8(sensorTime.Minute()) | ||||
|  | 	second := int8(sensorTime.Second()) | ||||
|  | 	bytesBuffer := bytes.NewBuffer([]byte{}) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, year) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, month) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, day) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, hour) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, minute) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, second) | ||||
|  | 	return bytesBuffer.Bytes() | ||||
|  | } | ||||
| @ -0,0 +1,47 @@ | |||||
|  | package adaptors | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/consumers/SinoGnssMySQL" | ||||
|  | 	"strconv" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | // Adaptor_SINOMYSQL_AXYMQTT  数据 转换 江苏农村公路桥梁监测系统
 | ||||
|  | type Adaptor_SINOMYSQL_AXYMQTT struct { | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush { | ||||
|  | 	var needPush []NeedPush | ||||
|  | 	allDxFiles := make(map[string][]SinoGnssMySQL.DxFile) | ||||
|  | 	for _, gnssData := range gnssDataList { | ||||
|  | 		OnceDxFiles := allDxFiles[gnssData.GroupName] | ||||
|  | 		OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ | ||||
|  | 			Module:   gnssData.StationName, | ||||
|  | 			Channel:  1, | ||||
|  | 			Timespan: gnssData.Time.UnixMilli(), | ||||
|  | 			//file_mqtt协议里面只解析RV,  m=> mm
 | ||||
|  | 			RawValue:   []float64{gnssData.X * 1000, gnssData.Y * 1000, gnssData.H * 1000}, | ||||
|  | 			LimitValue: []float64{}, | ||||
|  | 			PhyValue:   []float64{}, | ||||
|  | 			ThemeValue: []float64{}, | ||||
|  | 		}) | ||||
|  | 		allDxFiles[gnssData.GroupName] = OnceDxFiles | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	for groupName, groupFileList := range allDxFiles { | ||||
|  | 		contentMap := make(map[string]string) | ||||
|  | 		for i, file := range groupFileList { | ||||
|  | 			bs, _ := json.Marshal(file) | ||||
|  | 			contentMap[strconv.Itoa(i)] = string(bs) | ||||
|  | 		} | ||||
|  | 		gpContent, _ := json.Marshal(contentMap) | ||||
|  | 		topic := fmt.Sprintf("SinoGnss/%s/", groupName) | ||||
|  | 		needPush = append(needPush, NeedPush{ | ||||
|  | 			Topic:   topic, | ||||
|  | 			Payload: gpContent, | ||||
|  | 		}) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return needPush | ||||
|  | } | ||||
| @ -0,0 +1,148 @@ | |||||
|  | package adaptors | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/consumers/AXYraw" | ||||
|  | 	"goInOut/dbOperate" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"log" | ||||
|  | 	"sync" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | // Adaptor_AXY_LastRAW  安心云 kafka iota数据 转换 es设备数据
 | ||||
|  | type Adaptor_AXY_LastRAW struct { | ||||
|  | 	AXYraw.Info | ||||
|  | 	Redis *dbOperate.RedisHelper | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { | ||||
|  | 	iotaData := models.IotaData{} | ||||
|  | 	json.Unmarshal([]byte(rawMsg), &iotaData) | ||||
|  | 	return the.raw2es(iotaData) | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { | ||||
|  | 	if len(iotaData.Data.Data) == 0 { | ||||
|  | 		return nil | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	if !iotaData.Data.Success() { | ||||
|  | 		msg, _ := json.Marshal(iotaData.Data.Result) | ||||
|  | 		iotaData.Data.Data["errMsg"] = string(msg) | ||||
|  | 	} | ||||
|  | 	//log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
 | ||||
|  | 	deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) | ||||
|  | 
 | ||||
|  | 	//查不到信息的数据
 | ||||
|  | 	if deviceInfo.Name == "" { | ||||
|  | 		log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime) | ||||
|  | 		return nil | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	dataType := "" | ||||
|  | 	if _dataType, ok := iotaData.Data.Data["_data_type"]; ok { | ||||
|  | 		if v, ok := _dataType.(string); ok { | ||||
|  | 			dataType = v | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	devdata := &models.DeviceData{ | ||||
|  | 		DeviceId:    iotaData.DeviceId, | ||||
|  | 		Name:        deviceInfo.Name, | ||||
|  | 		ThingId:     iotaData.ThingId, | ||||
|  | 		StructId:    deviceInfo.Structure.Id, | ||||
|  | 		AcqTime:     iotaData.TriggerTime, | ||||
|  | 		RealTime:    iotaData.RealTime, | ||||
|  | 		ErrCode:     0, | ||||
|  | 		Raw:         iotaData.Data.Data, | ||||
|  | 		DeviceInfo:  deviceInfo, | ||||
|  | 		DimensionId: iotaData.DimensionId, | ||||
|  | 		DataType:    dataType, | ||||
|  | 	} | ||||
|  | 	EsRaws := toEsRaw(devdata) | ||||
|  | 	return EsRaws | ||||
|  | } | ||||
|  | 
 | ||||
|  | var deviceInfoMap = map[string]models.DeviceInfo{} | ||||
|  | var deviceInfoMap2 = sync.Map{} | ||||
|  | 
 | ||||
|  | func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { | ||||
|  | 	if vObj, ok := deviceInfoMap2.Load(deviceId); ok { | ||||
|  | 		if v, ok := vObj.(models.DeviceInfo); ok { | ||||
|  | 			durationMin := time.Now().Sub(v.RefreshTime).Minutes() | ||||
|  | 			if durationMin < 5 { | ||||
|  | 				return v | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	v, err := the.GetDeviceInfoFromRedis(deviceId) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("设备%s 无法查询到对应信息", deviceId) | ||||
|  | 	} | ||||
|  | 	//deviceInfoMap[deviceId] = v
 | ||||
|  | 	deviceInfoMap2.Store(deviceId, v) | ||||
|  | 	return v | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) { | ||||
|  | 	Key_Iota_device := "iota_device" | ||||
|  | 	key_Thing_struct := "thing_struct" | ||||
|  | 	key_Iota_meta := "iota_meta" | ||||
|  | 	k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId) | ||||
|  | 	dev := models.IotaDevice{} | ||||
|  | 	thingStruct := models.ThingStruct{} | ||||
|  | 	devMeta := models.DeviceMeta{} | ||||
|  | 	err1 := the.Redis.GetObj(k1, &dev) | ||||
|  | 	if err1 != nil { | ||||
|  | 		return models.DeviceInfo{RefreshTime: time.Now()}, err1 | ||||
|  | 	} | ||||
|  | 	k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId) | ||||
|  | 	err2 := the.Redis.GetObj(k2, &thingStruct) | ||||
|  | 	if err2 != nil { | ||||
|  | 		return models.DeviceInfo{RefreshTime: time.Now()}, err2 | ||||
|  | 	} | ||||
|  | 	k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id) | ||||
|  | 	err3 := the.Redis.GetObj(k3, &devMeta) | ||||
|  | 	if err3 != nil { | ||||
|  | 		return models.DeviceInfo{RefreshTime: time.Now()}, err3 | ||||
|  | 	} | ||||
|  | 	//if err1 != nil || err2 != nil || err3 != nil {
 | ||||
|  | 	//	log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3)
 | ||||
|  | 	//	return models.DeviceInfo{}
 | ||||
|  | 	//}
 | ||||
|  | 
 | ||||
|  | 	s := models.Structure{ | ||||
|  | 		ThingId: thingStruct.ThingId, | ||||
|  | 		Id:      thingStruct.Id, | ||||
|  | 		Name:    thingStruct.Name, | ||||
|  | 		OrgId:   thingStruct.OrgId, | ||||
|  | 	} | ||||
|  | 	return models.DeviceInfo{ | ||||
|  | 		Id:          deviceId, | ||||
|  | 		Name:        dev.Name, | ||||
|  | 		Structure:   s, | ||||
|  | 		DeviceMeta:  devMeta, | ||||
|  | 		RefreshTime: time.Now(), | ||||
|  | 	}, nil | ||||
|  | } | ||||
|  | 
 | ||||
|  | func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { | ||||
|  | 
 | ||||
|  | 	if deviceData == nil { | ||||
|  | 		return nil | ||||
|  | 	} | ||||
|  | 	var cstZone = time.FixedZone("CST", 8*3600) // 东八区 用以解决
 | ||||
|  | 	dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() | ||||
|  | 	createNativeRaw := models.EsRaw{ | ||||
|  | 		StructId:       deviceData.StructId, | ||||
|  | 		IotaDeviceName: deviceData.Name, | ||||
|  | 		Data:           deviceData.Raw, | ||||
|  | 		CollectTime:    deviceData.AcqTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800"), | ||||
|  | 		Meta:           dataOutMeta, | ||||
|  | 		IotaDevice:     deviceData.DeviceId, | ||||
|  | 		CreateTime:     time.Now(), | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return &createNativeRaw | ||||
|  | } | ||||
| @ -0,0 +1,132 @@ | |||||
|  | package adaptors | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"bytes" | ||||
|  | 	"encoding/binary" | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"goInOut/utils" | ||||
|  | 	"log" | ||||
|  | 	"strconv" | ||||
|  | 	"strings" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | // Adaptor_ZD_JSNCGLQL  数据 转换 江苏农村公路桥梁监测系统
 | ||||
|  | type Adaptor_ZD_JSNCGLQL struct { | ||||
|  | 	IdMap      map[string]string | ||||
|  | 	BridgeCode string | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_ZD_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { | ||||
|  | 	zd := models.ZD{} | ||||
|  | 	err := json.Unmarshal([]byte(rawMsg), &zd) | ||||
|  | 	if err != nil { | ||||
|  | 		return nil | ||||
|  | 	} | ||||
|  | 	lowerTopic := strings.ToLower(inTopic) | ||||
|  | 	if strings.Contains(lowerTopic, "zdsl") || | ||||
|  | 		strings.Contains(lowerTopic, "cableforce") { | ||||
|  | 		return the.ZDSLtoJSNCGLQL(zd) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return the.ZDtoJSNCGLQL(zd) | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_ZD_JSNCGLQL) ZDtoJSNCGLQL(zd models.ZD) (result []NeedPush) { | ||||
|  | 	Atime := time.UnixMilli(zd.Ticks) | ||||
|  | 
 | ||||
|  | 	sensorDataList := zd.AccValues | ||||
|  | 	//获取对方系统 id
 | ||||
|  | 	sensorCode := the.getSensorCode(zd.Module) | ||||
|  | 	if sensorCode == "" { | ||||
|  | 		log.Printf("振动 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | ||||
|  | 	//数据秒数
 | ||||
|  | 	seconds := len(zd.AccValues) / zd.Frequency | ||||
|  | 
 | ||||
|  | 	for i := 0; i < seconds; i++ { | ||||
|  | 		var transBytes []byte | ||||
|  | 		onceTime := Atime.Add(time.Duration(i) * time.Second) | ||||
|  | 		//1.添加时间段
 | ||||
|  | 		transBytes = append(transBytes, the.getTimeBytes(onceTime)...) | ||||
|  | 
 | ||||
|  | 		startIndex := (0 + i) * zd.Frequency | ||||
|  | 		endIndex := (1 + i) * zd.Frequency | ||||
|  | 		if endIndex > len(sensorDataList) { | ||||
|  | 			endIndex = len(sensorDataList) | ||||
|  | 		} | ||||
|  | 		subDataList := sensorDataList[startIndex:endIndex] | ||||
|  | 
 | ||||
|  | 		for _, sensorData := range subDataList { | ||||
|  | 
 | ||||
|  | 			//4.添加数据值段
 | ||||
|  | 			bs := utils.Float32ToBytes(sensorData) | ||||
|  | 			transBytes = append(transBytes, bs...) | ||||
|  | 		} | ||||
|  | 		result = append(result, NeedPush{ | ||||
|  | 			Topic:   topic, | ||||
|  | 			Payload: transBytes, | ||||
|  | 		}) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return result | ||||
|  | } | ||||
|  | func (the Adaptor_ZD_JSNCGLQL) ZDSLtoJSNCGLQL(zd models.ZD) (result []NeedPush) { | ||||
|  | 	Atime := time.UnixMilli(zd.Ticks) | ||||
|  | 
 | ||||
|  | 	sensorDataList := zd.ThemeValue | ||||
|  | 	//获取对方系统 id
 | ||||
|  | 	sensorCode := the.getSensorCode(strconv.Itoa(zd.SensorId)) | ||||
|  | 	if sensorCode == "" { | ||||
|  | 		log.Printf("振动索力 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | ||||
|  | 
 | ||||
|  | 	var transBytes []byte | ||||
|  | 	//1.添加时间段
 | ||||
|  | 	transBytes = append(transBytes, the.getTimeBytes(Atime)...) | ||||
|  | 
 | ||||
|  | 	//数据值段
 | ||||
|  | 	bs := utils.Float32ToBytes(sensorDataList[0]) | ||||
|  | 	transBytes = append(transBytes, bs...) | ||||
|  | 
 | ||||
|  | 	result = append(result, NeedPush{ | ||||
|  | 		Topic:   topic, | ||||
|  | 		Payload: transBytes, | ||||
|  | 	}) | ||||
|  | 
 | ||||
|  | 	return result | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_ZD_JSNCGLQL) getSensorCode(rawSensorName string) string { | ||||
|  | 	v, isValid := the.IdMap[rawSensorName] | ||||
|  | 	if !isValid { | ||||
|  | 		v = "" | ||||
|  | 	} | ||||
|  | 	return v | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the Adaptor_ZD_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { | ||||
|  | 
 | ||||
|  | 	year := uint16(sensorTime.Year()) | ||||
|  | 	month := int8(sensorTime.Month()) | ||||
|  | 	day := int8(sensorTime.Day()) | ||||
|  | 	hour := int8(sensorTime.Hour()) | ||||
|  | 	minute := int8(sensorTime.Minute()) | ||||
|  | 	second := int8(sensorTime.Second()) | ||||
|  | 	bytesBuffer := bytes.NewBuffer([]byte{}) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, year) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, month) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, day) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, hour) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, minute) | ||||
|  | 	binary.Write(bytesBuffer, binary.BigEndian, second) | ||||
|  | 	return bytesBuffer.Bytes() | ||||
|  | } | ||||
| @ -0,0 +1,4 @@ | |||||
|  | FROM registry.ngaiot.com/base-images/alpine_3.20_cst:7 | ||||
|  | WORKDIR /app/ | ||||
|  | COPY   *.exe  configFiles  /app/ | ||||
|  | CMD ["/app/app.exe"] | ||||
| @ -1,24 +1,40 @@ | |||||
| podTemplate { | podTemplate { | ||||
|     node('pod-templ-jenkins-slave-golang') { |     node('pod-templ-jenkins-slave-golang') { | ||||
| 
 | 		 | ||||
| 		env.IMAGE_NAME = "${IOT_IMAGES_REGISTRY}/${LOCAL}/${JOB_NAME}" | 		env.IMAGE_NAME = "${IOT_IMAGES_REGISTRY}/${LOCAL}/${JOB_NAME}" | ||||
| 		env.IMAGE_NAME_SHORT = "${LOCAL}/${JOB_NAME}" | 		env.IMAGE_NAME_SHORT = "${LOCAL}/${JOB_NAME}" | ||||
| 		env.CODE_ADDR = "https://gitea.anxinyun.cn/lucas2/goUpload.git" | 		env.CODE_ADDR = "https://gitea.anxinyun.cn/lucas2/goUpload.git" | ||||
| 
 | 		 | ||||
| 	    stage('Run shell') { | 	    stage('Run shell') {  | ||||
| 		    git branch: 'dev', credentialsId: 'gitea-builder', url: "${CODE_ADDR}" | 		    git branch: 'dev', credentialsId: 'gitea-builder', url: "${CODE_ADDR}" | ||||
| 
 |  | ||||
| 		    container('golang-builder-1-23') { | 		    container('golang-builder-1-23') { | ||||
| 				sh''' | 				sh''' | ||||
| 				    echo "当前目===" | 				    git version | ||||
| 				    pwd | 				    git config --global --add url."https://builder:Fs7595!EAT@gitea.anxinyun.cn/".insteadOf "https://gitea.anxinyun.cn/" | ||||
| 				    ls | 				    unset GOPROXY | ||||
| 				    echo "========" | 				    go env -w GOPROXY=https://goproxy.cn,direct | ||||
| 				    /kaniko/executor --context=${BUILD_WORKSPACE} --dockerfile=build/Dockerfile --destination=${IMAGE_NAME}:${IMAGE_VERSION} --cache=false --cleanup | 				    go env -w GO111MODULE=on | ||||
|  | 				    go env -w GOPRIVATE=gitea.ngaiot.com,gitea.anxinyun.cn | ||||
|  | 				    go env -w GOSUMDB=sum.golang.org | ||||
|  | 				    go env | ||||
|  |                     go build  -a -v -o app.exe main.go | ||||
|  |                     tar -cvf app.tar *.exe configFiles | ||||
| 			    ''' | 			    ''' | ||||
| 		    } | 		    } | ||||
|  | 
 | ||||
|  | 		     container('image-builder') { | ||||
|  |             	sh''' | ||||
|  |             		 echo "当前目===" | ||||
|  |             		 pwd | ||||
|  |             		 ls | ||||
|  |             		 echo "========" | ||||
|  |             		 /kaniko/executor --context=${BUILD_WORKSPACE} --dockerfile=build/Dockerfile_app --destination=${IMAGE_NAME}:${IMAGE_VERSION} --cache=false --cleanup | ||||
|  |             	''' | ||||
|  |             } | ||||
|  |             archiveArtifacts artifacts: 'app.tar', followSymlinks: false | ||||
|  | 
 | ||||
| 		    buildName "${IMAGE_NAME_SHORT}:${IMAGE_VERSION}" | 		    buildName "${IMAGE_NAME_SHORT}:${IMAGE_VERSION}" | ||||
| 		    buildDescription "${IMAGE_NAME}:${IMAGE_VERSION}" | 		    buildDescription "${IMAGE_NAME}:${IMAGE_VERSION}"			 | ||||
| 		} | 		} | ||||
| 	} | 	} | ||||
| } | } | ||||
| @ -0,0 +1,24 @@ | |||||
|  | { | ||||
|  |   "consumer": "consumerSinoGnssMySQL", | ||||
|  |   "ioConfig": { | ||||
|  |     "in": { | ||||
|  |       "db": { | ||||
|  |         "type": "mysql", | ||||
|  |         "connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true" | ||||
|  |       }, | ||||
|  |       "cronStr": "0/1 * * * *" | ||||
|  |     }, | ||||
|  |     "out": { | ||||
|  |       "mqtt": { | ||||
|  |         "host": "10.8.30.160", | ||||
|  |         "port": 30883, | ||||
|  |         "userName": "upload", | ||||
|  |         "password": "", | ||||
|  |         "clientId": "goInOut_SinoGnssMySQL", | ||||
|  |         "Topics": [] | ||||
|  |       } | ||||
|  |     } | ||||
|  |   }, | ||||
|  |   "info": { | ||||
|  |   } | ||||
|  | } | ||||
| @ -0,0 +1,36 @@ | |||||
|  | { | ||||
|  |   "consumer": "consumerAXYraw", | ||||
|  |   "ioConfig": { | ||||
|  |     "in": { | ||||
|  |       "kafka": { | ||||
|  |         "brokers": [ | ||||
|  |           "10.8.30.160:30992" | ||||
|  |         ], | ||||
|  |         "groupId": "synchronizeRaw_50", | ||||
|  |         "topics": [ | ||||
|  |           "RawData" | ||||
|  |         ] | ||||
|  |       } | ||||
|  |     }, | ||||
|  |     "out": { | ||||
|  |       "es": { | ||||
|  |         "address": ["http://10.8.30.160:30092"], | ||||
|  |         "index": "anxincloud_last_raw", | ||||
|  |         "auth": { | ||||
|  |           "userName": "post", | ||||
|  |           "password": "123" | ||||
|  |         }, | ||||
|  |         "interval": 30 | ||||
|  |       } | ||||
|  |     } | ||||
|  |   }, | ||||
|  |   "info": { | ||||
|  |     "common": { | ||||
|  |     }, | ||||
|  |     "queryComponent":{ | ||||
|  |       "redis": { | ||||
|  |         "address": "10.8.30.142:30379" | ||||
|  |       } | ||||
|  |     } | ||||
|  |   } | ||||
|  | } | ||||
| @ -0,0 +1,30 @@ | |||||
|  | { | ||||
|  |   "consumer": "consumerHttpProxy", | ||||
|  |   "ioConfig": { | ||||
|  |     "in": { | ||||
|  |       "apiServer": { | ||||
|  |         "port": 7000, | ||||
|  |         "userName": "usr", | ||||
|  |         "password": "123", | ||||
|  |         "routers": [ | ||||
|  |           { | ||||
|  |             "router": "POST /upload/work/ABStatus", | ||||
|  |             "idTemplate": "{{.values.ucId}}" | ||||
|  |           }, | ||||
|  |           { | ||||
|  |             "router": "POST /upload/work/ABSHeart", | ||||
|  |             "idTemplate": "{{.Values.WorkId}}" | ||||
|  |           } | ||||
|  |         ] | ||||
|  |       } | ||||
|  |     }, | ||||
|  |     "out": { | ||||
|  |       "httpPost": { | ||||
|  |         "url": "http://218.3.126.49:60000", | ||||
|  |         "method": "post" | ||||
|  |       } | ||||
|  |     } | ||||
|  |   }, | ||||
|  |   "info": { | ||||
|  |   } | ||||
|  | } | ||||
| @ -1 +0,0 @@ | |||||
| package constKey |  | ||||
| @ -0,0 +1,31 @@ | |||||
|  | package AXYraw | ||||
|  | 
 | ||||
|  | import "goInOut/config" | ||||
|  | 
 | ||||
|  | type ConfigFile struct { | ||||
|  | 	config.Consumer | ||||
|  | 	IoConfig ioConfig `json:"ioConfig"` | ||||
|  | 	Info     Info     `json:"info"` | ||||
|  | } | ||||
|  | type ioConfig struct { | ||||
|  | 	In  in  `json:"in"` | ||||
|  | 	Out out `json:"out"` | ||||
|  | } | ||||
|  | type in struct { | ||||
|  | 	Kafka config.KafkaConfig `json:"kafka"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type out struct { | ||||
|  | 	Es config.EsConfig `json:"es"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type Info struct { | ||||
|  | 	Common         map[string]string `json:"common"` | ||||
|  | 	QueryComponent queryComponent    `json:"queryComponent"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type queryComponent struct { | ||||
|  | 	Redis struct { | ||||
|  | 		Address string `json:"address"` | ||||
|  | 	} `json:"redis"` | ||||
|  | } | ||||
| @ -0,0 +1,25 @@ | |||||
|  | package HTTP_PRPXY | ||||
|  | 
 | ||||
|  | import "goInOut/config" | ||||
|  | 
 | ||||
|  | type ConfigFile struct { | ||||
|  | 	config.Consumer | ||||
|  | 	IoConfig  ioConfig          `json:"ioConfig"` | ||||
|  | 	OtherInfo map[string]string `json:"info"` | ||||
|  | } | ||||
|  | type ioConfig struct { | ||||
|  | 	In  In  `json:"in"` | ||||
|  | 	Out OUT `json:"out"` | ||||
|  | } | ||||
|  | type In struct { | ||||
|  | 	ApiServer config.ApiServerConfig `json:"apiServer"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type OUT struct { | ||||
|  | 	HttpPost config.HttpConfig `json:"httpPost"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type SensorInfo struct { | ||||
|  | 	Name string `json:"name"` //测点名称
 | ||||
|  | 	Code string `json:"code"` //测点编号 宜由“桥名简称-监测类别简称-构件类型编码-截面序号-构件序号-测点编号”组成
 | ||||
|  | } | ||||
| @ -0,0 +1,44 @@ | |||||
|  | package HTTP_PRPXY | ||||
|  | 
 | ||||
|  | type ABStatus struct { | ||||
|  | 	Values ABStatusValues `json:"values"` | ||||
|  | 	Secret string         `json:"secret"` | ||||
|  | 	Type   string         `json:"type"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type ABStatusValues struct { | ||||
|  | 	CreateTime            int64  `json:"createTime"` | ||||
|  | 	Updater               string `json:"updater"` | ||||
|  | 	Deleted               bool   `json:"deleted"` | ||||
|  | 	Id                    string `json:"id"` | ||||
|  | 	WorkId                string `json:"workId"` | ||||
|  | 	UcId                  string `json:"ucId"` | ||||
|  | 	DeviceCode            string `json:"deviceCode"` | ||||
|  | 	UserId                string `json:"userId"` | ||||
|  | 	UserName              string `json:"userName"` | ||||
|  | 	DataTime              int64  `json:"dataTime"` | ||||
|  | 	BeltStatus            string `json:"beltStatus"` | ||||
|  | 	DeviceStatus          string `json:"deviceStatus"` | ||||
|  | 	StartDate             int64  `json:"startDate"` | ||||
|  | 	EndDate               int64  `json:"endDate"` | ||||
|  | 	MainVol               int    `json:"mainVol"` | ||||
|  | 	AscendWorkCertificate string `json:"ascend_work_certificate"` | ||||
|  | 	BeltPoseLeft          string `json:"beltPoseLeft"` | ||||
|  | 	BeltPoseRight         string `json:"beltPoseRight"` | ||||
|  | 	BeltStatusLeft        string `json:"beltStatusLeft"` | ||||
|  | 	BeltStatusRight       string `json:"beltStatusRight"` | ||||
|  | 	TickbVol              int    `json:"tickbVol"` | ||||
|  | 	//Avatar                interface{} `json:"avatar"`
 | ||||
|  | 	//AlarmType             interface{} `json:"alarmType"`
 | ||||
|  | 	//Identity              interface{} `json:"identity"`
 | ||||
|  | 	//IdentityImg           interface{} `json:"identityImg"`
 | ||||
|  | 	//IdentityImgF          interface{} `json:"identityImgF"`
 | ||||
|  | 	//Phone                 interface{} `json:"phone"`
 | ||||
|  | 	Versions int `json:"versions"` | ||||
|  | 	//Team                  interface{} `json:"team"`
 | ||||
|  | 	Longitude string `json:"longitude"` | ||||
|  | 	Latitude  string `json:"latitude"` | ||||
|  | 	Outsource string `json:"outsource"` | ||||
|  | 	LocTime   int64  `json:"locTime"` | ||||
|  | 	//WorkLen               interface{} `json:"workLen"`
 | ||||
|  | } | ||||
| @ -0,0 +1,27 @@ | |||||
|  | package SinoGnssMySQL | ||||
|  | 
 | ||||
|  | import "goInOut/config" | ||||
|  | 
 | ||||
|  | type ConfigFile struct { | ||||
|  | 	config.Consumer | ||||
|  | 	IoConfig  ioConfig          `json:"ioConfig"` | ||||
|  | 	OtherInfo map[string]string `json:"info"` | ||||
|  | } | ||||
|  | type ioConfig struct { | ||||
|  | 	In  In  `json:"in"` | ||||
|  | 	Out OUT `json:"out"` | ||||
|  | } | ||||
|  | type In struct { | ||||
|  | 	Db      config.DbConfig `json:"db"` | ||||
|  | 	CronStr string          `json:"cronStr"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type OUT struct { | ||||
|  | 	Mqtt config.MqttConfig `json:"mqtt"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | // 缓存用
 | ||||
|  | type RecordInfo struct { | ||||
|  | 	Id        int64  `json:"id"` | ||||
|  | 	TableName string `json:"table_name"` | ||||
|  | } | ||||
| @ -0,0 +1,28 @@ | |||||
|  | package SinoGnssMySQL | ||||
|  | 
 | ||||
|  | import "time" | ||||
|  | 
 | ||||
|  | type GnssData struct { | ||||
|  | 	Id          int64     `json:"id" db:"id"` | ||||
|  | 	StationName string    `json:"station_name" db:"station_name"` | ||||
|  | 	GroupName   string    `json:"group_name" db:"group_name"` | ||||
|  | 	Time        time.Time `json:"time" db:"time"` | ||||
|  | 	X           float64   `json:"x" db:"x"` | ||||
|  | 	Y           float64   `json:"y" db:"y"` | ||||
|  | 	H           float64   `json:"h" db:"h"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type DxFile struct { | ||||
|  | 	SensorId   int       `json:"S"` | ||||
|  | 	Module     string    `json:"M"` | ||||
|  | 	Channel    int       `json:"C"` | ||||
|  | 	Error      int       `json:"R"` | ||||
|  | 	Round      int64     `json:"N"` | ||||
|  | 	Timespan   int64     `json:"T"` | ||||
|  | 	Req        []string  `json:"Q"` | ||||
|  | 	Acq        []string  `json:"A"` | ||||
|  | 	RawValue   []float64 `json:"RV"` | ||||
|  | 	LimitValue []float64 `json:"LV"` | ||||
|  | 	PhyValue   []float64 `json:"PV"` | ||||
|  | 	ThemeValue []float64 `json:"TV"` | ||||
|  | } | ||||
| @ -0,0 +1,162 @@ | |||||
|  | package consumers | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"goInOut/adaptors" | ||||
|  | 	"goInOut/consumers/AXYraw" | ||||
|  | 	"goInOut/dbOperate" | ||||
|  | 	"goInOut/dbOperate/_kafka" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"log" | ||||
|  | 	"sync" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type consumerAXYraw struct { | ||||
|  | 	//数据缓存管道
 | ||||
|  | 	dataCache chan *models.EsRaw | ||||
|  | 	//具体配置
 | ||||
|  | 	ConfigInfo AXYraw.ConfigFile | ||||
|  | 	InKafka    _kafka.KafkaHelper | ||||
|  | 	OutEs      dbOperate.ESHelper | ||||
|  | 	infoRedis  *dbOperate.RedisHelper | ||||
|  | 	sinkRawMap sync.Map | ||||
|  | 	lock       sync.Mutex | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { | ||||
|  | 	// 将 JSON 格式的数据解析到结构体中
 | ||||
|  | 	err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | ||||
|  | 		panic(err) | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) Initial(cfg string) error { | ||||
|  | 	the.sinkRawMap = sync.Map{} | ||||
|  | 	the.dataCache = make(chan *models.EsRaw, 200) | ||||
|  | 
 | ||||
|  | 	the.LoadConfigJson(cfg) | ||||
|  | 	err := the.inputInitial() | ||||
|  | 	if err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	err = the.outputInitial() | ||||
|  | 	if err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	err = the.infoComponentInitial() | ||||
|  | 	return err | ||||
|  | } | ||||
|  | func (the *consumerAXYraw) inputInitial() error { | ||||
|  | 	//数据入口
 | ||||
|  | 	the.InKafka = _kafka.KafkaHelper{ | ||||
|  | 		Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers, | ||||
|  | 		GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId, | ||||
|  | 	} | ||||
|  | 	the.InKafka.Initial() | ||||
|  | 	for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics { | ||||
|  | 		the.InKafka.Subscribe(inTopic, the.onData) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	the.InKafka.Worker() | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *consumerAXYraw) outputInitial() error { | ||||
|  | 	//数据出口
 | ||||
|  | 	the.OutEs = *dbOperate.NewESHelper( | ||||
|  | 		the.ConfigInfo.IoConfig.Out.Es.Address, | ||||
|  | 		the.ConfigInfo.IoConfig.Out.Es.Auth.UserName, | ||||
|  | 		the.ConfigInfo.IoConfig.Out.Es.Auth.Password, | ||||
|  | 	) | ||||
|  | 
 | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) infoComponentInitial() error { | ||||
|  | 	//数据出口
 | ||||
|  | 	addr := the.ConfigInfo.Info.QueryComponent.Redis.Address | ||||
|  | 	the.infoRedis = dbOperate.NewRedisHelper("", addr) | ||||
|  | 
 | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) sinkTask() { | ||||
|  | 	intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval | ||||
|  | 	ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) | ||||
|  | 	defer ticker.Stop() | ||||
|  | 	for { | ||||
|  | 		<-ticker.C | ||||
|  | 		the.toSink() | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) toSink() { | ||||
|  | 	var raws []models.EsRaw | ||||
|  | 	the.lock.Lock() | ||||
|  | 	defer the.lock.Unlock() | ||||
|  | 	the.sinkRawMap.Range(func(key, value any) bool { | ||||
|  | 		if v, ok := value.(*models.EsRaw); ok { | ||||
|  | 			raws = append(raws, *v) | ||||
|  | 			//零时打日志用
 | ||||
|  | 			if v.IotaDevice == logTagDeviceId { | ||||
|  | 				bs, _ := json.Marshal(v) | ||||
|  | 				log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | ||||
|  | 			} | ||||
|  | 			return ok | ||||
|  | 		} else { | ||||
|  | 			log.Printf("!!!  toSink -> Range  类型转换异常 [%v]", key) | ||||
|  | 		} | ||||
|  | 		return true | ||||
|  | 	}) | ||||
|  | 	if len(raws) > 0 { | ||||
|  | 		log.Printf("准备写入es %d条", len(raws)) | ||||
|  | 		index := the.ConfigInfo.IoConfig.Out.Es.Index | ||||
|  | 		the.OutEs.BulkWriteRaws2Es(index, raws) | ||||
|  | 		the.sinkRawMap.Clear() | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" | ||||
|  | 
 | ||||
|  | func (the *consumerAXYraw) Work() { | ||||
|  | 	go the.sinkTask() | ||||
|  | 	go func() { | ||||
|  | 		for { | ||||
|  | 			pushEsRaw := <-the.dataCache | ||||
|  | 
 | ||||
|  | 			if pushEsRaw.IotaDevice == logTagDeviceId { | ||||
|  | 				bs, _ := json.Marshal(pushEsRaw) | ||||
|  | 				log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | ||||
|  | 			} | ||||
|  | 
 | ||||
|  | 			//有效数据存入缓存
 | ||||
|  | 			the.lock.Lock() | ||||
|  | 			the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw) | ||||
|  | 			the.lock.Unlock() | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 	}() | ||||
|  | } | ||||
|  | func (the *consumerAXYraw) onData(topic string, msg string) bool { | ||||
|  | 	//if len(msg) > 80 {
 | ||||
|  | 	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80])
 | ||||
|  | 	//}
 | ||||
|  | 	adaptor := adaptors.Adaptor_AXY_LastRAW{ | ||||
|  | 		Redis: the.infoRedis, | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	needPush := adaptor.Transform(topic, msg) | ||||
|  | 
 | ||||
|  | 	if needPush != nil && len(needPush.Meta) > 0 && needPush.Data != nil { | ||||
|  | 		//日志标记
 | ||||
|  | 		if needPush.IotaDevice == logTagDeviceId { | ||||
|  | 			bs, _ := json.Marshal(needPush) | ||||
|  | 			log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | ||||
|  | 		} | ||||
|  | 		the.dataCache <- needPush | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return true | ||||
|  | } | ||||
| @ -0,0 +1,102 @@ | |||||
|  | package consumers | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/config" | ||||
|  | 	"goInOut/consumers/HTTP_PRPXY" | ||||
|  | 	"goInOut/dbOperate" | ||||
|  | 	"goInOut/utils" | ||||
|  | 	"io" | ||||
|  | 	"log" | ||||
|  | 	"net/http" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type consumerHttpProxy struct { | ||||
|  | 	//数据缓存管道
 | ||||
|  | 	routes map[string]config.Router | ||||
|  | 	//具体配置
 | ||||
|  | 	Info        HTTP_PRPXY.ConfigFile | ||||
|  | 	InApiServer *dbOperate.ApiServerHelper | ||||
|  | 	outHttpPost *dbOperate.HttpHelper | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) { | ||||
|  | 	// 将 JSON 格式的数据解析到结构体中
 | ||||
|  | 	err := json.Unmarshal([]byte(cfgStr), &the.Info) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | ||||
|  | 		panic(err) | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerHttpProxy) Initial(cfg string) error { | ||||
|  | 	the.routes = map[string]config.Router{} | ||||
|  | 	the.LoadConfigJson(cfg) | ||||
|  | 	err := the.InputInitial() | ||||
|  | 	if err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	err = the.OutputInitial() | ||||
|  | 	return err | ||||
|  | } | ||||
|  | func (the *consumerHttpProxy) InputInitial() error { | ||||
|  | 	//数据入口
 | ||||
|  | 	the.InApiServer = dbOperate.NewApiServer( | ||||
|  | 		the.Info.IoConfig.In.ApiServer.Port, | ||||
|  | 		the.Info.IoConfig.In.ApiServer.Routers, | ||||
|  | 	) | ||||
|  | 	the.InApiServer.Initial() | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *consumerHttpProxy) OutputInitial() error { | ||||
|  | 	//数据出口
 | ||||
|  | 	the.outHttpPost = &dbOperate.HttpHelper{ | ||||
|  | 		Url: the.Info.IoConfig.Out.HttpPost.Url, | ||||
|  | 	} | ||||
|  | 	the.outHttpPost.Initial() | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *consumerHttpProxy) Work() { | ||||
|  | 	go func() { | ||||
|  | 		for _, router := range the.Info.IoConfig.In.ApiServer.Routers { | ||||
|  | 			the.InApiServer.RouteRegister(router.Router, the.onData) | ||||
|  | 			the.routes[router.Router] = router | ||||
|  | 		} | ||||
|  | 		the.InApiServer.Run() | ||||
|  | 
 | ||||
|  | 	}() | ||||
|  | } | ||||
|  | func (the *consumerHttpProxy) onData(w http.ResponseWriter, r *http.Request) { | ||||
|  | 	w.Header().Set("Content-Type", "application/json") | ||||
|  | 	body, err := io.ReadAll(r.Body) | ||||
|  | 
 | ||||
|  | 	log.Printf("收到 %s 请求 %s", r.RequestURI, body) | ||||
|  | 
 | ||||
|  | 	bodyObj := map[string]any{} | ||||
|  | 	err = json.Unmarshal(body, &bodyObj) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("body 解析失败,请检查 %s", err.Error()) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	routePath := fmt.Sprintf("%s %s", r.Method, r.RequestURI) | ||||
|  | 	idTemplate := the.routes[routePath] | ||||
|  | 
 | ||||
|  | 	id, err := utils.TextTemplateMatch(bodyObj, idTemplate.IdTemplate) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("解析id 失败,请检查 %s", err.Error()) | ||||
|  | 	} | ||||
|  | 	params := map[string]string{ | ||||
|  | 		"id": id, | ||||
|  | 	} | ||||
|  | 	resp := "" | ||||
|  | 	//沿用请求路由
 | ||||
|  | 	newUrl := the.outHttpPost.Url + r.URL.String() | ||||
|  | 	resp, err = the.outHttpPost.HttpPostWithParams(newUrl, string(body), params) | ||||
|  | 	if err != nil { | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	log.Printf("应答: %s", resp) | ||||
|  | 	defer fmt.Fprintf(w, resp) | ||||
|  | 
 | ||||
|  | } | ||||
| @ -0,0 +1,174 @@ | |||||
|  | package consumers | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/adaptors" | ||||
|  | 	"goInOut/consumers/SinoGnssMySQL" | ||||
|  | 	"goInOut/dbOperate" | ||||
|  | 	"goInOut/monitors" | ||||
|  | 	"goInOut/utils" | ||||
|  | 	"log" | ||||
|  | 	"os" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type consumerSinoGnssMySQL struct { | ||||
|  | 	//数据缓存管道
 | ||||
|  | 	ch chan []adaptors.NeedPush | ||||
|  | 	//具体配置
 | ||||
|  | 	Info    SinoGnssMySQL.ConfigFile | ||||
|  | 	InDB    *dbOperate.DBHelper | ||||
|  | 	outMqtt *dbOperate.MqttHelper | ||||
|  | 	monitor *monitors.CommonMonitor | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { | ||||
|  | 	// 将 JSON 格式的数据解析到结构体中
 | ||||
|  | 	err := json.Unmarshal([]byte(cfgStr), &the.Info) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | ||||
|  | 		panic(err) | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *consumerSinoGnssMySQL) Initial(cfg string) error { | ||||
|  | 	the.LoadConfigJson(cfg) | ||||
|  | 	err := the.InputInitial() | ||||
|  | 	if err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	err = the.OutputInitial() | ||||
|  | 	return err | ||||
|  | } | ||||
|  | func (the *consumerSinoGnssMySQL) InputInitial() error { | ||||
|  | 	the.ch = make(chan []adaptors.NeedPush, 200) | ||||
|  | 	//数据入口
 | ||||
|  | 	the.InDB = dbOperate.NewDBHelper( | ||||
|  | 		the.Info.IoConfig.In.Db.Type, | ||||
|  | 		the.Info.IoConfig.In.Db.ConnStr) | ||||
|  | 	the.monitor = &monitors.CommonMonitor{ | ||||
|  | 		MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, | ||||
|  | 	} | ||||
|  | 	the.monitor.Start() | ||||
|  | 	the.monitor.RegisterFun(the.onData) | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *consumerSinoGnssMySQL) OutputInitial() error { | ||||
|  | 	//数据出口
 | ||||
|  | 	the.outMqtt = dbOperate.MqttInitial( | ||||
|  | 		the.Info.IoConfig.Out.Mqtt.Host, | ||||
|  | 		the.Info.IoConfig.Out.Mqtt.Port, | ||||
|  | 		the.Info.IoConfig.Out.Mqtt.ClientId, | ||||
|  | 		the.Info.IoConfig.Out.Mqtt.UserName, | ||||
|  | 		the.Info.IoConfig.Out.Mqtt.Password, | ||||
|  | 		false, //按照具体项目来
 | ||||
|  | 		"") | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *consumerSinoGnssMySQL) Work() { | ||||
|  | 	go func() { | ||||
|  | 		for { | ||||
|  | 			needPushList := <-the.ch | ||||
|  | 			if len(the.ch) > 0 { | ||||
|  | 				log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) | ||||
|  | 			} | ||||
|  | 
 | ||||
|  | 			for _, push := range needPushList { | ||||
|  | 				if push.Topic != "" { | ||||
|  | 					the.outMqtt.Publish(push.Topic, push.Payload) | ||||
|  | 				} | ||||
|  | 			} | ||||
|  | 
 | ||||
|  | 			time.Sleep(100 * time.Millisecond) | ||||
|  | 		} | ||||
|  | 	}() | ||||
|  | } | ||||
|  | func (the *consumerSinoGnssMySQL) onData() { | ||||
|  | 	recordInfo, err := readRecord() | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("读取 缓存异常,err=%v", err.Error()) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d  | ||||
|  | LEFT JOIN datasolution as p | ||||
|  | ON d.station_name=p.sn | ||||
|  | where p.group_name is not null | ||||
|  | and d.id > %d and d.id <= %d | ||||
|  | ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+200) | ||||
|  | 	var GnssDatas []SinoGnssMySQL.GnssData | ||||
|  | 	err = the.InDB.Query(&GnssDatas, sql) | ||||
|  | 	if err != nil || len(GnssDatas) == 0 { | ||||
|  | 		log.Printf("当前批次无数据,跳过") | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	maxId := MaxId(GnssDatas) | ||||
|  | 	log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId) | ||||
|  | 	recordInfo.Id = maxId | ||||
|  | 	adaptor := the.getAdaptor() | ||||
|  | 	needPush := adaptor.Transform(GnssDatas) | ||||
|  | 	if len(needPush) > 0 { | ||||
|  | 		the.ch <- needPush | ||||
|  | 	} | ||||
|  | 	fileName := "cache.inout" | ||||
|  | 	//发现新月新纪录
 | ||||
|  | 	newTableName := tableNameNow() | ||||
|  | 	if recordInfo.TableName != newTableName { | ||||
|  | 		recordInfo.TableName = newTableName | ||||
|  | 		recordInfo.Id = 0 | ||||
|  | 	} | ||||
|  | 	cacheStr, _ := json.Marshal(recordInfo) | ||||
|  | 	err = utils.SaveCache2File(string(cacheStr), fileName) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Panicf("record id to file,error: %v", err.Error()) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | } | ||||
|  | func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { | ||||
|  | 
 | ||||
|  | 	return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func readRecord() (SinoGnssMySQL.RecordInfo, error) { | ||||
|  | 	fileName := "cache.inout" | ||||
|  | 	//文件存在?
 | ||||
|  | 	isExist := utils.FileExists(fileName) | ||||
|  | 	if !isExist { | ||||
|  | 		// 文件不存在,创建文件
 | ||||
|  | 		file, err := os.Create(fileName) | ||||
|  | 		if err != nil { | ||||
|  | 			log.Panicf("Error creating file: %v", err) | ||||
|  | 		} | ||||
|  | 		defaultRecord := SinoGnssMySQL.RecordInfo{ | ||||
|  | 			Id:        0, | ||||
|  | 			TableName: tableNameNow(), | ||||
|  | 		} | ||||
|  | 		str, _ := json.Marshal(defaultRecord) | ||||
|  | 		_, err = file.WriteString(string(str)) | ||||
|  | 		if err != nil { | ||||
|  | 			log.Panicf("file write error: %v", err.Error()) | ||||
|  | 			return SinoGnssMySQL.RecordInfo{}, err | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	recordStr, err := utils.ReadCache2File(fileName) | ||||
|  | 	if err != nil { | ||||
|  | 		panic("") | ||||
|  | 	} | ||||
|  | 	record := SinoGnssMySQL.RecordInfo{} | ||||
|  | 	err = json.Unmarshal([]byte(recordStr), &record) | ||||
|  | 	return record, err | ||||
|  | } | ||||
|  | 
 | ||||
|  | func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { | ||||
|  | 	maxId := GnssDatas[0].Id | ||||
|  | 	for _, data := range GnssDatas { | ||||
|  | 		if data.Id > maxId { | ||||
|  | 			maxId = data.Id | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	return maxId | ||||
|  | } | ||||
|  | 
 | ||||
|  | func tableNameNow() string { | ||||
|  | 	return "data_gnss_" + time.Now().Format("200601") | ||||
|  | } | ||||
| @ -1,38 +0,0 @@ | |||||
| package dbHelper |  | ||||
| 
 |  | ||||
| import ( |  | ||||
| 	"fmt" |  | ||||
| 	"github.com/gin-gonic/gin" |  | ||||
| 	"log" |  | ||||
| 	"net/http" |  | ||||
| 	"time" |  | ||||
| ) |  | ||||
| 
 |  | ||||
| type RouterFunc struct { |  | ||||
| 	relativePath string               //相对路由 如/gzm/data/upload
 |  | ||||
| 	funcType     string               // 方法类型 如 post ,get
 |  | ||||
| 	fun          func(c *gin.Context) //方法
 |  | ||||
| } |  | ||||
| 
 |  | ||||
| type ApiServerHelper struct { |  | ||||
| 	Port    uint16 |  | ||||
| 	RoutFun map[string]RouterFunc |  | ||||
| } |  | ||||
| 
 |  | ||||
| func (the *ApiServerHelper) Initial() { |  | ||||
| 	router := gin.Default() |  | ||||
| 	for name, routerFunc := range the.RoutFun { |  | ||||
| 		switch routerFunc.funcType { |  | ||||
| 		case http.MethodGet: |  | ||||
| 			router.GET(routerFunc.relativePath, routerFunc.fun) |  | ||||
| 		case http.MethodPost: |  | ||||
| 			router.GET(routerFunc.relativePath, routerFunc.fun) |  | ||||
| 		default: |  | ||||
| 			log.Printf("不支持的 [%s]方法类型 %s", routerFunc.relativePath, routerFunc.funcType) |  | ||||
| 			continue |  | ||||
| 		} |  | ||||
| 		log.Printf("注册路由 %s,监听地址=%s", name, routerFunc.relativePath) |  | ||||
| 	} |  | ||||
| 	router.Run(fmt.Sprintf("0.0.0.0:%d", the.Port)) |  | ||||
| 	time.Sleep(time.Second * 1) |  | ||||
| } |  | ||||
| @ -0,0 +1,58 @@ | |||||
|  | package dbOperate | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/config" | ||||
|  | 	"log" | ||||
|  | 	"net/http" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type ApiServerHelper struct { | ||||
|  | 	mux    *http.ServeMux | ||||
|  | 	routes []config.Router | ||||
|  | 	port   uint | ||||
|  | 	server http.Server | ||||
|  | } | ||||
|  | 
 | ||||
|  | func NewApiServer(port uint, routes []config.Router) *ApiServerHelper { | ||||
|  | 	return &ApiServerHelper{ | ||||
|  | 		mux:    http.NewServeMux(), | ||||
|  | 		routes: routes, | ||||
|  | 		port:   port, | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ApiServerHelper) Initial() { | ||||
|  | 	the.mux = http.NewServeMux() | ||||
|  | 
 | ||||
|  | 	// 创建 HTTP 服务器
 | ||||
|  | 	the.server = http.Server{ | ||||
|  | 		Handler: the.mux, | ||||
|  | 		Addr:    fmt.Sprintf(":%d", the.port), | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	//the.routeRegister()
 | ||||
|  | 
 | ||||
|  | } | ||||
|  | func (the *ApiServerHelper) Run() { | ||||
|  | 	log.Printf("apiServer监听端口 %d", the.port) | ||||
|  | 	the.server.ListenAndServe() | ||||
|  | } | ||||
|  | func (the *ApiServerHelper) routeRegister() { | ||||
|  | 
 | ||||
|  | 	for _, route := range the.routes { | ||||
|  | 		the.mux.HandleFunc(route.Router, func(w http.ResponseWriter, r *http.Request) { | ||||
|  | 			w.Header().Set("Content-Type", "application/json") | ||||
|  | 			s := fmt.Sprintf(`{"route":"%s","resp":"安全带状态应答"}`, route) | ||||
|  | 			println("收到请求", route.Router) | ||||
|  | 			fmt.Fprintf(w, s) | ||||
|  | 		}) | ||||
|  | 		log.Printf("注册路由 %s", route) | ||||
|  | 	} | ||||
|  | } | ||||
|  | func (the *ApiServerHelper) RouteRegister(route string, handler func(w http.ResponseWriter, r *http.Request)) { | ||||
|  | 
 | ||||
|  | 	the.mux.HandleFunc(route, handler) | ||||
|  | 	log.Printf("注册路由 %s", route) | ||||
|  | 
 | ||||
|  | } | ||||
| @ -0,0 +1,101 @@ | |||||
|  | package dbOperate | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	_ "github.com/go-sql-driver/mysql" | ||||
|  | 	"github.com/jmoiron/sqlx" | ||||
|  | 	_ "github.com/lib/pq" | ||||
|  | 	_ "github.com/mattn/go-sqlite3" | ||||
|  | 	"log" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | const ( | ||||
|  | 	postgres = iota + 1 | ||||
|  | 	sqlite3 | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type DBHelper struct { | ||||
|  | 	dbClient   *sqlx.DB | ||||
|  | 	dbType     string | ||||
|  | 	ConnectStr string | ||||
|  | } | ||||
|  | 
 | ||||
|  | func NewDBHelper(dbType string, connectStr string) *DBHelper { | ||||
|  | 	the := &DBHelper{} | ||||
|  | 	switch dbType { | ||||
|  | 	case "postgres": | ||||
|  | 		fallthrough | ||||
|  | 	case "mysql": | ||||
|  | 		fallthrough | ||||
|  | 	case "sqlite3": | ||||
|  | 		fallthrough | ||||
|  | 	case "有效的数据库类型": | ||||
|  | 		the.dbType = dbType | ||||
|  | 		the.ConnectStr = connectStr | ||||
|  | 	default: | ||||
|  | 		log.Panicf("不支持的数据库类型=> %s", dbType) | ||||
|  | 
 | ||||
|  | 	} | ||||
|  | 	return the | ||||
|  | } | ||||
|  | 
 | ||||
|  | // sqlite3 => connectStr := os.Getwd() + "/db/cz.db"
 | ||||
|  | // mysql => "user:password@tcp(127.0.0.1:3306)/databaseName"
 | ||||
|  | // pg => "host=192.168.10.64 port=5432 user=postgres password=123456 dbname=headscale sslmode=disable"
 | ||||
|  | func (the *DBHelper) dbOpen() error { | ||||
|  | 	var err error | ||||
|  | 	tdb, err := sqlx.Open(the.dbType, the.ConnectStr) | ||||
|  | 	if err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	if err = tdb.Ping(); err != nil { | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	the.dbClient = tdb | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *DBHelper) Exec(dbRecordSQL string) error { | ||||
|  | 	if the.dbClient == nil { | ||||
|  | 		if openErr := the.dbOpen(); openErr != nil { | ||||
|  | 			//logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr)
 | ||||
|  | 			return openErr | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	execResult, execErr := the.dbClient.Exec(dbRecordSQL) | ||||
|  | 	defer the.dbClient.Close() | ||||
|  | 	if execErr != nil { | ||||
|  | 		return execErr | ||||
|  | 	} | ||||
|  | 	if n, err := execResult.RowsAffected(); n > 0 { | ||||
|  | 		return nil | ||||
|  | 	} else { | ||||
|  | 		log.Printf("[%s]执行sql[%s]失败\n", time.Now().Format("2006-01-02 15:04:05.000"), dbRecordSQL) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *DBHelper) Query(dest any, sql string) error { | ||||
|  | 
 | ||||
|  | 	start := time.Now() | ||||
|  | 
 | ||||
|  | 	if the.dbClient == nil { | ||||
|  | 		if err := the.dbOpen(); err != nil { | ||||
|  | 			log.Printf("数据库链接失败:%s", err.Error()) | ||||
|  | 			return err | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	err := the.dbClient.Select(dest, sql) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("数据库查询失败:%s,\n sql=%s", err.Error(), sql) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	durTime := time.Since(start).Seconds() | ||||
|  | 	log.Printf("查询耗时:%v s", durTime) | ||||
|  | 	return err | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *DBHelper) Close() error { | ||||
|  | 	return the.Close() | ||||
|  | } | ||||
| @ -0,0 +1,37 @@ | |||||
|  | package dbOperate | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"goInOut/models" | ||||
|  | 	"testing" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type res struct { | ||||
|  | 	RLLYCJ      string `json:"LLYCJ"` | ||||
|  | 	RLLCacheMap string `json:"LLCacheMap"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | func TestRedis(t *testing.T) { | ||||
|  | 	addr := "10.8.30.160:30379" | ||||
|  | 	redis := NewRedisHelper("", addr) | ||||
|  | 
 | ||||
|  | 	key1 := "RLLYCJ" | ||||
|  | 	//v := redis.Get(key1)
 | ||||
|  | 	//println(v)
 | ||||
|  | 
 | ||||
|  | 	key2 := "RLLCacheMap" | ||||
|  | 	res1 := res{} | ||||
|  | 
 | ||||
|  | 	v2 := redis.MGet(&res1, key1, key2) | ||||
|  | 	println(v2) | ||||
|  | } | ||||
|  | 
 | ||||
|  | func TestPg(t *testing.T) { | ||||
|  | 	dbType := "postgres" | ||||
|  | 	connectStr := "host=10.8.30.32 port=5432 user=postgres password=123 dbname=NBJJ1215-T sslmode=disable" | ||||
|  | 	db := NewDBHelper(dbType, connectStr) | ||||
|  | 	sql := "select * from t_agg_way" | ||||
|  | 	var r []models.AggWay | ||||
|  | 	db.Query(&r, sql) | ||||
|  | 	println("===") | ||||
|  | 
 | ||||
|  | } | ||||
| @ -0,0 +1,269 @@ | |||||
|  | package dbOperate | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"bytes" | ||||
|  | 	"context" | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	elasticsearch6 "github.com/elastic/go-elasticsearch/v6" | ||||
|  | 	"github.com/elastic/go-elasticsearch/v6/esapi" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"io" | ||||
|  | 	"log" | ||||
|  | 	"strings" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type ESHelper struct { | ||||
|  | 	addresses []string | ||||
|  | 	//org      string
 | ||||
|  | 	esClient *elasticsearch6.Client | ||||
|  | } | ||||
|  | 
 | ||||
|  | func NewESHelper(addresses []string, user, pwd string) *ESHelper { | ||||
|  | 	es, _ := elasticsearch6.NewClient(elasticsearch6.Config{ | ||||
|  | 		Addresses: addresses, | ||||
|  | 		Username:  user, | ||||
|  | 		Password:  pwd, | ||||
|  | 	}) | ||||
|  | 	res, err := es.Info() | ||||
|  | 	if err != nil { | ||||
|  | 		log.Fatalf("Error getting response: %s", err) | ||||
|  | 	} | ||||
|  | 	log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res) | ||||
|  | 	return &ESHelper{ | ||||
|  | 		addresses: addresses, | ||||
|  | 		esClient:  es, | ||||
|  | 	} | ||||
|  | } | ||||
|  | func (the *ESHelper) SearchRaw(index, reqBody string) []models.HitRaw { | ||||
|  | 	body := &bytes.Buffer{} | ||||
|  | 	body.WriteString(reqBody) | ||||
|  | 	response, err := the.esClient.Search( | ||||
|  | 		the.esClient.Search.WithIndex(index), | ||||
|  | 		the.esClient.Search.WithBody(body), | ||||
|  | 	) | ||||
|  | 	defer response.Body.Close() | ||||
|  | 	if err != nil { | ||||
|  | 		return nil | ||||
|  | 	} | ||||
|  | 	r := models.EsRawResp{} | ||||
|  | 	// Deserialize the response into a map.
 | ||||
|  | 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | ||||
|  | 		log.Fatalf("Error parsing the response body: %s", err) | ||||
|  | 	} | ||||
|  | 	return r.Hits.Hits | ||||
|  | } | ||||
|  | func (the *ESHelper) Search(index, reqBody string) { | ||||
|  | 	body := &bytes.Buffer{} | ||||
|  | 	body.WriteString(reqBody) | ||||
|  | 	response, err := the.esClient.Search( | ||||
|  | 		the.esClient.Search.WithIndex(index), | ||||
|  | 		the.esClient.Search.WithBody(body), | ||||
|  | 	) | ||||
|  | 
 | ||||
|  | 	if err != nil { | ||||
|  | 		//return nil, err
 | ||||
|  | 	} | ||||
|  | 	log.Println(response.Status()) | ||||
|  | 	var r map[string]any | ||||
|  | 	// Deserialize the response into a map.
 | ||||
|  | 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | ||||
|  | 		log.Fatalf("Error parsing the response body: %s", err) | ||||
|  | 	} | ||||
|  | 	// Print the response status, number of results, and request duration.
 | ||||
|  | 	log.Printf( | ||||
|  | 		"[%s] %d hits; took: %dms", | ||||
|  | 		response.Status(), | ||||
|  | 		int(r["hits"].(map[string]any)["total"].(float64)), | ||||
|  | 		int(r["took"].(float64)), | ||||
|  | 	) | ||||
|  | 
 | ||||
|  | 	for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { | ||||
|  | 
 | ||||
|  | 		source := hit.(map[string]any)["_source"] | ||||
|  | 		log.Printf(" * ID=%s, %s", hit.(map[string]any)["_id"], source) | ||||
|  | 	} | ||||
|  | 	log.Println(strings.Repeat("=", 37)) | ||||
|  | } | ||||
|  | func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { | ||||
|  | 	// Set up the request object.
 | ||||
|  | 	req := esapi.IndexRequest{ | ||||
|  | 		Index: index, | ||||
|  | 		//DocumentID: strconv.Itoa(i + 1),
 | ||||
|  | 		Body:    strings.NewReader(reqBody), | ||||
|  | 		Refresh: "true", | ||||
|  | 	} | ||||
|  | 	// Perform the request with the client.
 | ||||
|  | 	res, err := req.Do(context.Background(), the.esClient) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Fatalf("Error getting response: %s", err) | ||||
|  | 	} | ||||
|  | 	defer res.Body.Close() | ||||
|  | 	var r map[string]any | ||||
|  | 	if res.IsError() { | ||||
|  | 		log.Printf("[%s] Error indexing document ID=%d", res.Status(), 0) | ||||
|  | 	} else { | ||||
|  | 		// Deserialize the response into a map.
 | ||||
|  | 
 | ||||
|  | 		if err := json.NewDecoder(res.Body).Decode(&r); err != nil { | ||||
|  | 			log.Printf("Error parsing the response body: %s", err) | ||||
|  | 		} else { | ||||
|  | 			// Print the response status and indexed document version.
 | ||||
|  | 			log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	return r, err | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { | ||||
|  | 	respmap, err := the.request(index, reqBody) | ||||
|  | 	if respmap != nil { | ||||
|  | 
 | ||||
|  | 	} | ||||
|  | 	iotaDatas := models.IotaData{} | ||||
|  | 	return iotaDatas, err | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, error) { | ||||
|  | 	body := &bytes.Buffer{} | ||||
|  | 	body.WriteString(reqBody) | ||||
|  | 	response, err := the.esClient.Search( | ||||
|  | 		the.esClient.Search.WithIndex(index), | ||||
|  | 		the.esClient.Search.WithBody(body), | ||||
|  | 	) | ||||
|  | 	defer response.Body.Close() | ||||
|  | 	if err != nil { | ||||
|  | 		//return nil, err
 | ||||
|  | 	} | ||||
|  | 	log.Println(response.Status()) | ||||
|  | 	r := models.EsThemeResp{} | ||||
|  | 	// Deserialize the response into a map.
 | ||||
|  | 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | ||||
|  | 		log.Fatalf("Error parsing the response body: %s", err) | ||||
|  | 	} | ||||
|  | 	return r, err | ||||
|  | } | ||||
|  | func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { | ||||
|  | 	//sensorId := 178
 | ||||
|  | 	queryBody := fmt.Sprintf(`{ | ||||
|  |   "size": 1,  | ||||
|  |   "query": { | ||||
|  |     "term": { | ||||
|  |       "sensor": { | ||||
|  |         "value": %d | ||||
|  |       } | ||||
|  |     } | ||||
|  |   },  | ||||
|  |   "sort": [ | ||||
|  |     { | ||||
|  |       "collect_time": { | ||||
|  |         "order": "desc" | ||||
|  |       } | ||||
|  |     } | ||||
|  |   ] | ||||
|  | }`, sensorId) | ||||
|  | 	//index := "go_native_themes"
 | ||||
|  | 	themes, err := the.searchThemes(index, queryBody) | ||||
|  | 
 | ||||
|  | 	var theme models.EsTheme | ||||
|  | 	if len(themes.Hits.Hits) > 0 { | ||||
|  | 		theme = themes.Hits.Hits[0].Source | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return theme, err | ||||
|  | } | ||||
|  | func (the *ESHelper) BulkWrite(index, reqBody string) { | ||||
|  | 
 | ||||
|  | 	body := &bytes.Buffer{} | ||||
|  | 	body.WriteString(reqBody) | ||||
|  | 	bulkRequest := esapi.BulkRequest{ | ||||
|  | 		Index:        index, | ||||
|  | 		Body:         body, | ||||
|  | 		DocumentType: "_doc", | ||||
|  | 	} | ||||
|  | 	res, err := bulkRequest.Do(context.Background(), the.esClient) | ||||
|  | 	defer res.Body.Close() | ||||
|  | 	if err != nil { | ||||
|  | 		log.Panicf("es 写入[%s],err=%s", index, err.Error()) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	respBody, _ := io.ReadAll(res.Body) | ||||
|  | 	if res.StatusCode != 200 && res.StatusCode != 201 { | ||||
|  | 		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) | ||||
|  | 	} | ||||
|  | 	//log.Printf("es 写入[%s],完成,res=%s ", index, respBody)
 | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { | ||||
|  | 
 | ||||
|  | 	body := &bytes.Buffer{} | ||||
|  | 	body.WriteString(reqBody) | ||||
|  | 	bulkRequest := esapi.BulkRequest{ | ||||
|  | 		Index:        index, | ||||
|  | 		Body:         body, | ||||
|  | 		DocumentType: "_doc", | ||||
|  | 	} | ||||
|  | 	res, err := bulkRequest.Do(context.Background(), the.esClient) | ||||
|  | 	defer res.Body.Close() | ||||
|  | 	if err != nil { | ||||
|  | 		log.Panicf("es 写入[%s],err=%s", index, err.Error()) | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	respBody, _ := io.ReadAll(res.Body) | ||||
|  | 	if res.StatusCode != 200 && res.StatusCode != 201 { | ||||
|  | 		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) | ||||
|  | 	} | ||||
|  | 	log.Printf("es 写入[%s],完成,res=%s ", index, respBody) | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { | ||||
|  | 
 | ||||
|  | 	//log 测试用
 | ||||
|  | 	const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" | ||||
|  | 
 | ||||
|  | 	logTag := false | ||||
|  | 
 | ||||
|  | 	body := strings.Builder{} | ||||
|  | 	for _, raw := range raws { | ||||
|  | 		//  scala =>  val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
 | ||||
|  | 		source, _ := json.Marshal(raw) | ||||
|  | 		_id := raw.IotaDevice | ||||
|  | 		s := fmt.Sprintf( | ||||
|  | 			`{"index": {"_index": "%s","_id": "%s"}} | ||||
|  | %s | ||||
|  | `, index, _id, source) | ||||
|  | 		body.WriteString(s) | ||||
|  | 
 | ||||
|  | 		if raw.IotaDevice == logTagDeviceId { | ||||
|  | 			log.Printf("BulkWriteRaws2Es  标记设备数据 [%s] %s ", logTagDeviceId, string(s)) | ||||
|  | 			logTag = true | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	if logTag { //追踪数据
 | ||||
|  | 		the.BulkWriteWithLog(index, body.String()) | ||||
|  | 	} else { | ||||
|  | 		the.BulkWrite(index, body.String()) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) { | ||||
|  | 	body := strings.Builder{} | ||||
|  | 	for _, raw := range raws { | ||||
|  | 		source, _ := json.Marshal(raw) | ||||
|  | 		_id := raw.IotaDevice | ||||
|  | 		s := fmt.Sprintf( | ||||
|  | 			`{"index": {"_index": "%s","_id": "%s"}} | ||||
|  | %s | ||||
|  | `, index, _id, source) | ||||
|  | 		body.WriteString(s) | ||||
|  | 	} | ||||
|  | 	the.BulkWrite(index, body.String()) | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *ESHelper) Close() { | ||||
|  | 
 | ||||
|  | } | ||||
| @ -1,4 +1,4 @@ | |||||
| package dbHelper | package dbOperate | ||||
| 
 | 
 | ||||
| import ( | import ( | ||||
| 	"io" | 	"io" | ||||
| @ -0,0 +1,133 @@ | |||||
|  | package dbOperate | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"context" | ||||
|  | 	"encoding/json" | ||||
|  | 	"errors" | ||||
|  | 	"github.com/redis/go-redis/v9" | ||||
|  | 	"log" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type RedisHelper struct { | ||||
|  | 	rdb     redis.UniversalClient | ||||
|  | 	isReady bool | ||||
|  | 	ctx     context.Context | ||||
|  | } | ||||
|  | 
 | ||||
|  | func NewRedisHelper(master string, address ...string) *RedisHelper { | ||||
|  | 	r := &RedisHelper{ctx: context.Background()} | ||||
|  | 	r.InitialCluster(master, address...) | ||||
|  | 	return r | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *RedisHelper) InitialCluster(master string, address ...string) { | ||||
|  | 
 | ||||
|  | 	the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{ | ||||
|  | 		Addrs:      address, | ||||
|  | 		MasterName: master, | ||||
|  | 	}) | ||||
|  | 	log.Printf("redis 初始化完成 %s", address) | ||||
|  | 	the.isReady = true | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *RedisHelper) Get(key string) string { | ||||
|  | 	val, err := the.rdb.Get(the.ctx, key).Result() | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", key) | ||||
|  | 	} else if err != nil { | ||||
|  | 		panic(err) | ||||
|  | 	} else { | ||||
|  | 		//log.Printf("get key => %s =%s", key, val)
 | ||||
|  | 	} | ||||
|  | 	return val | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *RedisHelper) GetObj(keys string, addr any) error { | ||||
|  | 	err := the.rdb.Get(the.ctx, keys).Scan(addr) | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", keys) | ||||
|  | 	} else if err != nil { | ||||
|  | 		es := err.Error() | ||||
|  | 		log.Printf("err=%s ", es) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *RedisHelper) SetObj(keys string, obj any) error { | ||||
|  | 	rs, err := the.rdb.Set(the.ctx, keys, obj, time.Minute*5).Result() | ||||
|  | 	log.Printf("rs=%s ", rs) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Printf("err=%s ", err.Error()) | ||||
|  | 	} | ||||
|  | 	return err | ||||
|  | } | ||||
|  | func (the *RedisHelper) GetLRange(keys string, addr any) error { | ||||
|  | 	err := the.rdb.LRange(the.ctx, keys, 0, -1).ScanSlice(addr) | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", keys) | ||||
|  | 	} else if err != nil { | ||||
|  | 		log.Printf("err=%s ", err.Error()) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *RedisHelper) MGet(addr any, keys ...string) error { | ||||
|  | 	err := the.rdb.MGet(the.ctx, keys...).Scan(addr) | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", keys) | ||||
|  | 	} else if err != nil { | ||||
|  | 		log.Printf("err=%s ", err.Error()) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	return err | ||||
|  | } | ||||
|  | func (the *RedisHelper) MGetObj(addr any, keys ...string) error { | ||||
|  | 	err := the.rdb.MGet(the.ctx, keys...).Scan(addr) | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", keys) | ||||
|  | 	} else if err != nil { | ||||
|  | 		log.Printf("err=%s ", err.Error()) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	return nil | ||||
|  | } | ||||
|  | func (the *RedisHelper) HMGetObj(addr any, key, field string) error { | ||||
|  | 	rp, err := the.rdb.HMGet(the.ctx, key, field).Result() | ||||
|  | 	if errors.Is(err, redis.Nil) { | ||||
|  | 		log.Printf("%s does not exist", key) | ||||
|  | 		return err | ||||
|  | 	} else if err != nil { | ||||
|  | 		log.Printf("err=%s ", err.Error()) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	for _, i := range rp { | ||||
|  | 		if v, ok := i.(string); ok { | ||||
|  | 			err := json.Unmarshal([]byte(v), addr) | ||||
|  | 			if err != nil { | ||||
|  | 				return err | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	//todo scan有问题 后续待排查
 | ||||
|  | 	return nil | ||||
|  | 
 | ||||
|  | 	//err := the.rdb.HMGet(the.ctx, key, field).Scan(addr)
 | ||||
|  | 	//if errors.Is(err, redis.Nil) {
 | ||||
|  | 	//	log.Printf("%s does not exist", key)
 | ||||
|  | 	//} else if err != nil {
 | ||||
|  | 	//	log.Printf("err=%s ", err.Error())
 | ||||
|  | 	//	return err
 | ||||
|  | 	//}
 | ||||
|  | 	//return nil
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *RedisHelper) SRem(key string, members ...string) int64 { | ||||
|  | 	return the.rdb.SRem(the.ctx, key, members).Val() | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *RedisHelper) SAdd(key string, members ...string) int64 { | ||||
|  | 	return the.rdb.SAdd(the.ctx, key, members).Val() | ||||
|  | } | ||||
| @ -1,4 +1,4 @@ | |||||
| package dbHelper | package dbOperate | ||||
| 
 | 
 | ||||
| import ( | import ( | ||||
| 	"fmt" | 	"fmt" | ||||
| @ -1,18 +0,0 @@ | |||||
| 2024/10/17 11:33:20.528503 main.go:24: =================log start================= |  | ||||
| 2024/10/17 11:33:20.540075 main.go:25: ==> |  | ||||
| 2024/10/17 11:33:20.540075 main.go:30: 进程启动 |  | ||||
| 2024/10/17 11:33:20.540075 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json |  | ||||
| 2024/10/17 11:33:20.540592 init.go:20: 加载配置文件:弃用备份 |  | ||||
| 2024/10/17 11:33:20.540592 init.go:22: 非文件[弃用备份]跳过 |  | ||||
| 2024/10/17 11:33:22.546148 mqttHelper.go:48: mqtt连接状态异常 tcp://10.8.30.160:1883(u:wjhp-sjwy-upload,p:123456,cid:wjhp-sjwy-upload) [err=network Error : dial tcp 10.8.30.160:1883: connectex: No connection could be made because the target machine actively refused it.] |  | ||||
| 2024/10/17 11:33:22.546148 mqttHelper.go:49: mqtt重连,30s后尝试,剩余次数=3 |  | ||||
| 2024/10/17 11:33:53.265276 main.go:24: =================log start================= |  | ||||
| 2024/10/17 11:33:53.285624 main.go:25: ==> |  | ||||
| 2024/10/17 11:33:53.285624 main.go:30: 进程启动 |  | ||||
| 2024/10/17 11:33:53.286127 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json |  | ||||
| 2024/10/17 11:33:53.286698 init.go:20: 加载配置文件:弃用备份 |  | ||||
| 2024/10/17 11:33:53.286698 init.go:22: 非文件[弃用备份]跳过 |  | ||||
| 2024/10/17 11:33:53.289208 mqttHelper.go:28: mqtt触发重连后的重订阅 |  | ||||
| 2024/10/17 11:33:55.290027 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= |  | ||||
| 2024/10/17 11:34:12.251376 mqttHelper.go:28: mqtt触发重连后的重订阅 |  | ||||
| 2024/10/17 11:34:12.251376 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= |  | ||||
| @ -0,0 +1,7 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | type GDND2luAn struct { | ||||
|  | 	Id    string               `json:"id"` | ||||
|  | 	Time  string               `json:"time"` | ||||
|  | 	Value map[string][]float32 `json:"value"` | ||||
|  | } | ||||
| @ -0,0 +1,40 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type IotaData struct { | ||||
|  | 	UserId      string    `json:"userId"` | ||||
|  | 	ThingId     string    `json:"thingId"` | ||||
|  | 	DimensionId string    `json:"dimensionId"` | ||||
|  | 	DimCapId    string    `json:"dimCapId"` | ||||
|  | 	CapId       string    `json:"capId"` | ||||
|  | 	DeviceId    string    `json:"deviceId"` | ||||
|  | 	ScheduleId  string    `json:"scheduleId"` | ||||
|  | 	TaskId      string    `json:"taskId"` | ||||
|  | 	JobId       int       `json:"jobId"` | ||||
|  | 	JobRepeatId int       `json:"jobRepeatId"` | ||||
|  | 	TriggerTime time.Time `json:"triggerTime"` | ||||
|  | 	RealTime    time.Time `json:"realTime"` | ||||
|  | 	FinishTime  time.Time `json:"finishTime"` | ||||
|  | 	Seq         int       `json:"seq"` | ||||
|  | 	Released    bool      `json:"released"` | ||||
|  | 	Data        Data      `json:"data"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type Data struct { | ||||
|  | 	Type   int            `json:"type"` | ||||
|  | 	Data   map[string]any `json:"data"` | ||||
|  | 	Result struct { | ||||
|  | 		Code     int    `json:"code"` | ||||
|  | 		Msg      string `json:"msg"` | ||||
|  | 		Detail   string `json:"detail"` | ||||
|  | 		ErrTimes int    `json:"errTimes"` | ||||
|  | 		Dropped  bool   `json:"dropped"` | ||||
|  | 	} `json:"result"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *Data) Success() bool { | ||||
|  | 	return the.Result.Code == 0 | ||||
|  | } | ||||
| @ -0,0 +1,6 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | type AggWay struct { | ||||
|  | 	Id   int64  `db:"id"` | ||||
|  | 	Name string `db:"name"` | ||||
|  | } | ||||
| @ -0,0 +1,6 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | const ( | ||||
|  | 	RawTypeVib  = "vib" | ||||
|  | 	RawTypeDiag = "diag" | ||||
|  | ) | ||||
| @ -0,0 +1,107 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type DeviceData struct { | ||||
|  | 	DeviceId    string | ||||
|  | 	Name        string | ||||
|  | 	ThingId     string | ||||
|  | 	StructId    int | ||||
|  | 	TaskId      string | ||||
|  | 	AcqTime     time.Time | ||||
|  | 	RealTime    time.Time | ||||
|  | 	ErrCode     int | ||||
|  | 	Raw         map[string]any | ||||
|  | 	RawUnit     map[string]string | ||||
|  | 	DeviceInfo  DeviceInfo | ||||
|  | 	DimensionId string | ||||
|  | 	//数据类型 常见有 comm="" ,RawTypeVib="vib"
 | ||||
|  | 	DataType string | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (d *DeviceData) GetVibrationData() VibrationData { | ||||
|  | 	vibData := VibrationData{} | ||||
|  | 	if d.DataType == RawTypeVib { | ||||
|  | 		if v, ok := d.Raw["filterFreq"]; ok { | ||||
|  | 			if vv, ok := v.(float64); ok { | ||||
|  | 				vibData.FilterFreq = vv | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 		if v, ok := d.Raw["sampleFreq"]; ok { | ||||
|  | 			if vv, ok := v.(float64); ok { | ||||
|  | 				vibData.SampleFreq = vv | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 		if v, ok := d.Raw["gainAmplifier"]; ok { | ||||
|  | 			if vv, ok := v.(float64); ok { | ||||
|  | 				vibData.GainAmplifier = byte(vv) | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 		if v, ok := d.Raw["version"]; ok { | ||||
|  | 			if vv, ok := v.(float64); ok { | ||||
|  | 				vibData.Version = byte(vv) | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 		if v, ok := d.Raw["triggerType"]; ok { | ||||
|  | 			if vv, ok := v.(float64); ok { | ||||
|  | 				vibData.TriggerType = byte(vv) | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 		if v, ok := d.Raw["physicalvalue"]; ok { | ||||
|  | 
 | ||||
|  | 			if vSlice, ok := v.([]any); ok { | ||||
|  | 				for _, vObj := range vSlice { | ||||
|  | 					if vv, ok := vObj.(float64); ok { | ||||
|  | 						vibData.Data = append(vibData.Data, vv) | ||||
|  | 					} | ||||
|  | 				} | ||||
|  | 
 | ||||
|  | 			} | ||||
|  | 
 | ||||
|  | 			//去直流
 | ||||
|  | 			if len(vibData.Data) > 0 { | ||||
|  | 				avg := func(dataArray []float64) float64 { | ||||
|  | 					sum := 0.0 | ||||
|  | 					for _, f := range dataArray { | ||||
|  | 						sum += f | ||||
|  | 					} | ||||
|  | 					return sum / float64(len(dataArray)) | ||||
|  | 				}(vibData.Data) //common_calc.GetAvg(vibData.Data)
 | ||||
|  | 
 | ||||
|  | 				for i := 0; i < len(vibData.Data); i++ { | ||||
|  | 					vibData.Data[i] = vibData.Data[i] - avg | ||||
|  | 				} | ||||
|  | 			} | ||||
|  | 		} | ||||
|  | 
 | ||||
|  | 	} | ||||
|  | 	return vibData | ||||
|  | } | ||||
|  | 
 | ||||
|  | // VibrationData 振动数据
 | ||||
|  | type VibrationData struct { | ||||
|  | 	Version       byte | ||||
|  | 	SampleFreq    float64 | ||||
|  | 	FilterFreq    float64 | ||||
|  | 	GainAmplifier byte | ||||
|  | 	TriggerType   byte | ||||
|  | 	Data          []float64 // 原始波形数据
 | ||||
|  | 	Unit          string | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (v *VibrationData) FormatParams() map[string]any { | ||||
|  | 	return map[string]any{ | ||||
|  | 		"sampleFreq":    v.SampleFreq, | ||||
|  | 		"version":       v.Version, | ||||
|  | 		"filterFreq":    v.FilterFreq, | ||||
|  | 		"gainAmplifier": v.GainAmplifier, | ||||
|  | 		"triggerType":   v.TriggerType, | ||||
|  | 	} | ||||
|  | } | ||||
| @ -0,0 +1,70 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type DeviceInfo struct { | ||||
|  | 	Id          string     `json:"id"` | ||||
|  | 	Name        string     `json:"name"` | ||||
|  | 	Structure   Structure  `json:"structure"` | ||||
|  | 	DeviceMeta  DeviceMeta `json:"device_meta"` | ||||
|  | 	RefreshTime time.Time | ||||
|  | } | ||||
|  | 
 | ||||
|  | type DeviceMeta struct { | ||||
|  | 	Id           string           `json:"id"` | ||||
|  | 	Name         string           `json:"name"` | ||||
|  | 	Model        string           `json:"model"` | ||||
|  | 	Properties   []IotaProperty   `json:"properties"` | ||||
|  | 	Capabilities []IotaCapability `json:"capabilities"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (m *DeviceMeta) GetOutputProps() (out map[string]string) { | ||||
|  | 	out = make(map[string]string) | ||||
|  | 	if len(m.Capabilities) == 0 { | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	for _, property := range m.Capabilities[0].Properties { | ||||
|  | 		info := fmt.Sprintf("%s(%s)", property.ShowName, property.Unit) | ||||
|  | 		out[property.Name] = info | ||||
|  | 	} | ||||
|  | 	return | ||||
|  | } | ||||
|  | func (m *DeviceMeta) GetOutputUnit() (out map[string]string) { | ||||
|  | 	out = make(map[string]string) | ||||
|  | 	if len(m.Capabilities) == 0 { | ||||
|  | 		return | ||||
|  | 	} | ||||
|  | 	for _, property := range m.Capabilities[0].Properties { | ||||
|  | 		out[property.Name] = property.Unit | ||||
|  | 	} | ||||
|  | 	return | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *DeviceMeta) MarshalBinary() (data []byte, err error) { | ||||
|  | 	return json.Marshal(m) | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *DeviceMeta) UnmarshalBinary(data []byte) error { | ||||
|  | 	return json.Unmarshal(data, m) | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | type IotaCapability struct { | ||||
|  | 	CapabilityCategoryId int            `json:"capabilityCategoryId"` | ||||
|  | 	Id                   string         `json:"id"` | ||||
|  | 	Name                 string         `json:"name"` | ||||
|  | 	Properties           []IotaProperty `json:"properties"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type IotaProperty struct { | ||||
|  | 	Category string `json:"category"` | ||||
|  | 	Name     string `json:"name"` | ||||
|  | 	ShowName string `json:"showName"` | ||||
|  | 	Unit     string `json:"unit"` | ||||
|  | } | ||||
| @ -0,0 +1,37 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import "time" | ||||
|  | 
 | ||||
|  | type EsRaw struct { | ||||
|  | 	StructId       int               `json:"structId"` | ||||
|  | 	IotaDeviceName string            `json:"iota_device_name"` | ||||
|  | 	Data           map[string]any    `json:"data"` | ||||
|  | 	CollectTime    string            `json:"collect_time"` | ||||
|  | 	Meta           map[string]string `json:"meta"` | ||||
|  | 	IotaDevice     string            `json:"iota_device"` | ||||
|  | 	CreateTime     time.Time         `json:"create_time"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type EsRawResp struct { | ||||
|  | 	Took     int  `json:"took"` | ||||
|  | 	TimedOut bool `json:"timed_out"` | ||||
|  | 	Shards   struct { | ||||
|  | 		Total      int `json:"total"` | ||||
|  | 		Successful int `json:"successful"` | ||||
|  | 		Skipped    int `json:"skipped"` | ||||
|  | 		Failed     int `json:"failed"` | ||||
|  | 	} `json:"_shards"` | ||||
|  | 	Hits struct { | ||||
|  | 		Total    int      `json:"total"` | ||||
|  | 		MaxScore float64  `json:"max_score"` | ||||
|  | 		Hits     []HitRaw `json:"hits"` | ||||
|  | 	} `json:"hits"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type HitRaw struct { | ||||
|  | 	Index  string  `json:"_index"` | ||||
|  | 	Type   string  `json:"_type"` | ||||
|  | 	Id     string  `json:"_id"` | ||||
|  | 	Score  float64 `json:"_score"` | ||||
|  | 	Source EsRaw   `json:"_source"` | ||||
|  | } | ||||
| @ -0,0 +1,41 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import "time" | ||||
|  | 
 | ||||
|  | type EsTheme struct { | ||||
|  | 	SensorName      string         `json:"sensor_name"` | ||||
|  | 	FactorName      string         `json:"factor_name"` | ||||
|  | 	FactorProtoCode string         `json:"factor_proto_code"` | ||||
|  | 	Data            map[string]any `json:"data"` | ||||
|  | 	FactorProtoName string         `json:"factor_proto_name"` | ||||
|  | 	Factor          int            `json:"factor"` | ||||
|  | 	CollectTime     time.Time      `json:"collect_time"` | ||||
|  | 	Sensor          int            `json:"sensor"` | ||||
|  | 	Structure       int            `json:"structure"` | ||||
|  | 	IotaDevice      []string       `json:"iota_device"` | ||||
|  | 	CreateTime      time.Time      `json:"create_time"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type EsThemeResp struct { | ||||
|  | 	Took     int  `json:"took"` | ||||
|  | 	TimedOut bool `json:"timed_out"` | ||||
|  | 	Shards   struct { | ||||
|  | 		Total      int `json:"total"` | ||||
|  | 		Successful int `json:"successful"` | ||||
|  | 		Skipped    int `json:"skipped"` | ||||
|  | 		Failed     int `json:"failed"` | ||||
|  | 	} `json:"_shards"` | ||||
|  | 	Hits struct { | ||||
|  | 		Total    int        `json:"total"` | ||||
|  | 		MaxScore float64    `json:"max_score"` | ||||
|  | 		Hits     []HitTheme `json:"hits"` | ||||
|  | 	} `json:"hits"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type HitTheme struct { | ||||
|  | 	Index  string  `json:"_index"` | ||||
|  | 	Type   string  `json:"_type"` | ||||
|  | 	Id     string  `json:"_id"` | ||||
|  | 	Score  float64 `json:"_score"` | ||||
|  | 	Source EsTheme `json:"_source"` | ||||
|  | } | ||||
| @ -0,0 +1,25 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type IotaDevice struct { | ||||
|  | 	Id           string     `json:"id"` | ||||
|  | 	Name         string     `json:"name"` | ||||
|  | 	Properties   string     `json:"properties"` | ||||
|  | 	DeviceMetaId string     `json:"deviceMetaId"` | ||||
|  | 	ThingId      string     `json:"thingId"` | ||||
|  | 	DeviceMeta   DeviceMeta `json:"deviceMeta"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *IotaDevice) MarshalBinary() (data []byte, err error) { | ||||
|  | 	return json.Marshal(m) | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *IotaDevice) UnmarshalBinary(data []byte) error { | ||||
|  | 	return json.Unmarshal(data, m) | ||||
|  | 
 | ||||
|  | } | ||||
| @ -0,0 +1,31 @@ | |||||
|  | package models | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | type Structure struct { | ||||
|  | 	ThingId string `json:"thingId"` | ||||
|  | 	Id      int    `json:"id"` | ||||
|  | 	Name    string `json:"name"` | ||||
|  | 	Type    string `json:"type"` | ||||
|  | 	OrgId   int    `json:"orgId"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type ThingStruct struct { | ||||
|  | 	ThingId string `json:"thingId"` | ||||
|  | 	Id      int    `json:"id"` | ||||
|  | 	Name    string `json:"name"` | ||||
|  | 	Type    string `json:"type"` | ||||
|  | 	OrgId   int    `json:"orgId"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *ThingStruct) MarshalBinary() (data []byte, err error) { | ||||
|  | 	return json.Marshal(m) | ||||
|  | } | ||||
|  | 
 | ||||
|  | // redis序列化
 | ||||
|  | func (m *ThingStruct) UnmarshalBinary(data []byte) error { | ||||
|  | 	return json.Unmarshal(data, m) | ||||
|  | } | ||||
| @ -0,0 +1,14 @@ | |||||
|  | package monitors | ||||
|  | 
 | ||||
|  | type CommonMonitor struct { | ||||
|  | 	*MonitorHelper | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *CommonMonitor) RegisterFun(fun func()) { | ||||
|  | 	the.registerFun(fun) | ||||
|  | } | ||||
|  | 
 | ||||
|  | func (the *CommonMonitor) Start() { | ||||
|  | 	the.initial() | ||||
|  | 	the.monitorStart() | ||||
|  | } | ||||
| @ -0,0 +1,80 @@ | |||||
|  | package testUnit | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"encoding/json" | ||||
|  | 	"fmt" | ||||
|  | 	"goInOut/models" | ||||
|  | 	"goInOut/utils" | ||||
|  | 	"testing" | ||||
|  | 	"time" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | // 定义一个结构体,包含嵌套的结构体字段
 | ||||
|  | type Person struct { | ||||
|  | 	Name Name `json:"name"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | type Name struct { | ||||
|  | 	First string `json:"first"` | ||||
|  | 	Last  string `json:"last"` | ||||
|  | } | ||||
|  | 
 | ||||
|  | func Test_httpProxy(t *testing.T) { | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func Test_template(t *testing.T) { | ||||
|  | 	// 创建一个Person实例
 | ||||
|  | 	person := Person{ | ||||
|  | 		Name: Name{ | ||||
|  | 			First: "John", | ||||
|  | 			Last:  "Doe", | ||||
|  | 		}, | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	templateStr := "Hello, {{.Name.First}}   !" | ||||
|  | 	sbs, err := utils.TextTemplateMatch(person, templateStr) | ||||
|  | 	println(sbs) | ||||
|  | 	if err != nil { | ||||
|  | 		fmt.Println(err.Error()) | ||||
|  | 	} | ||||
|  | } | ||||
|  | 
 | ||||
|  | func Test_timeHandler(t *testing.T) { | ||||
|  | 	rawMsg := `{ | ||||
|  |                 "userId": "ce2d7eb2-e56e-422e-8bbe-95dfa18e32f8", | ||||
|  |                 "thingId": "14862308-083e-46e1-a422-d7d6a1e2825d", | ||||
|  |                 "dimensionId": "47386f69-c5aa-4ae7-b6cc-0490a1dc0b14", | ||||
|  |                 "dimCapId": "0d561c0b-4dca-4104-abc0-1f0c40a71382", | ||||
|  |                 "capId": "d4965875-354b-4294-87f4-c4ba9f9260ab", | ||||
|  |                 "deviceId": "9c43a09c-3c65-42d3-9a54-42b87e0e5af2", | ||||
|  |                 "scheduleId": "1cfebf18-81a2-489e-bcfb-efc294d8ce3d", | ||||
|  |                 "taskId": "b58858ed-9e23-4ac9-9f9c-44f9e057aee9", | ||||
|  |                 "jobId": 1, | ||||
|  |                 "jobRepeatId": 1, | ||||
|  |                 "triggerTime": "2024-12-04T18:23:04+16:00", | ||||
|  |                 "realTime": "0001-01-01T00:00:00Z", | ||||
|  |                 "finishTime": "2024-12-04T10:23:07.909922675+08:00", | ||||
|  |                 "seq": 0, | ||||
|  |                 "released": false, | ||||
|  |                 "data": { | ||||
|  |                     "type": 1, | ||||
|  |                     "data": { | ||||
|  |                         "physicalvalue": 0 | ||||
|  |                     }, | ||||
|  |                     "result": { | ||||
|  |                         "code": 0, | ||||
|  |                         "msg": "", | ||||
|  |                         "detail": null, | ||||
|  |                         "errTimes": 0, | ||||
|  |                         "dropped": false | ||||
|  |                     } | ||||
|  |                 } | ||||
|  |             }` | ||||
|  | 
 | ||||
|  | 	iotaData := models.IotaData{} | ||||
|  | 	json.Unmarshal([]byte(rawMsg), &iotaData) | ||||
|  | 	var cstZone = time.FixedZone("CST", 8*3600) // 东八区
 | ||||
|  | 	time := iotaData.TriggerTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800") | ||||
|  | 	println(time) | ||||
|  | } | ||||
								
									
										File diff suppressed because one or more lines are too long
									
								
							
						
					| @ -0,0 +1,53 @@ | |||||
|  | package utils | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"fmt" | ||||
|  | 	"log" | ||||
|  | 	"os" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | func SaveCache2File(cacheStr string, fileName string) error { | ||||
|  | 	// 打开文件,如果文件不存在则创建
 | ||||
|  | 	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Println("Error opening file:", err) | ||||
|  | 		return err | ||||
|  | 	} | ||||
|  | 	defer file.Close() | ||||
|  | 
 | ||||
|  | 	// 将变量写入文件
 | ||||
|  | 	_, err = file.WriteString(cacheStr) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Println("Error writing to file:", err) | ||||
|  | 	} | ||||
|  | 	return err | ||||
|  | 
 | ||||
|  | } | ||||
|  | 
 | ||||
|  | func ReadCache2File(fileName string) (string, error) { | ||||
|  | 	// 打开文件
 | ||||
|  | 	file, err := os.Open(fileName) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Println("Error opening file:", err) | ||||
|  | 		return "", err | ||||
|  | 	} | ||||
|  | 	defer file.Close() | ||||
|  | 
 | ||||
|  | 	// 读取文件内容
 | ||||
|  | 	var content string | ||||
|  | 	_, err = fmt.Fscanf(file, "%s", &content) | ||||
|  | 	if err != nil { | ||||
|  | 		log.Println("Error reading from file:", err) | ||||
|  | 	} | ||||
|  | 	return content, err | ||||
|  | } | ||||
|  | 
 | ||||
|  | func FileExists(filePath string) bool { | ||||
|  | 	_, err := os.Stat(filePath) | ||||
|  | 	if err != nil { | ||||
|  | 		if os.IsNotExist(err) { | ||||
|  | 			return false | ||||
|  | 		} | ||||
|  | 	} | ||||
|  | 	return true | ||||
|  | } | ||||
| @ -0,0 +1,22 @@ | |||||
|  | package utils | ||||
|  | 
 | ||||
|  | import ( | ||||
|  | 	"strings" | ||||
|  | 	"text/template" | ||||
|  | ) | ||||
|  | 
 | ||||
|  | func TextTemplateMatch(obj any, textTemplate string) (string, error) { | ||||
|  | 	// 定义模板字符串
 | ||||
|  | 	tmpl, err := template.New("template").Parse(textTemplate) | ||||
|  | 	if err != nil { | ||||
|  | 		panic(err) | ||||
|  | 	} | ||||
|  | 
 | ||||
|  | 	sb := strings.Builder{} | ||||
|  | 	err = tmpl.Execute(&sb, obj) | ||||
|  | 	if err != nil { | ||||
|  | 		return "", err | ||||
|  | 	} | ||||
|  | 	sbs := sb.String() | ||||
|  | 	return sbs, err | ||||
|  | } | ||||
					Loading…
					
					
				
		Reference in new issue