Browse Source

爆闪上报进程

dev
18209 2 days ago
parent
commit
048994435d
  1. 18
      configFiles/config_爆闪es数据上报http.yaml
  2. 13
      consumers/AXYBS/config.go
  3. 262
      consumers/consumerESraw2http.go
  4. 3
      consumers/consumerManage.go
  5. 16
      dbOperate/elasticsearchHelper.go
  6. 1
      go.mod
  7. 2
      go.sum
  8. 8
      models/PostCongif.go

18
configFiles/config_爆闪es数据上报http.yaml

@ -0,0 +1,18 @@
consumer: consumerESraw2http
address:
- "http://10.8.30.160:30092"
index: "anxincloud_last_raw"
userName: "esuser"
password: "fas123"
time: 1
url: "http://183.233.128.153:20190/scm/third-api-test/Api/V1/ThirdPartyPlatform/PushDeviceHeartLog"
query:
iota_device:
- "335eda85-2bca-4ee7-9a6f-e0038f3321e1"
- "fc794781-d025-4ad7-95d4-08b372f8f307"
- "43b5209b-b5cb-4e39-9c18-32fd536e41e9"
- "ad1c5184-44da-4343-8ae6-8d3927c31093"
- "35234ef9-7137-48f1-b877-0eb28af876cd"
- "8555b009-67c3-4a31-998a-4b3d49049661"
- "21812630-404e-4946-9aaf-42e529db923a"
- "61aea690-e544-4717-aaf9-dd346e7465c0"

13
consumers/AXYBS/config.go

@ -0,0 +1,13 @@
package AXYBS
type ElasticsearchConfig struct {
Address []string `yaml:"address"`
Index string `yaml:"index"`
UserName string `yaml:"userName"`
Password string `yaml:"password"`
Time int `yaml:"time"`
URL string `yaml:"url"`
Query struct {
IotaDevice []string `yaml:"iota_device"`
} `yaml:"query"`
}

262
consumers/consumerESraw2http.go

