"""Interactive circle-target tracker.

The user drags rectangles on the video window; every frame, each rectangle is
searched for a circle, the circle centre is stored on the target, and the
resulting displacement data is broadcast as JSON over TCP.
"""
import json
from time import sleep
from typing import Dict
from dataclasses import asdict
import cv2
import numpy as np
import signal
import sys
import threading
import models.target
import tcp_Ser

# ---- Global state -----------------------------------------------------------
drawing = False  # True while the left mouse button is held down
# Placeholders (the Point class itself) until the first mouse press replaces
# them with real Point instances.
start_point = models.target.Point
end_point = models.target.Point
# All targets defined so far, keyed by target id.
target_rectangle_dict: Dict[int, models.target.CircleTarget] = {}
sigExit = False  # Set by the SIGINT handler to ask the video loop to stop
# Whether detection results are broadcast over TCP.
sig_broadcast = True
tcp = tcp_Ser.TcpSer("127.0.0.1", 2230)
# NOTE(review): this is a non-daemon thread; if tcp.run never returns the
# process may stay alive after the main thread exits — confirm against tcp_Ser.
myb = threading.Thread(target=tcp.run)
myb.start()


def check_exit(sig, frame):
    """SIGINT handler: flag the video loop to stop, wait a second, then exit."""
    global sigExit
    print(f"收到退出信号 sig={sig}")
    sigExit = True
    sleep(1)
    print("程序退出")
    sys.exit(0)


def add_rectangle(event, x, y, flags, param):
    """Mouse callback: drag with the left button to define a new target area.

    A release closer than 20 px to the press position is rejected as too small.
    """
    global start_point, end_point, drawing
    if event == cv2.EVENT_LBUTTONDOWN:
        print("左键按下")
        start_point = models.target.Point(x, y)
        end_point = start_point
        drawing = True
    elif event == cv2.EVENT_MOUSEMOVE:
        if drawing:
            end_point = models.target.Point(x, y)
    elif event == cv2.EVENT_LBUTTONUP:
        print("左键抬起")
        # A release without a matching press (e.g. the press happened outside
        # the window) would otherwise pair with a stale or placeholder
        # start_point, so it is ignored.
        if not drawing:
            return
        drawing = False
        end_point = models.target.Point(x, y)
        if start_point == end_point:
            return
        distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2)
        if distance < 20:
            print("距离小于20,无效区域")
            return
        target_id = len(target_rectangle_dict)
        # Physical radius of the circular target, in mm.
        radius = 20.0
        area = models.target.RectangleArea(
            start_point.x,
            start_point.y,
            end_point.x - start_point.x,
            end_point.y - start_point.y,
        )
        new_target = models.target.CircleTarget(target_id, area, radius)
        print(f"新增区域[{target_id}] => {start_point, end_point}")
        target_rectangle_dict[target_id] = new_target


def _clamped_corners(first, second):
    """Return (x1, y1, x2, y2) with x1 <= x2, y1 <= y2 and no value below 0."""
    ax, ay = first
    bx, by = second
    x1, x2 = sorted((ax, bx))
    y1, y2 = sorted((ay, by))
    return max(x1, 0), max(y1, 0), max(x2, 0), max(y2, 0)


def draw_rectangle(img):
    """Detect a circle in every stored target area, draw it, and broadcast.

    :param img: BGR frame; rectangles and circles are drawn onto it in place.
    """
    gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, gray_frame = cv2.threshold(gray_frame, 120, 255, cv2.THRESH_BINARY)
    # Gaussian blur applied to the binarised image.
    gray_frame = cv2.GaussianBlur(gray_frame, (5, 5), 1)
    cv2.imshow('binaryImg', gray_frame)
    if not target_rectangle_dict:
        return

    # Targets that produced a detection in this frame.
    once_upload: Dict[int, models.target.CircleTarget] = {}
    for tr in target_rectangle_dict.values():
        _start_point = tr.rectangle_area[0]
        _end_point = tr.rectangle_area[1]
        # Outline the target area.
        cv2.rectangle(img, tuple(_start_point), tuple(_end_point), (255, 0, 0), 2)

        sub_image = extract_sub_image(gray_frame, _start_point, _end_point)
        circles = circle2_detect(sub_image)
        if not circles:
            continue

        # Circle coordinates are relative to the crop, whose top-left is the
        # normalised corner (not necessarily where the drag started).
        left, top, _, _ = _clamped_corners(_start_point, _end_point)
        origin = models.target.Point(left, top)
        center, radius = circle_show(img, circles, origin)

        # Record the detected centre; the first detection becomes the baseline.
        tr.center_point = center
        tr.radius_pix = radius
        if tr.is_init:
            tr.center_init = tr.center_point
            tr.is_init = False
        tar = tr.circle_displacement()
        msg = f"[{tar.id}]displacement_pix={tar.displacement_pix},displacement_phy={tar.displacement_phy}"
        print(msg)
        once_upload[tr.id] = tr

    # Nothing detected in this frame: nothing to send.
    if not once_upload:
        return
    json_str = json.dumps(
        {k: asdict(v) for k, v in once_upload.items() if v.is_init == False}
    )
    # The original passed json_str twice, printing the payload two times.
    print(f"标靶数据={json_str}")
    if sig_broadcast:
        tcp.broadcast_message(json_str)


