Browse Source

delete 删除所有文件 重新传

master
lucas 2 weeks ago
parent
commit
cf9b76723b
  1. BIN
      images/trans/_4point.jpg
  2. BIN
      images/trans/subRawImg.jpg
  3. BIN
      images/trans/template.jpg
  4. BIN
      images/trans/transformed_image.jpg
  5. 19
      models/msg.py
  6. 43
      models/sampleMsg.py
  7. 112
      models/target.py
  8. 18
      test/测试.py
  9. 29
      test/测试config.py
  10. 64
      test/测试opencv.py
  11. 32
      test/测试序列化.py
  12. 73
      upload/DataReporter.py
  13. 12
      upload/DroppingQueue.py
  14. 23
      upload/RateLimiter.py
  15. 60
      图像偏移.py
  16. 47
      直线检测.py

BIN
images/trans/_4point.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 380 KiB

BIN
images/trans/subRawImg.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 444 KiB

BIN
images/trans/template.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 287 KiB

BIN
images/trans/transformed_image.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 350 KiB

19
models/msg.py

@ -1,19 +0,0 @@
import json
import typing
from dataclasses import dataclass
from dataclasses_json import dataclass_json
@dataclass_json
@dataclass
class Msg:
_from: str
cmd: str
values: typing.Any
def to_json_(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return self.to_json()

43
models/sampleMsg.py

@ -1,43 +0,0 @@
import json
from dataclasses import dataclass
from typing import List
@dataclass
class SensorImg:
base64: str
def to_json(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__)
@classmethod
def from_json(cls, json_str: str) -> 'SensorImg':
"""从 JSON 字符串反序列化为数据类"""
data_dict = json.loads(json_str)
return cls(**data_dict)
@dataclass
class SensorData:
pos: str
x: float
y: float
def to_json(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__)
@classmethod
def from_json(cls, json_str: str) -> 'SensorData':
"""从 JSON 字符串反序列化为数据类"""
data_dict = json.loads(json_str)
return cls(**data_dict)
@dataclass
class AllSensorData:
data: List[SensorData]
time: str
@dataclass
class AllImg:
image: SensorImg
time: str

112
models/target.py

@ -1,112 +0,0 @@
from dataclasses import dataclass, field
from typing import Optional, Dict
import numpy as np
from dataclasses_json import dataclass_json, config
import utils
@dataclass_json
@dataclass
class Point:
x:float
y:float
def __iter__(self): # 使对象可迭代,可直接转为元组
yield self.x
yield self.y
@dataclass
class RectangleArea:
x: int
y: int
w: int
h: int
@classmethod
def from_dict(cls, data: dict):
return cls(
x=data['x'],
y=data['y'],
w=data['w'],
h = data['h'])
@dataclass
class Threshold:
binary: int
gauss: int
@dataclass_json
@dataclass
class TargetInfo:
# 标靶方形区域
rectangle_area:RectangleArea
threshold:Threshold
# 标靶物理半径
radius:float=0.0
id:int =-1
desc:str=""
base:bool=False
def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs):
self.id = id
self.desc = desc
self.rectangle_area=rectangle_area
self.radius=radius
self.threshold=threshold
self.base=base
@classmethod
def from_dict(cls,data: dict):
return cls(data['id'],data['rectangle_area'],data['radius'])
@dataclass_json
@dataclass
class HandlerInfo:
# 初始话
is_init=True
radius_pix:float= 1.0
pix_length:float=0.0
# 标靶中心
center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
# 标靶位移(像素)
displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
@dataclass_json
@dataclass
class CircleTarget:
# 标靶方形区域
info:TargetInfo
# 标靶透视矩阵
perspective: np.ndarray = field(
metadata=config(
encoder=utils.encode_perspective,
decoder=utils.decode_perspective
)
)
handler_info: Optional[HandlerInfo]=None
# def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy):
# self.info=info
# self.center_point=center_point
# self.center_init=center_init
# self. displacement_pix=displacement_pix
# self.displacement_phy=displacement_phy
@classmethod
def init_by_info(cls,t:TargetInfo):
return CircleTarget(t,None,None,None,None)
def circle_displacement(self):
previous = self.handler_info.center_init
if previous != ():
self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x,
self.handler_info.center_point.y - previous.y)
if self.info.radius != 0:
# 单位像素距离
self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix
offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 5)
offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 5)
self.handler_info.displacement_phy = Point(offset_x, offset_y)
return self

18
test/测试.py

@ -1,18 +0,0 @@
import signal
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
def signal_handler(sig, frame):
print(f"收到退出信号 sig={sig},程序退出")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号
if __name__ == '__main__':
t=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(t)
while True:
# 程序主循环
pass

