diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go new file mode 100644 index 0000000..ee3d1fe --- /dev/null +++ b/adaptors/安心云最新设备数据toES.go @@ -0,0 +1,129 @@ +package adaptors + +import ( + "encoding/json" + "fmt" + "goUpload/consumers/AXYraw" + "goUpload/consumers/GZGZM" + "goUpload/dbHelper" + "goUpload/models" + "log" + "math" + "time" +) + +// Adaptor_AXY_LastRAW 安心云 kafka iota数据 转换 es设备数据 +type Adaptor_AXY_LastRAW struct { + AXYraw.Info + Redis *dbHelper.RedisHelper +} + +func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) []byte { + iotaData := models.IotaData{} + json.Unmarshal([]byte(rawMsg), &iotaData) + return the.Theme2GzGZM(iotaData) +} + +func (the Adaptor_AXY_LastRAW) Theme2GzGZM(iotaData models.IotaData) (result []byte) { + if !iotaData.Data.Success() { + return + } + log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) + the.GetDeviceInfo(iotaData.DeviceId) + return result +} +func (the Adaptor_AXY_LastRAW) getSensorId(sensorId string) GZGZM.SensorInfo { + s := GZGZM.SensorInfo{} + //if v, ok := the.SensorInfoMap[sensorId]; ok { + // s = v + //} + return s +} +func (the Adaptor_AXY_LastRAW) getCodeBytes(sensorCode int16) []byte { + + bytes := make([]byte, 0) + bytes = append(bytes, + byte(sensorCode&0xFF), + byte(sensorCode>>8), + ) + + return bytes +} + +func (the Adaptor_AXY_LastRAW) getTimeBytes(sensorTime time.Time) []byte { + + year := int8(sensorTime.Year() - 1900) + month := int8(sensorTime.Month()) + day := int8(sensorTime.Day()) + hour := int8(sensorTime.Hour()) + minute := int8(sensorTime.Minute()) + millisecond := uint16(sensorTime.Second()*1000 + sensorTime.Nanosecond()/1e6) + bytes := make([]byte, 0) + bytes = append(bytes, + byte(year), + byte(month), + byte(day), + byte(hour), + byte(minute), + byte(millisecond&0xFF), + byte(millisecond>>8), + ) + + return bytes +} + +func (the Adaptor_AXY_LastRAW) getDatasBytes(datas []float32) []byte { + + bytes := make([]byte, 0) + for _, data := range datas { + bits := math.Float32bits(data) + bytes = append(bytes, + byte(bits&0xFF), + byte(bits>>8&0xFF), + byte(bits>>16&0xFF), + byte(bits>>24&0xFF), + ) + } + + return bytes +} + +func (the Adaptor_AXY_LastRAW) getPayloadHeader(floatCount int16) []byte { + + bytes := make([]byte, 0) + + bytes = append(bytes, + //报文类型 + 0x02, + 0x00, + //1:上行信息 + 0x01, + //默认,通讯计算机编号 + 0x00, + //命令码 + 0x01, + //报文长度 + byte((floatCount*4+9)&0xFF), + byte((floatCount*4+9)>>8), + ) + + return bytes +} + +func (the Adaptor_AXY_LastRAW) GetDeviceInfo(deviceId string) []byte { + Key_Iota_device := "iota_device" + key_Thing_struct := "thing_struct" + key_Iota_meta := "iota_meta" + k1 := fmt.Sprintf("%s:%s", Key_Iota_device, deviceId) + dev := models.IotaDevice{} + ts := models.ThingStruct{} + devMeta := models.DeviceMeta{} + err1 := the.Redis.GetObj(k1, &dev) + k2 := fmt.Sprintf("%s:%s", key_Thing_struct, dev.ThingId) + err2 := the.Redis.GetObj(k2, &ts) + k3 := fmt.Sprintf("%s:%s", key_Iota_meta, dev.DeviceMeta.Id) + err3 := the.Redis.GetObj(k3, &devMeta) + println(err1, err2, err3) + + return make([]byte, 0) +} diff --git a/build/Dockerfile b/build/Dockerfile index 9d44059..72d634a 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -8,7 +8,7 @@ RUN go env -w GO111MODULE=on RUN CGO_ENABLED=0 go build -a -v -o app.exe main.go -FROM registry.ngaiot.com/base-images/golang:1.20-fs-10 +FROM registry.ngaiot.com/base-images/alpine_3.20_cst:7 WORKDIR /app/ COPY --from=0 /app/app.exe /app COPY --from=0 /app/configFiles /app diff --git a/config/configStruct.go b/config/configStruct.go index 88a7e4a..306432e 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -18,6 +18,15 @@ type KafkaConfig struct { Topics []string `json:"topics"` } +type EsConfig struct { + Address []string `json:"address"` + Index string `json:"index"` + Auth struct { + UserName string `json:"userName"` + Password string `json:"password"` + } `json:"auth"` +} + type UdpConfig struct { Host string `json:"host"` Port int `json:"port"` diff --git a/config/init.go b/config/init.go index 9dc18aa..7fda0ab 100644 --- a/config/init.go +++ b/config/init.go @@ -22,24 +22,16 @@ func LoadConfigJson() map[string]string { log.Printf("非文件[%s]跳过", file.Name()) continue } - // 读取mqtt配置 - configMqttBytes, _ := os.ReadFile(fmt.Sprintf("configFiles/%s", file.Name())) + // 读取配置 + configBytes, _ := os.ReadFile(fmt.Sprintf("configFiles/%s", file.Name())) consumer := new(Consumer) // 将 JSON 格式的数据解析到结构体中 - err = json.Unmarshal(configMqttBytes, &consumer) + err = json.Unmarshal(configBytes, &consumer) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", consumer.Consumer, err.Error()) continue } - allConfig[consumer.Consumer] = string(configMqttBytes) - //// 读取sensor配置 - //configSensorBytes, _ := os.ReadFile("configSensor.json") - //// 将 JSON 格式的数据解析到结构体中 - //err2 := json.Unmarshal(configSensorBytes, &userConfig.SensorConfig) - //if err2 != nil { - // log.Printf("读取mqtt配置异常 err=%v", err2.Error()) - //} - //allConfig[] + allConfig[consumer.Consumer] = string(configBytes) } return allConfig } diff --git a/configFiles/config_安心云设备数据_最新同步.json b/configFiles/config_安心云设备数据_最新同步.json new file mode 100644 index 0000000..164452c --- /dev/null +++ b/configFiles/config_安心云设备数据_最新同步.json @@ -0,0 +1,36 @@ +{ + "consumer": "consumerAXYraw", + "ioConfig": { + "in": { + "kafka": { + "brokers": [ + "10.8.30.160:30992" + ], + "groupId": "synchronizeRaw", + "topics": [ + "RawData" + ] + } + }, + "out": { + "es": { + "address": ["http://10.8.30.142:30092"], + "index": "anxincloud_raws_last", + "auth": { + "userName": "post", + "password": "123" + } + } + } + }, + "info": { + "common": { + "structureId": "3676" + }, + "queryComponent":{ + "redis": { + "address": "10.8.30.142:30379" + } + } + } +} \ No newline at end of file diff --git a/configFiles/config_魏家滑坡_视觉位移.json b/configFiles/弃用备份/config_魏家滑坡_视觉位移.json similarity index 100% rename from configFiles/config_魏家滑坡_视觉位移.json rename to configFiles/弃用备份/config_魏家滑坡_视觉位移.json diff --git a/constKey/key.go b/constKey/key.go deleted file mode 100644 index e237c25..0000000 --- a/constKey/key.go +++ /dev/null @@ -1 +0,0 @@ -package constKey diff --git a/consumers/AXYraw/config.go b/consumers/AXYraw/config.go new file mode 100644 index 0000000..d3911fe --- /dev/null +++ b/consumers/AXYraw/config.go @@ -0,0 +1,31 @@ +package AXYraw + +import "goUpload/config" + +type ConfigFile struct { + config.Consumer + IoConfig ioConfig `json:"ioConfig"` + Info Info `json:"info"` +} +type ioConfig struct { + In in `json:"in"` + Out out `json:"out"` +} +type in struct { + Kafka config.KafkaConfig `json:"kafka"` +} + +type out struct { + Es config.EsConfig `json:"es"` +} + +type Info struct { + Common map[string]string `json:"common"` + QueryComponent queryComponent `json:"queryComponent"` +} + +type queryComponent struct { + Redis struct { + Address string `json:"address"` + } `json:"redis"` +} diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go new file mode 100644 index 0000000..0adb36d --- /dev/null +++ b/consumers/consumerAXYraw.go @@ -0,0 +1,128 @@ +package consumers + +import ( + "encoding/json" + "goUpload/adaptors" + "goUpload/consumers/AXYraw" + "goUpload/dbHelper" + "goUpload/dbHelper/_kafka" + "log" + "time" +) + +type consumerAXYraw struct { + //数据缓存管道 + dataCache chan []byte + //具体配置 + ConfigInfo AXYraw.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbHelper.ESHelper + infoRedis *dbHelper.RedisHelper +} + +func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerAXYraw) Initial(cfg string) error { + the.dataCache = make(chan []byte, 200) + + the.LoadConfigJson(cfg) + err := the.inputInitial() + if err != nil { + return err + } + err = the.outputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() + return err +} +func (the *consumerAXYraw) inputInitial() error { + //数据入口 + the.InKafka = _kafka.KafkaHelper{ + Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers, + GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId, + } + the.InKafka.Initial() + for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics { + the.InKafka.Subscribe(inTopic, the.onData) + } + + the.InKafka.Worker() + return nil +} +func (the *consumerAXYraw) outputInitial() error { + //数据出口 + the.OutEs = *dbHelper.NewESHelper( + the.ConfigInfo.IoConfig.Out.Es.Address, + the.ConfigInfo.IoConfig.Out.Es.Auth.UserName, + the.ConfigInfo.IoConfig.Out.Es.Auth.Password, + ) + + return nil +} + +func (the *consumerAXYraw) infoComponentInitial() error { + //数据出口 + addr := the.ConfigInfo.Info.QueryComponent.Redis.Address + the.infoRedis = dbHelper.NewRedisHelper("", addr) + + return nil +} + +func (the *consumerAXYraw) RefreshTask() { + the.tokenRefresh() + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + for true { + <-ticker.C + the.tokenRefresh() + } +} + +func (the *consumerAXYraw) tokenRefresh() { + +} + +func (the *consumerAXYraw) Work() { + + go func() { + for { + pushBytes := <-the.dataCache + log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) + + log.Printf("推送[%v]: len=%d", "OutEs", len(pushBytes)) + //the.OutEs.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutEs.Token}) + time.Sleep(10 * time.Millisecond) + } + + }() +} +func (the *consumerAXYraw) onData(topic string, msg string) bool { + if len(msg) > 80 { + log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + } + adaptor := the.getAdaptor() + if adaptor != nil { + needPush := adaptor.Transform(topic, msg) + + if len(needPush) > 0 { + the.dataCache <- needPush + } + } + return true +} +func (the *consumerAXYraw) getAdaptor() (adaptor adaptors.IAdaptor3) { + + adaptor = adaptors.Adaptor_AXY_LastRAW{ + Redis: the.infoRedis, + } + return adaptor +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index ba2ff82..d13370b 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -19,6 +19,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerGZGZM": //工迅-广州高支模平台 consumer = new(consumerGZGZM) + + case "consumerAXYraw": //工迅-广州高支模平台 + consumer = new(consumerAXYraw) default: consumer = nil } diff --git a/dbHelper/db_test.go b/dbHelper/db_test.go new file mode 100644 index 0000000..f7973d5 --- /dev/null +++ b/dbHelper/db_test.go @@ -0,0 +1,23 @@ +package dbHelper + +import "testing" + +type res struct { + RLLYCJ string `json:"LLYCJ"` + RLLCacheMap string `json:"LLCacheMap"` +} + +func TestRedis(t *testing.T) { + addr := "10.8.30.160:30379" + redis := NewRedisHelper("", addr) + + key1 := "RLLYCJ" + //v := redis.Get(key1) + //println(v) + + key2 := "RLLCacheMap" + res1 := res{} + + v2 := redis.MGet(&res1, key1, key2) + println(v2) +} diff --git a/dbHelper/elasticsearchHelper.go b/dbHelper/elasticsearchHelper.go new file mode 100644 index 0000000..7c18d19 --- /dev/null +++ b/dbHelper/elasticsearchHelper.go @@ -0,0 +1,231 @@ +package dbHelper + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elasticsearch6 "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "goUpload/models" + "io" + "log" + "strings" +) + +type ESHelper struct { + addresses []string + //org string + esClient *elasticsearch6.Client +} + +func NewESHelper(addresses []string, user, pwd string) *ESHelper { + es, _ := elasticsearch6.NewClient(elasticsearch6.Config{ + Addresses: addresses, + Username: user, + Password: pwd, + }) + res, err := es.Info() + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + log.Printf("链接到es[%s]info=%v", elasticsearch6.Version, res) + return &ESHelper{ + addresses: addresses, + esClient: es, + } +} +func (the *ESHelper) SearchRaw(index, reqBody string) []models.HitRaw { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + return nil + } + r := models.EsRawResp{} + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + return r.Hits.Hits +} +func (the *ESHelper) Search(index, reqBody string) { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + + if err != nil { + //return nil, err + } + log.Println(response.Status()) + var r map[string]any + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + // Print the response status, number of results, and request duration. + log.Printf( + "[%s] %d hits; took: %dms", + response.Status(), + int(r["hits"].(map[string]any)["total"].(float64)), + int(r["took"].(float64)), + ) + + for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { + + source := hit.(map[string]any)["_source"] + log.Printf(" * ID=%s, %s", hit.(map[string]any)["_id"], source) + } + log.Println(strings.Repeat("=", 37)) +} +func (the *ESHelper) request(index, reqBody string) (map[string]any, error) { + // Set up the request object. + req := esapi.IndexRequest{ + Index: index, + //DocumentID: strconv.Itoa(i + 1), + Body: strings.NewReader(reqBody), + Refresh: "true", + } + // Perform the request with the client. + res, err := req.Do(context.Background(), the.esClient) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + var r map[string]any + if res.IsError() { + log.Printf("[%s] Error indexing document ID=%d", res.Status(), 0) + } else { + // Deserialize the response into a map. + + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + log.Printf("Error parsing the response body: %s", err) + } else { + // Print the response status and indexed document version. + log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) + } + } + return r, err +} + +func (the *ESHelper) searchRaw(index, reqBody string) (models.IotaData, error) { + respmap, err := the.request(index, reqBody) + if respmap != nil { + + } + iotaDatas := models.IotaData{} + return iotaDatas, err +} + +func (the *ESHelper) searchThemes(index, reqBody string) (models.EsThemeResp, error) { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + //return nil, err + } + log.Println(response.Status()) + r := models.EsThemeResp{} + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + return r, err +} +func (the *ESHelper) SearchLatestStationData(index string, sensorId int) (models.EsTheme, error) { + //sensorId := 178 + queryBody := fmt.Sprintf(`{ + "size": 1, + "query": { + "term": { + "sensor": { + "value": %d + } + } + }, + "sort": [ + { + "collect_time": { + "order": "desc" + } + } + ] +}`, sensorId) + //index := "go_native_themes" + themes, err := the.searchThemes(index, queryBody) + + var theme models.EsTheme + if len(themes.Hits.Hits) > 0 { + theme = themes.Hits.Hits[0].Source + } + + return theme, err +} +func (the *ESHelper) BulkWrite(index, reqBody string) { + + body := &bytes.Buffer{} + body.WriteString(reqBody) + bulkRequest := esapi.BulkRequest{ + Index: index, + Body: body, + DocumentType: "_doc", + } + res, err := bulkRequest.Do(context.Background(), the.esClient) + defer res.Body.Close() + if err != nil { + log.Panicf("es 写入[%s],err=%s", index, err.Error()) + } + + if res.StatusCode != 200 && res.StatusCode != 201 { + respBody, _ := io.ReadAll(res.Body) + log.Panicf("es 写入失败,err=%s \n body=%s", string(respBody), reqBody) + } + //log.Printf("es 写入[%s],字符长度=%d,完成", index, len(reqBody)) + +} + +func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { + body := strings.Builder{} + for _, raw := range raws { + // scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString + source, _ := json.Marshal(raw) + _id := fmt.Sprintf("%s-%d", raw.IotaDevice, raw.CollectTime.UnixMilli()) + s := fmt.Sprintf( + `{"index": {"_index": "%s","_id": "%s"}} +%s +`, index, _id, source) + body.WriteString(s) + } + the.BulkWrite(index, body.String()) + +} + +func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) { + body := strings.Builder{} + for _, raw := range raws { + source, _ := json.Marshal(raw) + _id := raw.IotaDevice + s := fmt.Sprintf( + `{"index": {"_index": "%s","_id": "%s"}} +%s +`, index, _id, source) + body.WriteString(s) + } + the.BulkWrite(index, body.String()) + +} + +func (the *ESHelper) Close() { + +} diff --git a/dbHelper/httpHelper.go b/dbHelper/httpHelper.go index 75932cc..7232434 100644 --- a/dbHelper/httpHelper.go +++ b/dbHelper/httpHelper.go @@ -173,7 +173,7 @@ func HttpPostFormDataWithHeader(url string, queryBody string, headers map[string req.Header.Add(k, v) } - fmt.Printf("http post 开始请求,%s,\n,%s \n", url, req) + fmt.Printf("http post 开始请求,%s,\n,%v \n", url, req) resp, err := client.Do(req) if err != nil { fmt.Println("请求POST异常 ", err, resp) diff --git a/dbHelper/influxDBHelper.go b/dbHelper/influxDBHelper.go index ef31874..a5bc65b 100644 --- a/dbHelper/influxDBHelper.go +++ b/dbHelper/influxDBHelper.go @@ -85,7 +85,7 @@ from(bucket:"%v") } // check for an error if result.Err() != nil { - fmt.Printf("query parsing error: %\n", result.Err().Error()) + fmt.Printf("query parsing error: %s \n", result.Err().Error()) } } else { log.Printf("influxDB 查询异常 %s", err.Error()) diff --git a/dbHelper/mqttHelper.go b/dbHelper/mqttHelper.go index 00af870..da5871d 100644 --- a/dbHelper/mqttHelper.go +++ b/dbHelper/mqttHelper.go @@ -92,10 +92,9 @@ func (the *MqttHelper) Publish(topic string, messageBytes []byte) { func (the *MqttHelper) Subscribe(topic string, myCallback func(topic string, callMsg string)) { var callback mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { - log.Printf("收到数据 [%v] TOPIC: %s,MSGLen: %v", msg.MessageID(), msg.Topic(), len(msg.Payload())) + //log.Printf("收到数据 [%v] TOPIC: %s,MSGLen: %v", msg.MessageID(), msg.Topic(), len(msg.Payload())) Msg := string(msg.Payload()) myCallback(msg.Topic(), Msg) - //log.Println("消息处理结束") } log.Printf("=================开始订阅 %s [%s]=================", the.Host, topic) t := the.client.Subscribe(topic, 1, callback) diff --git a/dbHelper/redisHelper.go b/dbHelper/redisHelper.go new file mode 100644 index 0000000..3048e16 --- /dev/null +++ b/dbHelper/redisHelper.go @@ -0,0 +1,133 @@ +package dbHelper + +import ( + "context" + "encoding/json" + "errors" + "github.com/redis/go-redis/v9" + "log" + "time" +) + +type RedisHelper struct { + rdb redis.UniversalClient + isReady bool + ctx context.Context +} + +func NewRedisHelper(master string, address ...string) *RedisHelper { + r := &RedisHelper{ctx: context.Background()} + r.InitialCluster(master, address...) + return r + +} + +func (the *RedisHelper) InitialCluster(master string, address ...string) { + + the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: address, + MasterName: master, + }) + log.Printf("redis 初始化完成 %s", address) + the.isReady = true +} + +func (the *RedisHelper) Get(key string) string { + val, err := the.rdb.Get(the.ctx, key).Result() + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", key) + } else if err != nil { + panic(err) + } else { + //log.Printf("get key => %s =%s", key, val) + } + return val +} + +func (the *RedisHelper) GetObj(keys string, addr any) error { + err := the.rdb.Get(the.ctx, keys).Scan(addr) + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", keys) + } else if err != nil { + es := err.Error() + log.Printf("err=%s ", es) + return err + } + return nil +} +func (the *RedisHelper) SetObj(keys string, obj any) error { + rs, err := the.rdb.Set(the.ctx, keys, obj, time.Minute*5).Result() + log.Printf("rs=%s ", rs) + if err != nil { + log.Printf("err=%s ", err.Error()) + } + return err +} +func (the *RedisHelper) GetLRange(keys string, addr any) error { + err := the.rdb.LRange(the.ctx, keys, 0, -1).ScanSlice(addr) + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", keys) + } else if err != nil { + log.Printf("err=%s ", err.Error()) + return err + } + return nil +} +func (the *RedisHelper) MGet(addr any, keys ...string) error { + err := the.rdb.MGet(the.ctx, keys...).Scan(addr) + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", keys) + } else if err != nil { + log.Printf("err=%s ", err.Error()) + return err + } + + return err +} +func (the *RedisHelper) MGetObj(addr any, keys ...string) error { + err := the.rdb.MGet(the.ctx, keys...).Scan(addr) + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", keys) + } else if err != nil { + log.Printf("err=%s ", err.Error()) + return err + } + return nil +} +func (the *RedisHelper) HMGetObj(addr any, key, field string) error { + rp, err := the.rdb.HMGet(the.ctx, key, field).Result() + if errors.Is(err, redis.Nil) { + log.Printf("%s does not exist", key) + return err + } else if err != nil { + log.Printf("err=%s ", err.Error()) + return err + } + for _, i := range rp { + if v, ok := i.(string); ok { + err := json.Unmarshal([]byte(v), addr) + if err != nil { + return err + } + } + } + //todo scan有问题 后续待排查 + return nil + + //err := the.rdb.HMGet(the.ctx, key, field).Scan(addr) + //if errors.Is(err, redis.Nil) { + // log.Printf("%s does not exist", key) + //} else if err != nil { + // log.Printf("err=%s ", err.Error()) + // return err + //} + //return nil +} + +func (the *RedisHelper) SRem(key string, members ...string) int64 { + return the.rdb.SRem(the.ctx, key, members).Val() +} + +func (the *RedisHelper) SAdd(key string, members ...string) int64 { + return the.rdb.SAdd(the.ctx, key, members).Val() +} diff --git a/go.mod b/go.mod index ebc56e6..777e577 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.21 require ( github.com/IBM/sarama v1.43.3 github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/gin-gonic/gin v1.9.1 github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/redis/go-redis/v9 v9.7.0 github.com/robfig/cron/v3 v3.0.1 golang.org/x/text v0.17.0 google.golang.org/protobuf v1.31.0 @@ -16,9 +18,11 @@ require ( require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/bytedance/sonic v1.10.0-rc3 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect diff --git a/go.sum b/go.sum index 2c506ad..d7f49c1 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,16 @@ github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMz github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.10.0-rc3 h1:uNSnscRapXTwUgTyOF0GVljYD08p9X/Lbr9MweSV3V0= github.com/bytedance/sonic v1.10.0-rc3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= @@ -18,6 +24,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= @@ -26,6 +34,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= +github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= @@ -111,6 +121,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= diff --git a/models/IotaData.go b/models/IotaData.go new file mode 100644 index 0000000..87d386e --- /dev/null +++ b/models/IotaData.go @@ -0,0 +1,40 @@ +package models + +import ( + "time" +) + +type IotaData struct { + UserId string `json:"userId"` + ThingId string `json:"thingId"` + DimensionId string `json:"dimensionId"` + DimCapId string `json:"dimCapId"` + CapId string `json:"capId"` + DeviceId string `json:"deviceId"` + ScheduleId string `json:"scheduleId"` + TaskId string `json:"taskId"` + JobId int `json:"jobId"` + JobRepeatId int `json:"jobRepeatId"` + TriggerTime time.Time `json:"triggerTime"` + RealTime time.Time `json:"realTime"` + FinishTime time.Time `json:"finishTime"` + Seq int `json:"seq"` + Released bool `json:"released"` + Data Data `json:"data"` +} + +type Data struct { + Type int `json:"type"` + Data map[string]any `json:"data"` + Result struct { + Code int `json:"code"` + Msg string `json:"msg"` + Detail string `json:"detail"` + ErrTimes int `json:"errTimes"` + Dropped bool `json:"dropped"` + } `json:"result"` +} + +func (the *Data) Success() bool { + return the.Result.Code == 0 +} diff --git a/models/deviceInfo.go b/models/deviceInfo.go new file mode 100644 index 0000000..c612b04 --- /dev/null +++ b/models/deviceInfo.go @@ -0,0 +1,43 @@ +package models + +import "encoding/json" + +type DeviceInfo struct { + Id string `json:"id"` + Name string `json:"name"` + Structure Structure `json:"structure"` + DeviceMeta DeviceMeta `json:"device_meta"` +} + +type DeviceMeta struct { + Id string `json:"id"` + Name string `json:"name"` + Model string `json:"model"` + Properties []IotaProperty `json:"properties"` + Capabilities []IotaCapability `json:"capabilities"` +} + +// redis序列化 +func (m *DeviceMeta) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +// redis序列化 +func (m *DeviceMeta) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) + +} + +type IotaCapability struct { + CapabilityCategoryId int `json:"capabilityCategoryId"` + Id string `json:"id"` + Name string `json:"name"` + Properties []IotaProperty `json:"properties"` +} + +type IotaProperty struct { + Category string `json:"category"` + Name string `json:"name"` + ShowName string `json:"showName"` + Unit string `json:"unit"` +} diff --git a/models/esRaw.go b/models/esRaw.go new file mode 100644 index 0000000..b893c24 --- /dev/null +++ b/models/esRaw.go @@ -0,0 +1,37 @@ +package models + +import "time" + +type EsRaw struct { + StructId int `json:"structId"` + IotaDeviceName string `json:"iota_device_name"` + Data map[string]any `json:"data"` + CollectTime time.Time `json:"collect_time"` + Meta map[string]string `json:"meta"` + IotaDevice string `json:"iota_device"` + CreateTime time.Time `json:"create_time"` +} + +type EsRawResp struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []HitRaw `json:"hits"` + } `json:"hits"` +} + +type HitRaw struct { + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Score float64 `json:"_score"` + Source EsRaw `json:"_source"` +} diff --git a/models/esTheme.go b/models/esTheme.go new file mode 100644 index 0000000..ae31dbb --- /dev/null +++ b/models/esTheme.go @@ -0,0 +1,41 @@ +package models + +import "time" + +type EsTheme struct { + SensorName string `json:"sensor_name"` + FactorName string `json:"factor_name"` + FactorProtoCode string `json:"factor_proto_code"` + Data map[string]any `json:"data"` + FactorProtoName string `json:"factor_proto_name"` + Factor int `json:"factor"` + CollectTime time.Time `json:"collect_time"` + Sensor int `json:"sensor"` + Structure int `json:"structure"` + IotaDevice []string `json:"iota_device"` + CreateTime time.Time `json:"create_time"` +} + +type EsThemeResp struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []HitTheme `json:"hits"` + } `json:"hits"` +} + +type HitTheme struct { + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Score float64 `json:"_score"` + Source EsTheme `json:"_source"` +} diff --git a/models/iotaDevice.go b/models/iotaDevice.go new file mode 100644 index 0000000..56ee80f --- /dev/null +++ b/models/iotaDevice.go @@ -0,0 +1,25 @@ +package models + +import ( + "encoding/json" +) + +type IotaDevice struct { + Id string `json:"id"` + Name string `json:"name"` + Properties string `json:"properties"` + DeviceMetaId string `json:"deviceMetaId"` + ThingId string `json:"thingId"` + DeviceMeta DeviceMeta `json:"deviceMeta"` +} + +// redis序列化 +func (m *IotaDevice) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +// redis序列化 +func (m *IotaDevice) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) + +} diff --git a/models/savoirTheme.go b/models/savoirTheme.go index 8a320f5..515298a 100644 --- a/models/savoirTheme.go +++ b/models/savoirTheme.go @@ -47,11 +47,3 @@ type SavoirTheme struct { DataEmpty bool `json:"dataEmpty"` RawAgg bool `json:"rawAgg"` } - -type Structure struct { - ThingId string `json:"thingId"` - Id int `json:"id"` - Name string `json:"name"` - Type string `json:"type"` - OrgId int `json:"orgId"` -} diff --git a/models/structure.go b/models/structure.go new file mode 100644 index 0000000..a10a230 --- /dev/null +++ b/models/structure.go @@ -0,0 +1,31 @@ +package models + +import ( + "encoding/json" +) + +type Structure struct { + ThingId string `json:"thingId"` + Id int `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + OrgId int `json:"orgId"` +} + +type ThingStruct struct { + ThingId string `json:"thingId"` + Id int `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + OrgId int `json:"orgId"` +} + +// redis序列化 +func (m *ThingStruct) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +// redis序列化 +func (m *ThingStruct) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) +}