From 0ef19cf4f9daacad39aae4456425625bc49c1091 Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 24 Dec 2024 18:32:17 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0gnss=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adaptors/司南GNSS_MySql库to安心云.go | 38 ++++++ adaptors/安心云最新设备数据toES.go | 4 +- config/configStruct.go | 5 + configFiles/config_司南GnssMySQL.json | 26 +++++ ...ig_安心云设备数据_最新同步.json | 0 consumers/SinoGnssMySQL/config.go | 20 ++++ consumers/SinoGnssMySQL/dataModel.go | 28 +++++ consumers/consumerAXYraw.go | 12 +- consumers/consumerCDJYSN.go | 10 +- consumers/consumerCQZG.go | 10 +- consumers/consumerGZGZM.go | 10 +- consumers/consumerHTJC.go | 10 +- consumers/consumerHTTP_PRPXY.go | 10 +- consumers/consumerJSNCGLQL.go | 10 +- consumers/consumerMYX.go | 10 +- consumers/consumerManage.go | 5 +- consumers/consumerSinoGnssMySQL.go | 108 ++++++++++++++++++ consumers/consumerWJHP.go | 10 +- dbHelper/db_test.go | 23 ---- .../_kafka/consumerGroupHandler.go | 0 {dbHelper => dbOperate}/_kafka/kafkaHelper.go | 0 {dbHelper => dbOperate}/_kafka/kafka_test.go | 0 .../_kafka/producerHelper.go | 0 {dbHelper => dbOperate}/apiServerHelper.go | 2 +- dbOperate/dbHelper.go | 101 ++++++++++++++++ dbOperate/db_test.go | 37 ++++++ .../elasticsearchHelper.go | 2 +- {dbHelper => dbOperate}/fileSaveHelper.go | 2 +- {dbHelper => dbOperate}/httpHelper.go | 2 +- {dbHelper => dbOperate}/influxDBHelper.go | 2 +- {dbHelper => dbOperate}/mqttHelper.go | 2 +- {dbHelper => dbOperate}/redisHelper.go | 2 +- {dbHelper => dbOperate}/udpHelper.go | 2 +- go.mod | 31 +---- go.sum | 79 ++----------- models/aggWay.go | 6 + monitors/httpMonitor.go | 4 +- testUnit/mqttPush_test.go | 4 +- testUnit/mqttRecv_test.go | 4 +- testUnit/udpPush_test.go | 4 +- testUnit/安心云http转发_test.go | 42 +++++++ 41 files changed, 496 insertions(+), 181 deletions(-) create mode 100644 adaptors/司南GNSS_MySql库to安心云.go create mode 100644 configFiles/config_司南GnssMySQL.json rename configFiles/{ => 弃用备份}/config_安心云设备数据_最新同步.json (100%) create mode 100644 consumers/SinoGnssMySQL/config.go create mode 100644 consumers/SinoGnssMySQL/dataModel.go create mode 100644 consumers/consumerSinoGnssMySQL.go delete mode 100644 dbHelper/db_test.go rename {dbHelper => dbOperate}/_kafka/consumerGroupHandler.go (100%) rename {dbHelper => dbOperate}/_kafka/kafkaHelper.go (100%) rename {dbHelper => dbOperate}/_kafka/kafka_test.go (100%) rename {dbHelper => dbOperate}/_kafka/producerHelper.go (100%) rename {dbHelper => dbOperate}/apiServerHelper.go (98%) create mode 100644 dbOperate/dbHelper.go create mode 100644 dbOperate/db_test.go rename {dbHelper => dbOperate}/elasticsearchHelper.go (99%) rename {dbHelper => dbOperate}/fileSaveHelper.go (97%) rename {dbHelper => dbOperate}/httpHelper.go (99%) rename {dbHelper => dbOperate}/influxDBHelper.go (99%) rename {dbHelper => dbOperate}/mqttHelper.go (99%) rename {dbHelper => dbOperate}/redisHelper.go (99%) rename {dbHelper => dbOperate}/udpHelper.go (98%) create mode 100644 models/aggWay.go diff --git a/adaptors/司南GNSS_MySql库to安心云.go b/adaptors/司南GNSS_MySql库to安心云.go new file mode 100644 index 0000000..b9f1b57 --- /dev/null +++ b/adaptors/司南GNSS_MySql库to安心云.go @@ -0,0 +1,38 @@ +package adaptors + +import ( + "encoding/json" + "fmt" + "goInOut/consumers/SinoGnssMySQL" +) + +// Adaptor_SINOMYSQL_AXYMQTT 数据 转换 江苏农村公路桥梁监测系统 +type Adaptor_SINOMYSQL_AXYMQTT struct { +} + +func (the Adaptor_SINOMYSQL_AXYMQTT) Transform(gnssDataList []SinoGnssMySQL.GnssData) []NeedPush { + var needPush []NeedPush + var allDxFiles map[string][]SinoGnssMySQL.DxFile + for _, gnssData := range gnssDataList { + OnceDxFiles := allDxFiles[gnssData.GroupName] + OnceDxFiles = append(OnceDxFiles, SinoGnssMySQL.DxFile{ + Module: gnssData.StationName, + Timespan: gnssData.Time.UnixMilli(), + RawValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, + PhyValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, + ThemeValue: []float64{gnssData.X, gnssData.Y, gnssData.H}, + }) + allDxFiles[gnssData.GroupName] = OnceDxFiles + } + + for groupName, groupFile := range allDxFiles { + bs, _ := json.Marshal(groupFile) + topic := fmt.Sprintf("SinoGnss/%s/", groupName) + needPush = append(needPush, NeedPush{ + Topic: topic, + Payload: bs, + }) + } + + return needPush +} diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index 390a744..b300d35 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "goInOut/consumers/AXYraw" - "goInOut/dbHelper" + "goInOut/dbOperate" "goInOut/models" "log" "sync" @@ -14,7 +14,7 @@ import ( // Adaptor_AXY_LastRAW 安心云 kafka iota数据 转换 es设备数据 type Adaptor_AXY_LastRAW struct { AXYraw.Info - Redis *dbHelper.RedisHelper + Redis *dbOperate.RedisHelper } func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { diff --git a/config/configStruct.go b/config/configStruct.go index 1bc91d2..b12a9f4 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -4,6 +4,11 @@ type Consumer struct { Consumer string `json:"consumer"` } +type DbConfig struct { + Type string `json:"type"` + ConnStr string `json:"connStr"` +} + type MqttConfig struct { Host string `json:"host"` Port int `json:"port"` diff --git a/configFiles/config_司南GnssMySQL.json b/configFiles/config_司南GnssMySQL.json new file mode 100644 index 0000000..179b69b --- /dev/null +++ b/configFiles/config_司南GnssMySQL.json @@ -0,0 +1,26 @@ +{ + "consumer": "consumerSinoGnssMySQL", + "ioConfig": { + "in": { + "db": { + "type": "mysql", + "connStr": "root:Xuchen@2024@tcp(39.105.5.154:3306)/navi_cloud_sinognss?charset=utf8&parseTime=true" + } + }, + "out": { + "mqtt": { + "host": "10.8.30.160", + "port": 30883, + "userName": "upload", + "password": "", + "clientId": "goInOut_SinoGnssMySQL", + "Topics": [ + "SinoGnss/{{.group_name}}/{{.station_name}}" + ] + } + } + }, + "info": { + "bridgeCode": "G2320281L0012" + } +} \ No newline at end of file diff --git a/configFiles/config_安心云设备数据_最新同步.json b/configFiles/弃用备份/config_安心云设备数据_最新同步.json similarity index 100% rename from configFiles/config_安心云设备数据_最新同步.json rename to configFiles/弃用备份/config_安心云设备数据_最新同步.json diff --git a/consumers/SinoGnssMySQL/config.go b/consumers/SinoGnssMySQL/config.go new file mode 100644 index 0000000..a6989db --- /dev/null +++ b/consumers/SinoGnssMySQL/config.go @@ -0,0 +1,20 @@ +package SinoGnssMySQL + +import "goInOut/config" + +type ConfigFile struct { + config.Consumer + IoConfig ioConfig `json:"ioConfig"` + OtherInfo map[string]string `json:"info"` +} +type ioConfig struct { + In In `json:"in"` + Out OUT `json:"out"` +} +type In struct { + Db config.DbConfig `json:"db"` +} + +type OUT struct { + Mqtt config.MqttConfig `json:"mqtt"` +} diff --git a/consumers/SinoGnssMySQL/dataModel.go b/consumers/SinoGnssMySQL/dataModel.go new file mode 100644 index 0000000..c127bc9 --- /dev/null +++ b/consumers/SinoGnssMySQL/dataModel.go @@ -0,0 +1,28 @@ +package SinoGnssMySQL + +import "time" + +type GnssData struct { + Id int64 `json:"id" db:"id"` + StationName string `json:"station_name" db:"station_name"` + GroupName string `json:"group_name" db:"group_name"` + Time time.Time `json:"time" db:"time"` + X float64 `json:"x" db:"x"` + Y float64 `json:"y" db:"y"` + H float64 `json:"h" db:"h"` +} + +type DxFile struct { + SensorId int `json:"S"` + Module string `json:"M"` + Channel int `json:"C"` + Error int `json:"R"` + Round int64 `json:"N"` + Timespan int64 `json:"T"` + Req []string `json:"Q"` + Acq []string `json:"A"` + RawValue []float64 `json:"RV"` + LimitValue []float64 `json:"LV"` + PhyValue []float64 `json:"PV"` + ThemeValue []float64 `json:"TV"` +} diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go index 3121994..50be402 100644 --- a/consumers/consumerAXYraw.go +++ b/consumers/consumerAXYraw.go @@ -4,8 +4,8 @@ import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/AXYraw" - "goInOut/dbHelper" - "goInOut/dbHelper/_kafka" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" "goInOut/models" "log" "sync" @@ -18,8 +18,8 @@ type consumerAXYraw struct { //具体配置 ConfigInfo AXYraw.ConfigFile InKafka _kafka.KafkaHelper - OutEs dbHelper.ESHelper - infoRedis *dbHelper.RedisHelper + OutEs dbOperate.ESHelper + infoRedis *dbOperate.RedisHelper sinkRawMap sync.Map lock sync.Mutex } @@ -65,7 +65,7 @@ func (the *consumerAXYraw) inputInitial() error { } func (the *consumerAXYraw) outputInitial() error { //数据出口 - the.OutEs = *dbHelper.NewESHelper( + the.OutEs = *dbOperate.NewESHelper( the.ConfigInfo.IoConfig.Out.Es.Address, the.ConfigInfo.IoConfig.Out.Es.Auth.UserName, the.ConfigInfo.IoConfig.Out.Es.Auth.Password, @@ -77,7 +77,7 @@ func (the *consumerAXYraw) outputInitial() error { func (the *consumerAXYraw) infoComponentInitial() error { //数据出口 addr := the.ConfigInfo.Info.QueryComponent.Redis.Address - the.infoRedis = dbHelper.NewRedisHelper("", addr) + the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } diff --git a/consumers/consumerCDJYSN.go b/consumers/consumerCDJYSN.go index c4a7d26..7ea6857 100644 --- a/consumers/consumerCDJYSN.go +++ b/consumers/consumerCDJYSN.go @@ -6,7 +6,7 @@ import ( "github.com/robfig/cron/v3" "goInOut/adaptors" "goInOut/consumers/CDJYSN" - "goInOut/dbHelper" + "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "log" @@ -19,8 +19,8 @@ type consumerCDJYSN struct { dataCache chan []byte //具体配置 ConfigInfo CDJYSN.ConfigFile - //InHttp *dbHelper.HttpHelper - OutFile *dbHelper.FileSaveHelper + //InHttp *dbOperate.HttpHelper + OutFile *dbOperate.FileSaveHelper InHttp monitors.HttpMonitor configCron *cron.Cron } @@ -47,7 +47,7 @@ func (the *consumerCDJYSN) InputInitial() error { the.dataCache = make(chan []byte, 200) //数据入口 the.InHttp = monitors.HttpMonitor{ - HttpClient: &dbHelper.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""}, + HttpClient: &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""}, MonitorHelper: &monitors.MonitorHelper{CronStr: the.ConfigInfo.IoConfig.In.CronStr}, } @@ -57,7 +57,7 @@ func (the *consumerCDJYSN) InputInitial() error { } func (the *consumerCDJYSN) OutputInitial() error { //数据出口 - the.OutFile = &dbHelper.FileSaveHelper{ + the.OutFile = &dbOperate.FileSaveHelper{ Directory: the.ConfigInfo.IoConfig.Out.File.Directory, FilenameExtension: the.ConfigInfo.IoConfig.Out.File.FileNameExtension, } diff --git a/consumers/consumerCQZG.go b/consumers/consumerCQZG.go index bfb6329..5f0739f 100644 --- a/consumers/consumerCQZG.go +++ b/consumers/consumerCQZG.go @@ -4,7 +4,7 @@ import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/CQZG" - "goInOut/dbHelper" + "goInOut/dbOperate" "log" "os" "strings" @@ -16,8 +16,8 @@ type consumerCQZG struct { ch_t500101 chan []byte //具体配置 Info CQZG.ConfigFile - InMqtt *dbHelper.MqttHelper - outMqtt *dbHelper.MqttHelper + InMqtt *dbOperate.MqttHelper + outMqtt *dbOperate.MqttHelper } func (the *consumerCQZG) LoadConfigJson(cfgStr string) { @@ -43,7 +43,7 @@ func (the *consumerCQZG) InputInitial() error { log.Printf("RC4Key=%s", the.Info.Config.Rc4key) the.ch_t500101 = make(chan []byte, 200) //数据入口 - the.InMqtt = dbHelper.MqttInitial( + the.InMqtt = dbOperate.MqttInitial( the.Info.Config.InMqtt.Host, the.Info.Config.InMqtt.Port, the.Info.Config.InMqtt.ClientId, @@ -58,7 +58,7 @@ func (the *consumerCQZG) InputInitial() error { } func (the *consumerCQZG) OutputInitial() error { //数据出口 - the.outMqtt = dbHelper.MqttInitial( + the.outMqtt = dbOperate.MqttInitial( the.Info.Config.OutMqtt.Host, the.Info.Config.OutMqtt.Port, the.Info.Config.OutMqtt.ClientId, diff --git a/consumers/consumerGZGZM.go b/consumers/consumerGZGZM.go index 17c88aa..e022e40 100644 --- a/consumers/consumerGZGZM.go +++ b/consumers/consumerGZGZM.go @@ -6,8 +6,8 @@ import ( "fmt" "goInOut/adaptors" "goInOut/consumers/GZGZM" - "goInOut/dbHelper" - "goInOut/dbHelper/_kafka" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" "io" "log" "math/rand" @@ -20,7 +20,7 @@ type consumerGZGZM struct { //具体配置 ConfigInfo GZGZM.ConfigFile InKafka _kafka.KafkaHelper - OutHttp *dbHelper.HttpHelper + OutHttp *dbOperate.HttpHelper } func (the *consumerGZGZM) RegisterPoint(thirdId string, sensorInfo GZGZM.SensorInfo) error { @@ -91,7 +91,7 @@ func (the *consumerGZGZM) inputInitial() error { } func (the *consumerGZGZM) outputInitial() error { //数据出口 - the.OutHttp = &dbHelper.HttpHelper{ + the.OutHttp = &dbOperate.HttpHelper{ Url: the.ConfigInfo.IoConfig.Out.Http.Url, } the.OutHttp.Initial() @@ -135,7 +135,7 @@ func (the *consumerGZGZM) tokenRefresh() { fmt.Sprintf("&random=%s", randomStr16) + fmt.Sprintf("×tamp=%d", timestamp) + fmt.Sprintf("&secret=%s", secret) - tokenHttp := &dbHelper.HttpHelper{ + tokenHttp := &dbOperate.HttpHelper{ Url: queryUrl, } tokenHttp.Initial() diff --git a/consumers/consumerHTJC.go b/consumers/consumerHTJC.go index 4069803..305acdb 100644 --- a/consumers/consumerHTJC.go +++ b/consumers/consumerHTJC.go @@ -4,7 +4,7 @@ import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/HTJC" - "goInOut/dbHelper" + "goInOut/dbOperate" "log" "strings" "time" @@ -15,8 +15,8 @@ type consumerHTJC struct { dataCache chan []byte //具体配置 Info HTJC.ConfigFile - InMqtt *dbHelper.MqttHelper - outUdp *dbHelper.UdpHelper + InMqtt *dbOperate.MqttHelper + outUdp *dbOperate.UdpHelper } func (the *consumerHTJC) LoadConfigJson(cfgStr string) { @@ -40,7 +40,7 @@ func (the *consumerHTJC) Initial(cfg string) error { func (the *consumerHTJC) InputInitial() error { the.dataCache = make(chan []byte, 1000) //数据入口 - the.InMqtt = dbHelper.MqttInitial( + the.InMqtt = dbOperate.MqttInitial( the.Info.Config.InMqtt.Host, the.Info.Config.InMqtt.Port, the.Info.Config.InMqtt.ClientId, @@ -55,7 +55,7 @@ func (the *consumerHTJC) InputInitial() error { } func (the *consumerHTJC) OutputInitial() error { //数据出口 - the.outUdp = &dbHelper.UdpHelper{ + the.outUdp = &dbOperate.UdpHelper{ Host: the.Info.Config.OutUdp.Host, Port: the.Info.Config.OutUdp.Port, } diff --git a/consumers/consumerHTTP_PRPXY.go b/consumers/consumerHTTP_PRPXY.go index 0be1372..3cd9896 100644 --- a/consumers/consumerHTTP_PRPXY.go +++ b/consumers/consumerHTTP_PRPXY.go @@ -5,7 +5,7 @@ import ( "fmt" "goInOut/config" "goInOut/consumers/HTTP_PRPXY" - "goInOut/dbHelper" + "goInOut/dbOperate" "goInOut/utils" "io" "log" @@ -17,8 +17,8 @@ type consumerHttpProxy struct { routes map[string]config.Router //具体配置 Info HTTP_PRPXY.ConfigFile - InApiServer *dbHelper.ApiServerHelper - outHttpPost *dbHelper.HttpHelper + InApiServer *dbOperate.ApiServerHelper + outHttpPost *dbOperate.HttpHelper } func (the *consumerHttpProxy) LoadConfigJson(cfgStr string) { @@ -42,7 +42,7 @@ func (the *consumerHttpProxy) Initial(cfg string) error { } func (the *consumerHttpProxy) InputInitial() error { //数据入口 - the.InApiServer = dbHelper.NewApiServer( + the.InApiServer = dbOperate.NewApiServer( the.Info.IoConfig.In.ApiServer.Port, the.Info.IoConfig.In.ApiServer.Routers, ) @@ -51,7 +51,7 @@ func (the *consumerHttpProxy) InputInitial() error { } func (the *consumerHttpProxy) OutputInitial() error { //数据出口 - the.outHttpPost = &dbHelper.HttpHelper{ + the.outHttpPost = &dbOperate.HttpHelper{ Url: the.Info.IoConfig.Out.HttpPost.Url, } the.outHttpPost.Initial() diff --git a/consumers/consumerJSNCGLQL.go b/consumers/consumerJSNCGLQL.go index c36b377..25c651c 100644 --- a/consumers/consumerJSNCGLQL.go +++ b/consumers/consumerJSNCGLQL.go @@ -4,7 +4,7 @@ import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/JSNCGLQL" - "goInOut/dbHelper" + "goInOut/dbOperate" "log" "strings" "time" @@ -15,8 +15,8 @@ type consumerJSNCGLQL struct { ch chan []adaptors.NeedPush //具体配置 Info JSNCGLQL.ConfigFile - InMqtt *dbHelper.MqttHelper - outMqtt *dbHelper.MqttHelper + InMqtt *dbOperate.MqttHelper + outMqtt *dbOperate.MqttHelper } func (the *consumerJSNCGLQL) LoadConfigJson(cfgStr string) { @@ -40,7 +40,7 @@ func (the *consumerJSNCGLQL) Initial(cfg string) error { func (the *consumerJSNCGLQL) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 - the.InMqtt = dbHelper.MqttInitial( + the.InMqtt = dbOperate.MqttInitial( the.Info.IoConfig.In.Mqtt.Host, the.Info.IoConfig.In.Mqtt.Port, the.Info.IoConfig.In.Mqtt.ClientId, @@ -55,7 +55,7 @@ func (the *consumerJSNCGLQL) InputInitial() error { } func (the *consumerJSNCGLQL) OutputInitial() error { //数据出口 - the.outMqtt = dbHelper.MqttInitial( + the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, diff --git a/consumers/consumerMYX.go b/consumers/consumerMYX.go index a140a17..f31224e 100644 --- a/consumers/consumerMYX.go +++ b/consumers/consumerMYX.go @@ -7,7 +7,7 @@ import ( "fmt" "goInOut/adaptors" "goInOut/consumers/MYX" - "goInOut/dbHelper" + "goInOut/dbOperate" "goInOut/monitors" "golang.org/x/text/encoding/unicode" "log" @@ -22,9 +22,9 @@ type consumerMYX struct { dataCache chan []byte //具体配置 Info MYX.ConfigFile - InMqtt *dbHelper.MqttHelper + InMqtt *dbOperate.MqttHelper InFileMonitor *monitors.FileMonitor - outHttpPost *dbHelper.HttpHelper + outHttpPost *dbOperate.HttpHelper } func (the *consumerMYX) LoadConfigJson(cfgStr string) { @@ -48,7 +48,7 @@ func (the *consumerMYX) Initial(cfg string) error { func (the *consumerMYX) InputInitial() error { the.dataCache = make(chan []byte, 200) //mqtt数据入口 - the.InMqtt = dbHelper.MqttInitial( + the.InMqtt = dbOperate.MqttInitial( the.Info.IOConfig.InMqtt.Host, the.Info.IOConfig.InMqtt.Port, the.Info.IOConfig.InMqtt.ClientId, @@ -71,7 +71,7 @@ func (the *consumerMYX) InputInitial() error { } func (the *consumerMYX) OutputInitial() error { //数据出口 - the.outHttpPost = &dbHelper.HttpHelper{ + the.outHttpPost = &dbOperate.HttpHelper{ Url: the.Info.IOConfig.OutHttpPost.Url, } the.outHttpPost.Initial() diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index 636cc6e..68cb921 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -26,8 +26,11 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerJSNCGLQL": //工迅-广州高支模平台 consumer = new(consumerJSNCGLQL) - case "consumerHttpProxy": + case "consumerHttpProxy": consumer = new(consumerHttpProxy) + + case "consumerSinoGnssMySQL": + consumer = new(consumerSinoGnssMySQL) default: consumer = nil } diff --git a/consumers/consumerSinoGnssMySQL.go b/consumers/consumerSinoGnssMySQL.go new file mode 100644 index 0000000..a12f18e --- /dev/null +++ b/consumers/consumerSinoGnssMySQL.go @@ -0,0 +1,108 @@ +package consumers + +import ( + "encoding/json" + "goInOut/adaptors" + "goInOut/consumers/SinoGnssMySQL" + "goInOut/dbOperate" + "log" + "time" +) + +type consumerSinoGnssMySQL struct { + //数据缓存管道 + ch chan []adaptors.NeedPush + //具体配置 + Info SinoGnssMySQL.ConfigFile + InDB *dbOperate.DBHelper + outMqtt *dbOperate.MqttHelper +} + +func (the *consumerSinoGnssMySQL) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := json.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerSinoGnssMySQL) Initial(cfg string) error { + the.LoadConfigJson(cfg) + err := the.InputInitial() + if err != nil { + return err + } + err = the.OutputInitial() + return err +} +func (the *consumerSinoGnssMySQL) InputInitial() error { + the.ch = make(chan []adaptors.NeedPush, 200) + //数据入口 + the.InDB = dbOperate.NewDBHelper( + the.Info.IoConfig.In.Db.Type, + the.Info.IoConfig.In.Db.ConnStr) + + return nil +} +func (the *consumerSinoGnssMySQL) OutputInitial() error { + //数据出口 + the.outMqtt = dbOperate.MqttInitial( + the.Info.IoConfig.Out.Mqtt.Host, + the.Info.IoConfig.Out.Mqtt.Port, + the.Info.IoConfig.Out.Mqtt.ClientId, + the.Info.IoConfig.Out.Mqtt.UserName, + the.Info.IoConfig.Out.Mqtt.Password, + false, //按照具体项目来 + "") + return nil +} +func (the *consumerSinoGnssMySQL) Work() { + //测试 + the.onData() + + go func() { + for { + needPushList := <-the.ch + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + + for _, outTopic := range the.Info.IoConfig.Out.Mqtt.Topics { + for _, push := range needPushList { + log.Printf("推送[%s]: len=%d", outTopic, len(push.Payload)) + //hex.EncodeToString(pushBytes) + if push.Topic != "" { + outTopic = push.Topic + } + the.outMqtt.Publish(outTopic, push.Payload) + } + + } + time.Sleep(100 * time.Millisecond) + } + }() +} +func (the *consumerSinoGnssMySQL) onData() { + sql := `select d.id,d.station_name,p.group_name,d.time,d.x,d.y,d.h from data_gnss_202412 as d +LEFT JOIN datasolution as p +ON d.station_name=p.sn +where p.group_name is not null +ORDER BY p.group_name +limit 10;` + var GnssDatas []SinoGnssMySQL.GnssData + err := the.InDB.Query(&GnssDatas, sql) + if err != nil { + return + } + + adaptor := the.getAdaptor() + + needPush := adaptor.Transform(GnssDatas) + if len(needPush) > 0 { + the.ch <- needPush + } + +} +func (the *consumerSinoGnssMySQL) getAdaptor() (adaptor adaptors.Adaptor_SINOMYSQL_AXYMQTT) { + + return adaptors.Adaptor_SINOMYSQL_AXYMQTT{} +} diff --git a/consumers/consumerWJHP.go b/consumers/consumerWJHP.go index 046dc30..4cdab69 100644 --- a/consumers/consumerWJHP.go +++ b/consumers/consumerWJHP.go @@ -4,7 +4,7 @@ import ( "encoding/json" "goInOut/adaptors" "goInOut/consumers/WJHP" - "goInOut/dbHelper" + "goInOut/dbOperate" "goInOut/monitors" "log" "strings" @@ -16,9 +16,9 @@ type consumerWJHP struct { dataCache chan []byte //具体配置 Info WJHP.ConfigFile - InMqtt *dbHelper.MqttHelper + InMqtt *dbOperate.MqttHelper InFileMonitor *monitors.FileMonitor - outHttpPost *dbHelper.HttpHelper + outHttpPost *dbOperate.HttpHelper } func (the *consumerWJHP) LoadConfigJson(cfgStr string) { @@ -42,7 +42,7 @@ func (the *consumerWJHP) Initial(cfg string) error { func (the *consumerWJHP) InputInitial() error { the.dataCache = make(chan []byte, 200) //mqtt数据入口 - the.InMqtt = dbHelper.MqttInitial( + the.InMqtt = dbOperate.MqttInitial( the.Info.IOConfig.InMqtt.Host, the.Info.IOConfig.InMqtt.Port, the.Info.IOConfig.InMqtt.ClientId, @@ -58,7 +58,7 @@ func (the *consumerWJHP) InputInitial() error { } func (the *consumerWJHP) OutputInitial() error { //数据出口 - the.outHttpPost = &dbHelper.HttpHelper{ + the.outHttpPost = &dbOperate.HttpHelper{ Url: the.Info.IOConfig.OutHttpPost.Url, } the.outHttpPost.Initial() diff --git a/dbHelper/db_test.go b/dbHelper/db_test.go deleted file mode 100644 index f7973d5..0000000 --- a/dbHelper/db_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package dbHelper - -import "testing" - -type res struct { - RLLYCJ string `json:"LLYCJ"` - RLLCacheMap string `json:"LLCacheMap"` -} - -func TestRedis(t *testing.T) { - addr := "10.8.30.160:30379" - redis := NewRedisHelper("", addr) - - key1 := "RLLYCJ" - //v := redis.Get(key1) - //println(v) - - key2 := "RLLCacheMap" - res1 := res{} - - v2 := redis.MGet(&res1, key1, key2) - println(v2) -} diff --git a/dbHelper/_kafka/consumerGroupHandler.go b/dbOperate/_kafka/consumerGroupHandler.go similarity index 100% rename from dbHelper/_kafka/consumerGroupHandler.go rename to dbOperate/_kafka/consumerGroupHandler.go diff --git a/dbHelper/_kafka/kafkaHelper.go b/dbOperate/_kafka/kafkaHelper.go similarity index 100% rename from dbHelper/_kafka/kafkaHelper.go rename to dbOperate/_kafka/kafkaHelper.go diff --git a/dbHelper/_kafka/kafka_test.go b/dbOperate/_kafka/kafka_test.go similarity index 100% rename from dbHelper/_kafka/kafka_test.go rename to dbOperate/_kafka/kafka_test.go diff --git a/dbHelper/_kafka/producerHelper.go b/dbOperate/_kafka/producerHelper.go similarity index 100% rename from dbHelper/_kafka/producerHelper.go rename to dbOperate/_kafka/producerHelper.go diff --git a/dbHelper/apiServerHelper.go b/dbOperate/apiServerHelper.go similarity index 98% rename from dbHelper/apiServerHelper.go rename to dbOperate/apiServerHelper.go index a7e2686..34e17c9 100644 --- a/dbHelper/apiServerHelper.go +++ b/dbOperate/apiServerHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "fmt" diff --git a/dbOperate/dbHelper.go b/dbOperate/dbHelper.go new file mode 100644 index 0000000..436c0e9 --- /dev/null +++ b/dbOperate/dbHelper.go @@ -0,0 +1,101 @@ +package dbOperate + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + _ "github.com/mattn/go-sqlite3" + "log" + "time" +) + +const ( + postgres = iota + 1 + sqlite3 +) + +type DBHelper struct { + dbClient *sqlx.DB + dbType string + ConnectStr string +} + +func NewDBHelper(dbType string, connectStr string) *DBHelper { + the := &DBHelper{} + switch dbType { + case "postgres": + fallthrough + case "mysql": + fallthrough + case "sqlite3": + fallthrough + case "有效的数据库类型": + the.dbType = dbType + the.ConnectStr = connectStr + default: + log.Panicf("不支持的数据库类型=> %s", dbType) + + } + return the +} + +// sqlite3 => connectStr := os.Getwd() + "/db/cz.db" +// mysql => "user:password@tcp(127.0.0.1:3306)/databaseName" +// pg => "host=192.168.10.64 port=5432 user=postgres password=123456 dbname=headscale sslmode=disable" +func (the *DBHelper) dbOpen() error { + var err error + tdb, err := sqlx.Open(the.dbType, the.ConnectStr) + if err != nil { + return err + } + if err = tdb.Ping(); err != nil { + return err + } + the.dbClient = tdb + return nil +} +func (the *DBHelper) Exec(dbRecordSQL string) error { + if the.dbClient == nil { + if openErr := the.dbOpen(); openErr != nil { + //logger.Info("[%s]数据库链接失败,err=%v\n", time.Now().Format("2006-01-02 15:04:05.000"), openErr) + return openErr + } + } + execResult, execErr := the.dbClient.Exec(dbRecordSQL) + defer the.dbClient.Close() + if execErr != nil { + return execErr + } + if n, err := execResult.RowsAffected(); n > 0 { + return nil + } else { + log.Printf("[%s]执行sql[%s]失败\n", time.Now().Format("2006-01-02 15:04:05.000"), dbRecordSQL) + return err + } + +} + +func (the *DBHelper) Query(dest any, sql string) error { + + start := time.Now() + + if the.dbClient == nil { + if err := the.dbOpen(); err != nil { + log.Printf("数据库链接失败:%s", err.Error()) + return err + } + } + err := the.dbClient.Select(dest, sql) + if err != nil { + log.Printf("数据库查询失败:%s,\n sql=%s", err.Error(), sql) + } + + durTime := time.Since(start).Seconds() + log.Printf("查询耗时:%v s", durTime) + return err + +} + +func (the *DBHelper) Close() error { + return the.Close() +} diff --git a/dbOperate/db_test.go b/dbOperate/db_test.go new file mode 100644 index 0000000..dcb339f --- /dev/null +++ b/dbOperate/db_test.go @@ -0,0 +1,37 @@ +package dbOperate + +import ( + "goInOut/models" + "testing" +) + +type res struct { + RLLYCJ string `json:"LLYCJ"` + RLLCacheMap string `json:"LLCacheMap"` +} + +func TestRedis(t *testing.T) { + addr := "10.8.30.160:30379" + redis := NewRedisHelper("", addr) + + key1 := "RLLYCJ" + //v := redis.Get(key1) + //println(v) + + key2 := "RLLCacheMap" + res1 := res{} + + v2 := redis.MGet(&res1, key1, key2) + println(v2) +} + +func TestPg(t *testing.T) { + dbType := "postgres" + connectStr := "host=10.8.30.32 port=5432 user=postgres password=123 dbname=NBJJ1215-T sslmode=disable" + db := NewDBHelper(dbType, connectStr) + sql := "select * from t_agg_way" + var r []models.AggWay + db.Query(&r, sql) + println("===") + +} diff --git a/dbHelper/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go similarity index 99% rename from dbHelper/elasticsearchHelper.go rename to dbOperate/elasticsearchHelper.go index 523cade..502b760 100644 --- a/dbHelper/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "bytes" diff --git a/dbHelper/fileSaveHelper.go b/dbOperate/fileSaveHelper.go similarity index 97% rename from dbHelper/fileSaveHelper.go rename to dbOperate/fileSaveHelper.go index 1402c90..f1455f6 100644 --- a/dbHelper/fileSaveHelper.go +++ b/dbOperate/fileSaveHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "io" diff --git a/dbHelper/httpHelper.go b/dbOperate/httpHelper.go similarity index 99% rename from dbHelper/httpHelper.go rename to dbOperate/httpHelper.go index 1d11648..a180e91 100644 --- a/dbHelper/httpHelper.go +++ b/dbOperate/httpHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "bytes" diff --git a/dbHelper/influxDBHelper.go b/dbOperate/influxDBHelper.go similarity index 99% rename from dbHelper/influxDBHelper.go rename to dbOperate/influxDBHelper.go index a5bc65b..23dbd09 100644 --- a/dbHelper/influxDBHelper.go +++ b/dbOperate/influxDBHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "context" diff --git a/dbHelper/mqttHelper.go b/dbOperate/mqttHelper.go similarity index 99% rename from dbHelper/mqttHelper.go rename to dbOperate/mqttHelper.go index da5871d..eef3fa8 100644 --- a/dbHelper/mqttHelper.go +++ b/dbOperate/mqttHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "crypto/tls" diff --git a/dbHelper/redisHelper.go b/dbOperate/redisHelper.go similarity index 99% rename from dbHelper/redisHelper.go rename to dbOperate/redisHelper.go index 3048e16..4cdf550 100644 --- a/dbHelper/redisHelper.go +++ b/dbOperate/redisHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "context" diff --git a/dbHelper/udpHelper.go b/dbOperate/udpHelper.go similarity index 98% rename from dbHelper/udpHelper.go rename to dbOperate/udpHelper.go index f96da6a..c9135ca 100644 --- a/dbHelper/udpHelper.go +++ b/dbOperate/udpHelper.go @@ -1,4 +1,4 @@ -package dbHelper +package dbOperate import ( "fmt" diff --git a/go.mod b/go.mod index e69a73f..26bab5d 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,16 @@ module goInOut -go 1.22 +go 1.23 require ( github.com/IBM/sarama v1.43.3 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/elastic/go-elasticsearch/v6 v6.8.10 - github.com/gin-gonic/gin v1.9.1 + github.com/go-sql-driver/mysql v1.8.1 github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/jmoiron/sqlx v1.4.0 + github.com/lib/pq v1.10.9 + github.com/mattn/go-sqlite3 v1.14.24 github.com/redis/go-redis/v9 v9.7.0 github.com/robfig/cron/v3 v3.0.1 golang.org/x/text v0.17.0 @@ -16,22 +19,14 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect - github.com/bytedance/sonic v1.10.0-rc3 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect - github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-playground/locales v0.14.1 // indirect - github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.14.1 // indirect - github.com/goccy/go-json v0.10.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect @@ -44,25 +39,11 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/leodido/go-urn v1.2.4 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect - github.com/pelletier/go-toml/v2 v2.0.9 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect - github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/ugorji/go/codec v1.2.11 // indirect - golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.23.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d7f49c1..64a1995 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= @@ -8,19 +10,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= -github.com/bytedance/sonic v1.10.0-rc3 h1:uNSnscRapXTwUgTyOF0GVljYD08p9X/Lbr9MweSV3V0= -github.com/bytedance/sonic v1.10.0-rc3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= -github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA= -github.com/chenzhuoyu/iasm v0.9.0 h1:9fhXjVzq5hUy2gkhhgHl95zG2cEAhw9OSGs8toWWAwo= -github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -38,28 +29,13 @@ github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8F github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= -github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= -github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= -github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= -github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= -github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= -github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= -github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k= -github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= @@ -89,32 +65,18 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= -github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= -github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= -github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= -github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -125,30 +87,18 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= -github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= -github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= -golang.org/x/arch v0.4.0 h1:A8WCeEWhLwPBKNbFi5Wv5UTCBx5zzubnXDlMOFAzFMc= -golang.org/x/arch v0.4.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= @@ -173,9 +123,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -196,13 +143,9 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/models/aggWay.go b/models/aggWay.go new file mode 100644 index 0000000..01d2cd6 --- /dev/null +++ b/models/aggWay.go @@ -0,0 +1,6 @@ +package models + +type AggWay struct { + Id int64 `db:"id"` + Name string `db:"name"` +} diff --git a/monitors/httpMonitor.go b/monitors/httpMonitor.go index 124e0e0..b3d14cb 100644 --- a/monitors/httpMonitor.go +++ b/monitors/httpMonitor.go @@ -1,9 +1,9 @@ package monitors -import "goInOut/dbHelper" +import "goInOut/dbOperate" type HttpMonitor struct { - HttpClient *dbHelper.HttpHelper + HttpClient *dbOperate.HttpHelper *MonitorHelper } diff --git a/testUnit/mqttPush_test.go b/testUnit/mqttPush_test.go index 6f8e051..468d65e 100644 --- a/testUnit/mqttPush_test.go +++ b/testUnit/mqttPush_test.go @@ -3,7 +3,7 @@ package testUnit import ( "encoding/hex" "goInOut/consumers/CQZG/protoFiles" - "goInOut/dbHelper" + "goInOut/dbOperate" "google.golang.org/protobuf/proto" "log" "testing" @@ -34,7 +34,7 @@ func TestMqttUpload(t *testing.T) { hexStr := hex.EncodeToString(dataStr) log.Println(err, hexStr) - mqHelpers := dbHelper.MqttHelper{ + mqHelpers := dbOperate.MqttHelper{ Host: "mqtt.datahub.anxinyun.cn", Port: 1883, ClientId: "lzwjdq500101-lk", diff --git a/testUnit/mqttRecv_test.go b/testUnit/mqttRecv_test.go index 5f97174..187fe3a 100644 --- a/testUnit/mqttRecv_test.go +++ b/testUnit/mqttRecv_test.go @@ -1,14 +1,14 @@ package testUnit import ( - "goInOut/dbHelper" + "goInOut/dbOperate" "log" "testing" "time" ) func TestMqttRecv(t *testing.T) { - mqHelpers := dbHelper.MqttHelper{ + mqHelpers := dbOperate.MqttHelper{ Host: "10.8.30.160", Port: 30883, ClientId: "lzwjdq500101-lk", diff --git a/testUnit/udpPush_test.go b/testUnit/udpPush_test.go index 8aa1e09..4e125ba 100644 --- a/testUnit/udpPush_test.go +++ b/testUnit/udpPush_test.go @@ -1,14 +1,14 @@ package testUnit import ( - "goInOut/dbHelper" + "goInOut/dbOperate" "strconv" "testing" "time" ) func TestUDP_push(t *testing.T) { - udp := dbHelper.UdpHelper{ + udp := dbOperate.UdpHelper{ Host: "10.8.30.110", Port: 8888, } diff --git a/testUnit/安心云http转发_test.go b/testUnit/安心云http转发_test.go index 0cea01e..b18b483 100644 --- a/testUnit/安心云http转发_test.go +++ b/testUnit/安心云http转发_test.go @@ -1,9 +1,12 @@ package testUnit import ( + "encoding/json" "fmt" + "goInOut/models" "goInOut/utils" "testing" + "time" ) // 定义一个结构体,包含嵌套的结构体字段 @@ -36,3 +39,42 @@ func Test_template(t *testing.T) { fmt.Println(err.Error()) } } + +func Test_timeHandler(t *testing.T) { + rawMsg := `{ + "userId": "ce2d7eb2-e56e-422e-8bbe-95dfa18e32f8", + "thingId": "14862308-083e-46e1-a422-d7d6a1e2825d", + "dimensionId": "47386f69-c5aa-4ae7-b6cc-0490a1dc0b14", + "dimCapId": "0d561c0b-4dca-4104-abc0-1f0c40a71382", + "capId": "d4965875-354b-4294-87f4-c4ba9f9260ab", + "deviceId": "9c43a09c-3c65-42d3-9a54-42b87e0e5af2", + "scheduleId": "1cfebf18-81a2-489e-bcfb-efc294d8ce3d", + "taskId": "b58858ed-9e23-4ac9-9f9c-44f9e057aee9", + "jobId": 1, + "jobRepeatId": 1, + "triggerTime": "2024-12-04T18:23:04+16:00", + "realTime": "0001-01-01T00:00:00Z", + "finishTime": "2024-12-04T10:23:07.909922675+08:00", + "seq": 0, + "released": false, + "data": { + "type": 1, + "data": { + "physicalvalue": 0 + }, + "result": { + "code": 0, + "msg": "", + "detail": null, + "errTimes": 0, + "dropped": false + } + } + }` + + iotaData := models.IotaData{} + json.Unmarshal([]byte(rawMsg), &iotaData) + var cstZone = time.FixedZone("CST", 8*3600) // 东八区 + time := iotaData.TriggerTime.In(cstZone).Format("2006-01-02T15:04:05.000+0800") + println(time) +}