diff --git a/adaptors/知物云主题数据to广州高支模省平台.go b/adaptors/知物云主题数据to广州高支模省平台.go index 023a78e..fdaf28b 100644 --- a/adaptors/知物云主题数据to广州高支模省平台.go +++ b/adaptors/知物云主题数据to广州高支模省平台.go @@ -55,19 +55,26 @@ func (the Adaptor_THEME_GZGZM) Theme2GzGZM(theme models.SavoirTheme) (result []b switch theme.Station.Factor.Id { case models.Savoir_FactorType_QX_ZL: //x,y,force if v, ok := theme.Data["force"]; ok { - //赋值 - postData.Data.MeasuredValue1 = v + if floatValue, ok := v.(float64); ok { + //赋值 + postData.Data.MeasuredValue1 = floatValue + } + } case models.Savoir_FactorType_QX_SP: //x,y,displacement if v, ok := theme.Data["displacement"]; ok { - //赋值 - postData.Data.MeasuredValue1 = v + if floatValue, ok := v.(float64); ok { + //赋值 + postData.Data.MeasuredValue1 = floatValue + } } case models.Savoir_FactorType_QX_CZ: //x,y,settling if v, ok := theme.Data["settling"]; ok { - //赋值 - postData.Data.MeasuredValue1 = v + if floatValue, ok := v.(float64); ok { + //赋值 + postData.Data.MeasuredValue1 = floatValue + } } default: return diff --git a/adaptors/知物云最新主题数据toES.go b/adaptors/知物云最新主题数据toES.go new file mode 100644 index 0000000..c6379d2 --- /dev/null +++ b/adaptors/知物云最新主题数据toES.go @@ -0,0 +1,65 @@ +package adaptors + +import ( + "encoding/json" + "goInOut/consumers/AXYraw" + "goInOut/dbOperate" + "goInOut/models" + "log" + "time" +) + +// Adaptor_Savoir_LastTheme 知物云 kafka theme数据 转换 es设备数据 +type Adaptor_Savoir_LastTheme struct { + AXYraw.Info + Redis *dbOperate.RedisHelper +} + +func (the Adaptor_Savoir_LastTheme) Transform(topic, rawMsg string) *models.EsTheme { + theme := models.SavoirTheme{} + err := json.Unmarshal([]byte(rawMsg), &theme) + if err != nil { + log.Printf("反序列化 异常 dev数据=%s", rawMsg) + return nil + } + + return the.Theme2Es(theme) +} + +func (the Adaptor_Savoir_LastTheme) Theme2Es(theme models.SavoirTheme) *models.EsTheme { + if theme.DataEmpty || len(theme.Data) == 0 { + bs, _ := json.Marshal(theme) + log.Printf("测点[%s]数据为空 => %s", theme.Station.Name, bs) + return nil + } + + ////log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) + //deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) + // + ////查不到信息的数据 + //if deviceInfo.Name == "" { + // log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime) + // return nil + //} + // + Atime, err := time.Parse("2006-01-02T15:04:05.000+0800", theme.AcqTime) + if err != nil { + log.Printf("知物云 测点[%s] 数据时间 %s 解析错误", theme.Station.Name, theme.AcqTime) + return nil + } + + themeData := &models.EsTheme{ + SensorName: theme.Station.Name, + FactorName: theme.Station.Factor.Name, + FactorProtoCode: theme.Station.Factor.ProtoCode, + Data: theme.Data, + FactorProtoName: theme.Station.Factor.ProtoName, + Factor: theme.Station.Factor.Id, + CollectTime: Atime, + Sensor: theme.Station.Id, + Structure: theme.Station.Structure.Id, + IotaDevice: []string{}, + CreateTime: time.Now(), + } + return themeData +} diff --git a/configFiles/config_知物云测点数据_最新同步.yaml b/configFiles/config_知物云测点数据_最新同步.yaml new file mode 100644 index 0000000..16e3794 --- /dev/null +++ b/configFiles/config_知物云测点数据_最新同步.yaml @@ -0,0 +1,27 @@ +consumer: consumerSavoirTheme +ioConfig: + in: + kafka: + brokers: + - 10.8.30.160:30992 + groupId: savoir_last_theme_inout + topics: + - savoir_theme + out: + es: + address: + - "http://10.8.30.160:30092" + index: "savoir_last_theme" + auth: + userName: post + password: 123 + interval: 30 +monitor: + + cron10min: 6/10 * * * * + cron1hour: 45 0/1 * * * + +queryComponent: + redis: + address: 10.8.30.160:30379 + diff --git a/configFiles/config_知物云_组合告警.yaml b/configFiles/弃用备份/config_知物云_组合告警.yaml similarity index 100% rename from configFiles/config_知物云_组合告警.yaml rename to configFiles/弃用备份/config_知物云_组合告警.yaml diff --git a/consumers/SavoirTheme/config.go b/consumers/SavoirTheme/config.go new file mode 100644 index 0000000..397c49e --- /dev/null +++ b/consumers/SavoirTheme/config.go @@ -0,0 +1,31 @@ +package SavoirTheme + +import "goInOut/config" + +type ConfigFile struct { + IoConfig ioConfig `yaml:"ioConfig"` + Monitor map[string]string `yaml:"monitor"` + QueryComponent queryComponent `yaml:"queryComponent"` +} +type ioConfig struct { + In in `json:"in"` + Out out `json:"out"` +} +type in struct { + Kafka config.KafkaConfig `json:"kafka"` +} + +type out struct { + Es config.EsConfig `json:"es"` +} + +type Info struct { + //Common map[string]string `json:"common"` + QueryComponent queryComponent `json:"queryComponent"` +} + +type queryComponent struct { + Redis struct { + Address string `json:"address"` + } `json:"redis"` +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index ed11339..bbc7beb 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -49,6 +49,10 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerAlarmCombination": consumer = new(consumerAlarmCombination) + + case "consumerSavoirTheme": + consumer = new(consumerSavoirTheme) + default: consumer = nil } diff --git a/consumers/consumerSavoirTheme.go b/consumers/consumerSavoirTheme.go new file mode 100644 index 0000000..c9097ce --- /dev/null +++ b/consumers/consumerSavoirTheme.go @@ -0,0 +1,163 @@ +package consumers + +import ( + "encoding/json" + "goInOut/adaptors" + "goInOut/consumers/SavoirTheme" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" + "goInOut/models" + "gopkg.in/yaml.v3" + "log" + "sync" + "time" +) + +type consumerSavoirTheme struct { + //数据缓存管道 + dataCache chan *models.EsTheme + //具体配置 + Info SavoirTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoRedis *dbOperate.RedisHelper + sinkMap sync.Map + lock sync.Mutex + logTagId int +} + +func (the *consumerSavoirTheme) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerSavoirTheme) Initial(cfg string) error { + the.sinkMap = sync.Map{} + the.dataCache = make(chan *models.EsTheme, 500) + + the.LoadConfigJson(cfg) + err := the.inputInitial() + if err != nil { + return err + } + err = the.outputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() + return err +} +func (the *consumerSavoirTheme) inputInitial() error { + //数据入口 + the.InKafka = _kafka.KafkaHelper{ + Brokers: the.Info.IoConfig.In.Kafka.Brokers, + GroupId: the.Info.IoConfig.In.Kafka.GroupId, + } + the.InKafka.Initial() + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { + the.InKafka.Subscribe(inTopic, the.onData) + } + + the.InKafka.Worker() + return nil +} +func (the *consumerSavoirTheme) outputInitial() error { + //数据出口 + the.OutEs = *dbOperate.NewESHelper( + the.Info.IoConfig.Out.Es.Address, + the.Info.IoConfig.Out.Es.Auth.UserName, + the.Info.IoConfig.Out.Es.Auth.Password, + ) + + return nil +} + +func (the *consumerSavoirTheme) infoComponentInitial() error { + //数据出口 + //addr := the.Info.OtherInfo..Redis.Address + //the.infoRedis = dbOperate.NewRedisHelper("", addr) + + return nil +} + +func (the *consumerSavoirTheme) sinkTask() { + intervalSec := the.Info.IoConfig.Out.Es.Interval + ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) + defer ticker.Stop() + for { + <-ticker.C + the.toSink() + } +} + +func (the *consumerSavoirTheme) toSink() { + var themes []models.EsTheme + the.lock.Lock() + defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { + if v, ok := value.(*models.EsTheme); ok { + themes = append(themes, *v) + //零时打日志用 + if v.Sensor == the.logTagId { + bs, _ := json.Marshal(v) + log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + return ok + } else { + log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) + } + return true + }) + if len(themes) > 0 { + index := the.Info.IoConfig.Out.Es.Index + log.Printf("写入es [%s] %d条", index, len(themes)) + the.OutEs.BulkWriteThemes2Es(index, themes) + the.sinkMap.Clear() + } +} + +func (the *consumerSavoirTheme) Work() { + log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + go the.sinkTask() + go func() { + for { + pushEsTheme := <-the.dataCache + + if pushEsTheme.Sensor == the.logTagId { + bs, _ := json.Marshal(pushEsTheme) + log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + + //有效数据存入缓存 + the.lock.Lock() + the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) + the.lock.Unlock() + } + + }() +} +func (the *consumerSavoirTheme) onData(topic string, msg string) bool { + //if len(msg) > 80 { + // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //} + adaptor := adaptors.Adaptor_Savoir_LastTheme{} + + needPush := adaptor.Transform(topic, msg) + + if needPush != nil && needPush.Data != nil { + the.dataCache <- needPush + } else { + s, _ := json.Marshal(needPush) + if needPush != nil { + if needPush.Sensor == the.logTagId { + log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + } + } + } + + return true +} diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 502b760..3dc9e14 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -218,10 +218,7 @@ func (the *ESHelper) BulkWriteWithLog(index, reqBody string) { } -func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { - - //log 测试用 - const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0" +func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw, logIds ...string) { logTag := false @@ -236,8 +233,8 @@ func (the *ESHelper) BulkWriteRaws2Es(index string, raws []models.EsRaw) { `, index, _id, source) body.WriteString(s) - if raw.IotaDevice == logTagDeviceId { - log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", logTagDeviceId, string(s)) + if raw.IotaDevice == logIds[0] { + log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", raw.IotaDevice, s) logTag = true } } @@ -264,6 +261,34 @@ func (the *ESHelper) BulkWriteRaws2EsLast(index string, raws []models.EsRaw) { } +func (the *ESHelper) BulkWriteThemes2Es(index string, themes []models.EsTheme, logIds ...int) { + + logTag := false + + body := strings.Builder{} + for _, raw := range themes { + // scala => val id = UUID.nameUUIDFromBytes(s"${v.deviceId}-${v.acqTime.getMillis}".getBytes("UTF-8")).toString + source, _ := json.Marshal(raw) + _id := raw.Sensor + s := fmt.Sprintf( + `{"index": {"_index": "%s","_id": "%d"}} +%s +`, index, _id, source) + body.WriteString(s) + + if len(logIds) > 0 && raw.Sensor == logIds[0] { + log.Printf("BulkWriteRaws2Es 标记设备数据 [%s] %s ", raw.IotaDevice, s) + logTag = true + } + } + if logTag { //追踪数据 + the.BulkWriteWithLog(index, body.String()) + } else { + the.BulkWrite(index, body.String()) + } + +} + func (the *ESHelper) Close() { } diff --git a/models/savoirTheme.go b/models/savoirTheme.go index 515298a..bb1d7a7 100644 --- a/models/savoirTheme.go +++ b/models/savoirTheme.go @@ -40,10 +40,10 @@ type SavoirTheme struct { Labels string `json:"labels"` GroupParam interface{} `json:"groupParam"` } `json:"station"` - AcqTime string `json:"acqTime"` - TaskId string `json:"taskId"` - Data map[string]float64 `json:"data"` - State int `json:"state"` - DataEmpty bool `json:"dataEmpty"` - RawAgg bool `json:"rawAgg"` + AcqTime string `json:"acqTime"` + TaskId string `json:"taskId"` + Data map[string]any `json:"data"` + State int `json:"state"` + DataEmpty bool `json:"dataEmpty"` + RawAgg bool `json:"rawAgg"` }