You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

463 lines
18 KiB

from datetime import datetime
import json
import queue
import time
from time import sleep
import cv2
import numpy as np
import logging
import configSer
import models.target
import models.sampleMsg
import upload.DataReporter
import utils
import videoPush
from models.msg import Msg
# Root logger at DEBUG so the module-level logging.info calls below are emitted.
logging.basicConfig(level=logging.DEBUG)
drawing: bool = False  # True while the user is dragging a selection rectangle
is_video_mode: bool = False  # True: overlay the binarized target patch onto the frame
# Selection endpoints, written by the mouse callback (annotated, assigned on first click)
start_point: models.target.Point
end_point: models.target.Point
# Global configuration handle — assumed to be assigned elsewhere before use (TODO confirm)
configObj: configSer.ConfigOperate
# 鼠标回调函数
def add_rectangle(event, x, y, flags, param):
    """OpenCV mouse callback: drag with the left button to define a new
    circle-target region; the region is appended to the global config."""
    global start_point, end_point, drawing
    if event == cv2.EVENT_LBUTTONDOWN:  # button pressed: start a drag
        logging.info("左键按下")
        start_point = models.target.Point(x, y)
        end_point = start_point
        drawing = True
        return
    if event == cv2.EVENT_MOUSEMOVE:  # track the cursor while dragging
        if drawing:
            end_point = models.target.Point(x, y)
        return
    if event != cv2.EVENT_LBUTTONUP:
        return
    # Button released: finalize the rectangle.
    logging.info("左键抬起")
    drawing = False
    end_point = models.target.Point(x, y)
    if start_point == end_point:
        return
    drag_dist = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2)
    if drag_dist < 20:  # reject tiny accidental drags
        logging.info("距离小于20,无效区域")
        return
    target_id = len(configObj.config_info.targets)
    # Physical radius of the circular target, in mm.
    radius = 20.0
    rect = models.target.RectangleArea(
        int(start_point.x),
        int(start_point.y),
        int(end_point.x - start_point.x),
        int(end_point.y - start_point.y),
    )
    info = models.target.TargetInfo(
        target_id,
        "test add",
        rect,
        radius,
        models.target.Threshold(190, 9),
        False,
    )
    target = models.target.CircleTarget(info, None, None)
    logging.info(f"新增区域[{target_id}] => {start_point, end_point}")
    configObj.config_info.targets[target_id] = target
def read_target_rectangle():
    """Return the dict of configured targets from the global config."""
    targets = configObj.config_info.targets
    return targets
