97 changed files with 2672 additions and 354 deletions
			
			
		| @ -0,0 +1 @@ | |||
| /logs/ | |||
| @ -1,19 +1,21 @@ | |||
| # goUpload | |||
| # goInOut | |||
| ## 功能说明: | |||
| 功能:用于设备采集数据上报第三方。   | |||
| 功能:用于数据的输入输出处理,可应对各中数据处理场景,典型的设备采集数据上报第三方。   | |||
| 数据入口->各类本地采集软件 | |||
| 数据出口->各类第三方数据接口 | |||
| 部署环境:用于windows环境 | |||
| 部署环境:用于windows环境或k8s 镜像部署 | |||
| 
 | |||
| ## 启用配置: | |||
| goUpload.exe + configFiles目录下的json配置 | |||
| app.exe + configFiles目录下的json配置 | |||
| (configFiles目录下有几个配置就会生效几个功能,实际部署时,不相干的项目配置不要放进去) | |||
| 
 | |||
| 支持-软件列表: | |||
| -------------------- | |||
| **统一采集软件** | |||
| **统一采集软件_mqtt** | |||
| 
 | |||
| **DAAS振动软件** | |||
| **DAAS振动软件_mqtt** | |||
| 
 | |||
| **称重软件** | |||
| **称重软件_mqtt** | |||
| **中科-光电txt** | |||
| 
 | |||
| **kafka** | |||
|  | |||
| @ -0,0 +1,84 @@ | |||
| package adaptors | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"encoding/binary" | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/models" | |||
| 	"goInOut/utils" | |||
| 	"log" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| // Adaptor_GDND2LA_JSNCGLQL   光电挠度2路安数据 转换 江苏农村公路桥梁监测系统
 | |||
| type Adaptor_GDND2LA_JSNCGLQL struct { | |||
| 	IdMap      map[string]string | |||
| 	BridgeCode string | |||
| } | |||
| 
 | |||
| func (the Adaptor_GDND2LA_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { | |||
| 	gdnd := models.GDND2luAn{} | |||
| 	json.Unmarshal([]byte(rawMsg), &gdnd) | |||
| 	return the.RawToJSNCGLQL(gdnd) | |||
| } | |||
| 
 | |||
| func (the Adaptor_GDND2LA_JSNCGLQL) RawToJSNCGLQL(raw models.GDND2luAn) (result []NeedPush) { | |||
| 	Atime, err := time.Parse("2006-01-02 15:04:05", raw.Time) | |||
| 	if err != nil { | |||
| 		log.Printf("光电扰度 设备[%s] 数据时间 %s 解析错误", raw.Id, raw.Time) | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	sensorDataList := raw.Value[raw.Id] | |||
| 	//获取对方系统 id
 | |||
| 	sensorCode := the.getSensorCode(raw.Id) | |||
| 	if sensorCode == "" { | |||
| 		log.Printf("光电扰度 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", raw.Id) | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | |||
| 
 | |||
| 	var transBytes []byte | |||
| 	//添加时间段
 | |||
| 	transBytes = append(transBytes, the.getTimeBytes(Atime)...) | |||
| 
 | |||
| 	for _, sensorData := range sensorDataList { | |||
| 
 | |||
| 		//添加数据值段
 | |||
| 		bs := utils.Float32ToBytes(sensorData) | |||
| 		transBytes = append(transBytes, bs...) | |||
| 	} | |||
| 	result = append(result, NeedPush{ | |||
| 		Topic:   topic, | |||
| 		Payload: transBytes, | |||
| 	}) | |||
| 
 | |||
| 	return result | |||
| } | |||
| func (the Adaptor_GDND2LA_JSNCGLQL) getSensorCode(rawSensorName string) string { | |||
| 	v, isValid := the.IdMap[rawSensorName] | |||
| 	if !isValid { | |||
| 		v = "" | |||
| 	} | |||
| 	return v | |||
| } | |||
| 
 | |||
| func (the Adaptor_GDND2LA_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { | |||
| 
 | |||
| 	year := uint16(sensorTime.Year()) | |||
| 	month := int8(sensorTime.Month()) | |||
| 	day := int8(sensorTime.Day()) | |||
| 	hour := int8(sensorTime.Hour()) | |||
| 	minute := int8(sensorTime.Minute()) | |||
| 	second := int8(sensorTime.Second()) | |||
| 	bytesBuffer := bytes.NewBuffer([]byte{}) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, year) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, month) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, day) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, hour) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, minute) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, second) | |||
| 	return bytesBuffer.Bytes() | |||
| } | |||
| @ -0,0 +1,47 @@ | |||
| package adaptors | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/consumers/SinoGnssMySQL" | |||
| 	"strconv" | |||
| ) | |||
| 
 | |||
| // Adaptor_SINOMYSQL_AXYMQTT  数据 转换 江苏农村公路桥梁监测系统
 | |||
| type Adaptor_SINOMYSQL_AXYMQTT struct { | |||
| } | |||
| 
 | |||
| func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush { | |||
| 	var needPush []NeedPush | |||
| 	allDxFiles := make(map[string][]SinoGnssMySQL.DxFile) | |||
| 	for _, gnssData := range gnssDataList { | |||
| 		OnceDxFiles := allDxFiles[gnssData.GroupName] | |||
| 		OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ | |||
| 			Module:   gnssData.StationName, | |||
| 			Channel:  1, | |||
| 			Timespan: gnssData.Time.UnixMilli(), | |||
| 			//file_mqtt协议里面只解析RV,  m=> mm
 | |||
| 			RawValue:   []float64{gnssData.X * 1000, gnssData.Y * 1000, gnssData.H * 1000}, | |||
| 			LimitValue: []float64{}, | |||
| 			PhyValue:   []float64{}, | |||
| 			ThemeValue: []float64{}, | |||
| 		}) | |||
| 		allDxFiles[gnssData.GroupName] = OnceDxFiles | |||
| 	} | |||
| 
 | |||
| 	for groupName, groupFileList := range allDxFiles { | |||
| 		contentMap := make(map[string]string) | |||
| 		for i, file := range groupFileList { | |||
| 			bs, _ := json.Marshal(file) | |||
| 			contentMap[strconv.Itoa(i)] = string(bs) | |||
| 		} | |||
| 		gpContent, _ := json.Marshal(contentMap) | |||
| 		topic := fmt.Sprintf("SinoGnss/%s/", groupName) | |||
| 		needPush = append(needPush, NeedPush{ | |||
| 			Topic:   topic, | |||
| 			Payload: gpContent, | |||
| 		}) | |||
| 	} | |||
| 
 | |||
| 	return needPush | |||
| } | |||
| @ -0,0 +1,148 @@ | |||
| package adaptors | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/consumers/AXYraw" | |||
| 	"goInOut/dbOperate" | |||
| 	"goInOut/models" | |||
| 	"log" | |||
| 	"sync" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| // Adaptor_AXY_LastRAW  安心云 kafka iota数据 转换 es设备数据
 | |||
| type Adaptor_AXY_LastRAW struct { | |||
| 	AXYraw.Info | |||
| 	Redis *dbOperate.RedisHelper | |||
| } | |||
| 
 | |||
| func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { | |||
| 	iotaData := models.IotaData{} | |||
| 	json.Unmarshal([]byte(rawMsg), &iotaData) | |||
| 	return the.raw2es(iotaData) | |||
| } | |||
| 
 | |||
| func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { | |||
| 	if len(iotaData.Data.Data) == 0 { | |||
| 		return nil | |||
| 	} | |||
| 
 | |||
| 	if !iotaData.Data.Success() { | |||
| 		msg, _ := json.Marshal(iotaData.Data.Result) | |||
| 		iotaData.Data.Data["errMsg"] = string(msg) | |||
| 	} | |||
| 	//log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime)
 | |||
| 	deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) | |||
| 
 | |||
| 	//查不到信息的数据
 | |||
| 	if deviceInfo.Name == "" { | |||
| 		log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime) | |||
| 		return nil | |||
| 	} | |||
| 
 | |||
| 	dataType := "" | |||
| 	if _dataType, ok := iotaData.Data.Data["_data_type"]; ok { | |||
| 		if v, ok := _dataType.(string); ok { | |||
| 			dataType = v | |||
| 		} | |||
| 	} | |||
| 	devdata := &models.DeviceData{ | |||
| 		DeviceId:    iotaData.DeviceId, | |||
| 		Name:        deviceInfo.Name, | |||
| 		ThingId:     iotaData.ThingId, | |||
| 		StructId:    deviceInfo.Structure.Id, | |||
| 		AcqTime:     iotaData.TriggerTime, | |||
| 		RealTime:    iotaData.RealTime, | |||
| 		ErrCode:     0, | |||
| 		Raw:         iotaData.Data.Data, | |||
| 		DeviceInfo:  deviceInfo, | |||
| 		DimensionId: iotaData.DimensionId, | |||
| 		DataType:    dataType, | |||
| 	} | |||
| 	EsRaws := toEsRaw(devdata) | |||
| 	return EsRaws | |||
| } | |||
| 
 | |||
| var deviceInfoMap = map[string]models.DeviceInfo{} | |||
| var deviceInfoMap2 = sync.Map{} | |||
| 
 | |||
| func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) models.DeviceInfo { | |||
| 	if vObj, ok := deviceInfoMap2.Load(deviceId); ok { | |||
| 		if v, ok := vObj.(models.DeviceInfo); ok { | |||
| 			durationMin := time.Now().Sub(v.RefreshTime).Minutes() | |||
| 			if durationMin < 5 { | |||
| 				return v | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 	v, err := the.GetDeviceInfoFromRedis(deviceId) | |||
| 	if err != nil { | |||
| 		log.Printf("设备%s 无法查询到对应信息", deviceId) | |||
| 	} | |||
| 	//deviceInfoMap[deviceId] = v
 | |||
| 	deviceInfoMap2.Store(deviceId, v) | |||
| 	return v | |||
| } | |||
| 
 | |||
| func (the Adaptor_AXY_LastRAW) GetDeviceInfoFromRedis(deviceId string) (models.DeviceInfo, error) { | |||
| 	Key_Iota_device := "iota_device" | |||
| 	key_Thing_struct := "thing_struct" | |||
| 	key_Iota_meta := "iota_meta" | |||
| 	k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId) | |||
| 	dev := models.IotaDevice{} | |||
| 	thingStruct := models.ThingStruct{} | |||
| 	devMeta := models.DeviceMeta{} | |||
| 	err1 := the.Redis.GetObj(k1, &dev) | |||
| 	if err1 != nil { | |||
| 		return models.DeviceInfo{RefreshTime: time.Now()}, err1 | |||
| 	} | |||
| 	k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId) | |||
| 	err2 := the.Redis.GetObj(k2, &thingStruct) | |||
| 	if err2 != nil { | |||
| 		return models.DeviceInfo{RefreshTime: time.Now()}, err2 | |||
| 	} | |||
| 	k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id) | |||
| 	err3 := the.Redis.GetObj(k3, &devMeta) | |||
| 	if err3 != nil { | |||
| 		return models.DeviceInfo{RefreshTime: time.Now()}, err3 | |||
| 	} | |||
| 	//if err1 != nil || err2 != nil || err3 != nil {
 | |||
| 	//	log.Printf("redis读取异常,err1=%s, err2=%s, err3=%s", err1, err2, err3)
 | |||
| 	//	return models.DeviceInfo{}
 | |||
| 	//}
 | |||
| 
 | |||
| 	s := models.Structure{ | |||
| 		ThingId: thingStruct.ThingId, | |||
| 		Id:      thingStruct.Id, | |||
| 		Name:    thingStruct.Name, | |||
| 		OrgId:   thingStruct.OrgId, | |||
| 	} | |||
| 	return models.DeviceInfo{ | |||
| 		Id:          deviceId, | |||
| 		Name:        dev.Name, | |||
| 		Structure:   s, | |||
| 		DeviceMeta:  devMeta, | |||
| 		RefreshTime: time.Now(), | |||
| 	}, nil | |||
| } | |||
| 
 | |||
