You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
5.3 KiB

# smoothing_window_improved.py
import threading
from collections import deque
from typing import Dict, List, Tuple
import numpy as np
from models.sampleMsg import AllSensorData, SensorData
class AdaptiveSmoothingWindow:
"""
一个自适应的数据平滑窗口。
当数据波动较小时,使用原始数据;当波动较大时,使用移动平均值进行平滑。
"""
def __init__(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0):
"""
初始化自适应平滑窗口。
:param window_size: 移动平均窗口的大小,默认为 11。
:param volatility_threshold: 波动阈值。当新数据与上一个平滑值的差异超过此值时,
将触发平滑。默认为 2.0。
"""
if window_size <= 0:
raise ValueError(f"窗口大小 'window_size={window_size}' 必须是正数。")
self.method = method
self.window_size = window_size
self.volatility_threshold = volatility_threshold
self.img_Threshold = img_threshold
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
# 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)}
self._last_smoothed_values: Dict[str, Tuple[float, float]] = {}
self._lock = threading.Lock()
def resize(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0):
with self._lock:
self.method = method
self.window_size = window_size
self.volatility_threshold = volatility_threshold
self.img_Threshold = img_threshold
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
# 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)}
self._last_smoothed_values: Dict[str, Tuple[float, float]] = {}
def process(self, new_sensor_data: AllSensorData) -> (AllSensorData,bool):
"""
处理一帧新的传感器数据,根据波动大小自适应地应用平滑。
:param new_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。
:return: 一个新的 AllSensorData 对象,其中包含了自适应平滑后的数据。
"""
smoothed_sensor_list: List[SensorData] = []
is_abnormal=False
with self._lock:
for sensor in new_sensor_data.data:
pos = sensor.pos
# 如果是新传感器,初始化其历史队列和上一个平滑值
if pos not in self.sensor_history:
self.sensor_history[pos] = deque(maxlen=self.window_size)
# 首次出现,将当前值作为“上一个平滑值”
self._last_smoothed_values[pos] = (sensor.x, sensor.y)
history_queue = self.sensor_history[pos]
history_queue.append((sensor.x, sensor.y))
# print(f"====> {pos} 设备 滑窗数据 {len(history_queue)}/{self.window_size}")
last_x, last_y = self._last_smoothed_values[pos]
# 决定当前使用哪个值作为输出
current_output_x, current_output_y = sensor.x, sensor.y # 默认使用原始值
# 仅当窗口填满后,才考虑进行平滑
if len(history_queue) == self.window_size:
# 计算窗口内的值
xs, ys = zip(*history_queue)
# 初始化平滑值
if self.method == "median":
sma_x = float(np.median(xs))
sma_y = float(np.median(ys))
else: # 默认使用均值
sma_x = float(np.mean(xs))
sma_y = float(np.mean(ys))
# 计算新原始数据与上一个平滑值之间的波动
delta_x = abs(sensor.x - last_x)
delta_y = abs(sensor.y - last_y)
# 如果 x 或 y 的波动超过阈值,则使用平滑值
if delta_x >= self.volatility_threshold:
current_output_x = sma_x
if delta_y >= self.volatility_threshold:
current_output_y = sma_y
#巨大变化 存储图片
if delta_x >= self.img_Threshold:
is_abnormal = True
if delta_y >= self.img_Threshold:
is_abnormal = True
# 创建新的 SensorData 对象
smoothed_sensor = SensorData(
pos=pos,
desc=sensor.desc,
x=current_output_x,
y=current_output_y
)
smoothed_sensor_list.append(smoothed_sensor)
# 更新“上一个平滑值”为本次的输出值
self._last_smoothed_values[pos] = (current_output_x, current_output_y)
return AllSensorData(
data=smoothed_sensor_list,
time=new_sensor_data.time
), is_abnormal