From 048994435d770b594e274af66be6210d54b20453 Mon Sep 17 00:00:00 2001 From: 18209 Date: Thu, 6 Nov 2025 14:36:57 +0800 Subject: [PATCH] =?UTF-8?q?=E7=88=86=E9=97=AA=E4=B8=8A=E6=8A=A5=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config_爆闪es数据上报http.yaml | 18 ++ consumers/AXYBS/config.go | 13 + consumers/consumerESraw2http.go | 262 ++++++++++++++++++ consumers/consumerManage.go | 3 + dbOperate/elasticsearchHelper.go | 16 +- go.mod | 1 + go.sum | 2 + models/PostCongif.go | 8 + 8 files changed, 320 insertions(+), 3 deletions(-) create mode 100644 configFiles/config_爆闪es数据上报http.yaml create mode 100644 consumers/AXYBS/config.go create mode 100644 consumers/consumerESraw2http.go create mode 100644 models/PostCongif.go diff --git a/configFiles/config_爆闪es数据上报http.yaml b/configFiles/config_爆闪es数据上报http.yaml new file mode 100644 index 0000000..787691f --- /dev/null +++ b/configFiles/config_爆闪es数据上报http.yaml @@ -0,0 +1,18 @@ +consumer: consumerESraw2http +address: + - "http://10.8.30.160:30092" +index: "anxincloud_last_raw" +userName: "esuser" +password: "fas123" +time: 1 +url: "http://183.233.128.153:20190/scm/third-api-test/Api/V1/ThirdPartyPlatform/PushDeviceHeartLog" +query: + iota_device: + - "335eda85-2bca-4ee7-9a6f-e0038f3321e1" + - "fc794781-d025-4ad7-95d4-08b372f8f307" + - "43b5209b-b5cb-4e39-9c18-32fd536e41e9" + - "ad1c5184-44da-4343-8ae6-8d3927c31093" + - "35234ef9-7137-48f1-b877-0eb28af876cd" + - "8555b009-67c3-4a31-998a-4b3d49049661" + - "21812630-404e-4946-9aaf-42e529db923a" + - "61aea690-e544-4717-aaf9-dd346e7465c0" \ No newline at end of file diff --git a/consumers/AXYBS/config.go b/consumers/AXYBS/config.go new file mode 100644 index 0000000..6ff98a5 --- /dev/null +++ b/consumers/AXYBS/config.go @@ -0,0 +1,13 @@ +package AXYBS + +type ElasticsearchConfig struct { + Address []string `yaml:"address"` + Index string `yaml:"index"` + UserName string `yaml:"userName"` + Password string `yaml:"password"` + Time int `yaml:"time"` + URL string `yaml:"url"` + Query struct { + IotaDevice []string `yaml:"iota_device"` + } `yaml:"query"` +} diff --git a/consumers/consumerESraw2http.go b/consumers/consumerESraw2http.go new file mode 100644 index 0000000..13f8e22 --- /dev/null +++ b/consumers/consumerESraw2http.go @@ -0,0 +1,262 @@ +package consumers + +import ( + "bytes" + "crypto/sha1" + "fmt" + "github.com/goccy/go-json" + "goInOut/consumers/AXYBS" + "goInOut/dbOperate" + "goInOut/models" + "gopkg.in/yaml.v3" + "io" + "log" + "net/http" + "strconv" + "strings" + "time" +) + +type consumerESraw2http struct { + //具体配置 + Info AXYBS.ElasticsearchConfig + OutEs dbOperate.ESHelper +} + +func (the *consumerESraw2http) Work() { + panic("implement me") +} + +func (the *consumerESraw2http) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerESraw2http) Initial(cfg string) error { + + the.LoadConfigJson(cfg) + err := the.outputInitial() + if err != nil { + return err + } + err = the.inputInitial() + if err != nil { + return err + } + + return err +} + +func (the *consumerESraw2http) inputInitial() error { + + // 定时执行任务 + for { + // 查询 Elasticsearch 数据 + //models.NewESHelper(config.Elasticsearch.Address, config.Elasticsearch.UserName, config.Elasticsearch.Password) + queryStr := the.getESTimeQueryStr() + TimeTheme := the.OutEs.SearchRaw("anxincloud_last_raw", queryStr) + + // 遍历查询结果,并发送数据 + for _, hit := range TimeTheme { + + log.Printf("esResp.Hits.Hits: %s", hit) + //对es的数据进行逻辑处理 + postThing := ChangeEsValueToHttpBody(hit) + + // 发送每个设备的 value 数据 + err := SendPostRequest(the.Info.URL, postThing) + if err != nil { + log.Printf("发送 POST 请求失败: %s", err) + } else { + log.Printf("成功发送 value: %s", postThing.EquipCode) + } + } + + // 等待五分钟 + time.Sleep(time.Duration(the.Info.Time) * time.Minute) + } + return nil +} + +func (the *consumerESraw2http) outputInitial() error { + //数据出口 + the.OutEs = *dbOperate.NewESHelper( + the.Info.Address, + the.Info.UserName, + the.Info.Password, + ) + + return nil +} + +// SendPostRequest 发送POST请求,包含指定的头部和签名 +func SendPostRequest(url string, postThings models.PostThings) error { + // 格式化时间 + formattedTime := time.Now().Format("2006-01-02 15:04:05") + + // 构建请求体 + postBody := map[string]interface{}{ + "equipcode": postThings.EquipCode, + "lng": postThings.Lng, + "lat": postThings.Lat, + "status": 1, + "online": postThings.Online, + "currentstate": 1, + "led": 1, + "startorstopstate": "Start", + "threshold": 10.0, + "thresholdchangerate": 10.0, + "updatetime": formattedTime, + "remarks": "Remarks", + "lnglatsource": 1, + } + + // 将请求体转换为紧凑JSON字符串(无空格和换行) + bodyBytes, err := json.Marshal(postBody) + if err != nil { + return fmt.Errorf("序列化请求体失败: %v", err) + } + compactBody := string(bodyBytes) + + // 生成10位时间戳 + rnd := strconv.FormatInt(time.Now().Unix(), 10) + + // 计算签名 + appKey := "DlgAFSygIQNWPFlL" + appSecret := "yx9Zb3uN3boUXvCE3EfI" + sign := calculateSignature(compactBody, rnd, appKey, appSecret) + + // 创建HTTP请求 + req, err := http.NewRequest("POST", url, bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("创建HTTP请求失败: %v", err) + } + + // 设置请求头 + req.Header.Set("Content-Type", "application/json") + req.Header.Set("appkey", appKey) + //req.Header.Set("appsecret", appSecret) + req.Header.Set("rnd", rnd) + req.Header.Set("sign", sign) + + // 发送请求 + client := &http.Client{ + Timeout: 2 * time.Minute, + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("发送请求失败: %v", err) + } + defer resp.Body.Close() + + // 读取响应体 + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("读取响应失败: %v", err) + } + + // 检查HTTP状态码 + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody)) + } + + fmt.Printf("请求成功! 响应: %s\n", string(responseBody)) + return nil +} + +func calculateSignature(compactBody, rnd, appKey, appSecret string) string { + // 拼接字符串 + data := compactBody + rnd + appKey + appSecret + + // 计算SHA1哈希 + h := sha1.New() + h.Write([]byte(data)) + hashBytes := h.Sum(nil) + + // 转换为大写十六进制字符串 + return fmt.Sprintf("%X", hashBytes) +} + +func (the *consumerESraw2http) getESTimeQueryStr() string { + + // 构建查询请求体 + esQuery := fmt.Sprintf(` + { + "query": { + "bool": { + "must": [ + { + "terms": { + "iota_device": [%s] + } + } + ] + } + } + }`, formatUUIDArray(the.Info.Query.IotaDevice)) + return esQuery +} + +func formatUUIDArray(arr []string) string { + // 用双引号包裹每个元素,并用逗号连接 + quoted := make([]string, len(arr)) + for i, v := range arr { + quoted[i] = fmt.Sprintf("\"%s\"", v) // 给每个元素加上双引号 + } + + // 将格式化后的数组用逗号连接起来 + return strings.Join(quoted, ", ") +} + +func ChangeEsValueToHttpBody(hit models.HitRaw) models.PostThings { + fmt.Println("ChangeEsValueToHttpBody") + var postThings models.PostThings + + if hit.Source.IotaDevice == "335eda85-2bca-4ee7-9a6f-e0038f3321e1" { + postThings.EquipCode = "8678960778475670" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1439856 + postThings.Lat = 23.00194234 + } else if hit.Source.IotaDevice == "fc794781-d025-4ad7-95d4-08b372f8f307" { + postThings.EquipCode = "8623610710964500" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "43b5209b-b5cb-4e39-9c18-32fd536e41e9" { + postThings.EquipCode = "8678960774603870" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "ad1c5184-44da-4343-8ae6-8d3927c31093" { + postThings.EquipCode = "8678960777615450" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "35234ef9-7137-48f1-b877-0eb28af876cd" { + postThings.EquipCode = "8678960778485160" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "8555b009-67c3-4a31-998a-4b3d49049661" { + postThings.EquipCode = "8678960778882800" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "21812630-404e-4946-9aaf-42e529db923a" { //往深圳方向信号灯 + postThings.EquipCode = "8678960778883801" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "61aea690-e544-4717-aaf9-dd346e7465c0" { //往汕头方向信号灯 + postThings.EquipCode = "8678960778883802" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } + + return postThings +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index 6880c58..2fc4b23 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -59,6 +59,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerAXYThemeToES": consumer = new(consumerAXYThemeToES) + case "consumerESraw2http": + consumer = new(consumerESraw2http) + default: consumer = nil } diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index cdce690..99aa67c 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -116,11 +116,21 @@ func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { } func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { - respmap, err := the.request(index, reqBody) - if respmap != nil { - + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + //return nil, err } + log.Println(response.Status(), response.Header, response.Body) iotaDatas := models.IotaData{} + if err := json.NewDecoder(response.Body).Decode(&iotaDatas); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } return iotaDatas, err } diff --git a/go.mod b/go.mod index 8cacac4..19acae0 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/go-sql-driver/mysql v1.8.1 + github.com/goccy/go-json v0.10.2 github.com/google/uuid v1.3.1 github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/jmoiron/sqlx v1.4.0 diff --git a/go.sum b/go.sum index b55a438..a199e44 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/models/PostCongif.go b/models/PostCongif.go new file mode 100644 index 0000000..1bce826 --- /dev/null +++ b/models/PostCongif.go @@ -0,0 +1,8 @@ +package models + +type PostThings struct { + EquipCode string + Lng float64 + Lat float64 + Online float64 +}