def circle_show(img, circles, relative_point: models.target.Point):
    """Draw the largest detected circle and return its centre and radius.

    :param img: frame to draw on (modified in place)
    :param circles: non-empty list of (x, y, r) tuples relative to the crop
    :param relative_point: top-left of the crop in full-frame coordinates
    :return: (centre as models.target.Point in frame coordinates, radius)
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    color = (255, 0, 0)  # blue in BGR
    scale = 0.5
    circle = max(circles, key=lambda c: c[2])

    # Centre dot.
    center = (circle[0] + relative_point.x, circle[1] + relative_point.y)
    center_int = tuple(int(v) for v in center)
    cv2.circle(img, center_int, 2, (0, 255, 0), 4)

    # Outer circle.
    radius = np.round(circle[2], 3)
    radius_int = int(radius)
    cv2.circle(img, center_int, radius_int, (0, 0, 255), 2)

    # Label with the crop-relative circle tuple, placed right of the circle.
    text1 = f"center:{circle}"
    txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2)
    cv2.putText(img, text1, txt_location, font, scale, color, 2)

    cp = models.target.Point(x=center[0], y=center[1])
    return cp, radius


def circle2_detect(img):
    """Run a Hough circle search on ``img``.

    :param img: single-channel image (may be empty)
    :return: list of (x, y, r) tuples rounded to 2 decimals; [] if none found
    """
    # An empty crop cannot contain a circle; skip the OpenCV call entirely.
    if img is None or img.size == 0:
        return []
    # Positional args: dp=1.5, minDist=30 (minimum distance between centres).
    circles_float = cv2.HoughCircles(
        img,
        cv2.HOUGH_GRADIENT_ALT,
        1.5,
        30,
        param1=300,
        param2=0.9,
        minRadius=15,
        maxRadius=0,
    )
    if circles_float is None:
        return []
    num_circles = circles_float.shape[1]
    print("圆的数量", num_circles)
    return [
        (round(float(x), 2), round(float(y), 2), round(float(r), 2))
        for x, y, r in circles_float[0, :]
    ]


def extract_sub_image(frame, top_left, bottom_right):
    """Crop a rectangular region out of ``frame``.

    The two corners may be given in any order; they are sorted so a rectangle
    dragged right-to-left or bottom-to-top still yields the intended region,
    and negative coordinates are clamped to 0 so they cannot wrap around via
    negative indexing.

    :param frame: input image
    :param top_left: one corner (x1, y1)
    :param bottom_right: the opposite corner (x2, y2)
    :return: the cropped view of ``frame``
    """
    x1, y1, x2, y2 = _clamped_corners(top_left, bottom_right)
    return frame[y1:y2, x1:x2]


def open_video(video_id):
    """Open a capture source; terminate the program if it cannot be opened.

    :param video_id: camera index or stream URL accepted by cv2.VideoCapture
    :return: the opened cv2.VideoCapture
    """
    cap = cv2.VideoCapture(video_id)
    if not cap.isOpened():
        print("无法打开摄像头")
        # sys.exit instead of the site-provided exit(); non-zero marks failure.
        sys.exit(1)
    return cap


def show_video(cap):
    """Read and process frames until 'q' is pressed or SIGINT is received."""
    cv2.namedWindow('Frame')
    cv2.setMouseCallback('Frame', add_rectangle)
    while True:
        ret, frame = cap.read()
        if ret:
            frame_handle(frame)
        else:
            print("无法读取帧")
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        if sigExit:
            break


def show_image(frame):
    """Repeatedly process a copy of a still image until 'q' is pressed."""
    cv2.namedWindow('Frame')
    cv2.setMouseCallback('Frame', add_rectangle)
    while True:
        # Work on a copy so drawings do not accumulate on the source image.
        cp_img = frame.copy()
        frame_handle(cp_img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cv2.destroyAllWindows()


def frame_handle(frame):
    """Process one frame: detect stored targets, draw the live drag, display."""
    draw_rectangle(frame)
    # Rectangle currently being dragged.
    if drawing:
        cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4)
    cv2.imshow('Frame', frame)


def _run_capture(source):
    """Open ``source``, run the video loop, and always release the capture."""
    capture = open_video(source)
    try:
        fps = capture.get(cv2.CAP_PROP_FPS)
        print(f"fps={fps}")
        show_video(capture)
    finally:
        # Runs even when the loop is left via SystemExit from the signal handler.
        capture.release()
        cv2.destroyAllWindows()


def video_mode(video_id):
    """Run detection on a local camera / video source."""
    _run_capture(video_id)


def image_mode():
    """Run detection on a fixed still image."""
    image_path = 'images/trans/_4point.jpg'
    img_raw = cv2.imread(image_path)
    # cv2.imread returns None instead of raising when the file is unreadable.
    if img_raw is None:
        print(f"cannot read image: {image_path}")
        return
    show_image(img_raw)


def rtsp_mode():
    """Run detection on the configured RTSP stream."""
    rtsp_url = "rtsp://localhost:8554/rtsp"
    _run_capture(rtsp_url)


if __name__ == '__main__':
    signal.signal(signal.SIGINT, check_exit)
    rtsp_mode()
    # video_mode(0)
    # image_mode()
    cv2.waitKey(0)
    cv2.destroyAllWindows()