diff --git a/app.py b/app.py new file mode 100644 index 0000000..693123b --- /dev/null +++ b/app.py @@ -0,0 +1,90 @@ +import logging +import os +import sys +from time import sleep +import signal +import configSer +import logSet +import tcp_Ser +import upload.DataReporter +import videoDetection +import videoPush +from upload import mqttHelper + +tcp_service = None +video_processor = None +reporter = None +def signal_handler(sig, frame): + global tcp_service,video_processor,reporter + print(f"收到退出信号 sig={sig},程序准备退出") + tcp_service.stop() + tcp_service.join() + video_processor.stop() + videoPush.stop() + reporter.stop() + + print(f"===释放完毕,退出===") + sys.exit(0) +def read_config_path() ->str : + config_name = "config.json" + + # determine if application is a script file or frozen exe + if getattr(sys, 'frozen', False): + application_path = os.path.dirname(sys.executable) + elif __file__: + application_path = os.path.dirname(__file__) + + config_path = os.path.join(application_path, config_name) + + print(f"命令路径:{os.getcwd()}, config_path_abs路径:{config_path}") + return config_path + +signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号 +if __name__ == '__main__': + logSet.log_init() + if __debug__: + print("Debug 模式") + else: + print("release 模式") + + config_path_abs=read_config_path() + # 读取配置文件 + config_obj= configSer.ConfigOperate(config_path_abs) + json_str = config_obj.config_info.to_json(indent=4) + logging.info(f"当前配置:{json_str}") + + tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) + tcp_service.start() + reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) + reporter.register_handler(tcp_service.broadcast_message) + + if config_obj.config_info.upload.enable: + mq_config=config_obj.config_info.upload.mqtt + mq = upload.mqttHelper.MQTTClient(broker=mq_config.broker, + port=mq_config.port, + topic=mq_config.topic, + username=mq_config.username, + password=mq_config.password, + client_id=mq_config.client_id) + if mq.start(): + reporter.register_handler(mq.publish) + else: + logging.warn("mq链接失败,不上报") + + reporter.start() + # 启动video + videoDetection.configObj=config_obj + videoDetection.reporter=reporter + + video_processor = videoDetection.VideoProcessor() + # 添加订阅者processor + tcp_service.add_subscribe(video_processor) + + # 推流 + videoPush.video_server_run() + + + # 启动 + video_processor.video_mode(config_obj.config_info.capture) + + logging("==over==") diff --git a/build/Dockerfile_app b/build/Dockerfile_app new file mode 100644 index 0000000..6d564ec --- /dev/null +++ b/build/Dockerfile_app @@ -0,0 +1,6 @@ +FROM registry.ngaiot.com/tools/opencv_python_base_image:1 +WORKDIR /app/ +COPY ./wuyuanbiaoba ./wuyuanbiaoba +RUN pyinstaller -F --name=wuyuan.app --python-option=O ./wuyuanbiaoba/app.py +COPY ./wuyuanbiaoba/config.json ./dist/ +CMD ["/app/dist/wuyuan.app"] \ No newline at end of file diff --git a/build/Dockerfile_base b/build/Dockerfile_base new file mode 100644 index 0000000..8a42c72 --- /dev/null +++ b/build/Dockerfile_base @@ -0,0 +1,4 @@ +FROM registry.ngaiot.com/tools/python:3.10.18 +RUN pip3 install opencv-python==4.10.0.84 -i https://mirrors.aliyun.com/pypi/simple/ +RUN pip3 install numpy==2.2.6 -i https://mirrors.aliyun.com/pypi/simple/ +RUN pip3 install pyinstaller==6.14.1 -i https://mirrors.aliyun.com/pypi/simple/ diff --git a/build/wybb_watchdog.service b/build/wybb_watchdog.service new file mode 100644 index 0000000..ca143e3 --- /dev/null +++ b/build/wybb_watchdog.service @@ -0,0 +1,15 @@ +[Unit] +Description = wybb watchdog service +After=multi-user.target + +[Service] +ExecStart = /home/forlinx/lk/wuyuanbiaoba/dist/wuyuan.app #按照实际目录来填写 +Type = simple +Restart=always +RestartSec=30s # 检查一次,如果服务失败则重启 + + +[Install] +WantedBy = multi-user.target + +# 存放到 ubuntu /etc/systemd/system diff --git a/config.json b/config.json new file mode 100644 index 0000000..af1750b --- /dev/null +++ b/config.json @@ -0,0 +1,25 @@ +{ + "server": { + "port": 2230 + }, + "fps": { + "data": 30, + "video": 0 + }, + "alert": { + "enable": false, + "intervalSec": 60 + }, + "upload": { + "enable": true, + "mqtt": { + "broker": "218.3.126.49", + "port": 1883, + "topic": "wybb/mqtt", + "client_id": "wybb_lk" + } + }, + "capture": "C:/Users/Administrator/Videos/1k.mp4", + "targets": { + } +} \ No newline at end of file diff --git a/configSer.py b/configSer.py new file mode 100644 index 0000000..676a3fc --- /dev/null +++ b/configSer.py @@ -0,0 +1,116 @@ +import json +import os +from dataclasses import ( + dataclass, + field +) +from typing import Dict + +import numpy as np +from dataclasses_json import dataclass_json + +import models.target + + +@dataclass_json +@dataclass +class Server: + port: int = 0 + +@dataclass_json +@dataclass +class Fps: + data: int = 0 + video: int = 0 + +@dataclass_json +@dataclass +class Alert: + enable: bool = False + intervalSec: int = 0 + + +@dataclass_json +@dataclass +class Mqtt: + broker: str = "" + port: int = 1883 + topic: str = "wybb/mqtt" + username: str = "" + password: str = "" + client_id: str = "" + +@dataclass_json +@dataclass +class Upload: + mqtt: Mqtt + enable: bool = False + + + + +@dataclass_json +@dataclass +class ConfigInfo: + server:Server + fps:Fps + alert:Alert + upload:Upload + capture: str = "0" + # 标靶配置 + targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict) + +class ConfigOperate: + _file_path: str + config_info: ConfigInfo + def __init__(self,path:str): + self._file_path = path + self.load2obj_sample() + + + def load2dict(self): + """"读取配置""" + if not os.path.exists(self._file_path): + raise FileNotFoundError(f"配置文件 {self._file_path} 不存在") + + with open(self._file_path) as json_file: + config = json.load(json_file) + return config + + def load2obj_sample(self): + dic=self.load2dict() + dict_str = json.dumps(dic) + self.config_info=ConfigInfo.from_json(dict_str) + + def save2json_file(self): + json_str = self.config_info.to_json(indent=4) + """"更新配置""" + with open(self._file_path, 'w') as json_file: + json_file.write(json_str) + # json.dump(self, json_file, indent=4) + return None + + + def save_dict_config(self, dict_data:Dict): + """"更新配置""" + with open(self._file_path, 'w') as json_file: + json.dump(dict_data, json_file, indent=4) + return None + + def update_dict_config(self, updates): + """ + 更新配置文件中的特定字段。 + :param file_path: 配置文件路径 + :param updates: 包含更新内容的字典 + """ + config_dict = self.load2dict() + config_dict.update(updates) + self.save_dict_config(config_dict) + +def convert_to_ndarray(matrix_data): + """ + 将 JSON 中的矩阵数据转换为 numpy ndarray。 + :param matrix_data: JSON 中的矩阵数据(列表形式) + :return: numpy ndarray + """ + return np.array(matrix_data, dtype=np.float64) \ No newline at end of file diff --git a/images/biaoba.jpg b/images/biaoba.jpg new file mode 100644 index 0000000..4de45ec Binary files /dev/null and b/images/biaoba.jpg differ diff --git a/images/target/bbbjt.png b/images/target/bbbjt.png new file mode 100644 index 0000000..31e2331 Binary files /dev/null and b/images/target/bbbjt.png differ diff --git a/images/target/bowa_target/1.jpg b/images/target/bowa_target/1.jpg new file mode 100644 index 0000000..e457aaf Binary files /dev/null and b/images/target/bowa_target/1.jpg differ diff --git a/images/target/bowa_target/max.jpg b/images/target/bowa_target/max.jpg new file mode 100644 index 0000000..755b666 Binary files /dev/null and b/images/target/bowa_target/max.jpg differ diff --git a/images/target/bowa_target/min.jpg b/images/target/bowa_target/min.jpg new file mode 100644 index 0000000..8a4d907 Binary files /dev/null and b/images/target/bowa_target/min.jpg differ diff --git a/images/target/need_trans.jpg b/images/target/need_trans.jpg new file mode 100644 index 0000000..593ec21 Binary files /dev/null and b/images/target/need_trans.jpg differ diff --git a/images/target/road.jpeg b/images/target/road.jpeg new file mode 100644 index 0000000..1e8b15e Binary files /dev/null and b/images/target/road.jpeg differ diff --git a/images/target/rp80.jpg b/images/target/rp80.jpg new file mode 100644 index 0000000..96dd598 Binary files /dev/null and b/images/target/rp80.jpg differ diff --git a/images/target/rp80max.jpg b/images/target/rp80max.jpg new file mode 100644 index 0000000..f684a3e Binary files /dev/null and b/images/target/rp80max.jpg differ diff --git a/images/target/rp80max2.jpg b/images/target/rp80max2.jpg new file mode 100644 index 0000000..ef9c64c Binary files /dev/null and b/images/target/rp80max2.jpg differ diff --git a/images/target/rp80max3.jpg b/images/target/rp80max3.jpg new file mode 100644 index 0000000..857ca5e Binary files /dev/null and b/images/target/rp80max3.jpg differ diff --git a/images/trans/_4point.jpg b/images/trans/_4point.jpg new file mode 100644 index 0000000..995c148 Binary files /dev/null and b/images/trans/_4point.jpg differ diff --git a/images/trans/subRawImg.jpg b/images/trans/subRawImg.jpg new file mode 100644 index 0000000..1889c00 Binary files /dev/null and b/images/trans/subRawImg.jpg differ diff --git a/images/trans/template.jpg b/images/trans/template.jpg new file mode 100644 index 0000000..685cfa0 Binary files /dev/null and b/images/trans/template.jpg differ diff --git a/images/trans/transformed_image.jpg b/images/trans/transformed_image.jpg new file mode 100644 index 0000000..9764f23 Binary files /dev/null and b/images/trans/transformed_image.jpg differ diff --git a/logSet.py b/logSet.py new file mode 100644 index 0000000..99e4a4d --- /dev/null +++ b/logSet.py @@ -0,0 +1,38 @@ +import logging +import os +from logging.handlers import RotatingFileHandler + + +def log_init(): + # 确保logs目录存在 + if not os.path.exists('logs'): + os.makedirs('logs') + # 创建格式器 + formatter = logging.Formatter( + fmt='%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(module)s.%(funcName)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 配置根日志记录器 + root_logger = logging.getLogger() + # 清除现有的处理器,避免重复添加 + root_logger.handlers.clear() + + # 创建文件处理器 + file_handler = RotatingFileHandler( + filename='logs/app.log', + maxBytes=1024*1024, # 1MB + backupCount=10, + encoding='utf-8' # 指定编码 + ) + file_handler.setLevel(logging.INFO) + file_handler.setFormatter(formatter) + + # 创建控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(formatter) + + root_logger.setLevel(logging.INFO) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..2caf7ee --- /dev/null +++ b/main.py @@ -0,0 +1,23 @@ +from datetime import datetime +import sched +import time +from time import sleep + + +# 定义任务 +def print_event(name): + print(f"EVENT: {time.time()} - {name}") +def periodic_task(interval): + print(f"Task executed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-2]}") + scheduler.enter(interval, 1, periodic_task, (interval,)) + + + +# 按装订区域中的绿色按钮以运行脚本。 +if __name__ == '__main__': + # 初始化调度器 + scheduler = sched.scheduler(time.time) + scheduler.enter(0, 1, periodic_task, (0.33,)) # 每 5 秒执行一次 + scheduler.run() + print("===============") + diff --git a/models/__pycache__/msg.cpython-313.opt-1.pyc b/models/__pycache__/msg.cpython-313.opt-1.pyc new file mode 100644 index 0000000..311ad8c Binary files /dev/null and b/models/__pycache__/msg.cpython-313.opt-1.pyc differ diff --git a/models/__pycache__/msg.cpython-313.pyc b/models/__pycache__/msg.cpython-313.pyc new file mode 100644 index 0000000..311ad8c Binary files /dev/null and b/models/__pycache__/msg.cpython-313.pyc differ diff --git a/models/__pycache__/sampleMsg.cpython-313.opt-1.pyc b/models/__pycache__/sampleMsg.cpython-313.opt-1.pyc new file mode 100644 index 0000000..d420945 Binary files /dev/null and b/models/__pycache__/sampleMsg.cpython-313.opt-1.pyc differ diff --git a/models/__pycache__/sampleMsg.cpython-313.pyc b/models/__pycache__/sampleMsg.cpython-313.pyc new file mode 100644 index 0000000..6461415 Binary files /dev/null and b/models/__pycache__/sampleMsg.cpython-313.pyc differ diff --git a/models/__pycache__/target.cpython-313.opt-1.pyc b/models/__pycache__/target.cpython-313.opt-1.pyc new file mode 100644 index 0000000..93c654c Binary files /dev/null and b/models/__pycache__/target.cpython-313.opt-1.pyc differ diff --git a/models/__pycache__/target.cpython-313.pyc b/models/__pycache__/target.cpython-313.pyc new file mode 100644 index 0000000..b5d43f9 Binary files /dev/null and b/models/__pycache__/target.cpython-313.pyc differ diff --git a/models/msg.py b/models/msg.py new file mode 100644 index 0000000..c59606d --- /dev/null +++ b/models/msg.py @@ -0,0 +1,19 @@ +import json +import typing +from dataclasses import dataclass +from dataclasses_json import dataclass_json + +@dataclass_json +@dataclass +class Msg: + _from: str + cmd: str + values: typing.Any + + def to_json_(self) -> str: + """将数据类序列化为 JSON 字符串""" + return self.to_json() + + + + diff --git a/models/sampleMsg.py b/models/sampleMsg.py new file mode 100644 index 0000000..bf49b3f --- /dev/null +++ b/models/sampleMsg.py @@ -0,0 +1,65 @@ +import json +from dataclasses import dataclass +from typing import List + +@dataclass +class SensorImg: + base64: str + + def to_json(self) -> str: + """将数据类序列化为 JSON 字符串""" + return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) + + @classmethod + def from_json(cls, json_str: str) -> 'SensorImg': + """从 JSON 字符串反序列化为数据类""" + data_dict = json.loads(json_str) + return cls(**data_dict) +@dataclass +class SensorData: + pos: str + desc: str + x: float + y: float + + def to_json(self) -> str: + """将数据类序列化为 JSON 字符串""" + return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) + + @classmethod + def from_json(cls, json_str: str) -> 'SensorData': + """从 JSON 字符串反序列化为数据类""" + data_dict = json.loads(json_str) + return cls(**data_dict) + +@dataclass +class AlertData: + pos: str + timeOut: float + + def to_json(self) -> str: + """将数据类序列化为 JSON 字符串""" + return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__) + + @classmethod + def from_json(cls, json_str: str) -> 'AlertData': + """从 JSON 字符串反序列化为数据类""" + data_dict = json.loads(json_str) + return cls(**data_dict) + + +@dataclass +class AllSensorData: + data: List[SensorData] + time: str + + +@dataclass +class AllImg: + image: SensorImg + time: str + +@dataclass +class AllAlert: + alert: List[AlertData] + time: str \ No newline at end of file diff --git a/models/target.py b/models/target.py new file mode 100644 index 0000000..b4f36b6 --- /dev/null +++ b/models/target.py @@ -0,0 +1,133 @@ +import logging +from dataclasses import dataclass, field +from datetime import datetime +from numbers import Number +from typing import Optional, Dict, Deque +from collections import deque +import numpy as np +from dataclasses_json import dataclass_json, config + +import utils + + +@dataclass_json +@dataclass +class Point: + x:float + y:float + def __iter__(self): # 使对象可迭代,可直接转为元组 + yield self.x + yield self.y + +@dataclass +class RectangleArea: + x: int + y: int + w: int + h: int + @classmethod + def from_dict(cls, data: dict): + return cls( + x=data['x'], + y=data['y'], + w=data['w'], + h = data['h']) + +@dataclass +class Threshold: + binary: int + gauss: int + gradient: int=100 + anchor: int=80 + +@dataclass_json +@dataclass +class TargetInfo: + # 标靶方形区域 + rectangle_area:RectangleArea + threshold:Threshold + # 标靶物理半径 + radius:float=0.0 + id:int =-1 + desc:str="" + base:bool=False + def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs): + self.id = id + self.desc = desc + self.rectangle_area=rectangle_area + self.radius=radius + self.threshold=threshold + self.base=base + + @classmethod + def from_dict(cls,data: dict): + return cls(data['id'],data['rectangle_area'],data['radius']) + +@dataclass_json +@dataclass +class HandlerInfo: + is_init:bool=True + last_detected_time:datetime=datetime.now() + radius_pix:float= 1.0 + pix_length:float=0.0 + # 标靶中心 + center_point_queue:Deque[Point] = field(default_factory=lambda: deque(maxlen=10)) + center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + # 标靶位移(像素) + displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) + + def calculate_mean(self)-> Point: + """计算队列中所有数据的均值""" + if self.center_point_queue: + length=len(self.center_point_queue) + mean_x = sum(p.x for p in self.center_point_queue) / length + mean_y = sum(p.y for p in self.center_point_queue) / length + return Point(mean_x, mean_y) + else: + return None + + + def enqueue_center_point(self, data) -> Point: + """入队操作""" + self.center_point_queue.append(data) + return self.calculate_mean() + +@dataclass_json +@dataclass +class CircleTarget: + # 标靶方形区域 + info:TargetInfo + # 标靶透视矩阵 + perspective: np.ndarray = field( + metadata=config( + encoder=utils.encode_perspective, + decoder=utils.decode_perspective + ) + ) + handler_info: Optional[HandlerInfo]=None + + + @classmethod + def init_by_info(cls,t:TargetInfo): + return CircleTarget(t,None,None,None,None) + + def circle_displacement_pix(self): + previous = self.handler_info.center_init + if previous != (): + self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x, + self.handler_info.center_point.y - previous.y) + return self + def circle_displacement_phy(self): + if (self.info.radius != 0 + and self.handler_info.displacement_pix is not None + and self.handler_info.radius_pix !=0): + # 单位像素->距离 + self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix + offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 3) + offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 3) + # logging.info(f"[{self.info.desc}] phy=> x={self.handler_info.displacement_pix.x}*{self.handler_info.pix_length} ={offset_x}") + # logging.info(f"[{self.info.desc}] phy=> y={self.handler_info.displacement_pix.y}*{self.handler_info.pix_length} ={offset_y}") + self.handler_info.displacement_phy = Point(offset_x, offset_y) + return self \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..683aab3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +numpy~=2.2.2 +dataclasses-json~=0.6.7 +flask~=3.1.0 +matplotlib~=3.10.3 +requests~=2.32.3 +opencv-python~=4.10 +paho-mqtt~=2.1.0 \ No newline at end of file diff --git a/tcp_Ser.py b/tcp_Ser.py new file mode 100644 index 0000000..7c65dea --- /dev/null +++ b/tcp_Ser.py @@ -0,0 +1,158 @@ +import logging +import socket +import threading + +from models.msg import Msg + + +class TcpSer(threading.Thread): + # 定义服务器地址和端口 + HOST = '127.0.0.1' + PORT = 2230 + def __init__(self,host,port): + super().__init__() + self.HOST=host + self.PORT=port + self.connected_clients=[] + # 消费者 + self.consumers=[] + self.lock = threading.Lock() + self.running = True + # self.daemon = True + # 处理客户端连接的函数 + def handle_client(self,client_socket:socket): + # 保持连接,直到客户端断开 + while self.running: + # 保持连接,直到客户端断开 + try: + # 接收客户端数据(如果需要) + data = client_socket.recv(1024*8) + msg_str=data.decode('utf-8') + if not data: + break # 如果没有数据,退出循环 + print(f"从 {client_socket.getpeername()} 收到: {msg_str}") + # 反序列化为 实例 + s_cmd = Msg.from_json(msg_str) + valid_msg:bool=True + match s_cmd.cmd: + case "getBase": + self.on_data(s_cmd, valid_msg) + case "getPoints" | "setPoints": + self.on_data(s_cmd,valid_msg) + case "videoFps"| "dataFps": + self.on_data(s_cmd,valid_msg) + case "setCap": + self.on_data(s_cmd,valid_msg) + case "videoMode": + self.on_data(s_cmd,valid_msg) + case "clearZero": + self.on_data(s_cmd, valid_msg) + # todo 添加处理 + case _: + valid_msg = False + err_msg=f"valid cmd={s_cmd.cmd}" + resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" + resp += "\n\n" + client_socket.sendall(resp.encode()) + logging.warn(f"非法命令 {resp}") + logging.info("通讯完成") + except ConnectionAbortedError: + logging.warn("客户端已断开") + self.clear_client(client_socket) + return + except Exception as e: + err_msg = f"illegal content" + resp = f"""{{"_from": "dev","cmd": "","values": {{"operate": false,"err": "{err_msg}"}}}}""" + resp += "\n\n" + + client_socket.sendall(resp.encode()) + logging.warn(f"处理客户端命令出错: {e}") + self.clear_client(client_socket) + return + finally: + pass + + + + # 注册的消费者必须携带on_data 方法 + def add_subscribe(self,consumer): + if hasattr(consumer, 'on_data'): + print(f"加入 consumer {consumer} ") + self.consumers.append(consumer) + else: + print("consumer 缺少on_data函数,订阅无效 ") + + def on_data(self, msg, valid): + for consumer in self.consumers: + try: + resp=consumer.on_data(msg) + self.broadcast_message(resp) + except Exception as e: + logging.warn("通讯异常",e) + + # 广播消息给所有连接的客户端 + def broadcast_message(self,msg:str): + with self.lock: + if len(msg)==0: + return + + msg+="\n\n" + for client in self.connected_clients: + try: + client.sendall(msg.encode()) + except Exception as e: + # 如果发送失败,从列表中移除客户端 + if client in self.connected_clients: + self.connected_clients.remove(client) + client.close() + print(f"向客户端发送消息时出错: {e}") + + def clear_client(self,client:socket): + try: + if client in self.connected_clients: + self.connected_clients.remove(client) + client.close() + except Exception as e: + logging.warn(f"清理客户端,异常: {e}") + + + def run(self): + # 创建服务器套接字 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((self.HOST,self.PORT)) + server_socket.listen() + server_socket.settimeout(1.0) # 设置 accept 超时时间为 1 秒 + print(f"服务器监听在 {self.HOST}:{self.PORT}...") + + try: + # 保持服务器运行并接受新的连接 + while self.running: + try: + client_socket, addr = server_socket.accept() + print(f"连接来自 {addr}") + + # 当客户端连接时,将其添加到列表中 + self.connected_clients.append(client_socket) + + # 为每个客户端启动一个线程 + client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) + # client_thread.daemon = True # 守护线程,服务器关闭时自动结束 + client_thread.start() + except socket.timeout: + continue # 超时继续循环 + except KeyboardInterrupt: + print("服务器关闭...") + finally: + # 关闭所有客户端连接 + for client in self.connected_clients: + print(f"断开客户端 {client_socket.getpeername()}") + client.close() + server_socket.close() + def stop(self): + """外部调用此方法停止服务""" + self.running = False + +if __name__ == '__main__': + tcp=TcpSer("127.0.0.1",2230) + tcp.run() \ No newline at end of file diff --git a/upload/DataReporter.py b/upload/DataReporter.py new file mode 100644 index 0000000..474ac7a --- /dev/null +++ b/upload/DataReporter.py @@ -0,0 +1,149 @@ +import logging +import queue +import sched +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +import models.msg +from upload.RateLimiter import RateLimiter + + +class DataReporter(threading.Thread): + def __init__(self,data_fps:int,video_fps:int=0): + super().__init__() + self.alert_queue = queue.Queue(maxsize=1) # 数据队列 + self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列 + self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列 + self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 弃用 图片限速: 1张/秒 + self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 30条/秒 + self.running = True + self.image_dropped = 0 # 统计丢弃的图片数量 + self.data_dropped = 0 # 统计丢弃的数据数量 + self.daemon = True + self.callbacks = [] # 存储多个回调函数 + self.callback_executor = ThreadPoolExecutor(max_workers=4) # 创建线程池 + self.last_data = None # 存储最后一条数据 + self.last_repeat_counts = 0 # 存储最后一条数据 + + self.scheduler = sched.scheduler(time.time) + self.schedulerEvent = None + + + def register_handler(self,handler_fun): + self.callbacks.append(handler_fun) + def run(self): + interval = 1.0 / self.data_limiter.max_rate + self.schedulerEvent=self.scheduler.enter(0, 1, self._schedule_data_reporting, (interval,)) + self.scheduler.run() + + def _schedule_data_reporting(self,interval): + if self.running: + self.scheduler.enter(interval, 1, self._schedule_data_reporting, (interval,)) # 每 x 秒执行一次 + self._report_data_if_allowed() + + def _report_data_if_allowed(self): + # logging.info(f"Reporting data -> start") + if self.data_limiter.allow_request(): + if not self.data_queue.empty(): + try: + data = self.data_queue.get_nowait() + self.last_data = data + self.last_repeat_counts = 0 + self._report_data(data, self.last_repeat_counts) + except queue.Empty: + pass + elif self.last_data is not None and self.last_repeat_counts < 5: + self.last_repeat_counts += 1 + self._report_data(self.last_data, self.last_repeat_counts) + def run2(self): + while self.running: + + # alert上报 + if not self.alert_queue.empty(): + try: + alert_data = self.alert_queue.get_nowait() + self._report_alert(alert_data) + except queue.Empty: + pass + + # image上报 + # if not self.image_queue.empty() and self.image_limiter.allow_request(): + # try: + # image_data = self.image_queue.get_nowait() + # self._report_image(image_data) + # except queue.Empty: + # pass + + # 然后处理数据上报 + if self.data_limiter.allow_request(): + if not self.data_queue.empty(): + try: + data = self.data_queue.get_nowait() + self.last_data = data # 保存最后一条数据 + self.last_repeat_counts =0 + self._report_data(data,self.last_repeat_counts) + except queue.Empty: + pass + elif self.last_data is not None: + # 如果队列为空但有限速许可,并且存在最后一条数据,则重复上报最后一条数据 + if self.last_repeat_counts < 5: + self.last_repeat_counts+=1 + self._report_data(self.last_data, self.last_repeat_counts) + else: + # logging.warn("不允许上报") + pass + + time.sleep(0.02) # 避免CPU占用过高 + + def _execute_callbacks(self, msg_json): + """ + 执行所有注册的回调函数 + """ + for callback in self.callbacks: + try: + # 提交到线程池异步执行 + self.callback_executor.submit(callback, msg_json) + except Exception as e: + print(f"Error executing callback {callback.__name__}: {e}") + def _report_alert(self, data): + print(f"Reporting alert, timestamp: {data[0]}") + # 这里替换为实际的上报代码 + msg = models.msg.Msg(_from="dev", cmd="alert", values=data[1]) + msg_json = msg.to_json() + self._execute_callbacks(msg_json) + + def _report_image(self, data): + # 实现图片上报逻辑 + print(f"Reporting image, timestamp: {data[0]}") + # 这里替换为实际的上报代码 + msg = models.msg.Msg(_from="dev", cmd="image", values=data[1]) + msg_json = msg.to_json() + self._execute_callbacks(msg_json) + + def _report_data(self, data, last_repeat_counts=0): + # 实现数据上报逻辑 + logging.info(f"Reporting [{last_repeat_counts}] data: {data}") + # 实际的上报代码,数据结构转换 + msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) + # 重新格式化时间,数据等间隔 + msg.values.time=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + msg_json=msg.to_json() + self._execute_callbacks(msg_json) + + def stop(self): + self.running = False + self.join() + print(f"Stats: {self.data_dropped} data dropped") + + def adjust_rate(self, new_rate, data_type='image'): + if data_type == 'image': + with self.image_limiter.lock: + self.image_limiter.max_rate = new_rate + self.image_queue= queue.Queue(maxsize=new_rate) + self.image_limiter.update_interval() + else: + with self.data_limiter.lock: + self.data_limiter.max_rate = new_rate + self.data_queue = queue.Queue(maxsize=new_rate) + self.data_limiter.update_interval() \ No newline at end of file diff --git a/upload/DroppingQueue.py b/upload/DroppingQueue.py new file mode 100644 index 0000000..d520248 --- /dev/null +++ b/upload/DroppingQueue.py @@ -0,0 +1,12 @@ +import queue + + +class DroppingQueue(queue.Queue): + """自定义队列,满时自动丢弃最旧的数据""" + def put(self, item, block=False, timeout=None): + try: + return super().put(item, block=block, timeout=timeout) + except queue.Full: + # 队列满时丢弃最旧的一个数据 + self.get_nowait() + return super().put(item, block=False) \ No newline at end of file diff --git a/upload/RateLimiter.py b/upload/RateLimiter.py new file mode 100644 index 0000000..1836986 --- /dev/null +++ b/upload/RateLimiter.py @@ -0,0 +1,29 @@ +import logging +import threading +import queue +import time +from collections import deque + +class RateLimiter: + def __init__(self, max_rate, time_window): + self.max_rate = max_rate + self.time_window = time_window + self.lock = threading.Lock() + self.current_time = time.time() + self.interval = time_window / max_rate if max_rate > 0 else 0 + self.next_available_time = self.current_time + + def update_interval(self): + self.interval = self.time_window / self.max_rate if self.max_rate > 0 else 0 + + def allow_request(self): + with self.lock: + if self.interval<=0: + return False + current_time = time.time() + if current_time >= self.next_available_time: + self.next_available_time = current_time + self.interval + #logging.info(f"rate[{self.max_rate}]下次请求 now={current_time} < {self.next_available_time}") + return True + #logging.info(f"rate[{self.max_rate}]不准请求 now={current_time} < {self.next_available_time}") + return False diff --git a/upload/__pycache__/DataReporter.cpython-313.opt-1.pyc b/upload/__pycache__/DataReporter.cpython-313.opt-1.pyc new file mode 100644 index 0000000..7be05c7 Binary files /dev/null and b/upload/__pycache__/DataReporter.cpython-313.opt-1.pyc differ diff --git a/upload/__pycache__/DataReporter.cpython-313.pyc b/upload/__pycache__/DataReporter.cpython-313.pyc new file mode 100644 index 0000000..2747261 Binary files /dev/null and b/upload/__pycache__/DataReporter.cpython-313.pyc differ diff --git a/upload/__pycache__/RateLimiter.cpython-313.opt-1.pyc b/upload/__pycache__/RateLimiter.cpython-313.opt-1.pyc new file mode 100644 index 0000000..9e7ea12 Binary files /dev/null and b/upload/__pycache__/RateLimiter.cpython-313.opt-1.pyc differ diff --git a/upload/__pycache__/RateLimiter.cpython-313.pyc b/upload/__pycache__/RateLimiter.cpython-313.pyc new file mode 100644 index 0000000..d076e90 Binary files /dev/null and b/upload/__pycache__/RateLimiter.cpython-313.pyc differ diff --git a/upload/__pycache__/mqttHelper.cpython-313.pyc b/upload/__pycache__/mqttHelper.cpython-313.pyc new file mode 100644 index 0000000..1f70d61 Binary files /dev/null and b/upload/__pycache__/mqttHelper.cpython-313.pyc differ diff --git a/upload/mqttHelper.py b/upload/mqttHelper.py new file mode 100644 index 0000000..fb9feae --- /dev/null +++ b/upload/mqttHelper.py @@ -0,0 +1,107 @@ +import json +import logging +import random +import time +from datetime import datetime + +from paho.mqtt import client as mqtt_client + + +class MQTTClient: + def __init__(self, broker='218.3.126.49', port=1883, topic="wybb/mqtt", + username='emqx', password='public',client_id=''): + self.BROKER = broker + self.PORT = port + self.TOPIC = topic + # generate client ID with pub prefix randomly + self.CLIENT_ID = client_id if client_id!='' else f'wybb_{datetime.now().strftime("%Y%m%d_%H%M%S")}' + + self.USERNAME = username + self.PASSWORD = password + + self.FIRST_RECONNECT_DELAY = 1 + self.RECONNECT_RATE = 2 + self.MAX_RECONNECT_COUNT = 10 + self.MAX_RECONNECT_DELAY = 60 + + self.FLAG_EXIT = False + self.client = None + + def on_connect(self, client, userdata, flags, rc, properties=None): + if rc == 0 and client.is_connected(): + print("Connected to MQTT Broker!") + else: + print(f'Failed to connect, return code {rc}') + + def on_disconnect(self, client, userdata, flags, rc, properties=None): + logging.info("Disconnected with result code: %s", rc) + reconnect_count, reconnect_delay = 0, self.FIRST_RECONNECT_DELAY + while reconnect_count < self.MAX_RECONNECT_COUNT: + logging.info("Reconnecting in %d seconds...", reconnect_delay) + time.sleep(reconnect_delay) + + try: + client.reconnect() + logging.info("Reconnected successfully!") + return + except Exception as err: + logging.error("%s. Reconnect failed. Retrying...", err) + + reconnect_delay *= self.RECONNECT_RATE + reconnect_delay = min(reconnect_delay, self.MAX_RECONNECT_DELAY) + reconnect_count += 1 + logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) + self.FLAG_EXIT = True + + def connect(self): + self.client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, self.CLIENT_ID) + self.client.username_pw_set(self.USERNAME, self.PASSWORD) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.connect(self.BROKER, self.PORT, keepalive=120) + return self.client + + def publish(self, msg:str): + if not self.client: + logging.error("MQTT client is not initialized!") + return + + if not self.client.is_connected(): + logging.error("publish: MQTT client is not connected!") + time.sleep(1) + return + result = self.client.publish(self.TOPIC, msg) + # result: [0, 1] + status = result[0] + if status == 0: + # logging.info(f'Send topic [{self.TOPIC}] to `{msg}` ') + pass + else: + logging.info(f'Failed to send topic [{self.TOPIC}]') + + def start(self): + self.client = self.connect() + self.client.loop_start() + time.sleep(1) + if self.client.is_connected(): + logging.info("MQTT client is connected!") + else: + self.client.loop_stop() + return False + return True + + def stop(self): + if self.client: + self.client.loop_stop() + self.client.disconnect() + + +# 示例用法 +if __name__ == '__main__': + mq = MQTTClient() + if mq.start(): + msg = {"a": 123} + mq.publish(msg) + time.sleep(2) # 等待消息发送完成 + mq.stop() + print("==========") diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..ebec607 --- /dev/null +++ b/utils.py @@ -0,0 +1,38 @@ +from typing import Dict, List + +import cv2 +import base64 + +import numpy as np +def frame_to_img(frame, format="JPEG"): + """将 OpenCV 读取的图片帧转换为 img""" + # 将图片帧编码为 JPEG 或 PNG 格式 + if format.upper() == "JPEG": + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量 + elif format.upper() == "PNG": + encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别 + else: + raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.") + + result, encoded_img = cv2.imencode(f".{format.lower()}", frame, encode_param) + return result,encoded_img + +def frame_to_base64(frame, format="JPEG"): + """将 OpenCV 读取的图片帧转换为 Base64 编码的字符串""" + result, encoded_img = frame_to_img(frame, format=format) + if not result: + raise ValueError("Failed to encode frame.") + + # 将编码后的字节流转换为 Base64 字符串 + base64_string = base64.b64encode(encoded_img).decode("utf-8") + return base64_string + + +# 自定义编码器和解码器 +def encode_perspective(value: np.ndarray) -> List[List[float]]: + if value is None: + return [] + return value.tolist() + +def decode_perspective(value: List[List[float]]) -> np.ndarray: + return np.array(value) \ No newline at end of file diff --git a/videoDetection.py b/videoDetection.py new file mode 100644 index 0000000..6c94c67 --- /dev/null +++ b/videoDetection.py @@ -0,0 +1,665 @@ +import copy +import threading +from dataclasses import asdict +from datetime import datetime +import json +import queue +import time +from time import sleep +from typing import Tuple + +import cv2 +import numpy as np +import logging + +import configSer +import models.target +import models.sampleMsg +import upload.DataReporter +import utils +import videoPush +from models.msg import Msg + +logging.basicConfig(level=logging.DEBUG) +drawing: bool = False # 是否正在绘制 +is_video_mode: bool = False # 是否采用标靶图覆盖 +# 定义点 +start_point: models.target.Point +end_point: models.target.Point +# 配置 +configObj:configSer.ConfigOperate +reporter:upload.DataReporter.DataReporter +# 鼠标回调函数 +def add_rectangle(event, x, y, flags, param): + global start_point, end_point, drawing + + if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 + logging.info("左键按下") + start_point = models.target.Point(x, y) + end_point = start_point + drawing = True + elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 + if drawing: + end_point = models.target.Point(x, y) + elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 + logging.info("左键抬起") + drawing = False + end_point = models.target.Point(x, y) + if start_point == end_point: + return + distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) + if distance < 20: + logging.info("距离小于20,无效区域") + return + target_id = len(configObj.config_info.targets) + # 圆标靶半径 mm + radius = 40.0 + area=models.target.RectangleArea(int(start_point.x),int(start_point.y), + int(end_point.x-start_point.x),int(end_point.y-start_point.y)) + num=len(configObj.config_info.targets.items()) + t_info=models.target.TargetInfo( target_id, + f"bb_{num}", + area, + radius, + models.target.Threshold(120,1), + False) + new_target = models.target.CircleTarget(t_info,None,None) + logging.info(f"新增区域[{target_id}] => {start_point, end_point}") + configObj.config_info.targets[target_id] = new_target + +def read_target_rectangle(): + return configObj.config_info.targets + +class VideoProcessor: + reporter: upload.DataReporter.DataReporter + capture: cv2.VideoCapture + capturePath: str="" + is_opened: bool= False + is_running = True + is_clear_zero:bool=False + # 全局配置锁 + targets_lock: threading.Lock = threading.Lock() + def __init__(self): + print("初始化 VideoProcessor") + pass + + def on_data(self,msg:Msg): + global configObj,is_video_mode + logging.info(f"msg={msg}") + match msg.cmd: + case "getBase": + base_info = asdict(configObj.config_info) + base_info.pop("targets",None) + base_info.pop("server", None) + base_info.pop("upload", None) + + resp_msg = models.msg.Msg(_from="dev", cmd="getBase", values={"base": base_info}) + resp_json = resp_msg.to_json_() + return resp_json + case "getPoints": + targets=copy.deepcopy(configObj.config_info.targets) + for k,v in targets.items(): + targets[k].handler_info=None + + resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) + resp_json = resp_msg.to_json_() + return resp_json + case "setPoints": + v=msg.values + ts=v["targets"] + + with self.targets_lock: + # 清空原配置 + configObj.config_info.targets={} + for _,t in ts.items(): + t_str=json.dumps(t) + new_c_target = models.target.CircleTarget.from_json(t_str) + configObj.config_info.targets[new_c_target.info.id] =new_c_target + + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoFps": + v = msg.values + fps = v["fps"] + self.reporter.adjust_rate(fps,"image") + configObj.config_info.fps.video = fps + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "dataFps": + v = msg.values + fps = v["fps"] + self.reporter.adjust_rate(fps,"data") + configObj.config_info.fps.data=fps + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "setCap": + v = msg.values + cap = v["cap"] + self.switch_video(cap) + resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "videoMode": + v = msg.values + is_debug = v["debug"] + is_video_mode=is_debug + resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + case "clearZero": + self.is_clear_zero=True + resp_msg = models.msg.Msg(_from="dev", cmd="clearZero", values={"operate": True}) + resp_json = resp_msg.to_json() + return resp_json + print("==") + + + def pre_handler_img(self,gray_frame,now_str:str): + # 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 + img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") + all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) + self.enqueue_image(all_img) + + def draw_rectangle(self,img): + global configObj,is_video_mode + + gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + now_time = datetime.now() + now_str = now_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + #图像发送 + if configObj.config_info.fps.video > 0: + self.pre_handler_img(gray_frame,now_str) + + + if len(configObj.config_info.targets)==0: return + + # 基点 + base_point_pix:models.target.Point=None + + #上报有新数据的点 + all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) + + with self.targets_lock: + # 绘图-历史点 + for i, tr in configObj.config_info.targets.items(): + # logging.info(f"检测--> 标靶[{tr.info.desc}]") + if not hasattr(tr, "info"): + print("====") + _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) + _end_point = models.target.Point( + tr.info.rectangle_area.x+tr.info.rectangle_area.w, + tr.info.rectangle_area.y+tr.info.rectangle_area.h) + #绘制标靶区域 + blue_color = (255, 0, 0) + if tr.info.base: + blue_color=(200, 0, 200) #紫红色 基点 + + cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) + label=f"{tr.info.desc},r={tr.info.radius}" + cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) + #检测 + # 获取图像尺寸 + frame_height, frame_width = gray_frame.shape[:2] + if _end_point.x>frame_width or _end_point.y>frame_height: + logging.warn(f"标靶[{tr.info.desc}]sub_image 超出区域") + continue + sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) + + + # ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) + sub_binary_frame=sub_image + # 应用滤波 + # sub_binary_frame =cv2.bilateralFilter(sub_binary_frame, 4, 15, 15) + + # sub_binary_frame = cv2.cvtColor(sub_binary_frame, cv2.COLOR_BGR2GRAY) + + # 高斯滤波 + gauss_size=tr.info.threshold.gauss + if gauss_size >1: + sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) + # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) + if tr.perspective is not None: + # 宽度 + sub_width = sub_binary_frame.shape[1] + # 高度 + sub_height = sub_binary_frame.shape[0] + + # 先禁用图像变换 + # sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) + + + # 覆盖原图 + if is_video_mode: + sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) + self.cover_sub_image(img,sub_c_img, _start_point, _end_point) + + # if __debug__: + # cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) + # + + + circles = self.circle_detect_Edges(sub_binary_frame,tr.info.threshold.gradient,tr.info.threshold.anchor) + + if __debug__: + cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) + + if len(circles) == 0: + continue + elif len(circles) > 1: + logging.info(f"标靶[{tr.info.desc}],匹配圆{len(circles)}个") + i=0 + for circle in circles: + logging.info(f"标靶[{tr.info.desc}],圆{i}半径={circle[2]}") + i+=1 + + center,radius_pix=self.circle_match(circles,_start_point) + # 纪录圆心位置 + if tr.handler_info is None: + tr.handler_info= models.target.HandlerInfo() + if tr.handler_info.is_init: + tr.handler_info.is_init=False + tr.handler_info.center_init = center + + # 数据窗口平滑处理 + # smooth_center = tr.handler_info.enqueue_center_point(center) + # tr.handler_info.center_point = smooth_center + # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") + + # 原始处理 + tr.handler_info.center_point=center + + + tr.handler_info.radius_pix=radius_pix + tr.circle_displacement_pix() + + + #纪录时间 + tr.handler_info.last_detected_time = now_time + + # 基点 + if tr.info.base: + base_point_pix=tr.handler_info.displacement_pix + + # 基于像素点计算 物理距离 + for i, tr in configObj.config_info.targets.items(): + + if tr.handler_info is None: + continue + if tr.handler_info.displacement_pix is None: + continue + # 减去基点偏移 + if base_point_pix is not None: + raw_point=tr.handler_info.displacement_pix + tr.handler_info.displacement_pix=models.target.Point( + x=raw_point.x-base_point_pix.x, + y=raw_point.y-base_point_pix.y) + if not tr.info.base: + # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") + pass + tr.circle_displacement_phy() + if tr.handler_info.displacement_phy is not None: + all_upload_data.data.append( + models.sampleMsg.SensorData( + str(tr.info.id), + tr.info.desc, + tr.handler_info.displacement_phy.x, + tr.handler_info.displacement_phy.y) + ) + # 画圆 + self.circle_show_phy(img,tr) + + tr.handler_info.displacement_pix=None + tr.handler_info.displacement_phy = None + + if self.is_clear_zero: + logging.info(f"执行清零") + for t_id, target in configObj.config_info.targets.items(): + if target.handler_info is not None: + target.handler_info.center_init = target.handler_info.center_point + self.is_clear_zero = False + + + #过滤无效空数据 + if len(all_upload_data.data)==0: + return + + if configObj.config_info.fps.data > 0: + self.enqueue_data(all_upload_data) + + + + def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): + + circle = max(circles, key=lambda c: c[2]) + # 绘制圆心 + center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) + radius = float(np.round(circle[2], 3)) + cp = models.target.Point(x=round(center[0], 5), y=round(center[1], 5)) + return cp,radius + + def circle_show(self, img, + center: models.target.Point,radius:float, + rect_s_point: models.target.Point, + rect_e_point: models.target.Point): + + font = cv2.FONT_HERSHEY_SIMPLEX + color = (255, 0, 0) # 蓝色 + scale = 0.5 + center_int = tuple(int(x) for x in center) + radius_int = int(radius) + cv2.circle(img, center_int, 2, (0, 255, 0), 4) + # 绘制外圆 + cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) + # 打印圆心坐标 + + text1 = f"c:{(center.x,center.y,radius)}" + txt_location = (rect_s_point.x+2,rect_e_point.y-2) + # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) + cv2.putText(img, text1, txt_location, font, scale, color, 1) + + def circle_show_phy(self, img,tr: models.target.CircleTarget): + + font = cv2.FONT_HERSHEY_SIMPLEX + color = (255, 0, 0) # 蓝色 + scale = 0.5 + + center=tr.handler_info.center_point + + center_int = tuple(int(x) for x in center) + radius_int = int(tr.handler_info.radius_pix) + cv2.circle(img, center_int, 2, (0, 255, 0), 4) + # 绘制外圆 + cv2.circle(img, center_int, radius_int, (0, 0, 255), 2) + # 打印圆心坐标 + + text1 = f"c:{(center.x, center.y, tr.handler_info.radius_pix)}" + rect_s_point=tr.info.rectangle_area.x + rect_e_point=tr.info.rectangle_area.y+tr.info.rectangle_area.h-5 + txt_location = (rect_s_point,rect_e_point) + # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) + cv2.putText(img, text1, txt_location, font, scale, color, 1) + txt_location2 = (rect_s_point, rect_e_point+20) + text2 = f"{tr.handler_info.displacement_phy.x,tr.handler_info.displacement_phy.y}" + cv2.putText(img, text2, txt_location2, font, scale, color, 1) + + def circle_detect(self, img) -> list: + # 圆心距 canny阈值 最小半径 最大半径 + circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, + maxRadius=0) + # 创建一个0行, 2列的空数组 + if circles_float is not None: + # 提取圆心坐标(保留2位小数) + centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] + return centers + else: + return [] + + def circle_detect_Blob(self, img) -> list: + params = cv2.SimpleBlobDetector_Params() + detector = cv2.SimpleBlobDetector.create(params) + keypoints:Tuple = detector.detect(img) + centers=[] + # 创建一个0行, 2列的空数组 + if keypoints is not None: + for kp in keypoints: + p=kp.pt + center = (round(float(p[0]),3), round(float(p[1]),3), round(float(kp.size/2),3)) + centers.append( center) + + return centers + def circle_detect_Edges(self, image,gradient:int,anchor:int) -> list[Tuple[float,float,float]]: + # 创建EdgeDrawing检测器 + ed = cv2.ximgproc.createEdgeDrawing() + # 获取参数结构体并设置参数 + params = cv2.ximgproc_EdgeDrawing_Params() + params.EdgeDetectionOperator = cv2.ximgproc.EdgeDrawing_SOBEL + params.GradientThresholdValue = gradient #100 # 梯度幅值 ≥ 40 的像素才可能成为“边缘”。 + params.AnchorThresholdValue = anchor #80 # 要求该像素的梯度幅值 ≥ Anchor 阈值 才能被选为“锚点” + ed.setParams(params) + #计时 + start = cv2.getTickCount() + # 检测边缘 + ed.detectEdges(image) + # 检测椭圆和圆形 + ellipses = ed.detectEllipses() + end_ellipses = cv2.getTickCount() + duration_ellipses = (end_ellipses - start) * 1000 / cv2.getTickFrequency() + # print(f"EdgeDrawing 椭圆检测耗时: {duration_ellipses:.2f} ms") + # 绘制椭圆和圆形 + + centers = [] + + if ellipses is None: + return centers + + for ellipseRows in ellipses: + ellipse = ellipseRows[0] # ellipses 结构为 N 1 6 的3维度数组 + # center = (int(ellipse[0]), int(ellipse[1])) + # axes = (int(ellipse[2] + ellipse[3]), int(ellipse[2] + ellipse[4])) + # angle = ellipse[5] #椭圆旋转角度(度,0~180) + center = (round(float(ellipse[0]), 3), round(float(ellipse[1]), 3), round(float(ellipse[2]), 3)) + #半径 0 忽略 + if center[2]<=0: + continue + centers.append(center) + return centers + + def extract_sub_image(self,frame, top_left, bottom_right): + """ + 从帧中截取子区域 + :param frame: 输入的视频帧 + :param top_left: 子图片的左上角坐标 (x1, y1) + :param bottom_right: 子图片的右下角坐标 (x2, y2) + :return: 截取的子图片 + """ + x1, y1 = top_left + x2, y2 = bottom_right + return frame[y1:y2, x1:x2] + + def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): + x1, y1 = top_left + x2, y2 = bottom_right + frame[y1:y2, x1:x2]= sub_frame + return frame + + + def open_video(self,video_id): + sleep(1) + logging.info(f"打开摄像头 -> {video_id}") + self.capture = cv2.VideoCapture(video_id) + frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) + logging.info(f"默认分辨率= {frame_width}*{frame_height}") + logging.info(f"{video_id}地址->{self.capture}") + if not self.capture.isOpened(): + self.capture.release() + logging.warn(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") + return + if frame_width==640: + self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 4224) # 宽度 + self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 3136) # 高度 + logging.info(f"摄像头分辨率 更新->4224*3136") + self.is_opened=True + self.capturePath=video_id + fps = self.capture.get(cv2.CAP_PROP_FPS) + logging.info(f"fps={fps},video_id={video_id},") + + def switch_video(self,video_id:str): + print(f"切换摄像头 -> {video_id}") + self.is_opened = False + self.capture.release() + cv2.destroyAllWindows() + if str.isdigit(video_id): + video_id=int(video_id) + self.open_video(video_id) + + def clear_zero(self): + self.is_opened = False + + + def show_video(self): + camera_err_counts=0 + global sigExit,start_point, end_point, drawing + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) + # 读取一帧图像 + while self.is_running: + if camera_err_counts >= 10: + logging.warn(f"读取摄像头异常,准备重新切换") + self.switch_video(str(self.capturePath)) + camera_err_counts = 0 + continue + + if not self.is_opened: + logging.warn(f"摄像头 标记is_opened={self.is_opened}") + sleep(5) + camera_err_counts += 1 + continue + + ret, frame = self.capture.read() + if not ret: + camera_err_counts+=1 + logging.warn(f"${camera_err_counts}次,无法读取帧,cap地址- >{self.capture}") + sleep(1) + continue + + self.frame_handle(frame) + + cv2.waitKey(1) + # 显示图像 + if frame is not None: + if __debug__: + cv2.imshow('Frame', frame) + #缓存到推流 + videoPush.update_latest_frame(frame) + logging.warn("退出VideoProcessor") + def show_image(self,frame): + global start_point, end_point, drawing + if __debug__: + cv2.namedWindow('Frame') + cv2.setMouseCallback('Frame', add_rectangle) + # 读取一帧图像 + while True: + cp_img=frame.copy() + self.frame_handle(cp_img) + + if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 + break + cv2.destroyAllWindows() + + def frame_handle(self,frame): + # logging.info(f"处理图像帧==>") + # 绘图-历史点 + self.draw_rectangle(frame) + # 绘图-实时 + if drawing: + cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) + # print(f"鼠标位置 {start_point} -> {end_point}") + + def image_mode(self): + img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg + # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg + # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg + self.show_image(img_raw) + + # 支持 + def video_mode(self,video_id:str): + if str.isdigit(video_id): + video_id=int(video_id) + + offline_monitor() + self.open_video(video_id) + self.show_video() + + # 释放摄像头资源并关闭所有窗口 + logging.warn("退出 video") + self.capture.release() + cv2.destroyAllWindows() + + def rtsp_mode(self,rtsp_url:str): + # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" + # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" + self.open_video(rtsp_url) + fps = self.capture.get(cv2.CAP_PROP_FPS) + logging.info(f"rtsp fps={fps}") + self.show_video() + # 释放摄像头资源并关闭所有窗口 + self.capture.release() + cv2.destroyAllWindows() + + def enqueue_data(self,data): + global reporter + # 获取当前时间戳 + timestamp = time.time() + # 将时间戳转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 + # 放入图片队列(自动丢弃最旧数据当队列满时) + try: + reporter.data_queue.put((dt, data), block=False) + except queue.Full: + # self.reporter.data_dropped += 1 + pass + + + def enqueue_image(self,data): + global reporter + # 获取当前时间戳 + timestamp = time.time() + # 将时间戳转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 + # 放入图片队列(自动丢弃最旧数据当队列满时) + try: + reporter.image_queue.put((dt, data), block=False) + except queue.Full: + pass + #self.reporter.image_dropped += 1 + + def stop(self): + self.is_running=False + self.capture.release() + +def enqueue_alert(data): + global reporter + # 获取当前时间戳 + timestamp = time.time() + # 将时间戳转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 + try: + reporter.alert_queue.put((dt, data), block=False) + except queue.Full: + # self.reporter.data_dropped += 1 + pass + +def task_timeout(): + offline_monitor() + if not configObj.config_info.alert.enable: + return + now_time=datetime.now() + now_str = now_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + all_upload_alert = models.sampleMsg.AllAlert(alert=[], time=now_str) + for t_id, tr in configObj.config_info.targets.items(): + # 是否offline超时告警状态 + if tr.handler_info is not None: + delta_s = round((now_time - tr.handler_info.last_detected_time).total_seconds()) + if delta_s > configObj.config_info.alert.intervalSec: + ad = models.sampleMsg.AlertData(tr.info.desc, delta_s) + all_upload_alert.alert.append(ad) + if len(all_upload_alert.alert) > 0: + enqueue_alert(all_upload_alert) + logging.warn(f"标靶脱靶超时=> {all_upload_alert}") + + + +def offline_monitor(): + timer = threading.Timer(configObj.config_info.alert.intervalSec, task_timeout) + timer.start() # 启动定时器 + + diff --git a/videoPush.py b/videoPush.py new file mode 100644 index 0000000..0dbdb82 --- /dev/null +++ b/videoPush.py @@ -0,0 +1,79 @@ +import os +import signal +from time import sleep + +import threading + +import numpy as np +import requests +from flask import Flask, Response + +import utils + +app_flask = Flask(__name__) + +# 全局变量,用于存储最新的帧 +latest_frame:np.ndarray = None +lock = threading.Lock() +is_running = True + +def update_latest_frame(n_bytes:np.ndarray): + global latest_frame + latest_frame=n_bytes + +def generate_mjpeg(): + """生成MJPEG格式的视频流""" + while is_running: + with lock: + if latest_frame is None: + continue + _,latest_frame_bytes = utils.frame_to_img(latest_frame) + frame = latest_frame_bytes.tobytes() + pass + sleep(0.1) + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') + + +@app_flask.route('/video_flow') +def video_feed(): + """视频流路由""" + return Response( + generate_mjpeg(), + mimetype='multipart/x-mixed-replace; boundary=frame' + ) + +# 添加关闭路由 +@app_flask.route('/shutdown', methods=['POST']) +def shutdown(): + os.kill(os.getpid(), signal.SIGTERM) + return 'Server shutting down...' +def run(): + port=2240 + print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") + app_flask.run(host='0.0.0.0', port=port, threaded=True) + +def video_server_run(): + thread = threading.Thread(target=run) + thread.daemon = True # 设置为守护线程,主线程退出时自动终止 + thread.start() +def stop(): + global is_running + is_running = False + # 通过HTTP请求关闭Flask服务器 + try: + response = requests.post('http://localhost:2240/shutdown', timeout=1) + print(f"服务器关闭请求发送成功,状态码: {response.status_code}") + except requests.exceptions.ConnectionError as e: + print(f"无法连接到服务器: {e}") + except requests.exceptions.Timeout as e: + print(f"关闭请求超时: {e}") + except requests.exceptions.RequestException as e: + print(f"发送关闭请求时发生错误: {e}") + except Exception as e: + print(f"关闭服务器时发生未知错误: {e}") + + + +if __name__ == '__main__': + run() \ No newline at end of file