package consumers

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"goInOut/dbOperate"
	"goInOut/models"
	"io"
	"log"
	"math"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/goccy/go-json"
	"gopkg.in/yaml.v3"
)

// deepExcavationHTTPClient is shared by every upload so that the underlying
// transport can reuse connections instead of dialing anew for each record.
var deepExcavationHTTPClient = &http.Client{
	Timeout: 2 * time.Minute,
}

// DeepExcavationConfig is the YAML configuration of the deep-excavation consumer.
type DeepExcavationConfig struct {
	Address     []string `yaml:"address"`     // passed to dbOperate.NewESHelper
	Index       string   `yaml:"index"`       // index handed to SearchLastThemes
	UserName    string   `yaml:"userName"`    // passed to dbOperate.NewESHelper
	Password    string   `yaml:"password"`    // passed to dbOperate.NewESHelper
	URL         string   `yaml:"url"`         // upload endpoint; query parameters are appended to it
	Time        int      `yaml:"time"`        // pause between polling rounds, in minutes
	Secret      string   `yaml:"secret"`      // used on both ends of the signature input
	ProjectId   string   `yaml:"projectId"`   // sent as query parameter and in every record
	Appid       string   `yaml:"appid"`       // sent as query parameter and signed
	ProgramCode string   `yaml:"programCode"` // copied into every record
	ProgramName string   `yaml:"programName"` // copied into every record
}

// DeepHorDisList carries the per-depth values attached to a deep horizontal
// displacement record.
type DeepHorDisList struct {
	Depth          int     `json:"depth"`
	DepthValue     float64 `json:"depthValue"`
	DepthInitValue float64 `json:"depthInitValue"`
	RateChange     float64 `json:"rateChange"`
}

// DeepExcavationData is the JSON record uploaded for one monitoring point.
type DeepExcavationData struct {
	ProjectId      string  `json:"projectId"`
	ProgramCode    string  `json:"programCode"`
	ProgramName    string  `json:"programName"`
	PointName      string  `json:"pointName"`
	PointId        string  `json:"pointId"`
	PointType      string  `json:"pointType"`
	PointTypeName  string  `json:"pointTypeName"`
	Unit           string  `json:"unit"`
	AlarmValue     float64 `json:"alarmValue"`
	ControlValue   float64 `json:"controlValue"`
	DeviceId       string  `json:"deviceId"`
	Variation      float64 `json:"variation"`
	AggregateValue float64 `json:"aggregateValue"`
	RateChange     float64 `json:"rateChange"`
	InitValue      float64 `json:"initValue"`
	Value          float64 `json:"value"`
	AlarmStatus    int     `json:"alarmStatus"`
	EventTime      string  `json:"eventTime"`
	// NOTE(review): the JSON key ends in "List" but a single object is sent;
	// confirm the receiving API does not expect an array here.
	DeepHorDisList *DeepHorDisList `json:"deepHorDisList,omitempty"`
}

// consumerESDeepExcavation polls Elasticsearch for the latest sensor themes and
// forwards them, one record per request, to an HTTP endpoint.
type consumerESDeepExcavation struct {
	Info     DeepExcavationConfig
	EsHelper dbOperate.ESHelper
}

// Initial loads the YAML configuration from configStr and creates the
// Elasticsearch helper. A malformed configuration panics inside LoadConfigJson.
func (c *consumerESDeepExcavation) Initial(configStr string) error {
	c.LoadConfigJson(configStr)
	return c.outputInitial()
}