@ -0,0 +1,262 @@
package consumers
import (
"bytes"
"crypto/sha1"
"fmt"
"github.com/goccy/go-json"
"goInOut/consumers/AXYBS"
"goInOut/dbOperate"
"goInOut/models"
"gopkg.in/yaml.v3"
"io"
"log"
"net/http"
"strconv"
"strings"
"time"
)
type consumerESraw2http struct {
//具体配置
Info AXYBS.ElasticsearchConfig
OutEs dbOperate.ESHelper
}
func (the *consumerESraw2http) Work() {
panic("implement me")
}
func (the *consumerESraw2http) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerESraw2http) Initial(cfg string) error {
the.LoadConfigJson(cfg)
err := the.outputInitial()
if err != nil {
return err
}
err = the.inputInitial()
if err != nil {
return err
}
return err
}
func (the *consumerESraw2http) inputInitial() error {
// 定时执行任务
for {
// 查询 Elasticsearch 数据
//models.NewESHelper(config.Elasticsearch.Address, config.Elasticsearch.UserName, config.Elasticsearch.Password)
queryStr := the.getESTimeQueryStr()
TimeTheme := the.OutEs.SearchRaw("anxincloud_last_raw", queryStr)
// 遍历查询结果,并发送数据
for _, hit := range TimeTheme {
log.Printf("esResp.Hits.Hits: %s", hit)
//对es的数据进行逻辑处理
postThing := ChangeEsValueToHttpBody(hit)
// 发送每个设备的 value 数据
err := SendPostRequest(the.Info.URL, postThing)
if err != nil {
log.Printf("发送 POST 请求失败: %s", err)
} else {
log.Printf("成功发送 value: %s", postThing.EquipCode)
}
}
// 等待五分钟
time.Sleep(time.Duration(the.Info.Time) * time.Minute)
}
return nil
}
func (the *consumerESraw2http) outputInitial() error {
//数据出口
the.OutEs = *dbOperate.NewESHelper(
the.Info.Address,
the.Info.UserName,
the.Info.Password,
)
return nil
}
// SendPostRequest 发送POST请求,包含指定的头部和签名
func SendPostRequest(url string, postThings models.PostThings) error {
// 格式化时间
formattedTime := time.Now().Format("2006-01-02 15:04:05")
// 构建请求体
postBody := map[string]interface{}{
"equipcode": postThings.EquipCode,
"lng": postThings.Lng,
"lat": postThings.Lat,
"status": 1,
"online": postThings.Online,
"currentstate": 1,
"led": 1,
"startorstopstate": "Start",
"threshold": 10.0,
"thresholdchangerate": 10.0,
"updatetime": formattedTime,
"remarks": "Remarks",
"lnglatsource": 1,
}
// 将请求体转换为紧凑JSON字符串(无空格和换行)
bodyBytes, err := json.Marshal(postBody)
if err != nil {
return fmt.Errorf("序列化请求体失败: %v", err)
}
compactBody := string(bodyBytes)
// 生成10位时间戳
rnd := strconv.FormatInt(time.Now().Unix(), 10)
// 计算签名
appKey := "DlgAFSygIQNWPFlL"
appSecret := "yx9Zb3uN3boUXvCE3EfI"
sign := calculateSignature(compactBody, rnd, appKey, appSecret)
// 创建HTTP请求
req, err := http.NewRequest("POST", url, bytes.NewReader(bodyBytes))
if err != nil {
return fmt.Errorf("创建HTTP请求失败: %v", err)
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("appkey", appKey)
//req.Header.Set("appsecret", appSecret)
req.Header.Set("rnd", rnd)
req.Header.Set("sign", sign)
// 发送请求
client := &http.Client{
Timeout: 2 * time.Minute,
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送请求失败: %v", err)
}
defer resp.Body.Close()
// 读取响应体
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %v", err)
}
// 检查HTTP状态码
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody))
}
fmt.Printf("请求成功! 响应: %s\n", string(responseBody))
return nil
}
func calculateSignature(compactBody, rnd, appKey, appSecret string) string {
// 拼接字符串
data := compactBody + rnd + appKey + appSecret
// 计算SHA1哈希
h := sha1.New()
h.Write([]byte(data))
hashBytes := h.Sum(nil)
// 转换为大写十六进制字符串
return fmt.Sprintf("%X", hashBytes)
}
func (the *consumerESraw2http) getESTimeQueryStr() string {
// 构建查询请求体
esQuery := fmt.Sprintf(`
{
"query": {
"bool": {
"must": [
{
"terms": {
"iota_device": [%s]
}
}
]
}
}
}`, formatUUIDArray(the.Info.Query.IotaDevice))
return esQuery
}
func formatUUIDArray(arr []string) string {
// 用双引号包裹每个元素,并用逗号连接
quoted := make([]string, len(arr))
for i, v := range arr {
quoted[i] = fmt.Sprintf("\"%s\"", v) // 给每个元素加上双引号
}
// 将格式化后的数组用逗号连接起来
return strings.Join(quoted, ", ")
}
func ChangeEsValueToHttpBody(hit models.HitRaw) models.PostThings {
fmt.Println("ChangeEsValueToHttpBody")
var postThings models.PostThings
if hit.Source.IotaDevice == "335eda85-2bca-4ee7-9a6f-e0038f3321e1" {
postThings.EquipCode = "8678960778475670"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1439856
postThings.Lat = 23.00194234
} else if hit.Source.IotaDevice == "fc794781-d025-4ad7-95d4-08b372f8f307" {
postThings.EquipCode = "8623610710964500"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "43b5209b-b5cb-4e39-9c18-32fd536e41e9" {
postThings.EquipCode = "8678960774603870"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "ad1c5184-44da-4343-8ae6-8d3927c31093" {
postThings.EquipCode = "8678960777615450"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "35234ef9-7137-48f1-b877-0eb28af876cd" {
postThings.EquipCode = "8678960778485160"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "8555b009-67c3-4a31-998a-4b3d49049661" {
postThings.EquipCode = "8678960778882800"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "21812630-404e-4946-9aaf-42e529db923a" { //往深圳方向信号灯
postThings.EquipCode = "8678960778883801"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "61aea690-e544-4717-aaf9-dd346e7465c0" { //往汕头方向信号灯
postThings.EquipCode = "8678960778883802"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
}
return postThings
}

3
consumers/consumerManage.go

@ -59,6 +59,9 @@ func GetConsumer(name string) (consumer IConsumer) {
case "consumerAXYThemeToES": case "consumerAXYThemeToES":
consumer = new(consumerAXYThemeToES) consumer = new(consumerAXYThemeToES)
case "consumerESraw2http":
consumer = new(consumerESraw2http)
default: default:
consumer = nil consumer = nil
} }

16
dbOperate/elasticsearchHelper.go

@ -116,11 +116,21 @@ func (the *ESHelper) request(index, reqBody string) (map[string]any, error) {
} }
func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) {
respmap, err := the.request(index, reqBody) body := &bytes.Buffer{}
if respmap != nil { body.WriteString(reqBody)
response, err := the.esClient.Search(
the.esClient.Search.WithIndex(index),
the.esClient.Search.WithBody(body),
)
defer response.Body.Close()
if err != nil {
//return nil, err
} }
log.Println(response.Status(), response.Header, response.Body)
iotaDatas := models.IotaData{} iotaDatas := models.IotaData{}
if err := json.NewDecoder(response.Body).Decode(&iotaDatas); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
return iotaDatas, err return iotaDatas, err
} }

1
go.mod

@ -7,6 +7,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/go-sql-driver/mysql v1.8.1 github.com/go-sql-driver/mysql v1.8.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.1 github.com/google/uuid v1.3.1
github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/jmoiron/sqlx v1.4.0 github.com/jmoiron/sqlx v1.4.0

2
go.sum

@ -46,6 +46,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

8
models/PostCongif.go

@ -0,0 +1,8 @@
package models
type PostThings struct {
EquipCode string
Lng float64
Lat float64
Online float64
}
Loading…
Cancel
Save