Browse Source

update 更新最新版本

master
lucas 2 weeks ago
parent
commit
e2d6b86ab3
  1. 53
      app.py
  2. 59
      config.json
  3. 114
      configSer.py
  4. BIN
      images/biaoba.jpg
  5. 9
      main.py
  6. 19
      models/msg.py
  7. 43
      models/sampleMsg.py
  8. 133
      models/target.py
  9. 132
      tcp_Ser.py
  10. 79
      upload/DataReporter.py
  11. 12
      upload/DroppingQueue.py
  12. 26
      upload/RateLimiter.py
  13. 36
      utils.py
  14. 463
      videoDetection.py
  15. 60
      videoPush.py

53
app.py

@ -0,0 +1,53 @@
import sys
from time import sleep
import signal
import configSer
import tcp_Ser
import upload.DataReporter
import videoDetection
import videoPush
tcp_service = None
video_processor = None
def signal_handler(sig, frame):
global tcp_service,video_processor
print(f"收到退出信号 sig={sig},程序准备退出")
tcp_service.stop()
tcp_service.join()
video_processor.stop()
videoPush.stop()
print(f"===释放完毕,退出===")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) # 捕获 Ctrl+C 信号
if __name__ == '__main__':
if __debug__:
print("Debug 模式")
else:
print("release 模式")
config_path = "./config.json"
# 读取配置文件
config_obj= configSer.ConfigOperate(config_path)
json_str = config_obj.config_info.to_json(indent=4)
print(f"当前配置:{json_str}")
tcp_service = tcp_Ser.TcpSer("0.0.0.0", config_obj.config_info.server.port)
tcp_service.start()
reporter = upload.DataReporter.DataReporter(data_fps=config_obj.config_info.fps.data,video_fps=config_obj.config_info.fps.video)
reporter.register_handler(tcp_service.broadcast_message)
reporter.start()
# 启动video
videoDetection.configObj=config_obj
video_processor = videoDetection.VideoProcessor(reporter)
# 添加订阅者processor
tcp_service.add_subscribe(video_processor)
# 推流
videoPush.video_server_run()
# 启动
video_processor.video_mode(config_obj.config_info.capture)
print("==over==")

59
config.json

@ -0,0 +1,59 @@
{
"server": {
"port": 2230
},
"fps": {
"data": 10,
"video": 0
},
"capture": "0",
"targets": {
"0": {
"info": {
"rectangle_area": {
"x": 1200,
"y": 1180,
"w": 130,
"h": 100
},
"threshold": {
"binary": 200,
"gauss": 3
},
"radius": 20.0,
"id": 0,
"desc": "0_up",
"base": false
},
"perspective": [
[
1.0,
0.0,
0.0
],
[
0.0,
1.0,
0.0
],
[
0.0,
0.0,
1.0
]
],
"handler_info": {
"radius_pix": 27.01,
"pix_length": 0.7404664938911514,
"center_point": {
"x": 261.5,
"y": 107.0
},
"center_init": {
"x": 261.5,
"y": 108.5
}
}
}
}
}

114
configSer.py

