You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
120 lines
5.3 KiB
120 lines
5.3 KiB
# smoothing_window_improved.py
|
|
import threading
|
|
from collections import deque
|
|
from typing import Dict, List, Tuple
|
|
import numpy as np
|
|
|
|
from models.sampleMsg import AllSensorData, SensorData
|
|
|
|
|
|
class AdaptiveSmoothingWindow:
|
|
"""
|
|
一个自适应的数据平滑窗口。
|
|
当数据波动较小时,使用原始数据;当波动较大时,使用移动平均值进行平滑。
|
|
"""
|
|
|
|
def __init__(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0):
|
|
"""
|
|
初始化自适应平滑窗口。
|
|
|
|
:param window_size: 移动平均窗口的大小,默认为 11。
|
|
:param volatility_threshold: 波动阈值。当新数据与上一个平滑值的差异超过此值时,
|
|
将触发平滑。默认为 2.0。
|
|
"""
|
|
if window_size <= 0:
|
|
raise ValueError(f"窗口大小 'window_size={window_size}' 必须是正数。")
|
|
|
|
self.method = method
|
|
self.window_size = window_size
|
|
self.volatility_threshold = volatility_threshold
|
|
self.img_Threshold = img_threshold
|
|
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
|
|
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
|
|
|
|
# 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)}
|
|
self._last_smoothed_values: Dict[str, Tuple[float, float]] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def resize(self,method:str ="median", window_size: int = 11, volatility_threshold: float = 2.0,img_threshold: float = 20.0):
|
|
with self._lock:
|
|
self.method = method
|
|
self.window_size = window_size
|
|
self.volatility_threshold = volatility_threshold
|
|
self.img_Threshold = img_threshold
|
|
# 内部存储结构: {sensor_pos: deque of (x, y) tuples}
|
|
self.sensor_history: Dict[str, deque[Tuple[float, float]]] = {}
|
|
|
|
# 存储每个传感器上一次输出的平滑值: {sensor_pos: (last_smoothed_x, last_smoothed_y)}
|
|
self._last_smoothed_values: Dict[str, Tuple[float, float]] = {}
|
|
|
|
def process(self, new_sensor_data: AllSensorData) -> (AllSensorData,bool):
|
|
"""
|
|
处理一帧新的传感器数据,根据波动大小自适应地应用平滑。
|
|
|
|
:param new_sensor_data: 包含所有传感器最新数据的 AllSensorData 对象。
|
|
:return: 一个新的 AllSensorData 对象,其中包含了自适应平滑后的数据。
|
|
"""
|
|
smoothed_sensor_list: List[SensorData] = []
|
|
is_abnormal=False
|
|
with self._lock:
|
|
for sensor in new_sensor_data.data:
|
|
pos = sensor.pos
|
|
|
|
# 如果是新传感器,初始化其历史队列和上一个平滑值
|
|
if pos not in self.sensor_history:
|
|
self.sensor_history[pos] = deque(maxlen=self.window_size)
|
|
# 首次出现,将当前值作为“上一个平滑值”
|
|
self._last_smoothed_values[pos] = (sensor.x, sensor.y)
|
|
|
|
history_queue = self.sensor_history[pos]
|
|
history_queue.append((sensor.x, sensor.y))
|
|
# print(f"====> {pos} 设备 滑窗数据 {len(history_queue)}/{self.window_size}")
|
|
last_x, last_y = self._last_smoothed_values[pos]
|
|
|
|
# 决定当前使用哪个值作为输出
|
|
current_output_x, current_output_y = sensor.x, sensor.y # 默认使用原始值
|
|
|
|
# 仅当窗口填满后,才考虑进行平滑
|
|
if len(history_queue) == self.window_size:
|
|
# 计算窗口内的值
|
|
xs, ys = zip(*history_queue)
|
|
# 初始化平滑值
|
|
if self.method == "median":
|
|
sma_x = float(np.median(xs))
|
|
sma_y = float(np.median(ys))
|
|
else: # 默认使用均值
|
|
sma_x = float(np.mean(xs))
|
|
sma_y = float(np.mean(ys))
|
|
|
|
# 计算新原始数据与上一个平滑值之间的波动
|
|
delta_x = abs(sensor.x - last_x)
|
|
delta_y = abs(sensor.y - last_y)
|
|
|
|
# 如果 x 或 y 的波动超过阈值,则使用平滑值
|
|
if delta_x >= self.volatility_threshold:
|
|
current_output_x = sma_x
|
|
if delta_y >= self.volatility_threshold:
|
|
current_output_y = sma_y
|
|
|
|
#巨大变化 存储图片
|
|
if delta_x >= self.img_Threshold:
|
|
is_abnormal = True
|
|
if delta_y >= self.img_Threshold:
|
|
is_abnormal = True
|
|
|
|
# 创建新的 SensorData 对象
|
|
smoothed_sensor = SensorData(
|
|
pos=pos,
|
|
desc=sensor.desc,
|
|
x=current_output_x,
|
|
y=current_output_y
|
|
)
|
|
smoothed_sensor_list.append(smoothed_sensor)
|
|
|
|
# 更新“上一个平滑值”为本次的输出值
|
|
self._last_smoothed_values[pos] = (current_output_x, current_output_y)
|
|
|
|
return AllSensorData(
|
|
data=smoothed_sensor_list,
|
|
time=new_sensor_data.time
|
|
), is_abnormal
|