package consumers import ( "crypto/hmac" "crypto/rc4" "encoding/base64" "encoding/hex" "encoding/json" "fmt" "github.com/tjfoc/gmsm/sm3" "goInOut/adaptors" "goInOut/consumers/HBJCAS" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "gopkg.in/yaml.v3" "io" "io/ioutil" "log" "net/http" "strings" "time" ) type consumerZWYHBJCAS struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 Info HBJCAS.ConfigFile Seq int64 SeqDate string InHttp *dbOperate.HttpHelper outMqtt *dbOperate.MqttHelper monitor *monitors.CommonMonitor infoRedis *dbOperate.RedisHelper } func (the *consumerZWYHBJCAS) LoadConfigJson(cfgStr string) { // 将 yaml 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerZWYHBJCAS) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() if err != nil { return err } err = the.infoComponentInitial() return err } func (the *consumerZWYHBJCAS) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } the.Seq = 0 the.SeqDate = time.Now().Format("2006-01-02") the.monitor.Start() for taskName, cron := range the.Info.Monitor { switch taskName { case "cron10min": the.monitor.RegisterTask(cron, the.getEs10minAggData) case "cron1hour": the.monitor.RegisterTask(cron, the.getEs1HourAggData) case "camera1hour": the.monitor.RegisterTask(cron, the.UploadCamInfo) case "health24hour": the.monitor.RegisterTask(cron, the.UploadHeaInfo) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } } return nil } func (the *consumerZWYHBJCAS) OutputInitial() error { //数据出口 the.outMqtt = dbOperate.MqttInitial( the.Info.IoConfig.Out.Mqtt.Host, the.Info.IoConfig.Out.Mqtt.Port, the.Info.IoConfig.Out.Mqtt.ClientId, the.Info.IoConfig.Out.Mqtt.UserName, the.Info.IoConfig.Out.Mqtt.Password, true, //按照具体项目来 "consumers/HBJCAS/ssl/cacert.pem", "consumers/HBJCAS/ssl/client-cert.pem", "consumers/HBJCAS/ssl/client-key.pem") return nil } func (the *consumerZWYHBJCAS) infoComponentInitial() error { //数据出口 addr := the.Info.QueryComponent.Redis.Address the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } func (the *consumerZWYHBJCAS) Work() { go func() { for { needPushList := <-the.ch if len(the.ch) > 0 { log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) } for _, push := range needPushList { if push.Topic != "" { the.outMqtt.Publish(push.Topic, push.Payload) continue } //没有标记topic 的 按照配置文件里面的推送 for _, topic := range the.Info.IoConfig.Out.Mqtt.Topics { the.outMqtt.Publish(topic, push.Payload) } } time.Sleep(100 * time.Millisecond) } }() go func() { if the.Info.HttpServer != "" { log.Printf("打开本地http接口服务[%s]\n", the.Info.HttpServer) the.StartHttp() } }() } func (the *consumerZWYHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_ZWYES_HBGL) { return adaptors.Adaptor_ZWYES_HBGL{ Redis: the.infoRedis, } } func (the *consumerZWYHBJCAS) getStructIds() []int64 { var structIds []int64 for strutId, _ := range the.Info.StructInfo { structIds = append(structIds, strutId) } return structIds } func (the *consumerZWYHBJCAS) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) hourFactorIds := []int{15, 18, 20} structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range hourFactorIds { esQuery := the.getESQueryStrByHour(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) } if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerZWYHBJCAS) getEs10minAggData() { //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询10min数据时间范围 %s - %s", start, end) factorIds := []int{28, 592} structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range factorIds { esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) log.Printf("topic[%s],Payload=> %s", needPushes[i].Topic, hex.EncodeToString(needPushes[i].Payload)) } if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerZWYHBJCAS) crc16rc4(transBytes []byte) []byte { resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) needRC4 := append(transBytes, resultByCrc16...) rc4KeyStr, ok := the.Info.OtherInfo["rc4key"] if !ok { log.Panicf("未配置 rc4key") } rc4Key := []byte(rc4KeyStr) //the.RC4Key // 加密操作 dest1 := make([]byte, len(needRC4)) rc4.NewCipher(rc4Key) cipher1, _ := rc4.NewCipher(rc4Key) cipher1.XORKeyStream(dest1, needRC4) return dest1 } func (the *consumerZWYHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lt": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "1h", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery } func (the *consumerZWYHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "10m", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery } func (the *consumerZWYHBJCAS) getStructureId() string { structureId, ok := the.Info.OtherInfo["structureId"] if !ok { log.Panicf("无法识别有效的structureId") } return structureId } //获取配置在yaml文件中的cameraInfo对应的摄像机的状态 func (the *consumerZWYHBJCAS) getCameraStatus() []interface{} { cameraArr := the.Info.CameraInfo cameras := make([]interface{}, 0) for _, cameraId := range cameraArr { //增加根据cameraId获取对应摄像机状态 camera := HBJCAS.CameraInfo{ PointUniqueCode: cameraId, Online: 1, } cameras = append(cameras, camera) } return cameras } //获取配置在yaml文件中的codeInfo对应的需要上报健康度的(桥梁|隧道|边坡)的健康度 func (the *consumerZWYHBJCAS) getCodeStatus() []interface{} { infoArr := the.Info.CodeInfo res := make([]interface{}, 0) for _, info := range infoArr { //增加根据code码获取对应摄像机状态 nInfo := HBJCAS.HealthInfo{ UniqueCode: info, EntireHealthLevel: 0, ComponentHealthLevel: 0, EvaluateTime: time.Now().UnixNano() / 1e6, } res = append(res, nInfo) } return res } // JWT头部 type Header struct { Typ string `json:"typ"` Alg string `json:"alg"` } // JWT载荷 type Payload struct { SystemID string `json:"systemId"` Seq int64 `json:"seq"` Timestamp int64 `json:"timestamp"` } type Resp struct { Code int `json:"code"` Msg string `json:"msg"` } func (the *consumerZWYHBJCAS) GenerateJWT() (string, error) { // 创建头部 header := Header{ Typ: "JWT", Alg: "SM3", } headerBytes, _ := json.Marshal(header) headerBase64 := base64.StdEncoding.EncodeToString(headerBytes) seq := int64(0) if the.Seq != 0 { if time.Now().Format("2006-01-02") != the.SeqDate { the.Seq = 0 } } seq = the.Seq // 创建载荷 payload := Payload{ SystemID: the.Info.SystemId, Seq: seq, Timestamp: time.Now().UnixNano() / 1e6, // 当前时间戳,单位毫秒 } payloadBytes, _ := json.Marshal(payload) payloadBase64 := base64.StdEncoding.EncodeToString(payloadBytes) // 创建签名 message := headerBase64 + "." + payloadBase64 var signatureBase64 string h := sm3.New() h.Write([]byte(message)) secretKeyStr, ok := the.Info.OtherInfo["secretKey"] if !ok { log.Println("未配置 secretKey") secretKeyStr = "" } secretKey := []byte(secretKeyStr) signature := hmac.New(sm3.New, secretKey) signature.Write([]byte(message)) signatureBytes := signature.Sum(nil) signatureBase64 = base64.StdEncoding.EncodeToString(signatureBytes) // 组装JWT jwt := message + "." + signatureBase64 return jwt, nil } func (the *consumerZWYHBJCAS) UploadInfo(uploadType string) { urlIndex, ok := the.Info.OtherInfo["urlIndex"] if !ok { log.Println("未配置省平台业务数据接口=============") return } url := "" var bodyInfo []interface{} switch uploadType { case "cameraInfo": url = urlIndex + "cameraInfo/statusReport" bodyInfo = the.getCameraStatus() if len(bodyInfo) == 0 { return } case "healthInfo": url = urlIndex + "healthInfo/sync" bodyInfo = the.getCodeStatus() if len(bodyInfo) == 0 { return } default: return } tBody := HBJCAS.UploadBody{ Data: bodyInfo, } jsonData, masErr := json.Marshal(tBody) if masErr != nil { fmt.Println(masErr) return } payLoadStr := string(jsonData) err := the.postInfo(url, payLoadStr) if err != nil { log.Printf("数据上报失败,err=%v\n", err) } } func (the *consumerZWYHBJCAS) UploadCamInfo() { the.UploadInfo("cameraInfo") } func (the *consumerZWYHBJCAS) UploadHeaInfo() { the.UploadInfo("healthInfo") } func (the *consumerZWYHBJCAS) postInfo(url, payloadStr string) error { payload := strings.NewReader(payloadStr) client := &http.Client{} req, requestErr := http.NewRequest("POST", url, payload) if requestErr != nil { return requestErr } jwtRes, jwtErr := the.GenerateJWT() if jwtErr != nil { return jwtErr } auth := fmt.Sprintf("Bearer %s", jwtRes) req.Header.Add("Content-Type", "application/json") req.Header.Add("Authorization", auth) res, clientErr := client.Do(req) if clientErr != nil { return clientErr } defer res.Body.Close() body, respErr := ioutil.ReadAll(res.Body) if respErr != nil { return respErr } var resp Resp err := yaml.Unmarshal(body, &resp) if err != nil { log.Printf("接口返回[%s]转换失败 err=%v", string(body), err) return err } the.Seq++ if resp.Code != 100 { log.Printf("接口[%s]返回非成功状态 code=%d,msg=[%s]", url, resp.Code, resp.Msg) } return nil } func (the *consumerZWYHBJCAS) StartHttp() { http.HandleFunc("/ping", RespHandle) http.ListenAndServe(the.Info.HttpServer, nil) } func RespHandle(w http.ResponseWriter, r *http.Request) { _, err := ioutil.ReadAll(r.Body) if err != nil && err != io.EOF { fmt.Printf("read body content failed, err:[%s]\n", err.Error()) return } fmt.Fprint(w, "Pong!") } func downloadFile(url string) ([]byte, error) { // 发送GET请求下载文件 response, err := http.Get(url) if err != nil { return nil, fmt.Errorf("failed to download file[%s],err=%v", url, err) } defer response.Body.Close() // 检查响应状态 if response.StatusCode != http.StatusOK { return nil, fmt.Errorf("down file[%s] server returned: %v", url, response.Status) } // 读取文件内容 fileContent, err := io.ReadAll(response.Body) if err != nil { return nil, fmt.Errorf("failed to read file[%s] content: %v", url, err) } return fileContent, nil }