diff --git a/adaptors/安心云最新主题数据toE5.go b/adaptors/安心云最新主题数据toE5.go new file mode 100644 index 0000000..589bfb3 --- /dev/null +++ b/adaptors/安心云最新主题数据toE5.go @@ -0,0 +1,65 @@ +package adaptors + +import ( + "encoding/json" + "goInOut/consumers/AXYraw" + "goInOut/dbOperate" + "goInOut/models" + "log" + "time" +) + +// Adaptor_Anxinyun_LastTheme 安心云 kafka theme数据 转换 es数据 +type Adaptor_Anxinyun_LastTheme struct { + AXYraw.Info + Redis *dbOperate.RedisHelper +} + +func (the Adaptor_Anxinyun_LastTheme) Transform(topic, rawMsg string) *models.EsTheme { + theme := models.AXYSavoirTheme{} + err := json.Unmarshal([]byte(rawMsg), &theme) + if err != nil { + log.Printf("反序列化 异常 dev数据=%s", rawMsg) + return nil + } + + return the.Theme2Es(theme) +} + +func (the Adaptor_Anxinyun_LastTheme) Theme2Es(theme models.AXYSavoirTheme) *models.EsTheme { + if theme.DataEmpty || len(theme.Data) == 0 { + bs, _ := json.Marshal(theme) + log.Printf("测点[%s]数据为空 => %s", theme.Station.Name, bs) + return nil + } + + ////log.Printf("设备[%s] 数据时间 %s", iotaData.DeviceId, iotaData.TriggerTime) + //deviceInfo := the.GetDeviceInfo(iotaData.DeviceId) + // + ////查不到信息的数据 + //if deviceInfo.Name == "" { + // log.Printf("设备[%s] 无deviceInfo信息 %s", iotaData.DeviceId, iotaData.TriggerTime) + // return nil + //} + // + Atime, err := time.Parse("2006-01-02T15:04:05.000+0800", theme.AcqTime) + if err != nil { + log.Printf("安心云 测点[%s] 数据时间 %s 解析错误", theme.Station.Name, theme.AcqTime) + return nil + } + + themeData := &models.EsTheme{ + SensorName: theme.Station.Name, + FactorName: theme.Station.Factor.Name, + FactorProtoCode: theme.Station.Factor.ProtoCode, + Data: theme.Data, + FactorProtoName: theme.Station.Factor.ProtoName, + Factor: theme.Station.Factor.Id, + CollectTime: Atime, + Sensor: theme.Station.Id, + Structure: theme.Station.Structure.Id, + IotaDevice: []string{}, + CreateTime: time.Now(), + } + return themeData +} diff --git a/configFiles/config_安心云测点数据_最新同步.yaml b/configFiles/config_安心云测点数据_最新同步.yaml new file mode 100644 index 0000000..70de5e6 --- /dev/null +++ b/configFiles/config_安心云测点数据_最新同步.yaml @@ -0,0 +1,20 @@ +consumer: consumerAXYThemeToES +ioConfig: + in: + kafka: + brokers: + - 10.8.30.160:30992 + groupId: axysavoir_last + topics: + - anxinyun_theme #监听数据的的主题 + out: + es: + address: + - "http://10.8.30.150:30847" + # - "http://10.8.30.155:5601" + # - "http://10.8.30.160:30092" + index: "anxincloud_last_theme" #es存测点最后一条数据的新索引 + auth: + userName: post + password: 123 + interval: 30 #多久写一次es(秒) diff --git a/configFiles/弃用备份/config_安心云设备数据_最新同步.json b/configFiles/config_安心云设备数据_最新同步.json similarity index 100% rename from configFiles/弃用备份/config_安心云设备数据_最新同步.json rename to configFiles/config_安心云设备数据_最新同步.json diff --git a/configFiles/config_承德金隅水泥_河北矿山_四方洞子.json b/configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json similarity index 100% rename from configFiles/config_承德金隅水泥_河北矿山_四方洞子.json rename to configFiles/弃用备份/config_承德金隅水泥_河北矿山_四方洞子.json diff --git a/consumers/AXYTheme/config.go b/consumers/AXYTheme/config.go new file mode 100644 index 0000000..7816aee --- /dev/null +++ b/consumers/AXYTheme/config.go @@ -0,0 +1,19 @@ +package AXYTheme + +import "goInOut/config" + +type ConfigFile struct { + IoConfig ioConfig `yaml:"ioConfig"` + Monitor map[string]string `yaml:"monitor"` +} +type ioConfig struct { + In in `json:"in"` + Out out `json:"out"` +} +type in struct { + Kafka config.KafkaConfig `json:"kafka"` +} + +type out struct { + Es config.EsConfig `json:"es"` +} diff --git a/consumers/AXYTheme/dataModel.go b/consumers/AXYTheme/dataModel.go new file mode 100644 index 0000000..da1bffc --- /dev/null +++ b/consumers/AXYTheme/dataModel.go @@ -0,0 +1,14 @@ +package AXYTheme + +import ( + "time" +) + +type OffLineGap struct { + StructId int `json:"struct_id" db:"struct_id"` + FactorId int `json:"factor_id" db:"factor_id"` + OfflineGap int `json:"offline_gap" db:"offline_gap"` + IsOpen bool `json:"is_open" db:"is_open"` + UpdateAt time.Time `json:"update_at" db:"update_at"` + StructName string `json:"name" db:"name"` +} diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go new file mode 100644 index 0000000..10c19ae --- /dev/null +++ b/consumers/consumerAXYThemeToES.go @@ -0,0 +1,157 @@ +package consumers + +import ( + "encoding/json" + "goInOut/adaptors" + "goInOut/consumers/AXYTheme" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" + "goInOut/models" + "goInOut/monitors" + "gopkg.in/yaml.v3" + "log" + "sync" + "time" +) + +type consumerAXYThemeToES struct { + //数据缓存管道 + dataCache chan *models.EsTheme + //具体配置 + Info AXYTheme.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor +} + +func (the *consumerAXYThemeToES) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerAXYThemeToES) Initial(cfg string) error { + the.sinkMap = sync.Map{} + the.dataCache = make(chan *models.EsTheme, 1000) + + the.LoadConfigJson(cfg) + err := the.inputInitial() + if err != nil { + return err + } + err = the.outputInitial() + if err != nil { + return err + } + + return err +} +func (the *consumerAXYThemeToES) inputInitial() error { + //数据入口 + the.InKafka = _kafka.KafkaHelper{ + Brokers: the.Info.IoConfig.In.Kafka.Brokers, + GroupId: the.Info.IoConfig.In.Kafka.GroupId, + } + the.InKafka.Initial() + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { + the.InKafka.Subscribe(inTopic, the.onData) + } + + the.InKafka.Worker() + return nil +} +func (the *consumerAXYThemeToES) outputInitial() error { + //数据出口 + the.OutEs = *dbOperate.NewESHelper( + the.Info.IoConfig.Out.Es.Address, + the.Info.IoConfig.Out.Es.Auth.UserName, + the.Info.IoConfig.Out.Es.Auth.Password, + ) + + return nil +} + +func (the *consumerAXYThemeToES) sinkTask() { + intervalSec := the.Info.IoConfig.Out.Es.Interval + ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) + defer ticker.Stop() + for { + <-ticker.C + the.toSink() + } +} + +func (the *consumerAXYThemeToES) toSink() { + var themes []models.EsTheme + the.lock.Lock() + defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { + if v, ok := value.(*models.EsTheme); ok { + themes = append(themes, *v) + //零时打日志用 + if v.Sensor == the.logTagId { + bs, _ := json.Marshal(v) + log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + return ok + } else { + log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) + } + return true + }) + if len(themes) > 0 { + index := the.Info.IoConfig.Out.Es.Index + log.Printf("写入es [%s] %d条", index, len(themes)) + the.OutEs.BulkWriteThemes2Es(index, themes) + the.sinkMap.Clear() + } +} + +func (the *consumerAXYThemeToES) Work() { + log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + go the.sinkTask() + go func() { + for { + pushEsTheme := <-the.dataCache + + if pushEsTheme.Sensor == the.logTagId { + bs, _ := json.Marshal(pushEsTheme) + log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + + //有效数据存入缓存 + the.lock.Lock() + the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) + the.lock.Unlock() + } + + }() +} +func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { + //if len(msg) > 80 { + // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //} + adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} + + needPush := adaptor.Transform(topic, msg) + + if needPush != nil && needPush.Data != nil { + the.dataCache <- needPush + } else { + s, _ := json.Marshal(needPush) + if needPush != nil { + if needPush.Sensor == the.logTagId { + log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + } + } + } + + return true +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index e8fb9df..6880c58 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -56,6 +56,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerAxySkAlarm": consumer = new(consumerAxySkAlarm) + case "consumerAXYThemeToES": + consumer = new(consumerAXYThemeToES) + default: consumer = nil } diff --git a/models/axySavoirTheme.go b/models/axySavoirTheme.go new file mode 100644 index 0000000..2fd2ae9 --- /dev/null +++ b/models/axySavoirTheme.go @@ -0,0 +1,37 @@ +package models + +type AXYSavoirTheme struct { + Station struct { + Name string `json:"name"` + Id int `json:"id"` + Structure Structure `json:"structure"` + Factor struct { + Id int `json:"id"` + Name string `json:"name"` + ProtoCode string `json:"protoCode"` + ProtoName string `json:"protoName"` + Items []struct { + Id int `json:"id"` + Name string `json:"name"` + FieldName string `json:"field_name"` + UnitName string `json:"unit_name"` + Precision interface{} `json:"precision"` + } `json:"items"` + Units struct { + Strain string `json:"strain"` + } `json:"units"` + } `json:"factor"` + ManualData bool `json:"manual_data"` + Formula interface{} `json:"formula"` + Params interface{} `json:"params"` + Group interface{} `json:"group"` + Labels string `json:"labels"` + GroupParam interface{} `json:"groupParam"` + } `json:"station"` + AcqTime string `json:"acqTime"` + TaskId string `json:"taskId"` + Data map[string]any `json:"data"` + State int `json:"state"` + DataEmpty bool `json:"dataEmpty"` + RawAgg bool `json:"rawAgg"` +}