diff --git a/images/trans/_4point.jpg b/images/trans/_4point.jpg deleted file mode 100644 index 3efc2c3..0000000 Binary files a/images/trans/_4point.jpg and /dev/null differ diff --git a/images/trans/subRawImg.jpg b/images/trans/subRawImg.jpg deleted file mode 100644 index 1889c00..0000000 Binary files a/images/trans/subRawImg.jpg and /dev/null differ diff --git a/images/trans/template.jpg b/images/trans/template.jpg deleted file mode 100644 index 685cfa0..0000000 Binary files a/images/trans/template.jpg and /dev/null differ diff --git a/images/trans/transformed_image.jpg b/images/trans/transformed_image.jpg deleted file mode 100644 index 9764f23..0000000 Binary files a/images/trans/transformed_image.jpg and /dev/null differ diff --git a/models/msg.py b/models/msg.py deleted file mode 100644 index c59606d..0000000 --- a/models/msg.py +++ /dev/null @@ -1,19 +0,0 @@ -import json -import typing -from dataclasses import dataclass -from dataclasses_json import dataclass_json - -@dataclass_json -@dataclass -class Msg: - _from: str - cmd: str - values: typing.Any - - def to_json_(self) -> str: - """将数据类序列化为 JSON 字符串""" - return self.to_json() - - - - diff --git a/models/sampleMsg.py b/models/sampleMsg.py deleted file mode 100644 index 8620eff..0000000 --- a/models/sampleMsg.py +++ /dev/null @@ -1,43 +0,0 @@ -import json -from dataclasses import dataclass -from typing import List - -@dataclass -class SensorImg: - base64: str - - def to_json(self) -> str: - """将数据类序列化为 JSON 字符串""" - return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) - - @classmethod - def from_json(cls, json_str: str) -> 'SensorImg': - """从 JSON 字符串反序列化为数据类""" - data_dict = json.loads(json_str) - return cls(**data_dict) -@dataclass -class SensorData: - pos: str - x: float - y: float - - def to_json(self) -> str: - """将数据类序列化为 JSON 字符串""" - return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) - - @classmethod - def from_json(cls, json_str: str) -> 'SensorData': - """从 JSON 字符串反序列化为数据类""" - data_dict = json.loads(json_str) - return cls(**data_dict) - -@dataclass -class AllSensorData: - data: List[SensorData] - time: str - - -@dataclass -class AllImg: - image: SensorImg - time: str \ No newline at end of file diff --git a/models/target.py b/models/target.py deleted file mode 100644 index 4b62e00..0000000 --- a/models/target.py +++ /dev/null @@ -1,112 +0,0 @@ -from dataclasses import dataclass, field -from typing import Optional, Dict - -import numpy as np -from dataclasses_json import dataclass_json, config - -import utils - - -@dataclass_json -@dataclass -class Point: - x:float - y:float - def __iter__(self): # 使对象可迭代,可直接转为元组 - yield self.x - yield self.y - -@dataclass -class RectangleArea: - x: int - y: int - w: int - h: int - @classmethod - def from_dict(cls, data: dict): - return cls( - x=data['x'], - y=data['y'], - w=data['w'], - h = data['h']) - -@dataclass -class Threshold: - binary: int - gauss: int - -@dataclass_json -@dataclass -class TargetInfo: - # 标靶方形区域 - rectangle_area:RectangleArea - threshold:Threshold - # 标靶物理半径 - radius:float=0.0 - id:int =-1 - desc:str="" - base:bool=False - def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs): - self.id = id - self.desc = desc - self.rectangle_area=rectangle_area - self.radius=radius - self.threshold=threshold - self.base=base - - @classmethod - def from_dict(cls,data: dict): - return cls(data['id'],data['rectangle_area'],data['radius']) - -@dataclass_json -@dataclass -class HandlerInfo: - # 初始话 - is_init=True - radius_pix:float= 1.0 - pix_length:float=0.0 - # 标靶中心 - center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - # 标靶位移(像素) - displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - -@dataclass_json -@dataclass -class CircleTarget: - # 标靶方形区域 - info:TargetInfo - # 标靶透视矩阵 - perspective: np.ndarray = field( - metadata=config( - encoder=utils.encode_perspective, - decoder=utils.decode_perspective - ) - ) - handler_info: Optional[HandlerInfo]=None - - - # def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy): - # self.info=info - # self.center_point=center_point - # self.center_init=center_init - # self. displacement_pix=displacement_pix - # self.displacement_phy=displacement_phy - - - @classmethod - def init_by_info(cls,t:TargetInfo): - return CircleTarget(t,None,None,None,None) - def circle_displacement(self): - previous = self.handler_info.center_init - if previous != (): - self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x, - self.handler_info.center_point.y - previous.y) - if self.info.radius != 0: - # 单位像素距离 - self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix - offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 5) - offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 5) - self.handler_info.displacement_phy = Point(offset_x, offset_y) - return self \ No newline at end of file diff --git a/test/测试.py b/test/测试.py deleted file mode 100644 index 0fb9fce..0000000 --- a/test/测试.py +++ /dev/null @@ -1,18 +0,0 @@ -import signal -import sys -from dataclasses import dataclass, asdict -from datetime import datetime - -def signal_handler(sig, frame): - print(f"收到退出信号 sig={sig},程序退出") - sys.exit(0) - -signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号 - - -if __name__ == '__main__': - t=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - print(t) - while True: - # 程序主循环 - pass \ No newline at end of file diff --git a/test/测试config.py b/test/测试config.py deleted file mode 100644 index 7be1498..0000000 --- a/test/测试config.py +++ /dev/null @@ -1,29 +0,0 @@ -import json -from dataclasses import asdict - -import configSer - -def test_load_config(): - config_path = "../config.json" - # 读取配置文件 - config: configSer.ConfigOperate = configSer.ConfigOperate(config_path) - json_str2 = config.config_info.to_json(indent=4) - print("json=",json_str2) - # 测试修改相机id - config.config_info.capture=1 - config.save2json_file() - # 更新配置文件 - updates = { - "capture": "rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream", - } - config.update_dict_config(updates) - - # 重新读取配置文件,确认更新 - updated_config = configSer.ConfigOperate(config_path) - print(f"当前新配置capture:{updated_config.capture}") - - - - -if __name__ == "__main__": - test_load_config() \ No newline at end of file diff --git a/test/测试opencv.py b/test/测试opencv.py deleted file mode 100644 index 86a2949..0000000 --- a/test/测试opencv.py +++ /dev/null @@ -1,64 +0,0 @@ -import logging -from time import sleep - -import cv2 -print(cv2.__version__) -def open_video(video_id): - cap = cv2.VideoCapture(video_id) - if not cap.isOpened(): - logging.info("无法打开摄像头") - return cap - -class VideoProcessor: - capture: cv2.VideoCapture - -rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" -print("开始读取2") -capture=open_video(2) -vp = VideoProcessor() -vp.capture = capture -fps = vp.capture .get(cv2.CAP_PROP_FPS) -width = int(vp.capture .get(cv2.CAP_PROP_FRAME_WIDTH)) -height = int(vp.capture .get(cv2.CAP_PROP_FRAME_HEIGHT)) -print(f"fps={fps}") -print("width={width}, height={height}".format(width=width, height=height)) - -cv2.namedWindow('frame') -i = 0 -while True: - ret, frame = vp.capture .read() - if ret: - print("-->") - cv2.imshow("frame", frame) - i=i+1 - if i>10: - break - # 写入帧到输出文件 - # out.write(frame) - else: - logging.info("无法读取图像帧") - break - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break -print("释放2") -vp.capture.release() -cv2.destroyAllWindows() -print("开始读取0") -sleep(2) -vp.capture = open_video(0) -# # 定义视频编 = open_video(0)码器和输出文件 -# fourcc = cv2.VideoWriter_fourcc(*'mp4v') -# out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height)) -while True: - ret, frame = vp.capture .read() - if ret: - cv2.imshow("frame", frame) - # 写入帧到输出文件 - # out.write(frame) - else: - logging.info("无法读取图像帧") - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - -vp.capture.release() -# out.release() diff --git a/test/测试序列化.py b/test/测试序列化.py deleted file mode 100644 index 5a4e9f6..0000000 --- a/test/测试序列化.py +++ /dev/null @@ -1,32 +0,0 @@ -from dataclasses import dataclass, field -from typing import Optional - -from dataclasses_json import dataclass_json, config -import json - -import models.target - -@dataclass_json -@dataclass -class Rectangle: - x: int - y: int - width: int - height: int - p: Optional[models.target.Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - -# 使用 -p=models.target.Point(1,2) -rect = Rectangle(0, 0, 100, 100,None) -# 序列化 -json_str=rect.to_json() -# json_str = json.dumps(rect, default=lambda obj: obj.__dict__) -print(f"序列化后={json_str}") - -rect.p.x=2046 -print("修改后p.x=",p.x) - -new_json='{"x": 1, "y": 1, "width": 100, "height": 100, "p": {"x": 2, "y": 2}}' -nr=Rectangle.from_json(new_json) -print(nr.p) - diff --git a/upload/DataReporter.py b/upload/DataReporter.py deleted file mode 100644 index 49746a1..0000000 --- a/upload/DataReporter.py +++ /dev/null @@ -1,73 +0,0 @@ -import queue -import threading -import time - -import models.msg -from upload.RateLimiter import RateLimiter - - -class DataReporter(threading.Thread): - call_back=None - def __init__(self,data_fps:int,video_fps:int): - super().__init__() - self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列 - self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列 - self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 图片限速: 5张/秒 - self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 20条/秒 - self.running = True - self.image_dropped = 0 # 统计丢弃的图片数量 - self.data_dropped = 0 # 统计丢弃的数据数量 - - def register_handler(self,handler_fun): - self.call_back = handler_fun - - def run(self): - while self.running: - # 优先处理图片上报 - if not self.image_queue.empty() and self.image_limiter.allow_request(): - try: - image_data = self.image_queue.get_nowait() - self._report_image(image_data) - except queue.Empty: - pass - - # 然后处理数据上报 - if not self.data_queue.empty() and self.data_limiter.allow_request(): - try: - data = self.data_queue.get_nowait() - self._report_data(data) - except queue.Empty: - pass - - time.sleep(0.02) # 避免CPU占用过高 - - def _report_image(self, data): - # 实现图片上报逻辑 - print(f"Reporting image, timestamp: {data[0]}") - # 这里替换为实际的上报代码 - msg = models.msg.Msg(_from="dev", cmd="image", values=data[1]) - msg_json = msg.to_json() - self.call_back(msg_json) - - def _report_data(self, data): - # 实现数据上报逻辑 - print(f"Reporting data: {data}") - # 实际的上报代码,数据结构转换 - msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) - msg_json=msg.to_json() - self.call_back(msg_json) - - def stop(self): - self.running = False - self.join() - print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped") - - def adjust_rate(self, new_rate, data_type='image'): - if data_type == 'image': - with self.image_limiter.lock: - self.image_limiter.max_rate = new_rate - self.image_queue= queue.Queue(maxsize=new_rate) - else: - with self.data_limiter.lock: - self.data_limiter.max_rate = new_rate - self.data_queue = queue.Queue(maxsize=new_rate) \ No newline at end of file diff --git a/upload/DroppingQueue.py b/upload/DroppingQueue.py deleted file mode 100644 index d520248..0000000 --- a/upload/DroppingQueue.py +++ /dev/null @@ -1,12 +0,0 @@ -import queue - - -class DroppingQueue(queue.Queue): - """自定义队列,满时自动丢弃最旧的数据""" - def put(self, item, block=False, timeout=None): - try: - return super().put(item, block=block, timeout=timeout) - except queue.Full: - # 队列满时丢弃最旧的一个数据 - self.get_nowait() - return super().put(item, block=False) \ No newline at end of file diff --git a/upload/RateLimiter.py b/upload/RateLimiter.py deleted file mode 100644 index c14051a..0000000 --- a/upload/RateLimiter.py +++ /dev/null @@ -1,23 +0,0 @@ -import threading -import queue -import time -from collections import deque - -class RateLimiter: - def __init__(self, max_rate, time_window): - self.max_rate = max_rate # 最大允许的请求数 - self.time_window = time_window # 时间窗口(秒) - self.timestamps = deque() - self.lock = threading.Lock() - - def allow_request(self): - with self.lock: - current_time = time.time() - # 移除超出时间窗口的时间戳 - while self.timestamps and current_time - self.timestamps[0] > self.time_window: - self.timestamps.popleft() - - if len(self.timestamps) < self.max_rate: - self.timestamps.append(current_time) - return True - return False \ No newline at end of file diff --git a/图像偏移.py b/图像偏移.py deleted file mode 100644 index b1e22f0..0000000 --- a/图像偏移.py +++ /dev/null @@ -1,60 +0,0 @@ -import cv2 -import numpy as np - -def pingyi(x,y): - # 获取图像的大小 - height, width = image.shape[:2] - - - # 构造平移矩阵:将原点移到图像中心 - M_translate = np.float32([ - [1, 0, x], - [0, 1, y], - [0, 0, 1] - ]) - - return M_translate -def xuanzhuan_z(z): - # 构造沿Z轴旋转30度的旋转矩阵 - theta = np.radians(z) - cos_theta = np.cos(theta) - sin_theta = np.sin(theta) - M_z_rotate = np.float32([ - [cos_theta, -sin_theta, 0], - [sin_theta, cos_theta, 0], - [0, 0, 1] - ]) - return M_z_rotate -def xuanzhuan_y(y): - # 构造沿Z轴旋转30度的旋转矩阵 - theta = np.radians(y) - rotation_vector = np.array([0, theta, 0]) # 绕Y轴旋转 - - # 计算旋转矩阵 - rotation_matrix, _ = cv2.Rodrigues(rotation_vector) - return rotation_matrix -if __name__ == '__main__': - # 读取图像 - image = cv2.imread("images/trans/transformed_image.jpg") - # 获取图像的大小 - height, width = image.shape[:2] - - m_pinyi=pingyi(width,0) - # m_xuanzhuan=xuanzhuan_z(30) - m_xuanzhuan = xuanzhuan_y(80) - M_combined = m_pinyi @ m_xuanzhuan - rotation_3d_int = np.clip(M_combined, 0, 255) - # 应用透视变换 - warped_image = cv2.warpPerspective( - image, - M_combined, - (width*2, height*2) - ) - - # 显示结果 - cv2.imshow('Original Image', image) - cv2.imshow('Warped Image', warped_image) - cv2.waitKey(0) - cv2.destroyAllWindows() - - diff --git a/直线检测.py b/直线检测.py deleted file mode 100644 index 4462768..0000000 --- a/直线检测.py +++ /dev/null @@ -1,47 +0,0 @@ -import cv2 -import numpy as np - - - - -def find_line(image): - if image is None: - print("Error: Unable to load image.") - exit() - gray_frame = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) - blurred_image = cv2.GaussianBlur(gray_frame, (9, 9), 0) - ret, gray_frame = cv2.threshold(blurred_image, 50, 255, cv2.THRESH_BINARY) - cv2.imshow("binary", gray_frame) - # edges = cv2.Canny(blurred_image, 50, 150, apertureSize=3) - # 应用边缘检测 - edges = cv2.Canny(gray_frame, 200, 400, apertureSize=3) - - # 使用标准霍夫变换检测直线 - # 使用标准霍夫变换检测直线 - lines = cv2.HoughLines(edges, rho=3, theta=np.pi / 180, threshold=200) - - # 绘制检测到的直线 - if lines is not None: - for line in lines: - rho, theta = line[0] - a = np.cos(theta) - b = np.sin(theta) - x0 = a * rho - y0 = b * rho - x1 = int(x0 + 1000 * (-b)) - y1 = int(y0 + 1000 * (a)) - x2 = int(x0 - 1000 * (-b)) - y2 = int(y0 - 1000 * (a)) - cv2.line(image, (x1, y1), (x2, y2), (0, 0, 255), 2) - - - -if __name__ == '__main__': - # 读取图像 - img = cv2.imread("images/trans/subRawImg.jpg") - - find_line(img) - # 显示结果 - cv2.imshow("Detected Lines", img) - cv2.waitKey(0) - cv2.destroyAllWindows()