package consumers import ( "crypto/rc4" "fmt" "goInOut/adaptors" "goInOut/consumers/HBJCAS" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "gopkg.in/yaml.v3" "log" "time" ) type consumerJYESNJZX struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info HBJCAS.ConfigFile InHttp *dbOperate.HttpHelper outMqtt *dbOperate.MqttHelper monitor *monitors.CommonMonitor infoRedis *dbOperate.RedisHelper } func (the *consumerJYESNJZX) LoadConfig(cfgStr string) { // 将 yaml 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerJYESNJZX) Initial(cfg string) error { the.LoadConfig(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() if err != nil { return err } err = the.infoComponentInitial() return err } func (the *consumerJYESNJZX) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } the.monitor.Start() for _, cron := range the.Info.Monitor { the.monitor.RegisterTask(cron, the.getEsAggData) } return nil } func (the *consumerJYESNJZX) OutputInitial() error { //数据出口 the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, false, //按照具体项目来 "") return nil } func (the *consumerJYESNJZX) infoComponentInitial() error { //数据出口 addr := the.Info.QueryComponent.Redis.Address the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } func (the *consumerJYESNJZX) getEsAggData() { start, end := utils.GetTimeRangeBy1minByOffset(-5) //取向前偏移5分钟的一分钟的数据 log.Printf("查询数据时间范围 %s - %s", start, end) factorIds := []int{2, 4, 18, 103, 102, 96, 107, 156, 578} //监测因素 2温湿度 4温度 18裂缝检测 103净空收敛 102拱顶沉降 96二次衬彻应变 107道床及拱腰结构沉降 156风速 578风向 //架伊尔大桥的结构物id var structId int64 for strutId, _ := range the.Info.StructInfo { structId = strutId } adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo for _, factorId := range factorIds { esQuery := the.getESQueryStr(structId, factorId, start, end) //log.Printf("esQuery[%s]", esQuery) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) log.Printf("esAggResultStr[%s]", esAggResultStr) needPushes := adaptor.Transform(structId, factorId, esAggResultStr) if len(needPushes) > 0 { the.ch <- needPushes } } } func (the *consumerJYESNJZX) getAdaptor() (adaptor adaptors.Adaptor_AXYES_NJZX) { return adaptors.Adaptor_AXYES_NJZX{ Redis: the.infoRedis, } } func (the *consumerJYESNJZX) crc16rc4(transBytes []byte) []byte { resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) needRC4 := append(transBytes, resultByCrc16...) rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] if !ok { log.Panicf("未配置 rc4key") } rc4Key := []byte(rc4KeyStr) //the.RC4Key // 加密操作 dest1 := make([]byte, len(needRC4)) rc4.NewCipher(rc4Key) cipher1, _ := rc4.NewCipher(rc4Key) cipher1.XORKeyStream(dest1, needRC4) return dest1 } func (the *consumerJYESNJZX) getESQueryStr(structureId int64, factorId int, start, end string) string { esQuery := fmt.Sprintf(` { "size": 20, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } } } `, structureId, factorId, start, end) return esQuery } func (the *consumerJYESNJZX) Work() { go func() { for { needPushList := <-the.ch if len(the.ch) > 0 { log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) } for _, push := range needPushList { if push.Topic != "" { the.outMqtt.Publish(push.Topic, push.Payload) continue } //没有标记topic 的 按照配置文件里面的推送 for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { the.outMqtt.Publish(topic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() }