32 changed files with 2245 additions and 0 deletions
@ -0,0 +1,31 @@ |
|||||
|
import configSer |
||||
|
import tcp_Ser |
||||
|
import upload.DataReporter |
||||
|
import 标靶识别video |
||||
|
import cv2 |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
config_path = "./config.json" |
||||
|
# 读取配置文件 |
||||
|
config_obj= configSer.ConfigOperate(config_path) |
||||
|
json_str = config_obj.config_info.to_json(indent=4) |
||||
|
print(f"当前配置:{json_str}") |
||||
|
|
||||
|
tcp_service = tcp_Ser.TcpSer("127.0.0.1", config_obj.config_info.server.port) |
||||
|
tcp_service.start() |
||||
|
reporter = upload.DataReporter.DataReporter() |
||||
|
reporter.register_handler(tcp_service.broadcast_message) |
||||
|
reporter.start() |
||||
|
# 启动video |
||||
|
标靶识别video.configObj=config_obj |
||||
|
processor = 标靶识别video.VideoProcessor(reporter) |
||||
|
# 添加订阅者processor |
||||
|
tcp_service.add_subscribe(processor) |
||||
|
# 启动 |
||||
|
processor.video_mode(0) |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
@ -0,0 +1,47 @@ |
|||||
|
{ |
||||
|
"server": { |
||||
|
"port": 2230 |
||||
|
}, |
||||
|
"capture": 0, |
||||
|
"targets": { |
||||
|
"0": { |
||||
|
"info": { |
||||
|
"rectangle_area": { |
||||
|
"x": 50, |
||||
|
"y": 371, |
||||
|
"w": 113, |
||||
|
"h": 91 |
||||
|
}, |
||||
|
"threshold": { |
||||
|
"binary": 120, |
||||
|
"gauss": 5 |
||||
|
}, |
||||
|
"radius_pix": 1, |
||||
|
"radius": 20.0, |
||||
|
"pix_length": 0.0, |
||||
|
"id": 0, |
||||
|
"desc": "0_biaoba", |
||||
|
"base": false |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"perspective": { |
||||
|
"0": [ |
||||
|
[ |
||||
|
1.02161644, |
||||
|
-0.277155559, |
||||
|
109.207622 |
||||
|
], |
||||
|
[ |
||||
|
0.0802663708, |
||||
|
1.00426514, |
||||
|
-34.8436318 |
||||
|
], |
||||
|
[ |
||||
|
3.80200276e-05, |
||||
|
2.664279e-06, |
||||
|
1.0 |
||||
|
] |
||||
|
] |
||||
|
} |
||||
|
} |
@ -0,0 +1,45 @@ |
|||||
|
{ |
||||
|
"server": { |
||||
|
"port": 2230 |
||||
|
}, |
||||
|
"capture": 0, |
||||
|
"targets": { |
||||
|
"0": { |
||||
|
"info": { |
||||
|
"rectangle_area": { |
||||
|
"x": 75, |
||||
|
"y": 310, |
||||
|
"w": 59, |
||||
|
"h": 55 |
||||
|
}, |
||||
|
"threshold": { |
||||
|
"binary": 128, |
||||
|
"gauss": 9 |
||||
|
}, |
||||
|
"radius": 20.0, |
||||
|
"id": 0, |
||||
|
"desc": "", |
||||
|
"base": false |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"perspective2": { |
||||
|
"0": [ |
||||
|
[ |
||||
|
1.02161644, |
||||
|
-0.277155559, |
||||
|
109.207622 |
||||
|
], |
||||
|
[ |
||||
|
0.0802663708, |
||||
|
1.00426514, |
||||
|
-34.8436318 |
||||
|
], |
||||
|
[ |
||||
|
3.80200276e-05, |
||||
|
2.664279e-06, |
||||
|
1.0 |
||||
|
] |
||||
|
] |
||||
|
} |
||||
|
} |
@ -0,0 +1,109 @@ |
|||||
|
import json |
||||
|
import os |
||||
|
from dataclasses import ( |
||||
|
dataclass, |
||||
|
field, asdict |
||||
|
) |
||||
|
from typing import Dict, Optional |
||||
|
|
||||
|
import numpy as np |
||||
|
from dataclasses_json import dataclass_json |
||||
|
|
||||
|
import models.target |
||||
|
|
||||
|
_file_path: str |
||||
|
|
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class Server: |
||||
|
port: int = 0 |
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class ConfigInfo: |
||||
|
server:Server |
||||
|
capture: int = 0 |
||||
|
# 标靶配置 |
||||
|
targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) |
||||
|
# 标靶透视矩阵 |
||||
|
perspective: Dict[int, np.ndarray] = field(default_factory=dict) |
||||
|
|
||||
|
class ConfigOperate: |
||||
|
_file_path: str |
||||
|
config_info: ConfigInfo |
||||
|
def __init__(self,path:str): |
||||
|
self._file_path = path |
||||
|
self.load2obj_sample() |
||||
|
|
||||
|
|
||||
|
def load2dict(self): |
||||
|
""""读取配置""" |
||||
|
if not os.path.exists(self._file_path): |
||||
|
raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") |
||||
|
|
||||
|
with open(self._file_path) as json_file: |
||||
|
config = json.load(json_file) |
||||
|
return config |
||||
|
|
||||
|
def load2obj_sample2(self): |
||||
|
""""读取配置""" |
||||
|
dic=self.load2dict() |
||||
|
ts = dic["targets"] |
||||
|
capture = dic["capture"] |
||||
|
# 获取矩阵数据 |
||||
|
matrix_dict = dic.get("perspective", {}) |
||||
|
# n0=convert_to_ndarray(self.matrix_dict["0"]) |
||||
|
# 将矩阵转换为字符串 |
||||
|
# matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True) |
||||
|
for _,t in ts.items(): |
||||
|
obj = models.target.TargetInfo(**t) |
||||
|
area = models.target.RectangleArea.from_dict(obj.rectangle_area) |
||||
|
thres = models.target.Threshold(**obj.threshold) |
||||
|
self.targets[obj.id] = models.target.CircleTarget( |
||||
|
obj.id, |
||||
|
obj.desc, |
||||
|
area, |
||||
|
obj.radius, |
||||
|
thres, |
||||
|
obj.base |
||||
|
) |
||||
|
return self.targets |
||||
|
|
||||
|
def load2obj_sample(self): |
||||
|
dic=self.load2dict() |
||||
|
dict_str = json.dumps(dic) |
||||
|
self.config_info=ConfigInfo.from_json(dict_str) |
||||
|
|
||||
|
def save2json_file(self): |
||||
|
json_str = self.config_info.to_json(indent=4) |
||||
|
""""更新配置""" |
||||
|
with open(self._file_path, 'w') as json_file: |
||||
|
json_file.write(json_str) |
||||
|
# json.dump(self, json_file, indent=4) |
||||
|
return None |
||||
|
|
||||
|
|
||||
|
def save_dict_config(self, dict_data:Dict): |
||||
|
""""更新配置""" |
||||
|
with open(self._file_path, 'w') as json_file: |
||||
|
json.dump(dict_data, json_file, indent=4) |
||||
|
return None |
||||
|
|
||||
|
def update_dict_config(self, updates): |
||||
|
""" |
||||
|
更新配置文件中的特定字段。 |
||||
|
:param file_path: 配置文件路径 |
||||
|
:param updates: 包含更新内容的字典 |
||||
|
""" |
||||
|
config_dict = self.load2dict() |
||||
|
config_dict.update(updates) |
||||
|
self.save_dict_config(config_dict) |
||||
|
|
||||
|
def convert_to_ndarray(matrix_data): |
||||
|
""" |
||||
|
将 JSON 中的矩阵数据转换为 numpy ndarray。 |
||||
|
:param matrix_data: JSON 中的矩阵数据(列表形式) |
||||
|
:return: numpy ndarray |
||||
|
""" |
||||
|
return np.array(matrix_data, dtype=np.float64) |
After Width: | Height: | Size: 380 KiB |
After Width: | Height: | Size: 444 KiB |
After Width: | Height: | Size: 287 KiB |
After Width: | Height: | Size: 350 KiB |
@ -0,0 +1,9 @@ |
|||||
|
def print_hi(name): |
||||
|
# 在下面的代码行中使用断点来调试脚本。 |
||||
|
print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 |
||||
|
|
||||
|
|
||||
|
# 按装订区域中的绿色按钮以运行脚本。 |
||||
|
if __name__ == '__main__': |
||||
|
print_hi('PyCharm') |
||||
|
|
@ -0,0 +1,25 @@ |
|||||
|
import json |
||||
|
import typing |
||||
|
from dataclasses import dataclass |
||||
|
from dataclasses_json import dataclass_json |
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class Msg: |
||||
|
_from: str |
||||
|
cmd: str |
||||
|
values: typing.Any |
||||
|
|
||||
|
def to_json_(self) -> str: |
||||
|
"""将数据类序列化为 JSON 字符串""" |
||||
|
# return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) |
||||
|
return self.to_json() |
||||
|
|
||||
|
# @classmethod |
||||
|
# def from_json(cls, json_str: str) -> 'Msg': |
||||
|
# """从 JSON 字符串反序列化为数据类""" |
||||
|
# data_dict = json.loads(json_str) |
||||
|
# return cls(**data_dict) |
||||
|
|
||||
|
|
||||
|
|
@ -0,0 +1,43 @@ |
|||||
|
import json |
||||
|
from dataclasses import dataclass |
||||
|
from typing import List |
||||
|
|
||||
|
@dataclass |
||||
|
class SensorImg: |
||||
|
base64: str |
||||
|
|
||||
|
def to_json(self) -> str: |
||||
|
"""将数据类序列化为 JSON 字符串""" |
||||
|
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) |
||||
|
|
||||
|
@classmethod |
||||
|
def from_json(cls, json_str: str) -> 'SensorImg': |
||||
|
"""从 JSON 字符串反序列化为数据类""" |
||||
|
data_dict = json.loads(json_str) |
||||
|
return cls(**data_dict) |
||||
|
@dataclass |
||||
|
class SensorData: |
||||
|
pos: str |
||||
|
x: float |
||||
|
y: float |
||||
|
|
||||
|
def to_json(self) -> str: |
||||
|
"""将数据类序列化为 JSON 字符串""" |
||||
|
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) |
||||
|
|
||||
|
@classmethod |
||||
|
def from_json(cls, json_str: str) -> 'SensorData': |
||||
|
"""从 JSON 字符串反序列化为数据类""" |
||||
|
data_dict = json.loads(json_str) |
||||
|
return cls(**data_dict) |
||||
|
|
||||
|
@dataclass |
||||
|
class AllSensorData: |
||||
|
data: List[SensorData] |
||||
|
time: str |
||||
|
|
||||
|
|
||||
|
@dataclass |
||||
|
class AllImg: |
||||
|
image: SensorImg |
||||
|
time: str |
@ -0,0 +1,97 @@ |
|||||
|
from dataclasses import dataclass, field |
||||
|
from typing import Optional |
||||
|
|
||||
|
from dataclasses_json import dataclass_json, config |
||||
|
|
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class Point: |
||||
|
x:float |
||||
|
y:float |
||||
|
def __iter__(self): # 使对象可迭代,可直接转为元组 |
||||
|
yield self.x |
||||
|
yield self.y |
||||
|
|
||||
|
@dataclass |
||||
|
class RectangleArea: |
||||
|
x: int |
||||
|
y: int |
||||
|
w: int |
||||
|
h: int |
||||
|
@classmethod |
||||
|
def from_dict(cls, data: dict): |
||||
|
return cls( |
||||
|
x=data['x'], |
||||
|
y=data['y'], |
||||
|
w=data['w'], |
||||
|
h = data['h']) |
||||
|
|
||||
|
@dataclass |
||||
|
class Threshold: |
||||
|
binary: int |
||||
|
gauss: int |
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class TargetInfo: |
||||
|
# 标靶方形区域 |
||||
|
rectangle_area:RectangleArea |
||||
|
threshold:Threshold |
||||
|
radius_pix:float= 1 |
||||
|
# 标靶物理半径 |
||||
|
radius:float=0.0 |
||||
|
pix_length:float=0.0 |
||||
|
id:int =-1 |
||||
|
desc:str="" |
||||
|
base:bool=False |
||||
|
def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs): |
||||
|
self.id = id |
||||
|
self.desc = desc |
||||
|
self.rectangle_area=rectangle_area |
||||
|
self.radius=radius |
||||
|
self.threshold=threshold |
||||
|
self.base=base |
||||
|
|
||||
|
@classmethod |
||||
|
def from_dict(cls,data: dict): |
||||
|
return cls(data['id'],data['rectangle_area'],data['radius']) |
||||
|
|
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class CircleTarget: |
||||
|
# 标靶方形区域 |
||||
|
info:TargetInfo |
||||
|
|
||||
|
# 初始标靶中心 |
||||
|
is_init=True |
||||
|
# 标靶中心 |
||||
|
center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) |
||||
|
center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) |
||||
|
# 标靶位移(像素) |
||||
|
displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) |
||||
|
displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) |
||||
|
|
||||
|
def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy): |
||||
|
self.info=info |
||||
|
self.center_point=center_point |
||||
|
self.center_init=center_init |
||||
|
self. displacement_pix=displacement_pix |
||||
|
self.displacement_phy=displacement_phy |
||||
|
|
||||
|
|
||||
|
@classmethod |
||||
|
def init_by_info(cls,t:TargetInfo): |
||||
|
return CircleTarget(t,None,None,None,None) |
||||
|
def circle_displacement(self): |
||||
|
previous = self.center_init |
||||
|
if previous != (): |
||||
|
self.displacement_pix = Point(self.center_point.x - previous.x, self.center_point.y - previous.y) |
||||
|
if self.info.radius != 0: |
||||
|
# 单位像素距离 |
||||
|
self.info.pix_length = self.info.radius / self.info.radius_pix |
||||
|
offset_x = round(float(self.displacement_pix.x * self.info.pix_length), 5) |
||||
|
offset_y = round(float(self.displacement_pix.y * self.info.pix_length), 5) |
||||
|
self.displacement_phy = Point(offset_x, offset_y) |
||||
|
return self |
@ -0,0 +1,111 @@ |
|||||
|
import logging |
||||
|
import socket |
||||
|
import threading |
||||
|
from unittest import case |
||||
|
|
||||
|
from models.msg import Msg |
||||
|
|
||||
|
|
||||
|
class TcpSer(threading.Thread): |
||||
|
# 定义服务器地址和端口 |
||||
|
HOST = '127.0.0.1' |
||||
|
PORT = 2230 |
||||
|
def __init__(self,host,port): |
||||
|
super().__init__() |
||||
|
self.HOST=host |
||||
|
self.PORT=port |
||||
|
self.connected_clients=[] |
||||
|
# 消费者 |
||||
|
self.consumers=[] |
||||
|
self.lock = threading.Lock() |
||||
|
# 处理客户端连接的函数 |
||||
|
def handle_client(self,client_socket): |
||||
|
try: |
||||
|
# 当客户端连接时,将其添加到列表中 |
||||
|
self.connected_clients.append(client_socket) |
||||
|
print(f"新连接: {client_socket.getpeername()}") |
||||
|
|
||||
|
# 保持连接,直到客户端断开 |
||||
|
while True: |
||||
|
# 接收客户端数据(如果需要) |
||||
|
data = client_socket.recv(1024) |
||||
|
msg_str=data.decode('utf-8') |
||||
|
if not data: |
||||
|
break # 如果没有数据,退出循环 |
||||
|
print(f"从 {client_socket.getpeername()} 收到: {msg_str}") |
||||
|
# 反序列化为 实例 |
||||
|
s_cmd = Msg.from_json(msg_str) |
||||
|
match s_cmd.cmd: |
||||
|
case "getPoints" | "setPoints": |
||||
|
self.on_data(s_cmd) |
||||
|
# todo 添加处理 |
||||
|
case "xxxxx": |
||||
|
self.on_data(s_cmd) |
||||
|
|
||||
|
except Exception as e: |
||||
|
print(f"处理客户端时出错: {e}") |
||||
|
finally: |
||||
|
# 从列表中移除客户端并关闭连接 |
||||
|
if client_socket in self.connected_clients: |
||||
|
self.connected_clients.remove(client_socket) |
||||
|
print(f"连接关闭: {client_socket.getpeername()}") |
||||
|
client_socket.close() |
||||
|
|
||||
|
# 注册的消费者必须携带on_data 方法 |
||||
|
def add_subscribe(self,consumer): |
||||
|
if hasattr(consumer, 'on_data'): |
||||
|
print(f"加入 consumer {consumer} ") |
||||
|
self.consumers.append(consumer) |
||||
|
else: |
||||
|
print("consumer 缺少on_data函数,订阅无效 ") |
||||
|
def on_data(self,msg): |
||||
|
for consumer in self.consumers: |
||||
|
try: |
||||
|
resp=consumer.on_data(msg) |
||||
|
self.broadcast_message(resp) |
||||
|
except Exception as e: |
||||
|
logging.warn("通讯异常",e) |
||||
|
|
||||
|
# 广播消息给所有连接的客户端 |
||||
|
def broadcast_message(self,message:str): |
||||
|
with self.lock: |
||||
|
if len(message)==0: |
||||
|
return |
||||
|
|
||||
|
for client in self.connected_clients: |
||||
|
try: |
||||
|
client.sendall(message.encode()) |
||||
|
except Exception as e: |
||||
|
print(f"向客户端发送消息时出错: {e}") |
||||
|
# 如果发送失败,从列表中移除客户端 |
||||
|
if client in self.connected_clients: |
||||
|
self.connected_clients.remove(client) |
||||
|
client.close() |
||||
|
def run(self): |
||||
|
# 创建服务器套接字 |
||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: |
||||
|
server_socket.bind((self.HOST,self.PORT)) |
||||
|
server_socket.listen() |
||||
|
print(f"服务器监听在 {self.HOST}:{self.PORT}...") |
||||
|
|
||||
|
try: |
||||
|
# 保持服务器运行并接受新的连接 |
||||
|
while True: |
||||
|
client_socket, addr = server_socket.accept() |
||||
|
print(f"连接来自 {addr}") |
||||
|
|
||||
|
# 为每个客户端启动一个线程 |
||||
|
client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) |
||||
|
client_thread.daemon = True # 守护线程,服务器关闭时自动结束 |
||||
|
client_thread.start() |
||||
|
|
||||
|
except KeyboardInterrupt: |
||||
|
print("服务器关闭...") |
||||
|
finally: |
||||
|
# 关闭所有客户端连接 |
||||
|
for client in self.connected_clients: |
||||
|
client.close() |
||||
|
server_socket.close() |
||||
|
if __name__ == '__main__': |
||||
|
tcp=TcpSer("127.0.0.1",2230) |
||||
|
tcp.run() |
@ -0,0 +1,18 @@ |
|||||
|
import signal |
||||
|
import sys |
||||
|
from dataclasses import dataclass, asdict |
||||
|
from datetime import datetime |
||||
|
|
||||
|
def signal_handler(sig, frame): |
||||
|
print(f"收到退出信号 sig={sig},程序退出") |
||||
|
sys.exit(0) |
||||
|
|
||||
|
signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号 |
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
t=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] |
||||
|
print(t) |
||||
|
while True: |
||||
|
# 程序主循环 |
||||
|
pass |
@ -0,0 +1,29 @@ |
|||||
|
import json |
||||
|
from dataclasses import asdict |
||||
|
|
||||
|
import configSer |
||||
|
|
||||
|
def test_load_config(): |
||||
|
config_path = "../config.json" |
||||
|
# 读取配置文件 |
||||
|
config = configSer.ConfigOperate(config_path) |
||||
|
json_str2 = config.config_info.to_json(indent=4) |
||||
|
print("json=",json_str2) |
||||
|
config_dict = asdict(config) |
||||
|
config.capture=1 |
||||
|
config.save2json_file() |
||||
|
# 更新配置文件 |
||||
|
updates = { |
||||
|
"capture": "rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream", |
||||
|
} |
||||
|
config.update_dict_config(updates) |
||||
|
|
||||
|
# 重新读取配置文件,确认更新 |
||||
|
updated_config = configSer.ConfigOperate(config_path) |
||||
|
print(f"当前新配置capture:{updated_config.capture}") |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
test_load_config() |
@ -0,0 +1,38 @@ |
|||||
|
import logging |
||||
|
|
||||
|
import cv2 |
||||
|
print(cv2.__version__) |
||||
|
def open_video(video_id): |
||||
|
cap = cv2.VideoCapture(video_id) |
||||
|
if not cap.isOpened(): |
||||
|
logging.info("无法打开摄像头") |
||||
|
exit() |
||||
|
return cap |
||||
|
|
||||
|
|
||||
|
rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" |
||||
|
capture = open_video(rtsp_url) |
||||
|
fps = capture.get(cv2.CAP_PROP_FPS) |
||||
|
width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)) |
||||
|
height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
||||
|
print(f"fps={fps}") |
||||
|
print("width={width}, height={height}".format(width=width, height=height)) |
||||
|
|
||||
|
# 定义视频编码器和输出文件 |
||||
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
||||
|
out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height)) |
||||
|
# 读取一帧图像 |
||||
|
while True: |
||||
|
ret, frame = capture.read() |
||||
|
print("-->") |
||||
|
|
||||
|
if ret: |
||||
|
# 写入帧到输出文件 |
||||
|
out.write(frame) |
||||
|
else: |
||||
|
logging.info("无法读取帧") |
||||
|
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 |
||||
|
break |
||||
|
|
||||
|
capture.release() |
||||
|
out.release() |
@ -0,0 +1,32 @@ |
|||||
|
from dataclasses import dataclass, field |
||||
|
from typing import Optional |
||||
|
|
||||
|
from dataclasses_json import dataclass_json, config |
||||
|
import json |
||||
|
|
||||
|
import models.target |
||||
|
|
||||
|
@dataclass_json |
||||
|
@dataclass |
||||
|
class Rectangle: |
||||
|
x: int |
||||
|
y: int |
||||
|
width: int |
||||
|
height: int |
||||
|
p: Optional[models.target.Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) |
||||
|
|
||||
|
# 使用 |
||||
|
p=models.target.Point(1,2) |
||||
|
rect = Rectangle(0, 0, 100, 100,None) |
||||
|
# 序列化 |
||||
|
json_str=rect.to_json() |
||||
|
# json_str = json.dumps(rect, default=lambda obj: obj.__dict__) |
||||
|
print(f"序列化后={json_str}") |
||||
|
|
||||
|
rect.p.x=2046 |
||||
|
print("修改后p.x=",p.x) |
||||
|
|
||||
|
new_json='{"x": 1, "y": 1, "width": 100, "height": 100, "p": {"x": 2, "y": 2}}' |
||||
|
nr=Rectangle.from_json(new_json) |
||||
|
print(nr.p) |
||||
|
|
@ -0,0 +1,71 @@ |
|||||
|
import queue |
||||
|
import threading |
||||
|
import time |
||||
|
|
||||
|
import models.msg |
||||
|
from upload.RateLimiter import RateLimiter |
||||
|
|
||||
|
|
||||
|
class DataReporter(threading.Thread): |
||||
|
call_back=None |
||||
|
def __init__(self,): |
||||
|
super().__init__() |
||||
|
self.image_queue = queue.Queue(maxsize=10) # 图片队列 |
||||
|
self.data_queue = queue.Queue(maxsize=50) # 数据队列 |
||||
|
self.image_limiter = RateLimiter(max_rate=1, time_window=1) # 图片限速: 5张/秒 |
||||
|
self.data_limiter = RateLimiter(max_rate=1, time_window=1) # 数据限速: 20条/秒 |
||||
|
self.running = True |
||||
|
self.image_dropped = 0 # 统计丢弃的图片数量 |
||||
|
self.data_dropped = 0 # 统计丢弃的数据数量 |
||||
|
|
||||
|
def register_handler(self,handler_fun): |
||||
|
self.call_back = handler_fun |
||||
|
|
||||
|
def run(self): |
||||
|
while self.running: |
||||
|
# 优先处理图片上报 |
||||
|
if not self.image_queue.empty() and self.image_limiter.allow_request(): |
||||
|
try: |
||||
|
image_data = self.image_queue.get_nowait() |
||||
|
# self._report_image(image_data) |
||||
|
except queue.Empty: |
||||
|
pass |
||||
|
|
||||
|
# 然后处理数据上报 |
||||
|
if not self.data_queue.empty() and self.data_limiter.allow_request(): |
||||
|
try: |
||||
|
data = self.data_queue.get_nowait() |
||||
|
self._report_data(data) |
||||
|
except queue.Empty: |
||||
|
pass |
||||
|
|
||||
|
time.sleep(0.02) # 避免CPU占用过高 |
||||
|
|
||||
|
def _report_image(self, data): |
||||
|
# 实现图片上报逻辑 |
||||
|
print(f"Reporting image, timestamp: {data[0]}") |
||||
|
# 这里替换为实际的上报代码 |
||||
|
msg = models.msg.Msg(_from="dev", cmd="image", values=data[1]) |
||||
|
msg_json = msg.to_json() |
||||
|
self.call_back(msg_json) |
||||
|
|
||||
|
def _report_data(self, data): |
||||
|
# 实现数据上报逻辑 |
||||
|
print(f"Reporting data: {data}") |
||||
|
# 实际的上报代码,数据结构转换 |
||||
|
msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) |
||||
|
msg_json=msg.to_json() |
||||
|
self.call_back(msg_json) |
||||
|
|
||||
|
def stop(self): |
||||
|
self.running = False |
||||
|
self.join() |
||||
|
print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped") |
||||
|
|
||||
|
def adjust_rate(self, new_rate, data_type='image'): |
||||
|
if data_type == 'image': |
||||
|
with self.image_limiter.lock: |
||||
|
self.image_limiter.max_rate = new_rate |
||||
|
else: |
||||
|
with self.data_limiter.lock: |
||||
|
self.data_limiter.max_rate = new_rate |
@ -0,0 +1,12 @@ |
|||||
|
import queue |
||||
|
|
||||
|
|
||||
|
class DroppingQueue(queue.Queue): |
||||
|
"""自定义队列,满时自动丢弃最旧的数据""" |
||||
|
def put(self, item, block=False, timeout=None): |
||||
|
try: |
||||
|
return super().put(item, block=block, timeout=timeout) |
||||
|
except queue.Full: |
||||
|
# 队列满时丢弃最旧的一个数据 |
||||
|
self.get_nowait() |
||||
|
return super().put(item, block=False) |
@ -0,0 +1,23 @@ |
|||||
|
import threading |
||||
|
import queue |
||||
|
import time |
||||
|
from collections import deque |
||||
|
|
||||
|
class RateLimiter: |
||||
|
def __init__(self, max_rate, time_window): |
||||
|
self.max_rate = max_rate # 最大允许的请求数 |
||||
|
self.time_window = time_window # 时间窗口(秒) |
||||
|
self.timestamps = deque() |
||||
|
self.lock = threading.Lock() |
||||
|
|
||||
|
def allow_request(self): |
||||
|
with self.lock: |
||||
|
current_time = time.time() |
||||
|
# 移除超出时间窗口的时间戳 |
||||
|
while self.timestamps and current_time - self.timestamps[0] > self.time_window: |
||||
|
self.timestamps.popleft() |
||||
|
|
||||
|
if len(self.timestamps) < self.max_rate: |
||||
|
self.timestamps.append(current_time) |
||||
|
return True |
||||
|
return False |
@ -0,0 +1,19 @@ |
|||||
|
import cv2 |
||||
|
import base64 |
||||
|
def frame_to_base64(frame, format="JPEG"): |
||||
|
"""将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" |
||||
|
# 将图片帧编码为 JPEG 或 PNG 格式 |
||||
|
if format.upper() == "JPEG": |
||||
|
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 |
||||
|
elif format.upper() == "PNG": |
||||
|
encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 |
||||
|
else: |
||||
|
raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") |
||||
|
|
||||
|
result, encoded_frame = cv2.imencode(f".{format.lower()}", frame, encode_param) |
||||
|
if not result: |
||||
|
raise ValueError("Failed to encode frame.") |
||||
|
|
||||
|
# 将编码后的字节流转换为 Base64 字符串 |
||||
|
base64_string = base64.b64encode(encoded_frame).decode("utf-8") |
||||
|
return base64_string |
@ -0,0 +1,164 @@ |
|||||
|
import math |
||||
|
|
||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
import matplotlib.pyplot as plt |
||||
|
import matplotlib |
||||
|
matplotlib.use('TkAgg') |
||||
|
|
||||
|
elevation=0 |
||||
|
azimuth=0 |
||||
|
def circle3d(): |
||||
|
global elevation,azimuth |
||||
|
# 创建一个新的图和一个3D坐标轴 |
||||
|
fig = plt.figure() |
||||
|
ax = fig.add_subplot(111, projection='3d') |
||||
|
|
||||
|
# 定义圆的参数 |
||||
|
radius = 1 # 半径 |
||||
|
theta = np.linspace(0, 2 * np.pi, 100) # 参数角度 |
||||
|
x = radius * np.cos(theta) # x坐标 |
||||
|
y = radius * np.sin(theta) # y坐标 |
||||
|
z = np.zeros_like(theta) # z坐标(这里圆在xy平面上) |
||||
|
|
||||
|
# 绘制圆 |
||||
|
# ax.plot(x, y, z, label='3D Circle', color='b') |
||||
|
|
||||
|
# 绘制圆的正俯视图 |
||||
|
# 正视图(xz平面) |
||||
|
ax.plot(x, np.zeros_like(theta), z, label='z View', color='r') |
||||
|
ax.plot(np.zeros_like(theta), y, z, label='h View', color='b') |
||||
|
# 俯视图(xy平面) |
||||
|
ax.plot(x, y, np.zeros_like(theta), label='Top View', color='g') |
||||
|
|
||||
|
# 设置坐标轴范围 |
||||
|
ax.set_xlim([-2, 2]) |
||||
|
ax.set_ylim([-2, 2]) |
||||
|
ax.set_zlim([-2, 2]) |
||||
|
ax.set_xlabel('X') |
||||
|
ax.set_ylabel('Y') |
||||
|
ax.set_zlabel('Z') |
||||
|
# 添加图例 |
||||
|
ax.legend() |
||||
|
# 设置初始视角 |
||||
|
# ax.view_init(elev=53, azim=-48,roll=-38) # 俯仰角30度,方位角45度 |
||||
|
ax.view_init(elev=90, azim=0, roll=0) # 俯仰角30度,方位角45度 |
||||
|
# 隐藏整个坐标轴 |
||||
|
# ax.axis('off') |
||||
|
# 设置窗口透明度 |
||||
|
fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 |
||||
|
# 显示图形 |
||||
|
plt.show() |
||||
|
# input("请用鼠标转动 调整角度 elevation and azimuth: ") |
||||
|
# 获取当前的 elevation 和 azimuth |
||||
|
elevation = ax.elev |
||||
|
azimuth = ax.azim |
||||
|
|
||||
|
|
||||
|
def perspective_transform_by_angle(img, _elevation, _azimuth): |
||||
|
h, w = img.shape[:2] |
||||
|
|
||||
|
# 根据角度计算目标点位置 |
||||
|
fov = math.radians(45) # 假设视场角45度 |
||||
|
dz = 1 / math.tan(_elevation) |
||||
|
dx = dz * math.tan(_azimuth) |
||||
|
|
||||
|
# 源图像四个角点(原始斜视图) |
||||
|
src_points = np.float32([[0, 0], [w, 0], [w, h], [0, h]]) |
||||
|
|
||||
|
# 计算目标点(俯视图) |
||||
|
dst_points = np.float32([ |
||||
|
[w * 0.5 * (1 - dx), h * 0.5 * (1 - dz)], # 左上 |
||||
|
[w * 0.5 * (1 + dx), h * 0.5 * (1 - dz)], # 右上 |
||||
|
[w * 0.5 * (1 + dx), h * 0.5 * (1 + dz)], # 右下 |
||||
|
[w * 0.5 * (1 - dx), h * 0.5 * (1 + dz)] # 左下 |
||||
|
]) |
||||
|
|
||||
|
# 获取变换矩阵并应用 |
||||
|
M = cv2.getPerspectiveTransform(src_points, dst_points) |
||||
|
transformed_image=cv2.warpPerspective(img, M, (h,w)) |
||||
|
# 显示原始图像和变换后的图像 |
||||
|
fig, ax = plt.subplots(1, 2,figsize= (12, 6)) |
||||
|
ax[0].imshow(img) |
||||
|
ax[0].set_title("Original Image") |
||||
|
ax[1].imshow(transformed_image) |
||||
|
ax[1].set_title("Transformed Image") |
||||
|
plt.show() |
||||
|
|
||||
|
|
||||
|
def trans_img(image_path ,x,y): |
||||
|
global elevation, azimuth |
||||
|
image = cv2.imread(image_path) |
||||
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
||||
|
|
||||
|
# 获取图像尺寸 |
||||
|
height, width = image.shape[:2] |
||||
|
|
||||
|
center_x, center_y = 532,385 #圆心 |
||||
|
# 定义源图像的四个点(假设为矩形) |
||||
|
sx1=center_x-100 |
||||
|
sx2=center_x+100 |
||||
|
sy1=center_y-100 |
||||
|
sy2=center_y + 100 |
||||
|
src_points = np.float32([ |
||||
|
[sx1, sy1], # 左上 |
||||
|
[sx2, sy1], # 右上 |
||||
|
[sx2, sy2], # 右下 |
||||
|
[sx1, sy2] # 左下 |
||||
|
]) |
||||
|
# src_points = np.float32([ |
||||
|
# [center_x, sy1], # 上 |
||||
|
# [sx2, center_y], # 右 |
||||
|
# [center_x, sy2], # 下 |
||||
|
# [sx1, center_y] # 左 |
||||
|
# ]) |
||||
|
# 根据 elevation 和 azimuth 计算目标点 |
||||
|
# 这里是一个简化的计算方法,可以根据实际需求调整 |
||||
|
# 假设目标点在透视变换后的坐标 |
||||
|
# 将斜视的画面变成俯视的画面 |
||||
|
radius = 100 |
||||
|
|
||||
|
# 根据俯仰角和方位角计算目标点的偏移 |
||||
|
offset_x = radius * np.sin(elevation) * np.cos(azimuth) |
||||
|
offset_y = radius * np.sin(elevation) * np.sin(azimuth) |
||||
|
offset_z = radius * np.cos(elevation) |
||||
|
print(f"offset_x={offset_x},offset_y={offset_y}") |
||||
|
# 计算目标点 |
||||
|
# dst_points = np.float32([ |
||||
|
# [0 - offset_x, 0 - offset_y], # 左上 |
||||
|
# [width + offset_x, 0 - offset_y], # 右上 |
||||
|
# [width + offset_x, height + offset_y], # 右下 |
||||
|
# [0 - offset_x, height + offset_y] # 左下 |
||||
|
# ]) |
||||
|
dst_points = np.float32([ |
||||
|
[sx1- offset_x, sy1- offset_y], # 上 |
||||
|
[sx2 + offset_x, sy1 - offset_y], # 右上 |
||||
|
[sx2 + offset_x, sy2+ offset_y], # 右下 |
||||
|
[sx1 - offset_x, sy2 + offset_y] # 下 |
||||
|
]) |
||||
|
# 计算透视变换矩阵 |
||||
|
matrix = cv2.getPerspectiveTransform(src_points, dst_points) |
||||
|
|
||||
|
# 应用透视变换 |
||||
|
transformed_image = cv2.warpPerspective(image, matrix, (width, height)) |
||||
|
|
||||
|
# 显示原始图像和变换后的图像 |
||||
|
fig, ax = plt.subplots(1, 2,figsize= (12, 6)) |
||||
|
ax[0].imshow(image) |
||||
|
ax[0].set_title("Original Image") |
||||
|
ax[1].imshow(transformed_image) |
||||
|
ax[1].set_title("Transformed Image") |
||||
|
plt.show() |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
circle3d() |
||||
|
print("测试============") |
||||
|
print(f"elevation: {elevation}") |
||||
|
print(f" azimuth: {azimuth}") |
||||
|
img_path="images/trans/subRawImg.jpg" |
||||
|
rawimg=cv2.imread(img_path) |
||||
|
cv2.imshow("raw Image", rawimg) |
||||
|
# trans_img(img_path,elevation,azimuth) |
||||
|
elev = math.radians(elevation) |
||||
|
azim = math.radians(azimuth) |
||||
|
perspective_transform_by_angle(rawimg, elev,azim) |
@ -0,0 +1,60 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
|
||||
|
def pingyi(x,y): |
||||
|
# 获取图像的大小 |
||||
|
height, width = image.shape[:2] |
||||
|
|
||||
|
|
||||
|
# 构造平移矩阵:将原点移到图像中心 |
||||
|
M_translate = np.float32([ |
||||
|
[1, 0, x], |
||||
|
[0, 1, y], |
||||
|
[0, 0, 1] |
||||
|
]) |
||||
|
|
||||
|
return M_translate |
||||
|
def xuanzhuan_z(z): |
||||
|
# 构造沿Z轴旋转30度的旋转矩阵 |
||||
|
theta = np.radians(z) |
||||
|
cos_theta = np.cos(theta) |
||||
|
sin_theta = np.sin(theta) |
||||
|
M_z_rotate = np.float32([ |
||||
|
[cos_theta, -sin_theta, 0], |
||||
|
[sin_theta, cos_theta, 0], |
||||
|
[0, 0, 1] |
||||
|
]) |
||||
|
return M_z_rotate |
||||
|
def xuanzhuan_y(y): |
||||
|
# 构造沿Z轴旋转30度的旋转矩阵 |
||||
|
theta = np.radians(y) |
||||
|
rotation_vector = np.array([0, theta, 0]) # 绕Y轴旋转 |
||||
|
|
||||
|
# 计算旋转矩阵 |
||||
|
rotation_matrix, _ = cv2.Rodrigues(rotation_vector) |
||||
|
return rotation_matrix |
||||
|
if __name__ == '__main__': |
||||
|
# 读取图像 |
||||
|
image = cv2.imread("images/trans/transformed_image.jpg") |
||||
|
# 获取图像的大小 |
||||
|
height, width = image.shape[:2] |
||||
|
|
||||
|
m_pinyi=pingyi(width,0) |
||||
|
# m_xuanzhuan=xuanzhuan_z(30) |
||||
|
m_xuanzhuan = xuanzhuan_y(80) |
||||
|
M_combined = m_pinyi @ m_xuanzhuan |
||||
|
rotation_3d_int = np.clip(M_combined, 0, 255) |
||||
|
# 应用透视变换 |
||||
|
warped_image = cv2.warpPerspective( |
||||
|
image, |
||||
|
M_combined, |
||||
|
(width*2, height*2) |
||||
|
) |
||||
|
|
||||
|
# 显示结果 |
||||
|
cv2.imshow('Original Image', image) |
||||
|
cv2.imshow('Warped Image', warped_image) |
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
|
@ -0,0 +1,252 @@ |
|||||
|
import json |
||||
|
from time import sleep |
||||
|
from typing import Dict |
||||
|
from dataclasses import asdict |
||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
import signal |
||||
|
import sys |
||||
|
import threading |
||||
|
|
||||
|
import models.target |
||||
|
import tcp_Ser |
||||
|
# 定义全局变量 |
||||
|
drawing = False # 是否正在绘制 |
||||
|
start_point=models.target.Point |
||||
|
end_point = models.target.Point |
||||
|
target_rectangle_dict:Dict[int,models.target.CircleTarget]={} |
||||
|
sigExit=False # 是否退出 |
||||
|
#数据广播 |
||||
|
sig_broadcast=True |
||||
|
tcp = tcp_Ser.TcpSer("127.0.0.1", 2230) |
||||
|
myb=threading.Thread(target=tcp.run) |
||||
|
myb.start() |
||||
|
def check_exit(sig, frame): |
||||
|
global sigExit |
||||
|
print(f"收到退出信号 sig={sig}") |
||||
|
sigExit=True |
||||
|
sleep(1) |
||||
|
print("程序退出") |
||||
|
sys.exit(0) |
||||
|
|
||||
|
#鼠标回调函数 |
||||
|
def add_rectangle(event, x, y, flags, param): |
||||
|
global start_point, end_point, drawing |
||||
|
|
||||
|
if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 |
||||
|
print("左键按下") |
||||
|
start_point = models.target.Point(x,y) |
||||
|
end_point = start_point |
||||
|
drawing = True |
||||
|
elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 |
||||
|
if drawing: |
||||
|
end_point = models.target.Point(x,y) |
||||
|
elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 |
||||
|
print("左键抬起") |
||||
|
drawing = False |
||||
|
end_point = models.target.Point(x,y) |
||||
|
if start_point==end_point: |
||||
|
return |
||||
|
distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) |
||||
|
if distance<20: |
||||
|
print("距离小于20,无效区域") |
||||
|
return |
||||
|
target_id=len(target_rectangle_dict) |
||||
|
# 圆标靶半径 mm |
||||
|
radius= 20.0 |
||||
|
area=models.target.RectangleArea(start_point.x,start_point.y,end_point.x-start_point.x,end_point.y-start_point.y) |
||||
|
new_target=models.target.CircleTarget(target_id,area,radius) |
||||
|
print(f"新增区域[{target_id}] => {start_point, end_point}") |
||||
|
target_rectangle_dict[target_id] = new_target |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
def draw_rectangle(img): |
||||
|
gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
||||
|
|
||||
|
ret, gray_frame = cv2.threshold(gray_frame, 120, 255, cv2.THRESH_BINARY) |
||||
|
# 高斯滤波 |
||||
|
gray_frame = cv2.GaussianBlur(gray_frame, (5, 5), 1) |
||||
|
cv2.imshow('binaryImg', gray_frame) |
||||
|
|
||||
|
if len(target_rectangle_dict)==0: return |
||||
|
|
||||
|
#上报有新数据的点 |
||||
|
once_upload:Dict[int,models.target.CircleTarget]={} |
||||
|
|
||||
|
# 绘图-历史点 |
||||
|
for i, tr in target_rectangle_dict.items(): |
||||
|
_start_point = tr.rectangle_area[0] |
||||
|
_end_point = tr.rectangle_area[1] |
||||
|
#绘制标靶区域 |
||||
|
cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) |
||||
|
#检测 |
||||
|
sub_image = extract_sub_image(gray_frame, _start_point, _end_point) |
||||
|
circles = circle2_detect(sub_image) |
||||
|
if len(circles) == 0: |
||||
|
continue |
||||
|
center,radius=circle_show(img,circles,_start_point) |
||||
|
# 纪录圆心位置 |
||||
|
tr.center_point=center |
||||
|
tr.radius_pix=radius |
||||
|
if tr.is_init: |
||||
|
tr.center_init=tr.center_point |
||||
|
tr.is_init=False |
||||
|
tar = tr.circle_displacement() |
||||
|
msg=f"[{tar.id}]displacement_pix={tar.displacement_pix},displacement_phy={tar.displacement_phy}" |
||||
|
print(msg) |
||||
|
once_upload[tr.id]=tr |
||||
|
|
||||
|
|
||||
|
#过滤无效空数据 |
||||
|
if len(once_upload.items())==0: return |
||||
|
json_str = json.dumps( |
||||
|
{k:asdict(v) for k, v in once_upload.items() if v.is_init==False} |
||||
|
) |
||||
|
print(f"标靶数据={json_str}",json_str) |
||||
|
if sig_broadcast: |
||||
|
tcp.broadcast_message(json_str) |
||||
|
|
||||
|
|
||||
|
def circle_show(img, circles, relative_point:models.target.Point): |
||||
|
font = cv2.FONT_HERSHEY_SIMPLEX |
||||
|
color = (255, 0, 0) # 蓝色 |
||||
|
scale = 0.5 |
||||
|
|
||||
|
circle = max(circles, key=lambda c: c[2]) |
||||
|
# print("画圆", circle) |
||||
|
# 绘制圆心 |
||||
|
center = (circle[0] + relative_point.x, circle[1] + relative_point.y) |
||||
|
center_int = tuple(int(x) for x in center) |
||||
|
cv2.circle(img, center_int, 2, (0, 255, 0), 4) |
||||
|
radius = np.round(circle[2], 3) |
||||
|
radius_int = int(radius) |
||||
|
# 绘制外圆 |
||||
|
cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) |
||||
|
# 打印圆心坐标 |
||||
|
|
||||
|
text1 = f"center:{circle}" |
||||
|
text2 = f"r:{radius}" |
||||
|
txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) |
||||
|
cv2.putText(img, text1, txt_location, font, scale, color, 2) |
||||
|
|
||||
|
cp=models.target.Point(x=center[0], y=center[1]) |
||||
|
return cp,radius |
||||
|
|
||||
|
|
||||
|
def circle2_detect(img): |
||||
|
# 圆心距 canny阈值 最小半径 最大半径 |
||||
|
circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, |
||||
|
maxRadius=0) |
||||
|
# 创建一个0行, 2列的空数组 |
||||
|
if circles_float is not None: |
||||
|
num_circles = circles_float.shape[1] # 获取检测到的圆的数量 |
||||
|
print("圆的数量", num_circles) |
||||
|
# 提取圆心坐标(保留2位小数) |
||||
|
centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] |
||||
|
return centers |
||||
|
else: |
||||
|
return [] |
||||
|
|
||||
|
|
||||
|
def extract_sub_image(frame, top_left, bottom_right): |
||||
|
""" |
||||
|
从帧中截取子区域 |
||||
|
:param frame: 输入的视频帧 |
||||
|
:param top_left: 子图片的左上角坐标 (x1, y1) |
||||
|
:param bottom_right: 子图片的右下角坐标 (x2, y2) |
||||
|
:return: 截取的子图片 |
||||
|
""" |
||||
|
x1, y1 = top_left |
||||
|
x2, y2 = bottom_right |
||||
|
return frame[y1:y2, x1:x2] |
||||
|
|
||||
|
|
||||
|
def open_video(video_id): |
||||
|
cap = cv2.VideoCapture(video_id) |
||||
|
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 |
||||
|
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 |
||||
|
if not cap.isOpened(): |
||||
|
print("无法打开摄像头") |
||||
|
exit() |
||||
|
return cap |
||||
|
|
||||
|
|
||||
|
def show_video(cap): |
||||
|
global sigExit,start_point, end_point, drawing |
||||
|
cv2.namedWindow('Frame') |
||||
|
cv2.setMouseCallback('Frame', add_rectangle) |
||||
|
# 读取一帧图像 |
||||
|
while True: |
||||
|
ret, frame = cap.read() |
||||
|
if ret: |
||||
|
frame_handle(frame) |
||||
|
else: |
||||
|
print("无法读取帧") |
||||
|
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 |
||||
|
break |
||||
|
if sigExit: |
||||
|
break |
||||
|
def show_image(frame): |
||||
|
global start_point, end_point, drawing |
||||
|
cv2.namedWindow('Frame') |
||||
|
cv2.setMouseCallback('Frame', add_rectangle) |
||||
|
# 读取一帧图像 |
||||
|
while True: |
||||
|
cp_img=frame.copy() |
||||
|
frame_handle(cp_img) |
||||
|
|
||||
|
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 |
||||
|
break |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def frame_handle(frame): |
||||
|
# 绘图-历史点 |
||||
|
draw_rectangle(frame) |
||||
|
# 绘图-实时 |
||||
|
if drawing: |
||||
|
cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) |
||||
|
# print(f"鼠标位置 {start_point} -> {end_point}") |
||||
|
# 显示图像 |
||||
|
cv2.imshow('Frame', frame) |
||||
|
|
||||
|
# 读取图像 |
||||
|
#img_copy = img.copy() # 复制图像用于还原 |
||||
|
|
||||
|
def video_mode(video_id): |
||||
|
capture = open_video(video_id) |
||||
|
fps = capture.get(cv2.CAP_PROP_FPS) |
||||
|
print(f"fps={fps}") |
||||
|
show_video(capture) |
||||
|
# 释放摄像头资源并关闭所有窗口 |
||||
|
capture.release() |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def image_mode(): |
||||
|
img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg |
||||
|
# img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg |
||||
|
# img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg |
||||
|
show_image(img_raw) |
||||
|
|
||||
|
def rtsp_mode(): |
||||
|
rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" |
||||
|
capture = open_video(rtsp_url) |
||||
|
fps = capture.get(cv2.CAP_PROP_FPS) |
||||
|
print(f"fps={fps}") |
||||
|
show_video(capture) |
||||
|
# 释放摄像头资源并关闭所有窗口 |
||||
|
capture.release() |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
signal.signal(signal.SIGINT, check_exit) |
||||
|
|
||||
|
# rtsp_mode() |
||||
|
video_mode(0) |
||||
|
# image_mode() |
||||
|
|
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
|
@ -0,0 +1,347 @@ |
|||||
|
from datetime import datetime |
||||
|
import json |
||||
|
import queue |
||||
|
import time |
||||
|
from time import sleep |
||||
|
from dataclasses import asdict |
||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
import signal |
||||
|
import sys |
||||
|
import threading |
||||
|
import logging |
||||
|
|
||||
|
import configSer |
||||
|
import models.target |
||||
|
import models.sampleMsg |
||||
|
import tcp_Ser |
||||
|
import upload.DataReporter |
||||
|
import utils |
||||
|
from models.msg import Msg |
||||
|
|
||||
|
logging.basicConfig(level=logging.DEBUG) |
||||
|
drawing: bool = False # 是否正在绘制 |
||||
|
sigExit: bool = False # 是否退出 |
||||
|
# 定义点 |
||||
|
start_point: models.target.Point |
||||
|
end_point: models.target.Point |
||||
|
# 配置 |
||||
|
configObj:configSer.ConfigOperate |
||||
|
|
||||
|
# 鼠标回调函数 |
||||
|
def add_rectangle(event, x, y, flags, param): |
||||
|
global start_point, end_point, drawing |
||||
|
|
||||
|
if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 |
||||
|
logging.info("左键按下") |
||||
|
start_point = models.target.Point(x, y) |
||||
|
end_point = start_point |
||||
|
drawing = True |
||||
|
elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 |
||||
|
if drawing: |
||||
|
end_point = models.target.Point(x, y) |
||||
|
elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 |
||||
|
logging.info("左键抬起") |
||||
|
drawing = False |
||||
|
end_point = models.target.Point(x, y) |
||||
|
if start_point == end_point: |
||||
|
return |
||||
|
distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) |
||||
|
if distance < 20: |
||||
|
logging.info("距离小于20,无效区域") |
||||
|
return |
||||
|
target_id = len(configObj.config_info.targets) |
||||
|
# 圆标靶半径 mm |
||||
|
radius = 20.0 |
||||
|
area=models.target.RectangleArea(int(start_point.x),int(start_point.y), |
||||
|
int(end_point.x-start_point.x),int(end_point.y-start_point.y)) |
||||
|
t_info=models.target.TargetInfo( target_id, |
||||
|
"test add", |
||||
|
area, |
||||
|
radius, |
||||
|
models.target.Threshold(128,9), |
||||
|
False) |
||||
|
new_target = models.target.CircleTarget(t_info,None,None,None,None) |
||||
|
logging.info(f"新增区域[{target_id}] => {start_point, end_point}") |
||||
|
configObj.config_info.targets[target_id] = new_target |
||||
|
|
||||
|
def read_target_rectangle(): |
||||
|
return configObj.config_info.targets |
||||
|
class VideoProcessor: |
||||
|
reporter: upload.DataReporter.DataReporter |
||||
|
|
||||
|
def __init__(self, reporter:upload.DataReporter.DataReporter): |
||||
|
self.reporter = reporter |
||||
|
|
||||
|
def on_data(self,msg:Msg): |
||||
|
global configObj |
||||
|
logging.info(f"msg={msg}") |
||||
|
match msg.cmd: |
||||
|
case "getPoints": |
||||
|
data_dict = {k: asdict(v.info) for k, v in configObj.config_info.targets.items()} |
||||
|
resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": data_dict}) |
||||
|
resp_json = resp_msg.to_json_() |
||||
|
return resp_json |
||||
|
case "setPoints": |
||||
|
v=msg.values |
||||
|
ts=v["targets"] |
||||
|
|
||||
|
# 清空原配置 |
||||
|
configObj.config_info.targets={} |
||||
|
for _,t in ts.items(): |
||||
|
t_str=json.dumps(t) |
||||
|
t_info = models.target.TargetInfo.from_json(t_str) |
||||
|
c_target=models.target.CircleTarget.init_by_info(t_info) |
||||
|
configObj.config_info.targets[c_target.info.id] =c_target |
||||
|
|
||||
|
configObj.save2json_file() |
||||
|
resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) |
||||
|
resp_json = resp_msg.to_json() |
||||
|
return resp_json |
||||
|
print("==") |
||||
|
|
||||
|
|
||||
|
def update_thresh_binary(self,v:int): |
||||
|
self.thresh_binary = v |
||||
|
|
||||
|
|
||||
|
def pre_handler_img(self,gray_frame,now_str:str): |
||||
|
# 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 |
||||
|
img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") |
||||
|
all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) |
||||
|
self.enqueue_image(all_img) |
||||
|
|
||||
|
def draw_rectangle(self,img): |
||||
|
global configObj |
||||
|
gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
||||
|
|
||||
|
|
||||
|
#图像发送 |
||||
|
now_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] |
||||
|
self.pre_handler_img(gray_frame,now_str) |
||||
|
|
||||
|
|
||||
|
if len(configObj.config_info.targets)==0: return |
||||
|
#上报有新数据的点 |
||||
|
all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) |
||||
|
# 绘图-历史点 |
||||
|
for i, tr in configObj.config_info.targets.items(): |
||||
|
if not hasattr(tr, "info"): |
||||
|
print("====") |
||||
|
_start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) |
||||
|
_end_point = models.target.Point( |
||||
|
tr.info.rectangle_area.x+tr.info.rectangle_area.w, |
||||
|
tr.info.rectangle_area.y+tr.info.rectangle_area.h) |
||||
|
#绘制标靶区域 |
||||
|
cv2.rectangle(img,tuple(_start_point), tuple(_end_point), (255, 0, 0), 2) |
||||
|
#检测 |
||||
|
sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) |
||||
|
|
||||
|
ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) |
||||
|
# 高斯滤波 |
||||
|
sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (tr.info.threshold.gauss, tr.info.threshold.gauss), 1) |
||||
|
cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) |
||||
|
|
||||
|
# 覆盖原图 |
||||
|
# sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) |
||||
|
# self.cover_sub_image(img,sub_c_img, _start_point, _end_point) |
||||
|
|
||||
|
circles = self.circle2_detect(sub_binary_frame) |
||||
|
if len(circles) == 0: |
||||
|
continue |
||||
|
center,radius=self.circle_show(img,circles,_start_point) |
||||
|
# 纪录圆心位置 |
||||
|
tr.center_point=center |
||||
|
tr.radius_pix=radius |
||||
|
if tr.is_init: |
||||
|
tr.center_init=tr.center_point |
||||
|
tr.is_init=False |
||||
|
tr.circle_displacement() |
||||
|
|
||||
|
all_upload_data.data.append( |
||||
|
models.sampleMsg.SensorData( |
||||
|
str(tr.info.id), |
||||
|
tr.displacement_phy.x, |
||||
|
tr.displacement_phy.y) |
||||
|
) |
||||
|
|
||||
|
#过滤无效空数据 |
||||
|
if len(all_upload_data.data)==0: |
||||
|
return |
||||
|
# json_str = json.dumps( |
||||
|
# {k:asdict(v) for k, v in once_upload.items() if v.is_init==False} |
||||
|
# ) |
||||
|
|
||||
|
# print(f"标靶数据={json_str}",json_str) |
||||
|
self.enqueue_data(all_upload_data) |
||||
|
|
||||
|
|
||||
|
def circle_show(self,img, circles, relative_point:models.target.Point): |
||||
|
font = cv2.FONT_HERSHEY_SIMPLEX |
||||
|
color = (255, 0, 0) # 蓝色 |
||||
|
scale = 0.5 |
||||
|
|
||||
|
circle = max(circles, key=lambda c: c[2]) |
||||
|
|
||||
|
# 绘制圆心 |
||||
|
center = (circle[0] + relative_point.x, circle[1] + relative_point.y) |
||||
|
center_int = tuple(int(x) for x in center) |
||||
|
cv2.circle(img, center_int, 2, (0, 255, 0), 4) |
||||
|
radius = np.round(circle[2], 3) |
||||
|
radius_int = int(radius) |
||||
|
# 绘制外圆 |
||||
|
cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) |
||||
|
# 打印圆心坐标 |
||||
|
|
||||
|
text1 = f"center:{circle}" |
||||
|
text2 = f"r:{radius}" |
||||
|
txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2) |
||||
|
cv2.putText(img, text1, txt_location, font, scale, color, 2) |
||||
|
|
||||
|
cp = models.target.Point(x=center[0], y=center[1]) |
||||
|
return cp,radius |
||||
|
|
||||
|
|
||||
|
def circle2_detect(self,img): |
||||
|
# 圆心距 canny阈值 最小半径 最大半径 |
||||
|
circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30, param1=300, param2=0.9, minRadius=15, |
||||
|
maxRadius=0) |
||||
|
# 创建一个0行, 2列的空数组 |
||||
|
if circles_float is not None: |
||||
|
# 提取圆心坐标(保留2位小数) |
||||
|
centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] |
||||
|
return centers |
||||
|
else: |
||||
|
return [] |
||||
|
|
||||
|
|
||||
|
def extract_sub_image(self,frame, top_left, bottom_right): |
||||
|
""" |
||||
|
从帧中截取子区域 |
||||
|
:param frame: 输入的视频帧 |
||||
|
:param top_left: 子图片的左上角坐标 (x1, y1) |
||||
|
:param bottom_right: 子图片的右下角坐标 (x2, y2) |
||||
|
:return: 截取的子图片 |
||||
|
""" |
||||
|
x1, y1 = top_left |
||||
|
x2, y2 = bottom_right |
||||
|
return frame[y1:y2, x1:x2] |
||||
|
|
||||
|
def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): |
||||
|
x1, y1 = top_left |
||||
|
x2, y2 = bottom_right |
||||
|
frame[y1:y2, x1:x2]= sub_frame |
||||
|
return frame |
||||
|
|
||||
|
|
||||
|
def open_video(self,video_id): |
||||
|
cap = cv2.VideoCapture(video_id) |
||||
|
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 |
||||
|
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 |
||||
|
if not cap.isOpened(): |
||||
|
logging.info("无法打开摄像头") |
||||
|
exit() |
||||
|
return cap |
||||
|
|
||||
|
|
||||
|
def show_video(self,cap): |
||||
|
global sigExit,start_point, end_point, drawing |
||||
|
cv2.namedWindow('Frame') |
||||
|
cv2.setMouseCallback('Frame', add_rectangle) |
||||
|
# 读取一帧图像 |
||||
|
while True: |
||||
|
ret, frame = cap.read() |
||||
|
if ret: |
||||
|
self.frame_handle(frame) |
||||
|
else: |
||||
|
logging.info("无法读取帧") |
||||
|
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 |
||||
|
break |
||||
|
if sigExit: |
||||
|
break |
||||
|
def show_image(self,frame): |
||||
|
global start_point, end_point, drawing |
||||
|
cv2.namedWindow('Frame') |
||||
|
cv2.setMouseCallback('Frame', add_rectangle) |
||||
|
# 读取一帧图像 |
||||
|
while True: |
||||
|
cp_img=frame.copy() |
||||
|
self.frame_handle(cp_img) |
||||
|
|
||||
|
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 |
||||
|
break |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def frame_handle(self,frame): |
||||
|
# 绘图-历史点 |
||||
|
self.draw_rectangle(frame) |
||||
|
# 绘图-实时 |
||||
|
if drawing: |
||||
|
cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) |
||||
|
# print(f"鼠标位置 {start_point} -> {end_point}") |
||||
|
# 显示图像 |
||||
|
cv2.imshow('Frame', frame) |
||||
|
|
||||
|
# 读取图像 |
||||
|
#img_copy = img.copy() # 复制图像用于还原 |
||||
|
|
||||
|
def image_mode(self): |
||||
|
img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg |
||||
|
# img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg |
||||
|
# img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg |
||||
|
self.show_image(img_raw) |
||||
|
|
||||
|
# 支持 |
||||
|
def video_mode(self,video_id): |
||||
|
capture = self.open_video(video_id) |
||||
|
fps = capture.get(cv2.CAP_PROP_FPS) |
||||
|
print(f"fps={fps}") |
||||
|
self.show_video(capture) |
||||
|
# 释放摄像头资源并关闭所有窗口 |
||||
|
capture.release() |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def rtsp_mode(self,rtsp_url:str): |
||||
|
# rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" |
||||
|
# rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" |
||||
|
capture = self.open_video(rtsp_url) |
||||
|
fps = capture.get(cv2.CAP_PROP_FPS) |
||||
|
print(f"fps={fps}") |
||||
|
self.show_video(capture) |
||||
|
# 释放摄像头资源并关闭所有窗口 |
||||
|
capture.release() |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def enqueue_data(self,data): |
||||
|
# 获取当前时间戳 |
||||
|
timestamp = time.time() |
||||
|
# 将时间戳转换为 datetime 对象 |
||||
|
dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 |
||||
|
# 放入图片队列(自动丢弃最旧数据当队列满时) |
||||
|
try: |
||||
|
self.reporter.data_queue.put((dt, data), block=False) |
||||
|
except queue.Full: |
||||
|
self.reporter.data_dropped += 1 |
||||
|
def enqueue_image(self,data): |
||||
|
# 获取当前时间戳 |
||||
|
timestamp = time.time() |
||||
|
# 将时间戳转换为 datetime 对象 |
||||
|
dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 |
||||
|
# 放入图片队列(自动丢弃最旧数据当队列满时) |
||||
|
try: |
||||
|
self.reporter.image_queue.put((dt, data), block=False) |
||||
|
except queue.Full: |
||||
|
self.reporter.image_dropped += 1 |
||||
|
|
||||
|
#数据广播 |
||||
|
def check_exit(sig, frame): |
||||
|
global sigExit |
||||
|
logging.info(f"收到退出信号 sig={sig}") |
||||
|
sigExit=True |
||||
|
sleep(1) |
||||
|
logging.info("程序退出") |
||||
|
sys.exit(0) |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
@ -0,0 +1,47 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
def find_line(image): |
||||
|
if image is None: |
||||
|
print("Error: Unable to load image.") |
||||
|
exit() |
||||
|
gray_frame = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
||||
|
blurred_image = cv2.GaussianBlur(gray_frame, (9, 9), 0) |
||||
|
ret, gray_frame = cv2.threshold(blurred_image, 50, 255, cv2.THRESH_BINARY) |
||||
|
cv2.imshow("binary", gray_frame) |
||||
|
# edges = cv2.Canny(blurred_image, 50, 150, apertureSize=3) |
||||
|
# 应用边缘检测 |
||||
|
edges = cv2.Canny(gray_frame, 200, 400, apertureSize=3) |
||||
|
|
||||
|
# 使用标准霍夫变换检测直线 |
||||
|
# 使用标准霍夫变换检测直线 |
||||
|
lines = cv2.HoughLines(edges, rho=3, theta=np.pi / 180, threshold=200) |
||||
|
|
||||
|
# 绘制检测到的直线 |
||||
|
if lines is not None: |
||||
|
for line in lines: |
||||
|
rho, theta = line[0] |
||||
|
a = np.cos(theta) |
||||
|
b = np.sin(theta) |
||||
|
x0 = a * rho |
||||
|
y0 = b * rho |
||||
|
x1 = int(x0 + 1000 * (-b)) |
||||
|
y1 = int(y0 + 1000 * (a)) |
||||
|
x2 = int(x0 - 1000 * (-b)) |
||||
|
y2 = int(y0 - 1000 * (a)) |
||||
|
cv2.line(image, (x1, y1), (x2, y2), (0, 0, 255), 2) |
||||
|
|
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
# 读取图像 |
||||
|
img = cv2.imread("images/trans/subRawImg.jpg") |
||||
|
|
||||
|
find_line(img) |
||||
|
# 显示结果 |
||||
|
cv2.imshow("Detected Lines", img) |
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
@ -0,0 +1,30 @@ |
|||||
|
|
||||
|
import matplotlib.pyplot as plt |
||||
|
import numpy as np |
||||
|
|
||||
|
|
||||
|
# 存储初始范围和当前比例 |
||||
|
initial_limits = {'x': None, 'y': None} |
||||
|
current_scale = 1.0 |
||||
|
fig, ax = plt.subplots() |
||||
|
x = np.linspace(0, 10, 100) |
||||
|
ax.plot(x, np.sin(x)) |
||||
|
def on_press(event): |
||||
|
if event.button == 3: # 右键按下 |
||||
|
initial_limits['x'] = ax.get_xlim() |
||||
|
initial_limits['y'] = ax.get_ylim() |
||||
|
|
||||
|
def on_release(event): |
||||
|
global current_scale |
||||
|
if event.button == 3 and initial_limits['x'] is not None: |
||||
|
new_xlim = ax.get_xlim() |
||||
|
scale_x = (initial_limits['x'][1] - initial_limits['x'][0]) / \ |
||||
|
(new_xlim[1] - new_xlim[0]) |
||||
|
print(f"X轴缩放比例: {scale_x:.2f}倍") |
||||
|
current_scale *= scale_x |
||||
|
print(f"累计总缩放: {current_scale:.2f}倍") |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
fig.canvas.mpl_connect('button_press_event', on_press) |
||||
|
fig.canvas.mpl_connect('button_release_event', on_release) |
||||
|
plt.show() |
@ -0,0 +1,62 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
import 标靶识别 as ie |
||||
|
|
||||
|
|
||||
|
def perspective_transformation(image): |
||||
|
if image is None: |
||||
|
print("Error: Unable to load image.") |
||||
|
exit() |
||||
|
# 定义源图像中的四个点 |
||||
|
src_points = np.float32([ |
||||
|
[4, 16], # 左上角 |
||||
|
[795, 14], # 右上角 |
||||
|
[1027, 736], # 右下角 |
||||
|
[181, 856] # 左下角 |
||||
|
]) |
||||
|
# 定义目标图像中的四个点 # 1050 * 900 |
||||
|
dst_points = np.float32([ |
||||
|
[4, 16], # 左上角 |
||||
|
[795, 14], # 右上角 |
||||
|
[795, 850], # 右下角 |
||||
|
[4, 850] # 左下角 |
||||
|
]) |
||||
|
|
||||
|
# 计算透视变换矩阵 |
||||
|
M = cv2.getPerspectiveTransform(src_points, dst_points) |
||||
|
print(f"原矩阵={M}") |
||||
|
# 逆矩阵 |
||||
|
inverse_matrix = np.linalg.inv(M) |
||||
|
print(f"逆矩阵={inverse_matrix}") |
||||
|
# 应用透视变换 |
||||
|
transformed_image = cv2.warpPerspective(image, M, (1050, 900)) |
||||
|
# 应用透视变换 |
||||
|
inv_transformed_image = cv2.warpPerspective(transformed_image, inverse_matrix, (1050, 900)) |
||||
|
|
||||
|
# 显示原始图像和变换后的图像 |
||||
|
cv2.imshow("Original Image", image) |
||||
|
cv2.imshow("Trans Image", transformed_image) |
||||
|
cv2.imshow("iTrans Image", inv_transformed_image) |
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
def sub_img(): |
||||
|
# 读取图像 |
||||
|
image = cv2.imread("images/target/need_trans.jpg") |
||||
|
if image is None: |
||||
|
print("Error: Unable to load image.") |
||||
|
exit() |
||||
|
# 1050 * 900 |
||||
|
startPoint = [550, 700] |
||||
|
endPoint = [1600, 1600] |
||||
|
# 绘制标靶区域 |
||||
|
# 检测 |
||||
|
subImg = ie.extract_sub_image(image, startPoint, endPoint) |
||||
|
# cv2.imshow("subImg", subImg) |
||||
|
# cv2.waitKey(0) |
||||
|
# cv2.destroyAllWindows() |
||||
|
return subImg |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
img=sub_img() |
||||
|
perspective_transformation(img) |
@ -0,0 +1,149 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
|
||||
|
# 全局变量 |
||||
|
points = [] # 存储选择的四个点 |
||||
|
img = None # 存储原始图像 |
||||
|
img_copy = None # 用于绘制的图像副本 |
||||
|
|
||||
|
|
||||
|
def mouse_callback(event, x, y, flags, param): |
||||
|
"""鼠标回调函数,用于选择四个点""" |
||||
|
global img_copy, points |
||||
|
|
||||
|
if event == cv2.EVENT_LBUTTONDOWN: |
||||
|
if len(points) < 4: |
||||
|
points.append((x, y)) |
||||
|
print(f"已选择点 {len(points)}: ({x}, {y})") |
||||
|
|
||||
|
# 在图像上绘制点 |
||||
|
cv2.circle(img_copy, (x, y), 5, (0, 255, 0), -1) |
||||
|
|
||||
|
# 如果已经选择了4个点,绘制连线 |
||||
|
if len(points) == 4: |
||||
|
# 按照上、右、下、左的顺序连接点 |
||||
|
for i in range(4): |
||||
|
cv2.line(img_copy, points[i], points[(i + 1) % 4], (0, 255, 0), 2) |
||||
|
|
||||
|
# 标记每个点的位置 |
||||
|
cv2.putText(img_copy, "Top", points[0], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
||||
|
cv2.putText(img_copy, "Right", points[1], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
||||
|
cv2.putText(img_copy, "Bottom", points[2], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
||||
|
cv2.putText(img_copy, "Left", points[3], cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
||||
|
|
||||
|
cv2.imshow("Select Points", img_copy) |
||||
|
|
||||
|
|
||||
|
def perspective_transform(image, src_points): |
||||
|
"""执行透视变换""" |
||||
|
# 将点排序为上、右、下、左 |
||||
|
top, right, bottom, left = src_points |
||||
|
|
||||
|
# 计算新图像的宽度(取左右边的最大值) |
||||
|
width_a = np.linalg.norm(np.array(right) - np.array(left)) |
||||
|
width_b = np.linalg.norm(np.array(top) - np.array(bottom)) |
||||
|
max_width = max(int(width_a), int(width_b)) |
||||
|
|
||||
|
# 计算新图像的高度(取上下边的最大值) |
||||
|
height_a = np.linalg.norm(np.array(bottom) - np.array(top)) |
||||
|
height_b = np.linalg.norm(np.array(right) - np.array(left)) |
||||
|
max_height = max(int(height_a), int(height_b)) |
||||
|
|
||||
|
# 定义目标点 |
||||
|
dst = np.array([ |
||||
|
[0, 0], # 左上角 |
||||
|
[max_width - 1, 0], # 右上角 |
||||
|
[max_width - 1, max_height - 1], # 右下角 |
||||
|
[0, max_height - 1] # 左下角 |
||||
|
], dtype="float32") |
||||
|
|
||||
|
# 转换源点数组为numpy数组 |
||||
|
src = np.array([top, right, bottom, left], dtype="float32") |
||||
|
|
||||
|
# 计算透视变换矩阵 |
||||
|
M = cv2.getPerspectiveTransform(src, dst) |
||||
|
|
||||
|
# 执行透视变换 |
||||
|
warped = cv2.warpPerspective(image, M, (max_width, max_height)) |
||||
|
|
||||
|
return warped |
||||
|
|
||||
|
def k_perspective_transform(image, src_points): |
||||
|
"""执行透视变换""" |
||||
|
# 将点排序为上、右、下、左 |
||||
|
top, right, bottom, left = src_points |
||||
|
|
||||
|
# 计算新图像的宽度(取左右边的最大值) |
||||
|
sub_zy = np.array(right) - np.array(left) |
||||
|
sub_sx = np.array(top) - np.array(bottom) |
||||
|
|
||||
|
sub_x=sub_sx[0]/2 |
||||
|
print("x差值",sub_sx[0]) |
||||
|
|
||||
|
sub_y = sub_zy[1] / 2 |
||||
|
print("y差值", sub_sx[0]) |
||||
|
|
||||
|
# 定义目标点 |
||||
|
dst = np.array([ |
||||
|
[top[0]-sub_x, top[1]], # 左上角 |
||||
|
[right[0], right[1]- sub_y], # 左上角 |
||||
|
[bottom[0]+sub_x, bottom[1]], # 左上角 |
||||
|
[left[0] , left[1]+ sub_y], # 左上角 |
||||
|
], dtype="float32") |
||||
|
|
||||
|
# 转换源点数组为numpy数组 |
||||
|
src = np.array([top, right, bottom, left], dtype="float32") |
||||
|
|
||||
|
# 计算透视变换矩阵 |
||||
|
M = cv2.getPerspectiveTransform(src, dst) |
||||
|
|
||||
|
print(f"矩阵M={M}") |
||||
|
|
||||
|
# 执行透视变换 |
||||
|
warped = cv2.warpPerspective(image, M, (1050, 900)) |
||||
|
|
||||
|
return warped |
||||
|
|
||||
|
def main(): |
||||
|
global img, img_copy, points |
||||
|
|
||||
|
# 读取图像 |
||||
|
img = cv2.imread("images/trans/subRawImg.jpg") |
||||
|
if img is None: |
||||
|
print("无法加载图像,请检查路径是否正确") |
||||
|
return |
||||
|
|
||||
|
img_copy = img.copy() |
||||
|
|
||||
|
# 创建窗口并设置鼠标回调 |
||||
|
cv2.namedWindow("Select Points") |
||||
|
cv2.setMouseCallback("Select Points", mouse_callback) |
||||
|
|
||||
|
print("请按顺序点击选择四个点:上、右、下、左") |
||||
|
print("选择完成后按任意键继续...") |
||||
|
|
||||
|
while True: |
||||
|
cv2.imshow("Select Points", img_copy) |
||||
|
key = cv2.waitKey(1) & 0xFF |
||||
|
|
||||
|
# 如果已经选择了4个点,按任意键继续 |
||||
|
if len(points) == 4: |
||||
|
break |
||||
|
|
||||
|
# 执行透视变换 |
||||
|
warped = k_perspective_transform(img, points) |
||||
|
|
||||
|
# 显示结果 |
||||
|
cv2.imshow("Original Image", img) |
||||
|
cv2.imshow("Transformed", warped) |
||||
|
|
||||
|
output_path="images/trans/_4point.jpg" |
||||
|
cv2.imwrite(output_path, warped) |
||||
|
print(f"图像已保存到 {output_path}") |
||||
|
|
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
main() |
@ -0,0 +1,126 @@ |
|||||
|
import numpy as np |
||||
|
import matplotlib.pyplot as plt |
||||
|
from mpl_toolkits.mplot3d import Axes3D |
||||
|
import 透视变换 |
||||
|
import cv2 |
||||
|
def build_3d_rotation_matrix(elev, azim, roll): |
||||
|
"""生成基于elev/azim/roll的完整旋转矩阵""" |
||||
|
elev_rad = np.radians(elev) |
||||
|
azim_rad = np.radians(azim) |
||||
|
roll_rad = np.radians(roll) |
||||
|
|
||||
|
# 绕x轴旋转(elevation) |
||||
|
Rx = np.array([ |
||||
|
[1, 0, 0], |
||||
|
[0, np.cos(elev_rad), -np.sin(elev_rad)], |
||||
|
[0, np.sin(elev_rad), np.cos(elev_rad)] |
||||
|
]) |
||||
|
|
||||
|
# 绕y轴旋转(azimuth) |
||||
|
Ry = np.array([ |
||||
|
[np.cos(azim_rad), 0, np.sin(azim_rad)], |
||||
|
[0, 1, 0], |
||||
|
[-np.sin(azim_rad), 0, np.cos(azim_rad)] |
||||
|
]) |
||||
|
|
||||
|
# 绕z轴旋转(roll) |
||||
|
Rz = np.array([ |
||||
|
[np.cos(roll_rad), -np.sin(roll_rad), 0], |
||||
|
[np.sin(roll_rad), np.cos(roll_rad), 0], |
||||
|
[0, 0, 1] |
||||
|
]) |
||||
|
|
||||
|
return Rz @ Ry @ Rx # 组合旋转顺序 |
||||
|
|
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
# 参数设置 |
||||
|
elev, azim, roll = 34, 0,-35 |
||||
|
rotation_3d = build_3d_rotation_matrix(elev, azim, roll) |
||||
|
|
||||
|
# 创建单位圆 |
||||
|
theta = np.linspace(0, 2 * np.pi, 100) |
||||
|
x = np.cos(theta) |
||||
|
y = np.sin(theta) |
||||
|
z = np.zeros_like(x) |
||||
|
circle = np.vstack((x, y, z)) # 3xN |
||||
|
|
||||
|
circle_transformed = rotation_3d @ circle |
||||
|
# 逆矩阵 |
||||
|
inverse_matrix = np.linalg.inv(rotation_3d) |
||||
|
# 画图 |
||||
|
fig = plt.figure(figsize=(12, 6)) |
||||
|
# 设置窗口透明度 |
||||
|
fig.canvas.manager.window.attributes('-alpha', 0.6) # 设置窗口透明度为 0.6 |
||||
|
ax1 = fig.add_subplot(121, projection='3d') |
||||
|
ax2 = fig.add_subplot(122, projection='3d') |
||||
|
# 标识圆心 |
||||
|
ax1.scatter(0, 0, 0, color='red', s=20, label='Center') |
||||
|
# 原始单位圆 |
||||
|
ax1.plot(circle[0], circle[1], circle[2], label='Original Circle') |
||||
|
# 原始单位圆 横轴和纵轴 |
||||
|
ax1.plot(circle[0], np.zeros_like(theta), circle[2], label='z View', color='r') |
||||
|
ax1.plot(np.zeros_like(theta), circle[1], circle[2], label='h View', color='b') |
||||
|
ax1.set_title('Original Circle (elev=90°, roll=0°)') |
||||
|
ax1.set_xlim([-1, 1]) |
||||
|
ax1.set_ylim([-1, 1]) |
||||
|
ax1.set_zlim([-1, 1]) |
||||
|
ax1.set_box_aspect([1, 1, 1]) |
||||
|
ax1.set_xlabel('X') |
||||
|
ax1.set_ylabel('Y') |
||||
|
ax1.set_zlabel('Z') |
||||
|
ax1.view_init(elev=90, azim=90) # 从Z轴方向观察 保持opencv方向一致 x->左 y->下 |
||||
|
# 变换后的单位圆 |
||||
|
# 标识圆心 |
||||
|
ax2.scatter(0, 0, 0, color='red', s=20, label='Center') |
||||
|
ax2.plot(circle_transformed[0], circle_transformed[1], circle_transformed[2], color='r', label='Transformed Circle') |
||||
|
ax2.set_title('Transformed (elev=52°, roll=-35°)') |
||||
|
ax2.set_xlim([-1, 1]) |
||||
|
ax2.set_ylim([-1, 1]) |
||||
|
ax2.set_zlim([-1, 1]) |
||||
|
ax2.set_xlabel('X') |
||||
|
ax2.set_ylabel('Y') |
||||
|
ax2.set_zlabel('Z') |
||||
|
ax2.set_box_aspect([1, 1, 1]) |
||||
|
ax2.view_init(elev=90, azim=90) # 从Z轴方向观察 |
||||
|
plt.show() |
||||
|
|
||||
|
image_zhen= cv2.imread("images/trans/transformed_image.jpg") |
||||
|
image_xie = cv2.imread("images/trans/subRawImg.jpg") |
||||
|
# transformed_image_hy = cv2.warpPerspective(transformed_image, inverse_matrix, dsize=(image.shape[1], image.shape[0])) |
||||
|
# # 执行变换(自动计算输出图像尺寸) |
||||
|
# 裁剪像素值到 [0, 255] 范围 |
||||
|
|
||||
|
# 获取图像的大小 |
||||
|
height, width = image_zhen.shape[:2] |
||||
|
# 计算中心点坐标 |
||||
|
center_x = width // 2 |
||||
|
center_y = height // 2 |
||||
|
# 构造一个平移矩阵,将原点移到中心 |
||||
|
M_translate = np.float32([ |
||||
|
[1, 0, -1*center_x], |
||||
|
[0, 1, -1*center_y], |
||||
|
[0, 0, 1] |
||||
|
]) |
||||
|
image_xie_padding = cv2.warpPerspective(image_xie, M_translate, |
||||
|
dsize=(image_xie.shape[1], image_xie.shape[0])) |
||||
|
cv2.imshow("image_xie_padding", image_xie_padding) |
||||
|
# 将平移矩阵与目标变换矩阵结合起来 |
||||
|
# inverse_M_combined = np.dot(M_translate, inverse_matrix) |
||||
|
inverse_matrix[2][2]=1.0 |
||||
|
inverse_M_combined = np.dot(M_translate, inverse_matrix) |
||||
|
print(f"斜矩阵={rotation_3d}") |
||||
|
rotation_3d_int = np.clip(rotation_3d, 0, 255) |
||||
|
print(f"斜矩阵_int={rotation_3d}") |
||||
|
print(f"逆矩阵={inverse_M_combined}") |
||||
|
inverse_M_combined_int = np.clip(inverse_M_combined, 0, 255) |
||||
|
# print(f"逆矩阵_int={inverse_M_combined_int}") |
||||
|
transformed_image_hy = cv2.warpPerspective(image_xie, inverse_M_combined_int, |
||||
|
dsize=(image_xie.shape[1]*2, image_xie.shape[0]*2)) |
||||
|
cv2.imshow("transformed_image_hy", transformed_image_hy) |
||||
|
# transformed_image = cv2.warpPerspective(image_zhen, rotation_3d_int,dsize=(image_zhen.shape[1], image_zhen.shape[0])) |
||||
|
# cv2.imshow("rotated_img", transformed_image) |
||||
|
plt.show() |
||||
|
cv2.waitKey(0) |
||||
|
# cv2.destroyAllWindows() |
@ -0,0 +1,163 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
from math import cos, sin, radians |
||||
|
|
||||
|
|
||||
|
def get_rotation_matrix(angle_x, angle_y, angle_z): |
||||
|
"""生成3D旋转矩阵""" |
||||
|
# 转换为弧度 |
||||
|
rx = radians(angle_x) |
||||
|
ry = radians(angle_y) |
||||
|
rz = radians(angle_z) |
||||
|
|
||||
|
# X轴旋转矩阵 |
||||
|
mat_x = np.array([ |
||||
|
[1, 0, 0], |
||||
|
[0, cos(rx), -sin(rx)], |
||||
|
[0, sin(rx), cos(rx)] |
||||
|
]) |
||||
|
|
||||
|
# Y轴旋转矩阵 |
||||
|
mat_y = np.array([ |
||||
|
[cos(ry), 0, sin(ry)], |
||||
|
[0, 1, 0], |
||||
|
[-sin(ry), 0, cos(ry)] |
||||
|
]) |
||||
|
|
||||
|
# Z轴旋转矩阵 |
||||
|
mat_z = np.array([ |
||||
|
[cos(rz), -sin(rz), 0], |
||||
|
[sin(rz), cos(rz), 0], |
||||
|
[0, 0, 1] |
||||
|
]) |
||||
|
|
||||
|
# 组合旋转矩阵 |
||||
|
rotation_matrix = np.dot(np.dot(mat_x, mat_y), mat_z) |
||||
|
return rotation_matrix |
||||
|
|
||||
|
|
||||
|
def perspective_transform(image, angle_x=0, angle_y=0, angle_z=0, scale=1.0): |
||||
|
"""应用透视变换""" |
||||
|
h, w = image.shape[:2] |
||||
|
|
||||
|
# 获取旋转矩阵 |
||||
|
rotation_matrix = get_rotation_matrix(angle_x, angle_y, angle_z) |
||||
|
|
||||
|
# 创建3D点到2D点的映射 |
||||
|
# 将2D图像视为3D空间中Z=0平面上的物体 |
||||
|
points_3d = np.array([ |
||||
|
[0, 0, 0], # 左上 |
||||
|
[w, 0, 0], # 右上 |
||||
|
[w, h, 0], # 右下 |
||||
|
[0, h, 0] # 左下 |
||||
|
], dtype=np.float32) |
||||
|
|
||||
|
# 应用旋转 |
||||
|
points_3d_rotated = np.dot(points_3d, rotation_matrix.T) |
||||
|
|
||||
|
# 添加透视效果 - 这里简单地将Z坐标作为深度 |
||||
|
# 可以调整这个值来改变透视强度 |
||||
|
points_2d_homo = points_3d_rotated[:, :2] / (scale - points_3d_rotated[:, 2:3] * 0.001) |
||||
|
|
||||
|
# 计算变换矩阵 |
||||
|
src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) |
||||
|
dst_points = points_2d_homo.astype(np.float32) |
||||
|
|
||||
|
# 计算中心偏移 |
||||
|
min_xy = dst_points.min(axis=0) |
||||
|
max_xy = dst_points.max(axis=0) |
||||
|
dst_points -= min_xy |
||||
|
|
||||
|
# 计算新图像大小 |
||||
|
new_w = int(max_xy[0] - min_xy[0]) |
||||
|
new_h = int(max_xy[1] - min_xy[1]) |
||||
|
|
||||
|
# 获取透视变换矩阵 |
||||
|
M = cv2.getPerspectiveTransform(src_points, dst_points) |
||||
|
|
||||
|
# 应用变换 |
||||
|
transformed = cv2.warpPerspective(image, M, (new_w, new_h)) |
||||
|
|
||||
|
return transformed |
||||
|
|
||||
|
|
||||
|
def combined_perspective_transform(image, angle_x1, angle_y1, angle_z1, angle_y2, scale1=1.0, scale2=1.0): |
||||
|
"""合并两次变换:第一次任意旋转,第二次Y轴旋转""" |
||||
|
h, w = image.shape[:2] |
||||
|
|
||||
|
# 第一次旋转矩阵 |
||||
|
rot1 = get_rotation_matrix(angle_x1, angle_y1, angle_z1) |
||||
|
|
||||
|
# 第二次旋转矩阵 (Y轴旋转) |
||||
|
rot2 = get_rotation_matrix(0, angle_y2, 0) |
||||
|
|
||||
|
# 合并旋转矩阵 |
||||
|
combined_rot = np.dot(rot2, rot1) |
||||
|
|
||||
|
# 创建3D点到2D点的映射 |
||||
|
points_3d = np.array([ |
||||
|
[0, 0, 0], # 左上 |
||||
|
[w, 0, 0], # 右上 |
||||
|
[w, h, 0], # 右下 |
||||
|
[0, h, 0] # 左下 |
||||
|
], dtype=np.float32) |
||||
|
|
||||
|
# 应用合并后的旋转 |
||||
|
points_3d_rotated = np.dot(points_3d, combined_rot.T) |
||||
|
|
||||
|
# 添加透视效果 |
||||
|
points_2d_homo = points_3d_rotated[:, :2] / ((scale1 * scale2) - points_3d_rotated[:, 2:3] * 0.001) |
||||
|
|
||||
|
# 计算变换矩阵 |
||||
|
src_points = np.array([[0, 0], [w, 0], [w, h], [0, h]], dtype=np.float32) |
||||
|
dst_points = points_2d_homo.astype(np.float32) |
||||
|
|
||||
|
# 计算中心偏移 |
||||
|
min_xy = dst_points.min(axis=0) |
||||
|
max_xy = dst_points.max(axis=0) |
||||
|
dst_points -= min_xy |
||||
|
|
||||
|
# 计算新图像大小 |
||||
|
new_w = int(max_xy[0] - min_xy[0]) |
||||
|
new_h = int(max_xy[1] - min_xy[1]) |
||||
|
|
||||
|
# 获取透视变换矩阵 |
||||
|
M = cv2.getPerspectiveTransform(src_points, dst_points) |
||||
|
|
||||
|
# 应用变换 |
||||
|
transformed = cv2.warpPerspective(image, M, (new_w, new_h)) |
||||
|
|
||||
|
return transformed |
||||
|
|
||||
|
def show_transformed(): |
||||
|
image = cv2.imread('images/trans/transformed_image.jpg') |
||||
|
# 应用透视变换 |
||||
|
# 参数说明:angle_x, angle_y, angle_z 分别为绕X,Y,Z轴的旋转角度 |
||||
|
# scale 控制透视效果的强度 |
||||
|
transformed = perspective_transform(image, angle_x=34, angle_y=43, angle_z=-35, scale=1) |
||||
|
|
||||
|
# 显示结果 |
||||
|
cv2.imshow('Original', image) |
||||
|
cv2.imshow('Transformed', transformed) |
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
def show_transformed_combined(): |
||||
|
image = cv2.imread('images/trans/transformed_image.jpg') |
||||
|
if image is None: |
||||
|
print("请替换为您的图片路径") |
||||
|
else: |
||||
|
# 第一次变换:任意角度 |
||||
|
# 第二次变换:Y轴旋转90度 |
||||
|
final_result = combined_perspective_transform( |
||||
|
image, |
||||
|
angle_x1=34, angle_y1=0, angle_z1=-35, # 第一次旋转参数 |
||||
|
angle_y2=43, # 第二次Y轴旋转90度 |
||||
|
scale1=1.2, scale2=1.0 # 透视参数 |
||||
|
) |
||||
|
|
||||
|
cv2.imshow('Original', image) |
||||
|
cv2.imshow('Final Result', final_result) |
||||
|
cv2.waitKey(0) |
||||
|
cv2.destroyAllWindows() |
||||
|
if __name__ == '__main__': |
||||
|
show_transformed_combined() |
@ -0,0 +1,86 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
from cv2 import waitKey |
||||
|
|
||||
|
|
||||
|
imgRaw=cv2.imread('images/target/bowa_target/min.jpg',cv2.IMREAD_GRAYSCALE)#images/target/rp80max3.jpg |
||||
|
imgColor=cv2.imread('images/target/bowa_target/min.jpg') #images/target/rp80max3.jpg |
||||
|
|
||||
|
ret, binary_img = cv2.threshold(imgRaw, 180, 255, cv2.THRESH_BINARY) |
||||
|
#cv2.imshow('binary_img', binary_img) |
||||
|
def circle_detect(img): |
||||
|
#高斯滤波 |
||||
|
#img = cv2.GaussianBlur(img, (3, 3), 1) |
||||
|
#cv2.imshow('gsmh', gaussianBlur) |
||||
|
# 圆心距 canny阈值 最小半径 最大半径 |
||||
|
circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) |
||||
|
print("==========") |
||||
|
# 创建一个0行, 2列的空数组 |
||||
|
if circlesFloat is not None: |
||||
|
num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 |
||||
|
print("圆的数量",num_circles) |
||||
|
# 提取圆心坐标(保留小数) |
||||
|
centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] |
||||
|
|
||||
|
font = cv2.FONT_HERSHEY_SIMPLEX |
||||
|
color = (255, 0, 0) # 蓝色 |
||||
|
scale = 1 |
||||
|
# 打印圆心坐标 |
||||
|
for center3d in centers: |
||||
|
center=(center3d[0], center3d[1]) |
||||
|
# center_txt=f"{float(center[0]),float(center[1])}" |
||||
|
text=f"center:{center},r:{center3d[2]}" |
||||
|
print(text) |
||||
|
centerInt=tuple(int(x) for x in center) |
||||
|
cv2.putText(img, text, centerInt, font, scale, color,2) |
||||
|
|
||||
|
circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 |
||||
|
for i in circles[0, :]: |
||||
|
print("画圆", i) |
||||
|
# 绘制圆心 |
||||
|
center=(i[0], i[1]) |
||||
|
cv2.circle(img, center, 2, (0, 255, 0), 6) |
||||
|
# 绘制外圆 |
||||
|
cv2.circle(img, center, i[2], (0, 0, 255), 2) |
||||
|
|
||||
|
def circle_detect2(img): |
||||
|
#高斯滤波 |
||||
|
#img = cv2.GaussianBlur(img, (3, 3), 1) |
||||
|
#cv2.imshow('gsmh', gaussianBlur) |
||||
|
# 圆心距 canny阈值 最小半径 最大半径 |
||||
|
circlesFloat = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 2, 10, param1=50, param2=0.9, minRadius=10, maxRadius=0) |
||||
|
print("==========") |
||||
|
# 创建一个0行, 2列的空数组 |
||||
|
if circlesFloat is not None: |
||||
|
num_circles = circlesFloat.shape[1] # 获取检测到的圆的数量 |
||||
|
print("圆的数量",num_circles) |
||||
|
# 提取圆心坐标(保留小数) |
||||
|
centers = [(float(x), float(y),float(r)) for x, y, r in circlesFloat[0, :]] |
||||
|
|
||||
|
font = cv2.FONT_HERSHEY_SIMPLEX |
||||
|
color = (255, 0, 0) # 蓝色 |
||||
|
scale = 1 |
||||
|
# 打印圆心坐标 |
||||
|
for center3d in centers: |
||||
|
center=(center3d[0], center3d[1]) |
||||
|
# center_txt=f"{float(center[0]),float(center[1])}" |
||||
|
text=f"center:{center},r:{center3d[2]}" |
||||
|
print(text) |
||||
|
centerInt=tuple(int(x) for x in center) |
||||
|
cv2.putText(img, text, centerInt, font, scale, color,2) |
||||
|
|
||||
|
circles = np.uint16(np.around(circlesFloat)) # 4舍5入, 然后转为uint16 |
||||
|
for i in circles[0, :]: |
||||
|
print("画圆", i) |
||||
|
# 绘制圆心 |
||||
|
center=(i[0], i[1]) |
||||
|
cv2.circle(img, center, 2, (0, 255, 0), 6) |
||||
|
# 绘制外圆 |
||||
|
cv2.circle(img, center, i[2], (0, 0, 255), 2) |
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
circle_detect(binary_img) |
||||
|
cv2.imshow('tagCircle', imgColor) |
||||
|
waitKey(0) |
||||
|
cv2.destroyAllWindows() |
Loading…
Reference in new issue