数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
6.8 KiB

package adaptors
import (
"bytes"
"fmt"
"goInOut/consumers/MYX"
"goInOut/utils"
"log"
"strconv"
"strings"
"text/template"
"time"
)
// Adaptor_GDGS_MYX 中科光电光栅 转换 明月峡-长江大桥
type Adaptor_GDGS_MYX struct {
IdMap map[string]MYX.CHInfo
}
func (the Adaptor_GDGS_MYX) Transform(rawMsg [][]string) []byte {
//tycj := models.TYCJ{}
//json.Unmarshal([]byte(rawMsg), &tycj)
return the.GDGStoMYCInflux(rawMsg)
}
func (the Adaptor_GDGS_MYX) GDGStoMYCInflux(rawRecords [][]string) (result []byte) {
for _, record := range rawRecords {
if len(record) < 3 {
log.Printf("数据 列<3,无效,跳过,%s", record)
continue
}
//数据处理
recordTime, err := time.Parse("2006-01-02 15:04:05", record[0])
if err != nil {
log.Printf("光电光栅文件 解析数据时间 %s 解析错误", record[0])
continue
}
LastCH, err := strconv.Atoi(record[1])
if err != nil {
log.Printf("行数据首通道[%s]转换int失败:%s", record[1], err.Error())
}
//数据通道 subCh ->归属在大的通道CH中
dataSubCh := 0
for i := 1; i < len(record); i += 2 {
recordCH, err := strconv.Atoi(record[i])
if err != nil {
log.Printf("数据-子通道[%s]转换int失败:%s", record[i], err.Error())
continue
}
//中科光电光栅1个通道下有多个设备 每行数据如下 大通道一样 需要自己按照顺序排布子通道
//2023-12-29 06:54:43,0,1528094,0,1531681,0,1536561,0,1537559,0,1541141,0,1542270,0,1546317,0,1551866,1,1527281,1,1532567,1,1537039,1,1541923,1,1546877,1,1551006,1,1556059,1,1562970,2,1528381,2,1532404,2,1536890,2,1542877,2,1546748,2,1551749,2,1556631,2,1561473,3,1527951,3,1532139,3,1538792,3,1542914,3,1546860,3,1551240,3,1557643,3,1561802,4,1529398,4,1533369,4,1537114,4,1540956,4,1545918,4,1551113,4,1556017,4,1560997,5,1536839,5,1541839,5,1546881,5,1552005,6,1527526,6,1543637,6,1549114
if LastCH == recordCH {
dataSubCh += 1
} else {
LastCH = recordCH
//CH变了,重新开始累计
dataSubCh = 1
}
wavelength, err := strconv.Atoi(record[i+1])
if err != nil {
log.Printf("数据-子通道[%s] 波长转换int失败:%s", record[i+1], err.Error())
continue
}
CH := fmt.Sprintf("CH%d-%d", recordCH, dataSubCh)
log.Printf("光电光栅 传感器[CH%d-%d] 波长=%d", recordCH, dataSubCh, wavelength)
//todo 波长计算应变
//physicalValue := wavelength
sensorInfo, isValid := the.getSensorInfo(CH)
if !isValid {
log.Printf("光电光栅 传感器[%s] 无匹配,跳过", CH)
continue
}
sensorLabel := sensorInfo.Label
//公式表达式
formulaStr := sensorInfo.Formula
if formulaStr == "" {
log.Printf("光电光栅 传感器[%s][%s]波长=%v,无公式计算", CH, sensorLabel, wavelength)
}
log.Printf("光电光栅 传感器[%s][%s]波长=%v,准备公式计算=%s", CH, sensorLabel, wavelength, formulaStr)
phy, err := the.calculatePhy(wavelength, formulaStr, sensorInfo.Params)
if err != nil {
log.Printf("传感器[%s][%s]公式计算异常,err=%s", CH, sensorLabel, err.Error())
continue
}
ThemeValues := []float64{phy}
tableName, fields := the.getFactorFields(sensorInfo.Type)
if sensorInfo.Label == "" || tableName == "" || len(fields) == 0 {
log.Printf("光电光栅 设备[%s][%s] type=%s 无匹配FactorFields tableName=%s,fields=%s",
CH, sensorInfo.Label, sensorInfo.Type, tableName, fields)
continue
}
//动态应变插入
// QLDYBJC,sensor_id=QLDYBJC001 valueV=0.68
var sqlBuilder strings.Builder
sqlBuilder.WriteString(fmt.Sprintf("%s,sensor_id=%s ", tableName, sensorLabel))
for i, field := range fields {
sqlBuilder.WriteString(fmt.Sprintf("%s=%f,", field, ThemeValues[i]))
}
sqlStr := sqlBuilder.String()
sqlStr = sqlStr[:len(sqlStr)-1]
sqlStr += fmt.Sprintf(" %d", recordTime.Add(-8*time.Hour).UnixNano())
log.Printf("光电光栅 设备 [f=%s]-[%s]-[%s]报文=%s 设备时间%v",
sensorInfo.Type, CH, sensorLabel, sqlStr, recordTime)
return []byte(sqlStr)
}
now := time.Now()
log.Println(recordTime, err, now)
}
//Atime, err := time.Parse("2006-01-02T15:04:05.000", tycj.SensorData.Time)
//if err != nil {
// log.Printf("统一采集 设备[%s] 数据时间 %s 解析错误", tycj.SensorData.Name, tycj.SensorData.Time)
// return
//}
//
//sensorCode := the.getSensorId(tycj.SensorData.Name)
//tableName, fields := the.getFactorFields(tycj.SensorData.FactorType)
//if sensorCode == "" || tableName == "" || len(fields) == 0 || len(fields) > len(tycj.SensorData.Data.ThemeValues) {
// log.Printf("统一采集 设备[%s] 数据时间 %s 无匹配 sensorCode=%s,tableName=%s,fields=%s",
// tycj.SensorData.Name, tycj.SensorData.Time, sensorCode, tableName, fields)
// return
//}
//
////insert ND_data,sensor_id=ND000101 valueV=0.68,valueH=290
//var sqlBuilder strings.Builder
//sqlBuilder.WriteString(fmt.Sprintf("%s,sensor_id=%s ", tableName, sensorCode))
//for i, field := range fields {
// sqlBuilder.WriteString(fmt.Sprintf("%s=%f,", field, tycj.SensorData.Data.ThemeValues[i]))
//}
//sqlStr := sqlBuilder.String()
//sqlStr = sqlStr[:len(sqlStr)-1]
//sqlStr += fmt.Sprintf(" %d", Atime.Add(-8*time.Hour).UnixNano())
//log.Printf("TYCJ [f=%d]-[%s]-[%s]报文=%s 设备时间%v", tycj.SensorData.FactorType, tycj.SensorData.Name,
// sensorCode, sqlStr, Atime)
return result
}
func (the Adaptor_GDGS_MYX) calculatePhy(wavelength int, formulaTemplateStr string, params map[string]float64) (float64, error) {
//波长计入参数
params["phy"] = float64(wavelength)
newSensorParams := map[string]string{}
for k, v := range params {
newSensorParams[k] = strconv.FormatFloat(v, 'f', 3, 64)
}
CreateFormulaTemplate := func(name, t string) *template.Template {
return template.Must(template.New(name).Parse(t))
}
formulaTemplate := CreateFormulaTemplate("formulaTemplate", formulaTemplateStr)
formulaBuff := &bytes.Buffer{}
err := formulaTemplate.Execute(formulaBuff, newSensorParams)
if err != nil {
return 0, err
}
//公式表达式计算
result := utils.CalculateFormula(formulaBuff.String())
return result, nil
}
func (the Adaptor_GDGS_MYX) getSensorInfo(rawSensorName string) (info MYX.CHInfo, isValid bool) {
v, isValid := the.IdMap[rawSensorName]
return v, isValid
}
func (the Adaptor_GDGS_MYX) getFactorFields(factorType string) (tableName string, fields []string) {
//这个对应关系 要看现场聚集的统一采集软件 进行调整
switch factorType {
case "QLDYBJC": //桥梁动应变监测 共20个
tableName = factorType
fields = []string{"value"}
case "QLJYBJC": //桥梁静应变监测 共20个
tableName = factorType
fields = []string{"value"}
case "ZLWDJC": //主梁温度监测 共4个
tableName = factorType
fields = []string{"value"}
default:
fields = []string{}
}
return tableName, fields
}