| func toEsRaw(deviceData *models.DeviceData) *models.EsRaw { | |||
| 
 | |||
| 	if deviceData == nil { | |||
| 		return nil | |||
| 	} | |||
| 	var cstZone = time.FixedZone("CST", 8*3600) // 东八区 用以解决
 | |||
| 	dataOutMeta := deviceData.DeviceInfo.DeviceMeta.GetOutputProps() | |||
| 	createNativeRaw := models.EsRaw{ | |||
| 		StructId:       deviceData.StructId, | |||
| 		IotaDeviceName: deviceData.Name, | |||
| 		Data:           deviceData.Raw, | |||
| 		CollectTime:    deviceData.AcqTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800"), | |||
| 		Meta:           dataOutMeta, | |||
| 		IotaDevice:     deviceData.DeviceId, | |||
| 		CreateTime:     time.Now(), | |||
| 	} | |||
| 
 | |||
| 	return &createNativeRaw | |||
| } | |||
| @ -0,0 +1,132 @@ | |||
| package adaptors | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"encoding/binary" | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/models" | |||
| 	"goInOut/utils" | |||
| 	"log" | |||
| 	"strconv" | |||
| 	"strings" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| // Adaptor_ZD_JSNCGLQL  数据 转换 江苏农村公路桥梁监测系统
 | |||
| type Adaptor_ZD_JSNCGLQL struct { | |||
| 	IdMap      map[string]string | |||
| 	BridgeCode string | |||
| } | |||
| 
 | |||
| func (the Adaptor_ZD_JSNCGLQL) Transform(inTopic, rawMsg string) []NeedPush { | |||
| 	zd := models.ZD{} | |||
| 	err := json.Unmarshal([]byte(rawMsg), &zd) | |||
| 	if err != nil { | |||
| 		return nil | |||
| 	} | |||
| 	lowerTopic := strings.ToLower(inTopic) | |||
| 	if strings.Contains(lowerTopic, "zdsl") || | |||
| 		strings.Contains(lowerTopic, "cableforce") { | |||
| 		return the.ZDSLtoJSNCGLQL(zd) | |||
| 	} | |||
| 
 | |||
| 	return the.ZDtoJSNCGLQL(zd) | |||
| } | |||
| 
 | |||
| func (the Adaptor_ZD_JSNCGLQL) ZDtoJSNCGLQL(zd models.ZD) (result []NeedPush) { | |||
| 	Atime := time.UnixMilli(zd.Ticks) | |||
| 
 | |||
| 	sensorDataList := zd.AccValues | |||
| 	//获取对方系统 id
 | |||
| 	sensorCode := the.getSensorCode(zd.Module) | |||
| 	if sensorCode == "" { | |||
| 		log.Printf("振动 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | |||
| 	//数据秒数
 | |||
| 	seconds := len(zd.AccValues) / zd.Frequency | |||
| 
 | |||
| 	for i := 0; i < seconds; i++ { | |||
| 		var transBytes []byte | |||
| 		onceTime := Atime.Add(time.Duration(i) * time.Second) | |||
| 		//1.添加时间段
 | |||
| 		transBytes = append(transBytes, the.getTimeBytes(onceTime)...) | |||
| 
 | |||
| 		startIndex := (0 + i) * zd.Frequency | |||
| 		endIndex := (1 + i) * zd.Frequency | |||
| 		if endIndex > len(sensorDataList) { | |||
| 			endIndex = len(sensorDataList) | |||
| 		} | |||
| 		subDataList := sensorDataList[startIndex:endIndex] | |||
| 
 | |||
| 		for _, sensorData := range subDataList { | |||
| 
 | |||
| 			//4.添加数据值段
 | |||
| 			bs := utils.Float32ToBytes(sensorData) | |||
| 			transBytes = append(transBytes, bs...) | |||
| 		} | |||
| 		result = append(result, NeedPush{ | |||
| 			Topic:   topic, | |||
| 			Payload: transBytes, | |||
| 		}) | |||
| 	} | |||
| 
 | |||
| 	return result | |||
| } | |||
| func (the Adaptor_ZD_JSNCGLQL) ZDSLtoJSNCGLQL(zd models.ZD) (result []NeedPush) { | |||
| 	Atime := time.UnixMilli(zd.Ticks) | |||
| 
 | |||
| 	sensorDataList := zd.ThemeValue | |||
| 	//获取对方系统 id
 | |||
| 	sensorCode := the.getSensorCode(strconv.Itoa(zd.SensorId)) | |||
| 	if sensorCode == "" { | |||
| 		log.Printf("振动索力 设备[%s] 无匹配的 江苏农村公路桥梁监测系统 测点id,请检查配置文件", zd.Module) | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	topic := fmt.Sprintf("data/%s/%s", the.BridgeCode, sensorCode) | |||
| 
 | |||
| 	var transBytes []byte | |||
| 	//1.添加时间段
 | |||
| 	transBytes = append(transBytes, the.getTimeBytes(Atime)...) | |||
| 
 | |||
| 	//数据值段
 | |||
| 	bs := utils.Float32ToBytes(sensorDataList[0]) | |||
| 	transBytes = append(transBytes, bs...) | |||
| 
 | |||
| 	result = append(result, NeedPush{ | |||
| 		Topic:   topic, | |||
| 		Payload: transBytes, | |||
| 	}) | |||
| 
 | |||
| 	return result | |||
| } | |||
| 
 | |||
| func (the Adaptor_ZD_JSNCGLQL) getSensorCode(rawSensorName string) string { | |||
| 	v, isValid := the.IdMap[rawSensorName] | |||
| 	if !isValid { | |||
| 		v = "" | |||
| 	} | |||
| 	return v | |||
| } | |||
| 
 | |||
| func (the Adaptor_ZD_JSNCGLQL) getTimeBytes(sensorTime time.Time) []byte { | |||
| 
 | |||
| 	year := uint16(sensorTime.Year()) | |||
| 	month := int8(sensorTime.Month()) | |||
| 	day := int8(sensorTime.Day()) | |||
| 	hour := int8(sensorTime.Hour()) | |||
| 	minute := int8(sensorTime.Minute()) | |||
| 	second := int8(sensorTime.Second()) | |||
| 	bytesBuffer := bytes.NewBuffer([]byte{}) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, year) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, month) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, day) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, hour) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, minute) | |||
| 	binary.Write(bytesBuffer, binary.BigEndian, second) | |||
| 	return bytesBuffer.Bytes() | |||
| } | |||
| @ -0,0 +1,4 @@ | |||
| FROM registry.ngaiot.com/base-images/alpine_3.20_cst:7 | |||
| WORKDIR /app/ | |||
| COPY   *.exe  configFiles  /app/ | |||
| CMD ["/app/app.exe"] | |||
| @ -1,24 +1,40 @@ | |||
| podTemplate { | |||
|     node('pod-templ-jenkins-slave-golang') { | |||
| 
 | |||
| 		 | |||
| 		env.IMAGE_NAME = "${IOT_IMAGES_REGISTRY}/${LOCAL}/${JOB_NAME}" | |||
| 		env.IMAGE_NAME_SHORT = "${LOCAL}/${JOB_NAME}" | |||
| 		env.CODE_ADDR = "https://gitea.anxinyun.cn/lucas2/goUpload.git" | |||
| 
 | |||
| 	    stage('Run shell') { | |||
| 		 | |||
| 	    stage('Run shell') {  | |||
| 		    git branch: 'dev', credentialsId: 'gitea-builder', url: "${CODE_ADDR}" | |||
| 
 | |||
| 		    container('golang-builder-1-23') { | |||
| 				sh''' | |||
| 				    echo "当前目===" | |||
| 				    pwd | |||
| 				    ls | |||
| 				    echo "========" | |||
| 				    /kaniko/executor --context=${BUILD_WORKSPACE} --dockerfile=build/Dockerfile --destination=${IMAGE_NAME}:${IMAGE_VERSION} --cache=false --cleanup | |||
| 				    git version | |||
| 				    git config --global --add url."https://builder:Fs7595!EAT@gitea.anxinyun.cn/".insteadOf "https://gitea.anxinyun.cn/" | |||
| 				    unset GOPROXY | |||
| 				    go env -w GOPROXY=https://goproxy.cn,direct | |||
| 				    go env -w GO111MODULE=on | |||
| 				    go env -w GOPRIVATE=gitea.ngaiot.com,gitea.anxinyun.cn | |||
| 				    go env -w GOSUMDB=sum.golang.org | |||
| 				    go env | |||
|                     go build  -a -v -o app.exe main.go | |||
|                     tar -cvf app.tar *.exe configFiles | |||
| 			    ''' | |||
| 		    } | |||
| 
 | |||
| 		     container('image-builder') { | |||
|             	sh''' | |||
|             		 echo "当前目===" | |||
|             		 pwd | |||
|             		 ls | |||
|             		 echo "========" | |||
|             		 /kaniko/executor --context=${BUILD_WORKSPACE} --dockerfile=build/Dockerfile_app --destination=${IMAGE_NAME}:${IMAGE_VERSION} --cache=false --cleanup | |||
|             	''' | |||
|             } | |||
|             archiveArtifacts artifacts: 'app.tar', followSymlinks: false | |||
| 
 | |||
| 		    buildName "${IMAGE_NAME_SHORT}:${IMAGE_VERSION}" | |||
| 		    buildDescription "${IMAGE_NAME}:${IMAGE_VERSION}" | |||
| 		    buildDescription "${IMAGE_NAME}:${IMAGE_VERSION}"			 | |||
| 		} | |||
| 	} | |||
| } | |||
| @ -0,0 +1,24 @@ | |||
| { | |||
|   "consumer": "consumerSinoGnssMySQL", | |||
|   "ioConfig": { | |||
|     "in": { | |||
|       "db": { | |||
|         "type": "mysql", | |||
|         "connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true" | |||
|       }, | |||
|       "cronStr": "0/1 * * * *" | |||
|     }, | |||
|     "out": { | |||
|       "mqtt": { | |||
|         "host": "10.8.30.160", | |||
|         "port": 30883, | |||
|         "userName": "upload", | |||
|         "password": "", | |||
|         "clientId": "goInOut_SinoGnssMySQL", | |||
|         "Topics": [] | |||
|       } | |||
|     } | |||
|   }, | |||
|   "info": { | |||
|   } | |||
| } | |||
| @ -0,0 +1,36 @@ | |||
| { | |||
|   "consumer": "consumerAXYraw", | |||
|   "ioConfig": { | |||
|     "in": { | |||
|       "kafka": { | |||
|         "brokers": [ | |||
|           "10.8.30.160:30992" | |||
|         ], | |||
|         "groupId": "synchronizeRaw_50", | |||
|         "topics": [ | |||
|           "RawData" | |||
|         ] | |||
|       } | |||
|     }, | |||
|     "out": { | |||
|       "es": { | |||
|         "address": ["http://10.8.30.160:30092"], | |||
|         "index": "anxincloud_last_raw", | |||
|         "auth": { | |||
|           "userName": "post", | |||
|           "password": "123" | |||
|         }, | |||
|         "interval": 30 | |||
|       } | |||
|     } | |||
|   }, | |||
|   "info": { | |||
|     "common": { | |||
|     }, | |||
|     "queryComponent":{ | |||
|       "redis": { | |||
|         "address": "10.8.30.142:30379" | |||
|       } | |||
|     } | |||
|   } | |||
| } | |||
| @ -0,0 +1,30 @@ | |||
| { | |||
|   "consumer": "consumerHttpProxy", | |||
|   "ioConfig": { | |||
|     "in": { | |||
|       "apiServer": { | |||
|         "port": 7000, | |||
|         "userName": "usr", | |||
|         "password": "123", | |||
|         "routers": [ | |||
|           { | |||
|             "router": "POST /upload/work/ABStatus", | |||
|             "idTemplate": "{{.values.ucId}}" | |||
|           }, | |||
|           { | |||
|             "router": "POST /upload/work/ABSHeart", | |||
|             "idTemplate": "{{.Values.WorkId}}" | |||
|           } | |||
|         ] | |||
|       } | |||
|     }, | |||
|     "out": { | |||
|       "httpPost": { | |||
|         "url": "http://218.3.126.49:60000", | |||
|         "method": "post" | |||
|       } | |||
|     } | |||
|   }, | |||
|   "info": { | |||
|   } | |||
| } | |||
| @ -1 +0,0 @@ | |||
| package constKey | |||
| @ -0,0 +1,31 @@ | |||
| package AXYraw | |||
| 
 | |||
| import "goInOut/config" | |||
| 
 | |||
| type ConfigFile struct { | |||
| 	config.Consumer | |||
| 	IoConfig ioConfig `json:"ioConfig"` | |||
| 	Info     Info     `json:"info"` | |||
| } | |||
| type ioConfig struct { | |||
| 	In  in  `json:"in"` | |||
| 	Out out `json:"out"` | |||
| } | |||
| type in struct { | |||
| 	Kafka config.KafkaConfig `json:"kafka"` | |||
| } | |||
| 
 | |||
| type out struct { | |||
| 	Es config.EsConfig `json:"es"` | |||
| } | |||
| 
 | |||
| type Info struct { | |||
| 	Common         map[string]string `json:"common"` | |||
| 	QueryComponent queryComponent    `json:"queryComponent"` | |||
| } | |||
| 
 | |||
| type queryComponent struct { | |||
| 	Redis struct { | |||
| 		Address string `json:"address"` | |||
| 	} `json:"redis"` | |||
| } | |||
| @ -0,0 +1,25 @@ | |||
| package HTTP_PRPXY | |||
| 
 | |||
| import "goInOut/config" | |||
| 
 | |||
| type ConfigFile struct { | |||
| 	config.Consumer | |||
| 	IoConfig  ioConfig          `json:"ioConfig"` | |||
| 	OtherInfo map[string]string `json:"info"` | |||
| } | |||
| type ioConfig struct { | |||
| 	In  In  `json:"in"` | |||
| 	Out OUT `json:"out"` | |||
| } | |||
| type In struct { | |||
| 	ApiServer config.ApiServerConfig `json:"apiServer"` | |||
| } | |||
| 
 | |||
| type OUT struct { | |||
| 	HttpPost config.HttpConfig `json:"httpPost"` | |||
| } | |||
| 
 | |||
| type SensorInfo struct { | |||
| 	Name string `json:"name"` //测点名称
 | |||
| 	Code string `json:"code"` //测点编号 宜由“桥名简称-监测类别简称-构件类型编码-截面序号-构件序号-测点编号”组成
 | |||
| } | |||
| @ -0,0 +1,44 @@ | |||
| package HTTP_PRPXY | |||
| 
 | |||
| type ABStatus struct { | |||
| 	Values ABStatusValues `json:"values"` | |||
| 	Secret string         `json:"secret"` | |||
| 	Type   string         `json:"type"` | |||
| } | |||
| 
 | |||
| type ABStatusValues struct { | |||
| 	CreateTime            int64  `json:"createTime"` | |||
| 	Updater               string `json:"updater"` | |||
| 	Deleted               bool   `json:"deleted"` | |||
| 	Id                    string `json:"id"` | |||
| 	WorkId                string `json:"workId"` | |||
| 	UcId                  string `json:"ucId"` | |||
| 	DeviceCode            string `json:"deviceCode"` | |||
| 	UserId                string `json:"userId"` | |||
| 	UserName              string `json:"userName"` | |||
| 	DataTime              int64  `json:"dataTime"` | |||
| 	BeltStatus            string `json:"beltStatus"` | |||
| 	DeviceStatus          string `json:"deviceStatus"` | |||
| 	StartDate             int64  `json:"startDate"` | |||
| 	EndDate               int64  `json:"endDate"` | |||
| 	MainVol               int    `json:"mainVol"` | |||
| 	AscendWorkCertificate string `json:"ascend_work_certificate"` | |||
| 	BeltPoseLeft          string `json:"beltPoseLeft"` | |||
| 	BeltPoseRight         string `json:"beltPoseRight"` | |||
| 	BeltStatusLeft        string `json:"beltStatusLeft"` | |||
| 	BeltStatusRight       string `json:"beltStatusRight"` | |||
| 	TickbVol              int    `json:"tickbVol"` | |||
| 	//Avatar                interface{} `json:"avatar"`
 | |||
| 	//AlarmType             interface{} `json:"alarmType"`
 | |||
| 	//Identity              interface{} `json:"identity"`
 | |||
| 	//IdentityImg           interface{} `json:"identityImg"`
 | |||
| 	//IdentityImgF          interface{} `json:"identityImgF"`
 | |||
| 	//Phone                 interface{} `json:"phone"`
 | |||
| 	Versions int `json:"versions"` | |||
| 	//Team                  interface{} `json:"team"`
 | |||
| 	Longitude string `json:"longitude"` | |||
| 	Latitude  string `json:"latitude"` | |||
| 	Outsource string `json:"outsource"` | |||
| 	LocTime   int64  `json:"locTime"` | |||
| 	//WorkLen               interface{} `json:"workLen"`
 | |||
| } | |||
| @ -0,0 +1,27 @@ | |||
| package SinoGnssMySQL | |||
| 
 | |||
| import "goInOut/config" | |||
| 
 | |||
| type ConfigFile struct { | |||
| 	config.Consumer | |||
| 	IoConfig  ioConfig          `json:"ioConfig"` | |||
| 	OtherInfo map[string]string `json:"info"` | |||
| } | |||
| type ioConfig struct { | |||
| 	In  In  `json:"in"` | |||
| 	Out OUT `json:"out"` | |||
| } | |||
| type In struct { | |||
| 	Db      config.DbConfig `json:"db"` | |||
| 	CronStr string          `json:"cronStr"` | |||
| } | |||
| 
 | |||
| type OUT struct { | |||
| 	Mqtt config.MqttConfig `json:"mqtt"` | |||
| } | |||
| 
 | |||
| // 缓存用
 | |||
| type RecordInfo struct { | |||
| 	Id        int64  `json:"id"` | |||
| 	TableName string `json:"table_name"` | |||
| } | |||
| @ -0,0 +1,28 @@ | |||
| package SinoGnssMySQL | |||
| 
 | |||
| import "time" | |||
| 
 | |||
| type GnssData struct { | |||
| 	Id          int64     `json:"id" db:"id"` | |||
| 	StationName string    `json:"station_name" db:"station_name"` | |||
| 	GroupName   string    `json:"group_name" db:"group_name"` | |||
| 	Time        time.Time `json:"time" db:"time"` | |||
| 	X           float64   `json:"x" db:"x"` | |||
| 	Y           float64   `json:"y" db:"y"` | |||
| 	H           float64   `json:"h" db:"h"` | |||
| } | |||
| 
 | |||
| type DxFile struct { | |||
| 	SensorId   int       `json:"S"` | |||
| 	Module     string    `json:"M"` | |||
| 	Channel    int       `json:"C"` | |||
| 	Error      int       `json:"R"` | |||
| 	Round      int64     `json:"N"` | |||
| 	Timespan   int64     `json:"T"` | |||
| 	Req        []string  `json:"Q"` | |||
| 	Acq        []string  `json:"A"` | |||
| 	RawValue   []float64 `json:"RV"` | |||
| 	LimitValue []float64 `json:"LV"` | |||
| 	PhyValue   []float64 `json:"PV"` | |||
| 	ThemeValue []float64 `json:"TV"` | |||
| } | |||
| @ -0,0 +1,162 @@ | |||
| package consumers | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"goInOut/adaptors" | |||
| 	"goInOut/consumers/AXYraw" | |||
| 	"goInOut/dbOperate" | |||
| 	"goInOut/dbOperate/_kafka" | |||
| 	"goInOut/models" | |||
| 	"log" | |||
| 	"sync" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type consumerAXYraw struct { | |||
| 	//数据缓存管道
 | |||
| 	dataCache chan *models.EsRaw | |||
| 	//具体配置
 | |||
| 	ConfigInfo AXYraw.ConfigFile | |||
| 	InKafka    _kafka.KafkaHelper | |||
| 	OutEs      dbOperate.ESHelper | |||
| 	infoRedis  *dbOperate.RedisHelper | |||
| 	sinkRawMap sync.Map | |||
| 	lock       sync.Mutex | |||
| } | |||
| 
 | |||
| func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { | |||
| 	// 将 JSON 格式的数据解析到结构体中
 | |||
| 	err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo) | |||
| 	if err != nil { | |||
| 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | |||
| 		panic(err) | |||
| 	} | |||
| } | |||
| 
 | |||