@ -0,0 +1,114 @@
import json
import os
from dataclasses import (
dataclass,
field
)
from typing import Dict
import numpy as np
from dataclasses_json import dataclass_json
import models.target
_file_path: str
@dataclass_json
@dataclass
class Server:
port: int = 0
@dataclass_json
@dataclass
class Fps:
data: int = 0
video: int = 0
@dataclass_json
@dataclass
class ConfigInfo:
server:Server
fps:Fps
capture: str = "0"
# 标靶配置
targets: Dict[int, models.target.CircleTarget] = field(default_factory=dict)
class ConfigOperate:
_file_path: str
config_info: ConfigInfo
def __init__(self,path:str):
self._file_path = path
self.load2obj_sample()
def load2dict(self):
""""读取配置"""
if not os.path.exists(self._file_path):
raise FileNotFoundError(f"配置文件 {self._file_path} 不存在")
with open(self._file_path) as json_file:
config = json.load(json_file)
return config
# def load2obj_sample2(self):
# """"读取配置"""
# dic=self.load2dict()
# ts = dic["targets"]
# capture = dic["capture"]
# # 获取矩阵数据
# matrix_dict = dic.get("perspective", {})
# # n0=convert_to_ndarray(self.matrix_dict["0"])
# # 将矩阵转换为字符串
# # matrix_str = np.array2string(n0, precision=8, separator=', ', suppress_small=True)
# for _,t in ts.items():
# obj = models.target.TargetInfo(**t)
# area = models.target.RectangleArea.from_dict(obj.rectangle_area)
# thres = models.target.Threshold(**obj.threshold)
# self.targets[obj.id] = models.target.CircleTarget(
# obj.id,
# obj.desc,
# area,
# obj.radius,
# thres,
# obj.base
# )
# return self.targets
def load2obj_sample(self):
dic=self.load2dict()
dict_str = json.dumps(dic)
self.config_info=ConfigInfo.from_json(dict_str)
def save2json_file(self):
json_str = self.config_info.to_json(indent=4)
""""更新配置"""
with open(self._file_path, 'w') as json_file:
json_file.write(json_str)
# json.dump(self, json_file, indent=4)
return None
def save_dict_config(self, dict_data:Dict):
""""更新配置"""
with open(self._file_path, 'w') as json_file:
json.dump(dict_data, json_file, indent=4)
return None
def update_dict_config(self, updates):
"""
更新配置文件中的特定字段
:param file_path: 配置文件路径
:param updates: 包含更新内容的字典
"""
config_dict = self.load2dict()
config_dict.update(updates)
self.save_dict_config(config_dict)
def convert_to_ndarray(matrix_data):
"""
JSON 中的矩阵数据转换为 numpy ndarray
:param matrix_data: JSON 中的矩阵数据列表形式
:return: numpy ndarray
"""
return np.array(matrix_data, dtype=np.float64)

BIN
images/biaoba.jpg

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 KiB

9
main.py

@ -0,0 +1,9 @@
def print_hi(name):
# 在下面的代码行中使用断点来调试脚本。
print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。
# 按装订区域中的绿色按钮以运行脚本。
if __name__ == '__main__':
print_hi('PyCharm')

19
models/msg.py

