package consumers import ( "crypto/rc4" "encoding/hex" "fmt" "goInOut/adaptors" "goInOut/consumers/HBJCAS" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "gopkg.in/yaml.v3" "log" "time" ) type consumerGZG2ZJHL struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info HBJCAS.ConfigFile InHttp *dbOperate.HttpHelper outMqtt *dbOperate.MqttHelper monitor *monitors.CommonMonitor infoRedis *dbOperate.RedisHelper } func (the *consumerGZG2ZJHL) LoadConfigJson(cfgStr string) { // 将 yaml 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerGZG2ZJHL) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() if err != nil { return err } err = the.infoComponentInitial() return err } func (the *consumerGZG2ZJHL) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } the.monitor.Start() for taskName, cron := range the.Info.Monitor { switch taskName { case "cron10min": //the.monitor.RegisterTask(cron, the.getEs10minAggData) case "cron1hour": the.monitor.RegisterTask(cron, the.getEs1HourAggData) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } } return nil } func (the *consumerGZG2ZJHL) OutputInitial() error { //数据出口 the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, false, //按照具体项目来 "") return nil } func (the *consumerGZG2ZJHL) infoComponentInitial() error { //数据出口 addr := the.Info.QueryComponent.Redis.Address the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } func (the *consumerGZG2ZJHL) Work() { go func() { for { needPushList := <-the.ch if len(the.ch) > 0 { log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) } for _, push := range needPushList { if push.Topic != "" { the.outMqtt.Publish(push.Topic, push.Payload) continue } //没有标记topic 的 按照配置文件里面的推送 for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { the.outMqtt.Publish(topic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() } func (the *consumerGZG2ZJHL) getAdaptor() (adaptor adaptors.Adaptor_ZWYES_ZJHL) { return adaptors.Adaptor_ZWYES_ZJHL{ Redis: the.infoRedis, } } func (the *consumerGZG2ZJHL) getStructIds() []int64 { var structIds []int64 for strutId, _ := range the.Info.StructInfo { structIds = append(structIds, strutId) } return structIds } func (the *consumerGZG2ZJHL) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-5) log.Printf("查询数据时间范围 %s - %s", start, end) //hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11} hourFactorIds := []int{935} //湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据 //温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度 6500030003,6500030004,6500030005 //Gnss 935 ,4(温度),6索力, 15, 18, 20 structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range hourFactorIds { esQuery := the.getESQueryStrByHour(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) println("mqtt 推送报文=", hex.EncodeToString(needPushes[i].Payload)) } if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerGZG2ZJHL) getEs10minAggData() { //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询10min数据时间范围 %s - %s", start, end) factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测 structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range factorIds { esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) } if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerGZG2ZJHL) crc16rc4(transBytes []byte) []byte { resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) needRC4 := append(transBytes, resultByCrc16...) rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] if !ok { log.Panicf("未配置 rc4key") } rc4Key := []byte(rc4KeyStr) //the.RC4Key // 加密操作 dest1 := make([]byte, len(needRC4)) rc4.NewCipher(rc4Key) cipher1, _ := rc4.NewCipher(rc4Key) cipher1.XORKeyStream(dest1, needRC4) return dest1 } func (the *consumerGZG2ZJHL) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { aggSubSql := getEsAggSubSqlByZwyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lt": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "1h", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery } func (the *consumerGZG2ZJHL) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { aggSubSql := getEsAggSubSqlByZwyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "10m", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery } func getEsAggSubSqlByZwyFactorId(factorId int) string { //桥墩倾斜 15 裂缝 18 支座位移20 挠度19 桥面振动28 Gnss935 subAggSQl := "" switch factorId { case 4: //结构温度 subAggSQl = ` { "x": { "extended_stats": { "field": "data.temperature" } } }` case 6: // 索力 subAggSQl = ` { "x": { "extended_stats": { "field": "data.cableForce" } } }` case 7: // 称重 车载 subAggSQl = ` { "x": { "extended_stats": { "field": "data.load" } }, "y": { "extended_stats": { "field": "data.overload" } } }` case 11: //应变 subAggSQl = ` { "x": { "extended_stats": { "field": "data.strain" } } }` case 15: subAggSQl = ` { "x": { "extended_stats": { "field": "data.x" } }, "y": { "extended_stats": { "field": "data.y" } } }` case 18: subAggSQl = ` { "x": { "extended_stats": { "field": "data.crack" } } }` case 19: subAggSQl = ` { "x": { "extended_stats": { "field": "data.deflection" } } }` case 20, 24: subAggSQl = ` { "x": { "extended_stats": { "field": "data.displacement" } } }` case 28: subAggSQl = ` { "x": { "extended_stats": { "field": "data.pv" } }, "y": { "extended_stats": { "field": "data.trms" } } }` case 883: //湿度 (后期需要合并3个温度 拼成 温湿度) subAggSQl = ` { "x": { "extended_stats": { "field": "data.humidity" } } }` case 935: subAggSQl = ` { "x": { "extended_stats": { "field": "data.x" } }, "y": { "extended_stats": { "field": "data.y" } }, "z": { "extended_stats": { "field": "data.z" } } }` } return subAggSQl } func (the *consumerGZG2ZJHL) getStructureId() string { structureId, ok := the.Info.OtherInfo["structureId"] if !ok { log.Panicf("无法识别有效的structureId") } return structureId }