| func (the *consumerAXYraw) Initial(cfg string) error { | |||
| 	the.sinkRawMap = sync.Map{} | |||
| 	the.dataCache = make(chan *models.EsRaw, 200) | |||
| 
 | |||
| 	the.LoadConfigJson(cfg) | |||
| 	err := the.inputInitial() | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 	err = the.outputInitial() | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 	err = the.infoComponentInitial() | |||
| 	return err | |||
| } | |||
| func (the *consumerAXYraw) inputInitial() error { | |||
| 	//数据入口
 | |||
| 	the.InKafka = _kafka.KafkaHelper{ | |||
| 		Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers, | |||
| 		GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId, | |||
| 	} | |||
| 	the.InKafka.Initial() | |||
| 	for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics { | |||
| 		the.InKafka.Subscribe(inTopic, the.onData) | |||
| 	} | |||
| 
 | |||
| 	the.InKafka.Worker() | |||
| 	return nil | |||
| } | |||
| func (the *consumerAXYraw) outputInitial() error { | |||
| 	//数据出口
 | |||
| 	the.OutEs = *dbOperate.NewESHelper( | |||
| 		the.ConfigInfo.IoConfig.Out.Es.Address, | |||
| 		the.ConfigInfo.IoConfig.Out.Es.Auth.UserName, | |||
| 		the.ConfigInfo.IoConfig.Out.Es.Auth.Password, | |||
| 	) | |||
| 
 | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func (the *consumerAXYraw) infoComponentInitial() error { | |||
| 	//数据出口
 | |||
| 	addr := the.ConfigInfo.Info.QueryComponent.Redis.Address | |||
| 	the.infoRedis = dbOperate.NewRedisHelper("", addr) | |||
| 
 | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func (the *consumerAXYraw) sinkTask() { | |||
| 	intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval | |||
| 	ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) | |||
| 	defer ticker.Stop() | |||
| 	for { | |||
| 		<-ticker.C | |||
| 		the.toSink() | |||
| 	} | |||
| } | |||
| 
 | |||
| func (the *consumerAXYraw) toSink() { | |||
| 	var raws []models.EsRaw | |||
| 	the.lock.Lock() | |||
| 	defer the.lock.Unlock() | |||
| 	the.sinkRawMap.Range(func(key, value any) bool { | |||
| 		if v, ok := value.(*models.EsRaw); ok { | |||
| 			raws = append(raws, *v) | |||
| 			//零时打日志用
 | |||
| 			if v.IotaDevice == logTagDeviceId { | |||
| 				bs, _ := json.Marshal(v) | |||
| 				log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | |||
| 			} | |||
| 			return ok | |||
| 		} else { | |||
| 			log.Printf("!!!  toSink -> Range  类型转换异常 [%v]", key) | |||
| 		} | |||
| 		return true | |||
| 	}) | |||
| 	if len(raws) > 0 { | |||
| 		log.Printf("准备写入es %d条", len(raws)) | |||
| 		index := the.ConfigInfo.IoConfig.Out.Es.Index | |||
| 		the.OutEs.BulkWriteRaws2Es(index, raws) | |||
| 		the.sinkRawMap.Clear() | |||
| 	} | |||
| } | |||
| 
 | |||
| const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" | |||
| 
 | |||
| func (the *consumerAXYraw) Work() { | |||
| 	go the.sinkTask() | |||
| 	go func() { | |||
| 		for { | |||
| 			pushEsRaw := <-the.dataCache | |||
| 
 | |||
| 			if pushEsRaw.IotaDevice == logTagDeviceId { | |||
| 				bs, _ := json.Marshal(pushEsRaw) | |||
| 				log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | |||
| 			} | |||
| 
 | |||
| 			//有效数据存入缓存
 | |||
| 			the.lock.Lock() | |||
| 			the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw) | |||
| 			the.lock.Unlock() | |||
| 		} | |||
| 
 | |||
| 	}() | |||
| } | |||
| func (the *consumerAXYraw) onData(topic string, msg string) bool { | |||
| 	//if len(msg) > 80 {
 | |||
| 	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80])
 | |||
