diff --git a/app.py b/app.py new file mode 100644 index 0000000..3ebecac --- /dev/null +++ b/app.py @@ -0,0 +1,53 @@ +import sys +from time import sleep +import signal +import configSer +import tcp_Ser +import upload.DataReporter +import videoDetection +import videoPush + + +tcp_service = None +video_processor = None +def signal_handler(sig, frame): + global tcp_service,video_processor + print(f"收到退出信号 sig={sig},程序准备退出") + tcp_service.stop() + tcp_service.join() + video_processor.stop() + videoPush.stop() + print(f"===释放完毕,退出===") + sys.exit(0) + +signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号 +if __name__ == '__main__': + if __debug__: + print("Debug 模式") + else: + print("release 模式") + + config_path = "./config.json" + # 读取配置文件 + config_obj= configSer.ConfigOperate(config_path) + json_str = config_obj.config_info.to_json(indent=4) + print(f"当前配置:{json_str}") + + tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) + tcp_service.start() + reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) + reporter.register_handler(tcp_service.broadcast_message) + reporter.start() + # 启动video + videoDetection.configObj=config_obj + video_processor = videoDetection.VideoProcessor(reporter) + # 添加订阅者processor + tcp_service.add_subscribe(video_processor) + + # 推流 + videoPush.video_server_run() + + # 启动 + video_processor.video_mode(config_obj.config_info.capture) + + print("==over==") diff --git a/config.json b/config.json new file mode 100644 index 0000000..6044b02 --- /dev/null +++ b/config.json @@ -0,0 +1,59 @@ +{ + "server": { + "port": 2230 + }, + "fps": { + "data": 10, + "video": 0 + }, + "capture": "0", + "targets": { + "0": { + "info": { + "rectangle_area": { + "x": 1200, + "y": 1180, + "w": 130, + "h": 100 + }, + "threshold": { + "binary": 200, + "gauss": 3 + }, + "radius": 20.0, + "id": 0, + "desc": "0_up", + "base": false + }, + "perspective": [ + [ + 1.0, + 0.0, + 0.0 + ], + [ + 0.0, + 1.0, + 0.0 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "handler_info": { + "radius_pix": 27.01, + "pix_length": 0.7404664938911514, + "center_point": { + "x": 261.5, + "y": 107.0 + }, + "center_init": { + "x": 261.5, + "y": 108.5 + } + } + } + } +} \ No newline at end of file diff --git a/configSer.py b/configSer.py new file mode 100644 index 0000000..1e3bb19 --- /dev/null +++ b/configSer.py @@ -0,0 +1,114 @@ +import json +import os +from dataclasses import ( + dataclass, + field +) +from typing import Dict + +import numpy as np +from dataclasses_json import dataclass_json + +import models.target + +_file_path: str + + +@dataclass_json +@dataclass +class Server: + port: int = 0 + +@dataclass_json +@dataclass +class Fps: + data: int = 0 + video: int = 0 + +@dataclass_json +@dataclass +class ConfigInfo: + server:Server + fps:Fps + capture: str = "0" + # 标靶配置 + targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) + +class ConfigOperate: + _file_path: str + config_info: ConfigInfo + def __init__(self,path:str): + self._file_path = path + self.load2obj_sample() + + + def load2dict(self): + """"读取配置""" + if not os.path.exists(self._file_path): + raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") + + with open(self._file_path) as json_file: + config = json.load(json_file) + return config + + # def load2obj_sample2(self): + # """"读取配置""" + # dic=self.load2dict() + # ts = dic["targets"] + # capture = dic["capture"] + # # 获取矩阵数据 + # matrix_dict = dic.get("perspective", {}) + # # n0=convert_to_ndarray(self.matrix_dict["0"]) + # # 将矩阵转换为字符串 + # # matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True) + # for _,t in ts.items(): + # obj = models.target.TargetInfo(**t) + # area = models.target.RectangleArea.from_dict(obj.rectangle_area) + # thres = models.target.Threshold(**obj.threshold) + # self.targets[obj.id] = models.target.CircleTarget( + # obj.id, + # obj.desc, + # area, + # obj.radius, + # thres, + # obj.base + # ) + # return self.targets + + def load2obj_sample(self): + dic=self.load2dict() + dict_str = json.dumps(dic) + self.config_info=ConfigInfo.from_json(dict_str) + + def save2json_file(self): + json_str = self.config_info.to_json(indent=4) + """"更新配置""" + with open(self._file_path, 'w') as json_file: + json_file.write(json_str) + # json.dump(self, json_file, indent=4) + return None + + + def save_dict_config(self, dict_data:Dict): + """"更新配置""" + with open(self._file_path, 'w') as json_file: + json.dump(dict_data, json_file, indent=4) + return None + + def update_dict_config(self, updates): + """ + 更新配置文件中的特定字段。 + :param file_path: 配置文件路径 + :param updates: 包含更新内容的字典 + """ + config_dict = self.load2dict() + config_dict.update(updates) + self.save_dict_config(config_dict) + +def convert_to_ndarray(matrix_data): + """ + 将 JSON 中的矩阵数据转换为 numpy ndarray。 + :param matrix_data: JSON 中的矩阵数据(列表形式) + :return: numpy ndarray + """ + return np.array(matrix_data, dtype=np.float64) \ No newline at end of file diff --git a/images/biaoba.jpg b/images/biaoba.jpg new file mode 100644 index 0000000..4de45ec Binary files /dev/null and b/images/biaoba.jpg differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..fcb1163 --- /dev/null +++ b/main.py @@ -0,0 +1,9 @@ +def print_hi(name): + # 在下面的代码行中使用断点来调试脚本。 + print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 + + +# 按装订区域中的绿色按钮以运行脚本。 +if __name__ == '__main__': + print_hi('PyCharm') + diff --git a/models/msg.py b/models/msg.py new file mode 100644 index 0000000..c59606d --- /dev/null +++ b/models/msg.py @@ -0,0 +1,19 @@ +import json +import typing +from dataclasses import dataclass +from dataclasses_json import dataclass_json + +@dataclass_json +@dataclass +class Msg: + _from: str + cmd: str + values: typing.Any + + def to_json_(self) -> str: + """将数据类序列化为 JSON 字符串""" + return self.to_json() + + + + diff --git a/models/sampleMsg.py b/models/sampleMsg.py new file mode 100644 index 0000000..8620eff --- /dev/null +++ b/models/sampleMsg.py @@ -0,0 +1,43 @@ +import json +from dataclasses import dataclass +from typing import List + +@dataclass +class SensorImg: + base64: str + + def to_json(self) -> str: + """将数据类序列化为 JSON 字符串""" + return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) + + @classmethod + def from_json(cls, json_str: str) -> 'SensorImg': + """从 JSON 字符串反序列化为数据类""" + data_dict = json.loads(json_str) + return cls(**data_dict) +@dataclass +class SensorData: + pos: str + x: float + y: float + + def to_json(self) -> str: + """将数据类序列化为 JSON 字符串""" + return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) + + @classmethod + def from_json(cls, json_str: str) -> 'SensorData': + """从 JSON 字符串反序列化为数据类""" + data_dict = json.loads(json_str) + return cls(**data_dict) + +@dataclass +class AllSensorData: + data: List[SensorData] + time: str + + +@dataclass +class AllImg: + image: SensorImg + time: str \ No newline at end of file diff --git a/models/target.py b/models/target.py new file mode 100644 index 0000000..860eef5 --- /dev/null +++ b/models/target.py @@ -0,0 +1,133 @@ +from dataclasses import dataclass, field +from numbers import Number +from typing import Optional, Dict, Deque +from collections import deque +import numpy as np +from dataclasses_json import dataclass_json, config + +import utils + + +@dataclass_json +@dataclass +class Point: + x:float + y:float + def __iter__(self): # 使对象可迭代,可直接转为元组 + yield self.x + yield self.y + +@dataclass +class RectangleArea: + x: int + y: int + w: int + h: int + @classmethod + def from_dict(cls, data: dict): + return cls( + x=data['x'], + y=data['y'], + w=data['w'], + h = data['h']) + +@dataclass +class Threshold: + binary: int + gauss: int + +@dataclass_json +@dataclass +class TargetInfo: + # 标靶方形区域 + rectangle_area:RectangleArea + threshold:Threshold + # 标靶物理半径 + radius:float=0.0 + id:int =-1 + desc:str="" + base:bool=False + def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs): + self.id = id + self.desc = desc + self.rectangle_area=rectangle_area + self.radius=radius + self.threshold=threshold + self.base=base + + @classmethod + def from_dict(cls,data: dict): + return cls(data['id'],data['rectangle_area'],data['radius']) + +@dataclass_json +@dataclass +class HandlerInfo: + # 初始话 + is_init=True + radius_pix:float= 1.0 + pix_length:float=0.0 + # 标靶中心 + center_point_queue:Deque[Point] = field(default_factory=lambda: deque(maxlen=10)) + center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + # 标靶位移(像素) + displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + + def calculate_mean(self)-> Point: + """计算队列中所有数据的均值""" + if self.center_point_queue: + length=len(self.center_point_queue) + mean_x = sum(p.x for p in self.center_point_queue) / length + mean_y = sum(p.y for p in self.center_point_queue) / length + return Point(mean_x, mean_y) + else: + return None + + + def enqueue_center_point(self, data) -> Point: + """入队操作""" + self.center_point_queue.append(data) + return self.calculate_mean() + +@dataclass_json +@dataclass +class CircleTarget: + # 标靶方形区域 + info:TargetInfo + # 标靶透视矩阵 + perspective: np.ndarray = field( + metadata=config( + encoder=utils.encode_perspective, + decoder=utils.decode_perspective + ) + ) + handler_info: Optional[HandlerInfo]=None + + + # def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy): + # self.info=info + # self.center_point=center_point + # self.center_init=center_init + # self. displacement_pix=displacement_pix + # self.displacement_phy=displacement_phy + + + @classmethod + def init_by_info(cls,t:TargetInfo): + return CircleTarget(t,None,None,None,None) + + def circle_displacement_pix(self): + previous = self.handler_info.center_init + if previous != (): + self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x, + self.handler_info.center_point.y - previous.y) + return self + def circle_displacement_phy(self): + if self.info.radius != 0 and self.handler_info.displacement_pix is not None: + # 单位像素->距离 + self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix + offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 5) + offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 5) + self.handler_info.displacement_phy = Point(offset_x, offset_y) + return self \ No newline at end of file diff --git a/tcp_Ser.py b/tcp_Ser.py new file mode 100644 index 0000000..93dde83 --- /dev/null +++ b/tcp_Ser.py @@ -0,0 +1,132 @@ +import logging +import socket +import threading + +from models.msg import Msg + + +class TcpSer(threading.Thread): + # 定义服务器地址和端口 + HOST = '127.0.0.1' + PORT = 2230 + def __init__(self,host,port): + super().__init__() + self.HOST=host + self.PORT=port + self.connected_clients=[] + # 消费者 + self.consumers=[] + self.lock = threading.Lock() + self.running = True + # self.daemon = True + # 处理客户端连接的函数 + def handle_client(self,client_socket): + try: + # 保持连接,直到客户端断开 + while self.running: + # 接收客户端数据(如果需要) + data = client_socket.recv(4096) + msg_str=data.decode('utf-8') + if not data: + break # 如果没有数据,退出循环 + print(f"从 {client_socket.getpeername()} 收到: {msg_str}") + # 反序列化为 实例 + s_cmd = Msg.from_json(msg_str) + valid_msg:bool=True + match s_cmd.cmd: + case "getPoints" | "setPoints": + self.on_data(s_cmd,valid_msg) + case "videoFps"| "dataFps": + self.on_data(s_cmd,valid_msg) + case "setCap": + self.on_data(s_cmd,valid_msg) + case "videoMode": + self.on_data(s_cmd,valid_msg) + # todo 添加处理 + case _: + valid_msg = False + err_msg=f"valid cmd={s_cmd.cmd}" + resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" + client_socket.sendall(resp.encode()) + print("非法命令",resp) + + print("通讯完成") + except ConnectionAbortedError: + print("客户端已断开") + except Exception as e: + print(f"处理客户端时出错: {e}") + finally: + pass + + + # 注册的消费者必须携带on_data 方法 + def add_subscribe(self,consumer): + if hasattr(consumer, 'on_data'): + print(f"加入 consumer {consumer} ") + self.consumers.append(consumer) + else: + print("consumer 缺少on_data函数,订阅无效 ") + def on_data(self,msg,valid): + for consumer in self.consumers: + try: + resp=consumer.on_data(msg) + self.broadcast_message(resp) + except Exception as e: + logging.warn("通讯异常",e) + + # 广播消息给所有连接的客户端 + def broadcast_message(self,message:str): + with self.lock: + if len(message)==0: + return + + message+="\n\n" + for client in self.connected_clients: + try: + client.sendall(message.encode()) + except Exception as e: + print(f"向客户端发送消息时出错: {e}") + # 如果发送失败,从列表中移除客户端 + if client in self.connected_clients: + self.connected_clients.remove(client) + client.close() + def run(self): + # 创建服务器套接字 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((self.HOST,self.PORT)) + server_socket.listen() + server_socket.settimeout(1.0) # 设置 accept 超时时间为 1 秒 + print(f"服务器监听在 {self.HOST}:{self.PORT}...") + + try: + # 保持服务器运行并接受新的连接 + while self.running: + try: + client_socket, addr = server_socket.accept() + print(f"连接来自 {addr}") + + # 当客户端连接时,将其添加到列表中 + self.connected_clients.append(client_socket) + + # 为每个客户端启动一个线程 + client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) + # client_thread.daemon = True # 守护线程,服务器关闭时自动结束 + client_thread.start() + except socket.timeout: + continue # 超时继续循环 + except KeyboardInterrupt: + print("服务器关闭...") + finally: + # 关闭所有客户端连接 + for client in self.connected_clients: + print(f"断开客户端 {client_socket.getpeername()}") + client.close() + server_socket.close() + def stop(self): + """外部调用此方法停止服务""" + self.running = False + +if __name__ == '__main__': + tcp=TcpSer("127.0.0.1",2230) + tcp.run() \ No newline at end of file diff --git a/upload/DataReporter.py b/upload/DataReporter.py new file mode 100644 index 0000000..3f54789 --- /dev/null +++ b/upload/DataReporter.py @@ -0,0 +1,79 @@ +import queue +import threading +import time +from datetime import datetime + +import models.msg +from upload.RateLimiter import RateLimiter + + +class DataReporter(threading.Thread): + call_back=None + def __init__(self,data_fps:int,video_fps:int): + super().__init__() + self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列 + self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列 + self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 图片限速: 5张/秒 + self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 20条/秒 + self.running = True + self.image_dropped = 0 # 统计丢弃的图片数量 + self.data_dropped = 0 # 统计丢弃的数据数量 + self.daemon = True + + def register_handler(self,handler_fun): + self.call_back = handler_fun + + def run(self): + while self.running: + # 优先处理图片上报 + if not self.image_queue.empty() and self.image_limiter.allow_request(): + try: + image_data = self.image_queue.get_nowait() + self._report_image(image_data) + except queue.Empty: + pass + + # 然后处理数据上报 + if not self.data_queue.empty() and self.data_limiter.allow_request(): + try: + data = self.data_queue.get_nowait() + self._report_data(data) + except queue.Empty: + pass + + time.sleep(0.02) # 避免CPU占用过高 + + def _report_image(self, data): + # 实现图片上报逻辑 + print(f"Reporting image, timestamp: {data[0]}") + # 这里替换为实际的上报代码 + msg = models.msg.Msg(_from="dev", cmd="image", values=data[1]) + msg_json = msg.to_json() + self.call_back(msg_json) + + def _report_data(self, data): + # 实现数据上报逻辑 + print(f"Reporting data: {data}") + # 实际的上报代码,数据结构转换 + msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) + # 重新格式化时间,数据等间隔 + msg.values.time=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + msg_json=msg.to_json() + self.call_back(msg_json) + + def stop(self): + self.running = False + self.join() + print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped") + + def adjust_rate(self, new_rate, data_type='image'): + if data_type == 'image': + with self.image_limiter.lock: + self.image_limiter.max_rate = new_rate + self.image_queue= queue.Queue(maxsize=new_rate) + self.image_limiter.update_interval() + else: + with self.data_limiter.lock: + self.data_limiter.max_rate = new_rate + self.data_queue = queue.Queue(maxsize=new_rate) + self.data_limiter.update_interval() \ No newline at end of file diff --git a/upload/DroppingQueue.py b/upload/DroppingQueue.py new file mode 100644 index 0000000..d520248 --- /dev/null +++ b/upload/DroppingQueue.py @@ -0,0 +1,12 @@ +import queue + + +class DroppingQueue(queue.Queue): + """自定义队列,满时自动丢弃最旧的数据""" + def put(self, item, block=False, timeout=None): + try: + return super().put(item, block=block, timeout=timeout) + except queue.Full: + # 队列满时丢弃最旧的一个数据 + self.get_nowait() + return super().put(item, block=False) \ No newline at end of file diff --git a/upload/RateLimiter.py b/upload/RateLimiter.py new file mode 100644 index 0000000..f03769c --- /dev/null +++ b/upload/RateLimiter.py @@ -0,0 +1,26 @@ +import threading +import queue +import time +from collections import deque + +class RateLimiter: + def __init__(self, max_rate, time_window): + self.max_rate = max_rate + self.time_window = time_window + self.lock = threading.Lock() + self.current_time = time.time() + self.interval = time_window / max_rate if max_rate > 0 else 0 + self.next_available_time = self.current_time + + def update_interval(self): + self.interval = self.time_window / self.max_rate if self.max_rate > 0 else 0 + + def allow_request(self): + with self.lock: + if self.interval<=0: + return False + current_time = time.time() + if current_time >= self.next_available_time: + self.next_available_time = current_time + self.interval + return True + return False diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..ae2965d --- /dev/null +++ b/utils.py @@ -0,0 +1,36 @@ +from typing import Dict, List + +import cv2 +import base64 + +import numpy as np +def frame_to_img(frame, format="JPEG"): + """将 OpenCV 读取的图片帧转换为 img""" + # 将图片帧编码为 JPEG 或 PNG 格式 + if format.upper() == "JPEG": + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 + elif format.upper() == "PNG": + encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 + else: + raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") + + result, encoded_img = cv2.imencode(f".{format.lower()}", frame, encode_param) + return result,encoded_img + +def frame_to_base64(frame, format="JPEG"): + """将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" + result, encoded_img = frame_to_img(frame, format=format) + if not result: + raise ValueError("Failed to encode frame.") + + # 将编码后的字节流转换为 Base64 字符串 + base64_string = base64.b64encode(encoded_img).decode("utf-8") + return base64_string + + +# 自定义编码器和解码器 +def encode_perspective(value: np.ndarray) -> List[List[float]]: + return value.tolist() + +def decode_perspective(value: List[List[float]]) -> np.ndarray: + return np.array(value) \ No newline at end of file diff --git a/videoDetection.py b/videoDetection.py new file mode 100644 index 0000000..1af821a --- /dev/null +++ b/videoDetection.py @@ -0,0 +1,463 @@ +from datetime import datetime +import json +import queue +import time +from time import sleep +import cv2 +import numpy as np +import logging + +import configSer +import models.target +import models.sampleMsg +import upload.DataReporter +import utils +import videoPush +from models.msg import Msg + +logging.basicConfig(level=logging.DEBUG) +drawing: bool = False # 是否正在绘制 +is_video_mode: bool = False # 是否采用标靶图覆盖 +# 定义点 +start_point: models.target.Point +end_point: models.target.Point +# 配置 +configObj:configSer.ConfigOperate + +# 鼠标回调函数 +def add_rectangle(event, x, y, flags, param): + global start_point, end_point, drawing + + if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 + logging.info("左键按下") + start_point = models.target.Point(x, y) + end_point = start_point + drawing = True + elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 + if drawing: + end_point = models.target.Point(x, y) + elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 + logging.info("左键抬起") + drawing = False + end_point = models.target.Point(x, y) + if start_point == end_point: + return + distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) + if distance < 20: + logging.info("距离小于20,无效区域") + return + target_id = len(configObj.config_info.targets) + # 圆标靶半径 mm + radius = 20.0 + area=models.target.RectangleArea(int(start_point.x),int(start_point.y), + int(end_point.x-start_point.x),int(end_point.y-start_point.y)) + t_info=models.target.TargetInfo( target_id, + "test add", + area, + radius, + models.target.Threshold(190,9), + False) + new_target = models.target.CircleTarget(t_info,None,None) + logging.info(f"新增区域[{target_id}] => {start_point, end_point}") + configObj.config_info.targets[target_id] = new_target + +def read_target_rectangle(): + return configObj.config_info.targets +class VideoProcessor: + reporter: upload.DataReporter.DataReporter + capture: cv2.VideoCapture + is_opened: bool= False + is_running = True + def __init__(self, reporter:upload.DataReporter.DataReporter): + self.reporter = reporter + + def on_data(self,msg:Msg): + global configObj,is_video_mode + logging.info(f"msg={msg}") + match msg.cmd: + case "getPoints": + targets=configObj.config_info.targets.copy() + for k,v in targets.items(): + targets[k].handler_info=None + + resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) + resp_json = resp_msg.to_json_() + return resp_json + case "setPoints": + v=msg.values + ts=v["targets"] + + # 清空原配置 + configObj.config_info.targets={} + for _,t in ts.items(): + t_str=json.dumps(t) + new_c_target = models.target.CircleTarget.from_json(t_str) + configObj.config_info.targets[new_c_target.info.id] =new_c_target + + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoFps": + v = msg.values + fps = v["fps"] + self.reporter.adjust_rate(fps,"image") + configObj.config_info.fps.video = fps + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "dataFps": + v = msg.values + fps = v["fps"] + self.reporter.adjust_rate(fps,"data") + configObj.config_info.fps.data=fps + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "setCap": + v = msg.values + cap = v["cap"] + self.switch_video(cap) + resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoMode": + v = msg.values + is_debug = v["debug"] + is_video_mode=is_debug + resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + print("==") + + + def pre_handler_img(self,gray_frame,now_str:str): + # 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 + img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") + all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) + self.enqueue_image(all_img) + + def draw_rectangle(self,img): + global configObj,is_video_mode + gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + #图像发送 + if configObj.config_info.fps.video > 0: + self.pre_handler_img(gray_frame,now_str) + + + if len(configObj.config_info.targets)==0: return + + # 基点 + base_point_pix:models.target.Point=None + + #上报有新数据的点 + all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) + # 绘图-历史点 + for i, tr in configObj.config_info.targets.items(): + if not hasattr(tr, "info"): + print("====") + _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) + _end_point = models.target.Point( + tr.info.rectangle_area.x+tr.info.rectangle_area.w, + tr.info.rectangle_area.y+tr.info.rectangle_area.h) + #绘制标靶区域 + blue_color = (255, 0, 0) + if tr.info.base: + blue_color=(200, 0, 200) #紫红色 基点 + + cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) + label=f"{tr.info.desc},r={tr.info.radius}" + cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) + #检测 + # 获取图像尺寸 + frame_height, frame_width = gray_frame.shape[:2] + if _end_point.x>frame_width or _end_point.y>frame_height: + print(f"标靶[{tr.info.desc}]sub_image 超出区域") + continue + sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) + + + ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) + # 高斯滤波 + gauss_size=tr.info.threshold.gauss + sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) + # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) + if tr.perspective is not None: + # 宽度 + sub_width = sub_binary_frame.shape[1] + # 高度 + sub_height = sub_binary_frame.shape[0] + + sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) + + + # 覆盖原图 + if is_video_mode: + sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) + self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + + if __debug__: + cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) + + circles = self.circle2_detect(sub_binary_frame) + if len(circles) == 0: + continue + center,radius_pix=self.circle_match(circles,_start_point) + # 纪录圆心位置 + if tr.handler_info is None: + tr.handler_info= models.target.HandlerInfo() + if tr.handler_info.is_init: + tr.handler_info.is_init=False + tr.handler_info.center_init = center + + # 数据平滑处理 + smooth_center = tr.handler_info.enqueue_center_point(center) + # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") + tr.handler_info.center_point=smooth_center + # 原始处理 tr.handler_info.center_point=center + tr.handler_info.radius_pix=radius_pix + tr.circle_displacement_pix() + + # 基点 + if tr.info.base: + base_point_pix=tr.handler_info.displacement_pix + + # 画圆 + self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point) + + # 基于像素点计算 物理距离 + for i, tr in configObj.config_info.targets.items(): + + if tr.handler_info is None: + continue + if tr.handler_info.displacement_pix is None: + continue + # 减去基点偏移 + if base_point_pix is not None: + raw_point=tr.handler_info.displacement_pix + tr.handler_info.displacement_pix=models.target.Point( + x=raw_point.x-base_point_pix.x, + y=raw_point.y-base_point_pix.y) + if not tr.info.base: + # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") + pass + tr.circle_displacement_phy() + if tr.handler_info.displacement_phy is not None: + all_upload_data.data.append( + models.sampleMsg.SensorData( + str(tr.info.id), + tr.handler_info.displacement_phy.x, + tr.handler_info.displacement_phy.y) + ) + tr.handler_info.displacement_pix=None + tr.handler_info.displacement_phy = None + + + #过滤无效空数据 + if len(all_upload_data.data)==0: + return + + if configObj.config_info.fps.data > 0: + self.enqueue_data(all_upload_data) + + + + def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): + + circle = max(circles, key=lambda c: c[2]) + + # 绘制圆心 + center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) + radius = float(np.round(circle[2], 3)) + cp = models.target.Point(x=center[0], y=center[1]) + return cp,radius + + def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point): + + font = cv2.FONT_HERSHEY_SIMPLEX + color = (255, 0, 0) # 蓝色 + scale = 0.5 + center_int = tuple(int(x) for x in center) + radius_int = int(radius) + cv2.circle(img, center_int, 2, (0, 255, 0), 4) + # 绘制外圆 + cv2.circle(img, center_int, radius_int, (0, 0, 255), 1) + # 打印圆心坐标 + + text1 = f"c:{(center.x,center.y,radius)}" + txt_location = (rect_s_point.x+2,rect_e_point.y-2) + # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) + cv2.putText(img, text1, txt_location, font, scale, color, 1) + + + def circle2_detect(self,img): + # 圆心距 canny阈值 最小半径 最大半径 + circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, + maxRadius=0) + # 创建一个0行, 2列的空数组 + if circles_float is not None: + # 提取圆心坐标(保留2位小数) + centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] + return centers + else: + return [] + + + def extract_sub_image(self,frame, top_left, bottom_right): + """ + 从帧中截取子区域 + :param frame: 输入的视频帧 + :param top_left: 子图片的左上角坐标 (x1, y1) + :param bottom_right: 子图片的右下角坐标 (x2, y2) + :return: 截取的子图片 + """ + x1, y1 = top_left + x2, y2 = bottom_right + return frame[y1:y2, x1:x2] + + def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): + x1, y1 = top_left + x2, y2 = bottom_right + frame[y1:y2, x1:x2]= sub_frame + return frame + + + def open_video(self,video_id): + sleep(1) + print(f"打开摄像头 -> {video_id}") + self.capture = cv2.VideoCapture(video_id) + frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) + print(f"默认分辨率= {frame_width}*{frame_height}") + logging.info(f"{video_id}地址->{self.capture}") + if not self.capture.isOpened(): + self.capture.release() + logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") + return + fps = self.capture.get(cv2.CAP_PROP_FPS) + print(f"fps={fps},video_id={video_id},") + # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 + # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 + self.is_opened=True + + def switch_video(self,video_id:str): + print(f"切换摄像头 -> {video_id}") + self.is_opened = False + self.capture.release() + cv2.destroyAllWindows() + if str.isdigit(video_id): + video_id=int(video_id) + self.open_video(video_id) + + + + def show_video(self): + global sigExit,start_point, end_point, drawing + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) + # 读取一帧图像 + while self.is_running: + if not self.is_opened: + print(f"摄像头 标记is_opened={self.is_opened}") + sleep(5) + continue + ret, frame = self.capture.read() + if ret: + self.frame_handle(frame) + else: + logging.info(f"无法读取帧,cap地址- >{self.capture}") + sleep(1) + # self.capture.release() + # self.capture= cv2.VideoCapture(0) # 再次尝试 + cv2.waitKey(1) + # 显示图像 + if frame is not None: + if __debug__: + cv2.imshow('Frame', frame) + #缓存到推流 + videoPush.update_latest_frame(frame) + print("退出VideoProcessor") + def show_image(self,frame): + global start_point, end_point, drawing + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) + # 读取一帧图像 + while True: + cp_img=frame.copy() + self.frame_handle(cp_img) + + if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 + break + cv2.destroyAllWindows() + + def frame_handle(self,frame): + # 绘图-历史点 + self.draw_rectangle(frame) + # 绘图-实时 + if drawing: + cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) + # print(f"鼠标位置 {start_point} -> {end_point}") + + def image_mode(self): + img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg + # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg + # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg + self.show_image(img_raw) + + # 支持 + def video_mode(self,video_id:str): + if str.isdigit(video_id): + video_id=int(video_id) + self.open_video(video_id) + self.show_video() + # 释放摄像头资源并关闭所有窗口 + print("退出 video") + self.capture.release() + cv2.destroyAllWindows() + + def rtsp_mode(self,rtsp_url:str): + # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" + # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" + self.open_video(rtsp_url) + fps = self.capture.get(cv2.CAP_PROP_FPS) + print(f"rtsp fps={fps}") + self.show_video() + # 释放摄像头资源并关闭所有窗口 + self.capture.release() + cv2.destroyAllWindows() + + def enqueue_data(self,data): + # 获取当前时间戳 + timestamp = time.time() + # 将时间戳转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 + # 放入图片队列(自动丢弃最旧数据当队列满时) + try: + self.reporter.data_queue.put((dt, data), block=False) + except queue.Full: + # self.reporter.data_dropped += 1 + pass + def enqueue_image(self,data): + # 获取当前时间戳 + timestamp = time.time() + # 将时间戳转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 + # 放入图片队列(自动丢弃最旧数据当队列满时) + try: + self.reporter.image_queue.put((dt, data), block=False) + except queue.Full: + pass + #self.reporter.image_dropped += 1 + + def stop(self): + self.is_running=False + self.capture.release() + + + diff --git a/videoPush.py b/videoPush.py new file mode 100644 index 0000000..48a74e5 --- /dev/null +++ b/videoPush.py @@ -0,0 +1,60 @@ +from time import sleep + +import cv2 +import threading +import time + +import numpy as np +import requests +from flask import Flask, Response, render_template_string, request + +import utils + +app_flask = Flask(__name__) + +# 全局变量,用于存储最新的帧 +latest_frame:np.ndarray = None +lock = threading.Lock() +is_running = True + +def update_latest_frame(n_bytes:np.ndarray): + global latest_frame + latest_frame=n_bytes + +def generate_mjpeg(): + """生成MJPEG格式的视频流""" + while is_running: + with lock: + if latest_frame is None: + continue + _,latest_frame_bytes = utils.frame_to_img(latest_frame) + frame = latest_frame_bytes.tobytes() + pass + sleep(0.1) + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') + + +@app_flask.route('/video_flow') +def video_feed(): + """视频流路由""" + return Response( + generate_mjpeg(), + mimetype='multipart/x-mixed-replace; boundary=frame' + ) + +def run(): + port=2240 + print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") + app_flask.run(host='0.0.0.0', port=port, threaded=True) + +def video_server_run(): + thread = threading.Thread(target=run) + thread.daemon = True # 设置为守护线程,主线程退出时自动终止 + thread.start() +def stop(): + global is_running + is_running=False + +if __name__ == '__main__': + run() \ No newline at end of file