package consumers

import (
	"encoding/json"
	"log"
	"time"

	"goUpload/adaptors"
	"goUpload/consumers/AXYraw"
	"goUpload/dbHelper"
	"goUpload/dbHelper/_kafka"
)

// consumerAXYraw consumes messages from Kafka, converts them through an
// adaptor and queues the converted payloads for the Elasticsearch output.
type consumerAXYraw struct {
	// dataCache buffers converted payloads between onData (producer) and the
	// goroutine started by Work (consumer).
	dataCache chan []byte
	// ConfigInfo is the configuration parsed by LoadConfigJson.
	ConfigInfo AXYraw.ConfigFile
	InKafka    _kafka.KafkaHelper
	OutEs      dbHelper.ESHelper
	infoRedis  *dbHelper.RedisHelper
}

// LoadConfigJson parses the JSON document cfgStr into the.ConfigInfo.
//
// It logs and panics when cfgStr is not valid JSON for the configuration
// structure.
func (the *consumerAXYraw) LoadConfigJson(cfgStr string) {
	if err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo); err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err)
		panic(err)
	}
}

// Initial prepares the consumer: it allocates the payload buffer, loads the
// JSON configuration cfg and initialises the output, info and input
// components. It returns the first initialisation error encountered.
//
// LoadConfigJson panics on an invalid configuration, so that failure is not
// reported through the returned error.
func (the *consumerAXYraw) Initial(cfg string) error {
	the.dataCache = make(chan []byte, 200)
	the.LoadConfigJson(cfg)

	// The Kafka input is initialised last: inputInitial registers onData and
	// starts the Kafka worker, and onData reads the.infoRedis. Initialising
	// the dependencies first guarantees they are set before any message can
	// be delivered.
	if err := the.outputInitial(); err != nil {
		return err
	}
	if err := the.infoComponentInitial(); err != nil {
		return err
	}
	return the.inputInitial()
}

// inputInitial sets up the data input: it builds the Kafka helper from the
// configured brokers and group id, subscribes onData to every configured
// topic and then calls the helper's Worker method.
func (the *consumerAXYraw) inputInitial() error {
	kafkaCfg := the.ConfigInfo.IoConfig.In.Kafka
	the.InKafka = _kafka.KafkaHelper{
		Brokers: kafkaCfg.Brokers,
		GroupId: kafkaCfg.GroupId,
	}
	the.InKafka.Initial()
	for _, inTopic := range kafkaCfg.Topics {
		the.InKafka.Subscribe(inTopic, the.onData)
	}
	// NOTE(review): presumably Worker starts message consumption; whether it
	// blocks is not visible from this file.
	the.InKafka.Worker()
	return nil
}

// outputInitial sets up the data output: it creates the Elasticsearch helper
// from the configured address and credentials.
func (the *consumerAXYraw) outputInitial() error {
	esCfg := the.ConfigInfo.IoConfig.Out.Es
	the.OutEs = *dbHelper.NewESHelper(
		esCfg.Address,
		esCfg.Auth.UserName,
		esCfg.Auth.Password,
	)
	return nil
}

// infoComponentInitial sets up the info query component: it creates the Redis
// helper from the configured Redis address.
func (the *consumerAXYraw) infoComponentInitial() error {
	addr := the.ConfigInfo.Info.QueryComponent.Redis.Address
	the.infoRedis = dbHelper.NewRedisHelper("", addr)
	return nil
}

// RefreshTask calls tokenRefresh once immediately and then again every 24
// hours. It never returns, so callers should run it in its own goroutine.
func (the *consumerAXYraw) RefreshTask() {
	the.tokenRefresh()
	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()
	for range ticker.C {
		the.tokenRefresh()
	}
}

// tokenRefresh is currently an empty placeholder.
func (the *consumerAXYraw) tokenRefresh() {
}

// Work starts a background goroutine that drains dataCache. For every payload
// it logs the number of payloads still buffered and the payload length, then
// sleeps for 10ms. The goroutine never exits.
//
// The actual publish call is commented out, so payloads are currently only
// logged and then dropped.
func (the *consumerAXYraw) Work() {
	go func() {
		for {
			pushBytes := <-the.dataCache
			log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
			log.Printf("推送[%v]: len=%d", "OutEs", len(pushBytes))
			//the.OutEs.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutEs.Token})
			time.Sleep(10 * time.Millisecond)
		}
	}()
}

// onData is the Kafka subscription callback. It transforms msg with the
// adaptor returned by getAdaptor and, when the result is non-empty, queues it
// on dataCache. The send blocks while dataCache is full. It always returns
// true.
//
// NOTE(review): only messages longer than 80 bytes are logged (as an 80-byte
// preview); shorter messages are not logged at all, and the byte slice may
// cut a multi-byte UTF-8 character. Confirm whether this is intended.
func (the *consumerAXYraw) onData(topic string, msg string) bool {
	if len(msg) > 80 {
		log.Printf("recv:[%s]:%s ...", topic, msg[:80])
	}

	adaptor := the.getAdaptor()
	if adaptor == nil {
		return true
	}
	if needPush := adaptor.Transform(topic, msg); len(needPush) > 0 {
		the.dataCache <- needPush
	}
	return true
}

// getAdaptor returns a new Adaptor_AXY_LastRAW that uses the consumer's Redis
// helper.
func (the *consumerAXYraw) getAdaptor() adaptors.IAdaptor3 {
	return adaptors.Adaptor_AXY_LastRAW{
		Redis: the.infoRedis,
	}
}