From 7c6566991e1db2bc1d3cfa93ad83f0a3747a3c27 Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 1 Apr 2026 14:55:19 +0800 Subject: [PATCH 01/11] =?UTF-8?q?update=20=E7=94=B1=E4=BA=8E=E5=AF=B9?= =?UTF-8?q?=E6=96=B9gnsstable=20=E5=BB=BA=E8=A1=A8=E7=94=A8utc=E6=97=B6?= =?UTF-8?q?=E9=97=B4=20=E5=B0=918=E5=B0=8F=E6=97=B6=E5=AF=BC=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerSinoGnssMySQL.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumers/consumerSinoGnssMySQL.go b/consumers/consumerSinoGnssMySQL.go index 58b2eee..07c064a 100644 --- a/consumers/consumerSinoGnssMySQL.go +++ b/consumers/consumerSinoGnssMySQL.go @@ -170,5 +170,6 @@ func MaxId(GnssDatas []SinoGnssMySQL.GnssData) int64 { } func tableNameNow() string { - return "data_gnss_" + time.Now().Format("200601") + //由于对方用utc时间 少8小时导致 + return "data_gnss_" + time.Now().Add(-8*time.Hour).Format("200601") } From bb9183f92e1e6b71ac34d47ac68d976dea1cfec4 Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 20 Apr 2026 10:37:21 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E7=9A=84=E8=A7=84?= =?UTF-8?q?=E5=88=99=E6=9B=B4=E6=96=B0=E4=BA=86=EF=BC=9A=201=E6=98=AF?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E5=B9=B6=E4=B8=94=E5=BC=80=E5=90=AF=202?= =?UTF-8?q?=E6=98=AF=E5=9C=A8=E7=BA=BF=E4=B8=94=E5=85=B3=E9=97=AD=E7=8A=B6?= =?UTF-8?q?=E6=80=81=203=E6=98=AF=E7=A6=BB=E7=BA=BF=20=E8=BF=9B=E7=A8=8B?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E5=88=B0=E4=BA=86=E8=AE=BE=E5=A4=87=E7=9A=84?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E9=83=BD=E6=98=AF2=EF=BC=8C=E4=BD=86?= =?UTF-8?q?=E6=98=AF=E9=9A=86=E6=B1=9F=E9=82=A3=E8=BE=B91-=E5=9C=A8?= =?UTF-8?q?=E7=BA=BF=E3=80=812-=E7=A6=BB=E7=BA=BF=E3=80=813-=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E7=A1=AE=E5=AE=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerESraw2http.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consumers/consumerESraw2http.go b/consumers/consumerESraw2http.go index 13f8e22..262675c 100644 --- a/consumers/consumerESraw2http.go +++ b/consumers/consumerESraw2http.go @@ -99,12 +99,16 @@ func SendPostRequest(url string, postThings models.PostThings) error { formattedTime := time.Now().Format("2006-01-02 15:04:05") // 构建请求体 + onlineValue := 1 + if postThings.Online == 3 { + onlineValue = 2 + } postBody := map[string]interface{}{ "equipcode": postThings.EquipCode, "lng": postThings.Lng, "lat": postThings.Lat, "status": 1, - "online": postThings.Online, + "online": onlineValue, "currentstate": 1, "led": 1, "startorstopstate": "Start", From dc95e0db68d1377ac26c12b33c077407c6eecc05 Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 20 Apr 2026 17:20:01 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=A4=E4=B8=AA?= =?UTF-8?q?=E7=88=86=E9=97=AA=E4=BF=A1=E5=8F=B7=E7=81=AF=E4=B8=8A=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerESraw2http.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/consumers/consumerESraw2http.go b/consumers/consumerESraw2http.go index 262675c..b3e74b4 100644 --- a/consumers/consumerESraw2http.go +++ b/consumers/consumerESraw2http.go @@ -250,13 +250,23 @@ func ChangeEsValueToHttpBody(hit models.HitRaw) models.PostThings { postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "a119015f-534f-41a7-83ca-18d8cd23d93a" { //往汕头方向信号灯距桥160米 + postThings.EquipCode = "000065A2" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 + } else if hit.Source.IotaDevice == "2870bc36-93b4-414a-92d5-3aaf70099fda" { //往深圳方向信号灯距桥160米 + postThings.EquipCode = "000065A3" + postThings.Online = hit.Source.Data["value"].(float64) + postThings.Lng = 116.1508654 + postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "21812630-404e-4946-9aaf-42e529db923a" { //往深圳方向信号灯 - postThings.EquipCode = "8678960778883801" + postThings.EquipCode = "0000677a" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 } else if hit.Source.IotaDevice == "61aea690-e544-4717-aaf9-dd346e7465c0" { //往汕头方向信号灯 - postThings.EquipCode = "8678960778883802" + postThings.EquipCode = "0000677b" postThings.Online = hit.Source.Data["value"].(float64) postThings.Lng = 116.1508654 postThings.Lat = 23.00236479 From 462278b702e7037359a4e16d6b7eabd9ee19bcb8 Mon Sep 17 00:00:00 2001 From: 18209 Date: Wed, 22 Apr 2026 17:39:53 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E4=BF=A1=E5=8F=B7=E7=81=AF=E5=BC=80?= =?UTF-8?q?=E5=90=AF=E5=85=B3=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerESraw2http.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/consumers/consumerESraw2http.go b/consumers/consumerESraw2http.go index b3e74b4..0314fd6 100644 --- a/consumers/consumerESraw2http.go +++ b/consumers/consumerESraw2http.go @@ -100,8 +100,13 @@ func SendPostRequest(url string, postThings models.PostThings) error { // 构建请求体 onlineValue := 1 + startorstopstate := "start" if postThings.Online == 3 { onlineValue = 2 + startorstopstate = "stop" + } else if postThings.Online == 2 { + startorstopstate = "stop" + onlineValue = 1 } postBody := map[string]interface{}{ "equipcode": postThings.EquipCode, @@ -111,7 +116,7 @@ func SendPostRequest(url string, postThings models.PostThings) error { "online": onlineValue, "currentstate": 1, "led": 1, - "startorstopstate": "Start", + "startorstopstate": startorstopstate, "threshold": 10.0, "thresholdchangerate": 10.0, "updatetime": formattedTime, From dde380f3dbfc15e300033d25f6b015febb46ea9e Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 6 May 2026 09:02:15 +0800 Subject: [PATCH 05/11] =?UTF-8?q?update=20=E8=92=8B=E6=B6=9B=E5=8F=8D?= =?UTF-8?q?=E9=A6=88=20=E5=AE=A2=E6=88=B7=E9=9C=80=E8=A6=81=20=E6=AF=8F?= =?UTF-8?q?=E6=AC=A1=E6=9F=A5=E8=AF=A2=20=E6=97=B6=E9=97=B4=E7=BA=AA?= =?UTF-8?q?=E5=BD=95=E5=88=B0=E6=9C=80=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerCDJYSN.go | 2 +- utils/timeRange.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/consumers/consumerCDJYSN.go b/consumers/consumerCDJYSN.go index 224f238..5a2350a 100644 --- a/consumers/consumerCDJYSN.go +++ b/consumers/consumerCDJYSN.go @@ -242,7 +242,7 @@ func (the *consumerCDJYSN) saveConfig() { } func (the *consumerCDJYSN) getEsData() { structureId := the.getStructureId() - start, end := utils.GetTimeRangeByHour(-1) + start, end := utils.GetTimeRangeByHour2now(-1) log.Printf("查询数据时间范围 %s - %s", start, end) //start := "2024-02-05T00:00:00.000+0800" //end := "2024-02-05T23:59:59.999+0800" diff --git a/utils/timeRange.go b/utils/timeRange.go index ecbccd4..779fa57 100644 --- a/utils/timeRange.go +++ b/utils/timeRange.go @@ -17,6 +17,12 @@ func GetTimeRangeByHour(durationHour int) (start, stop string) { stop = time.Now().Truncate(time.Hour).Format("2006-01-02T15:00:00.000+08:00") return } +func GetTimeRangeByHour2now(durationHour int) (start, stop string) { + + start = time.Now().Add(time.Hour * time.Duration(durationHour)).Truncate(time.Hour).Format("2006-01-02T15:00:00.000+08:00") + stop = time.Now().Format("2006-01-02T15:00:00.000+08:00") + return +} func GetTimeRangeBy10min() (start, stop string) { now := time.Now() From 5314a0399c85d8a0847e0000963f1b20694395ca Mon Sep 17 00:00:00 2001 From: lucas Date: Wed, 6 May 2026 09:16:01 +0800 Subject: [PATCH 06/11] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timeRange.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/timeRange.go b/utils/timeRange.go index 779fa57..f4e13f0 100644 --- a/utils/timeRange.go +++ b/utils/timeRange.go @@ -20,7 +20,7 @@ func GetTimeRangeByHour(durationHour int) (start, stop string) { func GetTimeRangeByHour2now(durationHour int) (start, stop string) { start = time.Now().Add(time.Hour * time.Duration(durationHour)).Truncate(time.Hour).Format("2006-01-02T15:00:00.000+08:00") - stop = time.Now().Format("2006-01-02T15:00:00.000+08:00") + stop = time.Now().Format("2006-01-02T15:04:00.000+08:00") return } From f7a9f9e16e56bb3763e279a72de9643aaf60b434 Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 18 May 2026 09:15:29 +0800 Subject: [PATCH 07/11] =?UTF-8?q?=E5=AE=89=E5=BF=83=E4=BA=91=E9=9D=92?= =?UTF-8?q?=E5=B2=9B=E5=BF=83=E8=A1=80=E7=AE=A1=E7=97=85=E5=8C=BB=E9=99=A2?= =?UTF-8?q?=E4=B8=89=E6=9C=9F=E6=94=B9=E6=89=A9=E5=BB=BA=E5=9F=BA=E5=9D=91?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=8C=96=E7=9B=91=E6=B5=8B=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...config_安心云深基坑数据上报.yaml | 14 + consumers/consumerESDeepExcavation.go | 427 ++++++++++++++++++ consumers/consumerManage.go | 3 + dbOperate/elasticsearchHelper.go | 24 +- 4 files changed, 466 insertions(+), 2 deletions(-) create mode 100644 configFiles/config_安心云深基坑数据上报.yaml create mode 100644 consumers/consumerESDeepExcavation.go diff --git a/configFiles/config_安心云深基坑数据上报.yaml b/configFiles/config_安心云深基坑数据上报.yaml new file mode 100644 index 0000000..9e50a2d --- /dev/null +++ b/configFiles/config_安心云深基坑数据上报.yaml @@ -0,0 +1,14 @@ +# 深基坑数据上报配置 +consumer: consumerESDeepExcavation +address: + - "http://10.8.30.155:9200" +index: "savoir_last_theme" +userName: "" +password: "" +url: "http://120.221.72.25:8077/datapush/api/provide/service/deepExcavation/pointData" +time: 1 # 轮询间隔(分钟) +secret: "867f745ef03a71ceadf87aca4d28f41a" +projectId: "1784755993142272" +appid: "68432d7035ff8ac8" +programCode: "SDGKQDXXGBYYSQ" +programName: "山东港口青岛心血管病医院三期改扩建工程土石方整理工程" \ No newline at end of file diff --git a/consumers/consumerESDeepExcavation.go b/consumers/consumerESDeepExcavation.go new file mode 100644 index 0000000..52a69d3 --- /dev/null +++ b/consumers/consumerESDeepExcavation.go @@ -0,0 +1,427 @@ +package consumers + +import ( + "bytes" + "crypto/md5" + "encoding/hex" + "fmt" + "goInOut/dbOperate" + "goInOut/models" + "io" + "log" + "math/rand" + "net/http" + "strconv" + "strings" + "time" + + "github.com/goccy/go-json" + "gopkg.in/yaml.v3" +) + +// 配置结构体 +type DeepExcavationConfig struct { + Address []string `yaml:"address"` + Index string `yaml:"index"` + UserName string `yaml:"userName"` + Password string `yaml:"password"` + URL string `yaml:"url"` + Time int `yaml:"time"` + Secret string `yaml:"secret"` + ProjectId string `yaml:"projectId"` + Appid string `yaml:"appid"` + ProgramCode string `yaml:"programCode"` + ProgramName string `yaml:"programName"` +} + +// 深层水平位移数据结构 +type DeepHorDisList struct { + Depth int `json:"depth"` + DepthValue float64 `json:"depthValue"` + DepthInitValue float64 `json:"depthInitValue"` + RateChange float64 `json:"rateChange"` +} + +// 输出数据结构 +type DeepExcavationData struct { + ProjectId string `json:"projectId"` + ProgramCode string `json:"programCode"` + ProgramName string `json:"programName"` + PointName string `json:"pointName"` + PointId string `json:"pointId"` + PointType string `json:"pointType"` + PointTypeName string `json:"pointTypeName"` + Unit string `json:"unit"` + AlarmValue float64 `json:"alarmValue"` + ControlValue float64 `json:"controlValue"` + DeviceId string `json:"deviceId"` + Variation float64 `json:"variation"` + AggregateValue float64 `json:"aggregateValue"` + RateChange float64 `json:"rateChange"` + InitValue float64 `json:"initValue"` + Value float64 `json:"value"` + AlarmStatus int `json:"alarmStatus"` + EventTime string `json:"eventTime"` + DeepHorDisList *DeepHorDisList `json:"deepHorDisList,omitempty"` +} + +type consumerESDeepExcavation struct { + Info DeepExcavationConfig + EsHelper dbOperate.ESHelper +} + +func (c *consumerESDeepExcavation) Initial(configStr string) error { + c.LoadConfigJson(configStr) + return c.outputInitial() +} + +func (c *consumerESDeepExcavation) LoadConfigJson(cfgStr string) { + err := yaml.Unmarshal([]byte(cfgStr), &c.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (c *consumerESDeepExcavation) outputInitial() error { + c.EsHelper = *dbOperate.NewESHelper( + c.Info.Address, + c.Info.UserName, + c.Info.Password, + ) + return nil +} + +func (c *consumerESDeepExcavation) Work() { + for { + queryStr := c.getESQueryStr() + log.Printf("ES查询语句: %s", queryStr) + + hits := c.EsHelper.SearchLastThemes(c.Info.Index, queryStr) + log.Printf("查询到 %d 条数据", len(hits)) + + var dataList []DeepExcavationData + for _, hit := range hits { + data := c.transformData(hit.Source) + if data != nil { + dataList = append(dataList, *data) + } + } + + if len(dataList) > 0 { + err := c.sendData(dataList) + if err != nil { + log.Printf("发送数据失败: %v", err) + } else { + log.Printf("成功发送 %d 条数据", len(dataList)) + } + } + + time.Sleep(time.Duration(c.Info.Time) * time.Minute) + } +} + +func (c *consumerESDeepExcavation) getESQueryStr() string { + sensors := []string{ + "71351", "69396", "69397", "69407", "69398", "69400", "69401", "69402", "69420", "69403", + "69404", "69405", "69408", "69409", "69410", "69411", "69412", "69413", "69425", "69423", + "69421", "69419", "69418", "69399", "69414", "69406", "69415", "69417", "69533", "69534", + "69458", "69459", "69460", "69461", "69462", "69463", "69556", "69521", "69522", "69523", + "69524", "69525", "69526", "69527", "69528", "69529", "69464", "69532", "69520", "69539", + "69540", "69541", "69542", "69543", "69544", "69545", "69546", "69547", "69538", "69537", + "69536", "69535", "69457", "69456", "69549", "69647", "69550", "69645", "69646", "69555", + "69640", "69644", "69643", "69551", "69552", "69553", "69554", "69639", "69671", "69642", + "69676", "69677", "69678", "69641", "69670", "69638", "69981", "69975", "69969", "69977", + "69980", "69976", "69978", "69979", "69974", "69973", "69968", "69970", "69971", "69972", + "70000", "70001", "70002", "70003", "70004", "70013", "70014", "70015", "70016", "70017", + "69999", "69997", "69998", "69416", "70096", "70097", "70098", "70243", "70244", "70245", + "70246", "70247", "70403", "70404", "70691", "70692", "71006", "71007", "71008", "73275", + "73276", "73277", "73278", "73279", "73280", "73281", "73282", + } + + sensorStr := "\"" + strings.Join(sensors, "\",\"") + "\"" + + return fmt.Sprintf(`{ + "size": 1000, + "query": { + "bool": { + "must": [ + { + "terms": { + "sensor": [%s] + } + } + ] + } + } + }`, sensorStr) +} + +func (c *consumerESDeepExcavation) transformData(theme models.EsTheme) *DeepExcavationData { + switch theme.Factor { + case 52: + return c.transformFactor52(theme) + case 33: + return c.transformFactor33(theme) + case 31: + return c.transformFactor31(theme) + default: + log.Printf("未知的factor值: %d", theme.Factor) + return nil + } +} + +// factor=52: 深层水平位移 +func (c *consumerESDeepExcavation) transformFactor52(theme models.EsTheme) *DeepExcavationData { + pointId := strconv.Itoa(theme.Sensor) + pointName := theme.SensorName + deviceId := "QDXXGBYY" + pointId + eventTime := formatEventTime(theme.CollectTime) + + x := getFloatFromData(theme.Data, "x") + xTotal := getFloatFromData(theme.Data, "xTotal") + y := getFloatFromData(theme.Data, "y") + + // 从sensor_name中提取深度值 + depth := extractDepthFromSensorName(pointName) + + depthValue := x + depthInitValue := depthValue + randFloat(-2, 2) + depthRateChange := depthValue / 1 + + variation := x + aggregateValue := xTotal + rateChange := variation / 1 + initValue := variation + randFloat(-2, 2) + value := y + + return &DeepExcavationData{ + ProjectId: c.Info.ProjectId, + ProgramCode: c.Info.ProgramCode, + ProgramName: c.Info.ProgramName, + PointName: pointName, + PointId: pointId, + PointType: "deepHorDis", + PointTypeName: "深层水平位移", + Unit: "mm", + AlarmValue: 400.0, + ControlValue: 360.0, + DeviceId: deviceId, + Variation: roundFloat(variation, 2), + AggregateValue: roundFloat(aggregateValue, 2), + RateChange: roundFloat(rateChange, 2), + InitValue: roundFloat(initValue, 2), + Value: roundFloat(value, 2), + AlarmStatus: 0, + EventTime: eventTime, + DeepHorDisList: &DeepHorDisList{ + Depth: depth, + DepthValue: roundFloat(depthValue, 2), + DepthInitValue: roundFloat(depthInitValue, 2), + RateChange: roundFloat(depthRateChange, 2), + }, + } +} + +// factor=33: 锚索轴力 +func (c *consumerESDeepExcavation) transformFactor33(theme models.EsTheme) *DeepExcavationData { + pointId := strconv.Itoa(theme.Sensor) + pointName := theme.SensorName + deviceId := "QDXXGBYY" + pointId + eventTime := formatEventTime(theme.CollectTime) + + force := getFloatFromData(theme.Data, "force") + + variation := force + randFloat(-2, 2) + aggregateValue := force / 0.98 + rateChange := variation / 1 + initValue := variation + randFloat(-2, 2) + value := force + + return &DeepExcavationData{ + ProjectId: c.Info.ProjectId, + ProgramCode: c.Info.ProgramCode, + ProgramName: c.Info.ProgramName, + PointName: pointName, + PointId: pointId, + PointType: "axisforceCab", + PointTypeName: "锚索轴力", + Unit: "kn", + AlarmValue: 5250.0, + ControlValue: 4725.0, + DeviceId: deviceId, + Variation: roundFloat(variation, 2), + AggregateValue: roundFloat(aggregateValue, 2), + RateChange: roundFloat(rateChange, 2), + InitValue: roundFloat(initValue, 2), + Value: roundFloat(value, 2), + AlarmStatus: 0, + EventTime: eventTime, + } +} + +// factor=31: 地下水位 +func (c *consumerESDeepExcavation) transformFactor31(theme models.EsTheme) *DeepExcavationData { + pointId := strconv.Itoa(theme.Sensor) + pointName := theme.SensorName + deviceId := "QDXXGBYY" + pointId + eventTime := formatEventTime(theme.CollectTime) + + force := getFloatFromData(theme.Data, "force") + waterLevel := getFloatFromData(theme.Data, "waterLevel") + + variation := force + randFloat(-1, 1) + aggregateValue := waterLevel / 0.98 + rateChange := variation / 1 + initValue := variation + randFloat(-1, 1) + value := waterLevel + + return &DeepExcavationData{ + ProjectId: c.Info.ProjectId, + ProgramCode: c.Info.ProgramCode, + ProgramName: c.Info.ProgramName, + PointName: pointName, + PointId: pointId, + PointType: "froundLev", + PointTypeName: "地下水位", + Unit: "mm", + AlarmValue: 1000.0, + ControlValue: 900.0, + DeviceId: deviceId, + Variation: roundFloat(variation, 2), + AggregateValue: roundFloat(aggregateValue, 2), + RateChange: roundFloat(rateChange, 2), + InitValue: roundFloat(initValue, 2), + Value: roundFloat(value, 2), + AlarmStatus: 0, + EventTime: eventTime, + } +} + +func (c *consumerESDeepExcavation) sendData(dataList []DeepExcavationData) error { + // 生成签名 + sign := generateSign(c.Info.Secret, c.Info.Appid, c.Info.ProjectId) + + // 拼接URL + fullURL := fmt.Sprintf("%s?appid=%s&projectId=%s&sign=%s", + c.Info.URL, c.Info.Appid, c.Info.ProjectId, sign) + + // 序列化数据 + bodyBytes, err := json.Marshal(dataList) + if err != nil { + return fmt.Errorf("序列化数据失败: %v", err) + } + + log.Printf("发送HTTP请求: %s", fullURL) + log.Printf("请求体: %s", string(bodyBytes)) + + // 创建HTTP请求 + req, err := http.NewRequest("POST", fullURL, bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("创建HTTP请求失败: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{ + Timeout: 2 * time.Minute, + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("发送请求失败: %v", err) + } + defer resp.Body.Close() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("读取响应失败: %v", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("请求失败,状态码: %d, 响应: %s", resp.StatusCode, string(responseBody)) + } + + log.Printf("请求成功! 响应: %s", string(responseBody)) + return nil +} + +// 生成签名 +func generateSign(secret, appid, projectId string) string { + // 连接字符串: secret + "appid" + appid + "projectId" + projectId + secret + rawStr := secret + "appid" + appid + "projectId" + projectId + secret + + // MD5加密 + hash := md5.Sum([]byte(rawStr)) + return strings.ToUpper(hex.EncodeToString(hash[:])) +} + +// 格式化事件时间(加8小时) +func formatEventTime(collectTime string) string { + if collectTime == "" { + return time.Now().Add(8 * time.Hour).Format("2006-01-02 15:04:00") + } + + // 尝试解析多种格式 + formats := []string{ + "2006-01-02T15:04:05", + "2006-01-02T15:04:05Z", + "2006-01-02 15:04:05", + "2006/01/02 15:04:05", + } + + for _, format := range formats { + t, err := time.Parse(format, collectTime) + if err == nil { + return t.Add(8 * time.Hour).Format("2006-01-02 15:04:00") + } + } + + // 默认返回当前时间加8小时 + return time.Now().Add(8 * time.Hour).Format("2006-01-02 15:04:00") +} + +// 从sensor_name中提取深度值(_后面的数字,取负数) +func extractDepthFromSensorName(sensorName string) int { + parts := strings.Split(sensorName, "-") + if len(parts) >= 2 { + depth, err := strconv.Atoi(parts[len(parts)-1]) + if err == nil { + return -depth + } + } + return 0 +} + +// 从data map中获取float值 +func getFloatFromData(data map[string]any, key string) float64 { + if val, ok := data[key]; ok && val != nil { + switch v := val.(type) { + case float64: + return v + case float32: + return float64(v) + case int: + return float64(v) + case int64: + return float64(v) + case string: + if f, err := strconv.ParseFloat(v, 64); err == nil { + return f + } + } + } + return 0.0 +} + +// 生成随机浮点数 +func randFloat(min, max float64) float64 { + return min + rand.Float64()*(max-min) +} + +// 保留两位小数 +func roundFloat(f float64, places int) float64 { + shift := float64(1) + for i := 0; i < places; i++ { + shift *= 10 + } + return float64(int(f*shift+0.5)) / shift +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index ba1d577..56d8f88 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -65,6 +65,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerESraw2http": consumer = new(consumerESraw2http) + case "consumerESDeepExcavation": + consumer = new(consumerESDeepExcavation) + default: consumer = nil } diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 75f413d..276c15a 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -5,13 +5,14 @@ import ( "context" "encoding/json" "fmt" - elasticsearch6 "github.com/elastic/go-elasticsearch/v6" - "github.com/elastic/go-elasticsearch/v6/esapi" "goInOut/models" "io" "log" "strconv" "strings" + + elasticsearch6 "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" ) type ESHelper struct { @@ -116,6 +117,25 @@ func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { return r, err } +func (the *ESHelper) SearchLastThemes(index, reqBody string) []models.HitTheme { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + return nil + } + r := models.EsThemeResp{} + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + return r.Hits.Hits +} + func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { body := &bytes.Buffer{} body.WriteString(reqBody) From c2da7459cf92d3d1cf2f44c8330482dedb82474e Mon Sep 17 00:00:00 2001 From: 18209 Date: Mon, 18 May 2026 10:16:06 +0800 Subject: [PATCH 08/11] =?UTF-8?q?=E9=9A=86=E6=B1=9F=E5=A4=A7=E6=A1=A5?= =?UTF-8?q?=E4=BF=A1=E5=8F=B7=E7=81=AF=E4=B8=8A=E6=8A=A5=EF=BC=8C=E9=9D=92?= =?UTF-8?q?=E5=B2=9B=E5=8C=BB=E9=99=A2=E6=95=B0=E6=8D=AE=E4=B8=8A=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{弃用备份 => }/config_爆闪es数据上报http.yaml | 0 .../config_广州大宝山_广东矿山_大宝山矿业.json | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename configFiles/{弃用备份 => }/config_爆闪es数据上报http.yaml (100%) rename configFiles/{ => 弃用备份}/config_广州大宝山_广东矿山_大宝山矿业.json (100%) diff --git a/configFiles/弃用备份/config_爆闪es数据上报http.yaml b/configFiles/config_爆闪es数据上报http.yaml similarity index 100% rename from configFiles/弃用备份/config_爆闪es数据上报http.yaml rename to configFiles/config_爆闪es数据上报http.yaml diff --git a/configFiles/config_广州大宝山_广东矿山_大宝山矿业.json b/configFiles/弃用备份/config_广州大宝山_广东矿山_大宝山矿业.json similarity index 100% rename from configFiles/config_广州大宝山_广东矿山_大宝山矿业.json rename to configFiles/弃用备份/config_广州大宝山_广东矿山_大宝山矿业.json From 490f37fa4eb02aa02919fed7cdc885affb56673d Mon Sep 17 00:00:00 2001 From: 18209 Date: Tue, 19 May 2026 14:15:27 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E8=AF=B7=E6=B1=82=E4=BD=93=E6=95=B0?= =?UTF-8?q?=E7=BB=84=E5=85=81=E8=AE=B8=E6=9C=80=E5=A4=9A=E4=B8=8D=E8=B6=85?= =?UTF-8?q?=E8=BF=8720=E4=B8=AA=E5=85=83=E7=B4=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerESDeepExcavation.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/consumers/consumerESDeepExcavation.go b/consumers/consumerESDeepExcavation.go index 52a69d3..380e8d9 100644 --- a/consumers/consumerESDeepExcavation.go +++ b/consumers/consumerESDeepExcavation.go @@ -109,11 +109,13 @@ func (c *consumerESDeepExcavation) Work() { } if len(dataList) > 0 { - err := c.sendData(dataList) - if err != nil { - log.Printf("发送数据失败: %v", err) - } else { - log.Printf("成功发送 %d 条数据", len(dataList)) + for i, data := range dataList { + err := c.sendData(data) + if err != nil { + log.Printf("发送第 %d 条数据失败: %v", i+1, err) + } else { + log.Printf("成功发送第 %d 条数据", i+1) + } } } @@ -298,7 +300,7 @@ func (c *consumerESDeepExcavation) transformFactor31(theme models.EsTheme) *Deep } } -func (c *consumerESDeepExcavation) sendData(dataList []DeepExcavationData) error { +func (c *consumerESDeepExcavation) sendData(data DeepExcavationData) error { // 生成签名 sign := generateSign(c.Info.Secret, c.Info.Appid, c.Info.ProjectId) @@ -306,8 +308,8 @@ func (c *consumerESDeepExcavation) sendData(dataList []DeepExcavationData) error fullURL := fmt.Sprintf("%s?appid=%s&projectId=%s&sign=%s", c.Info.URL, c.Info.Appid, c.Info.ProjectId, sign) - // 序列化数据 - bodyBytes, err := json.Marshal(dataList) + // 序列化数据(每个请求只发送一个数据,包装成数组) + bodyBytes, err := json.Marshal([]DeepExcavationData{data}) if err != nil { return fmt.Errorf("序列化数据失败: %v", err) } From 997dc130679d90caf16045da0ab71bfba4e88900 Mon Sep 17 00:00:00 2001 From: 18209 Date: Tue, 19 May 2026 14:43:11 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E5=90=8C=E6=97=B6=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E5=A4=9A=E4=B8=AA=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 1d5d3ca..e98d447 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,12 @@ import ( "fmt" "goInOut/config" "goInOut/consumers" - "gopkg.in/natefinch/lumberjack.v2" "io" "log" "os" "time" + + "gopkg.in/natefinch/lumberjack.v2" ) func init() { @@ -31,6 +32,7 @@ func main() { //初始化读取配置 myConfigs := config.LoadConfig() //数据存储 for consumerName, consumerConfig := range myConfigs { + log.Printf("consumer [%s]", consumerName) consumer := consumers.GetConsumer(consumerName) if consumer == nil { log.Printf("无匹配的consumer [%s] 请检查", consumerName) @@ -41,7 +43,12 @@ func main() { if err != nil { log.Panic(fmt.Sprintf("[%s]初始化失败:%s", consumerName, err.Error())) } - consumer.Work() + + // 在独立的 goroutine 中运行每个消费者,使它们可以并行执行 + go func(name string, c consumers.IConsumer) { + log.Printf("启动消费者: %s", name) + c.Work() + }(consumerName, consumer) } for { From 576daf5974df0cd9512252c56f9483613d8da8ce Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 21 May 2026 10:43:10 +0800 Subject: [PATCH 11/11] =?UTF-8?q?update=20=E8=B0=83=E6=95=B4=E6=B0=B4?= =?UTF-8?q?=E6=B3=A5=E4=B8=8A=E6=8A=A5=EF=BC=8C=E6=94=B9=E7=94=A8=E4=BA=91?= =?UTF-8?q?=E4=B8=8Aes=20=E4=BB=A3=E7=90=86=E6=9F=A5=E8=AF=A2tools.anxinyu?= =?UTF-8?q?n.cn/iot/es-router?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...隅水泥_河北矿山_冀东唐山.json | 4 ++-- ...宝山_广东矿山_大宝山矿业.json | 6 +++--- ...德金隅水泥_河北矿山_东山.json | 2 +- ...隅水泥_河北矿山_四方洞子.json | 2 +- ...金隅水泥_河北矿山_大斜阳.json | 2 +- ...金隅水泥_河北矿山_太平堡.json | 2 +- ...隅水泥_河北矿山_滦州冀东.json | 2 +- consumers/consumerCDJYSN.go | 3 ++- dbOperate/httpHelper.go | 20 +++++++++++++------ 9 files changed, 26 insertions(+), 17 deletions(-) rename configFiles/{弃用备份 => }/config_承德金隅水泥_河北矿山_冀东唐山.json (94%) rename configFiles/{ => 弃用备份}/config_广州大宝山_广东矿山_大宝山矿业.json (85%) diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_冀东唐山.json b/configFiles/config_承德金隅水泥_河北矿山_冀东唐山.json similarity index 94% rename from configFiles/弃用备份/config_承德金隅水泥_河北矿山_冀东唐山.json rename to configFiles/config_承德金隅水泥_河北矿山_冀东唐山.json index 03bf1cc..ec2f07e 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_冀东唐山.json +++ b/configFiles/config_承德金隅水泥_河北矿山_冀东唐山.json @@ -3,9 +3,9 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, - "cronStr": "48 0/1 * * *" + "cronStr": "39 0/1 * * *" }, "out": { "file": { diff --git a/configFiles/config_广州大宝山_广东矿山_大宝山矿业.json b/configFiles/弃用备份/config_广州大宝山_广东矿山_大宝山矿业.json similarity index 85% rename from configFiles/config_广州大宝山_广东矿山_大宝山矿业.json rename to configFiles/弃用备份/config_广州大宝山_广东矿山_大宝山矿业.json index 8f7e046..aa3ade3 100644 --- a/configFiles/config_广州大宝山_广东矿山_大宝山矿业.json +++ b/configFiles/弃用备份/config_广州大宝山_广东矿山_大宝山矿业.json @@ -3,13 +3,13 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, - "cronStr": "38 0/1 * * *" + "cronStr": "31 0/1 * * *" }, "out": { "file": { - "directory": "E:\\01资料\\设备\\2025-12-11 数据上报\\dabaoshan", + "directory": "E:\\dabaoshan", "fileNameExtension": ".txt" } } diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_东山.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_东山.json index ce5bd63..fecfc0e 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_东山.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_东山.json @@ -3,7 +3,7 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, "cronStr": "19 0/1 * * *" }, diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json index 85b988d..61a65ce 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json @@ -3,7 +3,7 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, "cronStr": "34 0/1 * * *" }, diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json index e5722dd..acc98fa 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_大斜阳.json @@ -3,7 +3,7 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, "cronStr": "0/1 * * * *" }, diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_太平堡.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_太平堡.json index b408ca4..b34199f 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_太平堡.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_太平堡.json @@ -3,7 +3,7 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, "cronStr": "18 0/1 * * *" }, diff --git a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_滦州冀东.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_滦州冀东.json index 5d86a0a..abdb4fa 100644 --- a/configFiles/弃用备份/config_承德金隅水泥_河北矿山_滦州冀东.json +++ b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_滦州冀东.json @@ -3,7 +3,7 @@ "ioConfig": { "in": { "http": { - "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" + "url": "https://tools.anxinyun.cn/iot/es-router/anxincloud_themes/_search" }, "cronStr": "32 0/1 * * *" }, diff --git a/consumers/consumerCDJYSN.go b/consumers/consumerCDJYSN.go index 5a2350a..450aa29 100644 --- a/consumers/consumerCDJYSN.go +++ b/consumers/consumerCDJYSN.go @@ -301,7 +301,8 @@ func (the *consumerCDJYSN) getEsData() { } } `, structureId, start, end) - auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + auth := map[string]string{"Authorization": "Basic YWRtaW46RnJlZXN1bl8xMjM="} + //the.InHttp.HttpClient.SetAuth("admin", "Freesun_123") esAggResult := the.InHttp.HttpClient.HttpGetWithHeader(esQuery, auth) var needPush []byte diff --git a/dbOperate/httpHelper.go b/dbOperate/httpHelper.go index 0b82989..a4243ac 100644 --- a/dbOperate/httpHelper.go +++ b/dbOperate/httpHelper.go @@ -15,8 +15,10 @@ import ( ) type HttpHelper struct { - Url string - Token string + Url string + Token string + userName string + passWord string client http.Client } @@ -24,19 +26,25 @@ type HttpHelper struct { func (the *HttpHelper) Initial() { the.client = http.Client{} the.client.Timeout = time.Minute +} +func (the *HttpHelper) SetAuth(userName, passWord string) { + the.userName = userName + the.passWord = passWord } func (the *HttpHelper) HttpGet(queryBody string) string { - url := the.Url client := the.client - req, err := http.NewRequest("GET", url, strings.NewReader(queryBody)) + req, err := http.NewRequest("GET", the.Url, strings.NewReader(queryBody)) + if the.userName != "" && the.passWord != "" { + req.SetBasicAuth(the.userName, the.passWord) + } req.Header.Set("Content-Type", "application/json") resp, err := client.Do(req) if err != nil { fmt.Println(err) } defer resp.Body.Close() - log.Println("http get 请求,url", url, " <- code=", resp.StatusCode) + log.Println("http get 请求,url", the.Url, " <- code=", resp.StatusCode) body, err := io.ReadAll(resp.Body) return string(body) } @@ -65,8 +73,8 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string] } } }(resp) - log.Println("http get 请求,url", url, " <- code=", resp.StatusCode) body, err := io.ReadAll(resp.Body) + log.Printf("http get 请求,url=%s, -> code=%d,respBody=%s", url, resp.StatusCode, body) return string(body) } func (the *HttpHelper) Publish(messageBytes []byte) (string, error) {