@ -0,0 +1,19 @@
import json
import typing
from dataclasses import dataclass
from dataclasses_json import dataclass_json
@dataclass_json
@dataclass
class Msg:
_from: str
cmd: str
values: typing.Any
def to_json_(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return self.to_json()

43
models/sampleMsg.py

@ -0,0 +1,43 @@
import json
from dataclasses import dataclass
from typing import List
@dataclass
class SensorImg:
base64: str
def to_json(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__)
@classmethod
def from_json(cls, json_str: str) -> 'SensorImg':
"""从 JSON 字符串反序列化为数据类"""
data_dict = json.loads(json_str)
return cls(**data_dict)
@dataclass
class SensorData:
pos: str
x: float
y: float
def to_json(self) -> str:
"""将数据类序列化为 JSON 字符串"""
return json.dumps(self.__dict__, indent=4, default=lambda x: x.__dict__)
@classmethod
def from_json(cls, json_str: str) -> 'SensorData':
"""从 JSON 字符串反序列化为数据类"""
data_dict = json.loads(json_str)
return cls(**data_dict)
@dataclass
class AllSensorData:
data: List[SensorData]
time: str
@dataclass
class AllImg:
image: SensorImg
time: str

133
models/target.py

@ -0,0 +1,133 @@
from dataclasses import dataclass, field
from numbers import Number
from typing import Optional, Dict, Deque
from collections import deque
import numpy as np
from dataclasses_json import dataclass_json, config
import utils
@dataclass_json
@dataclass
class Point:
x:float
y:float
def __iter__(self): # 使对象可迭代,可直接转为元组
yield self.x
yield self.y
@dataclass
class RectangleArea:
x: int
y: int
w: int
h: int
@classmethod
def from_dict(cls, data: dict):
return cls(
x=data['x'],
y=data['y'],
w=data['w'],
h = data['h'])
@dataclass
class Threshold:
binary: int
gauss: int
@dataclass_json
@dataclass
class TargetInfo:
# 标靶方形区域
rectangle_area:RectangleArea
threshold:Threshold
# 标靶物理半径
radius:float=0.0
id:int =-1
desc:str=""
base:bool=False
def __init__(self,id,desc,rectangle_area:RectangleArea,radius,threshold:Threshold,base:bool,**kwargs):
self.id = id
self.desc = desc
self.rectangle_area=rectangle_area
self.radius=radius
self.threshold=threshold
self.base=base
@classmethod
def from_dict(cls,data: dict):
return cls(data['id'],data['rectangle_area'],data['radius'])
@dataclass_json
@dataclass
class HandlerInfo:
# 初始话
is_init=True
radius_pix:float= 1.0
pix_length:float=0.0
# 标靶中心
center_point_queue:Deque[Point] = field(default_factory=lambda: deque(maxlen=10))
center_point: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
center_init : Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
# 标靶位移(像素)
displacement_pix: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
displacement_phy: Optional[Point]= field(default=None, metadata=config(exclude=lambda x: x is None))
def calculate_mean(self)-> Point:
"""计算队列中所有数据的均值"""
if self.center_point_queue:
length=len(self.center_point_queue)
mean_x = sum(p.x for p in self.center_point_queue) / length
mean_y = sum(p.y for p in self.center_point_queue) / length
return Point(mean_x, mean_y)
else:
return None
def enqueue_center_point(self, data) -> Point:
"""入队操作"""
self.center_point_queue.append(data)
return self.calculate_mean()
@dataclass_json
@dataclass
class CircleTarget:
# 标靶方形区域
info:TargetInfo
# 标靶透视矩阵
perspective: np.ndarray = field(
metadata=config(
encoder=utils.encode_perspective,
decoder=utils.decode_perspective
)
)
handler_info: Optional[HandlerInfo]=None
# def __init__(self,info:TargetInfo,center_point,center_init,displacement_pix,displacement_phy):
# self.info=info
# self.center_point=center_point
# self.center_init=center_init
# self. displacement_pix=displacement_pix
# self.displacement_phy=displacement_phy
@classmethod
def init_by_info(cls,t:TargetInfo):
return CircleTarget(t,None,None,None,None)
def circle_displacement_pix(self):
previous = self.handler_info.center_init
if previous != ():
self.handler_info.displacement_pix = Point(self.handler_info.center_point.x - previous.x,
self.handler_info.center_point.y - previous.y)
return self
def circle_displacement_phy(self):
if self.info.radius != 0 and self.handler_info.displacement_pix is not None:
# 单位像素->距离
self.handler_info.pix_length = self.info.radius / self.handler_info.radius_pix
offset_x = round(float(self.handler_info.displacement_pix.x * self.handler_info.pix_length), 5)
offset_y = round(float(self.handler_info.displacement_pix.y * self.handler_info.pix_length), 5)
self.handler_info.displacement_phy = Point(offset_x, offset_y)
return self

132
tcp_Ser.py

@ -0,0 +1,132 @@
import logging
import socket
import threading
from models.msg import Msg
class TcpSer(threading.Thread):
# 定义服务器地址和端口
HOST = '127.0.0.1'
PORT = 2230
def __init__(self,host,port):
super().__init__()
self.HOST=host
self.PORT=port
self.connected_clients=[]
# 消费者
self.consumers=[]
self.lock = threading.Lock()
self.running = True
# self.daemon = True
# 处理客户端连接的函数
def handle_client(self,client_socket):
try:
# 保持连接,直到客户端断开
while self.running:
# 接收客户端数据(如果需要)
data = client_socket.recv(4096)
msg_str=data.decode('utf-8')
if not data:
break # 如果没有数据,退出循环
print(f"{client_socket.getpeername()} 收到: {msg_str}")
# 反序列化为 实例
s_cmd = Msg.from_json(msg_str)
valid_msg:bool=True
match s_cmd.cmd:
case "getPoints" | "setPoints":
self.on_data(s_cmd,valid_msg)
case "videoFps"| "dataFps":
self.on_data(s_cmd,valid_msg)
case "setCap":
self.on_data(s_cmd,valid_msg)
case "videoMode":
self.on_data(s_cmd,valid_msg)
# todo 添加处理
case _:
valid_msg = False
err_msg=f"valid cmd={s_cmd.cmd}"
resp=f"""{{"_from": "dev","cmd": "{s_cmd.cmd}","values": {{"operate": false,"err": "{err_msg}"}}}}"""
client_socket.sendall(resp.encode())
print("非法命令",resp)
print("通讯完成")
except ConnectionAbortedError:
print("客户端已断开")
except Exception as e:
print(f"处理客户端时出错: {e}")
finally:
pass
# 注册的消费者必须携带on_data 方法
def add_subscribe(self,consumer):
if hasattr(consumer, 'on_data'):
print(f"加入 consumer {consumer} ")
self.consumers.append(consumer)
else:
print("consumer 缺少on_data函数,订阅无效 ")
def on_data(self,msg,valid):
for consumer in self.consumers:
try:
resp=consumer.on_data(msg)
self.broadcast_message(resp)
except Exception as e:
logging.warn("通讯异常",e)
# 广播消息给所有连接的客户端
def broadcast_message(self,message:str):
with self.lock:
if len(message)==0:
return
message+="\n\n"
for client in self.connected_clients:
try:
client.sendall(message.encode())
except Exception as e:
print(f"向客户端发送消息时出错: {e}")
# 如果发送失败,从列表中移除客户端
if client in self.connected_clients:
self.connected_clients.remove(client)
client.close()
def run(self):
# 创建服务器套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.HOST,self.PORT))
server_socket.listen()
server_socket.settimeout(1.0) # 设置 accept 超时时间为 1 秒
print(f"服务器监听在 {self.HOST}:{self.PORT}...")
try:
# 保持服务器运行并接受新的连接
while self.running:
try:
client_socket, addr = server_socket.accept()
print(f"连接来自 {addr}")
# 当客户端连接时,将其添加到列表中
self.connected_clients.append(client_socket)
# 为每个客户端启动一个线程
client_thread = threading.Thread(target=self.handle_client, args=(client_socket,))
# client_thread.daemon = True # 守护线程,服务器关闭时自动结束
client_thread.start()
except socket.timeout:
continue # 超时继续循环
except KeyboardInterrupt:
print("服务器关闭...")
finally:
# 关闭所有客户端连接
for client in self.connected_clients:
print(f"断开客户端 {client_socket.getpeername()}")
client.close()
server_socket.close()
def stop(self):
"""外部调用此方法停止服务"""
self.running = False
if __name__ == '__main__':
tcp=TcpSer("127.0.0.1",2230)
tcp.run()

79
upload/DataReporter.py

@ -0,0 +1,79 @@
import queue
import threading
import time
from datetime import datetime
import models.msg
from upload.RateLimiter import RateLimiter
class DataReporter(threading.Thread):
call_back=None
def __init__(self,data_fps:int,video_fps:int):
super().__init__()
self.image_queue = queue.Queue(maxsize=video_fps) # 图片队列
self.data_queue = queue.Queue(maxsize=data_fps) # 数据队列
self.image_limiter = RateLimiter(max_rate=video_fps, time_window=1) # 图片限速: 5张/秒
self.data_limiter = RateLimiter(max_rate=data_fps, time_window=1) # 数据限速: 20条/秒
self.running = True
self.image_dropped = 0 # 统计丢弃的图片数量
self.data_dropped = 0 # 统计丢弃的数据数量
self.daemon = True
def register_handler(self,handler_fun):
self.call_back = handler_fun
def run(self):
while self.running:
# 优先处理图片上报
if not self.image_queue.empty() and self.image_limiter.allow_request():
try:
image_data = self.image_queue.get_nowait()
self._report_image(image_data)
except queue.Empty:
pass
# 然后处理数据上报
if not self.data_queue.empty() and self.data_limiter.allow_request():
try:
data = self.data_queue.get_nowait()
self._report_data(data)
except queue.Empty:
pass
time.sleep(0.02) # 避免CPU占用过高
def _report_image(self, data):
# 实现图片上报逻辑
print(f"Reporting image, timestamp: {data[0]}")
# 这里替换为实际的上报代码
msg = models.msg.Msg(_from="dev", cmd="image", values=data[1])
msg_json = msg.to_json()
self.call_back(msg_json)
def _report_data(self, data):
# 实现数据上报逻辑
print(f"Reporting data: {data}")
# 实际的上报代码,数据结构转换
msg=models.msg.Msg(_from="dev",cmd="data",values=data[1])
# 重新格式化时间,数据等间隔
msg.values.time=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
msg_json=msg.to_json()
self.call_back(msg_json)
def stop(self):
self.running = False
self.join()
print(f"Stats: {self.image_dropped} images dropped, {self.data_dropped} data dropped")
def adjust_rate(self, new_rate, data_type='image'):
if data_type == 'image':
with self.image_limiter.lock:
self.image_limiter.max_rate = new_rate
self.image_queue= queue.Queue(maxsize=new_rate)
self.image_limiter.update_interval()
else:
with self.data_limiter.lock:
self.data_limiter.max_rate = new_rate
self.data_queue = queue.Queue(maxsize=new_rate)
self.data_limiter.update_interval()

12
upload/DroppingQueue.py

@ -0,0 +1,12 @@
import queue
class DroppingQueue(queue.Queue):
"""自定义队列,满时自动丢弃最旧的数据"""
def put(self, item, block=False, timeout=None):
try:
return super().put(item, block=block, timeout=timeout)
except queue.Full:
# 队列满时丢弃最旧的一个数据
self.get_nowait()
return super().put(item, block=False)

26
upload/RateLimiter.py

@ -0,0 +1,26 @@
import threading
import queue
import time
from collections import deque
class RateLimiter:
def __init__(self, max_rate, time_window):
self.max_rate = max_rate
self.time_window = time_window
self.lock = threading.Lock()
self.current_time = time.time()
self.interval = time_window / max_rate if max_rate > 0 else 0
self.next_available_time = self.current_time
def update_interval(self):
self.interval = self.time_window / self.max_rate if self.max_rate > 0 else 0
def allow_request(self):
with self.lock:
if self.interval<=0:
return False
current_time = time.time()
if current_time >= self.next_available_time:
self.next_available_time = current_time + self.interval
return True
return False

36
utils.py

@ -0,0 +1,36 @@
from typing import Dict, List
import cv2
import base64
import numpy as np
def frame_to_img(frame, format="JPEG"):
"""将 OpenCV 读取的图片帧转换为 img"""
# 将图片帧编码为 JPEG 或 PNG 格式
if format.upper() == "JPEG":
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # JPEG 压缩质量
elif format.upper() == "PNG":
encode_param = [int(cv2.IMWRITE_PNG_COMPRESSION), 9] # PNG 压缩级别
else:
raise ValueError("Unsupported format. Use 'JPEG' or 'PNG'.")
result, encoded_img = cv2.imencode(f".{format.lower()}", frame, encode_param)
return result,encoded_img
def frame_to_base64(frame, format="JPEG"):
"""将 OpenCV 读取的图片帧转换为 Base64 编码的字符串"""
result, encoded_img = frame_to_img(frame, format=format)
if not result:
raise ValueError("Failed to encode frame.")
# 将编码后的字节流转换为 Base64 字符串
base64_string = base64.b64encode(encoded_img).decode("utf-8")
return base64_string
# 自定义编码器和解码器
def encode_perspective(value: np.ndarray) -> List[List[float]]:
return value.tolist()
def decode_perspective(value: List[List[float]]) -> np.ndarray:
return np.array(value)

463
videoDetection.py

@ -0,0 +1,463 @@
from datetime import datetime
import json
import queue
import time
from time import sleep
import cv2
import numpy as np
import logging
import configSer
import models.target
import models.sampleMsg
import upload.DataReporter
import utils
import videoPush
from models.msg import Msg
logging.basicConfig(level=logging.DEBUG)
drawing: bool = False # 是否正在绘制
is_video_mode: bool = False # 是否采用标靶图覆盖
# 定义点
start_point: models.target.Point
end_point: models.target.Point
# 配置
configObj:configSer.ConfigOperate
# 鼠标回调函数
def add_rectangle(event, x, y, flags, param):
global start_point, end_point, drawing
if event == cv2.EVENT_LBUTTONDOWN: # 左键按下
logging.info("左键按下")
start_point = models.target.Point(x, y)
end_point = start_point
drawing = True
elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动
if drawing:
end_point = models.target.Point(x, y)
elif event == cv2.EVENT_LBUTTONUP: # 左键抬起
logging.info("左键抬起")
drawing = False
end_point = models.target.Point(x, y)
if start_point == end_point:
return
distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2)
if distance < 20:
logging.info("距离小于20,无效区域")
return
target_id = len(configObj.config_info.targets)
# 圆标靶半径 mm
radius = 20.0
area=models.target.RectangleArea(int(start_point.x),int(start_point.y),
int(end_point.x-start_point.x),int(end_point.y-start_point.y))
t_info=models.target.TargetInfo( target_id,
"test add",
area,
radius,
models.target.Threshold(190,9),
False)
new_target = models.target.CircleTarget(t_info,None,None)
logging.info(f"新增区域[{target_id}] => {start_point, end_point}")
configObj.config_info.targets[target_id] = new_target
def read_target_rectangle():
return configObj.config_info.targets
class VideoProcessor:
reporter: upload.DataReporter.DataReporter
capture: cv2.VideoCapture
is_opened: bool= False
is_running = True
def __init__(self, reporter:upload.DataReporter.DataReporter):
self.reporter = reporter
def on_data(self,msg:Msg):
global configObj,is_video_mode
logging.info(f"msg={msg}")
match msg.cmd:
case "getPoints":
targets=configObj.config_info.targets.copy()
for k,v in targets.items():
targets[k].handler_info=None
resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets})
resp_json = resp_msg.to_json_()
return resp_json
case "setPoints":
v=msg.values
ts=v["targets"]
# 清空原配置
configObj.config_info.targets={}
for _,t in ts.items():
t_str=json.dumps(t)
new_c_target = models.target.CircleTarget.from_json(t_str)
configObj.config_info.targets[new_c_target.info.id] =new_c_target
configObj.save2json_file()
resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True})
resp_json = resp_msg.to_json()
return resp_json
case "videoFps":
v = msg.values
fps = v["fps"]
self.reporter.adjust_rate(fps,"image")
configObj.config_info.fps.video = fps
configObj.save2json_file()
resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True})
resp_json = resp_msg.to_json()
return resp_json
case "dataFps":
v = msg.values
fps = v["fps"]
self.reporter.adjust_rate(fps,"data")
configObj.config_info.fps.data=fps
configObj.save2json_file()
resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True})
resp_json = resp_msg.to_json()
return resp_json
case "setCap":
v = msg.values
cap = v["cap"]
self.switch_video(cap)
resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True})
resp_json = resp_msg.to_json()
return resp_json
case "videoMode":
v = msg.values
is_debug = v["debug"]
is_video_mode=is_debug
resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True})
resp_json = resp_msg.to_json()
return resp_json
print("==")
def pre_handler_img(self,gray_frame,now_str:str):
# 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区
img_base64 = utils.frame_to_base64(gray_frame, format="JPEG")
all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str)
self.enqueue_image(all_img)
def draw_rectangle(self,img):
global configObj,is_video_mode
gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
#图像发送
if configObj.config_info.fps.video > 0:
self.pre_handler_img(gray_frame,now_str)
if len(configObj.config_info.targets)==0: return
# 基点
base_point_pix:models.target.Point=None
#上报有新数据的点
all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str)
# 绘图-历史点
for i, tr in configObj.config_info.targets.items():
if not hasattr(tr, "info"):
print("====")
_start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y)
_end_point = models.target.Point(
tr.info.rectangle_area.x+tr.info.rectangle_area.w,
tr.info.rectangle_area.y+tr.info.rectangle_area.h)
#绘制标靶区域
blue_color = (255, 0, 0)
if tr.info.base:
blue_color=(200, 0, 200) #紫红色 基点
cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2)
label=f"{tr.info.desc},r={tr.info.radius}"
cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1)
#检测
# 获取图像尺寸
frame_height, frame_width = gray_frame.shape[:2]
if _end_point.x>frame_width or _end_point.y>frame_height:
print(f"标靶[{tr.info.desc}]sub_image 超出区域")
continue
sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point)
ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY)
# 高斯滤波
gauss_size=tr.info.threshold.gauss
sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE)
# sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50)
if tr.perspective is not None:
# 宽度
sub_width = sub_binary_frame.shape[1]
# 高度
sub_height = sub_binary_frame.shape[0]
sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height))
# 覆盖原图
if is_video_mode:
sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR)
self.cover_sub_image(img,sub_c_img, _start_point, _end_point)
if __debug__:
cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame)
circles = self.circle2_detect(sub_binary_frame)
if len(circles) == 0:
continue
center,radius_pix=self.circle_match(circles,_start_point)
# 纪录圆心位置
if tr.handler_info is None:
tr.handler_info= models.target.HandlerInfo()
if tr.handler_info.is_init:
tr.handler_info.is_init=False
tr.handler_info.center_init = center
# 数据平滑处理
smooth_center = tr.handler_info.enqueue_center_point(center)
# print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}")
tr.handler_info.center_point=smooth_center
# 原始处理 tr.handler_info.center_point=center
tr.handler_info.radius_pix=radius_pix
tr.circle_displacement_pix()
# 基点
if tr.info.base:
base_point_pix=tr.handler_info.displacement_pix
# 画圆
self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point)
# 基于像素点计算 物理距离
for i, tr in configObj.config_info.targets.items():
if tr.handler_info is None:
continue
if tr.handler_info.displacement_pix is None:
continue
# 减去基点偏移
if base_point_pix is not None:
raw_point=tr.handler_info.displacement_pix
tr.handler_info.displacement_pix=models.target.Point(
x=raw_point.x-base_point_pix.x,
y=raw_point.y-base_point_pix.y)
if not tr.info.base:
# print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}")
pass
tr.circle_displacement_phy()
if tr.handler_info.displacement_phy is not None:
all_upload_data.data.append(
models.sampleMsg.SensorData(
str(tr.info.id),
tr.handler_info.displacement_phy.x,
tr.handler_info.displacement_phy.y)
)
tr.handler_info.displacement_pix=None
tr.handler_info.displacement_phy = None
#过滤无效空数据
if len(all_upload_data.data)==0:
return
if configObj.config_info.fps.data > 0:
self.enqueue_data(all_upload_data)
def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float):
circle = max(circles, key=lambda c: c[2])
# 绘制圆心
center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y)
radius = float(np.round(circle[2], 3))
cp = models.target.Point(x=center[0], y=center[1])
return cp,radius
def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point):
font = cv2.FONT_HERSHEY_SIMPLEX
color = (255, 0, 0) # 蓝色
scale = 0.5
center_int = tuple(int(x) for x in center)
radius_int = int(radius)
cv2.circle(img, center_int, 2, (0, 255, 0), 4)
# 绘制外圆
cv2.circle(img, center_int, radius_int, (0, 0, 255), 1)
# 打印圆心坐标
text1 = f"c:{(center.x,center.y,radius)}"
txt_location = (rect_s_point.x+2,rect_e_point.y-2)
# txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10)
cv2.putText(img, text1, txt_location, font, scale, color, 1)
def circle2_detect(self,img):
# 圆心距 canny阈值 最小半径 最大半径
circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15,
maxRadius=0)
# 创建一个0行, 2列的空数组
if circles_float is not None:
# 提取圆心坐标(保留2位小数)
centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]]
return centers
else:
return []
def extract_sub_image(self,frame, top_left, bottom_right):
"""
从帧中截取子区域
:param frame: 输入的视频帧
:param top_left: 子图片的左上角坐标 (x1, y1)
:param bottom_right: 子图片的右下角坐标 (x2, y2)
:return: 截取的子图片
"""
x1, y1 = top_left
x2, y2 = bottom_right
return frame[y1:y2, x1:x2]
def cover_sub_image(self,frame,sub_frame, top_left, bottom_right):
x1, y1 = top_left
x2, y2 = bottom_right
frame[y1:y2, x1:x2]= sub_frame
return frame
def open_video(self,video_id):
sleep(1)
print(f"打开摄像头 -> {video_id}")
self.capture = cv2.VideoCapture(video_id)
frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"默认分辨率= {frame_width}*{frame_height}")
logging.info(f"{video_id}地址->{self.capture}")
if not self.capture.isOpened():
self.capture.release()
logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}")
return
fps = self.capture.get(cv2.CAP_PROP_FPS)
print(f"fps={fps},video_id={video_id},")
# self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度
# self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度
self.is_opened=True
def switch_video(self,video_id:str):
print(f"切换摄像头 -> {video_id}")
self.is_opened = False
self.capture.release()
cv2.destroyAllWindows()
if str.isdigit(video_id):
video_id=int(video_id)
self.open_video(video_id)
def show_video(self):
global sigExit,start_point, end_point, drawing
if __debug__:
cv2.namedWindow('Frame')
cv2.setMouseCallback('Frame', add_rectangle)
# 读取一帧图像
while self.is_running:
if not self.is_opened:
print(f"摄像头 标记is_opened={self.is_opened}")
sleep(5)
continue
ret, frame = self.capture.read()
if ret:
self.frame_handle(frame)
else:
logging.info(f"无法读取帧,cap地址- >{self.capture}")
sleep(1)
# self.capture.release()
# self.capture= cv2.VideoCapture(0) # 再次尝试
cv2.waitKey(1)
# 显示图像
if frame is not None:
if __debug__:
cv2.imshow('Frame', frame)
#缓存到推流
videoPush.update_latest_frame(frame)
print("退出VideoProcessor")
def show_image(self,frame):
global start_point, end_point, drawing
if __debug__:
cv2.namedWindow('Frame')
cv2.setMouseCallback('Frame', add_rectangle)
# 读取一帧图像
while True:
cp_img=frame.copy()
self.frame_handle(cp_img)
if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环
break
cv2.destroyAllWindows()
def frame_handle(self,frame):
# 绘图-历史点
self.draw_rectangle(frame)
# 绘图-实时
if drawing:
cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4)
# print(f"鼠标位置 {start_point} -> {end_point}")
def image_mode(self):
img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg
# img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg
# img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg
self.show_image(img_raw)
# 支持
def video_mode(self,video_id:str):
if str.isdigit(video_id):
video_id=int(video_id)
self.open_video(video_id)
self.show_video()
# 释放摄像头资源并关闭所有窗口
print("退出 video")
self.capture.release()
cv2.destroyAllWindows()
def rtsp_mode(self,rtsp_url:str):
# rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554"
# rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream"
self.open_video(rtsp_url)
fps = self.capture.get(cv2.CAP_PROP_FPS)
print(f"rtsp fps={fps}")
self.show_video()
# 释放摄像头资源并关闭所有窗口
self.capture.release()
cv2.destroyAllWindows()
def enqueue_data(self,data):
# 获取当前时间戳
timestamp = time.time()
# 将时间戳转换为 datetime 对象
dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位
# 放入图片队列(自动丢弃最旧数据当队列满时)
try:
self.reporter.data_queue.put((dt, data), block=False)
except queue.Full:
# self.reporter.data_dropped += 1
pass
def enqueue_image(self,data):
# 获取当前时间戳
timestamp = time.time()
# 将时间戳转换为 datetime 对象
dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位
# 放入图片队列(自动丢弃最旧数据当队列满时)
try:
self.reporter.image_queue.put((dt, data), block=False)
except queue.Full:
pass
#self.reporter.image_dropped += 1
def stop(self):
self.is_running=False
self.capture.release()

