package consumers

import (
	"crypto/rc4"
	"encoding/hex"
	"fmt"
	"log"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/HBJCAS"
	"goInOut/dbOperate"
	"goInOut/monitors"
	"goInOut/utils"

	"gopkg.in/yaml.v3"
)

// hbjcasESAuthorization is the Authorization header value sent with every
// aggregation query issued by this consumer.
//
// NOTE(review): this is a credential committed to source; consider moving it
// into configuration. The scheme is spelled "Bear" rather than "Bearer" --
// confirm what the server expects before changing it.
const hbjcasESAuthorization = "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"

// consumerHBJCAS queries aggregated data over HTTP on a cron schedule,
// transforms it with an adaptor, appends a CRC and RC4-encrypts each payload,
// and publishes the result over MQTT.
type consumerHBJCAS struct {
	// ch buffers batches of messages between the scheduled queries and the
	// publishing goroutine started by Work.
	ch chan []adaptors.NeedPush
	// Info is the configuration parsed from YAML by LoadConfigJson.
	Info HBJCAS.ConfigFile
	// InHttp is the HTTP helper used to run the aggregation queries.
	InHttp *dbOperate.HttpHelper
	// outMqtt is the MQTT helper used to publish payloads.
	outMqtt *dbOperate.MqttHelper
	// monitor runs the cron tasks registered in InputInitial.
	monitor *monitors.CommonMonitor
	// infoRedis is handed to the adaptor returned by getAdaptor.
	infoRedis *dbOperate.RedisHelper
}

// LoadConfigJson parses cfgStr as YAML (despite the name) into the.Info.
// It logs and panics when the text cannot be parsed.
//
// NOTE(review): the failure log prints the whole configuration text, which may
// contain the MQTT password.
func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) {
	err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration and then initialises the input side, the
// output side and the Redis component, in that order. It returns the first
// error reported by any of those steps; a malformed configuration panics
// inside LoadConfigJson instead.
func (the *consumerHBJCAS) Initial(cfg string) error {
	the.LoadConfigJson(cfg)

	if err := the.InputInitial(); err != nil {
		return err
	}
	if err := the.OutputInitial(); err != nil {
		return err
	}
	return the.infoComponentInitial()
}

// InputInitial creates the buffering channel and the HTTP helper, starts the
// monitor and registers one task per entry of the.Info.Monitor. Task names
// other than "cron10min" and "cron1hour" are logged and skipped. It always
// returns nil.
func (the *consumerHBJCAS) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)

	// Data input.
	the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}

	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{},
	}
	the.monitor.Start()

	for taskName, cron := range the.Info.Monitor {
		switch taskName {
		case "cron10min":
			the.monitor.RegisterTask(cron, the.getEs10minAggData)
		case "cron1hour":
			the.monitor.RegisterTask(cron, the.getEs1HourAggData)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
		}
	}
	return nil
}

// OutputInitial creates the MQTT helper from the configured host, port,
// client id and credentials. It always returns nil.
func (the *consumerHBJCAS) OutputInitial() error {
	mqttCfg := the.Info.IoConfig.Out.Mqtt

	// Data output.
	the.outMqtt = dbOperate.MqttInitial(
		mqttCfg.Host,
		mqttCfg.Port,
		mqttCfg.ClientId,
		mqttCfg.UserName,
		mqttCfg.Password,
		true, // project-specific; presumably enables TLS with the files below -- TODO confirm
		"consumers/HBJCAS/ssl/cacert.pem",
		"consumers/HBJCAS/ssl/client-cert.pem",
		"consumers/HBJCAS/ssl/client-key.pem")
	return nil
}

// infoComponentInitial creates the Redis helper from the configured address.
// It always returns nil.
func (the *consumerHBJCAS) infoComponentInitial() error {
	addr := the.Info.QueryComponent.Redis.Address
	the.infoRedis = dbOperate.NewRedisHelper("", addr)
	return nil
}

