diff --git a/consumers/consumerAXYES2GDJKJC.go b/consumers/consumerAXYES2GDJKJC.go new file mode 100644 index 0000000..50db175 --- /dev/null +++ b/consumers/consumerAXYES2GDJKJC.go @@ -0,0 +1,310 @@ +package consumers + +import ( + "crypto/rc4" + "encoding/hex" + "fmt" + "goInOut/adaptors" + "goInOut/consumers/HBJCAS" + "goInOut/dbOperate" + "goInOut/monitors" + "goInOut/utils" + "gopkg.in/yaml.v3" + "log" + "time" +) + +type consumerAXYES2GDJKJC struct { + //数据缓存管道 + ch chan []adaptors.NeedPush + //具体配置 + Info HBJCAS.ConfigFile + InHttp *dbOperate.HttpHelper + outHttp *dbOperate.HttpHelper + monitor *monitors.CommonMonitor + infoRedis *dbOperate.RedisHelper +} + +func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) { + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerAXYES2GDJKJC) Initial(cfg string) error { + the.LoadConfigJson(cfg) + err := the.InputInitial() + if err != nil { + return err + } + err = the.OutputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() + return err +} +func (the *consumerAXYES2GDJKJC) InputInitial() error { + the.ch = make(chan []adaptors.NeedPush, 200) + //数据入口 + the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} + the.monitor = &monitors.CommonMonitor{ + MonitorHelper: &monitors.MonitorHelper{}, + } + + the.monitor.Start() + for taskName, cron := range the.Info.Monitor { + switch taskName { + case "cron10min": + the.monitor.RegisterTask(cron, the.getEs10minAggData) + case "cron1hour": + the.monitor.RegisterTask(cron, the.getEs1HourAggData) + default: + log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) + } + } + + return nil +} +func (the *consumerAXYES2GDJKJC) OutputInitial() error { + //数据出口 + the.outHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.Out.Http.Url, Token: ""} + return nil +} + +func (the *consumerAXYES2GDJKJC) infoComponentInitial() error { + //数据出口 + addr := the.Info.QueryComponent.Redis.Address + the.infoRedis = dbOperate.NewRedisHelper("", addr) + return nil +} +func (the *consumerAXYES2GDJKJC) Work() { + go func() { + for { + needPushList := <-the.ch + if len(the.ch) > 0 { + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + } + + for _, push := range needPushList { + _, err := the.outHttp.Publish(push.Payload) + if err != nil { + log.Printf("数据推送异常=> %s", err.Error()) + return + } + } + + time.Sleep(100 * time.Millisecond) + } + }() +} + +func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { + + return adaptors.Adaptor_AXYES_HBGL{ + Redis: the.infoRedis, + } +} + +func (the *consumerAXYES2GDJKJC) getStructIds() []int64 { + var structIds []int64 + for strutId, _ := range the.Info.StructInfo { + structIds = append(structIds, strutId) + } + return structIds +} +func (the *consumerAXYES2GDJKJC) getEs1HourAggData() { + start, end := utils.GetTimeRangeByHour(-1) + log.Printf("查询数据时间范围 %s - %s", start, end) + hourFactorIds := []int{15, 18, 20} + structIds := the.getStructIds() + for _, structId := range structIds { + for _, factorId := range hourFactorIds { + esQuery := the.getESQueryStrByHour(structId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo + needPushes := adaptor.Transform(structId, factorId, esAggResultStr) + for i := range needPushes { + needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + } + + if len(needPushes) > 0 { + the.ch <- needPushes + } + } + } + +} + +func (the *consumerAXYES2GDJKJC) getEs10minAggData() { + //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 + start, end := utils.GetTimeRangeByHour(-1) + log.Printf("查询10min数据时间范围 %s - %s", start, end) + factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测 + structIds := the.getStructIds() + for _, structId := range structIds { + for _, factorId := range factorIds { + esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo + needPushes := adaptor.Transform(structId, factorId, esAggResultStr) + for i := range needPushes { + needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) + } + + if len(needPushes) > 0 { + the.ch <- needPushes + } + } + } + +} + +func (the *consumerAXYES2GDJKJC) crc16rc4(transBytes []byte) []byte { + resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) + needRC4 := append(transBytes, resultByCrc16...) + rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] + if !ok { + log.Panicf("未配置 rc4key") + } + rc4Key := []byte(rc4KeyStr) //the.RC4Key + // 加密操作 + dest1 := make([]byte, len(needRC4)) + rc4.NewCipher(rc4Key) + cipher1, _ := rc4.NewCipher(rc4Key) + cipher1.XORKeyStream(dest1, needRC4) + return dest1 +} + +func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { + aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lt": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupSensor": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "1h", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { + aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lte": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupSensor": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "10m", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func (the *consumerAXYES2GDJKJC) getStructureId() string { + structureId, ok := the.Info.OtherInfo["structureId"] + if !ok { + log.Panicf("无法识别有效的structureId") + } + return structureId +}