class VideoProcessor:
    """Reads frames from a camera / RTSP stream / still image, detects circular
    targets inside user-configured rectangles, draws overlays, and queues
    displacement data and frames to the reporter for upload."""

    reporter: upload.DataReporter.DataReporter  # owns the data/image upload queues
    capture: cv2.VideoCapture  # current capture device (set by open_video)
    is_opened: bool = False  # True once a capture device opened successfully
    is_running = True  # main-loop flag, cleared by stop()

    def __init__(self, reporter: upload.DataReporter.DataReporter):
        self.reporter = reporter

    def on_data(self, msg: Msg):
        """Dispatch one control message and return its JSON reply string.

        Commands: getPoints, setPoints, videoFps, dataFps, setCap, videoMode.
        Unknown commands fall through the match and return None.
        """
        global configObj, is_video_mode
        logging.info(f"msg={msg}")
        match msg.cmd:
            case "getPoints":
                # Strip runtime handler state before serializing the targets.
                targets = configObj.config_info.targets.copy()
                for k, v in targets.items():
                    targets[k].handler_info = None
                resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets})
                # NOTE(review): to_json_ differs from to_json used in every other case — confirm intentional
                resp_json = resp_msg.to_json_()
                return resp_json
            case "setPoints":
                v = msg.values
                ts = v["targets"]
                # Replace the whole target configuration with the incoming set.
                configObj.config_info.targets = {}
                for _, t in ts.items():
                    t_str = json.dumps(t)
                    new_c_target = models.target.CircleTarget.from_json(t_str)
                    configObj.config_info.targets[new_c_target.info.id] = new_c_target
                configObj.save2json_file()
                resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
            case "videoFps":
                # Change the image upload rate and persist it.
                v = msg.values
                fps = v["fps"]
                self.reporter.adjust_rate(fps, "image")
                configObj.config_info.fps.video = fps
                configObj.save2json_file()
                resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
            case "dataFps":
                # Change the sensor-data upload rate and persist it.
                v = msg.values
                fps = v["fps"]
                self.reporter.adjust_rate(fps, "data")
                configObj.config_info.fps.data = fps
                configObj.save2json_file()
                resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
            case "setCap":
                # Switch to another capture source.
                v = msg.values
                cap = v["cap"]
                self.switch_video(cap)
                resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
            case "videoMode":
                # Toggle the debug overlay of the binarized sub-image.
                v = msg.values
                is_debug = v["debug"]
                is_video_mode = is_debug
                resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
        print("==")

    def pre_handler_img(self, gray_frame, now_str: str):
        """Encode the gray frame as base64 JPEG and queue it for upload."""
        img_base64 = utils.frame_to_base64(gray_frame, format="JPEG")
        all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str)
        self.enqueue_image(all_img)

    def draw_rectangle(self, img):
        """Process one BGR frame in place: optionally stream it, run circle
        detection inside every configured target rectangle, draw overlays
        on img, and queue the resulting physical displacements."""
        global configObj, is_video_mode
        gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        # Frame upload (only when video reporting is enabled).
        if configObj.config_info.fps.video > 0:
            self.pre_handler_img(gray_frame, now_str)
        if len(configObj.config_info.targets) == 0: return
        # Pixel displacement of the base target; subtracted from the others in pass 2.
        base_point_pix: models.target.Point = None
        # Collects targets that produced new data this frame.
        all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str)
        # Pass 1: draw configured regions and detect circles.
        for i, tr in configObj.config_info.targets.items():
            if not hasattr(tr, "info"):
                print("====")
            _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y)
            _end_point = models.target.Point(
                tr.info.rectangle_area.x + tr.info.rectangle_area.w,
                tr.info.rectangle_area.y + tr.info.rectangle_area.h)
            # Draw the target rectangle.
            blue_color = (255, 0, 0)
            if tr.info.base:
                blue_color = (200, 0, 200)  # magenta marks the base (reference) target
            cv2.rectangle(img, tuple(_start_point), tuple(_end_point), blue_color, 2)
            label = f"{tr.info.desc},r={tr.info.radius}"
            cv2.putText(img, label, (_start_point.x, _start_point.y - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color, 1)
            # Detection: skip regions that fall outside the frame.
            frame_height, frame_width = gray_frame.shape[:2]
            if _end_point.x > frame_width or _end_point.y > frame_height:
                print(f"标靶[{tr.info.desc}]sub_image 超出区域")
                continue
            sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point)
            ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY)
            # Gaussian smoothing with the configured (odd) kernel size.
            gauss_size = tr.info.threshold.gauss
            sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0, sigmaY=0, borderType=cv2.BORDER_REPLICATE)
            # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50)
            if tr.perspective is not None:
                # Apply the stored perspective correction at the same patch size.
                sub_width = sub_binary_frame.shape[1]
                sub_height = sub_binary_frame.shape[0]
                sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height))
            # Debug mode: paste the binarized patch back over the original frame.
            if is_video_mode:
                sub_c_img = cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR)
                self.cover_sub_image(img, sub_c_img, _start_point, _end_point)
            if __debug__:
                cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame)
            circles = self.circle2_detect(sub_binary_frame)
            if len(circles) == 0:
                continue
            center, radius_pix = self.circle_match(circles, _start_point)
            # Record the circle-center position on the target's handler state.
            if tr.handler_info is None:
                tr.handler_info = models.target.HandlerInfo()
            if tr.handler_info.is_init:
                # First detection becomes the reference position.
                tr.handler_info.is_init = False
                tr.handler_info.center_init = center
            # Smooth the center over recent samples.
            smooth_center = tr.handler_info.enqueue_center_point(center)
            # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}")
            tr.handler_info.center_point = smooth_center
            # raw (unsmoothed) alternative: tr.handler_info.center_point=center
            tr.handler_info.radius_pix = radius_pix
            tr.circle_displacement_pix()
            # Remember the base target's pixel displacement for pass 2.
            if tr.info.base:
                base_point_pix = tr.handler_info.displacement_pix
            # Draw the detected circle and its caption.
            self.circle_show(img, smooth_center, radius_pix, _start_point, _end_point)
        # Pass 2: convert pixel displacement to physical units and collect uploads.
        for i, tr in configObj.config_info.targets.items():
            if tr.handler_info is None:
                continue
            if tr.handler_info.displacement_pix is None:
                continue
            # Subtract the base target's offset (compensates camera motion).
            if base_point_pix is not None:
                raw_point = tr.handler_info.displacement_pix
                tr.handler_info.displacement_pix = models.target.Point(
                    x=raw_point.x - base_point_pix.x,
                    y=raw_point.y - base_point_pix.y)
                if not tr.info.base:
                    # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}")
                    pass
            tr.circle_displacement_phy()
            if tr.handler_info.displacement_phy is not None:
                all_upload_data.data.append(
                    models.sampleMsg.SensorData(
                        str(tr.info.id),
                        tr.handler_info.displacement_phy.x,
                        tr.handler_info.displacement_phy.y)
                )
            # Clear per-frame results so stale values are not re-reported.
            tr.handler_info.displacement_pix = None
            tr.handler_info.displacement_phy = None
        # Skip the upload entirely when no target produced data.
        if len(all_upload_data.data) == 0:
            return
        if configObj.config_info.fps.data > 0:
            self.enqueue_data(all_upload_data)

    def circle_match(self, circles, rect_s_point: models.target.Point) -> tuple[models.target.Point, float]:
        """Pick the largest detected circle and map its center from sub-image
        coordinates back to full-frame coordinates.

        :param circles: list of (x, y, r) tuples from circle2_detect
        :param rect_s_point: top-left corner of the target rectangle
        :return: (center point in frame coordinates, radius in pixels)
        """
        circle = max(circles, key=lambda c: c[2])
        # Offset the sub-image center by the rectangle origin.
        center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y)
        radius = float(np.round(circle[2], 3))
        cp = models.target.Point(x=center[0], y=center[1])
        return cp, radius

    def circle_show(self, img, center: models.target.Point, radius: float, rect_s_point: models.target.Point, rect_e_point: models.target.Point):
        """Draw the detected circle, its center dot and a coordinate caption."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (255, 0, 0)  # blue (BGR)
        scale = 0.5
        center_int = tuple(int(x) for x in center)
        radius_int = int(radius)
        cv2.circle(img, center_int, 2, (0, 255, 0), 4)
        # Outer circle.
        cv2.circle(img, center_int, radius_int, (0, 0, 255), 1)
        # Caption with the center coordinates, anchored to the rectangle corner.
        text1 = f"c:{(center.x,center.y,radius)}"
        txt_location = (rect_s_point.x + 2, rect_e_point.y - 2)
        # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10)
        cv2.putText(img, text1, txt_location, font, scale, color, 1)

    def circle2_detect(self, img):
        """Run Hough circle detection on a (binary) gray image.

        :return: list of (x, y, r) tuples rounded to 2 decimals, or [] if none.
        """
        # Args: min center distance 30, Canny threshold 200, roundness 0.8,
        # min radius 15, max radius unbounded (0).
        circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15,
                                         maxRadius=0)
        if circles_float is not None:
            # Extract circle parameters (kept to 2 decimal places).
            centers = [(round(float(x), 2), round(float(y), 2), round(float(r), 2)) for x, y, r in circles_float[0, :]]
            return centers
        else:
            return []

    def extract_sub_image(self, frame, top_left, bottom_right):
        """
        Crop a sub-region out of a frame.

        :param frame: input video frame (numpy array)
        :param top_left: top-left corner of the sub-image (x1, y1)
        :param bottom_right: bottom-right corner of the sub-image (x2, y2)
        :return: the cropped sub-image (a view, not a copy)
        """
        x1, y1 = top_left
        x2, y2 = bottom_right
        return frame[y1:y2, x1:x2]

    def cover_sub_image(self, frame, sub_frame, top_left, bottom_right):
        """Paste sub_frame into frame at the given rectangle (in place)."""
        x1, y1 = top_left
        x2, y2 = bottom_right
        frame[y1:y2, x1:x2] = sub_frame
        return frame

    def open_video(self, video_id):
        """Open a capture device/URL; sets is_opened on success, else releases it."""
        sleep(1)  # brief pause before (re)opening the device
        print(f"打开摄像头 -> {video_id}")
        self.capture = cv2.VideoCapture(video_id)
        frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"默认分辨率= {frame_width}*{frame_height}")
        logging.info(f"{video_id}地址->{self.capture}")
        if not self.capture.isOpened():
            self.capture.release()
            logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}")
            return
        fps = self.capture.get(cv2.CAP_PROP_FPS)
        print(f"fps={fps},video_id={video_id},")
        # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度
        # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度
        self.is_opened = True

    def switch_video(self, video_id: str):
        """Release the current device and open a new one; numeric strings
        are converted to integer camera indexes."""
        print(f"切换摄像头 -> {video_id}")
        self.is_opened = False
        self.capture.release()
        cv2.destroyAllWindows()
        if str.isdigit(video_id):
            video_id = int(video_id)
        self.open_video(video_id)

    def show_video(self):
        """Main capture loop: read, process, display and push frames until
        stop() clears is_running."""
        # NOTE(review): sigExit is not defined in this module — confirm it exists elsewhere
        global sigExit, start_point, end_point, drawing
        if __debug__:
            cv2.namedWindow('Frame')
            cv2.setMouseCallback('Frame', add_rectangle)
        while self.is_running:
            if not self.is_opened:
                # Device not ready — wait and retry.
                print(f"摄像头 标记is_opened={self.is_opened}")
                sleep(5)
                continue
            ret, frame = self.capture.read()
            if ret:
                self.frame_handle(frame)
            else:
                logging.info(f"无法读取帧,cap地址- >{self.capture}")
                sleep(1)
                # self.capture.release()
                # self.capture= cv2.VideoCapture(0) # 再次尝试
            cv2.waitKey(1)
            # Display the processed frame and cache it for streaming.
            if frame is not None:
                if __debug__:
                    cv2.imshow('Frame', frame)
                videoPush.update_latest_frame(frame)
        print("退出VideoProcessor")

    def show_image(self, frame):
        """Debug loop for still images: re-process a copy of the same frame
        until 'q' is pressed."""
        global start_point, end_point, drawing
        if __debug__:
            cv2.namedWindow('Frame')
            cv2.setMouseCallback('Frame', add_rectangle)
        while True:
            cp_img = frame.copy()
            self.frame_handle(cp_img)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to exit the loop
                break
        cv2.destroyAllWindows()

    def frame_handle(self, frame):
        """Draw configured targets plus the live selection rectangle on frame."""
        # Configured/historical targets.
        self.draw_rectangle(frame)
        # Live selection while the mouse is being dragged.
        if drawing:
            cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4)
            # print(f"鼠标位置 {start_point} -> {end_point}")

    def image_mode(self):
        """Run detection on a hard-coded still image (debug helper)."""
        img_raw = cv2.imread('images/trans/_4point.jpg')  # images/target/rp80max3.jpg
        # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg
        # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg
        self.show_image(img_raw)

    def video_mode(self, video_id: str):
        """Open a camera (numeric strings become integer indexes) and run the
        capture loop; releases resources on exit."""
        if str.isdigit(video_id):
            video_id = int(video_id)
        self.open_video(video_id)
        self.show_video()
        # Release the camera and close all windows.
        print("退出 video")
        self.capture.release()
        cv2.destroyAllWindows()

    def rtsp_mode(self, rtsp_url: str):
        """Open an RTSP stream and run the capture loop; releases on exit."""
        # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554"
        # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream"
        self.open_video(rtsp_url)
        fps = self.capture.get(cv2.CAP_PROP_FPS)
        print(f"rtsp fps={fps}")
        self.show_video()
        # Release the stream and close all windows.
        self.capture.release()
        cv2.destroyAllWindows()

    def enqueue_data(self, data):
        """Queue a sensor-data sample, dropping it silently if the queue is full."""
        timestamp = time.time()
        # Millisecond-precision timestamp key (first 3 digits of microseconds).
        dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3]
        try:
            self.reporter.data_queue.put((dt, data), block=False)
        except queue.Full:
            # self.reporter.data_dropped += 1
            pass

    def enqueue_image(self, data):
        """Queue an image sample, dropping it silently if the queue is full."""
        timestamp = time.time()
        # Millisecond-precision timestamp key (first 3 digits of microseconds).
        dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3]
        try:
            self.reporter.image_queue.put((dt, data), block=False)
        except queue.Full:
            pass
            # self.reporter.image_dropped += 1

    def stop(self):
        """Stop the capture loop and release the device."""
        self.is_running = False
        self.capture.release()