60
videoPush.py

@ -0,0 +1,60 @@
from time import sleep
import cv2
import threading
import time
import numpy as np
import requests
from flask import Flask, Response, render_template_string, request
import utils
app_flask = Flask(__name__)
# 全局变量,用于存储最新的帧
latest_frame:np.ndarray = None
lock = threading.Lock()
is_running = True
def update_latest_frame(n_bytes:np.ndarray):
global latest_frame
latest_frame=n_bytes
def generate_mjpeg():
"""生成MJPEG格式的视频流"""
while is_running:
with lock:
if latest_frame is None:
continue
_,latest_frame_bytes = utils.frame_to_img(latest_frame)
frame = latest_frame_bytes.tobytes()
pass
sleep(0.1)
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app_flask.route('/video_flow')
def video_feed():
"""视频流路由"""
return Response(
generate_mjpeg(),
mimetype='multipart/x-mixed-replace; boundary=frame'
)
def run():
port=2240
print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow")
app_flask.run(host='0.0.0.0', port=port, threaded=True)
def video_server_run():
thread = threading.Thread(target=run)
thread.daemon = True # 设置为守护线程,主线程退出时自动终止
thread.start()
def stop():
global is_running
is_running=False
if __name__ == '__main__':
run()
Loading…
Cancel
Save