| 	//}
 | |||
| 	adaptor := adaptors.Adaptor_AXY_LastRAW{ | |||
| 		Redis: the.infoRedis, | |||
| 	} | |||
| 
 | |||
| 	needPush := adaptor.Transform(topic, msg) | |||
| 
 | |||
| 	if needPush != nil && len(needPush.Meta) > 0 && needPush.Data != nil { | |||
| 		//日志标记
 | |||
| 		if needPush.IotaDevice == logTagDeviceId { | |||
| 			bs, _ := json.Marshal(needPush) | |||
| 			log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs)) | |||
| 		} | |||
| 		the.dataCache <- needPush | |||
| 	} | |||
| 
 | |||
| 	return true | |||
| } | |||
| @ -0,0 +1,102 @@ | |||
| package consumers | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/config" | |||
| 	"goInOut/consumers/HTTP_PRPXY" | |||
| 	"goInOut/dbOperate" | |||
| 	"goInOut/utils" | |||
| 	"io" | |||
| 	"log" | |||
| 	"net/http" | |||
| ) | |||
| 
 | |||
| type consumerHttpProxy struct { | |||
| 	//数据缓存管道
 | |||
| 	routes map[string]config.Router | |||
| 	//具体配置
 | |||
| 	Info        HTTP_PRPXY.ConfigFile | |||
| 	InApiServer *dbOperate.ApiServerHelper | |||
| 	outHttpPost *dbOperate.HttpHelper | |||
| } | |||
| 
 | |||
| func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) { | |||
| 	// 将 JSON 格式的数据解析到结构体中
 | |||
| 	err := json.Unmarshal([]byte(cfgStr), &the.Info) | |||
| 	if err != nil { | |||
| 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | |||
| 		panic(err) | |||
| 	} | |||
| } | |||
| 
 | |||
| func (the *consumerHttpProxy) Initial(cfg string) error { | |||
| 	the.routes = map[string]config.Router{} | |||
| 	the.LoadConfigJson(cfg) | |||
| 	err := the.InputInitial() | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 	err = the.OutputInitial() | |||
| 	return err | |||
| } | |||
| func (the *consumerHttpProxy) InputInitial() error { | |||
| 	//数据入口
 | |||
| 	the.InApiServer = dbOperate.NewApiServer( | |||
| 		the.Info.IoConfig.In.ApiServer.Port, | |||
| 		the.Info.IoConfig.In.ApiServer.Routers, | |||
| 	) | |||
| 	the.InApiServer.Initial() | |||
| 	return nil | |||
| } | |||
| func (the *consumerHttpProxy) OutputInitial() error { | |||
| 	//数据出口
 | |||
| 	the.outHttpPost = &dbOperate.HttpHelper{ | |||
| 		Url: the.Info.IoConfig.Out.HttpPost.Url, | |||
| 	} | |||
| 	the.outHttpPost.Initial() | |||
| 	return nil | |||
| } | |||
| func (the *consumerHttpProxy) Work() { | |||
| 	go func() { | |||
| 		for _, router := range the.Info.IoConfig.In.ApiServer.Routers { | |||
| 			the.InApiServer.RouteRegister(router.Router, the.onData) | |||
| 			the.routes[router.Router] = router | |||
| 		} | |||
| 		the.InApiServer.Run() | |||
| 
 | |||
| 	}() | |||
| } | |||
| func (the *consumerHttpProxy) onData(w http.ResponseWriter, r *http.Request) { | |||
| 	w.Header().Set("Content-Type", "application/json") | |||
| 	body, err := io.ReadAll(r.Body) | |||
| 
 | |||
| 	log.Printf("收到 %s 请求 %s", r.RequestURI, body) | |||
| 
 | |||
| 	bodyObj := map[string]any{} | |||
| 	err = json.Unmarshal(body, &bodyObj) | |||
| 	if err != nil { | |||
| 		log.Printf("body 解析失败,请检查 %s", err.Error()) | |||
| 		return | |||
| 	} | |||
| 	routePath := fmt.Sprintf("%s %s", r.Method, r.RequestURI) | |||
| 	idTemplate := the.routes[routePath] | |||
| 
 | |||
| 	id, err := utils.TextTemplateMatch(bodyObj, idTemplate.IdTemplate) | |||
| 	if err != nil { | |||
| 		log.Printf("解析id 失败,请检查 %s", err.Error()) | |||
| 	} | |||
| 	params := map[string]string{ | |||
| 		"id": id, | |||
| 	} | |||
| 	resp := "" | |||
| 	//沿用请求路由
 | |||
| 	newUrl := the.outHttpPost.Url + r.URL.String() | |||
| 	resp, err = the.outHttpPost.HttpPostWithParams(newUrl, string(body), params) | |||
| 	if err != nil { | |||
| 		return | |||
| 	} | |||
| 	log.Printf("应答: %s", resp) | |||
| 	defer fmt.Fprintf(w, resp) | |||
| 
 | |||
| } | |||
| @ -0,0 +1,174 @@ | |||
| package consumers | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/adaptors" | |||
| 	"goInOut/consumers/SinoGnssMySQL" | |||
| 	"goInOut/dbOperate" | |||
| 	"goInOut/monitors" | |||
| 	"goInOut/utils" | |||
| 	"log" | |||
| 	"os" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type consumerSinoGnssMySQL struct { | |||
| 	//数据缓存管道
 | |||
| 	ch chan []adaptors.NeedPush | |||
| 	//具体配置
 | |||
| 	Info    SinoGnssMySQL.ConfigFile | |||
| 	InDB    *dbOperate.DBHelper | |||
| 	outMqtt *dbOperate.MqttHelper | |||
| 	monitor *monitors.CommonMonitor | |||
| } | |||
| 
 | |||
| func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { | |||
| 	// 将 JSON 格式的数据解析到结构体中
 | |||
| 	err := json.Unmarshal([]byte(cfgStr), &the.Info) | |||
| 	if err != nil { | |||
| 		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) | |||
| 		panic(err) | |||
| 	} | |||
| } | |||
| 
 | |||
| func (the *consumerSinoGnssMySQL) Initial(cfg string) error { | |||
| 	the.LoadConfigJson(cfg) | |||
| 	err := the.InputInitial() | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 	err = the.OutputInitial() | |||
| 	return err | |||
| } | |||
| func (the *consumerSinoGnssMySQL) InputInitial() error { | |||
| 	the.ch = make(chan []adaptors.NeedPush, 200) | |||
| 	//数据入口
 | |||
| 	the.InDB = dbOperate.NewDBHelper( | |||
| 		the.Info.IoConfig.In.Db.Type, | |||
| 		the.Info.IoConfig.In.Db.ConnStr) | |||
| 	the.monitor = &monitors.CommonMonitor{ | |||
| 		MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, | |||
| 	} | |||
| 	the.monitor.Start() | |||
| 	the.monitor.RegisterFun(the.onData) | |||
| 	return nil | |||
| } | |||
| func (the *consumerSinoGnssMySQL) OutputInitial() error { | |||
| 	//数据出口
 | |||
| 	the.outMqtt = dbOperate.MqttInitial( | |||
| 		the.Info.IoConfig.Out.Mqtt.Host, | |||
| 		the.Info.IoConfig.Out.Mqtt.Port, | |||
| 		the.Info.IoConfig.Out.Mqtt.ClientId, | |||
| 		the.Info.IoConfig.Out.Mqtt.UserName, | |||
| 		the.Info.IoConfig.Out.Mqtt.Password, | |||
| 		false, //按照具体项目来
 | |||
| 		"") | |||
| 	return nil | |||
| } | |||
| func (the *consumerSinoGnssMySQL) Work() { | |||
| 	go func() { | |||
| 		for { | |||
| 			needPushList := <-the.ch | |||
| 			if len(the.ch) > 0 { | |||
| 				log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) | |||
| 			} | |||
| 
 | |||
| 			for _, push := range needPushList { | |||
| 				if push.Topic != "" { | |||
| 					the.outMqtt.Publish(push.Topic, push.Payload) | |||
| 				} | |||
| 			} | |||
| 
 | |||
| 			time.Sleep(100 * time.Millisecond) | |||
| 		} | |||
| 	}() | |||
| } | |||
| func (the *consumerSinoGnssMySQL) onData() { | |||
| 	recordInfo, err := readRecord() | |||
| 	if err != nil { | |||
| 		log.Printf("读取 缓存异常,err=%v", err.Error()) | |||
| 		return | |||
| 	} | |||
| 	sql := fmt.Sprintf(`select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from %s as d  | |||
| LEFT JOIN datasolution as p | |||
| ON d.station_name=p.sn | |||
| where p.group_name is not null | |||
| and d.id > %d and d.id <= %d | |||
| ORDER BY p.group_name;`, recordInfo.TableName, recordInfo.Id, recordInfo.Id+200) | |||
| 	var GnssDatas []SinoGnssMySQL.GnssData | |||
| 	err = the.InDB.Query(&GnssDatas, sql) | |||
| 	if err != nil || len(GnssDatas) == 0 { | |||
| 		log.Printf("当前批次无数据,跳过") | |||
| 		return | |||
| 	} | |||
| 	maxId := MaxId(GnssDatas) | |||
| 	log.Printf("当前批次id=%d => %d", recordInfo.Id, maxId) | |||
| 	recordInfo.Id = maxId | |||
| 	adaptor := the.getAdaptor() | |||
| 	needPush := adaptor.Transform(GnssDatas) | |||
| 	if len(needPush) > 0 { | |||
| 		the.ch <- needPush | |||
| 	} | |||
| 	fileName := "cache.inout" | |||
| 	//发现新月新纪录
 | |||
| 	newTableName := tableNameNow() | |||
| 	if recordInfo.TableName != newTableName { | |||
| 		recordInfo.TableName = newTableName | |||
| 		recordInfo.Id = 0 | |||
| 	} | |||
| 	cacheStr, _ := json.Marshal(recordInfo) | |||
| 	err = utils.SaveCache2File(string(cacheStr), fileName) | |||
| 	if err != nil { | |||
| 		log.Panicf("record id to file,error: %v", err.Error()) | |||
| 	} | |||
| 
 | |||
| } | |||
| func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { | |||
| 
 | |||
| 	return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} | |||
| } | |||
| 
 | |||
