diff --git a/LICENSE b/LICENSE deleted file mode 100644 index a4e9dc9..0000000 --- a/LICENSE +++ /dev/null @@ -1,16 +0,0 @@ -MIT No Attribution - -Copyright - -Permission is hereby granted, free of charge, to any person obtaining a copy of this -software and associated documentation files (the "Software"), to deal in the Software -without restriction, including without limitation the rights to use, copy, modify, -merge, publish, distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, -INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE -SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md deleted file mode 100644 index 2249d62..0000000 --- a/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# wuyuanbiaoba - diff --git a/app.py b/app.py deleted file mode 100644 index 42224e5..0000000 --- a/app.py +++ /dev/null @@ -1,33 +0,0 @@ -from time import sleep - -import configSer -import tcp_Ser -import upload.DataReporter -import 标靶识别video -import cv2 - -if __name__ == '__main__': - config_path = "./config.json" - # 读取配置文件 - config_obj= configSer.ConfigOperate(config_path) - json_str = config_obj.config_info.to_json(indent=4) - print(f"当前配置:{json_str}") - - tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) - tcp_service.start() - reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) - reporter.register_handler(tcp_service.broadcast_message) - reporter.start() - # 启动video - 标靶识别video.configObj=config_obj - processor = 标靶识别video.VideoProcessor(reporter) - # 添加订阅者processor - tcp_service.add_subscribe(processor) - # 启动 - processor.video_mode(config_obj.config_info.capture) - - while True: - sleep(10) - - cv2.waitKey(0) - cv2.destroyAllWindows() \ No newline at end of file diff --git a/configSer.py b/configSer.py deleted file mode 100644 index 1e3bb19..0000000 --- a/configSer.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -import os -from dataclasses import ( - dataclass, - field -) -from typing import Dict - -import numpy as np -from dataclasses_json import dataclass_json - -import models.target - -_file_path: str - - -@dataclass_json -@dataclass -class Server: - port: int = 0 - -@dataclass_json -@dataclass -class Fps: - data: int = 0 - video: int = 0 - -@dataclass_json -@dataclass -class ConfigInfo: - server:Server - fps:Fps - capture: str = "0" - # 标靶配置 - targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) - -class ConfigOperate: - _file_path: str - config_info: ConfigInfo - def __init__(self,path:str): - self._file_path = path - self.load2obj_sample() - - - def load2dict(self): - """"读取配置""" - if not os.path.exists(self._file_path): - raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") - - with open(self._file_path) as json_file: - config = json.load(json_file) - return config - - # def load2obj_sample2(self): - # """"读取配置""" - # dic=self.load2dict() - # ts = dic["targets"] - # capture = dic["capture"] - # # 获取矩阵数据 - # matrix_dict = dic.get("perspective", {}) - # # n0=convert_to_ndarray(self.matrix_dict["0"]) - # # 将矩阵转换为字符串 - # # matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True) - # for _,t in ts.items(): - # obj = models.target.TargetInfo(**t) - # area = models.target.RectangleArea.from_dict(obj.rectangle_area) - # thres = models.target.Threshold(**obj.threshold) - # self.targets[obj.id] = models.target.CircleTarget( - # obj.id, - # obj.desc, - # area, - # obj.radius, - # thres, - # obj.base - # ) - # return self.targets - - def load2obj_sample(self): - dic=self.load2dict() - dict_str = json.dumps(dic) - self.config_info=ConfigInfo.from_json(dict_str) - - def save2json_file(self): - json_str = self.config_info.to_json(indent=4) - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json_file.write(json_str) - # json.dump(self, json_file, indent=4) - return None - - - def save_dict_config(self, dict_data:Dict): - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json.dump(dict_data, json_file, indent=4) - return None - - def update_dict_config(self, updates): - """ - 更新配置文件中的特定字段。 - :param file_path: 配置文件路径 - :param updates: 包含更新内容的字典 - """ - config_dict = self.load2dict() - config_dict.update(updates) - self.save_dict_config(config_dict) - -def convert_to_ndarray(matrix_data): - """ - 将 JSON 中的矩阵数据转换为 numpy ndarray。 - :param matrix_data: JSON 中的矩阵数据(列表形式) - :return: numpy ndarray - """ - return np.array(matrix_data, dtype=np.float64) \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index fcb1163..0000000 --- a/main.py +++ /dev/null @@ -1,9 +0,0 @@ -def print_hi(name): - # 在下面的代码行中使用断点来调试脚本。 - print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 - - -# 按装订区域中的绿色按钮以运行脚本。 -if __name__ == '__main__': - print_hi('PyCharm') - diff --git a/tcp_Ser.py b/tcp_Ser.py deleted file mode 100644 index c9f678a..0000000 --- a/tcp_Ser.py +++ /dev/null @@ -1,122 +0,0 @@ -import logging -import socket -import threading -from unittest import case - -from models.msg import Msg - - -class TcpSer(threading.Thread): - # 定义服务器地址和端口 - HOST = '127.0.0.1' - PORT = 2230 - def __init__(self,host,port): - super().__init__() - self.HOST=host - self.PORT=port - self.connected_clients=[] - # 消费者 - self.consumers=[] - self.lock = threading.Lock() - # 处理客户端连接的函数 - def handle_client(self,client_socket): - try: - # 当客户端连接时,将其添加到列表中 - self.connected_clients.append(client_socket) - print(f"新连接: {client_socket.getpeername()}") - - # 保持连接,直到客户端断开 - while True: - # 接收客户端数据(如果需要) - data = client_socket.recv(4096) - msg_str=data.decode('utf-8') - if not data: - break # 如果没有数据,退出循环 - print(f"从 {client_socket.getpeername()} 收到: {msg_str}") - # 反序列化为 实例 - s_cmd = Msg.from_json(msg_str) - valid_msg:bool=True - match s_cmd.cmd: - case "getPoints" | "setPoints": - self.on_data(s_cmd,valid_msg) - case "videoFps"| "dataFps": - self.on_data(s_cmd,valid_msg) - case "setCap": - self.on_data(s_cmd,valid_msg) - # todo 添加处理 - case _: - valid_msg = False - err_msg=f"valid cmd={s_cmd.cmd}" - resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" - client_socket.sendall(resp.encode()) - print("非法命令",resp) - - print("通讯完成") - except Exception as e: - print(f"处理客户端时出错: {e}") - finally: - # 从列表中移除客户端并关闭连接 - if client_socket in self.connected_clients: - self.connected_clients.remove(client_socket) - print(f"连接关闭: {client_socket.getpeername()}") - client_socket.close() - - # 注册的消费者必须携带on_data 方法 - def add_subscribe(self,consumer): - if hasattr(consumer, 'on_data'): - print(f"加入 consumer {consumer} ") - self.consumers.append(consumer) - else: - print("consumer 缺少on_data函数,订阅无效 ") - def on_data(self,msg,valid): - for consumer in self.consumers: - try: - resp=consumer.on_data(msg) - self.broadcast_message(resp) - except Exception as e: - logging.warn("通讯异常",e) - - # 广播消息给所有连接的客户端 - def broadcast_message(self,message:str): - with self.lock: - if len(message)==0: - return - - message+="\n\n" - for client in self.connected_clients: - try: - client.sendall(message.encode()) - except Exception as e: - print(f"向客户端发送消息时出错: {e}") - # 如果发送失败,从列表中移除客户端 - if client in self.connected_clients: - self.connected_clients.remove(client) - client.close() - def run(self): - # 创建服务器套接字 - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - server_socket.bind((self.HOST,self.PORT)) - server_socket.listen() - print(f"服务器监听在 {self.HOST}:{self.PORT}...") - - try: - # 保持服务器运行并接受新的连接 - while True: - client_socket, addr = server_socket.accept() - print(f"连接来自 {addr}") - - # 为每个客户端启动一个线程 - client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) - client_thread.daemon = True # 守护线程,服务器关闭时自动结束 - client_thread.start() - - except KeyboardInterrupt: - print("服务器关闭...") - finally: - # 关闭所有客户端连接 - for client in self.connected_clients: - client.close() - server_socket.close() -if __name__ == '__main__': - tcp=TcpSer("127.0.0.1",2230) - tcp.run() \ No newline at end of file diff --git a/utils.py b/utils.py deleted file mode 100644 index e1f5537..0000000 --- a/utils.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Dict, List - -import cv2 -import base64 - -import numpy as np - - -def frame_to_base64(frame, format="JPEG"): - """将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" - # 将图片帧编码为 JPEG 或 PNG 格式 - if format.upper() == "JPEG": - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 - elif format.upper() == "PNG": - encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 - else: - raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") - - result, encoded_frame = cv2.imencode(f".{format.lower()}", frame, encode_param) - if not result: - raise ValueError("Failed to encode frame.") - - # 将编码后的字节流转换为 Base64 字符串 - base64_string = base64.b64encode(encoded_frame).decode("utf-8") - return base64_string - - - -# 自定义编码器和解码器 -def encode_perspective(value: np.ndarray) -> List[List[float]]: - return value.tolist() - -def decode_perspective(value: List[List[float]]) -> np.ndarray: - return np.array(value) \ No newline at end of file diff --git a/标靶识别video.py b/videoDetection.py similarity index 67% rename from 标靶识别video.py rename to videoDetection.py index 7e08770..1af821a 100644 --- a/标靶识别video.py +++ b/videoDetection.py @@ -1,4 +1,3 @@ -import gc from datetime import datetime import json import queue @@ -6,7 +5,6 @@ import time from time import sleep import cv2 import numpy as np -import sys import logging import configSer @@ -14,11 +12,12 @@ import models.target import models.sampleMsg import upload.DataReporter import utils +import videoPush from models.msg import Msg logging.basicConfig(level=logging.DEBUG) drawing: bool = False # 是否正在绘制 -sigExit: bool = False # 是否退出 +is_video_mode: bool = False # 是否采用标靶图覆盖 # 定义点 start_point: models.target.Point end_point: models.target.Point @@ -68,11 +67,12 @@ class VideoProcessor: reporter: upload.DataReporter.DataReporter capture: cv2.VideoCapture is_opened: bool= False + is_running = True def __init__(self, reporter:upload.DataReporter.DataReporter): self.reporter = reporter def on_data(self,msg:Msg): - global configObj + global configObj,is_video_mode logging.info(f"msg={msg}") match msg.cmd: case "getPoints": @@ -87,8 +87,8 @@ class VideoProcessor: v=msg.values ts=v["targets"] - # # 清空原配置 - # configObj.config_info.targets={} + # 清空原配置 + configObj.config_info.targets={} for _,t in ts.items(): t_str=json.dumps(t) new_c_target = models.target.CircleTarget.from_json(t_str) @@ -104,7 +104,7 @@ class VideoProcessor: self.reporter.adjust_rate(fps,"image") configObj.config_info.fps.video = fps configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "dataFps": @@ -113,14 +113,21 @@ class VideoProcessor: self.reporter.adjust_rate(fps,"data") configObj.config_info.fps.data=fps configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "setCap": v = msg.values cap = v["cap"] self.switch_video(cap) - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoMode": + v = msg.values + is_debug = v["debug"] + is_video_mode=is_debug + resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json print("==") @@ -133,16 +140,20 @@ class VideoProcessor: self.enqueue_image(all_img) def draw_rectangle(self,img): - global configObj + global configObj,is_video_mode gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] #图像发送 - now_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - self.pre_handler_img(gray_frame,now_str) + if configObj.config_info.fps.video > 0: + self.pre_handler_img(gray_frame,now_str) if len(configObj.config_info.targets)==0: return + + # 基点 + base_point_pix:models.target.Point=None + #上报有新数据的点 all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) # 绘图-历史点 @@ -154,23 +165,48 @@ class VideoProcessor: tr.info.rectangle_area.x+tr.info.rectangle_area.w, tr.info.rectangle_area.y+tr.info.rectangle_area.h) #绘制标靶区域 - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) + blue_color = (255, 0, 0) + if tr.info.base: + blue_color=(200, 0, 200) #紫红色 基点 + + cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) + label=f"{tr.info.desc},r={tr.info.radius}" + cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) #检测 + # 获取图像尺寸 + frame_height, frame_width = gray_frame.shape[:2] + if _end_point.x>frame_width or _end_point.y>frame_height: + print(f"标靶[{tr.info.desc}]sub_image 超出区域") + continue sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) + ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) # 高斯滤波 - sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (tr.info.threshold.gauss, tr.info.threshold.gauss), 10,borderType=cv2.BORDER_REPLICATE) - cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) + gauss_size=tr.info.threshold.gauss + sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) + # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) + if tr.perspective is not None: + # 宽度 + sub_width = sub_binary_frame.shape[1] + # 高度 + sub_height = sub_binary_frame.shape[0] + + sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) + # 覆盖原图 - # sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) - # self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + if is_video_mode: + sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) + self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + + if __debug__: + cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) circles = self.circle2_detect(sub_binary_frame) if len(circles) == 0: continue - center,radius_pix=self.circle_show(img,circles,_start_point) + center,radius_pix=self.circle_match(circles,_start_point) # 纪录圆心位置 if tr.handler_info is None: tr.handler_info= models.target.HandlerInfo() @@ -178,53 +214,89 @@ class VideoProcessor: tr.handler_info.is_init=False tr.handler_info.center_init = center - tr.handler_info.center_point=center + # 数据平滑处理 + smooth_center = tr.handler_info.enqueue_center_point(center) + # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") + tr.handler_info.center_point=smooth_center + # 原始处理 tr.handler_info.center_point=center tr.handler_info.radius_pix=radius_pix - tr.circle_displacement() + tr.circle_displacement_pix() + + # 基点 + if tr.info.base: + base_point_pix=tr.handler_info.displacement_pix + + # 画圆 + self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point) + + # 基于像素点计算 物理距离 + for i, tr in configObj.config_info.targets.items(): + + if tr.handler_info is None: + continue + if tr.handler_info.displacement_pix is None: + continue + # 减去基点偏移 + if base_point_pix is not None: + raw_point=tr.handler_info.displacement_pix + tr.handler_info.displacement_pix=models.target.Point( + x=raw_point.x-base_point_pix.x, + y=raw_point.y-base_point_pix.y) + if not tr.info.base: + # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") + pass + tr.circle_displacement_phy() + if tr.handler_info.displacement_phy is not None: + all_upload_data.data.append( + models.sampleMsg.SensorData( + str(tr.info.id), + tr.handler_info.displacement_phy.x, + tr.handler_info.displacement_phy.y) + ) + tr.handler_info.displacement_pix=None + tr.handler_info.displacement_phy = None - all_upload_data.data.append( - models.sampleMsg.SensorData( - str(tr.info.id), - tr.handler_info.displacement_phy.x, - tr.handler_info.displacement_phy.y) - ) #过滤无效空数据 if len(all_upload_data.data)==0: return - self.enqueue_data(all_upload_data) + if configObj.config_info.fps.data > 0: + self.enqueue_data(all_upload_data) - def circle_show(self,img, circles, relative_point:models.target.Point): - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 + + def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): circle = max(circles, key=lambda c: c[2]) # 绘制圆心 - center = (circle[0] + relative_point.x, circle[1] + relative_point.y) + center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) + radius = float(np.round(circle[2], 3)) + cp = models.target.Point(x=center[0], y=center[1]) + return cp,radius + + def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point): + + font = cv2.FONT_HERSHEY_SIMPLEX + color = (255, 0, 0) # 蓝色 + scale = 0.5 center_int = tuple(int(x) for x in center) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - radius = np.round(circle[2], 3) radius_int = int(radius) + cv2.circle(img, center_int, 2, (0, 255, 0), 4) # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) + cv2.circle(img, center_int, radius_int, (0, 0, 255), 1) # 打印圆心坐标 - text1 = f"center:{circle}" - text2 = f"r:{radius}" - txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) - cv2.putText(img, text1, txt_location, font, scale, color, 2) - - cp = models.target.Point(x=center[0], y=center[1]) - return cp,radius + text1 = f"c:{(center.x,center.y,radius)}" + txt_location = (rect_s_point.x+2,rect_e_point.y-2) + # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) + cv2.putText(img, text1, txt_location, font, scale, color, 1) def circle2_detect(self,img): # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, + circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, maxRadius=0) # 创建一个0行, 2列的空数组 if circles_float is not None: @@ -255,20 +327,21 @@ class VideoProcessor: def open_video(self,video_id): + sleep(1) print(f"打开摄像头 -> {video_id}") self.capture = cv2.VideoCapture(video_id) frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"默认分辨率= {frame_width}*{frame_height}") logging.info(f"{video_id}地址->{self.capture}") + if not self.capture.isOpened(): + self.capture.release() + logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") + return fps = self.capture.get(cv2.CAP_PROP_FPS) print(f"fps={fps},video_id={video_id},") # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - if not self.capture.isOpened(): - self.capture.release() - logging.info(f"无法打开摄像头{video_id},release 地址 -> {self.capture}") - return self.is_opened=True def switch_video(self,video_id:str): @@ -284,10 +357,11 @@ class VideoProcessor: def show_video(self): global sigExit,start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 - while True: + while self.is_running: if not self.is_opened: print(f"摄像头 标记is_opened={self.is_opened}") sleep(5) @@ -300,17 +374,19 @@ class VideoProcessor: sleep(1) # self.capture.release() # self.capture= cv2.VideoCapture(0) # 再次尝试 - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - if sigExit: - break + cv2.waitKey(1) # 显示图像 if frame is not None: - cv2.imshow('Frame', frame) + if __debug__: + cv2.imshow('Frame', frame) + #缓存到推流 + videoPush.update_latest_frame(frame) + print("退出VideoProcessor") def show_image(self,frame): global start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 while True: cp_img=frame.copy() @@ -328,9 +404,6 @@ class VideoProcessor: cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) # print(f"鼠标位置 {start_point} -> {end_point}") - # 读取图像 - #img_copy = img.copy() # 复制图像用于还原 - def image_mode(self): img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg @@ -342,7 +415,6 @@ class VideoProcessor: if str.isdigit(video_id): video_id=int(video_id) self.open_video(video_id) - # if self.is_opened: self.show_video() # 释放摄像头资源并关闭所有窗口 print("退出 video") @@ -383,15 +455,9 @@ class VideoProcessor: pass #self.reporter.image_dropped += 1 - #数据广播 -def check_exit(sig, frame): - global sigExit - logging.info(f"收到退出信号 sig={sig}") - sigExit=True - sleep(1) - logging.info("程序退出") - sys.exit(0) - + def stop(self): + self.is_running=False + self.capture.release() diff --git a/videoPush.py b/videoPush.py new file mode 100644 index 0000000..48a74e5 --- /dev/null +++ b/videoPush.py @@ -0,0 +1,60 @@ +from time import sleep + +import cv2 +import threading +import time + +import numpy as np +import requests +from flask import Flask, Response, render_template_string, request + +import utils + +app_flask = Flask(__name__) + +# 全局变量,用于存储最新的帧 +latest_frame:np.ndarray = None +lock = threading.Lock() +is_running = True + +def update_latest_frame(n_bytes:np.ndarray): + global latest_frame + latest_frame=n_bytes + +def generate_mjpeg(): + """生成MJPEG格式的视频流""" + while is_running: + with lock: + if latest_frame is None: + continue + _,latest_frame_bytes = utils.frame_to_img(latest_frame) + frame = latest_frame_bytes.tobytes() + pass + sleep(0.1) + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') + + +@app_flask.route('/video_flow') +def video_feed(): + """视频流路由""" + return Response( + generate_mjpeg(), + mimetype='multipart/x-mixed-replace; boundary=frame' + ) + +def run(): + port=2240 + print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") + app_flask.run(host='0.0.0.0', port=port, threaded=True) + +def video_server_run(): + thread = threading.Thread(target=run) + thread.daemon = True # 设置为守护线程,主线程退出时自动终止 + thread.start() +def stop(): + global is_running + is_running=False + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/三维圆.py b/三维圆.py deleted file mode 100644 index 8f13e7f..0000000 --- a/三维圆.py +++ /dev/null @@ -1,164 +0,0 @@ -import math - -import cv2 -import numpy as np -import matplotlib.pyplot as plt -import matplotlib -matplotlib.use('TkAgg') - -elevation=0 -azimuth=0 -def circle3d(): - global elevation,azimuth - # 创建一个新的图和一个3D坐标轴 - fig = plt.figure() - ax = fig.add_subplot(111, projection='3d') - - # 定义圆的参数 - radius = 1 # 半径 - theta = np.linspace(0, 2 * np.pi, 100) # 参数角度 - x = radius * np.cos(theta) # x坐标 - y = radius * np.sin(theta) # y坐标 - z = np.zeros_like(theta) # z坐标(这里圆在xy平面上) - - # 绘制圆 - # ax.plot(x, y, z, label='3D Circle', color='b') - - # 绘制圆的正俯视图 - # 正视图(xz平面) - ax.plot(x, np.zeros_like(theta), z, label='z View', color='r') - ax.plot(np.zeros_like(theta), y, z, label='h View', color='b') - # 俯视图(xy平面) - ax.plot(x, y, np.zeros_like(theta), label='Top View', color='g') - - # 设置坐标轴范围 - ax.set_xlim([-2, 2]) - ax.set_ylim([-2, 2]) - ax.set_zlim([-2, 2]) - ax.set_xlabel('X') - ax.set_ylabel('Y') - ax.set_zlabel('Z') - # 添加图例 - ax.legend() - # 设置初始视角 - # ax.view_init(elev=53, azim=-48,roll=-38) # 俯仰角30度,方位角45度 - ax.view_init(elev=90, azim=0, roll=0) # 俯仰角30度,方位角45度 - # 隐藏整个坐标轴 - # ax.axis('off') - # 设置窗口透明度 - fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 - # 显示图形 - plt.show() - # input("请用鼠标转动 调整角度 elevation and azimuth: ") - # 获取当前的 elevation 和 azimuth - elevation = ax.elev - azimuth = ax.azim - - -def perspective_transform_by_angle(img, _elevation, _azimuth): - h, w = img.shape[:2] - - # 根据角度计算目标点位置 - fov = math.radians(45) # 假设视场角45度 - dz = 1 / math.tan(_elevation) - dx = dz * math.tan(_azimuth) - - # 源图像四个角点(原始斜视图) - src_points = np.float32([[0, 0], [w, 0], [w, h], [0, h]]) - - # 计算目标点(俯视图) - dst_points = np.float32([ - [w * 0.5 * (1 - dx), h * 0.5 * (1 - dz)], # 左上 - [w * 0.5 * (1 + dx), h * 0.5 * (1 - dz)], # 右上 - [w * 0.5 * (1 + dx), h * 0.5 * (1 + dz)], # 右下 - [w * 0.5 * (1 - dx), h * 0.5 * (1 + dz)] # 左下 - ]) - - # 获取变换矩阵并应用 - M = cv2.getPerspectiveTransform(src_points, dst_points) - transformed_image=cv2.warpPerspective(img, M, (h,w)) - # 显示原始图像和变换后的图像 - fig, ax = plt.subplots(1, 2,figsize= (12, 6)) - ax[0].imshow(img) - ax[0].set_title("Original Image") - ax[1].imshow(transformed_image) - ax[1].set_title("Transformed Image") - plt.show() - - -def trans_img(image_path ,x,y): - global elevation, azimuth - image = cv2.imread(image_path) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - # 获取图像尺寸 - height, width = image.shape[:2] - - center_x, center_y = 532,385 #圆心 - # 定义源图像的四个点(假设为矩形) - sx1=center_x-100 - sx2=center_x+100 - sy1=center_y-100 - sy2=center_y + 100 - src_points = np.float32([ - [sx1, sy1], # 左上 - [sx2, sy1], # 右上 - [sx2, sy2], # 右下 - [sx1, sy2] # 左下 - ]) - # src_points = np.float32([ - # [center_x, sy1], # 上 - # [sx2, center_y], # 右 - # [center_x, sy2], # 下 - # [sx1, center_y] # 左 - # ]) - # 根据 elevation 和 azimuth 计算目标点 - # 这里是一个简化的计算方法,可以根据实际需求调整 - # 假设目标点在透视变换后的坐标 - # 将斜视的画面变成俯视的画面 - radius = 100 - - # 根据俯仰角和方位角计算目标点的偏移 - offset_x = radius * np.sin(elevation) * np.cos(azimuth) - offset_y = radius * np.sin(elevation) * np.sin(azimuth) - offset_z = radius * np.cos(elevation) - print(f"offset_x={offset_x},offset_y={offset_y}") - # 计算目标点 - # dst_points = np.float32([ - # [0 - offset_x, 0 - offset_y], # 左上 - # [width + offset_x, 0 - offset_y], # 右上 - # [width + offset_x, height + offset_y], # 右下 - # [0 - offset_x, height + offset_y] # 左下 - # ]) - dst_points = np.float32([ - [sx1- offset_x, sy1- offset_y], # 上 - [sx2 + offset_x, sy1 - offset_y], # 右上 - [sx2 + offset_x, sy2+ offset_y], # 右下 - [sx1 - offset_x, sy2 + offset_y] # 下 - ]) - # 计算透视变换矩阵 - matrix = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用透视变换 - transformed_image = cv2.warpPerspective(image, matrix, (width, height)) - - # 显示原始图像和变换后的图像 - fig, ax = plt.subplots(1, 2,figsize= (12, 6)) - ax[0].imshow(image) - ax[0].set_title("Original Image") - ax[1].imshow(transformed_image) - ax[1].set_title("Transformed Image") - plt.show() - -if __name__ == '__main__': - circle3d() - print("测试============") - print(f"elevation: {elevation}") - print(f" azimuth: {azimuth}") - img_path="images/trans/subRawImg.jpg" - rawimg=cv2.imread(img_path) - cv2.imshow("raw Image", rawimg) - # trans_img(img_path,elevation,azimuth) - elev = math.radians(elevation) - azim = math.radians(azimuth) - perspective_transform_by_angle(rawimg, elev,azim) \ No newline at end of file diff --git a/标靶识别.py b/标靶识别.py deleted file mode 100644 index a292278..0000000 --- a/标靶识别.py +++ /dev/null @@ -1,254 +0,0 @@ -import json -from time import sleep -from typing import Dict -from dataclasses import asdict -import cv2 -import numpy as np -import signal -import sys -import threading - -import models.target -import tcp_Ser -# 定义全局变量 -drawing = False # 是否正在绘制 -start_point=models.target.Point -end_point = models.target.Point -target_rectangle_dict:Dict[int,models.target.CircleTarget]={} -sigExit=False # 是否退出 -#数据广播 -sig_broadcast=True -tcp = tcp_Ser.TcpSer("127.0.0.1", 2230) -myb=threading.Thread(target=tcp.run) -myb.start() -def check_exit(sig, frame): - global sigExit - print(f"收到退出信号 sig={sig}") - sigExit=True - sleep(1) - print("程序退出") - sys.exit(0) - -#鼠标回调函数 -def add_rectangle(event, x, y, flags, param): - global start_point, end_point, drawing - - if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 - print("左键按下") - start_point = models.target.Point(x,y) - end_point = start_point - drawing = True - elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 - if drawing: - end_point = models.target.Point(x,y) - elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 - print("左键抬起") - drawing = False - end_point = models.target.Point(x,y) - if start_point==end_point: - return - distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) - if distance<20: - print("距离小于20,无效区域") - return - target_id=len(target_rectangle_dict) - # 圆标靶半径 mm - radius= 20.0 - area=models.target.RectangleArea(start_point.x,start_point.y,end_point.x-start_point.x,end_point.y-start_point.y) - new_target=models.target.CircleTarget(target_id,area,radius) - print(f"新增区域[{target_id}] => {start_point, end_point}") - target_rectangle_dict[target_id] = new_target - - - - -def draw_rectangle(img): - gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - ret, gray_frame = cv2.threshold(gray_frame, 120, 255, cv2.THRESH_BINARY) - # 高斯滤波 - gray_frame = cv2.GaussianBlur(gray_frame, (5, 5), 1) - cv2.imshow('binaryImg', gray_frame) - - if len(target_rectangle_dict)==0: return - - #上报有新数据的点 - once_upload:Dict[int,models.target.CircleTarget]={} - - # 绘图-历史点 - for i, tr in target_rectangle_dict.items(): - _start_point = tr.rectangle_area[0] - _end_point = tr.rectangle_area[1] - #绘制标靶区域 - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) - #检测 - sub_image = extract_sub_image(gray_frame, _start_point, _end_point) - circles = circle2_detect(sub_image) - if len(circles) == 0: - continue - center,radius=circle_show(img,circles,_start_point) - # 纪录圆心位置 - tr.center_point=center - tr.radius_pix=radius - if tr.is_init: - tr.center_init=tr.center_point - tr.is_init=False - tar = tr.circle_displacement() - msg=f"[{tar.id}]displacement_pix={tar.displacement_pix},displacement_phy={tar.displacement_phy}" - print(msg) - once_upload[tr.id]=tr - - - #过滤无效空数据 - if len(once_upload.items())==0: return - json_str = json.dumps( - {k:asdict(v) for k, v in once_upload.items() if v.is_init==False} - ) - print(f"标靶数据={json_str}",json_str) - if sig_broadcast: - tcp.broadcast_message(json_str) - - -def circle_show(img, circles, relative_point:models.target.Point): - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 - - circle = max(circles, key=lambda c: c[2]) - # print("画圆", circle) - # 绘制圆心 - center = (circle[0] + relative_point.x, circle[1] + relative_point.y) - center_int = tuple(int(x) for x in center) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - radius = np.round(circle[2], 3) - radius_int = int(radius) - # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) - # 打印圆心坐标 - - text1 = f"center:{circle}" - text2 = f"r:{radius}" - txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) - cv2.putText(img, text1, txt_location, font, scale, color, 2) - - cp=models.target.Point(x=center[0], y=center[1]) - return cp,radius - - -def circle2_detect(img): - # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, - maxRadius=0) - # 创建一个0行, 2列的空数组 - if circles_float is not None: - num_circles = circles_float.shape[1] # 获取检测到的圆的数量 - print("圆的数量", num_circles) - # 提取圆心坐标(保留2位小数) - centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] - return centers - else: - return [] - - -def extract_sub_image(frame, top_left, bottom_right): - """ - 从帧中截取子区域 - :param frame: 输入的视频帧 - :param top_left: 子图片的左上角坐标 (x1, y1) - :param bottom_right: 子图片的右下角坐标 (x2, y2) - :return: 截取的子图片 - """ - x1, y1 = top_left - x2, y2 = bottom_right - return frame[y1:y2, x1:x2] - - -def open_video(video_id): - cap = cv2.VideoCapture(video_id) - # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 - # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - if not cap.isOpened(): - print("无法打开摄像头") - exit() - return cap - - -def show_video(cap): - global sigExit,start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - ret, frame = cap.read() - if ret: - frame_handle(frame) - else: - print("无法读取帧") - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - if sigExit: - break -def show_image(frame): - global start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - cp_img=frame.copy() - frame_handle(cp_img) - - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - cv2.destroyAllWindows() - -def frame_handle(frame): - # 绘图-历史点 - draw_rectangle(frame) - # 绘图-实时 - if drawing: - cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) - # print(f"鼠标位置 {start_point} -> {end_point}") - # 显示图像 - cv2.imshow('Frame', frame) - -# 读取图像 -#img_copy = img.copy() # 复制图像用于还原 - -def video_mode(video_id): - capture = open_video(video_id) - fps = capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps}") - show_video(capture) - # 释放摄像头资源并关闭所有窗口 - capture.release() - cv2.destroyAllWindows() - -def image_mode(): - img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg - # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg - # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg - show_image(img_raw) - -def rtsp_mode(): - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" - rtsp_url ="rtsp://localhost:8554/rtsp" - - capture = open_video(rtsp_url) - fps = capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps}") - show_video(capture) - # 释放摄像头资源并关闭所有窗口 - capture.release() - cv2.destroyAllWindows() - -if __name__ == '__main__': - signal.signal(signal.SIGINT, check_exit) - - rtsp_mode() - # video_mode(0) - # image_mode() - - cv2.waitKey(0) - cv2.destroyAllWindows() - - diff --git a/缩放比例.py b/缩放比例.py deleted file mode 100644 index d02d3aa..0000000 --- a/缩放比例.py +++ /dev/null @@ -1,30 +0,0 @@ - -import matplotlib.pyplot as plt -import numpy as np - - -# 存储初始范围和当前比例 -initial_limits = {'x': None, 'y': None} -current_scale = 1.0 -fig, ax = plt.subplots() -x = np.linspace(0, 10, 100) -ax.plot(x, np.sin(x)) -def on_press(event): - if event.button == 3: # 右键按下 - initial_limits['x'] = ax.get_xlim() - initial_limits['y'] = ax.get_ylim() - -def on_release(event): - global current_scale - if event.button == 3 and initial_limits['x'] is not None: - new_xlim = ax.get_xlim() - scale_x = (initial_limits['x'][1] - initial_limits['x'][0]) / \ - (new_xlim[1] - new_xlim[0]) - print(f"X轴缩放比例: {scale_x:.2f}倍") - current_scale *= scale_x - print(f"累计总缩放: {current_scale:.2f}倍") - -if __name__ == '__main__': - fig.canvas.mpl_connect('button_press_event', on_press) - fig.canvas.mpl_connect('button_release_event', on_release) - plt.show() diff --git a/透视变换.py b/透视变换.py deleted file mode 100644 index 99c3fde..0000000 --- a/透视变换.py +++ /dev/null @@ -1,62 +0,0 @@ -import cv2 -import numpy as np -import 标靶识别 as ie - - -def perspective_transformation(image): - if image is None: - print("Error: Unable to load image.") - exit() - # 定义源图像中的四个点 - src_points = np.float32([ - [4, 16], # 左上角 - [795, 14], # 右上角 - [1027, 736], # 右下角 - [181, 856] # 左下角 - ]) - # 定义目标图像中的四个点 # 1050 * 900 - dst_points = np.float32([ - [4, 16], # 左上角 - [795, 14], # 右上角 - [795, 850], # 右下角 - [4, 850] # 左下角 - ]) - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - print(f"原矩阵={M}") - # 逆矩阵 - inverse_matrix = np.linalg.inv(M) - print(f"逆矩阵={inverse_matrix}") - # 应用透视变换 - transformed_image = cv2.warpPerspective(image, M, (1050, 900)) - # 应用透视变换 - inv_transformed_image = cv2.warpPerspective(transformed_image, inverse_matrix, (1050, 900)) - - # 显示原始图像和变换后的图像 - cv2.imshow("Original Image", image) - cv2.imshow("Trans Image", transformed_image) - cv2.imshow("iTrans Image", inv_transformed_image) - cv2.waitKey(0) - cv2.destroyAllWindows() - -def sub_img(): - # 读取图像 - image = cv2.imread("images/target/need_trans.jpg") - if image is None: - print("Error: Unable to load image.") - exit() - # 1050 * 900 - startPoint = [550, 700] - endPoint = [1600, 1600] - # 绘制标靶区域 - # 检测 - subImg = ie.extract_sub_image(image, startPoint, endPoint) - # cv2.imshow("subImg", subImg) - # cv2.waitKey(0) - # cv2.destroyAllWindows() - return subImg - -if __name__ == '__main__': - img=sub_img() - perspective_transformation(img) \ No newline at end of file diff --git a/透视变换手动选取4点.py b/透视变换手动选取4点.py deleted file mode 100644 index dce3026..0000000 --- a/透视变换手动选取4点.py +++ /dev/null @@ -1,149 +0,0 @@ -import cv2 -import numpy as np - -# 全局变量 -points = [] # 存储选择的四个点 -img = None # 存储原始图像 -img_copy = None # 用于绘制的图像副本 - - -def mouse_callback(event, x, y, flags, param): - """鼠标回调函数,用于选择四个点""" - global img_copy, points - - if event == cv2.EVENT_LBUTTONDOWN: - if len(points) < 4: - points.append((x, y)) - print(f"已选择点 {len(points)}: ({x}, {y})") - - # 在图像上绘制点 - cv2.circle(img_copy, (x, y), 5, (0, 255, 0), -1) - - # 如果已经选择了4个点,绘制连线 - if len(points) == 4: - # 按照上、右、下、左的顺序连接点 - for i in range(4): - cv2.line(img_copy, points[i], points[(i + 1) % 4], (0, 255, 0), 2) - - # 标记每个点的位置 - cv2.putText(img_copy, "Top", points[0], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Right", points[1], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Bottom", points[2], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Left", points[3], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - - cv2.imshow("Select Points", img_copy) - - -def perspective_transform(image, src_points): - """执行透视变换""" - # 将点排序为上、右、下、左 - top, right, bottom, left = src_points - - # 计算新图像的宽度(取左右边的最大值) - width_a = np.linalg.norm(np.array(right) - np.array(left)) - width_b = np.linalg.norm(np.array(top) - np.array(bottom)) - max_width = max(int(width_a), int(width_b)) - - # 计算新图像的高度(取上下边的最大值) - height_a = np.linalg.norm(np.array(bottom) - np.array(top)) - height_b = np.linalg.norm(np.array(right) - np.array(left)) - max_height = max(int(height_a), int(height_b)) - - # 定义目标点 - dst = np.array([ - [0, 0], # 左上角 - [max_width - 1, 0], # 右上角 - [max_width - 1, max_height - 1], # 右下角 - [0, max_height - 1] # 左下角 - ], dtype="float32") - - # 转换源点数组为numpy数组 - src = np.array([top, right, bottom, left], dtype="float32") - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src, dst) - - # 执行透视变换 - warped = cv2.warpPerspective(image, M, (max_width, max_height)) - - return warped - -def k_perspective_transform(image, src_points): - """执行透视变换""" - # 将点排序为上、右、下、左 - top, right, bottom, left = src_points - - # 计算新图像的宽度(取左右边的最大值) - sub_zy = np.array(right) - np.array(left) - sub_sx = np.array(top) - np.array(bottom) - - sub_x=sub_sx[0]/2 - print("x差值",sub_sx[0]) - - sub_y = sub_zy[1] / 2 - print("y差值", sub_sx[0]) - - # 定义目标点 - dst = np.array([ - [top[0]-sub_x, top[1]], # 左上角 - [right[0], right[1]- sub_y], # 左上角 - [bottom[0]+sub_x, bottom[1]], # 左上角 - [left[0] , left[1]+ sub_y], # 左上角 - ], dtype="float32") - - # 转换源点数组为numpy数组 - src = np.array([top, right, bottom, left], dtype="float32") - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src, dst) - - print(f"矩阵M={M}") - - # 执行透视变换 - warped = cv2.warpPerspective(image, M, (1050, 900)) - - return warped - -def main(): - global img, img_copy, points - - # 读取图像 - img = cv2.imread("images/trans/subRawImg.jpg") - if img is None: - print("无法加载图像,请检查路径是否正确") - return - - img_copy = img.copy() - - # 创建窗口并设置鼠标回调 - cv2.namedWindow("Select Points") - cv2.setMouseCallback("Select Points", mouse_callback) - - print("请按顺序点击选择四个点:上、右、下、左") - print("选择完成后按任意键继续...") - - while True: - cv2.imshow("Select Points", img_copy) - key = cv2.waitKey(1) & 0xFF - - # 如果已经选择了4个点,按任意键继续 - if len(points) == 4: - break - - # 执行透视变换 - warped = k_perspective_transform(img, points) - - # 显示结果 - cv2.imshow("Original Image", img) - cv2.imshow("Transformed", warped) - - output_path="images/trans/_4point.jpg" - cv2.imwrite(output_path, warped) - print(f"图像已保存到 {output_path}") - - cv2.waitKey(0) - cv2.destroyAllWindows() - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/通过变换角度获取变换公式.py b/通过变换角度获取变换公式.py deleted file mode 100644 index 316afe7..0000000 --- a/通过变换角度获取变换公式.py +++ /dev/null @@ -1,126 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from mpl_toolkits.mplot3d import Axes3D -import 透视变换 -import cv2 -def build_3d_rotation_matrix(elev, azim, roll): - """生成基于elev/azim/roll的完整旋转矩阵""" - elev_rad = np.radians(elev) - azim_rad = np.radians(azim) - roll_rad = np.radians(roll) - - # 绕x轴旋转(elevation) - Rx = np.array([ - [1, 0, 0], - [0, np.cos(elev_rad), -np.sin(elev_rad)], - [0, np.sin(elev_rad), np.cos(elev_rad)] - ]) - - # 绕y轴旋转(azimuth) - Ry = np.array([ - [np.cos(azim_rad), 0, np.sin(azim_rad)], - [0, 1, 0], - [-np.sin(azim_rad), 0, np.cos(azim_rad)] - ]) - - # 绕z轴旋转(roll) - Rz = np.array([ - [np.cos(roll_rad), -np.sin(roll_rad), 0], - [np.sin(roll_rad), np.cos(roll_rad), 0], - [0, 0, 1] - ]) - - return Rz @ Ry @ Rx # 组合旋转顺序 - - - -if __name__ == '__main__': - # 参数设置 - elev, azim, roll = 34, 0,-35 - rotation_3d = build_3d_rotation_matrix(elev, azim, roll) - - # 创建单位圆 - theta = np.linspace(0, 2 * np.pi, 100) - x = np.cos(theta) - y = np.sin(theta) - z = np.zeros_like(x) - circle = np.vstack((x, y, z)) # 3xN - - circle_transformed = rotation_3d @ circle - # 逆矩阵 - inverse_matrix = np.linalg.inv(rotation_3d) - # 画图 - fig = plt.figure(figsize=(12, 6)) - # 设置窗口透明度 - fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 - ax1 = fig.add_subplot(121, projection='3d') - ax2 = fig.add_subplot(122, projection='3d') - # 标识圆心 - ax1.scatter(0, 0, 0, color='red', s=20, label='Center') - # 原始单位圆 - ax1.plot(circle[0], circle[1], circle[2], label='Original Circle') - # 原始单位圆 横轴和纵轴 - ax1.plot(circle[0], np.zeros_like(theta), circle[2], label='z View', color='r') - ax1.plot(np.zeros_like(theta), circle[1], circle[2], label='h View', color='b') - ax1.set_title('Original Circle (elev=90°, roll=0°)') - ax1.set_xlim([-1, 1]) - ax1.set_ylim([-1, 1]) - ax1.set_zlim([-1, 1]) - ax1.set_box_aspect([1, 1, 1]) - ax1.set_xlabel('X') - ax1.set_ylabel('Y') - ax1.set_zlabel('Z') - ax1.view_init(elev=90, azim=90) # 从Z轴方向观察 保持opencv方向一致 x->左 y->下 - # 变换后的单位圆 - # 标识圆心 - ax2.scatter(0, 0, 0, color='red', s=20, label='Center') - ax2.plot(circle_transformed[0], circle_transformed[1], circle_transformed[2], color='r', label='Transformed Circle') - ax2.set_title('Transformed (elev=52°, roll=-35°)') - ax2.set_xlim([-1, 1]) - ax2.set_ylim([-1, 1]) - ax2.set_zlim([-1, 1]) - ax2.set_xlabel('X') - ax2.set_ylabel('Y') - ax2.set_zlabel('Z') - ax2.set_box_aspect([1, 1, 1]) - ax2.view_init(elev=90, azim=90) # 从Z轴方向观察 - plt.show() - - image_zhen= cv2.imread("images/trans/transformed_image.jpg") - image_xie = cv2.imread("images/trans/subRawImg.jpg") - # transformed_image_hy = cv2.warpPerspective(transformed_image, inverse_matrix, dsize=(image.shape[1], image.shape[0])) - # # 执行变换(自动计算输出图像尺寸) - # 裁剪像素值到 [0, 255] 范围 - - # 获取图像的大小 - height, width = image_zhen.shape[:2] - # 计算中心点坐标 - center_x = width // 2 - center_y = height // 2 - # 构造一个平移矩阵,将原点移到中心 - M_translate = np.float32([ - [1, 0, -1*center_x], - [0, 1, -1*center_y], - [0, 0, 1] - ]) - image_xie_padding = cv2.warpPerspective(image_xie, M_translate, - dsize=(image_xie.shape[1], image_xie.shape[0])) - cv2.imshow("image_xie_padding", image_xie_padding) - # 将平移矩阵与目标变换矩阵结合起来 - # inverse_M_combined = np.dot(M_translate, inverse_matrix) - inverse_matrix[2][2]=1.0 - inverse_M_combined = np.dot(M_translate, inverse_matrix) - print(f"斜矩阵={rotation_3d}") - rotation_3d_int = np.clip(rotation_3d, 0, 255) - print(f"斜矩阵_int={rotation_3d}") - print(f"逆矩阵={inverse_M_combined}") - inverse_M_combined_int = np.clip(inverse_M_combined, 0, 255) - # print(f"逆矩阵_int={inverse_M_combined_int}") - transformed_image_hy = cv2.warpPerspective(image_xie, inverse_M_combined_int, - dsize=(image_xie.shape[1]*2, image_xie.shape[0]*2)) - cv2.imshow("transformed_image_hy", transformed_image_hy) - # transformed_image = cv2.warpPerspective(image_zhen, rotation_3d_int,dsize=(image_zhen.shape[1], image_zhen.shape[0])) - # cv2.imshow("rotated_img", transformed_image) - plt.show() - cv2.waitKey(0) - # cv2.destroyAllWindows() \ No newline at end of file diff --git a/通过变换角度获取变换公式deepseek.py b/通过变换角度获取变换公式deepseek.py deleted file mode 100644 index ad92130..0000000 --- a/通过变换角度获取变换公式deepseek.py +++ /dev/null @@ -1,163 +0,0 @@ -import cv2 -import numpy as np -from math import cos, sin, radians - - -def get_rotation_matrix(angle_x, angle_y, angle_z): - """生成3D旋转矩阵""" - # 转换为弧度 - rx = radians(angle_x) - ry = radians(angle_y) - rz = radians(angle_z) - - # X轴旋转矩阵 - mat_x = np.array([ - [1, 0, 0], - [0, cos(rx), -sin(rx)], - [0, sin(rx), cos(rx)] - ]) - - # Y轴旋转矩阵 - mat_y = np.array([ - [cos(ry), 0, sin(ry)], - [0, 1, 0], - [-sin(ry), 0, cos(ry)] - ]) - - # Z轴旋转矩阵 - mat_z = np.array([ - [cos(rz), -sin(rz), 0], - [sin(rz), cos(rz), 0], - [0, 0, 1] - ]) - - # 组合旋转矩阵 - rotation_matrix = np.dot(np.dot(mat_x, mat_y), mat_z) - return rotation_matrix - - -def perspective_transform(image, angle_x=0, angle_y=0, angle_z=0, scale=1.0): - """应用透视变换""" - h, w = image.shape[:2] - - # 获取旋转矩阵 - rotation_matrix = get_rotation_matrix(angle_x, angle_y, angle_z) - - # 创建3D点到2D点的映射 - # 将2D图像视为3D空间中Z=0平面上的物体 - points_3d = np.array([ - [0, 0, 0], # 左上 - [w, 0, 0], # 右上 - [w, h, 0], # 右下 - [0, h, 0] # 左下 - ], dtype=np.float32) - - # 应用旋转 - points_3d_rotated = np.dot(points_3d, rotation_matrix.T) - - # 添加透视效果 - 这里简单地将Z坐标作为深度 - # 可以调整这个值来改变透视强度 - points_2d_homo = points_3d_rotated[:, :2] / (scale - points_3d_rotated[:, 2:3] * 0.001) - - # 计算变换矩阵 - src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) - dst_points = points_2d_homo.astype(np.float32) - - # 计算中心偏移 - min_xy = dst_points.min(axis=0) - max_xy = dst_points.max(axis=0) - dst_points -= min_xy - - # 计算新图像大小 - new_w = int(max_xy[0] - min_xy[0]) - new_h = int(max_xy[1] - min_xy[1]) - - # 获取透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用变换 - transformed = cv2.warpPerspective(image, M, (new_w, new_h)) - - return transformed - - -def combined_perspective_transform(image, angle_x1, angle_y1, angle_z1, angle_y2, scale1=1.0, scale2=1.0): - """合并两次变换:第一次任意旋转,第二次Y轴旋转""" - h, w = image.shape[:2] - - # 第一次旋转矩阵 - rot1 = get_rotation_matrix(angle_x1, angle_y1, angle_z1) - - # 第二次旋转矩阵 (Y轴旋转) - rot2 = get_rotation_matrix(0, angle_y2, 0) - - # 合并旋转矩阵 - combined_rot = np.dot(rot2, rot1) - - # 创建3D点到2D点的映射 - points_3d = np.array([ - [0, 0, 0], # 左上 - [w, 0, 0], # 右上 - [w, h, 0], # 右下 - [0, h, 0] # 左下 - ], dtype=np.float32) - - # 应用合并后的旋转 - points_3d_rotated = np.dot(points_3d, combined_rot.T) - - # 添加透视效果 - points_2d_homo = points_3d_rotated[:, :2] / ((scale1 * scale2) - points_3d_rotated[:, 2:3] * 0.001) - - # 计算变换矩阵 - src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) - dst_points = points_2d_homo.astype(np.float32) - - # 计算中心偏移 - min_xy = dst_points.min(axis=0) - max_xy = dst_points.max(axis=0) - dst_points -= min_xy - - # 计算新图像大小 - new_w = int(max_xy[0] - min_xy[0]) - new_h = int(max_xy[1] - min_xy[1]) - - # 获取透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用变换 - transformed = cv2.warpPerspective(image, M, (new_w, new_h)) - - return transformed - -def show_transformed(): - image = cv2.imread('images/trans/transformed_image.jpg') - # 应用透视变换 - # 参数说明:angle_x, angle_y, angle_z 分别为绕X,Y,Z轴的旋转角度 - # scale 控制透视效果的强度 - transformed = perspective_transform(image, angle_x=34, angle_y=43, angle_z=-35, scale=1) - - # 显示结果 - cv2.imshow('Original', image) - cv2.imshow('Transformed', transformed) - cv2.waitKey(0) - cv2.destroyAllWindows() -def show_transformed_combined(): - image = cv2.imread('images/trans/transformed_image.jpg') - if image is None: - print("请替换为您的图片路径") - else: - # 第一次变换:任意角度 - # 第二次变换:Y轴旋转90度 - final_result = combined_perspective_transform( - image, - angle_x1=34, angle_y1=0, angle_z1=-35, # 第一次旋转参数 - angle_y2=43, # 第二次Y轴旋转90度 - scale1=1.2, scale2=1.0 # 透视参数 - ) - - cv2.imshow('Original', image) - cv2.imshow('Final Result', final_result) - cv2.waitKey(0) - cv2.destroyAllWindows() -if __name__ == '__main__': - show_transformed_combined() \ No newline at end of file diff --git a/霍夫变换_检测圆.py b/霍夫变换_检测圆.py deleted file mode 100644 index da8efbc..0000000 --- a/霍夫变换_检测圆.py +++ /dev/null @@ -1,86 +0,0 @@ -import cv2 -import numpy as np -from cv2 import waitKey - - -imgRaw=cv2.imread('images/target/bowa_target/min.jpg',cv2.IMREAD_GRAYSCALE)#images/target/rp80max3.jpg -imgColor=cv2.imread('images/target/bowa_target/min.jpg') #images/target/rp80max3.jpg - -ret, binary_img = cv2.threshold(imgRaw, 180, 255, cv2.THRESH_BINARY) -#cv2.imshow('binary_img', binary_img) -def circle_detect(img): - #高斯滤波 - #img = cv2.GaussianBlur(img, (3, 3), 1) - #cv2.imshow('gsmh', gaussianBlur) - # 圆心距 canny阈值 最小半径 最大半径 - circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) - print("==========") - # 创建一个0行, 2列的空数组 - if circlesFloat is not None: - num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 - print("圆的数量",num_circles) - # 提取圆心坐标(保留小数) - centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 1 - # 打印圆心坐标 - for center3d in centers: - center=(center3d[0], center3d[1]) - # center_txt=f"{float(center[0]),float(center[1])}" - text=f"center:{center},r:{center3d[2]}" - print(text) - centerInt=tuple(int(x) for x in center) - cv2.putText(img, text, centerInt, font, scale, color,2) - - circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 - for i in circles[0, :]: - print("画圆", i) - # 绘制圆心 - center=(i[0], i[1]) - cv2.circle(img, center, 2, (0, 255, 0), 6) - # 绘制外圆 - cv2.circle(img, center, i[2], (0, 0, 255), 2) - -def circle_detect2(img): - #高斯滤波 - #img = cv2.GaussianBlur(img, (3, 3), 1) - #cv2.imshow('gsmh', gaussianBlur) - # 圆心距 canny阈值 最小半径 最大半径 - circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) - print("==========") - # 创建一个0行, 2列的空数组 - if circlesFloat is not None: - num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 - print("圆的数量",num_circles) - # 提取圆心坐标(保留小数) - centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 1 - # 打印圆心坐标 - for center3d in centers: - center=(center3d[0], center3d[1]) - # center_txt=f"{float(center[0]),float(center[1])}" - text=f"center:{center},r:{center3d[2]}" - print(text) - centerInt=tuple(int(x) for x in center) - cv2.putText(img, text, centerInt, font, scale, color,2) - - circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 - for i in circles[0, :]: - print("画圆", i) - # 绘制圆心 - center=(i[0], i[1]) - cv2.circle(img, center, 2, (0, 255, 0), 6) - # 绘制外圆 - cv2.circle(img, center, i[2], (0, 0, 255), 2) - - -if __name__ == '__main__': - circle_detect(binary_img) - cv2.imshow('tagCircle', imgColor) - waitKey(0) - cv2.destroyAllWindows() \ No newline at end of file