package consumers import ( "bytes" "crypto/sha1" "fmt" "github.com/goccy/go-json" "goInOut/consumers/AXYBS" "goInOut/dbOperate" "goInOut/models" "gopkg.in/yaml.v3" "io" "log" "net/http" "strconv" "strings" "time" ) type consumerESraw2http struct { //具体配置 Info AXYBS.ElasticsearchConfig OutEs dbOperate.ESHelper } func (the *consumerESraw2http) Work() { panic("implement me") } func (the *consumerESraw2http) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerESraw2http) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.outputInitial() if err != nil { return err } err = the.inputInitial() if err != nil { return err } return err } func (the *consumerESraw2http) inputInitial() error { // 定时执行任务 for { // 查询 Elasticsearch 数据 //models.NewESHelper(config.Elasticsearch.Address, config.Elasticsearch.UserName, config.Elasticsearch.Password) queryStr := the.getESTimeQueryStr() TimeTheme := the.OutEs.SearchRaw("anxincloud_last_raw", queryStr) // 遍历查询结果,并发送数据 for _, hit := range TimeTheme { log.Printf("esResp.Hits.Hits: %s", hit) //对es的数据进行逻辑处理 postThing := ChangeEsValueToHttpBody(hit) // 发送每个设备的 value 数据 err := SendPostRequest(the.Info.URL, postThing) if err != nil { log.Printf("发送 POST 请求失败: %s", err) } else { log.Printf("成功发送 value: %s", postThing.EquipCode) } } // 等待五分钟 time.Sleep(time.Duration(the.Info.Time) * time.Minute) } return nil } func (the *consumerESraw2http) outputInitial() error { //数据出口 the.OutEs = *dbOperate.NewESHelper( the.Info.Address, the.Info.UserName, the.Info.Password, ) return nil } // SendPostRequest 发送POST请求,包含指定的头部和签名 func SendPostRequest(url string, postThings models.PostThings) error { // 格式化时间 formattedTime := time.Now().Format("2006-01-02 15:04:05") // 构建请求体 postBody := map[string]interface{}{ "equipcode": postThings.EquipCode, "lng": postThings.Lng, "lat": postThings.Lat, "status": 1, "online": postThings.Online, "currentstate": 1, "led": 1, "startorstopstate": "Start", "threshold": 10.0, "thresholdchangerate": 10.0, "updatetime": formattedTime, "remarks": "Remarks", "lnglatsource": 1, } // 将请求体转换为紧凑JSON字符串(无空格和换行) bodyBytes, err := json.Marshal(postBody) if err != nil { return fmt.Errorf("序列化请求体失败: %v", err) } compactBody := string(bodyBytes) // 生成10位时间戳 rnd := strconv.FormatInt(time.Now().Unix(), 10) // 计算签名 appKey := "DlgAFSygIQNWPFlL" appSecret := "yx9Zb3uN3boUXvCE3EfI" sign := calculateSignature(compactBody, rnd, appKey, appSecret) // 创建HTTP请求 req, err := http.NewRequest("POST", url, bytes.NewReader(bodyBytes)) if err != nil { return fmt.Errorf("创建HTTP请求失败: %v", err) } // 设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("appkey", appKey) //req.Header.Set("appsecret", appSecret) req.Header.Set("rnd", rnd) req.Header.Set("sign", sign) // 发送请求 client := &http.Client{ Timeout: 2 * time.Minute, } resp, err := client.Do(req) if err != nil { return fmt.Errorf("发送请求失败: %v", err) } defer resp.Body.Close() // 读取响应体 responseBody, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("读取响应失败: %v", err) } // 检查HTTP状态码 if resp.StatusCode != http.StatusOK { return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody)) } fmt.Printf("请求成功! 响应: %s\n", string(responseBody)) return nil } func calculateSignature(compactBody, rnd, appKey, appSecret string) string { // 拼接字符串 data := compactBody + rnd + appKey + appSecret // 计算SHA1哈希 h := sha1.New() h.Write([]byte(data)) hashBytes := h.Sum(nil) // 转换为大写十六进制字符串 return fmt.Sprintf("%X", hashBytes) } func (the *consumerESraw2http) getESTimeQueryStr() string { // 构建查询请求体 esQuery := fmt.Sprintf(` { "query": { "bool": { "must": [ { "terms": { "iota_device": [%s] } } ] } } }`, formatUUIDArray(the.Info.Query.IotaDevice)) return esQuery } func formatUUIDArray(arr []string) string { // 用双引号包裹每个元素,并用逗号连接 quoted := make([]string, len(arr)) for i, v := range arr { quoted[i] = fmt.Sprintf("\"%s\"", v) // 给每个元素加上双引号 } // 将格式化后的数组用逗号连接起来 return strings.Join(quoted, ", ") } func ChangeEsValueToHttpBody(hit models.HitRaw) models.PostThings { fmt.Println("ChangeEsValueToHttpBody") var postThings models.PostThings if hit.Source.IotaDevice == "335eda85-2bca-4ee7-9a6f-e0038f3321e1" { postThings.EquipCode = "8678960778475670" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1439856 postThings.Lat = 23.00194234 } else if hit.Source.IotaDevice == "fc794781-d025-4ad7-95d4-08b372f8f307" { postThings.EquipCode = "8623610710964500" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "43b5209b-b5cb-4e39-9c18-32fd536e41e9" { postThings.EquipCode = "8678960774603870" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "ad1c5184-44da-4343-8ae6-8d3927c31093" { postThings.EquipCode = "8678960777615450" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "35234ef9-7137-48f1-b877-0eb28af876cd" { postThings.EquipCode = "8678960778485160" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "8555b009-67c3-4a31-998a-4b3d49049661" { postThings.EquipCode = "8678960778882800" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "21812630-404e-4946-9aaf-42e529db923a" { //往深圳方向信号灯 postThings.EquipCode = "8678960778883801" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "61aea690-e544-4717-aaf9-dd346e7465c0" { //往汕头方向信号灯 postThings.EquipCode = "8678960778883802" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } return postThings }