| func readRecord() (SinoGnssMySQL.RecordInfo, error) { | |||
| 	fileName := "cache.inout" | |||
| 	//文件存在?
 | |||
| 	isExist := utils.FileExists(fileName) | |||
| 	if !isExist { | |||
| 		// 文件不存在,创建文件
 | |||
| 		file, err := os.Create(fileName) | |||
| 		if err != nil { | |||
| 			log.Panicf("Error creating file: %v", err) | |||
| 		} | |||
| 		defaultRecord := SinoGnssMySQL.RecordInfo{ | |||
| 			Id:        0, | |||
| 			TableName: tableNameNow(), | |||
| 		} | |||
| 		str, _ := json.Marshal(defaultRecord) | |||
| 		_, err = file.WriteString(string(str)) | |||
| 		if err != nil { | |||
| 			log.Panicf("file write error: %v", err.Error()) | |||
| 			return SinoGnssMySQL.RecordInfo{}, err | |||
| 		} | |||
| 	} | |||
| 	recordStr, err := utils.ReadCache2File(fileName) | |||
| 	if err != nil { | |||
| 		panic("") | |||
| 	} | |||
| 	record := SinoGnssMySQL.RecordInfo{} | |||
| 	err = json.Unmarshal([]byte(recordStr), &record) | |||
| 	return record, err | |||
| } | |||
| 
 | |||
| func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { | |||
| 	maxId := GnssDatas[0].Id | |||
| 	for _, data := range GnssDatas { | |||
| 		if data.Id > maxId { | |||
| 			maxId = data.Id | |||
| 		} | |||
| 	} | |||
| 	return maxId | |||
| } | |||
| 
 | |||
| func tableNameNow() string { | |||
| 	return "data_gnss_" + time.Now().Format("200601") | |||
| } | |||
| @ -1,38 +0,0 @@ | |||
| package dbHelper | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| 	"github.com/gin-gonic/gin" | |||
| 	"log" | |||
| 	"net/http" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type RouterFunc struct { | |||
| 	relativePath string               //相对路由 如/gzm/data/upload
 | |||
| 	funcType     string               // 方法类型 如 post ,get
 | |||
| 	fun          func(c *gin.Context) //方法
 | |||
| } | |||
| 
 | |||
| type ApiServerHelper struct { | |||
| 	Port    uint16 | |||
| 	RoutFun map[string]RouterFunc | |||
| } | |||
| 
 | |||
| func (the *ApiServerHelper) Initial() { | |||
| 	router := gin.Default() | |||
| 	for name, routerFunc := range the.RoutFun { | |||
| 		switch routerFunc.funcType { | |||
| 		case http.MethodGet: | |||
| 			router.GET(routerFunc.relativePath, routerFunc.fun) | |||
| 		case http.MethodPost: | |||
| 			router.GET(routerFunc.relativePath, routerFunc.fun) | |||
| 		default: | |||
| 			log.Printf("不支持的 [%s]方法类型 %s", routerFunc.relativePath, routerFunc.funcType) | |||
| 			continue | |||
| 		} | |||
| 		log.Printf("注册路由 %s,监听地址=%s", name, routerFunc.relativePath) | |||
| 	} | |||
| 	router.Run(fmt.Sprintf("0.0.0.0:%d", the.Port)) | |||
| 	time.Sleep(time.Second * 1) | |||
| } | |||
| @ -0,0 +1,58 @@ | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| 	"goInOut/config" | |||
| 	"log" | |||
| 	"net/http" | |||
| ) | |||
| 
 | |||
| type ApiServerHelper struct { | |||
| 	mux    *http.ServeMux | |||
| 	routes []config.Router | |||
| 	port   uint | |||
| 	server http.Server | |||
| } | |||
| 
 | |||
| func NewApiServer(port uint, routes []config.Router) *ApiServerHelper { | |||
| 	return &ApiServerHelper{ | |||
| 		mux:    http.NewServeMux(), | |||
| 		routes: routes, | |||
| 		port:   port, | |||
| 	} | |||
| } | |||
| 
 | |||
| func (the *ApiServerHelper) Initial() { | |||
| 	the.mux = http.NewServeMux() | |||
| 
 | |||
| 	// 创建 HTTP 服务器
 | |||
| 	the.server = http.Server{ | |||
| 		Handler: the.mux, | |||
| 		Addr:    fmt.Sprintf(":%d", the.port), | |||
| 	} | |||
| 
 | |||
| 	//the.routeRegister()
 | |||
| 
 | |||
| } | |||
| func (the *ApiServerHelper) Run() { | |||
| 	log.Printf("apiServer监听端口 %d", the.port) | |||
| 	the.server.ListenAndServe() | |||
| } | |||
| func (the *ApiServerHelper) routeRegister() { | |||
| 
 | |||
| 	for _, route := range the.routes { | |||
| 		the.mux.HandleFunc(route.Router, func(w http.ResponseWriter, r *http.Request) { | |||
| 			w.Header().Set("Content-Type", "application/json") | |||
| 			s := fmt.Sprintf(`{"route":"%s","resp":"安全带状态应答"}`, route) | |||
| 			println("收到请求", route.Router) | |||
| 			fmt.Fprintf(w, s) | |||
| 		}) | |||
| 		log.Printf("注册路由 %s", route) | |||
| 	} | |||
| } | |||
| func (the *ApiServerHelper) RouteRegister(route string, handler func(w http.ResponseWriter, r *http.Request)) { | |||
| 
 | |||
| 	the.mux.HandleFunc(route, handler) | |||
| 	log.Printf("注册路由 %s", route) | |||
| 
 | |||
| } | |||
| @ -0,0 +1,101 @@ | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	_ "github.com/go-sql-driver/mysql" | |||
| 	"github.com/jmoiron/sqlx" | |||
| 	_ "github.com/lib/pq" | |||
| 	_ "github.com/mattn/go-sqlite3" | |||
| 	"log" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| const ( | |||
| 	postgres = iota + 1 | |||
| 	sqlite3 | |||
| ) | |||
| 
 | |||
| type DBHelper struct { | |||
| 	dbClient   *sqlx.DB | |||
| 	dbType     string | |||
| 	ConnectStr string | |||
| } | |||
| 
 | |||
| func NewDBHelper(dbType string, connectStr string) *DBHelper { | |||
| 	the := &DBHelper{} | |||
| 	switch dbType { | |||
| 	case "postgres": | |||
| 		fallthrough | |||
| 	case "mysql": | |||
| 		fallthrough | |||
| 	case "sqlite3": | |||
| 		fallthrough | |||
| 	case "有效的数据库类型": | |||
| 		the.dbType = dbType | |||
| 		the.ConnectStr = connectStr | |||
| 	default: | |||
| 		log.Panicf("不支持的数据库类型=> %s", dbType) | |||
| 
 | |||
| 	} | |||
| 	return the | |||
| } | |||
| 
 | |||
| // sqlite3 => connectStr := os.Getwd() + "/db/cz.db"
 | |||
| // mysql => "user:password@tcp(127.0.0.1:3306)/databaseName"
 | |||
| // pg => "host=192.168.10.64 port=5432 user=postgres password=123456 dbname=headscale sslmode=disable"
 | |||
| func (the *DBHelper) dbOpen() error { | |||
| 	var err error | |||
| 	tdb, err := sqlx.Open(the.dbType, the.ConnectStr) | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 	if err = tdb.Ping(); err != nil { | |||
| 		return err | |||
| 	} | |||
| 	the.dbClient = tdb | |||
| 	return nil | |||
| } | |||
| func (the *DBHelper) Exec(dbRecordSQL string) error { | |||
| 	if the.dbClient == nil { | |||
| 		if openErr := the.dbOpen(); openErr != nil { | |||
| 			//logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr)
 | |||
| 			return openErr | |||
| 		} | |||
| 	} | |||
| 	execResult, execErr := the.dbClient.Exec(dbRecordSQL) | |||
| 	defer the.dbClient.Close() | |||
| 	if execErr != nil { | |||
| 		return execErr | |||
| 	} | |||
| 	if n, err := execResult.RowsAffected(); n > 0 { | |||
| 		return nil | |||
| 	} else { | |||
| 		log.Printf("[%s]执行sql[%s]失败\n", time.Now().Format("2006-01-02 15:04:05.000"), dbRecordSQL) | |||
| 		return err | |||
| 	} | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *DBHelper) Query(dest any, sql string) error { | |||
| 
 | |||
| 	start := time.Now() | |||
| 
 | |||
| 	if the.dbClient == nil { | |||
| 		if err := the.dbOpen(); err != nil { | |||
| 			log.Printf("数据库链接失败:%s", err.Error()) | |||
| 			return err | |||
| 		} | |||
| 	} | |||
| 	err := the.dbClient.Select(dest, sql) | |||
| 	if err != nil { | |||
| 		log.Printf("数据库查询失败:%s,\n sql=%s", err.Error(), sql) | |||
| 	} | |||
| 
 | |||
| 	durTime := time.Since(start).Seconds() | |||
| 	log.Printf("查询耗时:%v s", durTime) | |||
| 	return err | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *DBHelper) Close() error { | |||
| 	return the.Close() | |||
| } | |||
| @ -0,0 +1,37 @@ | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"goInOut/models" | |||
| 	"testing" | |||
| ) | |||
| 
 | |||
| type res struct { | |||
| 	RLLYCJ      string `json:"LLYCJ"` | |||
| 	RLLCacheMap string `json:"LLCacheMap"` | |||
| } | |||
| 
 | |||
| func TestRedis(t *testing.T) { | |||
| 	addr := "10.8.30.160:30379" | |||
| 	redis := NewRedisHelper("", addr) | |||
| 
 | |||
| 	key1 := "RLLYCJ" | |||
| 	//v := redis.Get(key1)
 | |||
| 	//println(v)
 | |||
| 
 | |||
| 	key2 := "RLLCacheMap" | |||
| 	res1 := res{} | |||
| 
 | |||
| 	v2 := redis.MGet(&res1, key1, key2) | |||
| 	println(v2) | |||
| } | |||
| 
 | |||
| func TestPg(t *testing.T) { | |||
| 	dbType := "postgres" | |||
| 	connectStr := "host=10.8.30.32 port=5432 user=postgres password=123 dbname=NBJJ1215-T sslmode=disable" | |||
| 	db := NewDBHelper(dbType, connectStr) | |||
| 	sql := "select * from t_agg_way" | |||
| 	var r []models.AggWay | |||
| 	db.Query(&r, sql) | |||
| 	println("===") | |||
| 
 | |||
| } | |||
| @ -0,0 +1,269 @@ | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"context" | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	elasticsearch6 "github.com/elastic/go-elasticsearch/v6" | |||
| 	"github.com/elastic/go-elasticsearch/v6/esapi" | |||
| 	"goInOut/models" | |||
| 	"io" | |||
| 	"log" | |||
| 	"strings" | |||
| ) | |||
| 
 | |||
| type ESHelper struct { | |||
| 	addresses []string | |||
| 	//org      string
 | |||
| 	esClient *elasticsearch6.Client | |||
| } | |||
| 
 | |||
| func NewESHelper(addresses []string, user, pwd string) *ESHelper { | |||
| 	es, _ := elasticsearch6.NewClient(elasticsearch6.Config{ | |||
| 		Addresses: addresses, | |||
| 		Username:  user, | |||
| 		Password:  pwd, | |||
| 	}) | |||
| 	res, err := es.Info() | |||
| 	if err != nil { | |||
| 		log.Fatalf("Error getting response: %s", err) | |||
| 	} | |||
| 	log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res) | |||
| 	return &ESHelper{ | |||
| 		addresses: addresses, | |||
| 		esClient:  es, | |||
| 	} | |||
| } | |||
| func (the *ESHelper) SearchRaw(index, reqBody string) []models.HitRaw { | |||
| 	body := &bytes.Buffer{} | |||
| 	body.WriteString(reqBody) | |||
| 	response, err := the.esClient.Search( | |||
| 		the.esClient.Search.WithIndex(index), | |||
| 		the.esClient.Search.WithBody(body), | |||
| 	) | |||
| 	defer response.Body.Close() | |||
| 	if err != nil { | |||
| 		return nil | |||
| 	} | |||
| 	r := models.EsRawResp{} | |||
| 	// Deserialize the response into a map.
 | |||
| 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | |||
| 		log.Fatalf("Error parsing the response body: %s", err) | |||
| 	} | |||
| 	return r.Hits.Hits | |||
| } | |||
| func (the *ESHelper) Search(index, reqBody string) { | |||
| 	body := &bytes.Buffer{} | |||
| 	body.WriteString(reqBody) | |||
| 	response, err := the.esClient.Search( | |||
| 		the.esClient.Search.WithIndex(index), | |||
| 		the.esClient.Search.WithBody(body), | |||
| 	) | |||
| 
 | |||
| 	if err != nil { | |||
| 		//return nil, err
 | |||
| 	} | |||
| 	log.Println(response.Status()) | |||
| 	var r map[string]any | |||
| 	// Deserialize the response into a map.
 | |||
| 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | |||
| 		log.Fatalf("Error parsing the response body: %s", err) | |||
| 	} | |||
| 	// Print the response status, number of results, and request duration.
 | |||
