You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

78 lines
3.3 KiB

# smoothing_window.py
from collections import deque
from typing import Dict, List, Tuple
import numpy as np # 使用 numpy 可以更高效地进行平均值计算
from models.sampleMsg import AllSensorData, SensorData
class SmoothingWindow:
"""
一个用于对传感器数据进行简单移动平均 (SMA) 平滑处理的窗口。
"""
def __init__(self, window_size: int = 10):
"""
初始化平滑窗口。
:param window_size: 移动平均窗口的大小,默认为 10。
"""
if window_size <= 0:
raise ValueError("窗口大小 'window_size' 必须是正数。")
self.window_size = window_size
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
# deque 会自动维护固定大小,当新元素加入而队列已满时,最老的元素会被移除
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
def process(self, all_sensor_data: AllSensorData) -> AllSensorData:
"""
处理一帧新的传感器数据,对每个传感器的 x, y 坐标应用移动平均平滑。
:param all_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。
:return: 一个新的 AllSensorData 对象,其中包含了平滑后的数据。
如果某个传感器的数据点少于窗口大小,则其值不会被平滑(保持原始值)。
"""
smoothed_sensor_list: List[SensorData] = []
# 遍历当前帧中的每一个传感器数据
for sensor in all_sensor_data.data:
# 如果是新传感器,为其创建一个新的 deque
if sensor.pos not in self.sensor_history:
self.sensor_history[sensor.pos] = deque(maxlen=self.window_size)
# 获取该传感器的历史数据队列
history_queue = self.sensor_history[sensor.pos]
# 将新的 (x, y) 数据添加到队列末尾
history_queue.append((sensor.x, sensor.y))
# 检查队列中的数据点数量是否达到了窗口大小
if len(history_queue) == self.window_size:
# 如果达到窗口大小,则进行平滑计算
# 使用 numpy 将队列中的 x 和 y 值分别转换为数组,便于计算
# history_queue 是一个 (x,y) 元组的列表,我们用 zip(*...) 来解包
xs, ys = zip(*history_queue)
smoothed_x = np.mean(xs)
smoothed_y = np.mean(ys)
# 创建一个新的 SensorData 对象,使用平滑后的值
# 注意:desc 等其他字段保持不变
smoothed_sensor = SensorData(
pos=sensor.pos,
desc=sensor.desc,
x=smoothed_x,
y=smoothed_y
)
smoothed_sensor_list.append(smoothed_sensor)
else:
# 如果未达到窗口大小,不进行平滑,直接返回原始数据
smoothed_sensor_list.append(sensor)
# 创建并返回一个包含所有平滑后数据的新 AllSensorData 对象
return AllSensorData(
data=smoothed_sensor_list,
time=all_sensor_data.time # 时间戳保持不变
)