import gc from datetime import datetime import json import queue import time from time import sleep import cv2 import numpy as np import sys import logging import configSer import models.target import models.sampleMsg import upload.DataReporter import utils from models.msg import Msg logging.basicConfig(level=logging.DEBUG) drawing: bool = False # 是否正在绘制 sigExit: bool = False # 是否退出 # 定义点 start_point: models.target.Point end_point: models.target.Point # 配置 configObj:configSer.ConfigOperate # 鼠标回调函数 def add_rectangle(event, x, y, flags, param): global start_point, end_point, drawing if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 logging.info("左键按下") start_point = models.target.Point(x, y) end_point = start_point drawing = True elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 if drawing: end_point = models.target.Point(x, y) elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 logging.info("左键抬起") drawing = False end_point = models.target.Point(x, y) if start_point == end_point: return distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) if distance < 20: logging.info("距离小于20,无效区域") return target_id = len(configObj.config_info.targets) # 圆标靶半径 mm radius = 20.0 area=models.target.RectangleArea(int(start_point.x),int(start_point.y), int(end_point.x-start_point.x),int(end_point.y-start_point.y)) t_info=models.target.TargetInfo( target_id, "test add", area, radius, models.target.Threshold(190,9), False) new_target = models.target.CircleTarget(t_info,None,None) logging.info(f"新增区域[{target_id}] => {start_point, end_point}") configObj.config_info.targets[target_id] = new_target def read_target_rectangle(): return configObj.config_info.targets class VideoProcessor: reporter: upload.DataReporter.DataReporter capture: cv2.VideoCapture is_opened: bool= False def __init__(self, reporter:upload.DataReporter.DataReporter): self.reporter = reporter def on_data(self,msg:Msg): global configObj logging.info(f"msg={msg}") match msg.cmd: case "getPoints": targets=configObj.config_info.targets.copy() for k,v in targets.items(): targets[k].handler_info=None resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) resp_json = resp_msg.to_json_() return resp_json case "setPoints": v=msg.values ts=v["targets"] # # 清空原配置 # configObj.config_info.targets={} for _,t in ts.items(): t_str=json.dumps(t) new_c_target = models.target.CircleTarget.from_json(t_str) configObj.config_info.targets[new_c_target.info.id] =new_c_target configObj.save2json_file() resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "videoFps": v = msg.values fps = v["fps"] self.reporter.adjust_rate(fps,"image") configObj.config_info.fps.video = fps configObj.save2json_file() resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "dataFps": v = msg.values fps = v["fps"] self.reporter.adjust_rate(fps,"data") configObj.config_info.fps.data=fps configObj.save2json_file() resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "setCap": v = msg.values cap = v["cap"] self.switch_video(cap) resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json print("==") def pre_handler_img(self,gray_frame,now_str:str): # 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) self.enqueue_image(all_img) def draw_rectangle(self,img): global configObj gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) #图像发送 now_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] self.pre_handler_img(gray_frame,now_str) if len(configObj.config_info.targets)==0: return #上报有新数据的点 all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) # 绘图-历史点 for i, tr in configObj.config_info.targets.items(): if not hasattr(tr, "info"): print("====") _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) _end_point = models.target.Point( tr.info.rectangle_area.x+tr.info.rectangle_area.w, tr.info.rectangle_area.y+tr.info.rectangle_area.h) #绘制标靶区域 cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) #检测 sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) # 高斯滤波 sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (tr.info.threshold.gauss, tr.info.threshold.gauss), 10,borderType=cv2.BORDER_REPLICATE) cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) # 覆盖原图 # sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) # self.cover_sub_image(img,sub_c_img, _start_point, _end_point) circles = self.circle2_detect(sub_binary_frame) if len(circles) == 0: continue center,radius_pix=self.circle_show(img,circles,_start_point) # 纪录圆心位置 if tr.handler_info is None: tr.handler_info= models.target.HandlerInfo() if tr.handler_info.is_init: tr.handler_info.is_init=False tr.handler_info.center_init = center tr.handler_info.center_point=center tr.handler_info.radius_pix=radius_pix tr.circle_displacement() all_upload_data.data.append( models.sampleMsg.SensorData( str(tr.info.id), tr.handler_info.displacement_phy.x, tr.handler_info.displacement_phy.y) ) #过滤无效空数据 if len(all_upload_data.data)==0: return self.enqueue_data(all_upload_data) def circle_show(self,img, circles, relative_point:models.target.Point): font = cv2.FONT_HERSHEY_SIMPLEX color = (255, 0, 0) # 蓝色 scale = 0.5 circle = max(circles, key=lambda c: c[2]) # 绘制圆心 center = (circle[0] + relative_point.x, circle[1] + relative_point.y) center_int = tuple(int(x) for x in center) cv2.circle(img, center_int, 2, (0, 255, 0), 4) radius = np.round(circle[2], 3) radius_int = int(radius) # 绘制外圆 cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) # 打印圆心坐标 text1 = f"center:{circle}" text2 = f"r:{radius}" txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) cv2.putText(img, text1, txt_location, font, scale, color, 2) cp = models.target.Point(x=center[0], y=center[1]) return cp,radius def circle2_detect(self,img): # 圆心距 canny阈值 最小半径 最大半径 circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, maxRadius=0) # 创建一个0行, 2列的空数组 if circles_float is not None: # 提取圆心坐标(保留2位小数) centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] return centers else: return [] def extract_sub_image(self,frame, top_left, bottom_right): """ 从帧中截取子区域 :param frame: 输入的视频帧 :param top_left: 子图片的左上角坐标 (x1, y1) :param bottom_right: 子图片的右下角坐标 (x2, y2) :return: 截取的子图片 """ x1, y1 = top_left x2, y2 = bottom_right return frame[y1:y2, x1:x2] def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): x1, y1 = top_left x2, y2 = bottom_right frame[y1:y2, x1:x2]= sub_frame return frame def open_video(self,video_id): print(f"打开摄像头 -> {video_id}") self.capture = cv2.VideoCapture(video_id) frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"默认分辨率= {frame_width}*{frame_height}") logging.info(f"{video_id}地址->{self.capture}") fps = self.capture.get(cv2.CAP_PROP_FPS) print(f"fps={fps},video_id={video_id},") # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 if not self.capture.isOpened(): self.capture.release() logging.info(f"无法打开摄像头{video_id},release 地址 -> {self.capture}") return self.is_opened=True def switch_video(self,video_id:str): print(f"切换摄像头 -> {video_id}") self.is_opened = False self.capture.release() cv2.destroyAllWindows() if str.isdigit(video_id): video_id=int(video_id) self.open_video(video_id) def show_video(self): global sigExit,start_point, end_point, drawing cv2.namedWindow('Frame') cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 while True: if not self.is_opened: print(f"摄像头 标记is_opened={self.is_opened}") sleep(5) continue ret, frame = self.capture.read() if ret: self.frame_handle(frame) else: logging.info(f"无法读取帧,cap地址- >{self.capture}") sleep(1) # self.capture.release() # self.capture= cv2.VideoCapture(0) # 再次尝试 if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 break if sigExit: break # 显示图像 if frame is not None: cv2.imshow('Frame', frame) def show_image(self,frame): global start_point, end_point, drawing cv2.namedWindow('Frame') cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 while True: cp_img=frame.copy() self.frame_handle(cp_img) if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 break cv2.destroyAllWindows() def frame_handle(self,frame): # 绘图-历史点 self.draw_rectangle(frame) # 绘图-实时 if drawing: cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) # print(f"鼠标位置 {start_point} -> {end_point}") # 读取图像 #img_copy = img.copy() # 复制图像用于还原 def image_mode(self): img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg self.show_image(img_raw) # 支持 def video_mode(self,video_id:str): if str.isdigit(video_id): video_id=int(video_id) self.open_video(video_id) # if self.is_opened: self.show_video() # 释放摄像头资源并关闭所有窗口 print("退出 video") self.capture.release() cv2.destroyAllWindows() def rtsp_mode(self,rtsp_url:str): # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" self.open_video(rtsp_url) fps = self.capture.get(cv2.CAP_PROP_FPS) print(f"rtsp fps={fps}") self.show_video() # 释放摄像头资源并关闭所有窗口 self.capture.release() cv2.destroyAllWindows() def enqueue_data(self,data): # 获取当前时间戳 timestamp = time.time() # 将时间戳转换为 datetime 对象 dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 # 放入图片队列(自动丢弃最旧数据当队列满时) try: self.reporter.data_queue.put((dt, data), block=False) except queue.Full: # self.reporter.data_dropped += 1 pass def enqueue_image(self,data): # 获取当前时间戳 timestamp = time.time() # 将时间戳转换为 datetime 对象 dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 # 放入图片队列(自动丢弃最旧数据当队列满时) try: self.reporter.image_queue.put((dt, data), block=False) except queue.Full: pass #self.reporter.image_dropped += 1 #数据广播 def check_exit(sig, frame): global sigExit logging.info(f"收到退出信号 sig={sig}") sigExit=True sleep(1) logging.info("程序退出") sys.exit(0)