| 	log.Printf( | |||
| 		"[%s] %d hits; took: %dms", | |||
| 		response.Status(), | |||
| 		int(r["hits"].(map[string]any)["total"].(float64)), | |||
| 		int(r["took"].(float64)), | |||
| 	) | |||
| 
 | |||
| 	for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { | |||
| 
 | |||
| 		source := hit.(map[string]any)["_source"] | |||
| 		log.Printf(" * ID=%s, %s", hit.(map[string]any)["_id"], source) | |||
| 	} | |||
| 	log.Println(strings.Repeat("=", 37)) | |||
| } | |||
| func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { | |||
| 	// Set up the request object.
 | |||
| 	req := esapi.IndexRequest{ | |||
| 		Index: index, | |||
| 		//DocumentID: strconv.Itoa(i + 1),
 | |||
| 		Body:    strings.NewReader(reqBody), | |||
| 		Refresh: "true", | |||
| 	} | |||
| 	// Perform the request with the client.
 | |||
| 	res, err := req.Do(context.Background(), the.esClient) | |||
| 	if err != nil { | |||
| 		log.Fatalf("Error getting response: %s", err) | |||
| 	} | |||
| 	defer res.Body.Close() | |||
| 	var r map[string]any | |||
| 	if res.IsError() { | |||
| 		log.Printf("[%s] Error indexing document ID=%d", res.Status(), 0) | |||
| 	} else { | |||
| 		// Deserialize the response into a map.
 | |||
| 
 | |||
| 		if err := json.NewDecoder(res.Body).Decode(&r); err != nil { | |||
| 			log.Printf("Error parsing the response body: %s", err) | |||
| 		} else { | |||
| 			// Print the response status and indexed document version.
 | |||
| 			log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) | |||
| 		} | |||
| 	} | |||
| 	return r, err | |||
| } | |||
| 
 | |||
| func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { | |||
| 	respmap, err := the.request(index, reqBody) | |||
| 	if respmap != nil { | |||
| 
 | |||
| 	} | |||
| 	iotaDatas := models.IotaData{} | |||
| 	return iotaDatas, err | |||
| } | |||
| 
 | |||
| func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, error) { | |||
| 	body := &bytes.Buffer{} | |||
| 	body.WriteString(reqBody) | |||
| 	response, err := the.esClient.Search( | |||
| 		the.esClient.Search.WithIndex(index), | |||
| 		the.esClient.Search.WithBody(body), | |||
| 	) | |||
| 	defer response.Body.Close() | |||
| 	if err != nil { | |||
| 		//return nil, err
 | |||
| 	} | |||
| 	log.Println(response.Status()) | |||
| 	r := models.EsThemeResp{} | |||
| 	// Deserialize the response into a map.
 | |||
| 	if err := json.NewDecoder(response.Body).Decode(&r); err != nil { | |||
| 		log.Fatalf("Error parsing the response body: %s", err) | |||
| 	} | |||
| 	return r, err | |||
| } | |||
| func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { | |||
| 	//sensorId := 178
 | |||
| 	queryBody := fmt.Sprintf(`{ | |||
|   "size": 1,  | |||
|   "query": { | |||
|     "term": { | |||
|       "sensor": { | |||
|         "value": %d | |||
|       } | |||
|     } | |||
|   },  | |||
|   "sort": [ | |||
|     { | |||
|       "collect_time": { | |||
|         "order": "desc" | |||
|       } | |||
|     } | |||
|   ] | |||
| }`, sensorId) | |||
| 	//index := "go_native_themes"
 | |||
| 	themes, err := the.searchThemes(index, queryBody) | |||
| 
 | |||
| 	var theme models.EsTheme | |||
| 	if len(themes.Hits.Hits) > 0 { | |||
| 		theme = themes.Hits.Hits[0].Source | |||
| 	} | |||
| 
 | |||
| 	return theme, err | |||
| } | |||
| func (the *ESHelper) BulkWrite(index, reqBody string) { | |||
| 
 | |||
| 	body := &bytes.Buffer{} | |||
| 	body.WriteString(reqBody) | |||
| 	bulkRequest := esapi.BulkRequest{ | |||
| 		Index:        index, | |||
| 		Body:         body, | |||
| 		DocumentType: "_doc", | |||
| 	} | |||
| 	res, err := bulkRequest.Do(context.Background(), the.esClient) | |||
| 	defer res.Body.Close() | |||
| 	if err != nil { | |||
| 		log.Panicf("es 写入[%s],err=%s", index, err.Error()) | |||
| 		return | |||
| 	} | |||
| 	respBody, _ := io.ReadAll(res.Body) | |||
| 	if res.StatusCode != 200 && res.StatusCode != 201 { | |||
| 		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) | |||
| 	} | |||
| 	//log.Printf("es 写入[%s],完成,res=%s ", index, respBody)
 | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { | |||
| 
 | |||
| 	body := &bytes.Buffer{} | |||
| 	body.WriteString(reqBody) | |||
| 	bulkRequest := esapi.BulkRequest{ | |||
| 		Index:        index, | |||
| 		Body:         body, | |||
| 		DocumentType: "_doc", | |||
| 	} | |||
| 	res, err := bulkRequest.Do(context.Background(), the.esClient) | |||
| 	defer res.Body.Close() | |||
| 	if err != nil { | |||
| 		log.Panicf("es 写入[%s],err=%s", index, err.Error()) | |||
| 		return | |||
| 	} | |||
| 	respBody, _ := io.ReadAll(res.Body) | |||
| 	if res.StatusCode != 200 && res.StatusCode != 201 { | |||
| 		log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) | |||
| 	} | |||
| 	log.Printf("es 写入[%s],完成,res=%s ", index, respBody) | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { | |||
| 
 | |||
| 	//log 测试用
 | |||
| 	const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" | |||
| 
 | |||
| 	logTag := false | |||
| 
 | |||
| 	body := strings.Builder{} | |||
| 	for _, raw := range raws { | |||
| 		//  scala =>  val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString
 | |||
| 		source, _ := json.Marshal(raw) | |||
| 		_id := raw.IotaDevice | |||
| 		s := fmt.Sprintf( | |||
| 			`{"index": {"_index": "%s","_id": "%s"}} | |||
| %s | |||
| `, index, _id, source) | |||
| 		body.WriteString(s) | |||
| 
 | |||
| 		if raw.IotaDevice == logTagDeviceId { | |||
| 			log.Printf("BulkWriteRaws2Es  标记设备数据 [%s] %s ", logTagDeviceId, string(s)) | |||
| 			logTag = true | |||
| 		} | |||
| 	} | |||
| 	if logTag { //追踪数据
 | |||
| 		the.BulkWriteWithLog(index, body.String()) | |||
| 	} else { | |||
| 		the.BulkWrite(index, body.String()) | |||
| 	} | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) { | |||
| 	body := strings.Builder{} | |||
| 	for _, raw := range raws { | |||
| 		source, _ := json.Marshal(raw) | |||
| 		_id := raw.IotaDevice | |||
| 		s := fmt.Sprintf( | |||
| 			`{"index": {"_index": "%s","_id": "%s"}} | |||
| %s | |||
| `, index, _id, source) | |||
| 		body.WriteString(s) | |||
| 	} | |||
| 	the.BulkWrite(index, body.String()) | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *ESHelper) Close() { | |||
| 
 | |||
| } | |||
| @ -1,4 +1,4 @@ | |||
| package dbHelper | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"io" | |||
| @ -0,0 +1,133 @@ | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"context" | |||
| 	"encoding/json" | |||
| 	"errors" | |||
| 	"github.com/redis/go-redis/v9" | |||
| 	"log" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type RedisHelper struct { | |||
| 	rdb     redis.UniversalClient | |||
| 	isReady bool | |||
| 	ctx     context.Context | |||
| } | |||
| 
 | |||
| func NewRedisHelper(master string, address ...string) *RedisHelper { | |||
| 	r := &RedisHelper{ctx: context.Background()} | |||
| 	r.InitialCluster(master, address...) | |||
| 	return r | |||
| 
 | |||
| } | |||
| 
 | |||
| func (the *RedisHelper) InitialCluster(master string, address ...string) { | |||
| 
 | |||
| 	the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{ | |||
| 		Addrs:      address, | |||
| 		MasterName: master, | |||
| 	}) | |||
| 	log.Printf("redis 初始化完成 %s", address) | |||
| 	the.isReady = true | |||
| } | |||
| 
 | |||
| func (the *RedisHelper) Get(key string) string { | |||
| 	val, err := the.rdb.Get(the.ctx, key).Result() | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", key) | |||
| 	} else if err != nil { | |||
| 		panic(err) | |||
| 	} else { | |||
| 		//log.Printf("get key => %s =%s", key, val)
 | |||
| 	} | |||
| 	return val | |||
| } | |||
| 
 | |||
| func (the *RedisHelper) GetObj(keys string, addr any) error { | |||
| 	err := the.rdb.Get(the.ctx, keys).Scan(addr) | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", keys) | |||
| 	} else if err != nil { | |||
| 		es := err.Error() | |||
| 		log.Printf("err=%s ", es) | |||
| 		return err | |||
| 	} | |||
| 	return nil | |||
| } | |||
| func (the *RedisHelper) SetObj(keys string, obj any) error { | |||
| 	rs, err := the.rdb.Set(the.ctx, keys, obj, time.Minute*5).Result() | |||
| 	log.Printf("rs=%s ", rs) | |||
| 	if err != nil { | |||
| 		log.Printf("err=%s ", err.Error()) | |||
| 	} | |||
| 	return err | |||
| } | |||
| func (the *RedisHelper) GetLRange(keys string, addr any) error { | |||
| 	err := the.rdb.LRange(the.ctx, keys, 0, -1).ScanSlice(addr) | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", keys) | |||
| 	} else if err != nil { | |||
| 		log.Printf("err=%s ", err.Error()) | |||
| 		return err | |||
| 	} | |||
| 	return nil | |||
| } | |||
| func (the *RedisHelper) MGet(addr any, keys ...string) error { | |||
| 	err := the.rdb.MGet(the.ctx, keys...).Scan(addr) | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", keys) | |||
| 	} else if err != nil { | |||
| 		log.Printf("err=%s ", err.Error()) | |||
| 		return err | |||
| 	} | |||
| 
 | |||
