You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

149 lines
6.1 KiB

import logging
import queue
import sched
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import models.msg
from upload.RateLimiter import RateLimiter
class DataReporter(threading.Thread):
    """Background thread that forwards queued data to registered callbacks.

    Producers put ``(timestamp, values)`` tuples on ``data_queue`` (and
    ``alert_queue`` / ``image_queue``).  The thread wakes on a ``sched``
    timer, asks ``data_limiter`` for permission, wraps the next item in a
    ``models.msg.Msg`` and hands its JSON form to every registered callback
    through a small thread pool.  When the data queue is empty the most
    recent item is re-sent, at most ``_MAX_REPEATS`` times, so consumers keep
    receiving evenly spaced messages.

    NOTE(review): ``RateLimiter`` is defined elsewhere; this class only relies
    on its ``max_rate`` attribute, ``lock``, ``allow_request()`` and
    ``update_interval()``.
    """

    # How many times the last item is re-sent while the data queue is empty.
    _MAX_REPEATS = 5
    # Tick period used when the configured data rate is not positive.
    _IDLE_TICK_SECONDS = 1.0

    def __init__(self, data_fps: int, video_fps: int = 0):
        """Create the reporter.

        Args:
            data_fps: Data messages per second; also the data queue capacity.
            video_fps: Images per second; also the image queue capacity.
                ``0`` makes the image queue unbounded (``queue.Queue``
                semantics for ``maxsize=0``).
        """
        super().__init__()
        self.alert_queue = queue.Queue(maxsize=1)  # alert queue
        self.image_queue = queue.Queue(maxsize=video_fps)  # image queue
        self.data_queue = queue.Queue(maxsize=data_fps)  # data queue
        # Deprecated: image rate limiting (image reporting is disabled).
        self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1)
        # Data rate limiting: at most data_fps messages per second.
        self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1)
        self.running = True
        self.image_dropped = 0  # number of dropped images
        self.data_dropped = 0  # number of dropped data items
        self.daemon = True
        self.callbacks = []  # registered callback functions
        self.callback_executor = ThreadPoolExecutor(max_workers=4)
        self.last_data = None  # most recently reported data item
        self.last_repeat_counts = 0  # how many times last_data was re-sent
        self.scheduler = sched.scheduler(time.time)
        self.schedulerEvent = None  # the currently pending scheduler event

    def register_handler(self, handler_fun):
        """Register a callable that receives each message's JSON payload."""
        self.callbacks.append(handler_fun)

    def run(self):
        """Thread entry point: drive data reporting from the scheduler."""
        interval = self._tick_interval(self._IDLE_TICK_SECONDS)
        self.schedulerEvent = self.scheduler.enter(
            0, 1, self._schedule_data_reporting, (interval,)
        )
        self.scheduler.run()

    def _tick_interval(self, fallback):
        """Return the tick period for the current data rate.

        Falls back to ``fallback`` when the rate is missing or not positive,
        which avoids a ``ZeroDivisionError`` for a rate of 0.
        """
        rate = self.data_limiter.max_rate
        if rate and rate > 0:
            return 1.0 / rate
        return fallback

    def _schedule_data_reporting(self, interval):
        """Run one reporting tick and schedule the next one.

        Args:
            interval: Period of the previous tick; reused only when the
                current rate is not positive.
        """
        if not self.running:
            # Not rescheduling lets scheduler.run() return, ending the thread.
            return
        # Recompute every tick so adjust_rate() changes take effect.
        next_interval = self._tick_interval(interval)
        self.schedulerEvent = self.scheduler.enter(
            next_interval, 1, self._schedule_data_reporting, (next_interval,)
        )
        try:
            self._report_data_if_allowed()
        except Exception:
            # One bad item must not kill the reporting thread for good.
            logging.exception("Data reporting tick failed")

    def _report_data_if_allowed(self):
        """Report the next queued item, or re-send the last one.

        Does nothing when the rate limiter denies the request.
        """
        if not self.data_limiter.allow_request():
            return
        try:
            data = self.data_queue.get_nowait()
        except queue.Empty:
            # Queue is empty: repeat the last item a bounded number of times.
            if (self.last_data is not None
                    and self.last_repeat_counts < self._MAX_REPEATS):
                self.last_repeat_counts += 1
                self._report_data(self.last_data, self.last_repeat_counts)
            return
        self.last_data = data
        self.last_repeat_counts = 0
        self._report_data(data, self.last_repeat_counts)

    def run2(self):
        """Legacy polling loop: report alerts, then data, every 20 ms.

        Image reporting is disabled here; see ``_report_image``.
        """
        while self.running:
            try:
                alert_data = self.alert_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                self._report_alert(alert_data)
            self._report_data_if_allowed()
            time.sleep(0.02)  # avoid pegging the CPU

    def _execute_callbacks(self, msg_json):
        """Submit every registered callback to the thread pool.

        NOTE: exceptions raised inside a callback are captured by its future
        and are not reported here; only submission failures are printed.
        """
        for callback in self.callbacks:
            try:
                # Run asynchronously so a slow callback cannot stall reporting.
                self.callback_executor.submit(callback, msg_json)
            except Exception as e:
                # Not every callable has __name__ (e.g. functools.partial).
                name = getattr(callback, "__name__", repr(callback))
                print(f"Error executing callback {name}: {e}")

    def _report_alert(self, data):
        """Send an alert message built from ``data[1]`` to all callbacks."""
        print(f"Reporting alert, timestamp: {data[0]}")
        msg = models.msg.Msg(_from="dev", cmd="alert", values=data[1])
        msg_json = msg.to_json()
        self._execute_callbacks(msg_json)

    def _report_image(self, data):
        """Send an image message built from ``data[1]`` to all callbacks."""
        print(f"Reporting image, timestamp: {data[0]}")
        msg = models.msg.Msg(_from="dev", cmd="image", values=data[1])
        msg_json = msg.to_json()
        self._execute_callbacks(msg_json)

    def _report_data(self, data, last_repeat_counts=0):
        """Send a data message built from ``data[1]`` to all callbacks.

        Args:
            data: ``(timestamp, values)`` item taken from the data queue.
            last_repeat_counts: 0 for a fresh item, otherwise the repeat
                number of a re-sent item (used for logging only).
        """
        logging.info("Reporting [%s] data: %s", last_repeat_counts, data)
        msg = models.msg.Msg(_from="dev", cmd="data", values=data[1])
        # Re-stamp with the send time (millisecond precision) so messages are
        # evenly spaced.  NOTE(review): msg.values is presumably the same
        # object as data[1], so this also mutates the queued item - confirm.
        msg.values.time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        msg_json = msg.to_json()
        self._execute_callbacks(msg_json)

    def stop(self):
        """Stop reporting, wait for the thread, and release the thread pool.

        Safe to call when the thread was never started and from the reporter
        thread itself (both would make an unconditional ``join()`` raise).
        """
        self.running = False
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        # Already-submitted callbacks still run; no new ones are accepted.
        self.callback_executor.shutdown(wait=False)
        print(f"Stats: {self.data_dropped} data dropped")

    def adjust_rate(self, new_rate, data_type='image'):
        """Change the rate limit and queue capacity for one stream.

        Args:
            new_rate: New maximum rate, also used as the new queue capacity.
            data_type: ``'image'`` for the image stream; any other value
                selects the data stream.

        NOTE: the queue is replaced by a fresh one, so items still waiting in
        the old queue are discarded.
        """
        if data_type == 'image':
            limiter, queue_attr = self.image_limiter, "image_queue"
        else:
            limiter, queue_attr = self.data_limiter, "data_queue"
        # NOTE(review): update_interval() is called with the limiter's lock
        # held; this assumes it does not acquire the same lock - confirm.
        with limiter.lock:
            limiter.max_rate = new_rate
            setattr(self, queue_attr, queue.Queue(maxsize=new_rate))
            limiter.update_interval()