diff --git a/app.py b/app.py index 693123b..3525efb 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,12 @@ import logging import os import sys +from datetime import datetime from time import sleep import signal import configSer import logSet +import stabilize.win import tcp_Ser import upload.DataReporter import videoDetection @@ -47,31 +49,42 @@ if __name__ == '__main__': else: print("release 模式") - config_path_abs=read_config_path() + # 确保imgs目录存在 + imgs_dir = "imgs" + if not os.path.exists(imgs_dir): + os.makedirs(imgs_dir) + # 读取配置文件 + config_path_abs=read_config_path() config_obj= configSer.ConfigOperate(config_path_abs) json_str = config_obj.config_info.to_json(indent=4) logging.info(f"当前配置:{json_str}") tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port) + tcp_service.daemon=True tcp_service.start() reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video) - reporter.register_handler(tcp_service.broadcast_message) + reporter.register_handler(tcp_service.broadcast_message,"tcp") if config_obj.config_info.upload.enable: mq_config=config_obj.config_info.upload.mqtt - mq = upload.mqttHelper.MQTTClient(broker=mq_config.broker, + upload.DataReporter.mq_client = upload.mqttHelper.MQTTClient(broker=mq_config.broker, port=mq_config.port, topic=mq_config.topic, username=mq_config.username, password=mq_config.password, - client_id=mq_config.client_id) - if mq.start(): - reporter.register_handler(mq.publish) - else: - logging.warn("mq链接失败,不上报") + client_id=mq_config.client_id+"_"+datetime.now().strftime("%y%m%d_%H%M%S"), + ) + try: + upload.DataReporter.start_mqtt_connection_thread() + reporter.register_handler(upload.DataReporter.mq_client.publish,"mqtt") + except Exception as e: + logging.warn("mqtt通讯异常", e) + sleep(2) reporter.start() + + # 启动video videoDetection.configObj=config_obj videoDetection.reporter=reporter @@ -83,6 +96,15 @@ if __name__ == '__main__': # 推流 videoPush.video_server_run() + #数据平滑窗口 + if config_obj.config_info.win.enable: + method=config_obj.config_info.win.method + size=config_obj.config_info.win.size + threshold=config_obj.config_info.win.threshold + img_threshold=config_obj.config_info.win.imgThreshold + video_processor.adaptive_smoother=stabilize.win.Win.create_window(method,size, threshold,img_threshold) + else: + video_processor.adaptive_smoother=None # 启动 video_processor.video_mode(config_obj.config_info.capture) diff --git a/build/config.json b/build/config.json new file mode 100644 index 0000000..4079585 --- /dev/null +++ b/build/config.json @@ -0,0 +1,149 @@ +{ + "mac": "mac123", + "server": { + "port": 2230 + }, + "fps": { + "data": 5, + "video": 0 + }, + "alert": { + "enable": false, + "intervalSec": 6 + }, + "win": { + "enable": true, + "method": "median", + "size": 5, + "threshold": -0.1, + "imgThreshold": 10.0 + }, + "upload": { + "mqtt": { + "broker": "218.3.126.49", + "port": 1883, + "topic": "wybb/zj/mqtt179_debug", + "username": "", + "password": "", + "client_id": "wybb_zj_123_debug" + }, + "enable": true + }, + "capture": "rtsp://192.168.1.10:554/user=admin&password=&channel=1&stream=0.sdp?", + "targets": { + "101": { + "info": { + "rectangle_area": { + "x": 758, + "y": 1267, + "w": 111, + "h": 108 + }, + "threshold": { + "binary": 100, + "gauss": 1, + "gradient": 100, + "anchor": 80 + }, + "radius": 40.0, + "id": 101, + "desc": "T_1", + "base": false + }, + "perspective": [ + [ + 1.0, + 0.0, + 0.0 + ], + [ + 0.0, + 1.0, + 0.0 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "handler_info": null + }, + "102": { + "info": { + "rectangle_area": { + "x": 2043, + "y": 1379, + "w": 140, + "h": 129 + }, + "threshold": { + "binary": 100, + "gauss": 1, + "gradient": 100, + "anchor": 80 + }, + "radius": 40.0, + "id": 102, + "desc": "T_2", + "base": false + }, + "perspective": [ + [ + 1.0, + 0.0, + 0.0 + ], + [ + 0.0, + 1.0, + 0.0 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "handler_info": null + }, + "103": { + "info": { + "rectangle_area": { + "x": 2789, + "y": 1410, + "w": 152, + "h": 142 + }, + "threshold": { + "binary": 100, + "gauss": 1, + "gradient": 100, + "anchor": 80 + }, + "radius": 40.0, + "id": 103, + "desc": "T_3", + "base": false + }, + "perspective": [ + [ + 1.0, + 0.0, + 0.0 + ], + [ + 0.0, + 1.0, + 0.0 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "handler_info": null + } + } +} \ No newline at end of file diff --git a/build/wuyuan.app b/build/wuyuan.app new file mode 100644 index 0000000..7c08d1b Binary files /dev/null and b/build/wuyuan.app differ diff --git a/build/wybb_watchdog.service b/build/wybb_watchdog.service index ca143e3..494f086 100644 --- a/build/wybb_watchdog.service +++ b/build/wybb_watchdog.service @@ -1,12 +1,13 @@ [Unit] Description = wybb watchdog service -After=multi-user.target +After = multi-user.target [Service] -ExecStart = /home/forlinx/lk/wuyuanbiaoba/dist/wuyuan.app #按照实际目录来填写 +ExecStartPre=/bin/sleep 8 +ExecStart = /home/forlinx/local/wuyuanbiaoba/dist/wuyuan.app Type = simple -Restart=always -RestartSec=30s # 检查一次,如果服务失败则重启 +Restart = always +RestartSec = 30s # 检查一次,如果服务失败则重启 [Install] diff --git a/config.json b/config.json index af1750b..0d2781a 100644 --- a/config.json +++ b/config.json @@ -1,25 +1,105 @@ { + "mac": "", "server": { "port": 2230 }, "fps": { - "data": 30, + "data": 1, "video": 0 }, "alert": { - "enable": false, - "intervalSec": 60 + "enable": true, + "intervalSec": 30 }, - "upload": { + "win": { "enable": true, + "method": "median", + "size": 20, + "threshold": -0.1, + "imgThreshold": 10.0 + }, + "upload": { "mqtt": { "broker": "218.3.126.49", "port": 1883, - "topic": "wybb/mqtt", - "client_id": "wybb_lk" - } + "topic": "wybb/zj/mqtt110_debug", + "username": "", + "password": "", + "client_id": "wybb_debug" + }, + "enable": true }, "capture": "C:/Users/Administrator/Videos/1k.mp4", "targets": { + "0": { + "info": { + "rectangle_area": { + "x": 524, + "y": 521, + "w": 50, + "h": 48 + }, + "threshold": { + "binary": 120, + "gauss": 1, + "gradient": 100, + "anchor": 80 + }, + "radius": 40.0, + "id": 0, + "desc": "bb_0", + "base": false + }, + "perspective": [], + "handler_info": { + "is_init": true, + "last_detected_time": 1766046547.513531, + "radius_pix": 16.498, + "pix_length": 2.4245363074312034, + "center_point": { + "x": 548.511, + "y": 543.337 + }, + "center_init": { + "x": 548.515, + "y": 543.271 + } + } + }, + "1": { + "info": { + "rectangle_area": { + "x": 729, + "y": 484, + "w": 68, + "h": 64 + }, + "threshold": { + "binary": 120, + "gauss": 1, + "gradient": 100, + "anchor": 80 + }, + "radius": 40.0, + "id": 1, + "desc": "bb_1", + "base": false + }, + "perspective": [], + "handler_info": { + "is_init": true, + "last_detected_time": 1766046547.513531, + "radius_pix": 17.024, + "pix_length": 2.3496240601503757, + "center_point": { + "x": 771.218, + "y": 515.245 + }, + "center_init": { + "x": 771.233, + "y": 515.17 + } + } + } } } \ No newline at end of file diff --git a/configSer.py b/configSer.py index 676a3fc..d7400cd 100644 --- a/configSer.py +++ b/configSer.py @@ -10,6 +10,7 @@ import numpy as np from dataclasses_json import dataclass_json import models.target +import tools.macInfo @dataclass_json @@ -29,6 +30,14 @@ class Alert: enable: bool = False intervalSec: int = 0 +@dataclass_json +@dataclass +class Win: + enable: bool = False + method: str = "median" + size: int = 1 + threshold: float = 1.0 + imgThreshold: float = 20.0 @dataclass_json @dataclass @@ -52,9 +61,11 @@ class Upload: @dataclass_json @dataclass class ConfigInfo: + mac: str server:Server fps:Fps alert:Alert + win:Win upload:Upload capture: str = "0" # 标靶配置 @@ -81,6 +92,10 @@ class ConfigOperate: dic=self.load2dict() dict_str = json.dumps(dic) self.config_info=ConfigInfo.from_json(dict_str) + # 初次加载 重置标靶初始状态 + self.reset_targets_init_state() + if self.config_info.mac=="": + self.config_info.mac =tools.macInfo.get_mac_address() def save2json_file(self): json_str = self.config_info.to_json(indent=4) @@ -107,6 +122,13 @@ class ConfigOperate: config_dict.update(updates) self.save_dict_config(config_dict) + def reset_targets_init_state(self): + """重置所有标靶的初始化状态""" + for target_id, target in self.config_info.targets.items(): + if target.handler_info is not None: + target.handler_info.is_init = False + + def convert_to_ndarray(matrix_data): """ 将 JSON 中的矩阵数据转换为 numpy ndarray。 diff --git a/main.py b/main.py index 2caf7ee..fcb1163 100644 --- a/main.py +++ b/main.py @@ -1,23 +1,9 @@ -from datetime import datetime -import sched -import time -from time import sleep - - -# 定义任务 -def print_event(name): - print(f"EVENT: {time.time()} - {name}") -def periodic_task(interval): - print(f"Task executed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-2]}") - scheduler.enter(interval, 1, periodic_task, (interval,)) - +def print_hi(name): + # 在下面的代码行中使用断点来调试脚本。 + print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 # 按装订区域中的绿色按钮以运行脚本。 if __name__ == '__main__': - # 初始化调度器 - scheduler = sched.scheduler(time.time) - scheduler.enter(0, 1, periodic_task, (0.33,)) # 每 5 秒执行一次 - scheduler.run() - print("===============") + print_hi('PyCharm') diff --git a/models/__pycache__/sampleMsg.cpython-313.pyc b/models/__pycache__/sampleMsg.cpython-313.pyc index 6461415..f9306ff 100644 Binary files a/models/__pycache__/sampleMsg.cpython-313.pyc and b/models/__pycache__/sampleMsg.cpython-313.pyc differ diff --git a/models/__pycache__/target.cpython-313.pyc b/models/__pycache__/target.cpython-313.pyc index b5d43f9..606615f 100644 Binary files a/models/__pycache__/target.cpython-313.pyc and b/models/__pycache__/target.cpython-313.pyc differ diff --git a/models/target.py b/models/target.py index b4f36b6..f4cced8 100644 --- a/models/target.py +++ b/models/target.py @@ -66,34 +66,19 @@ class TargetInfo: @dataclass_json @dataclass class HandlerInfo: - is_init:bool=True + is_init:bool=False last_detected_time:datetime=datetime.now() radius_pix:float= 1.0 - pix_length:float=0.0 + pix_length:float= 0.0 # 标靶中心 - center_point_queue:Deque[Point] = field(default_factory=lambda: deque(maxlen=10)) center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) # 标靶位移(像素) displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None)) - def calculate_mean(self)-> Point: - """计算队列中所有数据的均值""" - if self.center_point_queue: - length=len(self.center_point_queue) - mean_x = sum(p.x for p in self.center_point_queue) / length - mean_y = sum(p.y for p in self.center_point_queue) / length - return Point(mean_x, mean_y) - else: - return None - def enqueue_center_point(self, data) -> Point: - """入队操作""" - self.center_point_queue.append(data) - return self.calculate_mean() - @dataclass_json @dataclass class CircleTarget: @@ -125,8 +110,8 @@ class CircleTarget: and self.handler_info.radius_pix !=0): # 单位像素->距离 self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix - offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 3) - offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 3) + offset_x = float(self.handler_info.displacement_pix.x * self.handler_info.pix_length) + offset_y = float(self.handler_info.displacement_pix.y * self.handler_info.pix_length) # logging.info(f"[{self.info.desc}] phy=> x={self.handler_info.displacement_pix.x}*{self.handler_info.pix_length} ={offset_x}") # logging.info(f"[{self.info.desc}] phy=> y={self.handler_info.displacement_pix.y}*{self.handler_info.pix_length} ={offset_y}") self.handler_info.displacement_phy = Point(offset_x, offset_y) diff --git a/requirements.txt b/requirements.txt index 683aab3..0100d13 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ numpy~=2.2.2 +matplotlib~=3.10.3 dataclasses-json~=0.6.7 flask~=3.1.0 -matplotlib~=3.10.3 +scipy~=1.16.1 +paho-mqtt~=2.1.0 requests~=2.32.3 -opencv-python~=4.10 -paho-mqtt~=2.1.0 \ No newline at end of file +opencv-python~=4.10.0 \ No newline at end of file diff --git a/stabilize/algorithm/adaptiveSmoothingWindow.py b/stabilize/algorithm/adaptiveSmoothingWindow.py new file mode 100644 index 0000000..d2a6b31 --- /dev/null +++ b/stabilize/algorithm/adaptiveSmoothingWindow.py @@ -0,0 +1,120 @@ +# smoothing_window_improved.py +import threading +from collections import deque +from typing import Dict, List, Tuple +import numpy as np + +from models.sampleMsg import AllSensorData, SensorData + + +class AdaptiveSmoothingWindow: + """ + 一个自适应的数据平滑窗口。 + 当数据波动较小时,使用原始数据;当波动较大时,使用移动平均值进行平滑。 + """ + + def __init__(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0): + """ + 初始化自适应平滑窗口。 + + :param window_size: 移动平均窗口的大小,默认为 11。 + :param volatility_threshold: 波动阈值。当新数据与上一个平滑值的差异超过此值时, + 将触发平滑。默认为 2.0。 + """ + if window_size <= 0: + raise ValueError(f"窗口大小 'window_size={window_size}' 必须是正数。") + + self.method = method + self.window_size = window_size + self.volatility_threshold = volatility_threshold + self.img_Threshold = img_threshold + # 内部存储结构: {sensor_pos: deque of (x, y) tuples} + self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {} + + # 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)} + self._last_smoothed_values: Dict[str, Tuple[float, float]] = {} + self._lock = threading.Lock() + + def resize(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0): + with self._lock: + self.method = method + self.window_size = window_size + self.volatility_threshold = volatility_threshold + self.img_Threshold = img_threshold + # 内部存储结构: {sensor_pos: deque of (x, y) tuples} + self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {} + + # 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)} + self._last_smoothed_values: Dict[str, Tuple[float, float]] = {} + + def process(self, new_sensor_data: AllSensorData) -> (AllSensorData,bool): + """ + 处理一帧新的传感器数据,根据波动大小自适应地应用平滑。 + + :param new_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。 + :return: 一个新的 AllSensorData 对象,其中包含了自适应平滑后的数据。 + """ + smoothed_sensor_list: List[SensorData] = [] + is_abnormal=False + with self._lock: + for sensor in new_sensor_data.data: + pos = sensor.pos + + # 如果是新传感器,初始化其历史队列和上一个平滑值 + if pos not in self.sensor_history: + self.sensor_history[pos] = deque(maxlen=self.window_size) + # 首次出现,将当前值作为“上一个平滑值” + self._last_smoothed_values[pos] = (sensor.x, sensor.y) + + history_queue = self.sensor_history[pos] + history_queue.append((sensor.x, sensor.y)) + # print(f"====> {pos} 设备 滑窗数据 {len(history_queue)}/{self.window_size}") + last_x, last_y = self._last_smoothed_values[pos] + + # 决定当前使用哪个值作为输出 + current_output_x, current_output_y = sensor.x, sensor.y # 默认使用原始值 + + # 仅当窗口填满后,才考虑进行平滑 + if len(history_queue) == self.window_size: + # 计算窗口内的值 + xs, ys = zip(*history_queue) + # 初始化平滑值 + if self.method == "median": + sma_x = float(np.median(xs)) + sma_y = float(np.median(ys)) + else: # 默认使用均值 + sma_x = float(np.mean(xs)) + sma_y = float(np.mean(ys)) + + # 计算新原始数据与上一个平滑值之间的波动 + delta_x = abs(sensor.x - last_x) + delta_y = abs(sensor.y - last_y) + + # 如果 x 或 y 的波动超过阈值,则使用平滑值 + if delta_x >= self.volatility_threshold: + current_output_x = sma_x + if delta_y >= self.volatility_threshold: + current_output_y = sma_y + + #巨大变化 存储图片 + if delta_x >= self.img_Threshold: + is_abnormal = True + if delta_y >= self.img_Threshold: + is_abnormal = True + + # 创建新的 SensorData 对象 + smoothed_sensor = SensorData( + pos=pos, + desc=sensor.desc, + x=current_output_x, + y=current_output_y + ) + smoothed_sensor_list.append(smoothed_sensor) + + # 更新“上一个平滑值”为本次的输出值 + self._last_smoothed_values[pos] = (current_output_x, current_output_y) + + return AllSensorData( + data=smoothed_sensor_list, + time=new_sensor_data.time + ), is_abnormal \ No newline at end of file diff --git a/stabilize/algorithm/smoothingWindow.py b/stabilize/algorithm/smoothingWindow.py new file mode 100644 index 0000000..93182ef --- /dev/null +++ b/stabilize/algorithm/smoothingWindow.py @@ -0,0 +1,78 @@ +# smoothing_window.py + +from collections import deque +from typing import Dict, List, Tuple +import numpy as np # 使用 numpy 可以更高效地进行平均值计算 + +from models.sampleMsg import AllSensorData, SensorData + + +class SmoothingWindow: + """ + 一个用于对传感器数据进行简单移动平均 (SMA) 平滑处理的窗口。 + """ + def __init__(self, window_size: int = 10): + """ + 初始化平滑窗口。 + + :param window_size: 移动平均窗口的大小,默认为 10。 + """ + if window_size <= 0: + raise ValueError("窗口大小 'window_size' 必须是正数。") + + self.window_size = window_size + + # 内部存储结构: {sensor_pos: deque of (x, y) tuples} + # deque 会自动维护固定大小,当新元素加入而队列已满时,最老的元素会被移除 + self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {} + + def process(self, all_sensor_data: AllSensorData) -> AllSensorData: + """ + 处理一帧新的传感器数据,对每个传感器的 x, y 坐标应用移动平均平滑。 + + :param all_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。 + :return: 一个新的 AllSensorData 对象,其中包含了平滑后的数据。 + 如果某个传感器的数据点少于窗口大小,则其值不会被平滑(保持原始值)。 + """ + smoothed_sensor_list: List[SensorData] = [] + + # 遍历当前帧中的每一个传感器数据 + for sensor in all_sensor_data.data: + # 如果是新传感器,为其创建一个新的 deque + if sensor.pos not in self.sensor_history: + self.sensor_history[sensor.pos] = deque(maxlen=self.window_size) + + # 获取该传感器的历史数据队列 + history_queue = self.sensor_history[sensor.pos] + + # 将新的 (x, y) 数据添加到队列末尾 + history_queue.append((sensor.x, sensor.y)) + + # 检查队列中的数据点数量是否达到了窗口大小 + if len(history_queue) == self.window_size: + # 如果达到窗口大小,则进行平滑计算 + + # 使用 numpy 将队列中的 x 和 y 值分别转换为数组,便于计算 + # history_queue 是一个 (x,y) 元组的列表,我们用 zip(*...) 来解包 + xs, ys = zip(*history_queue) + smoothed_x = np.mean(xs) + smoothed_y = np.mean(ys) + + # 创建一个新的 SensorData 对象,使用平滑后的值 + # 注意:desc 等其他字段保持不变 + smoothed_sensor = SensorData( + pos=sensor.pos, + desc=sensor.desc, + x=smoothed_x, + y=smoothed_y + ) + smoothed_sensor_list.append(smoothed_sensor) + else: + # 如果未达到窗口大小,不进行平滑,直接返回原始数据 + smoothed_sensor_list.append(sensor) + + # 创建并返回一个包含所有平滑后数据的新 AllSensorData 对象 + return AllSensorData( + data=smoothed_sensor_list, + time=all_sensor_data.time # 时间戳保持不变 + ) \ No newline at end of file diff --git a/stabilize/win.py b/stabilize/win.py new file mode 100644 index 0000000..242a124 --- /dev/null +++ b/stabilize/win.py @@ -0,0 +1,10 @@ +from functools import cached_property + +from stabilize.algorithm.adaptiveSmoothingWindow import AdaptiveSmoothingWindow + + +class Win: + @staticmethod + def create_window(method,size, threshold,img_threshold): + print(f"开启稳定窗口method={method},size={size}, threshold={threshold},img_threshold={img_threshold}") + return AdaptiveSmoothingWindow(method,size, threshold,img_threshold) \ No newline at end of file diff --git a/tcp_Ser.py b/tcp_Ser.py index 7c65dea..227792f 100644 --- a/tcp_Ser.py +++ b/tcp_Ser.py @@ -30,43 +30,61 @@ class TcpSer(threading.Thread): msg_str=data.decode('utf-8') if not data: break # 如果没有数据,退出循环 - print(f"从 {client_socket.getpeername()} 收到: {msg_str}") - # 反序列化为 实例 - s_cmd = Msg.from_json(msg_str) - valid_msg:bool=True - match s_cmd.cmd: - case "getBase": - self.on_data(s_cmd, valid_msg) - case "getPoints" | "setPoints": - self.on_data(s_cmd,valid_msg) - case "videoFps"| "dataFps": - self.on_data(s_cmd,valid_msg) - case "setCap": - self.on_data(s_cmd,valid_msg) - case "videoMode": - self.on_data(s_cmd,valid_msg) - case "clearZero": - self.on_data(s_cmd, valid_msg) - # todo 添加处理 - case _: - valid_msg = False - err_msg=f"valid cmd={s_cmd.cmd}" - resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" - resp += "\n\n" - client_socket.sendall(resp.encode()) - logging.warn(f"非法命令 {resp}") + logging.info(f"从 {client_socket.getpeername()} 收到: {msg_str}") + # 按 \n\n (十六进制 0a 0a) 分割消息 + msg_str_list = msg_str.split('\n\n') + for msg_str in msg_str_list: + if msg_str=='\n\n' or len(msg_str)==0: + continue + if len(msg_str)<10: + logging.info(f"无效消息: |{msg_str}|, 十六进制: {msg_str.encode('utf-8').hex()}") + continue + # 反序列化为 实例 + s_cmd = Msg.from_json(msg_str) + valid_msg: bool = True + match s_cmd.cmd: + case "getBase": + self.on_data(s_cmd, valid_msg) + case "getPoints" | "setPoints": + self.on_data(s_cmd, valid_msg) + case "getDataFps" | "setDataFps": + self.on_data(s_cmd, valid_msg) + case "setCap": + self.on_data(s_cmd, valid_msg) + case "videoMode": + self.on_data(s_cmd, valid_msg) + case "clearZero": + self.on_data(s_cmd, valid_msg) + case "getId" | "setId": + self.on_data(s_cmd, valid_msg) + case "getAlert" | "setAlert": + self.on_data(s_cmd, valid_msg) + case "getWin" | "setWin": + self.on_data(s_cmd, valid_msg) + case "getMqtt" | "setMqtt": + self.on_data(s_cmd, valid_msg) + # todo 添加处理 + case _: + valid_msg = False + err_msg = f"valid cmd={s_cmd.cmd}" + resp = f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}""" + resp += "\n\n" + client_socket.sendall(resp.encode()) + logging.warn(f"非法命令 {resp}") + logging.info("通讯完成") + except ConnectionAbortedError: logging.warn("客户端已断开") self.clear_client(client_socket) return - except Exception as e: + except Exception as ex: err_msg = f"illegal content" resp = f"""{{"_from": "dev","cmd": "","values": {{"operate": false,"err": "{err_msg}"}}}}""" resp += "\n\n" client_socket.sendall(resp.encode()) - logging.warn(f"处理客户端命令出错: {e}") + logging.warn(f"处理客户端命令出错: {str(ex)},非法命令{msg_str}") self.clear_client(client_socket) return finally: @@ -137,7 +155,7 @@ class TcpSer(threading.Thread): # 为每个客户端启动一个线程 client_thread = threading.Thread(target=self.handle_client, args=(client_socket,)) - # client_thread.daemon = True # 守护线程,服务器关闭时自动结束 + client_thread.daemon = True # 守护线程,服务器关闭时自动结束 client_thread.start() except socket.timeout: continue # 超时继续循环 diff --git a/tools/macInfo.py b/tools/macInfo.py new file mode 100644 index 0000000..de50e26 --- /dev/null +++ b/tools/macInfo.py @@ -0,0 +1,23 @@ +import platform +import socket +import struct + +def get_mac_address(interface='eth0') -> str: + os_name = platform.system() + if os_name=="Windows": + return "mac_win_123" + else: + import fcntl + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + mac = fcntl.ioctl( + sock.fileno(), + 0x8927, + struct.pack('256s', interface[:15].encode('utf-8')) + ) + mac_address = ':'.join(['%02x' % b for b in mac[18:24]]) + return mac_address + + +if __name__ == '__main__': + mac_str=get_mac_address() + print(mac_str) \ No newline at end of file diff --git a/upload/DataReporter.py b/upload/DataReporter.py index 474ac7a..ec2dc4c 100644 --- a/upload/DataReporter.py +++ b/upload/DataReporter.py @@ -1,13 +1,56 @@ import logging import queue import sched +import sys import threading import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime +from time import sleep + +import configSer import models.msg +from upload import mqttHelper from upload.RateLimiter import RateLimiter +mq_client:mqttHelper.MQTTClient = None + +# 启动 MQTT 连接线程 +def start_mqtt_connection_thread(): + """ + 启动一个后台线程用于 MQTT 连接 + """ + mqtt_thread = threading.Thread(target=connect_mqtt_with_retry, args=()) + mqtt_thread.daemon = True # 设置为守护线程,随主线程退出而结束 + mqtt_thread.start() + +def connect_mqtt_with_retry(max_retries=5, base_delay=10): + """ + 尝试连接 MQTT 并支持重试机制 + :param max_retries: 最大重试次数 + :param base_delay: 初始延迟时间(秒) + """ + is_mqtt_connected=False + retry_count = 1 + delay = base_delay + + while not is_mqtt_connected and retry_count <= max_retries: + try: + logging.info(f"尝试连接 MQTT... 第 {retry_count} 次") + if mq_client.start(): + logging.info("MQTT 连接成功!") + is_mqtt_connected= True + break + else: + logging.info("MQTT 连接失败,稍后重试...") + except Exception as e: + logging.info(f"MQTT 通信异常: {e}") + + retry_count += 1 + sleep(delay) + + if not mq_client.is_connected(): + logging.info(f"MQTT 连接失败,已达最大重试次数{max_retries}") class DataReporter(threading.Thread): def __init__(self,data_fps:int,video_fps:int=0): @@ -18,11 +61,11 @@ class DataReporter(threading.Thread): self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 弃用 图片限速: 1张/秒 self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 30条/秒 self.running = True - self.image_dropped = 0 # 统计丢弃的图片数量 - self.data_dropped = 0 # 统计丢弃的数据数量 + self.update = False self.daemon = True - self.callbacks = [] # 存储多个回调函数 - self.callback_executor = ThreadPoolExecutor(max_workers=4) # 创建线程池 + self.tcp_callbacks = [] # 存储多个回调函数 + self.mqtt_callbacks = [] # 存储多个回调函数 + self.callback_executor = ThreadPoolExecutor(max_workers=2) # 创建线程池 self.last_data = None # 存储最后一条数据 self.last_repeat_counts = 0 # 存储最后一条数据 @@ -30,77 +73,67 @@ class DataReporter(threading.Thread): self.schedulerEvent = None - def register_handler(self,handler_fun): - self.callbacks.append(handler_fun) + def register_handler(self,handler_fun,conn_type="mqtt"): + match conn_type: + case "mqtt": + self.mqtt_callbacks.append(handler_fun) + case "tcp": + self.tcp_callbacks.append(handler_fun) + case _: + logging.warn(f"不支持的回调连接类型: {conn_type}") def run(self): + now_timestamp = time.time() interval = 1.0 / self.data_limiter.max_rate - self.schedulerEvent=self.scheduler.enter(0, 1, self._schedule_data_reporting, (interval,)) + self.schedulerEvent=self.scheduler.enter(interval, 2, self._schedule_data_reporting, (interval,now_timestamp)) self.scheduler.run() - def _schedule_data_reporting(self,interval): + def _schedule_data_reporting(self,interval,now_timestamp): if self.running: - self.scheduler.enter(interval, 1, self._schedule_data_reporting, (interval,)) # 每 x 秒执行一次 - self._report_data_if_allowed() + if self.update: + interval = 1.0 / self.data_limiter.max_rate + self.update=False + next_timestamp = now_timestamp+interval + self.scheduler.enterabs(next_timestamp, 2, self._schedule_data_reporting, (interval,next_timestamp)) # 每 x 秒执行一次 + threading.Thread(target=self.upload, daemon=True).start() + + def upload(self): + self._report_data_if_allowed() + # 告警 + self._report_alert_if_allowed() + sleep(0.1) def _report_data_if_allowed(self): - # logging.info(f"Reporting data -> start") - if self.data_limiter.allow_request(): - if not self.data_queue.empty(): - try: - data = self.data_queue.get_nowait() - self.last_data = data - self.last_repeat_counts = 0 - self._report_data(data, self.last_repeat_counts) - except queue.Empty: - pass - elif self.last_data is not None and self.last_repeat_counts < 5: - self.last_repeat_counts += 1 - self._report_data(self.last_data, self.last_repeat_counts) - def run2(self): - while self.running: - - # alert上报 - if not self.alert_queue.empty(): - try: - alert_data = self.alert_queue.get_nowait() - self._report_alert(alert_data) - except queue.Empty: - pass - - # image上报 - # if not self.image_queue.empty() and self.image_limiter.allow_request(): - # try: - # image_data = self.image_queue.get_nowait() - # self._report_image(image_data) - # except queue.Empty: - # pass - - # 然后处理数据上报 - if self.data_limiter.allow_request(): - if not self.data_queue.empty(): - try: - data = self.data_queue.get_nowait() - self.last_data = data # 保存最后一条数据 - self.last_repeat_counts =0 - self._report_data(data,self.last_repeat_counts) - except queue.Empty: - pass - elif self.last_data is not None: - # 如果队列为空但有限速许可,并且存在最后一条数据,则重复上报最后一条数据 - if self.last_repeat_counts < 5: - self.last_repeat_counts+=1 - self._report_data(self.last_data, self.last_repeat_counts) - else: - # logging.warn("不允许上报") + if not self.data_queue.empty(): + data = self.data_queue.get_nowait() + self.last_data = data + self.last_repeat_counts = 0 + self._report_data(data, self.last_repeat_counts) + elif self.last_data is not None and self.last_repeat_counts < 6: + self.last_repeat_counts += 1 + self._report_data(self.last_data, self.last_repeat_counts) + + def _report_alert_if_allowed(self): + # alert上报 + if not self.alert_queue.empty(): + try: + alert_data = self.alert_queue.get_nowait() + self._report_alert(alert_data) + except queue.Empty: pass - time.sleep(0.02) # 避免CPU占用过高 - def _execute_callbacks(self, msg_json): """ 执行所有注册的回调函数 """ - for callback in self.callbacks: + # logging.info(f"will Reporting to {len(self.tcp_callbacks)} tcp client") + for callback in self.tcp_callbacks: + try: + # 提交到线程池异步执行 + self.callback_executor.submit(callback, msg_json) + except Exception as e: + print(f"Error executing callback {callback.__name__}: {e}") + def _execute_mqtt_callbacks(self, msg_json): + for callback in self.mqtt_callbacks: try: # 提交到线程池异步执行 self.callback_executor.submit(callback, msg_json) @@ -111,6 +144,7 @@ class DataReporter(threading.Thread): # 这里替换为实际的上报代码 msg = models.msg.Msg(_from="dev", cmd="alert", values=data[1]) msg_json = msg.to_json() + self._execute_mqtt_callbacks(msg_json) self._execute_callbacks(msg_json) def _report_image(self, data): @@ -122,28 +156,41 @@ class DataReporter(threading.Thread): self._execute_callbacks(msg_json) def _report_data(self, data, last_repeat_counts=0): - # 实现数据上报逻辑 - logging.info(f"Reporting [{last_repeat_counts}] data: {data}") # 实际的上报代码,数据结构转换 msg=models.msg.Msg(_from="dev",cmd="data",values=data[1]) # 重新格式化时间,数据等间隔 msg.values.time=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + logging.info(f"Reporting [{last_repeat_counts}] data: {data}") msg_json=msg.to_json() + self._execute_mqtt_callbacks(msg_json) self._execute_callbacks(msg_json) def stop(self): self.running = False self.join() - print(f"Stats: {self.data_dropped} data dropped") - - def adjust_rate(self, new_rate, data_type='image'): - if data_type == 'image': - with self.image_limiter.lock: - self.image_limiter.max_rate = new_rate - self.image_queue= queue.Queue(maxsize=new_rate) - self.image_limiter.update_interval() - else: - with self.data_limiter.lock: - self.data_limiter.max_rate = new_rate - self.data_queue = queue.Queue(maxsize=new_rate) - self.data_limiter.update_interval() \ No newline at end of file + + def adjust_rate(self, new_rate, data_type='data'): + with self.data_limiter.lock: + self.data_limiter.max_rate = new_rate + self.data_queue = queue.Queue(maxsize=new_rate) + self.update=True + self.data_limiter.update_interval() + + def adjust_mqtt(self, conf: configSer.Upload) -> bool: + global mq_client + self.mqtt_callbacks=[] + if mq_client is not None and mq_client.is_connected(): + mq_client.stop() + + if conf.enable: + mq_client = mqttHelper.MQTTClient(**conf.mqtt) + try: + if mq_client.start(): + self.register_handler(mq_client.publish,"mqtt") + else: + logging.warn("mq链接失败,不上报") + return False + except Exception as e: + logging.warn("adjust_mqtt 通讯异常", e) + sys.exit(0) + return True \ No newline at end of file diff --git a/upload/__pycache__/DataReporter.cpython-313.pyc b/upload/__pycache__/DataReporter.cpython-313.pyc index 2747261..07ff69f 100644 Binary files a/upload/__pycache__/DataReporter.cpython-313.pyc and b/upload/__pycache__/DataReporter.cpython-313.pyc differ diff --git a/upload/__pycache__/RateLimiter.cpython-313.pyc b/upload/__pycache__/RateLimiter.cpython-313.pyc index d076e90..035c724 100644 Binary files a/upload/__pycache__/RateLimiter.cpython-313.pyc and b/upload/__pycache__/RateLimiter.cpython-313.pyc differ diff --git a/upload/__pycache__/mqttHelper.cpython-313.pyc b/upload/__pycache__/mqttHelper.cpython-313.pyc index 1f70d61..663daf1 100644 Binary files a/upload/__pycache__/mqttHelper.cpython-313.pyc and b/upload/__pycache__/mqttHelper.cpython-313.pyc differ diff --git a/upload/mqttHelper.py b/upload/mqttHelper.py index fb9feae..5913c7a 100644 --- a/upload/mqttHelper.py +++ b/upload/mqttHelper.py @@ -1,14 +1,13 @@ -import json import logging -import random import time from datetime import datetime +from time import sleep from paho.mqtt import client as mqtt_client class MQTTClient: - def __init__(self, broker='218.3.126.49', port=1883, topic="wybb/mqtt", + def __init__(self, broker='218.3.126.49', port=1883, topic="wybb/default", username='emqx', password='public',client_id=''): self.BROKER = broker self.PORT = port @@ -27,6 +26,7 @@ class MQTTClient: self.FLAG_EXIT = False self.client = None + def on_connect(self, client, userdata, flags, rc, properties=None): if rc == 0 and client.is_connected(): print("Connected to MQTT Broker!") @@ -58,19 +58,21 @@ class MQTTClient: self.client.username_pw_set(self.USERNAME, self.PASSWORD) self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect - self.client.connect(self.BROKER, self.PORT, keepalive=120) + self.client.connect(self.BROKER, self.PORT, keepalive=60) return self.client - def publish(self, msg:str): + def is_connected(self) -> bool: + return self.client.is_connected() + + def publish(self, message:str): if not self.client: logging.error("MQTT client is not initialized!") return if not self.client.is_connected(): logging.error("publish: MQTT client is not connected!") - time.sleep(1) return - result = self.client.publish(self.TOPIC, msg) + result = self.client.publish(self.TOPIC, message,qos=1) # result: [0, 1] status = result[0] if status == 0: @@ -82,7 +84,7 @@ class MQTTClient: def start(self): self.client = self.connect() self.client.loop_start() - time.sleep(1) + time.sleep(0.5) if self.client.is_connected(): logging.info("MQTT client is connected!") else: @@ -93,6 +95,7 @@ class MQTTClient: def stop(self): if self.client: self.client.loop_stop() + sleep(1) self.client.disconnect() diff --git a/videoDetection.py b/videoDetection.py index 6c94c67..7eddcc0 100644 --- a/videoDetection.py +++ b/videoDetection.py @@ -1,4 +1,6 @@ import copy +import os +import glob import threading from dataclasses import asdict from datetime import datetime @@ -18,6 +20,7 @@ import models.sampleMsg import upload.DataReporter import utils import videoPush +from stabilize.algorithm.adaptiveSmoothingWindow import AdaptiveSmoothingWindow from models.msg import Msg logging.basicConfig(level=logging.DEBUG) @@ -71,12 +74,14 @@ def read_target_rectangle(): return configObj.config_info.targets class VideoProcessor: - reporter: upload.DataReporter.DataReporter + adaptive_smoother:AdaptiveSmoothingWindow capture: cv2.VideoCapture capturePath: str="" is_opened: bool= False is_running = True is_clear_zero:bool=False + last_save_abnormal_img_time=time.time() + last_save_lost_img_time = time.time() # 全局配置锁 targets_lock: threading.Lock = threading.Lock() def __init__(self): @@ -84,7 +89,7 @@ class VideoProcessor: pass def on_data(self,msg:Msg): - global configObj,is_video_mode + global configObj,is_video_mode,reporter logging.info(f"msg={msg}") match msg.cmd: case "getBase": @@ -101,7 +106,7 @@ class VideoProcessor: for k,v in targets.items(): targets[k].handler_info=None - resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"targets": targets}) resp_json = resp_msg.to_json_() return resp_json case "setPoints": @@ -120,43 +125,95 @@ class VideoProcessor: resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) resp_json = resp_msg.to_json() return resp_json - case "videoFps": - v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"image") - configObj.config_info.fps.video = fps - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) + case "getDataFps": + fps=configObj.config_info.fps.data + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"dataFps": fps}) resp_json = resp_msg.to_json() return resp_json - case "dataFps": + case "setDataFps": v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"data") + fps = v["dataFps"] configObj.config_info.fps.data=fps configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) + reporter.adjust_rate(fps,"data") + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "setCap": v = msg.values cap = v["cap"] self.switch_video(cap) - resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "videoMode": v = msg.values is_debug = v["debug"] is_video_mode=is_debug - resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) resp_json = resp_msg.to_json() return resp_json case "clearZero": self.is_clear_zero=True - resp_msg = models.msg.Msg(_from="dev", cmd="clearZero", values={"operate": True}) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) resp_json = resp_msg.to_json() return resp_json + case "getId": + mac_address = configObj.config_info.mac + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"id": mac_address}) + resp_json = resp_msg.to_json_() + return resp_json + case "setId": + v = msg.values + mac_id = v["id"] + configObj.config_info.mac=mac_id + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) + resp_json = resp_msg.to_json_() + return resp_json + case "getAlert": + alert_info = configObj.config_info.alert + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values=alert_info) + resp_json = resp_msg.to_json_() + return resp_json + case "setAlert": + v = msg.values + enable = v["enable"] + interval_sec = v["intervalSec"] + configObj.config_info.alert.enable = enable + configObj.config_info.alert.intervalSec = interval_sec + configObj.save2json_file() + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) + resp_json = resp_msg.to_json_() + return resp_json + case "getWin": + win_info = configObj.config_info.win + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values=win_info) + resp_json = resp_msg.to_json_() + return resp_json + case "setWin": + v = msg.values + win= configSer.Win(** v) + configObj.config_info.win=win + configObj.save2json_file() + self.adaptive_smoother.resize(window_size=win.size,volatility_threshold =win.threshold,img_threshold=win.imgThreshold) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": True}) + resp_json = resp_msg.to_json_() + return resp_json + case "getMqtt": + win_info = configObj.config_info.upload + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values=win_info) + resp_json = resp_msg.to_json_() + return resp_json + case "setMqtt": + v = msg.values + mq_upload = configSer.Upload(**v) + configObj.config_info.upload = mq_upload + configObj.save2json_file() + ok=reporter.adjust_mqtt(mq_upload) + resp_msg = models.msg.Msg(_from="dev", cmd=msg.cmd, values={"operate": ok}) + resp_json = resp_msg.to_json_() + return resp_json print("==") @@ -184,7 +241,7 @@ class VideoProcessor: base_point_pix:models.target.Point=None #上报有新数据的点 - all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) + frame_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) with self.targets_lock: # 绘图-历史点 @@ -244,13 +301,14 @@ class VideoProcessor: # cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) # - circles = self.circle_detect_Edges(sub_binary_frame,tr.info.threshold.gradient,tr.info.threshold.anchor) if __debug__: cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) if len(circles) == 0: + # 识别不到的异常图像 + self.save_lost_img(img) continue elif len(circles) > 1: logging.info(f"标靶[{tr.info.desc}],匹配圆{len(circles)}个") @@ -263,14 +321,11 @@ class VideoProcessor: # 纪录圆心位置 if tr.handler_info is None: tr.handler_info= models.target.HandlerInfo() - if tr.handler_info.is_init: - tr.handler_info.is_init=False + if not tr.handler_info.is_init: + tr.handler_info.is_init=True tr.handler_info.center_init = center - - # 数据窗口平滑处理 - # smooth_center = tr.handler_info.enqueue_center_point(center) - # tr.handler_info.center_point = smooth_center - # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") + #存储首次中心数据 + configObj.save2json_file() # 原始处理 tr.handler_info.center_point=center @@ -305,7 +360,7 @@ class VideoProcessor: pass tr.circle_displacement_phy() if tr.handler_info.displacement_phy is not None: - all_upload_data.data.append( + frame_upload_data.data.append( models.sampleMsg.SensorData( str(tr.info.id), tr.info.desc, @@ -324,16 +379,101 @@ class VideoProcessor: if target.handler_info is not None: target.handler_info.center_init = target.handler_info.center_point self.is_clear_zero = False + #清零重新存储测点位置 + configObj.save2json_file() + + is_abnormal=self.sink_data(frame_upload_data) + if is_abnormal: + self.save_abnormal_img(img) + def save_abnormal_img(self, frame, imgs_dir="/home/forlinx/Pictures"): + """ + 保存当前图像到本地文件系统imgs目录下,最多保留10张图片 + """ - #过滤无效空数据 - if len(all_upload_data.data)==0: + # 检查是否需要限制存储频率(每秒最多一张) + current_time = time.time() + offset_time=current_time - self.last_save_abnormal_img_time + if offset_time < 1: return + else: + self.last_save_abnormal_img_time = current_time + + # 检查并创建目录 + if not os.path.exists(imgs_dir): + os.makedirs(imgs_dir, exist_ok=True) + + # 获取当前时间戳用于命名图片 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{timestamp}.jpg" + filepath = os.path.join(imgs_dir, filename) + + # 查找已存在的异常图片文件 + existing_files = glob.glob(os.path.join(imgs_dir, "*.jpg")) + + # 如果已存在10张或更多图片,删除最早的一张 + if len(existing_files) >= 10: + # 按文件名排序(按时间顺序) + existing_files.sort() + # 删除最早的文件 + os.remove(existing_files[0]) + + # 保存当前图片 + cv2.imwrite(filepath, frame) + def save_lost_img(self, frame,imgs_dir="/home/forlinx/Pictures/lost"): + # 检查是否需要限制存储频率(每秒最多一张) + current_time = time.time() + offset_time=current_time - self.last_save_lost_img_time + if offset_time < 10: + return + else: + self.last_save_lost_img_time = current_time + logging.info(f"无法识别的帧 => 存储") + # 检查并创建目录 + if not os.path.exists(imgs_dir): + os.makedirs(imgs_dir, exist_ok=True) + + # 获取当前时间戳用于命名图片 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{timestamp}.jpg" + filepath = os.path.join(imgs_dir, filename) + + # 查找已存在的异常图片文件 + existing_files = glob.glob(os.path.join(imgs_dir, "*.jpg")) + + # 如果已存在10张或更多图片,删除最早的一张 + if len(existing_files) >= 10: + # 按文件名排序(按时间顺序) + existing_files.sort() + # 删除最早的文件 + os.remove(existing_files[0]) + + # 保存当前图片 + cv2.imwrite(filepath, frame) + + def sink_data(self, d: models.sampleMsg.AllSensorData): + is_abnormal= False + # 过滤无效空数据 + if len(d.data) == 0: + return is_abnormal if configObj.config_info.fps.data > 0: - self.enqueue_data(all_upload_data) + new_d, is_abnormal=self.pre_handler_data(d) + # 处理数据精度 + for i, snd in enumerate(new_d.data): + snd.x = round(snd.x, 3) + snd.y = round(snd.y, 3) + new_d.data[i]=snd + self.enqueue_data(new_d) + + return is_abnormal + + def pre_handler_data(self, data: models.sampleMsg.AllSensorData) -> (models.sampleMsg.AllSensorData,bool): + if self.adaptive_smoother is None: + return data,False + smoothed_result,is_abnormal = self.adaptive_smoother.process(data) + return smoothed_result,is_abnormal - def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): @@ -386,7 +526,7 @@ class VideoProcessor: # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) cv2.putText(img, text1, txt_location, font, scale, color, 1) txt_location2 = (rect_s_point, rect_e_point+20) - text2 = f"{tr.handler_info.displacement_phy.x,tr.handler_info.displacement_phy.y}" + text2 = f"{round(tr.handler_info.displacement_phy.x,3),round(tr.handler_info.displacement_phy.y,3)}" cv2.putText(img, text2, txt_location2, font, scale, color, 1) def circle_detect(self, img) -> list: @@ -480,7 +620,7 @@ class VideoProcessor: logging.info(f"{video_id}地址->{self.capture}") if not self.capture.isOpened(): self.capture.release() - logging.warn(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") + logging.warn(f"无法打开摄像头{video_id}") return if frame_width==640: self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 4224) # 宽度 @@ -506,34 +646,44 @@ class VideoProcessor: def show_video(self): camera_err_counts=0 + camera_switch_counts=0 global sigExit,start_point, end_point, drawing if __debug__: cv2.namedWindow('Frame') cv2.setMouseCallback('Frame', add_rectangle) # 读取一帧图像 while self.is_running: - if camera_err_counts >= 10: + if camera_err_counts >= 5: logging.warn(f"读取摄像头异常,准备重新切换") self.switch_video(str(self.capturePath)) camera_err_counts = 0 + camera_switch_counts += 1 continue + if camera_switch_counts >= 2: + logging.warn(f"切换摄像头异常{camera_switch_counts}次,准备重启") + self.stop() + logging.warn(f"执行退出...") + sleep(2) + os._exit(0) + + if not self.is_opened: - logging.warn(f"摄像头 标记is_opened={self.is_opened}") - sleep(5) + logging.warn(f"摄像头 标记is_opened={self.is_opened} [{camera_switch_counts}|{camera_err_counts}]") + sleep(2) camera_err_counts += 1 continue + # sleep(0.02) ret, frame = self.capture.read() if not ret: camera_err_counts+=1 logging.warn(f"${camera_err_counts}次,无法读取帧,cap地址- >{self.capture}") sleep(1) continue - + # logging.info(f"处理图像帧==>start") self.frame_handle(frame) - - cv2.waitKey(1) + # cv2.waitKey(1) # 显示图像 if frame is not None: if __debug__: @@ -556,7 +706,6 @@ class VideoProcessor: cv2.destroyAllWindows() def frame_handle(self,frame): - # logging.info(f"处理图像帧==>") # 绘图-历史点 self.draw_rectangle(frame) # 绘图-实时 @@ -602,6 +751,7 @@ class VideoProcessor: # 将时间戳转换为 datetime 对象 dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 # 放入图片队列(自动丢弃最旧数据当队列满时) + try: reporter.data_queue.put((dt, data), block=False) except queue.Full: @@ -642,15 +792,17 @@ def task_timeout(): offline_monitor() if not configObj.config_info.alert.enable: return - now_time=datetime.now() + now_time:datetime=datetime.now() now_str = now_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] all_upload_alert = models.sampleMsg.AllAlert(alert=[], time=now_str) for t_id, tr in configObj.config_info.targets.items(): # 是否offline超时告警状态 - if tr.handler_info is not None: + if tr.handler_info is not None and tr.handler_info.is_init: delta_s = round((now_time - tr.handler_info.last_detected_time).total_seconds()) if delta_s > configObj.config_info.alert.intervalSec: - ad = models.sampleMsg.AlertData(tr.info.desc, delta_s) + # 舍去余数,向下取整到intervalSec的整数倍 + adjusted_delta_s = (delta_s // configObj.config_info.alert.intervalSec) * configObj.config_info.alert.intervalSec + ad = models.sampleMsg.AlertData(tr.info.desc, adjusted_delta_s) all_upload_alert.alert.append(ad) if len(all_upload_alert.alert) > 0: enqueue_alert(all_upload_alert) diff --git a/videoPush.py b/videoPush.py index 0dbdb82..7d7845e 100644 --- a/videoPush.py +++ b/videoPush.py @@ -50,7 +50,7 @@ def shutdown(): return 'Server shutting down...' def run(): port=2240 - print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") + print(f"推流服务启动, 127.0.0.1:{port}/video_flow\n") app_flask.run(host='0.0.0.0', port=port, threaded=True) def video_server_run():