| 	return err | |||
| } | |||
| func (the *RedisHelper) MGetObj(addr any, keys ...string) error { | |||
| 	err := the.rdb.MGet(the.ctx, keys...).Scan(addr) | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", keys) | |||
| 	} else if err != nil { | |||
| 		log.Printf("err=%s ", err.Error()) | |||
| 		return err | |||
| 	} | |||
| 	return nil | |||
| } | |||
| func (the *RedisHelper) HMGetObj(addr any, key, field string) error { | |||
| 	rp, err := the.rdb.HMGet(the.ctx, key, field).Result() | |||
| 	if errors.Is(err, redis.Nil) { | |||
| 		log.Printf("%s does not exist", key) | |||
| 		return err | |||
| 	} else if err != nil { | |||
| 		log.Printf("err=%s ", err.Error()) | |||
| 		return err | |||
| 	} | |||
| 	for _, i := range rp { | |||
| 		if v, ok := i.(string); ok { | |||
| 			err := json.Unmarshal([]byte(v), addr) | |||
| 			if err != nil { | |||
| 				return err | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 	//todo scan有问题 后续待排查
 | |||
| 	return nil | |||
| 
 | |||
| 	//err := the.rdb.HMGet(the.ctx, key, field).Scan(addr)
 | |||
| 	//if errors.Is(err, redis.Nil) {
 | |||
| 	//	log.Printf("%s does not exist", key)
 | |||
| 	//} else if err != nil {
 | |||
| 	//	log.Printf("err=%s ", err.Error())
 | |||
| 	//	return err
 | |||
| 	//}
 | |||
| 	//return nil
 | |||
| } | |||
| 
 | |||
| func (the *RedisHelper) SRem(key string, members ...string) int64 { | |||
| 	return the.rdb.SRem(the.ctx, key, members).Val() | |||
| } | |||
| 
 | |||
| func (the *RedisHelper) SAdd(key string, members ...string) int64 { | |||
| 	return the.rdb.SAdd(the.ctx, key, members).Val() | |||
| } | |||
| @ -1,4 +1,4 @@ | |||
| package dbHelper | |||
| package dbOperate | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| @ -1,18 +0,0 @@ | |||
| 2024/10/17 11:33:20.528503 main.go:24: =================log start================= | |||
| 2024/10/17 11:33:20.540075 main.go:25: ==> | |||
| 2024/10/17 11:33:20.540075 main.go:30: 进程启动 | |||
| 2024/10/17 11:33:20.540075 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json | |||
| 2024/10/17 11:33:20.540592 init.go:20: 加载配置文件:弃用备份 | |||
| 2024/10/17 11:33:20.540592 init.go:22: 非文件[弃用备份]跳过 | |||
| 2024/10/17 11:33:22.546148 mqttHelper.go:48: mqtt连接状态异常 tcp://10.8.30.160:1883(u:wjhp-sjwy-upload,p:123456,cid:wjhp-sjwy-upload) [err=network Error : dial tcp 10.8.30.160:1883: connectex: No connection could be made because the target machine actively refused it.] | |||
| 2024/10/17 11:33:22.546148 mqttHelper.go:49: mqtt重连,30s后尝试,剩余次数=3 | |||
| 2024/10/17 11:33:53.265276 main.go:24: =================log start================= | |||
| 2024/10/17 11:33:53.285624 main.go:25: ==> | |||
| 2024/10/17 11:33:53.285624 main.go:30: 进程启动 | |||
| 2024/10/17 11:33:53.286127 init.go:20: 加载配置文件:config_魏家滑坡_视觉位移.json | |||
| 2024/10/17 11:33:53.286698 init.go:20: 加载配置文件:弃用备份 | |||
| 2024/10/17 11:33:53.286698 init.go:22: 非文件[弃用备份]跳过 | |||
| 2024/10/17 11:33:53.289208 mqttHelper.go:28: mqtt触发重连后的重订阅 | |||
| 2024/10/17 11:33:55.290027 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= | |||
| 2024/10/17 11:34:12.251376 mqttHelper.go:28: mqtt触发重连后的重订阅 | |||
| 2024/10/17 11:34:12.251376 mqttHelper.go:100: =================开始订阅 10.8.30.160 [/fs-flexometer/admin123]================= | |||
| @ -0,0 +1,7 @@ | |||
| package models | |||
| 
 | |||
| type GDND2luAn struct { | |||
| 	Id    string               `json:"id"` | |||
| 	Time  string               `json:"time"` | |||
| 	Value map[string][]float32 `json:"value"` | |||
| } | |||
| @ -0,0 +1,40 @@ | |||
| package models | |||
| 
 | |||
| import ( | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type IotaData struct { | |||
| 	UserId      string    `json:"userId"` | |||
| 	ThingId     string    `json:"thingId"` | |||
| 	DimensionId string    `json:"dimensionId"` | |||
| 	DimCapId    string    `json:"dimCapId"` | |||
| 	CapId       string    `json:"capId"` | |||
| 	DeviceId    string    `json:"deviceId"` | |||
| 	ScheduleId  string    `json:"scheduleId"` | |||
| 	TaskId      string    `json:"taskId"` | |||
| 	JobId       int       `json:"jobId"` | |||
| 	JobRepeatId int       `json:"jobRepeatId"` | |||
| 	TriggerTime time.Time `json:"triggerTime"` | |||
| 	RealTime    time.Time `json:"realTime"` | |||
| 	FinishTime  time.Time `json:"finishTime"` | |||
| 	Seq         int       `json:"seq"` | |||
| 	Released    bool      `json:"released"` | |||
| 	Data        Data      `json:"data"` | |||
| } | |||
| 
 | |||
| type Data struct { | |||
| 	Type   int            `json:"type"` | |||
| 	Data   map[string]any `json:"data"` | |||
| 	Result struct { | |||
| 		Code     int    `json:"code"` | |||
| 		Msg      string `json:"msg"` | |||
| 		Detail   string `json:"detail"` | |||
| 		ErrTimes int    `json:"errTimes"` | |||
| 		Dropped  bool   `json:"dropped"` | |||
| 	} `json:"result"` | |||
| } | |||
| 
 | |||
| func (the *Data) Success() bool { | |||
| 	return the.Result.Code == 0 | |||
| } | |||
| @ -0,0 +1,6 @@ | |||
| package models | |||
| 
 | |||
| type AggWay struct { | |||
| 	Id   int64  `db:"id"` | |||
| 	Name string `db:"name"` | |||
| } | |||
| @ -0,0 +1,6 @@ | |||
| package models | |||
| 
 | |||
| const ( | |||
| 	RawTypeVib  = "vib" | |||
| 	RawTypeDiag = "diag" | |||
| ) | |||
| @ -0,0 +1,107 @@ | |||
| package models | |||
| 
 | |||
| import ( | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type DeviceData struct { | |||
| 	DeviceId    string | |||
| 	Name        string | |||
| 	ThingId     string | |||
| 	StructId    int | |||
| 	TaskId      string | |||
| 	AcqTime     time.Time | |||
| 	RealTime    time.Time | |||
| 	ErrCode     int | |||
| 	Raw         map[string]any | |||
| 	RawUnit     map[string]string | |||
| 	DeviceInfo  DeviceInfo | |||
| 	DimensionId string | |||
| 	//数据类型 常见有 comm="" ,RawTypeVib="vib"
 | |||
| 	DataType string | |||
| } | |||
| 
 | |||
| func (d *DeviceData) GetVibrationData() VibrationData { | |||
| 	vibData := VibrationData{} | |||
| 	if d.DataType == RawTypeVib { | |||
| 		if v, ok := d.Raw["filterFreq"]; ok { | |||
| 			if vv, ok := v.(float64); ok { | |||
| 				vibData.FilterFreq = vv | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if v, ok := d.Raw["sampleFreq"]; ok { | |||
| 			if vv, ok := v.(float64); ok { | |||
| 				vibData.SampleFreq = vv | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if v, ok := d.Raw["gainAmplifier"]; ok { | |||
| 			if vv, ok := v.(float64); ok { | |||
| 				vibData.GainAmplifier = byte(vv) | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if v, ok := d.Raw["version"]; ok { | |||
| 			if vv, ok := v.(float64); ok { | |||
| 				vibData.Version = byte(vv) | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if v, ok := d.Raw["triggerType"]; ok { | |||
| 			if vv, ok := v.(float64); ok { | |||
| 				vibData.TriggerType = byte(vv) | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if v, ok := d.Raw["physicalvalue"]; ok { | |||
| 
 | |||
| 			if vSlice, ok := v.([]any); ok { | |||
| 				for _, vObj := range vSlice { | |||
| 					if vv, ok := vObj.(float64); ok { | |||
| 						vibData.Data = append(vibData.Data, vv) | |||
| 					} | |||
| 				} | |||
| 
 | |||
| 			} | |||
| 
 | |||
| 			//去直流
 | |||
| 			if len(vibData.Data) > 0 { | |||
| 				avg := func(dataArray []float64) float64 { | |||
| 					sum := 0.0 | |||
| 					for _, f := range dataArray { | |||
| 						sum += f | |||
| 					} | |||
| 					return sum / float64(len(dataArray)) | |||
| 				}(vibData.Data) //common_calc.GetAvg(vibData.Data)
 | |||
| 
 | |||
| 				for i := 0; i < len(vibData.Data); i++ { | |||
| 					vibData.Data[i] = vibData.Data[i] - avg | |||
| 				} | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 	} | |||
| 	return vibData | |||
| } | |||
| 
 | |||
| // VibrationData 振动数据
 | |||
| type VibrationData struct { | |||
| 	Version       byte | |||
| 	SampleFreq    float64 | |||
| 	FilterFreq    float64 | |||
| 	GainAmplifier byte | |||
| 	TriggerType   byte | |||
| 	Data          []float64 // 原始波形数据
 | |||
| 	Unit          string | |||
| } | |||
| 
 | |||
| func (v *VibrationData) FormatParams() map[string]any { | |||
| 	return map[string]any{ | |||
| 		"sampleFreq":    v.SampleFreq, | |||
| 		"version":       v.Version, | |||
| 		"filterFreq":    v.FilterFreq, | |||
| 		"gainAmplifier": v.GainAmplifier, | |||
| 		"triggerType":   v.TriggerType, | |||
| 	} | |||
| } | |||
| @ -0,0 +1,70 @@ | |||
| package models | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| type DeviceInfo struct { | |||
| 	Id          string     `json:"id"` | |||
| 	Name        string     `json:"name"` | |||
| 	Structure   Structure  `json:"structure"` | |||
| 	DeviceMeta  DeviceMeta `json:"device_meta"` | |||
| 	RefreshTime time.Time | |||
| } | |||
| 
 | |||
| type DeviceMeta struct { | |||
| 	Id           string           `json:"id"` | |||
| 	Name         string           `json:"name"` | |||
| 	Model        string           `json:"model"` | |||
| 	Properties   []IotaProperty   `json:"properties"` | |||
| 	Capabilities []IotaCapability `json:"capabilities"` | |||
| } | |||
| 
 | |||
| func (m *DeviceMeta) GetOutputProps() (out map[string]string) { | |||
| 	out = make(map[string]string) | |||
| 	if len(m.Capabilities) == 0 { | |||
| 		return | |||
| 	} | |||
| 	for _, property := range m.Capabilities[0].Properties { | |||
| 		info := fmt.Sprintf("%s(%s)", property.ShowName, property.Unit) | |||
| 		out[property.Name] = info | |||
| 	} | |||
| 	return | |||
| } | |||
| func (m *DeviceMeta) GetOutputUnit() (out map[string]string) { | |||
| 	out = make(map[string]string) | |||
| 	if len(m.Capabilities) == 0 { | |||
| 		return | |||
| 	} | |||
| 	for _, property := range m.Capabilities[0].Properties { | |||
| 		out[property.Name] = property.Unit | |||
| 	} | |||
| 	return | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *DeviceMeta) MarshalBinary() (data []byte, err error) { | |||
| 	return json.Marshal(m) | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *DeviceMeta) UnmarshalBinary(data []byte) error { | |||
| 	return json.Unmarshal(data, m) | |||
| 
 | |||
| } | |||
| 
 | |||
| type IotaCapability struct { | |||
| 	CapabilityCategoryId int            `json:"capabilityCategoryId"` | |||
| 	Id                   string         `json:"id"` | |||
| 	Name                 string         `json:"name"` | |||
| 	Properties           []IotaProperty `json:"properties"` | |||
| } | |||
| 
 | |||
| type IotaProperty struct { | |||
| 	Category string `json:"category"` | |||
| 	Name     string `json:"name"` | |||
| 	ShowName string `json:"showName"` | |||
| 	Unit     string `json:"unit"` | |||
| } | |||
| @ -0,0 +1,37 @@ | |||
| package models | |||
| 
 | |||
| import "time" | |||
| 
 | |||
| type EsRaw struct { | |||
| 	StructId       int               `json:"structId"` | |||
| 	IotaDeviceName string            `json:"iota_device_name"` | |||
| 	Data           map[string]any    `json:"data"` | |||
| 	CollectTime    string            `json:"collect_time"` | |||
| 	Meta           map[string]string `json:"meta"` | |||
| 	IotaDevice     string            `json:"iota_device"` | |||
| 	CreateTime     time.Time         `json:"create_time"` | |||
| } | |||
| 
 | |||
| type EsRawResp struct { | |||
| 	Took     int  `json:"took"` | |||
| 	TimedOut bool `json:"timed_out"` | |||
| 	Shards   struct { | |||
| 		Total      int `json:"total"` | |||
| 		Successful int `json:"successful"` | |||
| 		Skipped    int `json:"skipped"` | |||
| 		Failed     int `json:"failed"` | |||
| 	} `json:"_shards"` | |||
| 	Hits struct { | |||
| 		Total    int      `json:"total"` | |||
| 		MaxScore float64  `json:"max_score"` | |||
| 		Hits     []HitRaw `json:"hits"` | |||
| 	} `json:"hits"` | |||
| } | |||
| 
 | |||
| type HitRaw struct { | |||
| 	Index  string  `json:"_index"` | |||
| 	Type   string  `json:"_type"` | |||
| 	Id     string  `json:"_id"` | |||
| 	Score  float64 `json:"_score"` | |||
| 	Source EsRaw   `json:"_source"` | |||
| } | |||
| @ -0,0 +1,41 @@ | |||
| package models | |||
| 
 | |||
| import "time" | |||
| 
 | |||
| type EsTheme struct { | |||
| 	SensorName      string         `json:"sensor_name"` | |||
| 	FactorName      string         `json:"factor_name"` | |||
| 	FactorProtoCode string         `json:"factor_proto_code"` | |||
| 	Data            map[string]any `json:"data"` | |||
| 	FactorProtoName string         `json:"factor_proto_name"` | |||
| 	Factor          int            `json:"factor"` | |||
| 	CollectTime     time.Time      `json:"collect_time"` | |||
| 	Sensor          int            `json:"sensor"` | |||
| 	Structure       int            `json:"structure"` | |||
| 	IotaDevice      []string       `json:"iota_device"` | |||
| 	CreateTime      time.Time      `json:"create_time"` | |||
| } | |||
| 
 | |||
| type EsThemeResp struct { | |||
| 	Took     int  `json:"took"` | |||
| 	TimedOut bool `json:"timed_out"` | |||
| 	Shards   struct { | |||
| 		Total      int `json:"total"` | |||
| 		Successful int `json:"successful"` | |||
| 		Skipped    int `json:"skipped"` | |||
| 		Failed     int `json:"failed"` | |||
| 	} `json:"_shards"` | |||
| 	Hits struct { | |||
| 		Total    int        `json:"total"` | |||
| 		MaxScore float64    `json:"max_score"` | |||
| 		Hits     []HitTheme `json:"hits"` | |||
| 	} `json:"hits"` | |||
| } | |||
| 
 | |||
| type HitTheme struct { | |||
| 	Index  string  `json:"_index"` | |||
| 	Type   string  `json:"_type"` | |||
| 	Id     string  `json:"_id"` | |||
| 	Score  float64 `json:"_score"` | |||
| 	Source EsTheme `json:"_source"` | |||
| } | |||
| @ -0,0 +1,25 @@ | |||
| package models | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| ) | |||
| 
 | |||
| type IotaDevice struct { | |||
| 	Id           string     `json:"id"` | |||
| 	Name         string     `json:"name"` | |||
| 	Properties   string     `json:"properties"` | |||
| 	DeviceMetaId string     `json:"deviceMetaId"` | |||
| 	ThingId      string     `json:"thingId"` | |||
| 	DeviceMeta   DeviceMeta `json:"deviceMeta"` | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *IotaDevice) MarshalBinary() (data []byte, err error) { | |||
| 	return json.Marshal(m) | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *IotaDevice) UnmarshalBinary(data []byte) error { | |||
| 	return json.Unmarshal(data, m) | |||
| 
 | |||
| } | |||
| @ -0,0 +1,31 @@ | |||
| package models | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| ) | |||
| 
 | |||
| type Structure struct { | |||
| 	ThingId string `json:"thingId"` | |||
| 	Id      int    `json:"id"` | |||
| 	Name    string `json:"name"` | |||
| 	Type    string `json:"type"` | |||
| 	OrgId   int    `json:"orgId"` | |||
| } | |||
| 
 | |||
| type ThingStruct struct { | |||
| 	ThingId string `json:"thingId"` | |||
| 	Id      int    `json:"id"` | |||
| 	Name    string `json:"name"` | |||
| 	Type    string `json:"type"` | |||
| 	OrgId   int    `json:"orgId"` | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *ThingStruct) MarshalBinary() (data []byte, err error) { | |||
| 	return json.Marshal(m) | |||
| } | |||
| 
 | |||
| // redis序列化
 | |||
| func (m *ThingStruct) UnmarshalBinary(data []byte) error { | |||
| 	return json.Unmarshal(data, m) | |||
| } | |||
| @ -0,0 +1,14 @@ | |||
| package monitors | |||
| 
 | |||
| type CommonMonitor struct { | |||
| 	*MonitorHelper | |||
| } | |||
| 
 | |||
| func (the *CommonMonitor) RegisterFun(fun func()) { | |||
| 	the.registerFun(fun) | |||
| } | |||
| 
 | |||
| func (the *CommonMonitor) Start() { | |||
| 	the.initial() | |||
| 	the.monitorStart() | |||
| } | |||
| @ -0,0 +1,80 @@ | |||
| package testUnit | |||
| 
 | |||
| import ( | |||
| 	"encoding/json" | |||
| 	"fmt" | |||
| 	"goInOut/models" | |||
| 	"goInOut/utils" | |||
| 	"testing" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| // 定义一个结构体,包含嵌套的结构体字段
 | |||
| type Person struct { | |||
| 	Name Name `json:"name"` | |||
| } | |||
| 
 | |||
| type Name struct { | |||
| 	First string `json:"first"` | |||
| 	Last  string `json:"last"` | |||
| } | |||
| 
 | |||
| func Test_httpProxy(t *testing.T) { | |||
| 
 | |||
| } | |||
| 
 | |||
| func Test_template(t *testing.T) { | |||
| 	// 创建一个Person实例
 | |||
| 	person := Person{ | |||
| 		Name: Name{ | |||
| 			First: "John", | |||
| 			Last:  "Doe", | |||
| 		}, | |||
| 	} | |||
| 
 | |||
| 	templateStr := "Hello, {{.Name.First}}   !" | |||
| 	sbs, err := utils.TextTemplateMatch(person, templateStr) | |||
| 	println(sbs) | |||
| 	if err != nil { | |||
| 		fmt.Println(err.Error()) | |||
| 	} | |||
| } | |||
| 
 | |||
| func Test_timeHandler(t *testing.T) { | |||
| 	rawMsg := `{ | |||
|                 "userId": "ce2d7eb2-e56e-422e-8bbe-95dfa18e32f8", | |||
|                 "thingId": "14862308-083e-46e1-a422-d7d6a1e2825d", | |||
|                 "dimensionId": "47386f69-c5aa-4ae7-b6cc-0490a1dc0b14", | |||
|                 "dimCapId": "0d561c0b-4dca-4104-abc0-1f0c40a71382", | |||
|                 "capId": "d4965875-354b-4294-87f4-c4ba9f9260ab", | |||
|                 "deviceId": "9c43a09c-3c65-42d3-9a54-42b87e0e5af2", | |||
|                 "scheduleId": "1cfebf18-81a2-489e-bcfb-efc294d8ce3d", | |||
|                 "taskId": "b58858ed-9e23-4ac9-9f9c-44f9e057aee9", | |||
|                 "jobId": 1, | |||
|                 "jobRepeatId": 1, | |||
|                 "triggerTime": "2024-12-04T18:23:04+16:00", | |||
|                 "realTime": "0001-01-01T00:00:00Z", | |||
|                 "finishTime": "2024-12-04T10:23:07.909922675+08:00", | |||
|                 "seq": 0, | |||
|                 "released": false, | |||
|                 "data": { | |||
|                     "type": 1, | |||
|                     "data": { | |||
|                         "physicalvalue": 0 | |||
|                     }, | |||
|                     "result": { | |||
|                         "code": 0, | |||
|                         "msg": "", | |||
|                         "detail": null, | |||
|                         "errTimes": 0, | |||
|                         "dropped": false | |||
|                     } | |||
|                 } | |||
|             }` | |||
| 
 | |||
| 	iotaData := models.IotaData{} | |||
| 	json.Unmarshal([]byte(rawMsg), &iotaData) | |||
| 	var cstZone = time.FixedZone("CST", 8*3600) // 东八区
 | |||
| 	time := iotaData.TriggerTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800") | |||
| 	println(time) | |||
| } | |||
								
									
										File diff suppressed because one or more lines are too long
									
								
							
						
					| @ -0,0 +1,53 @@ | |||
| package utils | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| 	"log" | |||
| 	"os" | |||
| ) | |||
| 
 | |||
| func SaveCache2File(cacheStr string, fileName string) error { | |||
| 	// 打开文件,如果文件不存在则创建
 | |||
| 	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) | |||
| 	if err != nil { | |||
| 		log.Println("Error opening file:", err) | |||
| 		return err | |||
| 	} | |||
| 	defer file.Close() | |||
| 
 | |||
| 	// 将变量写入文件
 | |||
| 	_, err = file.WriteString(cacheStr) | |||
| 	if err != nil { | |||
| 		log.Println("Error writing to file:", err) | |||
| 	} | |||
| 	return err | |||
| 
 | |||
| } | |||
| 
 | |||
| func ReadCache2File(fileName string) (string, error) { | |||
| 	// 打开文件
 | |||
| 	file, err := os.Open(fileName) | |||
| 	if err != nil { | |||
| 		log.Println("Error opening file:", err) | |||
| 		return "", err | |||
| 	} | |||
| 	defer file.Close() | |||
| 
 | |||
| 	// 读取文件内容
 | |||
| 	var content string | |||
| 	_, err = fmt.Fscanf(file, "%s", &content) | |||
| 	if err != nil { | |||
| 		log.Println("Error reading from file:", err) | |||
| 	} | |||
| 	return content, err | |||
| } | |||
| 
 | |||
| func FileExists(filePath string) bool { | |||
| 	_, err := os.Stat(filePath) | |||
| 	if err != nil { | |||
| 		if os.IsNotExist(err) { | |||
| 			return false | |||
| 		} | |||
| 	} | |||
| 	return true | |||
| } | |||
| @ -0,0 +1,22 @@ | |||
| package utils | |||
| 
 | |||
| import ( | |||
| 	"strings" | |||
| 	"text/template" | |||
| ) | |||
| 
 | |||
| func TextTemplateMatch(obj any, textTemplate string) (string, error) { | |||
| 	// 定义模板字符串
 | |||
| 	tmpl, err := template.New("template").Parse(textTemplate) | |||
| 	if err != nil { | |||
| 		panic(err) | |||
| 	} | |||
| 
 | |||
| 	sb := strings.Builder{} | |||
| 	err = tmpl.Execute(&sb, obj) | |||
| 	if err != nil { | |||
| 		return "", err | |||
| 	} | |||
| 	sbs := sb.String() | |||
| 	return sbs, err | |||
| } | |||
					Loading…
					
					
				
		Reference in new issue