数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

262 lines
7.0 KiB

package consumers
import (
"bytes"
"crypto/sha1"
"fmt"
"github.com/goccy/go-json"
"goInOut/consumers/AXYBS"
"goInOut/dbOperate"
"goInOut/models"
"gopkg.in/yaml.v3"
"io"
"log"
"net/http"
"strconv"
"strings"
"time"
)
type consumerESraw2http struct {
//具体配置
Info AXYBS.ElasticsearchConfig
OutEs dbOperate.ESHelper
}
func (the *consumerESraw2http) Work() {
panic("implement me")
}
func (the *consumerESraw2http) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerESraw2http) Initial(cfg string) error {
the.LoadConfigJson(cfg)
err := the.outputInitial()
if err != nil {
return err
}
err = the.inputInitial()
if err != nil {
return err
}
return err
}
func (the *consumerESraw2http) inputInitial() error {
// 定时执行任务
for {
// 查询 Elasticsearch 数据
//models.NewESHelper(config.Elasticsearch.Address, config.Elasticsearch.UserName, config.Elasticsearch.Password)
queryStr := the.getESTimeQueryStr()
TimeTheme := the.OutEs.SearchRaw("anxincloud_last_raw", queryStr)
// 遍历查询结果,并发送数据
for _, hit := range TimeTheme {
log.Printf("esResp.Hits.Hits: %s", hit)
//对es的数据进行逻辑处理
postThing := ChangeEsValueToHttpBody(hit)
// 发送每个设备的 value 数据
err := SendPostRequest(the.Info.URL, postThing)
if err != nil {
log.Printf("发送 POST 请求失败: %s", err)
} else {
log.Printf("成功发送 value: %s", postThing.EquipCode)
}
}
// 等待五分钟
time.Sleep(time.Duration(the.Info.Time) * time.Minute)
}
return nil
}
func (the *consumerESraw2http) outputInitial() error {
//数据出口
the.OutEs = *dbOperate.NewESHelper(
the.Info.Address,
the.Info.UserName,
the.Info.Password,
)
return nil
}
// SendPostRequest 发送POST请求,包含指定的头部和签名
func SendPostRequest(url string, postThings models.PostThings) error {
// 格式化时间
formattedTime := time.Now().Format("2006-01-02 15:04:05")
// 构建请求体
postBody := map[string]interface{}{
"equipcode": postThings.EquipCode,
"lng": postThings.Lng,
"lat": postThings.Lat,
"status": 1,
"online": postThings.Online,
"currentstate": 1,
"led": 1,
"startorstopstate": "Start",
"threshold": 10.0,
"thresholdchangerate": 10.0,
"updatetime": formattedTime,
"remarks": "Remarks",
"lnglatsource": 1,
}
// 将请求体转换为紧凑JSON字符串(无空格和换行)
bodyBytes, err := json.Marshal(postBody)
if err != nil {
return fmt.Errorf("序列化请求体失败: %v", err)
}
compactBody := string(bodyBytes)
// 生成10位时间戳
rnd := strconv.FormatInt(time.Now().Unix(), 10)
// 计算签名
appKey := "DlgAFSygIQNWPFlL"
appSecret := "yx9Zb3uN3boUXvCE3EfI"
sign := calculateSignature(compactBody, rnd, appKey, appSecret)
// 创建HTTP请求
req, err := http.NewRequest("POST", url, bytes.NewReader(bodyBytes))
if err != nil {
return fmt.Errorf("创建HTTP请求失败: %v", err)
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("appkey", appKey)
//req.Header.Set("appsecret", appSecret)
req.Header.Set("rnd", rnd)
req.Header.Set("sign", sign)
// 发送请求
client := &http.Client{
Timeout: 2 * time.Minute,
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送请求失败: %v", err)
}
defer resp.Body.Close()
// 读取响应体
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %v", err)
}
// 检查HTTP状态码
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody))
}
fmt.Printf("请求成功! 响应: %s\n", string(responseBody))
return nil
}
func calculateSignature(compactBody, rnd, appKey, appSecret string) string {
// 拼接字符串
data := compactBody + rnd + appKey + appSecret
// 计算SHA1哈希
h := sha1.New()
h.Write([]byte(data))
hashBytes := h.Sum(nil)
// 转换为大写十六进制字符串
return fmt.Sprintf("%X", hashBytes)
}
func (the *consumerESraw2http) getESTimeQueryStr() string {
// 构建查询请求体
esQuery := fmt.Sprintf(`
{
"query": {
"bool": {
"must": [
{
"terms": {
"iota_device": [%s]
}
}
]
}
}
}`, formatUUIDArray(the.Info.Query.IotaDevice))
return esQuery
}
func formatUUIDArray(arr []string) string {
// 用双引号包裹每个元素,并用逗号连接
quoted := make([]string, len(arr))
for i, v := range arr {
quoted[i] = fmt.Sprintf("\"%s\"", v) // 给每个元素加上双引号
}
// 将格式化后的数组用逗号连接起来
return strings.Join(quoted, ", ")
}
func ChangeEsValueToHttpBody(hit models.HitRaw) models.PostThings {
fmt.Println("ChangeEsValueToHttpBody")
var postThings models.PostThings
if hit.Source.IotaDevice == "335eda85-2bca-4ee7-9a6f-e0038f3321e1" {
postThings.EquipCode = "8678960778475670"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1439856
postThings.Lat = 23.00194234
} else if hit.Source.IotaDevice == "fc794781-d025-4ad7-95d4-08b372f8f307" {
postThings.EquipCode = "8623610710964500"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "43b5209b-b5cb-4e39-9c18-32fd536e41e9" {
postThings.EquipCode = "8678960774603870"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "ad1c5184-44da-4343-8ae6-8d3927c31093" {
postThings.EquipCode = "8678960777615450"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "35234ef9-7137-48f1-b877-0eb28af876cd" {
postThings.EquipCode = "8678960778485160"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "8555b009-67c3-4a31-998a-4b3d49049661" {
postThings.EquipCode = "8678960778882800"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "21812630-404e-4946-9aaf-42e529db923a" { //往深圳方向信号灯
postThings.EquipCode = "8678960778883801"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
} else if hit.Source.IotaDevice == "61aea690-e544-4717-aaf9-dd346e7465c0" { //往汕头方向信号灯
postThings.EquipCode = "8678960778883802"
postThings.Online = hit.Source.Data["value"].(float64)
postThings.Lng = 116.1508654
postThings.Lat = 23.00236479
}
return postThings
}