29
test/测试config.py

@ -1,29 +0,0 @@
import json
from dataclasses import asdict
import configSer
def test_load_config():
config_path = "../config.json"
# 读取配置文件
config: configSer.ConfigOperate = configSer.ConfigOperate(config_path)
json_str2 = config.config_info.to_json(indent=4)
print("json=",json_str2)
# 测试修改相机id
config.config_info.capture=1
config.save2json_file()
# 更新配置文件
updates = {
"capture": "rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream",
}
config.update_dict_config(updates)
# 重新读取配置文件,确认更新
updated_config = configSer.ConfigOperate(config_path)
print(f"当前新配置capture:{updated_config.capture}")
if __name__ == "__main__":
test_load_config()

64
test/测试opencv.py

@ -1,64 +0,0 @@
import logging
from time import sleep
import cv2
print(cv2.__version__)
def open_video(video_id):
cap = cv2.VideoCapture(video_id)
if not cap.isOpened():
logging.info("无法打开摄像头")
return cap
class VideoProcessor:
capture: cv2.VideoCapture
rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream"
print("开始读取2")
capture=open_video(2)
vp = VideoProcessor()
vp.capture = capture
fps = vp.capture .get(cv2.CAP_PROP_FPS)
width = int(vp.capture .get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vp.capture .get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"fps={fps}")
print("width={width}, height={height}".format(width=width, height=height))
cv2.namedWindow('frame')
i = 0
while True:
ret, frame = vp.capture .read()
if ret:
print("-->")
cv2.imshow("frame", frame)
i=i+1
if i>10:
break
# 写入帧到输出文件
# out.write(frame)
else:
logging.info("无法读取图像帧")
break
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环
break
print("释放2")
vp.capture.release()
cv2.destroyAllWindows()
print("开始读取0")
sleep(2)
vp.capture = open_video(0)
# # 定义视频编 = open_video(0)码器和输出文件
# fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))
while True:
ret, frame = vp.capture .read()
if ret:
cv2.imshow("frame", frame)
# 写入帧到输出文件
# out.write(frame)
else:
logging.info("无法读取图像帧")
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环
break
vp.capture.release()
# out.release()

32
test/测试序列化.py

