You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
3.3 KiB
78 lines
3.3 KiB
# smoothing_window.py
|
|
|
|
from collections import deque
|
|
from typing import Dict, List, Tuple
|
|
import numpy as np # 使用 numpy 可以更高效地进行平均值计算
|
|
|
|
from models.sampleMsg import AllSensorData, SensorData
|
|
|
|
|
|
class SmoothingWindow:
|
|
"""
|
|
一个用于对传感器数据进行简单移动平均 (SMA) 平滑处理的窗口。
|
|
"""
|
|
def __init__(self, window_size: int = 10):
|
|
"""
|
|
初始化平滑窗口。
|
|
|
|
:param window_size: 移动平均窗口的大小,默认为 10。
|
|
"""
|
|
if window_size <= 0:
|
|
raise ValueError("窗口大小 'window_size' 必须是正数。")
|
|
|
|
self.window_size = window_size
|
|
|
|
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
|
|
# deque 会自动维护固定大小,当新元素加入而队列已满时,最老的元素会被移除
|
|
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
|
|
|
|
def process(self, all_sensor_data: AllSensorData) -> AllSensorData:
|
|
"""
|
|
处理一帧新的传感器数据,对每个传感器的 x, y 坐标应用移动平均平滑。
|
|
|
|
:param all_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。
|
|
:return: 一个新的 AllSensorData 对象,其中包含了平滑后的数据。
|
|
如果某个传感器的数据点少于窗口大小,则其值不会被平滑(保持原始值)。
|
|
"""
|
|
smoothed_sensor_list: List[SensorData] = []
|
|
|
|
# 遍历当前帧中的每一个传感器数据
|
|
for sensor in all_sensor_data.data:
|
|
# 如果是新传感器,为其创建一个新的 deque
|
|
if sensor.pos not in self.sensor_history:
|
|
self.sensor_history[sensor.pos] = deque(maxlen=self.window_size)
|
|
|
|
# 获取该传感器的历史数据队列
|
|
history_queue = self.sensor_history[sensor.pos]
|
|
|
|
# 将新的 (x, y) 数据添加到队列末尾
|
|
history_queue.append((sensor.x, sensor.y))
|
|
|
|
# 检查队列中的数据点数量是否达到了窗口大小
|
|
if len(history_queue) == self.window_size:
|
|
# 如果达到窗口大小,则进行平滑计算
|
|
|
|
# 使用 numpy 将队列中的 x 和 y 值分别转换为数组,便于计算
|
|
# history_queue 是一个 (x,y) 元组的列表,我们用 zip(*...) 来解包
|
|
xs, ys = zip(*history_queue)
|
|
smoothed_x = np.mean(xs)
|
|
smoothed_y = np.mean(ys)
|
|
|
|
# 创建一个新的 SensorData 对象,使用平滑后的值
|
|
# 注意:desc 等其他字段保持不变
|
|
smoothed_sensor = SensorData(
|
|
pos=sensor.pos,
|
|
desc=sensor.desc,
|
|
x=smoothed_x,
|
|
y=smoothed_y
|
|
)
|
|
smoothed_sensor_list.append(smoothed_sensor)
|
|
else:
|
|
# 如果未达到窗口大小,不进行平滑,直接返回原始数据
|
|
smoothed_sensor_list.append(sensor)
|
|
|
|
# 创建并返回一个包含所有平滑后数据的新 AllSensorData 对象
|
|
return AllSensorData(
|
|
data=smoothed_sensor_list,
|
|
time=all_sensor_data.time # 时间戳保持不变
|
|
)
|