From 10655a9d2aec706d526aeb097b6693934468f7ee Mon Sep 17 00:00:00 2001 From: lucas Date: Fri, 22 Aug 2025 08:32:01 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B8=85=E9=99=A4=E6=97=A7=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 140 ------------ app.py | 53 ----- build/Dockerfile_app | 6 - build/Dockerfile_base | 4 - config.json | 59 ----- configSer.py | 114 ---------- images/biaoba.jpg | Bin 2151 -> 0 bytes main.py | 9 - models/msg.py | 19 -- models/sampleMsg.py | 43 ---- models/target.py | 133 ------------ tcp_Ser.py | 132 ------------ upload/DataReporter.py | 79 ------- upload/DroppingQueue.py | 12 -- upload/RateLimiter.py | 26 --- utils.py | 36 ---- videoDetection.py | 463 ---------------------------------------- videoPush.py | 60 ------ 18 files changed, 1388 deletions(-) delete mode 100644 .gitignore delete mode 100644 app.py delete mode 100644 build/Dockerfile_app delete mode 100644 build/Dockerfile_base delete mode 100644 config.json delete mode 100644 configSer.py delete mode 100644 images/biaoba.jpg delete mode 100644 main.py delete mode 100644 models/msg.py delete mode 100644 models/sampleMsg.py delete mode 100644 models/target.py delete mode 100644 tcp_Ser.py delete mode 100644 upload/DataReporter.py delete mode 100644 upload/DroppingQueue.py delete mode 100644 upload/RateLimiter.py delete mode 100644 utils.py delete mode 100644 videoDetection.py delete mode 100644 videoPush.py diff --git a/.gitignore b/.gitignore deleted file mode 100644 index f8b73e7..0000000 --- a/.gitignore +++ /dev/null @@ -1,140 +0,0 @@ -# ---> Python -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - diff --git a/app.py b/app.py deleted file mode 100644 index 3ebecac..0000000 --- a/app.py +++ /dev/null @@ -1,53 +0,0 @@ -import sys -from time import sleep -import signal -import configSer -import tcp_Ser -import upload.DataReporter -import videoDetection -import videoPush - - -tcp_service = None -video_processor = None -def signal_handler(sig, frame): - global tcp_service,video_processor - print(f"收到退出信号 sig={sig},程序准备退出") - tcp_service.stop() - tcp_service.join() - video_processor.stop() - videoPush.stop() - print(f"===释放完毕,退出===") - sys.exit(0) - -signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号 -if __name__ == '__main__': - if __debug__: - print("Debug 模式") - else: - print("release 模式") - - config_path = "./config.json" - # 读取配置文件 - config_obj= configSer.ConfigOperate(config_path) - json_str = config_obj.config_info.to_json(indent=4) - print(f"当前配置:{json_str}") - - tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) - tcp_service.start() - reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) - reporter.register_handler(tcp_service.broadcast_message) - reporter.start() - # 启动video - videoDetection.configObj=config_obj - video_processor = videoDetection.VideoProcessor(reporter) - # 添加订阅者processor - tcp_service.add_subscribe(video_processor) - - # 推流 - videoPush.video_server_run() - - # 启动 - video_processor.video_mode(config_obj.config_info.capture) - - print("==over==") diff --git a/build/Dockerfile_app b/build/Dockerfile_app deleted file mode 100644 index b004c93..0000000 --- a/build/Dockerfile_app +++ /dev/null @@ -1,6 +0,0 @@ -FROM registry.ngaiot.com/tools/cv4.10_np2.2:v2 -WORKDIR /app/ -COPY ./wuyuanbiaoba ./wuyuanbiaoba -RUN pyinstaller -F --name=wuyuan.app --python-option=O ./wuyuanbiaoba/app.py -COPY ./wuyuanbiaoba/config.json ./dist/ -CMD ["/app/dist/wuyuan.app"] \ No newline at end of file diff --git a/build/Dockerfile_base b/build/Dockerfile_base deleted file mode 100644 index 8a42c72..0000000 --- a/build/Dockerfile_base +++ /dev/null @@ -1,4 +0,0 @@ -FROM registry.ngaiot.com/tools/python:3.10.18 -RUN pip3 install opencv-python==4.10.0.84 -i https://mirrors.aliyun.com/pypi/simple/ -RUN pip3 install numpy==2.2.6 -i https://mirrors.aliyun.com/pypi/simple/ -RUN pip3 install pyinstaller==6.14.1 -i https://mirrors.aliyun.com/pypi/simple/ diff --git a/config.json b/config.json deleted file mode 100644 index 6044b02..0000000 --- a/config.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "server": { - "port": 2230 - }, - "fps": { - "data": 10, - "video": 0 - }, - "capture": "0", - "targets": { - "0": { - "info": { - "rectangle_area": { - "x": 1200, - "y": 1180, - "w": 130, - "h": 100 - }, - "threshold": { - "binary": 200, - "gauss": 3 - }, - "radius": 20.0, - "id": 0, - "desc": "0_up", - "base": false - }, - "perspective": [ - [ - 1.0, - 0.0, - 0.0 - ], - [ - 0.0, - 1.0, - 0.0 - ], - [ - 0.0, - 0.0, - 1.0 - ] - ], - "handler_info": { - "radius_pix": 27.01, - "pix_length": 0.7404664938911514, - "center_point": { - "x": 261.5, - "y": 107.0 - }, - "center_init": { - "x": 261.5, - "y": 108.5 - } - } - } - } -} \ No newline at end of file diff --git a/configSer.py b/configSer.py deleted file mode 100644 index 1e3bb19..0000000 --- a/configSer.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -import os -from dataclasses import ( - dataclass, - field -) -from typing import Dict - -import numpy as np -from dataclasses_json import dataclass_json - -import models.target - -_file_path: str - - -@dataclass_json -@dataclass -class Server: - port: int = 0 - -@dataclass_json -@dataclass -class Fps: - data: int = 0 - video: int = 0 - -@dataclass_json -@dataclass -class ConfigInfo: - server:Server - fps:Fps - capture: str = "0" - # 标靶配置 - targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) - -class ConfigOperate: - _file_path: str - config_info: ConfigInfo - def __init__(self,path:str): - self._file_path = path - self.load2obj_sample() - - - def load2dict(self): - """"读取配置""" - if not os.path.exists(self._file_path): - raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") - - with open(self._file_path) as json_file: - config = json.load(json_file) - return config - - # def load2obj_sample2(self): - # """"读取配置""" - # dic=self.load2dict() - # ts = dic["targets"] - # capture = dic["capture"] - # # 获取矩阵数据 - # matrix_dict = dic.get("perspective", {}) - # # n0=convert_to_ndarray(self.matrix_dict["0"]) - # # 将矩阵转换为字符串 - # # matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True) - # for _,t in ts.items(): - # obj = models.target.TargetInfo(**t) - # area = models.target.RectangleArea.from_dict(obj.rectangle_area) - # thres = models.target.Threshold(**obj.threshold) - # self.targets[obj.id] = models.target.CircleTarget( - # obj.id, - # obj.desc, - # area, - # obj.radius, - # thres, - # obj.base - # ) - # return self.targets - - def load2obj_sample(self): - dic=self.load2dict() - dict_str = json.dumps(dic) - self.config_info=ConfigInfo.from_json(dict_str) - - def save2json_file(self): - json_str = self.config_info.to_json(indent=4) - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json_file.write(json_str) - # json.dump(self, json_file, indent=4) - return None - - - def save_dict_config(self, dict_data:Dict): - """"更新配置""" - with open(self._file_path, 'w') as json_file: - json.dump(dict_data, json_file, indent=4) - return None - - def update_dict_config(self, updates): - """ - 更新配置文件中的特定字段。 - :param file_path: 配置文件路径 - :param updates: 包含更新内容的字典 - """ - config_dict = self.load2dict() - config_dict.update(updates) - self.save_dict_config(config_dict) - -def convert_to_ndarray(matrix_data): - """ - 将 JSON 中的矩阵数据转换为 numpy ndarray。 - :param matrix_data: JSON 中的矩阵数据(列表形式) - :return: numpy ndarray - """ - return np.array(matrix_data, dtype=np.float64) \ No newline at end of file diff --git a/images/biaoba.jpg b/images/biaoba.jpg deleted file mode 100644 index 4de45eca936aab7cfb63b91ff8e98c80c20eafd9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2151 zcmbW!dpy(o9|!R7Y{Q1xn3-EU*df#-#B$l_B248{xs=o)QewFrb4{j@8Cs;HLM}!d1CzLfXEaW9HWla(EQcFaJ`YSg(YE&m9@-`0t?I3BFS>Z(2XCW#87h3OD&}(cez2$5Wa+ z=2wjHye|%>9d(kiBw^M|i^-RxesNOU4_N$SGxKy_G3ef`U}g`%k&7QZ^%}$ z>0L2M8)fbSE7LY3pw6tsgA(fw^(t`K=Hf_f*9dU?g?bzF`r#cyrEwhGYMHj@xQPbm z=-0-Xyuy+68`d62xwy@UYPWsP*#}_W9X$wHtBMhzs#U4Y?QM@$m66sHgaQtC-0bYb zEn+4o-ha03bcRrb`FL3F)!q430C|Le;vk6;X4(C=H+4TQ=g5u3O!0k?eVek9I=+|< z&RK+6j*p7><1%bSS;VliM8fNhGZi)W9{l7rr9e&)hp+tPcBf!UiGIW#V+~ZocdS+Df}<>Zow8V zL+m{0QeL#-I`;TYhd!1;aDLCs;Pbq$Y=0(PFtIgM5t%UGosfOacHPmdQ!l5qgp~Bs zgRz_Z9)=mWQS4d_MP+sF++@NvT2aMgdm4I#Khu?ZKA6-wd1da&@a-{+QS~$aOY?r@ zD`CEF>|8P@PWAww;5k~f1a%v<{J>%pqE9*MpKVL*3C2F#b!pvWSQoiYuR7xKM<$=} zK0IzEB!TK#4C>g<1&v}S(ZdE5gcBH`mU}T;Ya=hJ@jV-H@nYVmXp;(+sALTB^x({i zbGAt2mw-{zcD-w~rX`r+b2uE_+3nc;PJO?1cvL-SCFGTU^`awDL{HN4eC}KynAIe} zj#vthi1)d{V{^D?)K*AitaYCz-E#K!ww3wugLX$fi^ytWe764Zs8`^>>r7^nR>5$- zsVLt+8S!lVM-+XzUU)e7hX2x=VGh*A-HAM_?JSEWV6RVE-7V`pl8lw;TU4kik+Sich4ti#TW2oChH3|Tr^N>y`_16OsW;a8+P_!MY=bq) z&U%-)p8MWEaGx&U&(VEkGbgMSp;k&uRspJ2dy;W}c@*BUw$u+_`PHF|TH`i5?a|X~ z*-H85trFyl-96oqz?iQp6;zjC6HBtUR*a|Mhmp@FWvKhj=2eub6L>lWSzGK~094Y=LZ>H{(MtHXZ+iyO#; diff --git a/main.py b/main.py deleted file mode 100644 index fcb1163..0000000 --- a/main.py +++ /dev/null @@ -1,9 +0,0 @@ -def print_hi(name): - # 在下面的代码行中使用断点来调试脚本。 - print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 - - -# 按装订区域中的绿色按钮以运行脚本。 -if __name__ == '__main__': - print_hi('PyCharm') - diff --git a/models/msg.py b/models/msg.py deleted file mode 100644 index c59606d..0000000 --- a/models/msg.py +++ /dev/null @@ -1,19 +0,0 @@ -import json -import typing -from dataclasses import dataclass -from dataclasses_json import dataclass_json - -@dataclass_json -@dataclass -class Msg: - _from: str - cmd: str - values: typing.Any - - def to_json_(self) -> str: - """将数据类序列化为 JSON 字符串""" - return self.to_json() - - - - diff --git a/models/sampleMsg.py b/models/sampleMsg.py deleted file mode 100644 index 8620eff..0000000 --- a/models/sampleMsg.py +++ /dev/null @@ -1,43 +0,0 @@ -import json -from dataclasses import dataclass -from typing import List - -@dataclass -class SensorImg: - base64: str - - def to_json(self) -> str: - """将数据类序列化为 JSON 字符串""" - return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) - - @classmethod - def from_json(cls, json_str: str) -> 'SensorImg': - """从 JSON 字符串反序列化为数据类""" - data_dict = json.loads(json_str) - return cls(**data_dict) -@dataclass -class SensorData: - pos: str - x: float - y: float - - def to_json(self) -> str: - """将数据类序列化为 JSON 字符串""" - return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) - - @classmethod - def from_json(cls, json_str: str) -> 'SensorData': - """从 JSON 字符串反序列化为数据类""" - data_dict = json.loads(json_str) - return cls(**data_dict) - -@dataclass -class AllSensorData: - data: List[SensorData] - time: str - - -@dataclass -class AllImg: - image: SensorImg - time: str \ No newline at end of file diff --git a/models/target.py b/models/target.py deleted file mode 100644 index 860eef5..0000000 --- a/models/target.py +++ /dev/null @@ -1,133 +0,0 @@ -from dataclasses import dataclass, field -from numbers import Number -from typing import Optional, Dict, Deque -from collections import deque -import numpy as np -from dataclasses_json import dataclass_json, config - -import utils - - -@dataclass_json -@dataclass -class Point: - x:float - y:float - def __iter__(self): # 使对象可迭代,可直接转为元组 - yield self.x - yield self.y - -@dataclass -class RectangleArea: - x: int - y: int - w: int - h: int - @classmethod - def from_dict(cls, data: dict): - return cls( - x=data['x'], - y=data['y'], - w=data['w'], - h = data['h']) - -@dataclass -class Threshold: - binary: int - gauss: int - -@dataclass_json -@dataclass -class TargetInfo: - # 标靶方形区域 - rectangle_area:RectangleArea - threshold:Threshold - # 标靶物理半径 - radius:float=0.0 - id:int =-1 - desc:str="" - base:bool=False - def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs): - self.id = id - self.desc = desc - self.rectangle_area=rectangle_area - self.radius=radius - self.threshold=threshold - self.base=base - - @classmethod - def from_dict(cls,data: dict): - return cls(data['id'],data['rectangle_area'],data['radius']) - -@dataclass_json -@dataclass -class HandlerInfo: - # 初始话 - is_init=True - radius_pix:float= 1.0 - pix_length:float=0.0 - # 标靶中心 - center_point_queue:Deque[Point] = field(default_factory=lambda: deque(maxlen=10)) - center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - # 标靶位移(像素) - displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - - def calculate_mean(self)-> Point: - """计算队列中所有数据的均值""" - if self.center_point_queue: - length=len(self.center_point_queue) - mean_x = sum(p.x for p in self.center_point_queue) / length - mean_y = sum(p.y for p in self.center_point_queue) / length - return Point(mean_x, mean_y) - else: - return None - - - def enqueue_center_point(self, data) -> Point: - """入队操作""" - self.center_point_queue.append(data) - return self.calculate_mean() - -@dataclass_json -@dataclass -class CircleTarget: - # 标靶方形区域 - info:TargetInfo - # 标靶透视矩阵 - perspective: np.ndarray = field( - metadata=config( - encoder=utils.encode_perspective, - decoder=utils.decode_perspective - ) - ) - handler_info: Optional[HandlerInfo]=None - - - # def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy): - # self.info=info - # self.center_point=center_point - # self.center_init=center_init - # self. displacement_pix=displacement_pix - # self.displacement_phy=displacement_phy - - - @classmethod - def init_by_info(cls,t:TargetInfo): - return CircleTarget(t,None,None,None,None) - - def circle_displacement_pix(self): - previous = self.handler_info.center_init - if previous != (): - self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x, - self.handler_info.center_point.y - previous.y) - return self - def circle_displacement_phy(self): - if self.info.radius != 0 and self.handler_info.displacement_pix is not None: - # 单位像素->距离 - self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix - offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 5) - offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 5) - self.handler_info.displacement_phy = Point(offset_x, offset_y) - return self \ No newline at end of file diff --git a/tcp_Ser.py b/tcp_Ser.py deleted file mode 100644 index 93dde83..0000000 --- a/tcp_Ser.py +++ /dev/null @@ -1,132 +0,0 @@ -import logging -import socket -import threading - -from models.msg import Msg - - -class TcpSer(threading.Thread): - # 定义服务器地址和端口 - HOST = '127.0.0.1' - PORT = 2230 - def __init__(self,host,port): - super().__init__() - self.HOST=host - self.PORT=port - self.connected_clients=[] - # 消费者 - self.consumers=[] - self.lock = threading.Lock() - self.running = True - # self.daemon = True - # 处理客户端连接的函数 - def handle_client(self,client_socket): - try: - # 保持连接,直到客户端断开 - while self.running: - # 接收客户端数据(如果需要) - data = client_socket.recv(4096) - msg_str=data.decode('utf-8') - if not data: - break # 如果没有数据,退出循环 - print(f"从 {client_socket.getpeername()} 收到: {msg_str}") - # 反序列化为 实例 - s_cmd = Msg.from_json(msg_str) - valid_msg:bool=True - match s_cmd.cmd: - case "getPoints" | "setPoints": - self.on_data(s_cmd,valid_msg) - case "videoFps"| "dataFps": - self.on_data(s_cmd,valid_msg) - case "setCap": - self.on_data(s_cmd,valid_msg) - case "videoMode": - self.on_data(s_cmd,valid_msg) - # todo 添加处理 - case _: - valid_msg = False - err_msg=f"valid cmd={s_cmd.cmd}" - resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" - client_socket.sendall(resp.encode()) - print("非法命令",resp) - - print("通讯完成") - except ConnectionAbortedError: - print("客户端已断开") - except Exception as e: - print(f"处理客户端时出错: {e}") - finally: - pass - - - # 注册的消费者必须携带on_data 方法 - def add_subscribe(self,consumer): - if hasattr(consumer, 'on_data'): - print(f"加入 consumer {consumer} ") - self.consumers.append(consumer) - else: - print("consumer 缺少on_data函数,订阅无效 ") - def on_data(self,msg,valid): - for consumer in self.consumers: - try: - resp=consumer.on_data(msg) - self.broadcast_message(resp) - except Exception as e: - logging.warn("通讯异常",e) - - # 广播消息给所有连接的客户端 - def broadcast_message(self,message:str): - with self.lock: - if len(message)==0: - return - - message+="\n\n" - for client in self.connected_clients: - try: - client.sendall(message.encode()) - except Exception as e: - print(f"向客户端发送消息时出错: {e}") - # 如果发送失败,从列表中移除客户端 - if client in self.connected_clients: - self.connected_clients.remove(client) - client.close() - def run(self): - # 创建服务器套接字 - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind((self.HOST,self.PORT)) - server_socket.listen() - server_socket.settimeout(1.0) # 设置 accept 超时时间为 1 秒 - print(f"服务器监听在 {self.HOST}:{self.PORT}...") - - try: - # 保持服务器运行并接受新的连接 - while self.running: - try: - client_socket, addr = server_socket.accept() - print(f"连接来自 {addr}") - - # 当客户端连接时,将其添加到列表中 - self.connected_clients.append(client_socket) - - # 为每个客户端启动一个线程 - client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) - # client_thread.daemon = True # 守护线程,服务器关闭时自动结束 - client_thread.start() - except socket.timeout: - continue # 超时继续循环 - except KeyboardInterrupt: - print("服务器关闭...") - finally: - # 关闭所有客户端连接 - for client in self.connected_clients: - print(f"断开客户端 {client_socket.getpeername()}") - client.close() - server_socket.close() - def stop(self): - """外部调用此方法停止服务""" - self.running = False - -if __name__ == '__main__': - tcp=TcpSer("127.0.0.1",2230) - tcp.run() \ No newline at end of file diff --git a/upload/DataReporter.py b/upload/DataReporter.py deleted file mode 100644 index 3f54789..0000000 --- a/upload/DataReporter.py +++ /dev/null @@ -1,79 +0,0 @@ -import queue -import threading -import time -from datetime import datetime - -import models.msg -from upload.RateLimiter import RateLimiter - - -class DataReporter(threading.Thread): - call_back=None - def __init__(self,data_fps:int,video_fps:int): - super().__init__() - self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列 - self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列 - self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 图片限速: 5张/秒 - self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 20条/秒 - self.running = True - self.image_dropped = 0 # 统计丢弃的图片数量 - self.data_dropped = 0 # 统计丢弃的数据数量 - self.daemon = True - - def register_handler(self,handler_fun): - self.call_back = handler_fun - - def run(self): - while self.running: - # 优先处理图片上报 - if not self.image_queue.empty() and self.image_limiter.allow_request(): - try: - image_data = self.image_queue.get_nowait() - self._report_image(image_data) - except queue.Empty: - pass - - # 然后处理数据上报 - if not self.data_queue.empty() and self.data_limiter.allow_request(): - try: - data = self.data_queue.get_nowait() - self._report_data(data) - except queue.Empty: - pass - - time.sleep(0.02) # 避免CPU占用过高 - - def _report_image(self, data): - # 实现图片上报逻辑 - print(f"Reporting image, timestamp: {data[0]}") - # 这里替换为实际的上报代码 - msg = models.msg.Msg(_from="dev", cmd="image", values=data[1]) - msg_json = msg.to_json() - self.call_back(msg_json) - - def _report_data(self, data): - # 实现数据上报逻辑 - print(f"Reporting data: {data}") - # 实际的上报代码,数据结构转换 - msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) - # 重新格式化时间,数据等间隔 - msg.values.time=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - msg_json=msg.to_json() - self.call_back(msg_json) - - def stop(self): - self.running = False - self.join() - print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped") - - def adjust_rate(self, new_rate, data_type='image'): - if data_type == 'image': - with self.image_limiter.lock: - self.image_limiter.max_rate = new_rate - self.image_queue= queue.Queue(maxsize=new_rate) - self.image_limiter.update_interval() - else: - with self.data_limiter.lock: - self.data_limiter.max_rate = new_rate - self.data_queue = queue.Queue(maxsize=new_rate) - self.data_limiter.update_interval() \ No newline at end of file diff --git a/upload/DroppingQueue.py b/upload/DroppingQueue.py deleted file mode 100644 index d520248..0000000 --- a/upload/DroppingQueue.py +++ /dev/null @@ -1,12 +0,0 @@ -import queue - - -class DroppingQueue(queue.Queue): - """自定义队列,满时自动丢弃最旧的数据""" - def put(self, item, block=False, timeout=None): - try: - return super().put(item, block=block, timeout=timeout) - except queue.Full: - # 队列满时丢弃最旧的一个数据 - self.get_nowait() - return super().put(item, block=False) \ No newline at end of file diff --git a/upload/RateLimiter.py b/upload/RateLimiter.py deleted file mode 100644 index f03769c..0000000 --- a/upload/RateLimiter.py +++ /dev/null @@ -1,26 +0,0 @@ -import threading -import queue -import time -from collections import deque - -class RateLimiter: - def __init__(self, max_rate, time_window): - self.max_rate = max_rate - self.time_window = time_window - self.lock = threading.Lock() - self.current_time = time.time() - self.interval = time_window / max_rate if max_rate > 0 else 0 - self.next_available_time = self.current_time - - def update_interval(self): - self.interval = self.time_window / self.max_rate if self.max_rate > 0 else 0 - - def allow_request(self): - with self.lock: - if self.interval<=0: - return False - current_time = time.time() - if current_time >= self.next_available_time: - self.next_available_time = current_time + self.interval - return True - return False diff --git a/utils.py b/utils.py deleted file mode 100644 index ae2965d..0000000 --- a/utils.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Dict, List - -import cv2 -import base64 - -import numpy as np -def frame_to_img(frame, format="JPEG"): - """将 OpenCV 读取的图片帧转换为 img""" - # 将图片帧编码为 JPEG 或 PNG 格式 - if format.upper() == "JPEG": - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 - elif format.upper() == "PNG": - encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 - else: - raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") - - result, encoded_img = cv2.imencode(f".{format.lower()}", frame, encode_param) - return result,encoded_img - -def frame_to_base64(frame, format="JPEG"): - """将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" - result, encoded_img = frame_to_img(frame, format=format) - if not result: - raise ValueError("Failed to encode frame.") - - # 将编码后的字节流转换为 Base64 字符串 - base64_string = base64.b64encode(encoded_img).decode("utf-8") - return base64_string - - -# 自定义编码器和解码器 -def encode_perspective(value: np.ndarray) -> List[List[float]]: - return value.tolist() - -def decode_perspective(value: List[List[float]]) -> np.ndarray: - return np.array(value) \ No newline at end of file diff --git a/videoDetection.py b/videoDetection.py deleted file mode 100644 index 1af821a..0000000 --- a/videoDetection.py +++ /dev/null @@ -1,463 +0,0 @@ -from datetime import datetime -import json -import queue -import time -from time import sleep -import cv2 -import numpy as np -import logging - -import configSer -import models.target -import models.sampleMsg -import upload.DataReporter -import utils -import videoPush -from models.msg import Msg - -logging.basicConfig(level=logging.DEBUG) -drawing: bool = False # 是否正在绘制 -is_video_mode: bool = False # 是否采用标靶图覆盖 -# 定义点 -start_point: models.target.Point -end_point: models.target.Point -# 配置 -configObj:configSer.ConfigOperate - -# 鼠标回调函数 -def add_rectangle(event, x, y, flags, param): - global start_point, end_point, drawing - - if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 - logging.info("左键按下") - start_point = models.target.Point(x, y) - end_point = start_point - drawing = True - elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 - if drawing: - end_point = models.target.Point(x, y) - elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 - logging.info("左键抬起") - drawing = False - end_point = models.target.Point(x, y) - if start_point == end_point: - return - distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) - if distance < 20: - logging.info("距离小于20,无效区域") - return - target_id = len(configObj.config_info.targets) - # 圆标靶半径 mm - radius = 20.0 - area=models.target.RectangleArea(int(start_point.x),int(start_point.y), - int(end_point.x-start_point.x),int(end_point.y-start_point.y)) - t_info=models.target.TargetInfo( target_id, - "test add", - area, - radius, - models.target.Threshold(190,9), - False) - new_target = models.target.CircleTarget(t_info,None,None) - logging.info(f"新增区域[{target_id}] => {start_point, end_point}") - configObj.config_info.targets[target_id] = new_target - -def read_target_rectangle(): - return configObj.config_info.targets -class VideoProcessor: - reporter: upload.DataReporter.DataReporter - capture: cv2.VideoCapture - is_opened: bool= False - is_running = True - def __init__(self, reporter:upload.DataReporter.DataReporter): - self.reporter = reporter - - def on_data(self,msg:Msg): - global configObj,is_video_mode - logging.info(f"msg={msg}") - match msg.cmd: - case "getPoints": - targets=configObj.config_info.targets.copy() - for k,v in targets.items(): - targets[k].handler_info=None - - resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) - resp_json = resp_msg.to_json_() - return resp_json - case "setPoints": - v=msg.values - ts=v["targets"] - - # 清空原配置 - configObj.config_info.targets={} - for _,t in ts.items(): - t_str=json.dumps(t) - new_c_target = models.target.CircleTarget.from_json(t_str) - configObj.config_info.targets[new_c_target.info.id] =new_c_target - - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "videoFps": - v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"image") - configObj.config_info.fps.video = fps - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "dataFps": - v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"data") - configObj.config_info.fps.data=fps - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "setCap": - v = msg.values - cap = v["cap"] - self.switch_video(cap) - resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "videoMode": - v = msg.values - is_debug = v["debug"] - is_video_mode=is_debug - resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - print("==") - - - def pre_handler_img(self,gray_frame,now_str:str): - # 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 - img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") - all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) - self.enqueue_image(all_img) - - def draw_rectangle(self,img): - global configObj,is_video_mode - gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - #图像发送 - if configObj.config_info.fps.video > 0: - self.pre_handler_img(gray_frame,now_str) - - - if len(configObj.config_info.targets)==0: return - - # 基点 - base_point_pix:models.target.Point=None - - #上报有新数据的点 - all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) - # 绘图-历史点 - for i, tr in configObj.config_info.targets.items(): - if not hasattr(tr, "info"): - print("====") - _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) - _end_point = models.target.Point( - tr.info.rectangle_area.x+tr.info.rectangle_area.w, - tr.info.rectangle_area.y+tr.info.rectangle_area.h) - #绘制标靶区域 - blue_color = (255, 0, 0) - if tr.info.base: - blue_color=(200, 0, 200) #紫红色 基点 - - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) - label=f"{tr.info.desc},r={tr.info.radius}" - cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) - #检测 - # 获取图像尺寸 - frame_height, frame_width = gray_frame.shape[:2] - if _end_point.x>frame_width or _end_point.y>frame_height: - print(f"标靶[{tr.info.desc}]sub_image 超出区域") - continue - sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) - - - ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) - # 高斯滤波 - gauss_size=tr.info.threshold.gauss - sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) - # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) - if tr.perspective is not None: - # 宽度 - sub_width = sub_binary_frame.shape[1] - # 高度 - sub_height = sub_binary_frame.shape[0] - - sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) - - - # 覆盖原图 - if is_video_mode: - sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) - self.cover_sub_image(img,sub_c_img, _start_point, _end_point) - - if __debug__: - cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) - - circles = self.circle2_detect(sub_binary_frame) - if len(circles) == 0: - continue - center,radius_pix=self.circle_match(circles,_start_point) - # 纪录圆心位置 - if tr.handler_info is None: - tr.handler_info= models.target.HandlerInfo() - if tr.handler_info.is_init: - tr.handler_info.is_init=False - tr.handler_info.center_init = center - - # 数据平滑处理 - smooth_center = tr.handler_info.enqueue_center_point(center) - # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") - tr.handler_info.center_point=smooth_center - # 原始处理 tr.handler_info.center_point=center - tr.handler_info.radius_pix=radius_pix - tr.circle_displacement_pix() - - # 基点 - if tr.info.base: - base_point_pix=tr.handler_info.displacement_pix - - # 画圆 - self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point) - - # 基于像素点计算 物理距离 - for i, tr in configObj.config_info.targets.items(): - - if tr.handler_info is None: - continue - if tr.handler_info.displacement_pix is None: - continue - # 减去基点偏移 - if base_point_pix is not None: - raw_point=tr.handler_info.displacement_pix - tr.handler_info.displacement_pix=models.target.Point( - x=raw_point.x-base_point_pix.x, - y=raw_point.y-base_point_pix.y) - if not tr.info.base: - # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") - pass - tr.circle_displacement_phy() - if tr.handler_info.displacement_phy is not None: - all_upload_data.data.append( - models.sampleMsg.SensorData( - str(tr.info.id), - tr.handler_info.displacement_phy.x, - tr.handler_info.displacement_phy.y) - ) - tr.handler_info.displacement_pix=None - tr.handler_info.displacement_phy = None - - - #过滤无效空数据 - if len(all_upload_data.data)==0: - return - - if configObj.config_info.fps.data > 0: - self.enqueue_data(all_upload_data) - - - - def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): - - circle = max(circles, key=lambda c: c[2]) - - # 绘制圆心 - center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) - radius = float(np.round(circle[2], 3)) - cp = models.target.Point(x=center[0], y=center[1]) - return cp,radius - - def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point): - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 - center_int = tuple(int(x) for x in center) - radius_int = int(radius) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 1) - # 打印圆心坐标 - - text1 = f"c:{(center.x,center.y,radius)}" - txt_location = (rect_s_point.x+2,rect_e_point.y-2) - # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) - cv2.putText(img, text1, txt_location, font, scale, color, 1) - - - def circle2_detect(self,img): - # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, - maxRadius=0) - # 创建一个0行, 2列的空数组 - if circles_float is not None: - # 提取圆心坐标(保留2位小数) - centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] - return centers - else: - return [] - - - def extract_sub_image(self,frame, top_left, bottom_right): - """ - 从帧中截取子区域 - :param frame: 输入的视频帧 - :param top_left: 子图片的左上角坐标 (x1, y1) - :param bottom_right: 子图片的右下角坐标 (x2, y2) - :return: 截取的子图片 - """ - x1, y1 = top_left - x2, y2 = bottom_right - return frame[y1:y2, x1:x2] - - def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): - x1, y1 = top_left - x2, y2 = bottom_right - frame[y1:y2, x1:x2]= sub_frame - return frame - - - def open_video(self,video_id): - sleep(1) - print(f"打开摄像头 -> {video_id}") - self.capture = cv2.VideoCapture(video_id) - frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) - frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) - print(f"默认分辨率= {frame_width}*{frame_height}") - logging.info(f"{video_id}地址->{self.capture}") - if not self.capture.isOpened(): - self.capture.release() - logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") - return - fps = self.capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps},video_id={video_id},") - # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 - # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - self.is_opened=True - - def switch_video(self,video_id:str): - print(f"切换摄像头 -> {video_id}") - self.is_opened = False - self.capture.release() - cv2.destroyAllWindows() - if str.isdigit(video_id): - video_id=int(video_id) - self.open_video(video_id) - - - - def show_video(self): - global sigExit,start_point, end_point, drawing - if __debug__: - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while self.is_running: - if not self.is_opened: - print(f"摄像头 标记is_opened={self.is_opened}") - sleep(5) - continue - ret, frame = self.capture.read() - if ret: - self.frame_handle(frame) - else: - logging.info(f"无法读取帧,cap地址- >{self.capture}") - sleep(1) - # self.capture.release() - # self.capture= cv2.VideoCapture(0) # 再次尝试 - cv2.waitKey(1) - # 显示图像 - if frame is not None: - if __debug__: - cv2.imshow('Frame', frame) - #缓存到推流 - videoPush.update_latest_frame(frame) - print("退出VideoProcessor") - def show_image(self,frame): - global start_point, end_point, drawing - if __debug__: - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - cp_img=frame.copy() - self.frame_handle(cp_img) - - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - cv2.destroyAllWindows() - - def frame_handle(self,frame): - # 绘图-历史点 - self.draw_rectangle(frame) - # 绘图-实时 - if drawing: - cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) - # print(f"鼠标位置 {start_point} -> {end_point}") - - def image_mode(self): - img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg - # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg - # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg - self.show_image(img_raw) - - # 支持 - def video_mode(self,video_id:str): - if str.isdigit(video_id): - video_id=int(video_id) - self.open_video(video_id) - self.show_video() - # 释放摄像头资源并关闭所有窗口 - print("退出 video") - self.capture.release() - cv2.destroyAllWindows() - - def rtsp_mode(self,rtsp_url:str): - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" - self.open_video(rtsp_url) - fps = self.capture.get(cv2.CAP_PROP_FPS) - print(f"rtsp fps={fps}") - self.show_video() - # 释放摄像头资源并关闭所有窗口 - self.capture.release() - cv2.destroyAllWindows() - - def enqueue_data(self,data): - # 获取当前时间戳 - timestamp = time.time() - # 将时间戳转换为 datetime 对象 - dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 - # 放入图片队列(自动丢弃最旧数据当队列满时) - try: - self.reporter.data_queue.put((dt, data), block=False) - except queue.Full: - # self.reporter.data_dropped += 1 - pass - def enqueue_image(self,data): - # 获取当前时间戳 - timestamp = time.time() - # 将时间戳转换为 datetime 对象 - dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 - # 放入图片队列(自动丢弃最旧数据当队列满时) - try: - self.reporter.image_queue.put((dt, data), block=False) - except queue.Full: - pass - #self.reporter.image_dropped += 1 - - def stop(self): - self.is_running=False - self.capture.release() - - - diff --git a/videoPush.py b/videoPush.py deleted file mode 100644 index 48a74e5..0000000 --- a/videoPush.py +++ /dev/null @@ -1,60 +0,0 @@ -from time import sleep - -import cv2 -import threading -import time - -import numpy as np -import requests -from flask import Flask, Response, render_template_string, request - -import utils - -app_flask = Flask(__name__) - -# 全局变量,用于存储最新的帧 -latest_frame:np.ndarray = None -lock = threading.Lock() -is_running = True - -def update_latest_frame(n_bytes:np.ndarray): - global latest_frame - latest_frame=n_bytes - -def generate_mjpeg(): - """生成MJPEG格式的视频流""" - while is_running: - with lock: - if latest_frame is None: - continue - _,latest_frame_bytes = utils.frame_to_img(latest_frame) - frame = latest_frame_bytes.tobytes() - pass - sleep(0.1) - yield (b'--frame\r\n' - b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') - - -@app_flask.route('/video_flow') -def video_feed(): - """视频流路由""" - return Response( - generate_mjpeg(), - mimetype='multipart/x-mixed-replace; boundary=frame' - ) - -def run(): - port=2240 - print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") - app_flask.run(host='0.0.0.0', port=port, threaded=True) - -def video_server_run(): - thread = threading.Thread(target=run) - thread.daemon = True # 设置为守护线程,主线程退出时自动终止 - thread.start() -def stop(): - global is_running - is_running=False - -if __name__ == '__main__': - run() \ No newline at end of file