package adaptors import ( "encoding/json" "fmt" "goUpload/consumers/GZGZM" "goUpload/models" "log" "math" "strconv" "time" ) // Adaptor_THEME_GZGZM 知物云kafka主题数据 转换 广东省高支模平台 type Adaptor_THEME_GZGZM struct { GZGZM.SensorConfig } func (the Adaptor_THEME_GZGZM) Transform(topic, rawMsg string) []byte { theme := models.SavoirTheme{} json.Unmarshal([]byte(rawMsg), &theme) return the.Theme2GzGZM(theme) } func (the Adaptor_THEME_GZGZM) Theme2GzGZM(theme models.SavoirTheme) (result []byte) { Atime, err := time.Parse("2006-01-02T15:04:05.000+0800", theme.AcqTime) if err != nil { log.Printf("知物云 测点[%s] 数据时间 %s 解析错误", theme.Station.Name, theme.AcqTime) return } timeStr := Atime.Format("2006-01-02 15:04:05") inStationId := strconv.Itoa(theme.Station.Id) outStationInfo := the.getSensorId(inStationId) postData := GZGZM.PostData{ MonitorDataId: fmt.Sprintf("%d_%d", theme.Station.Id, Atime.Unix()), UploadTime: timeStr, StartTime: timeStr, Contractor: outStationInfo.Contractor, //厂商编号 Type: GZGZM.MonitorTypeCodeZL, //监测类型code DeviceSn: outStationInfo.DeviceSn, //设备code DeviceIp: outStationInfo.DeviceIp, //设备ip(可不) Soc: 100, //设备电量 MeasuredPoint: outStationInfo.Code, //测点编号(高支模平台点id) BranchCode: outStationInfo.BranchCode, //分部码 Status: 1, //数据状态 1正常 2预警 3报警 Data: GZGZM.PointData{ Unit: GZGZM.UnitZL, MeasuredValue1: 0, CumulativeValue1: 0, //变化量 InitAmount1: outStationInfo.InitAmount1, //初始归0值 (目前看 配置文件中获取较好) }, } //var sensorDataList []float32 switch theme.Station.Factor.Id { case models.Savoir_FactorType_QX_ZL: //x,y,force if v, ok := theme.Data["force"]; ok { //赋值 postData.Data.MeasuredValue1 = v } case models.Savoir_FactorType_QX_SP: //x,y,displacement if v, ok := theme.Data["displacement"]; ok { //赋值 postData.Data.MeasuredValue1 = v } case models.Savoir_FactorType_QX_CZ: //x,y,settling if v, ok := theme.Data["settling"]; ok { //赋值 postData.Data.MeasuredValue1 = v } default: return } //结构转为 result, _ = json.Marshal(postData) return result } func (the Adaptor_THEME_GZGZM) getSensorId(sensorId string) GZGZM.SensorInfo { s := GZGZM.SensorInfo{} if v, ok := the.SensorInfoMap[sensorId]; ok { s = v } return s } func (the Adaptor_THEME_GZGZM) getCodeBytes(sensorCode int16) []byte { bytes := make([]byte, 0) bytes = append(bytes, byte(sensorCode&0xFF), byte(sensorCode>>8), ) return bytes } func (the Adaptor_THEME_GZGZM) getTimeBytes(sensorTime time.Time) []byte { year := int8(sensorTime.Year() - 1900) month := int8(sensorTime.Month()) day := int8(sensorTime.Day()) hour := int8(sensorTime.Hour()) minute := int8(sensorTime.Minute()) millisecond := uint16(sensorTime.Second()*1000 + sensorTime.Nanosecond()/1e6) bytes := make([]byte, 0) bytes = append(bytes, byte(year), byte(month), byte(day), byte(hour), byte(minute), byte(millisecond&0xFF), byte(millisecond>>8), ) return bytes } func (the Adaptor_THEME_GZGZM) getDatasBytes(datas []float32) []byte { bytes := make([]byte, 0) for _, data := range datas { bits := math.Float32bits(data) bytes = append(bytes, byte(bits&0xFF), byte(bits>>8&0xFF), byte(bits>>16&0xFF), byte(bits>>24&0xFF), ) } return bytes } func (the Adaptor_THEME_GZGZM) getPayloadHeader(floatCount int16) []byte { bytes := make([]byte, 0) bytes = append(bytes, //报文类型 0x02, 0x00, //1:上行信息 0x01, //默认,通讯计算机编号 0x00, //命令码 0x01, //报文长度 byte((floatCount*4+9)&0xFF), byte((floatCount*4+9)>>8), ) return bytes }