package consumers

import (
	"encoding/json"
	"goInOut/adaptors"
	"goInOut/consumers/AXYraw"
	"goInOut/dbHelper"
	"goInOut/dbHelper/_kafka"
	"goInOut/models"
	"log"
	"sync"
	"time"
)

// consumerAXYraw consumes messages from Kafka, keeps the most recent raw
// record per device in memory, and periodically bulk-writes those records to
// Elasticsearch.
type consumerAXYraw struct {
	// dataCache is the channel that carries records from onData to the
	// storing goroutine started by Work.
	dataCache chan *models.EsRaw

	// ConfigInfo is the configuration decoded by LoadConfigJson.
	ConfigInfo AXYraw.ConfigFile
	InKafka    _kafka.KafkaHelper
	OutEs      dbHelper.ESHelper
	infoRedis  *dbHelper.RedisHelper
	sinkRawMap sync.Map
	lock       sync.Mutex
}

// LoadConfigJson decodes the JSON document cfgStr into the.ConfigInfo.
// It logs and panics when cfgStr is not valid JSON for that structure.
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
	// Decode the JSON payload straight into the config struct.
	err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial resets the in-memory state, loads the configuration from cfg and
// then sets up the Kafka input, the Elasticsearch output and the Redis info
// component, in that order. It returns the first error reported by any of
// those steps; an invalid cfg panics inside LoadConfigJson.
func (the *consumerAXYraw) Initial(cfg string) error {
	the.sinkRawMap = sync.Map{}
	the.dataCache = make(chan *models.EsRaw, 200)

	the.LoadConfigJson(cfg)
	if err := the.inputInitial(); err != nil {
		return err
	}
	if err := the.outputInitial(); err != nil {
		return err
	}
	return the.infoComponentInitial()
}

// inputInitial builds the Kafka helper from the configured brokers and group
// id, subscribes onData to every configured topic and then calls Worker on
// the helper. It always returns nil.
func (the *consumerAXYraw) inputInitial() error {
	// Data input.
	the.InKafka = _kafka.KafkaHelper{
		Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers,
		GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId,
	}
	the.InKafka.Initial()
	for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics {
		the.InKafka.Subscribe(inTopic, the.onData)
	}
	the.InKafka.Worker()
	return nil
}

// outputInitial builds the Elasticsearch helper from the configured address
// and credentials. It always returns nil.
func (the *consumerAXYraw) outputInitial() error {
	// Data output.
	the.OutEs = *dbHelper.NewESHelper(
		the.ConfigInfo.IoConfig.Out.Es.Address,
		the.ConfigInfo.IoConfig.Out.Es.Auth.UserName,
		the.ConfigInfo.IoConfig.Out.Es.Auth.Password,
	)
	return nil
}

// infoComponentInitial builds the Redis helper that is handed to the adaptor
// in onData. It always returns nil.
func (the *consumerAXYraw) infoComponentInitial() error {
	// Info query component (Redis).
	addr := the.ConfigInfo.Info.QueryComponent.Redis.Address
	the.infoRedis = dbHelper.NewRedisHelper("", addr)
	return nil
}

// sinkTask calls toSink once per configured interval (in seconds) and never
// returns.
func (the *consumerAXYraw) sinkTask() {
	// time.NewTicker panics on a non-positive duration, which would take the
	// whole process down from this background goroutine; fall back instead.
	const fallbackInterval = 10 * time.Second

	intervalSec := the.ConfigInfo.IoConfig.Out.Es.Interval
	interval := time.Duration(intervalSec) * time.Second
	if interval <= 0 {
		log.Printf("sinkTask 写入间隔配置无效 [%v],使用默认间隔 %v", intervalSec, fallbackInterval)
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		the.toSink()
	}
}

// toSink takes a snapshot of every record currently held in sinkRawMap,
// empties the map, and bulk-writes the snapshot to the configured
// Elasticsearch index. Nothing is written when the map holds no records.
func (the *consumerAXYraw) toSink() {
	// Snapshot and clear under the lock so that a record stored concurrently
	// is either part of this batch or kept for the next one. The lock is
	// released before the Elasticsearch call so a slow write does not stall
	// the goroutine that stores incoming records.
	raws := func() []models.EsRaw {
		the.lock.Lock()
		defer the.lock.Unlock()

		var batch []models.EsRaw
		the.sinkRawMap.Range(func(key, value any) bool {
			v, ok := value.(*models.EsRaw)
			if !ok {
				log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
				return true
			}
			batch = append(batch, *v)
			// Temporary debug logging for the tagged device.
			if v.IotaDevice == logTagDeviceId {
				bs, _ := json.Marshal(v)
				log.Printf("toSink -> Range 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
			}
			return true
		})
		if len(batch) > 0 {
			the.sinkRawMap.Clear()
		}
		return batch
	}()

	if len(raws) == 0 {
		return
	}
	log.Printf("准备写入es %d条", len(raws))
	index := the.ConfigInfo.IoConfig.Out.Es.Index
	the.OutEs.BulkWriteRaws2Es(index, raws)
}

// logTagDeviceId is the device id whose records get extra log lines as they
// pass through onData, Work and toSink.
const logTagDeviceId = "91da4d1f-fbc7-4dad-bedd-f8ff05c0e0e0"

// Work starts two background goroutines: the periodic sink task, and a loop
// that moves records from dataCache into sinkRawMap keyed by device id, so a
// newer record for a device replaces the older one. Neither goroutine is ever
// stopped.
func (the *consumerAXYraw) Work() {
	go the.sinkTask()
	go func() {
		for {
			pushEsRaw := <-the.dataCache

			if pushEsRaw.IotaDevice == logTagDeviceId {
				bs, _ := json.Marshal(pushEsRaw)
				log.Printf("存储 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
			}

			// Store the valid record in the cache map.
			the.lock.Lock()
			the.sinkRawMap.Store(pushEsRaw.IotaDevice, pushEsRaw)
			the.lock.Unlock()
		}
	}()
}

// onData is the Kafka subscription callback. It runs the message through the
// AXY last-raw adaptor and, when the result is non-nil, has a non-empty Meta
// and a non-nil Data, sends it on dataCache (blocking while the channel is
// full). It always returns true.
func (the *consumerAXYraw) onData(topic string, msg string) bool {
	//if len(msg) > 80 {
	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80])
	//}
	adaptor := adaptors.Adaptor_AXY_LastRAW{
		Redis: the.infoRedis,
	}

	needPush := adaptor.Transform(topic, msg)
	if needPush != nil && len(needPush.Meta) > 0 && needPush.Data != nil {
		// Debug logging for the tagged device.
		if needPush.IotaDevice == logTagDeviceId {
			bs, _ := json.Marshal(needPush)
			log.Printf("onData -> needPush 标记设备数据 [%s] %s ", logTagDeviceId, string(bs))
		}
		the.dataCache <- needPush
	}

	return true
}