@ -1,32 +0,0 @@
from dataclasses import dataclass, field
from typing import Optional
from dataclasses_json import dataclass_json, config
import json
import models.target
@dataclass_json
@dataclass
class Rectangle:
x: int
y: int
width: int
height: int
p: Optional[models.target.Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
# 使用
p=models.target.Point(1,2)
rect = Rectangle(0, 0, 100, 100,None)
# 序列化
json_str=rect.to_json()
# json_str = json.dumps(rect, default=lambda obj: obj.__dict__)
print(f"序列化后={json_str}")
rect.p.x=2046
print("修改后p.x=",p.x)
new_json='{"x": 1, "y": 1, "width": 100, "height": 100, "p": {"x": 2, "y": 2}}'
nr=Rectangle.from_json(new_json)
print(nr.p)

73
upload/DataReporter.py

@ -1,73 +0,0 @@
import queue
import threading
import time
import models.msg
from upload.RateLimiter import RateLimiter
class DataReporter(threading.Thread):
call_back=None
def __init__(self,data_fps:int,video_fps:int):
super().__init__()
self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列
self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列
self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 图片限速: 5张/秒
self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 20条/秒
self.running = True
self.image_dropped = 0 # 统计丢弃的图片数量
self.data_dropped = 0 # 统计丢弃的数据数量
def register_handler(self,handler_fun):
self.call_back = handler_fun
def run(self):
while self.running:
# 优先处理图片上报
if not self.image_queue.empty() and self.image_limiter.allow_request():
try:
image_data = self.image_queue.get_nowait()
self._report_image(image_data)
except queue.Empty:
pass
# 然后处理数据上报
if not self.data_queue.empty() and self.data_limiter.allow_request():
try:
data = self.data_queue.get_nowait()
self._report_data(data)
except queue.Empty:
pass
time.sleep(0.02) # 避免CPU占用过高
def _report_image(self, data):
# 实现图片上报逻辑
print(f"Reporting image, timestamp: {data[0]}")
# 这里替换为实际的上报代码
msg = models.msg.Msg(_from="dev", cmd="image", values=data[1])
msg_json = msg.to_json()
self.call_back(msg_json)
def _report_data(self, data):
# 实现数据上报逻辑
print(f"Reporting data: {data}")
# 实际的上报代码,数据结构转换
msg=models.msg.Msg(_from="dev",cmd="data",values=data[1])
msg_json=msg.to_json()
self.call_back(msg_json)
def stop(self):
self.running = False
self.join()
print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped")
def adjust_rate(self, new_rate, data_type='image'):
if data_type == 'image':
with self.image_limiter.lock:
self.image_limiter.max_rate = new_rate
self.image_queue= queue.Queue(maxsize=new_rate)
else:
with self.data_limiter.lock:
self.data_limiter.max_rate = new_rate
self.data_queue = queue.Queue(maxsize=new_rate)

12
upload/DroppingQueue.py

@ -1,12 +0,0 @@
import queue
class DroppingQueue(queue.Queue):
"""自定义队列,满时自动丢弃最旧的数据"""
def put(self, item, block=False, timeout=None):
try:
return super().put(item, block=block, timeout=timeout)
except queue.Full:
# 队列满时丢弃最旧的一个数据
self.get_nowait()
return super().put(item, block=False)

23
upload/RateLimiter.py

@ -1,23 +0,0 @@
import threading
import queue
import time
from collections import deque
class RateLimiter:
def __init__(self, max_rate, time_window):
self.max_rate = max_rate # 最大允许的请求数
self.time_window = time_window # 时间窗口(秒)
self.timestamps = deque()
self.lock = threading.Lock()
def allow_request(self):
with self.lock:
current_time = time.time()
# 移除超出时间窗口的时间戳
while self.timestamps and current_time - self.timestamps[0] > self.time_window:
self.timestamps.popleft()
if len(self.timestamps) < self.max_rate:
self.timestamps.append(current_time)
return True
return False

60
图像偏移.py

@ -1,60 +0,0 @@
import cv2
import numpy as np
def pingyi(x,y):
# 获取图像的大小
height, width = image.shape[:2]
# 构造平移矩阵:将原点移到图像中心
M_translate = np.float32([
[1, 0, x],
[0, 1, y],
[0, 0, 1]
])
return M_translate
def xuanzhuan_z(z):
# 构造沿Z轴旋转30度的旋转矩阵
theta = np.radians(z)
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
M_z_rotate = np.float32([
[cos_theta, -sin_theta, 0],
[sin_theta, cos_theta, 0],
[0, 0, 1]
])
return M_z_rotate
def xuanzhuan_y(y):
# 构造沿Z轴旋转30度的旋转矩阵
theta = np.radians(y)
rotation_vector = np.array([0, theta, 0]) # 绕Y轴旋转
# 计算旋转矩阵
rotation_matrix, _ = cv2.Rodrigues(rotation_vector)
return rotation_matrix
if __name__ == '__main__':
# 读取图像
image = cv2.imread("images/trans/transformed_image.jpg")
# 获取图像的大小
height, width = image.shape[:2]
m_pinyi=pingyi(width,0)
# m_xuanzhuan=xuanzhuan_z(30)
m_xuanzhuan = xuanzhuan_y(80)
M_combined = m_pinyi @ m_xuanzhuan
rotation_3d_int = np.clip(M_combined, 0, 255)
# 应用透视变换
warped_image = cv2.warpPerspective(
image,
M_combined,
(width*2, height*2)
)
# 显示结果
cv2.imshow('Original Image', image)
cv2.imshow('Warped Image', warped_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

47
直线检测.py

@ -1,47 +0,0 @@
import cv2
import numpy as np
def find_line(image):
if image is None:
print("Error: Unable to load image.")
exit()
gray_frame = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
blurred_image = cv2.GaussianBlur(gray_frame, (9, 9), 0)
ret, gray_frame = cv2.threshold(blurred_image, 50, 255, cv2.THRESH_BINARY)
cv2.imshow("binary", gray_frame)
# edges = cv2.Canny(blurred_image, 50, 150, apertureSize=3)
# 应用边缘检测
edges = cv2.Canny(gray_frame, 200, 400, apertureSize=3)
# 使用标准霍夫变换检测直线
# 使用标准霍夫变换检测直线
lines = cv2.HoughLines(edges, rho=3, theta=np.pi / 180, threshold=200)
# 绘制检测到的直线
if lines is not None:
for line in lines:
rho, theta = line[0]
a = np.cos(theta)
b = np.sin(theta)
x0 = a * rho
y0 = b * rho
x1 = int(x0 + 1000 * (-b))
y1 = int(y0 + 1000 * (a))
x2 = int(x0 - 1000 * (-b))
y2 = int(y0 - 1000 * (a))
cv2.line(image, (x1, y1), (x2, y2), (0, 0, 255), 2)
if __name__ == '__main__':
# 读取图像
img = cv2.imread("images/trans/subRawImg.jpg")
find_line(img)
# 显示结果
cv2.imshow("Detected Lines", img)
cv2.waitKey(0)
cv2.destroyAllWindows()
Loading…
Cancel
Save