# smoothing_window.py from collections import deque from typing import Dict, List, Tuple import numpy as np # 使用 numpy 可以更高效地进行平均值计算 from models.sampleMsg import AllSensorData, SensorData class SmoothingWindow: """ 一个用于对传感器数据进行简单移动平均 (SMA) 平滑处理的窗口。 """ def __init__(self, window_size: int = 10): """ 初始化平滑窗口。 :param window_size: 移动平均窗口的大小,默认为 10。 """ if window_size <= 0: raise ValueError("窗口大小 'window_size' 必须是正数。") self.window_size = window_size # 内部存储结构: {sensor_pos: deque of (x, y) tuples} # deque 会自动维护固定大小,当新元素加入而队列已满时,最老的元素会被移除 self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {} def process(self, all_sensor_data: AllSensorData) -> AllSensorData: """ 处理一帧新的传感器数据,对每个传感器的 x, y 坐标应用移动平均平滑。 :param all_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。 :return: 一个新的 AllSensorData 对象,其中包含了平滑后的数据。 如果某个传感器的数据点少于窗口大小,则其值不会被平滑(保持原始值)。 """ smoothed_sensor_list: List[SensorData] = [] # 遍历当前帧中的每一个传感器数据 for sensor in all_sensor_data.data: # 如果是新传感器,为其创建一个新的 deque if sensor.pos not in self.sensor_history: self.sensor_history[sensor.pos] = deque(maxlen=self.window_size) # 获取该传感器的历史数据队列 history_queue = self.sensor_history[sensor.pos] # 将新的 (x, y) 数据添加到队列末尾 history_queue.append((sensor.x, sensor.y)) # 检查队列中的数据点数量是否达到了窗口大小 if len(history_queue) == self.window_size: # 如果达到窗口大小,则进行平滑计算 # 使用 numpy 将队列中的 x 和 y 值分别转换为数组,便于计算 # history_queue 是一个 (x,y) 元组的列表,我们用 zip(*...) 来解包 xs, ys = zip(*history_queue) smoothed_x = np.mean(xs) smoothed_y = np.mean(ys) # 创建一个新的 SensorData 对象,使用平滑后的值 # 注意:desc 等其他字段保持不变 smoothed_sensor = SensorData( pos=sensor.pos, desc=sensor.desc, x=smoothed_x, y=smoothed_y ) smoothed_sensor_list.append(smoothed_sensor) else: # 如果未达到窗口大小,不进行平滑,直接返回原始数据 smoothed_sensor_list.append(sensor) # 创建并返回一个包含所有平滑后数据的新 AllSensorData 对象 return AllSensorData( data=smoothed_sensor_list, time=all_sensor_data.time # 时间戳保持不变 )