// LoadConfigJson parses cfgStr into c.Info. Despite its name the input is
// decoded as YAML. It logs and panics when the document cannot be decoded.
func (c *consumerESDeepExcavation) LoadConfigJson(cfgStr string) {
	if err := yaml.Unmarshal([]byte(cfgStr), &c.Info); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// outputInitial builds the Elasticsearch helper from the configured addresses
// and credentials. It currently always returns nil.
func (c *consumerESDeepExcavation) outputInitial() error {
	c.EsHelper = *dbOperate.NewESHelper(
		c.Info.Address,
		c.Info.UserName,
		c.Info.Password,
	)
	return nil
}

// Work runs the polling loop forever: query, transform, upload, then sleep for
// the configured number of minutes. It never returns.
func (c *consumerESDeepExcavation) Work() {
	interval := time.Duration(c.Info.Time) * time.Minute
	if interval <= 0 {
		// A missing or non-positive setting would turn the loop into a busy
		// loop against Elasticsearch and the remote endpoint.
		log.Printf("轮询间隔配置无效(time=%d),使用默认值 1 分钟", c.Info.Time)
		interval = time.Minute
	}

	for {
		c.pollOnce()
		time.Sleep(interval)
	}
}

// pollOnce performs a single round: it queries the latest themes, converts the
// ones with a known factor and uploads each resulting record individually.
// Upload failures are logged and do not stop the remaining records.
func (c *consumerESDeepExcavation) pollOnce() {
	queryStr := c.getESQueryStr()
	log.Printf("ES查询语句: %s", queryStr)

	hits := c.EsHelper.SearchLastThemes(c.Info.Index, queryStr)
	log.Printf("查询到 %d 条数据", len(hits))

	dataList := make([]DeepExcavationData, 0, len(hits))
	for _, hit := range hits {
		if data := c.transformData(hit.Source); data != nil {
			dataList = append(dataList, *data)
		}
	}

	for i, data := range dataList {
		if err := c.sendData(data); err != nil {
			log.Printf("发送第 %d 条数据失败: %v", i+1, err)
			continue
		}
		log.Printf("成功发送第 %d 条数据", i+1)
	}
}

// getESQueryStr returns the Elasticsearch query body: a terms filter on the
// "sensor" field over the hard-coded sensor ids, limited to 1000 hits.
func (c *consumerESDeepExcavation) getESQueryStr() string {
	sensors := []string{
		"71351", "69396", "69397", "69407", "69398", "69400", "69401", "69402",
		"69420", "69403", "69404", "69405", "69408", "69409", "69410", "69411",
		"69412", "69413", "69425", "69423", "69421", "69419", "69418", "69399",
		"69414", "69406", "69415", "69417", "69533", "69534", "69458", "69459",
		"69460", "69461", "69462", "69463", "69556", "69521", "69522", "69523",
		"69524", "69525", "69526", "69527", "69528", "69529", "69464", "69532",
		"69520", "69539", "69540", "69541", "69542", "69543", "69544", "69545",
		"69546", "69547", "69538", "69537", "69536", "69535", "69457", "69456",
		"69549", "69647", "69550", "69645", "69646", "69555", "69640", "69644",
		"69643", "69551", "69552", "69553", "69554", "69639", "69671", "69642",
		"69676", "69677", "69678", "69641", "69670", "69638", "69981", "69975",
		"69969", "69977", "69980", "69976", "69978", "69979", "69974", "69973",
		"69968", "69970", "69971", "69972", "70000", "70001", "70002", "70003",
		"70004", "70013", "70014", "70015", "70016", "70017", "69999", "69997",
		"69998", "69416", "70096", "70097", "70098", "70243", "70244", "70245",
		"70246", "70247", "70403", "70404", "70691", "70692", "71006", "71007",
		"71008", "73275", "73276", "73277", "73278", "73279", "73280", "73281",
		"73282",
	}

	// Each id is emitted as a quoted JSON string: "a","b","c".
	sensorStr := "\"" + strings.Join(sensors, "\",\"") + "\""

	return fmt.Sprintf(`{ "size": 1000, "query": { "bool": { "must": [ { "terms": { "sensor": [%s] } } ] } } }`, sensorStr)
}

// transformData converts a theme into an upload record according to its
// factor. Themes with an unknown factor are logged and yield nil.
func (c *consumerESDeepExcavation) transformData(theme models.EsTheme) *DeepExcavationData {
	switch theme.Factor {
	case 52:
		return c.transformFactor52(theme)
	case 33:
		return c.transformFactor33(theme)
	case 31:
		return c.transformFactor31(theme)
	default:
		log.Printf("未知的factor值: %d", theme.Factor)
		return nil
	}
}

// transformFactor52 builds the record for factor 52 (deep horizontal
// displacement) from the "x", "xTotal" and "y" entries of the theme data.
func (c *consumerESDeepExcavation) transformFactor52(theme models.EsTheme) *DeepExcavationData {
	pointId := strconv.Itoa(theme.Sensor)
	pointName := theme.SensorName

	x := getFloatFromData(theme.Data, "x")
	xTotal := getFloatFromData(theme.Data, "xTotal")
	y := getFloatFromData(theme.Data, "y")

	// The depth is encoded in the sensor name, after the last "-".
	depth := extractDepthFromSensorName(pointName)

	// NOTE(review): both initial values are synthesized as the current value
	// plus random jitter in [-2, 2); confirm that this is intended.
	depthValue := x
	depthInitValue := depthValue + randFloat(-2, 2)
	variation := x
	initValue := variation + randFloat(-2, 2)

	return &DeepExcavationData{
		ProjectId:      c.Info.ProjectId,
		ProgramCode:    c.Info.ProgramCode,
		ProgramName:    c.Info.ProgramName,
		PointName:      pointName,
		PointId:        pointId,
		PointType:      "deepHorDis",
		PointTypeName:  "深层水平位移",
		Unit:           "mm",
		AlarmValue:     400.0,
		ControlValue:   360.0,
		DeviceId:       "QDXXGBYY" + pointId,
		Variation:      roundFloat(variation, 2),
		AggregateValue: roundFloat(xTotal, 2),
		RateChange:     roundFloat(variation, 2), // the rate of change is reported equal to the variation
		InitValue:      roundFloat(initValue, 2),
		Value:          roundFloat(y, 2),
		AlarmStatus:    0,
		EventTime:      formatEventTime(theme.CollectTime),
		DeepHorDisList: &DeepHorDisList{
			Depth:          depth,
			DepthValue:     roundFloat(depthValue, 2),
			DepthInitValue: roundFloat(depthInitValue, 2),
			RateChange:     roundFloat(depthValue, 2),
		},
	}
}

// transformFactor33 builds the record for factor 33 (anchor cable axial force)
// from the "force" entry of the theme data.
func (c *consumerESDeepExcavation) transformFactor33(theme models.EsTheme) *DeepExcavationData {
	pointId := strconv.Itoa(theme.Sensor)

	force := getFloatFromData(theme.Data, "force")

	// NOTE(review): variation and initial value are synthesized with random
	// jitter in [-2, 2), and the aggregate is force/0.98; confirm the intent.
	variation := force + randFloat(-2, 2)
	aggregateValue := force / 0.98
	initValue := variation + randFloat(-2, 2)

	return &DeepExcavationData{
		ProjectId:      c.Info.ProjectId,
		ProgramCode:    c.Info.ProgramCode,
		ProgramName:    c.Info.ProgramName,
		PointName:      theme.SensorName,
		PointId:        pointId,
		PointType:      "axisforceCab",
		PointTypeName:  "锚索轴力",
		Unit:           "kn",
		AlarmValue:     5250.0,
		ControlValue:   4725.0,
		DeviceId:       "QDXXGBYY" + pointId,
		Variation:      roundFloat(variation, 2),
		AggregateValue: roundFloat(aggregateValue, 2),
		RateChange:     roundFloat(variation, 2), // the rate of change is reported equal to the variation
		InitValue:      roundFloat(initValue, 2),
		Value:          roundFloat(force, 2),
		AlarmStatus:    0,
		EventTime:      formatEventTime(theme.CollectTime),
	}
}

// transformFactor31 builds the record for factor 31 (groundwater level) from
// the "force" and "waterLevel" entries of the theme data.
func (c *consumerESDeepExcavation) transformFactor31(theme models.EsTheme) *DeepExcavationData {
	pointId := strconv.Itoa(theme.Sensor)

	// NOTE(review): the variation is derived from "force" although the reported
	// value comes from "waterLevel"; this looks copied from the factor 33
	// transform. Confirm which key is intended.
	force := getFloatFromData(theme.Data, "force")
	waterLevel := getFloatFromData(theme.Data, "waterLevel")

	variation := force + randFloat(-1, 1)
	aggregateValue := waterLevel / 0.98
	initValue := variation + randFloat(-1, 1)

	return &DeepExcavationData{
		ProjectId:   c.Info.ProjectId,
		ProgramCode: c.Info.ProgramCode,
		ProgramName: c.Info.ProgramName,
		PointName:   theme.SensorName,
		PointId:     pointId,
		// NOTE(review): "froundLev" may be a typo for "groundLev"; it is kept
		// as is because the receiving API defines the accepted values.
		PointType:      "froundLev",
		PointTypeName:  "地下水位",
		Unit:           "mm",
		AlarmValue:     1000.0,
		ControlValue:   900.0,
		DeviceId:       "QDXXGBYY" + pointId,
		Variation:      roundFloat(variation, 2),
		AggregateValue: roundFloat(aggregateValue, 2),
		RateChange:     roundFloat(variation, 2), // the rate of change is reported equal to the variation
		InitValue:      roundFloat(initValue, 2),
		Value:          roundFloat(waterLevel, 2),
		AlarmStatus:    0,
		EventTime:      formatEventTime(theme.CollectTime),
	}
}

// sendData uploads one record as a single-element JSON array via HTTP POST.
// The signature, appid and projectId travel as query parameters. It returns an
// error when the request cannot be built or sent, when the response cannot be
// read, or when the status code is not 200.
func (c *consumerESDeepExcavation) sendData(data DeepExcavationData) error {
	sign := generateSign(c.Info.Secret, c.Info.Appid, c.Info.ProjectId)

	fullURL := fmt.Sprintf("%s?appid=%s&projectId=%s&sign=%s",
		c.Info.URL, c.Info.Appid, c.Info.ProjectId, sign)

	// One record per request, wrapped in an array.
	bodyBytes, err := json.Marshal([]DeepExcavationData{data})
	if err != nil {
		return fmt.Errorf("序列化数据失败: %w", err)
	}

	log.Printf("发送HTTP请求: %s", fullURL)
	log.Printf("请求体: %s", string(bodyBytes))

	req, err := http.NewRequest("POST", fullURL, bytes.NewReader(bodyBytes))
	if err != nil {
		return fmt.Errorf("创建HTTP请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := deepExcavationHTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("发送请求失败: %w", err)
	}
	defer resp.Body.Close()

	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("读取响应失败: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody))
	}

	log.Printf("请求成功! 响应: %s", string(responseBody))
	return nil
}

// generateSign returns the upper-case hexadecimal MD5 digest of
// secret + "appid" + appid + "projectId" + projectId + secret.
func generateSign(secret, appid, projectId string) string {
	rawStr := secret + "appid" + appid + "projectId" + projectId + secret
	hash := md5.Sum([]byte(rawStr))
	return strings.ToUpper(hex.EncodeToString(hash[:]))
}

// formatEventTime parses collectTime, shifts it by eight hours and formats it
// as "yyyy-MM-dd HH:mm:00"; the output layout hard-codes the seconds to 00.
//
// Timestamps without a zone are interpreted as UTC. Timestamps that carry an
// explicit offset are normalised to UTC before the shift. When collectTime is
// empty or matches none of the known layouts, the current UTC time is used.
func formatEventTime(collectTime string) string {
	const (
		outputLayout = "2006-01-02 15:04:00"
		shift        = 8 * time.Hour
	)

	if collectTime != "" {
		layouts := []string{
			"2006-01-02T15:04:05",
			"2006-01-02T15:04:05Z",
			"2006-01-02 15:04:05",
			"2006/01/02 15:04:05",
			time.RFC3339,               // e.g. 2024-05-01T10:00:00+08:00
			"2006-01-02T15:04:05Z0700", // e.g. 2024-05-01T10:00:00.000+0800
		}
		for _, layout := range layouts {
			if t, err := time.Parse(layout, collectTime); err == nil {
				return t.UTC().Add(shift).Format(outputLayout)
			}
		}
	}

	// Use UTC explicitly so the result does not depend on the host time zone.
	return time.Now().UTC().Add(shift).Format(outputLayout)
}

// extractDepthFromSensorName returns the negated integer that follows the last
// "-" in sensorName. It returns 0 when the name contains no "-" or when the
// trailing part is not an integer.
func extractDepthFromSensorName(sensorName string) int {
	idx := strings.LastIndex(sensorName, "-")
	if idx < 0 {
		return 0
	}
	depth, err := strconv.Atoi(sensorName[idx+1:])
	if err != nil {
		return 0
	}
	return -depth
}

// getFloatFromData reads data[key] as a float64. It accepts float64, float32,
// int, int64 and numeric strings; a missing key, a nil value, an unparsable
// string or any other type yields 0.
func getFloatFromData(data map[string]any, key string) float64 {
	val, ok := data[key]
	if !ok || val == nil {
		return 0.0
	}

	switch v := val.(type) {
	case float64:
		return v
	case float32:
		return float64(v)
	case int:
		return float64(v)
	case int64:
		return float64(v)
	case string:
		if f, err := strconv.ParseFloat(v, 64); err == nil {
			return f
		}
	}
	return 0.0
}

// randFloat returns a pseudo-random value in the half-open interval [min, max).
func randFloat(min, max float64) float64 {
	return min + rand.Float64()*(max-min)
}

// roundFloat rounds f to the given number of decimal places, with halves
// rounded away from zero. A negative places value is treated as 0.
func roundFloat(f float64, places int) float64 {
	if places < 0 {
		places = 0
	}
	shift := math.Pow10(places)
	// math.Round handles negative values correctly and avoids the integer
	// overflow of an int conversion for very large inputs.
	return math.Round(f*shift) / shift
}