From f8e4ca0de43c97ceec198f7e2a21a233c3283534 Mon Sep 17 00:00:00 2001 From: lucas Date: Mon, 30 Jun 2025 17:49:33 +0800 Subject: [PATCH] =?UTF-8?q?delete=20=E6=97=A7=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LICENSE | 16 -- README.md | 2 - app.py | 33 --- configSer.py | 114 -------- main.py | 9 - tcp_Ser.py | 122 --------- utils.py | 34 --- 标靶识别video.py => videoDetection.py | 208 +++++++++----- videoPush.py | 60 +++++ 三维圆.py | 164 ----------- 标靶识别.py | 254 ------------------ 缩放比例.py | 30 --- 透视变换.py | 62 ----- 透视变换手动选取4点.py | 149 ---------- 通过变换角度获取变换公式.py | 126 --------- ...变换角度获取变换公式deepseek.py | 163 ----------- 霍夫变换_检测圆.py | 86 ------ 17 files changed, 197 insertions(+), 1435 deletions(-) delete mode 100644 LICENSE delete mode 100644 README.md delete mode 100644 app.py delete mode 100644 configSer.py delete mode 100644 main.py delete mode 100644 tcp_Ser.py delete mode 100644 utils.py rename 标靶识别video.py => videoDetection.py (67%) create mode 100644 videoPush.py delete mode 100644 三维圆.py delete mode 100644 标靶识别.py delete mode 100644 缩放比例.py delete mode 100644 透视变换.py delete mode 100644 透视变换手动选取4点.py delete mode 100644 通过变换角度获取变换公式.py delete mode 100644 通过变换角度获取变换公式deepseek.py delete mode 100644 霍夫变换_检测圆.py diff --git a/LICENSE b/LICENSE deleted file mode 100644 index a4e9dc9..0000000 --- a/LICENSE +++ /dev/null @@ -1,16 +0,0 @@ -MIT No Attribution - -Copyright - -Permission is hereby granted, free of charge, to any person obtaining a copy of this -software and associated documentation files (the "Software"), to deal in the Software -without restriction, including without limitation the rights to use, copy, modify, -merge, publish, distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, -INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE -SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md deleted file mode 100644 index 2249d62..0000000 --- a/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# wuyuanbiaoba - diff --git a/app.py b/app.py deleted file mode 100644 index 42224e5..0000000 --- a/app.py +++ /dev/null @@ -1,33 +0,0 @@ -from time import sleep - -import configSer -import tcp_Ser -import upload.DataReporter -import 标靶识别video -import cv2 - -if __name__ == '__main__': - config_path = "./config.json" - # 读取配置文件 - config_obj= configSer.ConfigOperate(config_path) - json_str = config_obj.config_info.to_json(indent=4) - print(f"当前配置:{json_str}") - - tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) - tcp_service.start() - reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) - reporter.register_handler(tcp_service.broadcast_message) - reporter.start() - # 启动video - 标靶识别video.configObj=config_obj - processor = 标靶识别video.VideoProcessor(reporter) - # 添加订阅者processor - tcp_service.add_subscribe(processor) - # 启动 - processor.video_mode(config_obj.config_info.capture) - - while True: - sleep(10) - - cv2.waitKey(0) - cv2.destroyAllWindows() \ No newline at end of file diff --git a/configSer.py b/configSer.py deleted file mode 100644 index 1e3bb19..0000000 --- a/configSer.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -import os -from dataclasses import ( - dataclass, - field -) -from typing import Dict - -import numpy as np -from dataclasses_json import dataclass_json - -import models.target - -_file_path: str - - -@dataclass_json -@dataclass -class Server: - port: int = 0 - -@dataclass_json -@dataclass -class Fps: - data: int = 0 - video: int = 0 - -@dataclass_json -@dataclass -class ConfigInfo: - server:Server - fps:Fps - capture: str = "0" - # 标靶配置 - targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) - -class ConfigOperate: - _file_path: str - config_info: ConfigInfo - def __init__(self,path:str): - self._file_path = path - self.load2obj_sample() - - - def load2dict(self): - """"读取配置""" - if not os.path.exists(self._file_path): - raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") - - with open(self._file_path) as json_file: - config = json.load(json_file) - return config - - # def load2obj_sample2(self): - # """"读取配置""" - # dic=self.load2dict() - # ts = dic["targets"] - # capture = dic["capture"] - # # 获取矩阵数据 - # matrix_dict = dic.get("perspective", {}) - # # n0=convert_to_ndarray(self.matrix_dict["0"]) - # # 将矩阵转换为字符串 - # # matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True) - # for _,t in ts.items(): - # obj = models.target.TargetInfo(**t) - # area = models.target.RectangleArea.from_dict(obj.rectangle_area) - # thres = models.target.Threshold(**obj.threshold) - # self.targets[obj.id] = models.target.CircleTarget( - # obj.id, - # obj.desc, - # area, - # obj.radius, - # thres, - # obj.base - # ) - # return self.targets - - def load2obj_sample(self): - dic=self.load2dict() - dict_str = json.dumps(dic) - self.config_info=ConfigInfo.from_json(dict_str) - - def save2json_file(self): - json_str = self.config_info.to_json(indent=4) - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json_file.write(json_str) - # json.dump(self, json_file, indent=4) - return None - - - def save_dict_config(self, dict_data:Dict): - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json.dump(dict_data, json_file, indent=4) - return None - - def update_dict_config(self, updates): - """ - 更新配置文件中的特定字段。 - :param file_path: 配置文件路径 - :param updates: 包含更新内容的字典 - """ - config_dict = self.load2dict() - config_dict.update(updates) - self.save_dict_config(config_dict) - -def convert_to_ndarray(matrix_data): - """ - 将 JSON 中的矩阵数据转换为 numpy ndarray。 - :param matrix_data: JSON 中的矩阵数据(列表形式) - :return: numpy ndarray - """ - return np.array(matrix_data, dtype=np.float64) \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index fcb1163..0000000 --- a/main.py +++ /dev/null @@ -1,9 +0,0 @@ -def print_hi(name): - # 在下面的代码行中使用断点来调试脚本。 - print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 - - -# 按装订区域中的绿色按钮以运行脚本。 -if __name__ == '__main__': - print_hi('PyCharm') - diff --git a/tcp_Ser.py b/tcp_Ser.py deleted file mode 100644 index c9f678a..0000000 --- a/tcp_Ser.py +++ /dev/null @@ -1,122 +0,0 @@ -import logging -import socket -import threading -from unittest import case - -from models.msg import Msg - - -class TcpSer(threading.Thread): - # 定义服务器地址和端口 - HOST = '127.0.0.1' - PORT = 2230 - def __init__(self,host,port): - super().__init__() - self.HOST=host - self.PORT=port - self.connected_clients=[] - # 消费者 - self.consumers=[] - self.lock = threading.Lock() - # 处理客户端连接的函数 - def handle_client(self,client_socket): - try: - # 当客户端连接时,将其添加到列表中 - self.connected_clients.append(client_socket) - print(f"新连接: {client_socket.getpeername()}") - - # 保持连接,直到客户端断开 - while True: - # 接收客户端数据(如果需要) - data = client_socket.recv(4096) - msg_str=data.decode('utf-8') - if not data: - break # 如果没有数据,退出循环 - print(f"从 {client_socket.getpeername()} 收到: {msg_str}") - # 反序列化为 实例 - s_cmd = Msg.from_json(msg_str) - valid_msg:bool=True - match s_cmd.cmd: - case "getPoints" | "setPoints": - self.on_data(s_cmd,valid_msg) - case "videoFps"| "dataFps": - self.on_data(s_cmd,valid_msg) - case "setCap": - self.on_data(s_cmd,valid_msg) - # todo 添加处理 - case _: - valid_msg = False - err_msg=f"valid cmd={s_cmd.cmd}" - resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" - client_socket.sendall(resp.encode()) - print("非法命令",resp) - - print("通讯完成") - except Exception as e: - print(f"处理客户端时出错: {e}") - finally: - # 从列表中移除客户端并关闭连接 - if client_socket in self.connected_clients: - self.connected_clients.remove(client_socket) - print(f"连接关闭: {client_socket.getpeername()}") - client_socket.close() - - # 注册的消费者必须携带on_data 方法 - def add_subscribe(self,consumer): - if hasattr(consumer, 'on_data'): - print(f"加入 consumer {consumer} ") - self.consumers.append(consumer) - else: - print("consumer 缺少on_data函数,订阅无效 ") - def on_data(self,msg,valid): - for consumer in self.consumers: - try: - resp=consumer.on_data(msg) - self.broadcast_message(resp) - except Exception as e: - logging.warn("通讯异常",e) - - # 广播消息给所有连接的客户端 - def broadcast_message(self,message:str): - with self.lock: - if len(message)==0: - return - - message+="\n\n" - for client in self.connected_clients: - try: - client.sendall(message.encode()) - except Exception as e: - print(f"向客户端发送消息时出错: {e}") - # 如果发送失败,从列表中移除客户端 - if client in self.connected_clients: - self.connected_clients.remove(client) - client.close() - def run(self): - # 创建服务器套接字 - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - server_socket.bind((self.HOST,self.PORT)) - server_socket.listen() - print(f"服务器监听在 {self.HOST}:{self.PORT}...") - - try: - # 保持服务器运行并接受新的连接 - while True: - client_socket, addr = server_socket.accept() - print(f"连接来自 {addr}") - - # 为每个客户端启动一个线程 - client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) - client_thread.daemon = True # 守护线程,服务器关闭时自动结束 - client_thread.start() - - except KeyboardInterrupt: - print("服务器关闭...") - finally: - # 关闭所有客户端连接 - for client in self.connected_clients: - client.close() - server_socket.close() -if __name__ == '__main__': - tcp=TcpSer("127.0.0.1",2230) - tcp.run() \ No newline at end of file diff --git a/utils.py b/utils.py deleted file mode 100644 index e1f5537..0000000 --- a/utils.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Dict, List - -import cv2 -import base64 - -import numpy as np - - -def frame_to_base64(frame, format="JPEG"): - """将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" - # 将图片帧编码为 JPEG 或 PNG 格式 - if format.upper() == "JPEG": - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 - elif format.upper() == "PNG": - encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 - else: - raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") - - result, encoded_frame = cv2.imencode(f".{format.lower()}", frame, encode_param) - if not result: - raise ValueError("Failed to encode frame.") - - # 将编码后的字节流转换为 Base64 字符串 - base64_string = base64.b64encode(encoded_frame).decode("utf-8") - return base64_string - - - -# 自定义编码器和解码器 -def encode_perspective(value: np.ndarray) -> List[List[float]]: - return value.tolist() - -def decode_perspective(value: List[List[float]]) -> np.ndarray: - return np.array(value) \ No newline at end of file diff --git a/标靶识别video.py b/videoDetection.py similarity index 67% rename from 标靶识别video.py rename to videoDetection.py index 7e08770..1af821a 100644 --- a/标靶识别video.py +++ b/videoDetection.py @@ -1,4 +1,3 @@ -import gc from datetime import datetime import json import queue @@ -6,7 +5,6 @@ import time from time import sleep import cv2 import numpy as np -import sys import logging import configSer @@ -14,11 +12,12 @@ import models.target import models.sampleMsg import upload.DataReporter import utils +import videoPush from models.msg import Msg logging.basicConfig(level=logging.DEBUG) drawing: bool = False # 是否正在绘制 -sigExit: bool = False # 是否退出 +is_video_mode: bool = False # 是否采用标靶图覆盖 # 定义点 start_point: models.target.Point end_point: models.target.Point @@ -68,11 +67,12 @@ class VideoProcessor: reporter: upload.DataReporter.DataReporter capture: cv2.VideoCapture is_opened: bool= False + is_running = True def __init__(self, reporter:upload.DataReporter.DataReporter): self.reporter = reporter def on_data(self,msg:Msg): - global configObj + global configObj,is_video_mode logging.info(f"msg={msg}") match msg.cmd: case "getPoints": @@ -87,8 +87,8 @@ class VideoProcessor: v=msg.values ts=v["targets"] - # # 清空原配置 - # configObj.config_info.targets={} + # 清空原配置 + configObj.config_info.targets={} for _,t in ts.items(): t_str=json.dumps(t) new_c_target = models.target.CircleTarget.from_json(t_str) @@ -104,7 +104,7 @@ class VideoProcessor: self.reporter.adjust_rate(fps,"image") configObj.config_info.fps.video = fps configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "dataFps": @@ -113,14 +113,21 @@ class VideoProcessor: self.reporter.adjust_rate(fps,"data") configObj.config_info.fps.data=fps configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "setCap": v = msg.values cap = v["cap"] self.switch_video(cap) - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoMode": + v = msg.values + is_debug = v["debug"] + is_video_mode=is_debug + resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json print("==") @@ -133,16 +140,20 @@ class VideoProcessor: self.enqueue_image(all_img) def draw_rectangle(self,img): - global configObj + global configObj,is_video_mode gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] #图像发送 - now_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - self.pre_handler_img(gray_frame,now_str) + if configObj.config_info.fps.video > 0: + self.pre_handler_img(gray_frame,now_str) if len(configObj.config_info.targets)==0: return + + # 基点 + base_point_pix:models.target.Point=None + #上报有新数据的点 all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) # 绘图-历史点 @@ -154,23 +165,48 @@ class VideoProcessor: tr.info.rectangle_area.x+tr.info.rectangle_area.w, tr.info.rectangle_area.y+tr.info.rectangle_area.h) #绘制标靶区域 - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) + blue_color = (255, 0, 0) + if tr.info.base: + blue_color=(200, 0, 200) #紫红色 基点 + + cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) + label=f"{tr.info.desc},r={tr.info.radius}" + cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) #检测 + # 获取图像尺寸 + frame_height, frame_width = gray_frame.shape[:2] + if _end_point.x>frame_width or _end_point.y>frame_height: + print(f"标靶[{tr.info.desc}]sub_image 超出区域") + continue sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) + ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) # 高斯滤波 - sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (tr.info.threshold.gauss, tr.info.threshold.gauss), 10,borderType=cv2.BORDER_REPLICATE) - cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) + gauss_size=tr.info.threshold.gauss + sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) + # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) + if tr.perspective is not None: + # 宽度 + sub_width = sub_binary_frame.shape[1] + # 高度 + sub_height = sub_binary_frame.shape[0] + + sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) + # 覆盖原图 - # sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) - # self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + if is_video_mode: + sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) + self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + + if __debug__: + cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) circles = self.circle2_detect(sub_binary_frame) if len(circles) == 0: continue - center,radius_pix=self.circle_show(img,circles,_start_point) + center,radius_pix=self.circle_match(circles,_start_point) # 纪录圆心位置 if tr.handler_info is None: tr.handler_info= models.target.HandlerInfo() @@ -178,53 +214,89 @@ class VideoProcessor: tr.handler_info.is_init=False tr.handler_info.center_init = center - tr.handler_info.center_point=center + # 数据平滑处理 + smooth_center = tr.handler_info.enqueue_center_point(center) + # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") + tr.handler_info.center_point=smooth_center + # 原始处理 tr.handler_info.center_point=center tr.handler_info.radius_pix=radius_pix - tr.circle_displacement() + tr.circle_displacement_pix() + + # 基点 + if tr.info.base: + base_point_pix=tr.handler_info.displacement_pix + + # 画圆 + self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point) + + # 基于像素点计算 物理距离 + for i, tr in configObj.config_info.targets.items(): + + if tr.handler_info is None: + continue + if tr.handler_info.displacement_pix is None: + continue + # 减去基点偏移 + if base_point_pix is not None: + raw_point=tr.handler_info.displacement_pix + tr.handler_info.displacement_pix=models.target.Point( + x=raw_point.x-base_point_pix.x, + y=raw_point.y-base_point_pix.y) + if not tr.info.base: + # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") + pass + tr.circle_displacement_phy() + if tr.handler_info.displacement_phy is not None: + all_upload_data.data.append( + models.sampleMsg.SensorData( + str(tr.info.id), + tr.handler_info.displacement_phy.x, + tr.handler_info.displacement_phy.y) + ) + tr.handler_info.displacement_pix=None + tr.handler_info.displacement_phy = None - all_upload_data.data.append( - models.sampleMsg.SensorData( - str(tr.info.id), - tr.handler_info.displacement_phy.x, - tr.handler_info.displacement_phy.y) - ) #过滤无效空数据 if len(all_upload_data.data)==0: return - self.enqueue_data(all_upload_data) + if configObj.config_info.fps.data > 0: + self.enqueue_data(all_upload_data) - def circle_show(self,img, circles, relative_point:models.target.Point): - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 + + def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): circle = max(circles, key=lambda c: c[2]) # 绘制圆心 - center = (circle[0] + relative_point.x, circle[1] + relative_point.y) + center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) + radius = float(np.round(circle[2], 3)) + cp = models.target.Point(x=center[0], y=center[1]) + return cp,radius + + def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point): + + font = cv2.FONT_HERSHEY_SIMPLEX + color = (255, 0, 0) # 蓝色 + scale = 0.5 center_int = tuple(int(x) for x in center) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - radius = np.round(circle[2], 3) radius_int = int(radius) + cv2.circle(img, center_int, 2, (0, 255, 0), 4) # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) + cv2.circle(img, center_int, radius_int, (0, 0, 255), 1) # 打印圆心坐标 - text1 = f"center:{circle}" - text2 = f"r:{radius}" - txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) - cv2.putText(img, text1, txt_location, font, scale, color, 2) - - cp = models.target.Point(x=center[0], y=center[1]) - return cp,radius + text1 = f"c:{(center.x,center.y,radius)}" + txt_location = (rect_s_point.x+2,rect_e_point.y-2) + # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) + cv2.putText(img, text1, txt_location, font, scale, color, 1) def circle2_detect(self,img): # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, + circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, maxRadius=0) # 创建一个0行, 2列的空数组 if circles_float is not None: @@ -255,20 +327,21 @@ class VideoProcessor: def open_video(self,video_id): + sleep(1) print(f"打开摄像头 -> {video_id}") self.capture = cv2.VideoCapture(video_id) frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"默认分辨率= {frame_width}*{frame_height}") logging.info(f"{video_id}地址->{self.capture}") + if not self.capture.isOpened(): + self.capture.release() + logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") + return fps = self.capture.get(cv2.CAP_PROP_FPS) print(f"fps={fps},video_id={video_id},") # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - if not self.capture.isOpened(): - self.capture.release() - logging.info(f"无法打开摄像头{video_id},release 地址 -> {self.capture}") - return self.is_opened=True def switch_video(self,video_id:str): @@ -284,10 +357,11 @@ class VideoProcessor: def show_video(self): global sigExit,start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 - while True: + while self.is_running: if not self.is_opened: print(f"摄像头 标记is_opened={self.is_opened}") sleep(5) @@ -300,17 +374,19 @@ class VideoProcessor: sleep(1) # self.capture.release() # self.capture= cv2.VideoCapture(0) # 再次尝试 - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - if sigExit: - break + cv2.waitKey(1) # 显示图像 if frame is not None: - cv2.imshow('Frame', frame) + if __debug__: + cv2.imshow('Frame', frame) + #缓存到推流 + videoPush.update_latest_frame(frame) + print("退出VideoProcessor") def show_image(self,frame): global start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 while True: cp_img=frame.copy() @@ -328,9 +404,6 @@ class VideoProcessor: cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) # print(f"鼠标位置 {start_point} -> {end_point}") - # 读取图像 - #img_copy = img.copy() # 复制图像用于还原 - def image_mode(self): img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg @@ -342,7 +415,6 @@ class VideoProcessor: if str.isdigit(video_id): video_id=int(video_id) self.open_video(video_id) - # if self.is_opened: self.show_video() # 释放摄像头资源并关闭所有窗口 print("退出 video") @@ -383,15 +455,9 @@ class VideoProcessor: pass #self.reporter.image_dropped += 1 - #数据广播 -def check_exit(sig, frame): - global sigExit - logging.info(f"收到退出信号 sig={sig}") - sigExit=True - sleep(1) - logging.info("程序退出") - sys.exit(0) - + def stop(self): + self.is_running=False + self.capture.release() diff --git a/videoPush.py b/videoPush.py new file mode 100644 index 0000000..48a74e5 --- /dev/null +++ b/videoPush.py @@ -0,0 +1,60 @@ +from time import sleep + +import cv2 +import threading +import time + +import numpy as np +import requests +from flask import Flask, Response, render_template_string, request + +import utils + +app_flask = Flask(__name__) + +# 全局变量,用于存储最新的帧 +latest_frame:np.ndarray = None +lock = threading.Lock() +is_running = True + +def update_latest_frame(n_bytes:np.ndarray): + global latest_frame + latest_frame=n_bytes + +def generate_mjpeg(): + """生成MJPEG格式的视频流""" + while is_running: + with lock: + if latest_frame is None: + continue + _,latest_frame_bytes = utils.frame_to_img(latest_frame) + frame = latest_frame_bytes.tobytes() + pass + sleep(0.1) + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') + + +@app_flask.route('/video_flow') +def video_feed(): + """视频流路由""" + return Response( + generate_mjpeg(), + mimetype='multipart/x-mixed-replace; boundary=frame' + ) + +def run(): + port=2240 + print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") + app_flask.run(host='0.0.0.0', port=port, threaded=True) + +def video_server_run(): + thread = threading.Thread(target=run) + thread.daemon = True # 设置为守护线程,主线程退出时自动终止 + thread.start() +def stop(): + global is_running + is_running=False + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/三维圆.py b/三维圆.py deleted file mode 100644 index 8f13e7f..0000000 --- a/三维圆.py +++ /dev/null @@ -1,164 +0,0 @@ -import math - -import cv2 -import numpy as np -import matplotlib.pyplot as plt -import matplotlib -matplotlib.use('TkAgg') - -elevation=0 -azimuth=0 -def circle3d(): - global elevation,azimuth - # 创建一个新的图和一个3D坐标轴 - fig = plt.figure() - ax = fig.add_subplot(111, projection='3d') - - # 定义圆的参数 - radius = 1 # 半径 - theta = np.linspace(0, 2 * np.pi, 100) # 参数角度 - x = radius * np.cos(theta) # x坐标 - y = radius * np.sin(theta) # y坐标 - z = np.zeros_like(theta) # z坐标(这里圆在xy平面上) - - # 绘制圆 - # ax.plot(x, y, z, label='3D Circle', color='b') - - # 绘制圆的正俯视图 - # 正视图(xz平面) - ax.plot(x, np.zeros_like(theta), z, label='z View', color='r') - ax.plot(np.zeros_like(theta), y, z, label='h View', color='b') - # 俯视图(xy平面) - ax.plot(x, y, np.zeros_like(theta), label='Top View', color='g') - - # 设置坐标轴范围 - ax.set_xlim([-2, 2]) - ax.set_ylim([-2, 2]) - ax.set_zlim([-2, 2]) - ax.set_xlabel('X') - ax.set_ylabel('Y') - ax.set_zlabel('Z') - # 添加图例 - ax.legend() - # 设置初始视角 - # ax.view_init(elev=53, azim=-48,roll=-38) # 俯仰角30度,方位角45度 - ax.view_init(elev=90, azim=0, roll=0) # 俯仰角30度,方位角45度 - # 隐藏整个坐标轴 - # ax.axis('off') - # 设置窗口透明度 - fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 - # 显示图形 - plt.show() - # input("请用鼠标转动 调整角度 elevation and azimuth: ") - # 获取当前的 elevation 和 azimuth - elevation = ax.elev - azimuth = ax.azim - - -def perspective_transform_by_angle(img, _elevation, _azimuth): - h, w = img.shape[:2] - - # 根据角度计算目标点位置 - fov = math.radians(45) # 假设视场角45度 - dz = 1 / math.tan(_elevation) - dx = dz * math.tan(_azimuth) - - # 源图像四个角点(原始斜视图) - src_points = np.float32([[0, 0], [w, 0], [w, h], [0, h]]) - - # 计算目标点(俯视图) - dst_points = np.float32([ - [w * 0.5 * (1 - dx), h * 0.5 * (1 - dz)], # 左上 - [w * 0.5 * (1 + dx), h * 0.5 * (1 - dz)], # 右上 - [w * 0.5 * (1 + dx), h * 0.5 * (1 + dz)], # 右下 - [w * 0.5 * (1 - dx), h * 0.5 * (1 + dz)] # 左下 - ]) - - # 获取变换矩阵并应用 - M = cv2.getPerspectiveTransform(src_points, dst_points) - transformed_image=cv2.warpPerspective(img, M, (h,w)) - # 显示原始图像和变换后的图像 - fig, ax = plt.subplots(1, 2,figsize= (12, 6)) - ax[0].imshow(img) - ax[0].set_title("Original Image") - ax[1].imshow(transformed_image) - ax[1].set_title("Transformed Image") - plt.show() - - -def trans_img(image_path ,x,y): - global elevation, azimuth - image = cv2.imread(image_path) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - # 获取图像尺寸 - height, width = image.shape[:2] - - center_x, center_y = 532,385 #圆心 - # 定义源图像的四个点(假设为矩形) - sx1=center_x-100 - sx2=center_x+100 - sy1=center_y-100 - sy2=center_y + 100 - src_points = np.float32([ - [sx1, sy1], # 左上 - [sx2, sy1], # 右上 - [sx2, sy2], # 右下 - [sx1, sy2] # 左下 - ]) - # src_points = np.float32([ - # [center_x, sy1], # 上 - # [sx2, center_y], # 右 - # [center_x, sy2], # 下 - # [sx1, center_y] # 左 - # ]) - # 根据 elevation 和 azimuth 计算目标点 - # 这里是一个简化的计算方法,可以根据实际需求调整 - # 假设目标点在透视变换后的坐标 - # 将斜视的画面变成俯视的画面 - radius = 100 - - # 根据俯仰角和方位角计算目标点的偏移 - offset_x = radius * np.sin(elevation) * np.cos(azimuth) - offset_y = radius * np.sin(elevation) * np.sin(azimuth) - offset_z = radius * np.cos(elevation) - print(f"offset_x={offset_x},offset_y={offset_y}") - # 计算目标点 - # dst_points = np.float32([ - # [0 - offset_x, 0 - offset_y], # 左上 - # [width + offset_x, 0 - offset_y], # 右上 - # [width + offset_x, height + offset_y], # 右下 - # [0 - offset_x, height + offset_y] # 左下 - # ]) - dst_points = np.float32([ - [sx1- offset_x, sy1- offset_y], # 上 - [sx2 + offset_x, sy1 - offset_y], # 右上 - [sx2 + offset_x, sy2+ offset_y], # 右下 - [sx1 - offset_x, sy2 + offset_y] # 下 - ]) - # 计算透视变换矩阵 - matrix = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用透视变换 - transformed_image = cv2.warpPerspective(image, matrix, (width, height)) - - # 显示原始图像和变换后的图像 - fig, ax = plt.subplots(1, 2,figsize= (12, 6)) - ax[0].imshow(image) - ax[0].set_title("Original Image") - ax[1].imshow(transformed_image) - ax[1].set_title("Transformed Image") - plt.show() - -if __name__ == '__main__': - circle3d() - print("测试============") - print(f"elevation: {elevation}") - print(f" azimuth: {azimuth}") - img_path="images/trans/subRawImg.jpg" - rawimg=cv2.imread(img_path) - cv2.imshow("raw Image", rawimg) - # trans_img(img_path,elevation,azimuth) - elev = math.radians(elevation) - azim = math.radians(azimuth) - perspective_transform_by_angle(rawimg, elev,azim) \ No newline at end of file diff --git a/标靶识别.py b/标靶识别.py deleted file mode 100644 index a292278..0000000 --- a/标靶识别.py +++ /dev/null @@ -1,254 +0,0 @@ -import json -from time import sleep -from typing import Dict -from dataclasses import asdict -import cv2 -import numpy as np -import signal -import sys -import threading - -import models.target -import tcp_Ser -# 定义全局变量 -drawing = False # 是否正在绘制 -start_point=models.target.Point -end_point = models.target.Point -target_rectangle_dict:Dict[int,models.target.CircleTarget]={} -sigExit=False # 是否退出 -#数据广播 -sig_broadcast=True -tcp = tcp_Ser.TcpSer("127.0.0.1", 2230) -myb=threading.Thread(target=tcp.run) -myb.start() -def check_exit(sig, frame): - global sigExit - print(f"收到退出信号 sig={sig}") - sigExit=True - sleep(1) - print("程序退出") - sys.exit(0) - -#鼠标回调函数 -def add_rectangle(event, x, y, flags, param): - global start_point, end_point, drawing - - if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 - print("左键按下") - start_point = models.target.Point(x,y) - end_point = start_point - drawing = True - elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 - if drawing: - end_point = models.target.Point(x,y) - elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 - print("左键抬起") - drawing = False - end_point = models.target.Point(x,y) - if start_point==end_point: - return - distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) - if distance<20: - print("距离小于20,无效区域") - return - target_id=len(target_rectangle_dict) - # 圆标靶半径 mm - radius= 20.0 - area=models.target.RectangleArea(start_point.x,start_point.y,end_point.x-start_point.x,end_point.y-start_point.y) - new_target=models.target.CircleTarget(target_id,area,radius) - print(f"新增区域[{target_id}] => {start_point, end_point}") - target_rectangle_dict[target_id] = new_target - - - - -def draw_rectangle(img): - gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - ret, gray_frame = cv2.threshold(gray_frame, 120, 255, cv2.THRESH_BINARY) - # 高斯滤波 - gray_frame = cv2.GaussianBlur(gray_frame, (5, 5), 1) - cv2.imshow('binaryImg', gray_frame) - - if len(target_rectangle_dict)==0: return - - #上报有新数据的点 - once_upload:Dict[int,models.target.CircleTarget]={} - - # 绘图-历史点 - for i, tr in target_rectangle_dict.items(): - _start_point = tr.rectangle_area[0] - _end_point = tr.rectangle_area[1] - #绘制标靶区域 - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) - #检测 - sub_image = extract_sub_image(gray_frame, _start_point, _end_point) - circles = circle2_detect(sub_image) - if len(circles) == 0: - continue - center,radius=circle_show(img,circles,_start_point) - # 纪录圆心位置 - tr.center_point=center - tr.radius_pix=radius - if tr.is_init: - tr.center_init=tr.center_point - tr.is_init=False - tar = tr.circle_displacement() - msg=f"[{tar.id}]displacement_pix={tar.displacement_pix},displacement_phy={tar.displacement_phy}" - print(msg) - once_upload[tr.id]=tr - - - #过滤无效空数据 - if len(once_upload.items())==0: return - json_str = json.dumps( - {k:asdict(v) for k, v in once_upload.items() if v.is_init==False} - ) - print(f"标靶数据={json_str}",json_str) - if sig_broadcast: - tcp.broadcast_message(json_str) - - -def circle_show(img, circles, relative_point:models.target.Point): - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 - - circle = max(circles, key=lambda c: c[2]) - # print("画圆", circle) - # 绘制圆心 - center = (circle[0] + relative_point.x, circle[1] + relative_point.y) - center_int = tuple(int(x) for x in center) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - radius = np.round(circle[2], 3) - radius_int = int(radius) - # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) - # 打印圆心坐标 - - text1 = f"center:{circle}" - text2 = f"r:{radius}" - txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) - cv2.putText(img, text1, txt_location, font, scale, color, 2) - - cp=models.target.Point(x=center[0], y=center[1]) - return cp,radius - - -def circle2_detect(img): - # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, - maxRadius=0) - # 创建一个0行, 2列的空数组 - if circles_float is not None: - num_circles = circles_float.shape[1] # 获取检测到的圆的数量 - print("圆的数量", num_circles) - # 提取圆心坐标(保留2位小数) - centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] - return centers - else: - return [] - - -def extract_sub_image(frame, top_left, bottom_right): - """ - 从帧中截取子区域 - :param frame: 输入的视频帧 - :param top_left: 子图片的左上角坐标 (x1, y1) - :param bottom_right: 子图片的右下角坐标 (x2, y2) - :return: 截取的子图片 - """ - x1, y1 = top_left - x2, y2 = bottom_right - return frame[y1:y2, x1:x2] - - -def open_video(video_id): - cap = cv2.VideoCapture(video_id) - # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 - # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - if not cap.isOpened(): - print("无法打开摄像头") - exit() - return cap - - -def show_video(cap): - global sigExit,start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - ret, frame = cap.read() - if ret: - frame_handle(frame) - else: - print("无法读取帧") - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - if sigExit: - break -def show_image(frame): - global start_point, end_point, drawing - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - cp_img=frame.copy() - frame_handle(cp_img) - - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - cv2.destroyAllWindows() - -def frame_handle(frame): - # 绘图-历史点 - draw_rectangle(frame) - # 绘图-实时 - if drawing: - cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) - # print(f"鼠标位置 {start_point} -> {end_point}") - # 显示图像 - cv2.imshow('Frame', frame) - -# 读取图像 -#img_copy = img.copy() # 复制图像用于还原 - -def video_mode(video_id): - capture = open_video(video_id) - fps = capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps}") - show_video(capture) - # 释放摄像头资源并关闭所有窗口 - capture.release() - cv2.destroyAllWindows() - -def image_mode(): - img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg - # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg - # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg - show_image(img_raw) - -def rtsp_mode(): - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" - rtsp_url ="rtsp://localhost:8554/rtsp" - - capture = open_video(rtsp_url) - fps = capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps}") - show_video(capture) - # 释放摄像头资源并关闭所有窗口 - capture.release() - cv2.destroyAllWindows() - -if __name__ == '__main__': - signal.signal(signal.SIGINT, check_exit) - - rtsp_mode() - # video_mode(0) - # image_mode() - - cv2.waitKey(0) - cv2.destroyAllWindows() - - diff --git a/缩放比例.py b/缩放比例.py deleted file mode 100644 index d02d3aa..0000000 --- a/缩放比例.py +++ /dev/null @@ -1,30 +0,0 @@ - -import matplotlib.pyplot as plt -import numpy as np - - -# 存储初始范围和当前比例 -initial_limits = {'x': None, 'y': None} -current_scale = 1.0 -fig, ax = plt.subplots() -x = np.linspace(0, 10, 100) -ax.plot(x, np.sin(x)) -def on_press(event): - if event.button == 3: # 右键按下 - initial_limits['x'] = ax.get_xlim() - initial_limits['y'] = ax.get_ylim() - -def on_release(event): - global current_scale - if event.button == 3 and initial_limits['x'] is not None: - new_xlim = ax.get_xlim() - scale_x = (initial_limits['x'][1] - initial_limits['x'][0]) / \ - (new_xlim[1] - new_xlim[0]) - print(f"X轴缩放比例: {scale_x:.2f}倍") - current_scale *= scale_x - print(f"累计总缩放: {current_scale:.2f}倍") - -if __name__ == '__main__': - fig.canvas.mpl_connect('button_press_event', on_press) - fig.canvas.mpl_connect('button_release_event', on_release) - plt.show() diff --git a/透视变换.py b/透视变换.py deleted file mode 100644 index 99c3fde..0000000 --- a/透视变换.py +++ /dev/null @@ -1,62 +0,0 @@ -import cv2 -import numpy as np -import 标靶识别 as ie - - -def perspective_transformation(image): - if image is None: - print("Error: Unable to load image.") - exit() - # 定义源图像中的四个点 - src_points = np.float32([ - [4, 16], # 左上角 - [795, 14], # 右上角 - [1027, 736], # 右下角 - [181, 856] # 左下角 - ]) - # 定义目标图像中的四个点 # 1050 * 900 - dst_points = np.float32([ - [4, 16], # 左上角 - [795, 14], # 右上角 - [795, 850], # 右下角 - [4, 850] # 左下角 - ]) - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - print(f"原矩阵={M}") - # 逆矩阵 - inverse_matrix = np.linalg.inv(M) - print(f"逆矩阵={inverse_matrix}") - # 应用透视变换 - transformed_image = cv2.warpPerspective(image, M, (1050, 900)) - # 应用透视变换 - inv_transformed_image = cv2.warpPerspective(transformed_image, inverse_matrix, (1050, 900)) - - # 显示原始图像和变换后的图像 - cv2.imshow("Original Image", image) - cv2.imshow("Trans Image", transformed_image) - cv2.imshow("iTrans Image", inv_transformed_image) - cv2.waitKey(0) - cv2.destroyAllWindows() - -def sub_img(): - # 读取图像 - image = cv2.imread("images/target/need_trans.jpg") - if image is None: - print("Error: Unable to load image.") - exit() - # 1050 * 900 - startPoint = [550, 700] - endPoint = [1600, 1600] - # 绘制标靶区域 - # 检测 - subImg = ie.extract_sub_image(image, startPoint, endPoint) - # cv2.imshow("subImg", subImg) - # cv2.waitKey(0) - # cv2.destroyAllWindows() - return subImg - -if __name__ == '__main__': - img=sub_img() - perspective_transformation(img) \ No newline at end of file diff --git a/透视变换手动选取4点.py b/透视变换手动选取4点.py deleted file mode 100644 index dce3026..0000000 --- a/透视变换手动选取4点.py +++ /dev/null @@ -1,149 +0,0 @@ -import cv2 -import numpy as np - -# 全局变量 -points = [] # 存储选择的四个点 -img = None # 存储原始图像 -img_copy = None # 用于绘制的图像副本 - - -def mouse_callback(event, x, y, flags, param): - """鼠标回调函数,用于选择四个点""" - global img_copy, points - - if event == cv2.EVENT_LBUTTONDOWN: - if len(points) < 4: - points.append((x, y)) - print(f"已选择点 {len(points)}: ({x}, {y})") - - # 在图像上绘制点 - cv2.circle(img_copy, (x, y), 5, (0, 255, 0), -1) - - # 如果已经选择了4个点,绘制连线 - if len(points) == 4: - # 按照上、右、下、左的顺序连接点 - for i in range(4): - cv2.line(img_copy, points[i], points[(i + 1) % 4], (0, 255, 0), 2) - - # 标记每个点的位置 - cv2.putText(img_copy, "Top", points[0], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Right", points[1], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Bottom", points[2], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - cv2.putText(img_copy, "Left", points[3], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) - - cv2.imshow("Select Points", img_copy) - - -def perspective_transform(image, src_points): - """执行透视变换""" - # 将点排序为上、右、下、左 - top, right, bottom, left = src_points - - # 计算新图像的宽度(取左右边的最大值) - width_a = np.linalg.norm(np.array(right) - np.array(left)) - width_b = np.linalg.norm(np.array(top) - np.array(bottom)) - max_width = max(int(width_a), int(width_b)) - - # 计算新图像的高度(取上下边的最大值) - height_a = np.linalg.norm(np.array(bottom) - np.array(top)) - height_b = np.linalg.norm(np.array(right) - np.array(left)) - max_height = max(int(height_a), int(height_b)) - - # 定义目标点 - dst = np.array([ - [0, 0], # 左上角 - [max_width - 1, 0], # 右上角 - [max_width - 1, max_height - 1], # 右下角 - [0, max_height - 1] # 左下角 - ], dtype="float32") - - # 转换源点数组为numpy数组 - src = np.array([top, right, bottom, left], dtype="float32") - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src, dst) - - # 执行透视变换 - warped = cv2.warpPerspective(image, M, (max_width, max_height)) - - return warped - -def k_perspective_transform(image, src_points): - """执行透视变换""" - # 将点排序为上、右、下、左 - top, right, bottom, left = src_points - - # 计算新图像的宽度(取左右边的最大值) - sub_zy = np.array(right) - np.array(left) - sub_sx = np.array(top) - np.array(bottom) - - sub_x=sub_sx[0]/2 - print("x差值",sub_sx[0]) - - sub_y = sub_zy[1] / 2 - print("y差值", sub_sx[0]) - - # 定义目标点 - dst = np.array([ - [top[0]-sub_x, top[1]], # 左上角 - [right[0], right[1]- sub_y], # 左上角 - [bottom[0]+sub_x, bottom[1]], # 左上角 - [left[0] , left[1]+ sub_y], # 左上角 - ], dtype="float32") - - # 转换源点数组为numpy数组 - src = np.array([top, right, bottom, left], dtype="float32") - - # 计算透视变换矩阵 - M = cv2.getPerspectiveTransform(src, dst) - - print(f"矩阵M={M}") - - # 执行透视变换 - warped = cv2.warpPerspective(image, M, (1050, 900)) - - return warped - -def main(): - global img, img_copy, points - - # 读取图像 - img = cv2.imread("images/trans/subRawImg.jpg") - if img is None: - print("无法加载图像,请检查路径是否正确") - return - - img_copy = img.copy() - - # 创建窗口并设置鼠标回调 - cv2.namedWindow("Select Points") - cv2.setMouseCallback("Select Points", mouse_callback) - - print("请按顺序点击选择四个点:上、右、下、左") - print("选择完成后按任意键继续...") - - while True: - cv2.imshow("Select Points", img_copy) - key = cv2.waitKey(1) & 0xFF - - # 如果已经选择了4个点,按任意键继续 - if len(points) == 4: - break - - # 执行透视变换 - warped = k_perspective_transform(img, points) - - # 显示结果 - cv2.imshow("Original Image", img) - cv2.imshow("Transformed", warped) - - output_path="images/trans/_4point.jpg" - cv2.imwrite(output_path, warped) - print(f"图像已保存到 {output_path}") - - cv2.waitKey(0) - cv2.destroyAllWindows() - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/通过变换角度获取变换公式.py b/通过变换角度获取变换公式.py deleted file mode 100644 index 316afe7..0000000 --- a/通过变换角度获取变换公式.py +++ /dev/null @@ -1,126 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from mpl_toolkits.mplot3d import Axes3D -import 透视变换 -import cv2 -def build_3d_rotation_matrix(elev, azim, roll): - """生成基于elev/azim/roll的完整旋转矩阵""" - elev_rad = np.radians(elev) - azim_rad = np.radians(azim) - roll_rad = np.radians(roll) - - # 绕x轴旋转(elevation) - Rx = np.array([ - [1, 0, 0], - [0, np.cos(elev_rad), -np.sin(elev_rad)], - [0, np.sin(elev_rad), np.cos(elev_rad)] - ]) - - # 绕y轴旋转(azimuth) - Ry = np.array([ - [np.cos(azim_rad), 0, np.sin(azim_rad)], - [0, 1, 0], - [-np.sin(azim_rad), 0, np.cos(azim_rad)] - ]) - - # 绕z轴旋转(roll) - Rz = np.array([ - [np.cos(roll_rad), -np.sin(roll_rad), 0], - [np.sin(roll_rad), np.cos(roll_rad), 0], - [0, 0, 1] - ]) - - return Rz @ Ry @ Rx # 组合旋转顺序 - - - -if __name__ == '__main__': - # 参数设置 - elev, azim, roll = 34, 0,-35 - rotation_3d = build_3d_rotation_matrix(elev, azim, roll) - - # 创建单位圆 - theta = np.linspace(0, 2 * np.pi, 100) - x = np.cos(theta) - y = np.sin(theta) - z = np.zeros_like(x) - circle = np.vstack((x, y, z)) # 3xN - - circle_transformed = rotation_3d @ circle - # 逆矩阵 - inverse_matrix = np.linalg.inv(rotation_3d) - # 画图 - fig = plt.figure(figsize=(12, 6)) - # 设置窗口透明度 - fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 - ax1 = fig.add_subplot(121, projection='3d') - ax2 = fig.add_subplot(122, projection='3d') - # 标识圆心 - ax1.scatter(0, 0, 0, color='red', s=20, label='Center') - # 原始单位圆 - ax1.plot(circle[0], circle[1], circle[2], label='Original Circle') - # 原始单位圆 横轴和纵轴 - ax1.plot(circle[0], np.zeros_like(theta), circle[2], label='z View', color='r') - ax1.plot(np.zeros_like(theta), circle[1], circle[2], label='h View', color='b') - ax1.set_title('Original Circle (elev=90°, roll=0°)') - ax1.set_xlim([-1, 1]) - ax1.set_ylim([-1, 1]) - ax1.set_zlim([-1, 1]) - ax1.set_box_aspect([1, 1, 1]) - ax1.set_xlabel('X') - ax1.set_ylabel('Y') - ax1.set_zlabel('Z') - ax1.view_init(elev=90, azim=90) # 从Z轴方向观察 保持opencv方向一致 x->左 y->下 - # 变换后的单位圆 - # 标识圆心 - ax2.scatter(0, 0, 0, color='red', s=20, label='Center') - ax2.plot(circle_transformed[0], circle_transformed[1], circle_transformed[2], color='r', label='Transformed Circle') - ax2.set_title('Transformed (elev=52°, roll=-35°)') - ax2.set_xlim([-1, 1]) - ax2.set_ylim([-1, 1]) - ax2.set_zlim([-1, 1]) - ax2.set_xlabel('X') - ax2.set_ylabel('Y') - ax2.set_zlabel('Z') - ax2.set_box_aspect([1, 1, 1]) - ax2.view_init(elev=90, azim=90) # 从Z轴方向观察 - plt.show() - - image_zhen= cv2.imread("images/trans/transformed_image.jpg") - image_xie = cv2.imread("images/trans/subRawImg.jpg") - # transformed_image_hy = cv2.warpPerspective(transformed_image, inverse_matrix, dsize=(image.shape[1], image.shape[0])) - # # 执行变换(自动计算输出图像尺寸) - # 裁剪像素值到 [0, 255] 范围 - - # 获取图像的大小 - height, width = image_zhen.shape[:2] - # 计算中心点坐标 - center_x = width // 2 - center_y = height // 2 - # 构造一个平移矩阵,将原点移到中心 - M_translate = np.float32([ - [1, 0, -1*center_x], - [0, 1, -1*center_y], - [0, 0, 1] - ]) - image_xie_padding = cv2.warpPerspective(image_xie, M_translate, - dsize=(image_xie.shape[1], image_xie.shape[0])) - cv2.imshow("image_xie_padding", image_xie_padding) - # 将平移矩阵与目标变换矩阵结合起来 - # inverse_M_combined = np.dot(M_translate, inverse_matrix) - inverse_matrix[2][2]=1.0 - inverse_M_combined = np.dot(M_translate, inverse_matrix) - print(f"斜矩阵={rotation_3d}") - rotation_3d_int = np.clip(rotation_3d, 0, 255) - print(f"斜矩阵_int={rotation_3d}") - print(f"逆矩阵={inverse_M_combined}") - inverse_M_combined_int = np.clip(inverse_M_combined, 0, 255) - # print(f"逆矩阵_int={inverse_M_combined_int}") - transformed_image_hy = cv2.warpPerspective(image_xie, inverse_M_combined_int, - dsize=(image_xie.shape[1]*2, image_xie.shape[0]*2)) - cv2.imshow("transformed_image_hy", transformed_image_hy) - # transformed_image = cv2.warpPerspective(image_zhen, rotation_3d_int,dsize=(image_zhen.shape[1], image_zhen.shape[0])) - # cv2.imshow("rotated_img", transformed_image) - plt.show() - cv2.waitKey(0) - # cv2.destroyAllWindows() \ No newline at end of file diff --git a/通过变换角度获取变换公式deepseek.py b/通过变换角度获取变换公式deepseek.py deleted file mode 100644 index ad92130..0000000 --- a/通过变换角度获取变换公式deepseek.py +++ /dev/null @@ -1,163 +0,0 @@ -import cv2 -import numpy as np -from math import cos, sin, radians - - -def get_rotation_matrix(angle_x, angle_y, angle_z): - """生成3D旋转矩阵""" - # 转换为弧度 - rx = radians(angle_x) - ry = radians(angle_y) - rz = radians(angle_z) - - # X轴旋转矩阵 - mat_x = np.array([ - [1, 0, 0], - [0, cos(rx), -sin(rx)], - [0, sin(rx), cos(rx)] - ]) - - # Y轴旋转矩阵 - mat_y = np.array([ - [cos(ry), 0, sin(ry)], - [0, 1, 0], - [-sin(ry), 0, cos(ry)] - ]) - - # Z轴旋转矩阵 - mat_z = np.array([ - [cos(rz), -sin(rz), 0], - [sin(rz), cos(rz), 0], - [0, 0, 1] - ]) - - # 组合旋转矩阵 - rotation_matrix = np.dot(np.dot(mat_x, mat_y), mat_z) - return rotation_matrix - - -def perspective_transform(image, angle_x=0, angle_y=0, angle_z=0, scale=1.0): - """应用透视变换""" - h, w = image.shape[:2] - - # 获取旋转矩阵 - rotation_matrix = get_rotation_matrix(angle_x, angle_y, angle_z) - - # 创建3D点到2D点的映射 - # 将2D图像视为3D空间中Z=0平面上的物体 - points_3d = np.array([ - [0, 0, 0], # 左上 - [w, 0, 0], # 右上 - [w, h, 0], # 右下 - [0, h, 0] # 左下 - ], dtype=np.float32) - - # 应用旋转 - points_3d_rotated = np.dot(points_3d, rotation_matrix.T) - - # 添加透视效果 - 这里简单地将Z坐标作为深度 - # 可以调整这个值来改变透视强度 - points_2d_homo = points_3d_rotated[:, :2] / (scale - points_3d_rotated[:, 2:3] * 0.001) - - # 计算变换矩阵 - src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) - dst_points = points_2d_homo.astype(np.float32) - - # 计算中心偏移 - min_xy = dst_points.min(axis=0) - max_xy = dst_points.max(axis=0) - dst_points -= min_xy - - # 计算新图像大小 - new_w = int(max_xy[0] - min_xy[0]) - new_h = int(max_xy[1] - min_xy[1]) - - # 获取透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用变换 - transformed = cv2.warpPerspective(image, M, (new_w, new_h)) - - return transformed - - -def combined_perspective_transform(image, angle_x1, angle_y1, angle_z1, angle_y2, scale1=1.0, scale2=1.0): - """合并两次变换:第一次任意旋转,第二次Y轴旋转""" - h, w = image.shape[:2] - - # 第一次旋转矩阵 - rot1 = get_rotation_matrix(angle_x1, angle_y1, angle_z1) - - # 第二次旋转矩阵 (Y轴旋转) - rot2 = get_rotation_matrix(0, angle_y2, 0) - - # 合并旋转矩阵 - combined_rot = np.dot(rot2, rot1) - - # 创建3D点到2D点的映射 - points_3d = np.array([ - [0, 0, 0], # 左上 - [w, 0, 0], # 右上 - [w, h, 0], # 右下 - [0, h, 0] # 左下 - ], dtype=np.float32) - - # 应用合并后的旋转 - points_3d_rotated = np.dot(points_3d, combined_rot.T) - - # 添加透视效果 - points_2d_homo = points_3d_rotated[:, :2] / ((scale1 * scale2) - points_3d_rotated[:, 2:3] * 0.001) - - # 计算变换矩阵 - src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) - dst_points = points_2d_homo.astype(np.float32) - - # 计算中心偏移 - min_xy = dst_points.min(axis=0) - max_xy = dst_points.max(axis=0) - dst_points -= min_xy - - # 计算新图像大小 - new_w = int(max_xy[0] - min_xy[0]) - new_h = int(max_xy[1] - min_xy[1]) - - # 获取透视变换矩阵 - M = cv2.getPerspectiveTransform(src_points, dst_points) - - # 应用变换 - transformed = cv2.warpPerspective(image, M, (new_w, new_h)) - - return transformed - -def show_transformed(): - image = cv2.imread('images/trans/transformed_image.jpg') - # 应用透视变换 - # 参数说明:angle_x, angle_y, angle_z 分别为绕X,Y,Z轴的旋转角度 - # scale 控制透视效果的强度 - transformed = perspective_transform(image, angle_x=34, angle_y=43, angle_z=-35, scale=1) - - # 显示结果 - cv2.imshow('Original', image) - cv2.imshow('Transformed', transformed) - cv2.waitKey(0) - cv2.destroyAllWindows() -def show_transformed_combined(): - image = cv2.imread('images/trans/transformed_image.jpg') - if image is None: - print("请替换为您的图片路径") - else: - # 第一次变换:任意角度 - # 第二次变换:Y轴旋转90度 - final_result = combined_perspective_transform( - image, - angle_x1=34, angle_y1=0, angle_z1=-35, # 第一次旋转参数 - angle_y2=43, # 第二次Y轴旋转90度 - scale1=1.2, scale2=1.0 # 透视参数 - ) - - cv2.imshow('Original', image) - cv2.imshow('Final Result', final_result) - cv2.waitKey(0) - cv2.destroyAllWindows() -if __name__ == '__main__': - show_transformed_combined() \ No newline at end of file diff --git a/霍夫变换_检测圆.py b/霍夫变换_检测圆.py deleted file mode 100644 index da8efbc..0000000 --- a/霍夫变换_检测圆.py +++ /dev/null @@ -1,86 +0,0 @@ -import cv2 -import numpy as np -from cv2 import waitKey - - -imgRaw=cv2.imread('images/target/bowa_target/min.jpg',cv2.IMREAD_GRAYSCALE)#images/target/rp80max3.jpg -imgColor=cv2.imread('images/target/bowa_target/min.jpg') #images/target/rp80max3.jpg - -ret, binary_img = cv2.threshold(imgRaw, 180, 255, cv2.THRESH_BINARY) -#cv2.imshow('binary_img', binary_img) -def circle_detect(img): - #高斯滤波 - #img = cv2.GaussianBlur(img, (3, 3), 1) - #cv2.imshow('gsmh', gaussianBlur) - # 圆心距 canny阈值 最小半径 最大半径 - circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) - print("==========") - # 创建一个0行, 2列的空数组 - if circlesFloat is not None: - num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 - print("圆的数量",num_circles) - # 提取圆心坐标(保留小数) - centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 1 - # 打印圆心坐标 - for center3d in centers: - center=(center3d[0], center3d[1]) - # center_txt=f"{float(center[0]),float(center[1])}" - text=f"center:{center},r:{center3d[2]}" - print(text) - centerInt=tuple(int(x) for x in center) - cv2.putText(img, text, centerInt, font, scale, color,2) - - circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 - for i in circles[0, :]: - print("画圆", i) - # 绘制圆心 - center=(i[0], i[1]) - cv2.circle(img, center, 2, (0, 255, 0), 6) - # 绘制外圆 - cv2.circle(img, center, i[2], (0, 0, 255), 2) - -def circle_detect2(img): - #高斯滤波 - #img = cv2.GaussianBlur(img, (3, 3), 1) - #cv2.imshow('gsmh', gaussianBlur) - # 圆心距 canny阈值 最小半径 最大半径 - circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) - print("==========") - # 创建一个0行, 2列的空数组 - if circlesFloat is not None: - num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 - print("圆的数量",num_circles) - # 提取圆心坐标(保留小数) - centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 1 - # 打印圆心坐标 - for center3d in centers: - center=(center3d[0], center3d[1]) - # center_txt=f"{float(center[0]),float(center[1])}" - text=f"center:{center},r:{center3d[2]}" - print(text) - centerInt=tuple(int(x) for x in center) - cv2.putText(img, text, centerInt, font, scale, color,2) - - circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 - for i in circles[0, :]: - print("画圆", i) - # 绘制圆心 - center=(i[0], i[1]) - cv2.circle(img, center, 2, (0, 255, 0), 6) - # 绘制外圆 - cv2.circle(img, center, i[2], (0, 0, 255), 2) - - -if __name__ == '__main__': - circle_detect(binary_img) - cv2.imshow('tagCircle', imgColor) - waitKey(0) - cv2.destroyAllWindows() \ No newline at end of file