package consumers import ( "crypto/md5" "encoding/json" "fmt" "goUpload/adaptors" "goUpload/consumers/GZGZM" "goUpload/dbHelper" "goUpload/dbHelper/_kafka" "io" "log" "math/rand" "time" ) type consumerGZGZM struct { //数据缓存管道 dataCache chan []byte //具体配置 ConfigInfo GZGZM.ConfigFile InKafka _kafka.KafkaHelper OutHttp *dbHelper.HttpHelper } func (the *consumerGZGZM) RegisterPoint(thirdId string, sensorInfo GZGZM.SensorInfo) error { registerUrl := "http://sit.gibs.deagluo.top:32001/openApi/gzm_v1/monitorPoint/create" r := GZGZM.RegisterPointBody{ ThirdId: thirdId, BranchCode: sensorInfo.BranchCode, Name: sensorInfo.Name, Code: sensorInfo.Code, Type: sensorInfo.Type, AlarmValue1: 0, AlarmValue2: 0, AlarmValue3: 0, WarnValue1: 0, WarnValue2: 0, WarnValue3: 0, AlarmCount: 0, WarnCount: 0, InitAmount1: sensorInfo.InitAmount1, InitAmount2: sensorInfo.InitAmount2, InitAmount3: sensorInfo.InitAmount3, Unit: sensorInfo.Unit, IsClose: 0, } the.OutHttp.Url = registerUrl body, _ := json.Marshal(r) resp, err := the.OutHttp.PublishWithHeader(body, map[string]string{"Authorization": the.OutHttp.Token}) if err != nil { log.Printf(" 注册测点异常 err=%v \n,resp=%s", err, resp) } return err } func (the *consumerGZGZM) LoadConfigJson(cfgStr string) { // 将 JSON 格式的数据解析到结构体中 err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerGZGZM) Initial(cfg string) error { the.dataCache = make(chan []byte, 200) the.LoadConfigJson(cfg) err := the.inputInitial() if err != nil { return err } err = the.outputInitial() return err } func (the *consumerGZGZM) inputInitial() error { //数据入口 the.InKafka = _kafka.KafkaHelper{ Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers, GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId, } the.InKafka.Initial() for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics { the.InKafka.Subscribe(inTopic, the.onData) } the.InKafka.Worker() return nil } func (the *consumerGZGZM) outputInitial() error { //数据出口 the.OutHttp = &dbHelper.HttpHelper{ Url: the.ConfigInfo.IoConfig.Out.Http.Url, } the.OutHttp.Initial() //判断有无token if the.ConfigInfo.IoConfig.Out.Http.Token.Static != "" { //静态token the.OutHttp.Token = the.ConfigInfo.IoConfig.Out.Http.Token.Static } else { //需要动态获取的token if the.ConfigInfo.IoConfig.Out.Http.Token.Url != "" { go the.RefreshTask() } } return nil } func (the *consumerGZGZM) RefreshTask() { the.tokenRefresh() ticker := time.NewTicker(24 * time.Hour) defer ticker.Stop() for true { <-ticker.C the.tokenRefresh() } } func (the *consumerGZGZM) tokenRefresh() { url := the.ConfigInfo.IoConfig.Out.Http.Token.Url appKey := the.ConfigInfo.IoConfig.Out.Http.Token.AppKey appSecret := the.ConfigInfo.IoConfig.Out.Http.Token.AppSecret randomStr16 := string(Krand(16, KC_RAND_KIND_NUM)) timestamp := time.Now().UnixMilli() secretRaw := fmt.Sprintf("%s%s%s%d", appKey, appSecret, randomStr16, timestamp) h := md5.New() io.WriteString(h, secretRaw) secret := fmt.Sprintf("%x", h.Sum(nil)) queryUrl := url + "?" + fmt.Sprintf("&appKey=%s", appKey) + fmt.Sprintf("&random=%s", randomStr16) + fmt.Sprintf("×tamp=%d", timestamp) + fmt.Sprintf("&secret=%s", secret) tokenHttp := &dbHelper.HttpHelper{ Url: queryUrl, } tokenHttp.Initial() tokenResp := tokenHttp.HttpGet("") tokenBody := GZGZM.TokenBody{} err := json.Unmarshal([]byte(tokenResp), &tokenBody) if err != nil { log.Printf("解析异常 err=%s,body=%s", err.Error(), tokenResp) return } if tokenBody.Code == 200 { the.OutHttp.Token = tokenBody.Data log.Printf("刷新成功,token=%s", the.OutHttp.Token) } } func (the *consumerGZGZM) Work() { go func() { for { pushBytes := <-the.dataCache log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache)) log.Printf("推送[%v]: len=%d", the.OutHttp.Url, len(pushBytes)) the.OutHttp.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutHttp.Token}) time.Sleep(10 * time.Millisecond) } }() } func (the *consumerGZGZM) onData(topic string, msg string) bool { if len(msg) > 80 { log.Printf("recv:[%s]:%s ...", topic, msg[:80]) } adaptor := the.getAdaptor() if adaptor != nil { needPush := adaptor.Transform(topic, msg) if len(needPush) > 0 { the.dataCache <- needPush } } return true } func (the *consumerGZGZM) getAdaptor() (adaptor adaptors.IAdaptor3) { return adaptors.Adaptor_THEME_GZGZM{ GZGZM.SensorConfig{ SensorInfoMap: the.ConfigInfo.SensorInfoMap, }, } } // 随机字符串 func Krand(size int, kind int) []byte { ikind, kinds, result := kind, [][]int{[]int{10, 48}, []int{26, 97}, []int{26, 65}}, make([]byte, size) is_all := kind > 2 || kind < 0 //rand.Seed(time.Now().UnixNano()) for i := 0; i < size; i++ { if is_all { // random ikind ikind = rand.Intn(3) } scope, base := kinds[ikind][0], kinds[ikind][1] result[i] = uint8(base + rand.Intn(scope)) } return result } const ( KC_RAND_KIND_NUM = 0 // 纯数字 KC_RAND_KIND_LOWER = 1 // 小写字母 KC_RAND_KIND_UPPER = 2 // 大写字母 KC_RAND_KIND_ALL = 3 // 数字、大小写字母 )