// Work starts a goroutine that drains the.ch and publishes every message over
// MQTT. A message carrying its own topic goes to that topic only; otherwise it
// is published to every topic listed in the configuration. The goroutine
// sleeps 100ms after each batch and ends if the channel is closed.
func (the *consumerHBJCAS) Work() {
	go func() {
		for needPushList := range the.ch {
			if len(the.ch) > 0 {
				log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			}

			for _, push := range needPushList {
				if push.Topic != "" {
					the.outMqtt.Publish(push.Topic, push.Payload)
					continue
				}
				// No topic on the message: fall back to the configured topics.
				for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics {
					the.outMqtt.Publish(topic, push.Payload)
				}
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// getAdaptor returns a new adaptor wired to the consumer's Redis helper.
func (the *consumerHBJCAS) getAdaptor() adaptors.Adaptor_AXYES_HBGL {
	return adaptors.Adaptor_AXYES_HBGL{
		Redis: the.infoRedis,
	}
}

// getStructIds returns the keys of the.Info.StructInfo. The order is whatever
// the range over StructInfo yields.
func (the *consumerHBJCAS) getStructIds() []int64 {
	structIds := make([]int64, 0, len(the.Info.StructInfo))
	for structId := range the.Info.StructInfo {
		structIds = append(structIds, structId)
	}
	return structIds
}

// getEs1HourAggData queries the hourly aggregation for factors 15, 18 and 20
// of every configured structure over the range returned by
// utils.GetTimeRangeByHour(-1), and queues the encrypted results.
func (the *consumerHBJCAS) getEs1HourAggData() {
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询数据时间范围 %s - %s", start, end)

	hourFactorIds := []int{15, 18, 20}
	the.pushAggData(hourFactorIds, start, end, the.getESQueryStrByHour, false)
}

// getEs10minAggData queries the 10-minute aggregation for factors 28 and 592
// of every configured structure, logs each encrypted payload as hex, and
// queues the results.
func (the *consumerHBJCAS) getEs10minAggData() {
	// utils.GetTimeRangeBy10min() is no longer used here: because of the
	// real-time behaviour of vibration data, reporting was switched to one
	// unified hourly window.
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询10min数据时间范围 %s - %s", start, end)

	// Factor 592 is the tri-axial acceleration monitoring of structure [5222],
	// the Guangrenling tunnel (upbound) in Chengde, Hebei.
	factorIds := []int{28, 592}
	the.pushAggData(factorIds, start, end, the.getESQueryStrBy10min, true)
}

// pushAggData runs one aggregation query per (structure, factor) pair, turns
// each response into messages with the adaptor, replaces every payload with
// its CRC-suffixed, RC4-encrypted form, and sends each non-empty batch to
// the.ch. buildQuery produces the request body for one pair; when logPayload
// is true each encrypted payload is logged as hex together with its topic.
func (the *consumerHBJCAS) pushAggData(
	factorIds []int,
	start, end string,
	buildQuery func(structureId int64, factorId int, start, end string) string,
	logPayload bool,
) {
	for _, structId := range the.getStructIds() {
		for _, factorId := range factorIds {
			esQuery := buildQuery(structId, factorId, start, end)
			auth := map[string]string{"Authorization": hbjcasESAuthorization}
			esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)

			adaptor := the.getAdaptor()
			adaptor.PointInfo = the.Info.PointInfo
			adaptor.StructInfo = the.Info.StructInfo

			needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
			for i := range needPushes {
				needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
				if logPayload {
					log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload))
				}
			}

			if len(needPushes) > 0 {
				the.ch <- needPushes
			}
		}
	}
}

// crc16rc4 appends the CRC computed by utils.NewCRC16CCITT().GetWCRCin to
// transBytes and returns the RC4 encryption of the combined bytes, using the
// key stored under "rc4key" in the.Info.OtherInfo. transBytes itself is never
// modified. It panics when the key is missing or rejected by rc4.NewCipher.
func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte {
	resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes)

	rc4KeyStr, ok := the.Info.OtherInfo["rc4key"]
	if !ok {
		log.Panicf("未配置 rc4key")
	}
	stream, err := rc4.NewCipher([]byte(rc4KeyStr))
	if err != nil {
		// Fail with an explicit message rather than a nil dereference below.
		log.Panicf("rc4key 无效 err=%v", err)
	}

	// Build the plaintext in a fresh buffer so that appending the CRC can never
	// write into spare capacity of the caller's slice.
	buf := make([]byte, 0, len(transBytes)+len(resultByCrc16))
	buf = append(buf, transBytes...)
	buf = append(buf, resultByCrc16...)

	// Encrypt in place; buf is private to this call.
	stream.XORKeyStream(buf, buf)
	return buf
}

// getESQueryStrByHour builds the aggregation request body for one structure
// and factor over [start, end), grouped by sensor and then by 1h buckets in
// the Asia/Shanghai time zone. The innermost aggregation comes from
// getEsAggSubSqlByFactorId.
func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string {
	aggSubSql := getEsAggSubSqlByFactorId(factorId)
	esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lt": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "1h", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql)
	return esQuery
}

// getESQueryStrBy10min builds the aggregation request body for one structure
// and factor over [start, end] (the upper bound is inclusive here, unlike the
// hourly query), grouped by sensor and then by 10m buckets in the
// Asia/Shanghai time zone.
func (the *consumerHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
	aggSubSql := getEsAggSubSqlByFactorId(factorId)
	esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "10m", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql)
	return esQuery
}

// getEsAggSubSqlByFactorId returns the extended_stats sub-aggregation for a
// monitoring factor, or "" for a factor it does not know.
//
// Factor ids: 15 pier tilt, 18 crack, 20 bearing displacement, 28 deck
// vibration, 592 tri-axial acceleration (Chengde tunnel only), 4 temperature
// (Guozigou reporting).
func getEsAggSubSqlByFactorId(factorId int) string {
	subAggSQl := ""
	switch factorId {
	case 4:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.temperature" } } }`
	case 15:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.x" } }, "y": { "extended_stats": { "field": "data.y" } } }`
	case 18:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.crack" } } }`
	case 20:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.displacement" } } }`
	case 28:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.pv" } }, "y": { "extended_stats": { "field": "data.trms" } } }`
	case 592:
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.z_acc_speed" } } }`
	}
	return subAggSQl
}

// getStructureId returns the "structureId" entry of the.Info.OtherInfo and
// panics when the entry is missing.
func (the *consumerHBJCAS) getStructureId() string {
	structureId, ok := the.Info.OtherInfo["structureId"]
	if !ok {
		log.Panicf("无法识别有效的structureId")
	}
	return structureId
}