from datetime import datetime
import json
import queue
import time
from time import sleep
from dataclasses import asdict
import cv2
import numpy as np
import signal
import sys
import threading
import logging
import configSer
import models.target
import models.sampleMsg
import tcp_Ser
import upload.DataReporter
import utils
from models.msg import Msg
logging.basicConfig(level=logging.DEBUG)
drawing: bool = False  # whether a selection rectangle is currently being drawn
sigExit: bool = False  # whether an exit signal was received

# selection points set by the mouse callback
start_point: models.target.Point
end_point: models.target.Point

# global configuration
configObj: configSer.ConfigOperate


# mouse callback: select a rectangular target region
def add_rectangle(event, x, y, flags, param):
    global start_point, end_point, drawing
    if event == cv2.EVENT_LBUTTONDOWN:  # left button pressed
        logging.info("left button down")
        start_point = models.target.Point(x, y)
        end_point = start_point
        drawing = True
    elif event == cv2.EVENT_MOUSEMOVE:  # mouse moved
        if drawing:
            end_point = models.target.Point(x, y)
    elif event == cv2.EVENT_LBUTTONUP:  # left button released
        logging.info("left button up")
        drawing = False
        end_point = models.target.Point(x, y)
        if start_point == end_point:
            return
        distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2)
        if distance < 20:
            logging.info("distance below 20 px, region ignored")
            return
        target_id = len(configObj.config_info.targets)
        # physical radius of the circular target, in mm
        radius = 20.0
        area = models.target.RectangleArea(int(start_point.x), int(start_point.y),
                                           int(end_point.x - start_point.x),
                                           int(end_point.y - start_point.y))
        t_info = models.target.TargetInfo(target_id,
                                          "test add",
                                          area,
                                          radius,
                                          models.target.Threshold(128, 9),
                                          False)
        new_target = models.target.CircleTarget(t_info, None, None, None, None)
        logging.info(f"new region [{target_id}] => {start_point, end_point}")
        configObj.config_info.targets[target_id] = new_target


def read_target_rectangle():
    return configObj.config_info.targets


class VideoProcessor:
    reporter: upload.DataReporter.DataReporter

    def __init__(self, reporter: upload.DataReporter.DataReporter):
        self.reporter = reporter

    def on_data(self, msg: Msg):
        global configObj
        logging.info(f"msg={msg}")
        match msg.cmd:
            case "getPoints":
                data_dict = {k: asdict(v.info) for k, v in configObj.config_info.targets.items()}
                resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": data_dict})
                resp_json = resp_msg.to_json_()
                return resp_json
            case "setPoints":
                v = msg.values
                ts = v["targets"]
                # drop the existing target configuration
                configObj.config_info.targets = {}
                for _, t in ts.items():
                    t_str = json.dumps(t)
                    t_info = models.target.TargetInfo.from_json(t_str)
                    c_target = models.target.CircleTarget.init_by_info(t_info)
                    configObj.config_info.targets[c_target.info.id] = c_target
                configObj.save2json_file()
                resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True})
                resp_json = resp_msg.to_json()
                return resp_json
        logging.debug(f"unhandled cmd: {msg.cmd}")

    def update_thresh_binary(self, v: int):
        self.thresh_binary = v

    def pre_handler_img(self, gray_frame, now_str: str):
        # compress the grayscale frame to JPEG in memory and base64-encode it
        img_base64 = utils.frame_to_base64(gray_frame, format="JPEG")
        all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str)
        self.enqueue_image(all_img)

    def draw_rectangle(self, img):
        global configObj
        gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # send the current frame
        now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        self.pre_handler_img(gray_frame, now_str)
        if len(configObj.config_info.targets) == 0:
            return
        # report only targets that produced new data
        all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str)
        # draw previously configured targets
        for i, tr in configObj.config_info.targets.items():
            if not hasattr(tr, "info"):
                logging.warning(f"target[{i}] has no 'info' attribute, skipped")
                continue
            _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y)
            _end_point = models.target.Point(
                tr.info.rectangle_area.x + tr.info.rectangle_area.w,
                tr.info.rectangle_area.y + tr.info.rectangle_area.h)
            # draw the target region
            cv2.rectangle(img, tuple(_start_point), tuple(_end_point), (255, 0, 0), 2)
            # detect within the region
            sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point)
            ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY)
            # Gaussian blur (kernel size must be odd)
            sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (tr.info.threshold.gauss, tr.info.threshold.gauss), 1)
            cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame)
            # overlay back onto the original image
            # sub_c_img = cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR)
            # self.cover_sub_image(img, sub_c_img, _start_point, _end_point)
            circles = self.circle2_detect(sub_binary_frame)
            if len(circles) == 0:
                continue
            center, radius = self.circle_show(img, circles, _start_point)
            # record the detected circle center
            tr.center_point = center
            tr.radius_pix = radius
            if tr.is_init:
                tr.center_init = tr.center_point
                tr.is_init = False
            tr.circle_displacement()
            all_upload_data.data.append(
                models.sampleMsg.SensorData(
                    str(tr.info.id),
                    tr.displacement_phy.x,
                    tr.displacement_phy.y)
            )
        # skip reporting when no target produced data
        if len(all_upload_data.data) == 0:
            return
        # json_str = json.dumps(
        #     {k: asdict(v) for k, v in once_upload.items() if v.is_init == False}
        # )
        # print(f"target data={json_str}", json_str)
        self.enqueue_data(all_upload_data)

    def circle_show(self, img, circles, relative_point: models.target.Point):
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (255, 0, 0)  # blue (BGR)
        scale = 0.5
        # keep only the largest detected circle
        circle = max(circles, key=lambda c: c[2])
        # draw the circle center
        center = (circle[0] + relative_point.x, circle[1] + relative_point.y)
        center_int = tuple(int(x) for x in center)
        cv2.circle(img, center_int, 2, (0, 255, 0), 4)
        radius = np.round(circle[2], 3)
        radius_int = int(radius)
        # draw the outer circle
        cv2.circle(img, center_int, radius_int, (0, 0, 255), 2)
        # annotate the center coordinates
        text1 = f"center:{circle}"
        text2 = f"r:{radius}"
        txt_location = (center_int[0] + radius_int, center_int[1] + radius_int // 2)
        cv2.putText(img, text1, txt_location, font, scale, color, 2)
        cp = models.target.Point(x=center[0], y=center[1])
        return cp, radius

    def circle2_detect(self, img):
        # parameters: accumulator ratio, min center distance, Canny threshold,
        # circle "perfectness", min/max radius
        circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1.5, 30,
                                         param1=300, param2=0.9, minRadius=15, maxRadius=0)
        if circles_float is not None:
            # extract centers and radii, rounded to 2 decimals
            centers = [(round(float(x), 2), round(float(y), 2), round(float(r), 2))
                       for x, y, r in circles_float[0, :]]
            return centers
        else:
            return []

    def extract_sub_image(self, frame, top_left, bottom_right):
        """
        Crop a sub-region from a frame.
        :param frame: input video frame
        :param top_left: top-left corner of the sub-image (x1, y1)
        :param bottom_right: bottom-right corner of the sub-image (x2, y2)
        :return: the cropped sub-image
        """
        x1, y1 = top_left
        x2, y2 = bottom_right
        return frame[y1:y2, x1:x2]

    def cover_sub_image(self, frame, sub_frame, top_left, bottom_right):
        x1, y1 = top_left
        x2, y2 = bottom_right
        frame[y1:y2, x1:x2] = sub_frame
        return frame

    def open_video(self, video_id):
        cap = cv2.VideoCapture(video_id)
        # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600)   # width
        # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 900)   # height
        if not cap.isOpened():
            logging.info("cannot open the camera")
            exit()
        return cap

    def show_video(self, cap):
        global sigExit, start_point, end_point, drawing
        cv2.namedWindow('Frame')
        cv2.setMouseCallback('Frame', add_rectangle)
        # read frames in a loop
        while True:
            ret, frame = cap.read()
            if ret:
                self.frame_handle(frame)
            else:
                logging.info("cannot read a frame")
            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
                break
            if sigExit:
                break

    def show_image(self, frame):
        global start_point, end_point, drawing
        cv2.namedWindow('Frame')
        cv2.setMouseCallback('Frame', add_rectangle)
        # redraw the same image in a loop
        while True:
            cp_img = frame.copy()
            self.frame_handle(cp_img)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
                break
        cv2.destroyAllWindows()

    def frame_handle(self, frame):
        # draw configured targets and run detection
        self.draw_rectangle(frame)
        # draw the rectangle currently being dragged
        if drawing:
            cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4)
            # print(f"mouse position {start_point} -> {end_point}")
        # show the frame
        cv2.imshow('Frame', frame)

    # read a still image instead of a camera stream
    # img_copy = img.copy()  # copy the image so it can be restored
    def image_mode(self):
        img_raw = cv2.imread('images/trans/_4point.jpg')  # images/target/rp80max3.jpg
        # img_raw = cv2.imread('images/target/rp80.jpg')  # images/target/rp80max3.jpg
        self.show_image(img_raw)

    def video_mode(self, video_id):
        capture = self.open_video(video_id)
        fps = capture.get(cv2.CAP_PROP_FPS)
        print(f"fps={fps}")
        self.show_video(capture)
        # release the camera and close all windows
        capture.release()
        cv2.destroyAllWindows()

    def rtsp_mode(self, rtsp_url: str):
        # rtsp_url = "rtsp://admin:123456abc@192.168.1.64:554"
        # rtsp_url = "rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream"
        capture = self.open_video(rtsp_url)
        fps = capture.get(cv2.CAP_PROP_FPS)
        print(f"fps={fps}")
        self.show_video(capture)
        # release the stream and close all windows
        capture.release()
        cv2.destroyAllWindows()

    def enqueue_data(self, data):
        # timestamp for this sample
        timestamp = time.time()
        # format as a millisecond-precision string (first three digits of the microseconds)
        dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3]
        # put into the data queue; drop the new sample when the queue is full
        try:
            self.reporter.data_queue.put((dt, data), block=False)
        except queue.Full:
            self.reporter.data_dropped += 1

    def enqueue_image(self, data):
        # timestamp for this image
        timestamp = time.time()
        # format as a millisecond-precision string (first three digits of the microseconds)
        dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3]
        # put into the image queue; drop the new image when the queue is full
        try:
            self.reporter.image_queue.put((dt, data), block=False)
        except queue.Full:
            self.reporter.image_dropped += 1


# data broadcast
def check_exit(sig, frame):
    global sigExit
    logging.info(f"exit signal received, sig={sig}")
    sigExit = True
    sleep(1)
    logging.info("exiting")
    sys.exit(0)
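

# --- Illustrative sketch, not part of the original pipeline ---
# A minimal, self-contained demo of the Hough-circle parameters used by
# VideoProcessor.circle2_detect, run against a synthetic binary frame.
# The helper name `_circle_detect_demo` is hypothetical and exists only for
# illustration; the thresholds may need tuning for real target images.
def _circle_detect_demo():
    # synthetic 400x400 black frame with one filled white disk (radius 60 px)
    frame = np.zeros((400, 400), dtype=np.uint8)
    cv2.circle(frame, (200, 200), 60, 255, -1)
    # same parameters as circle2_detect: dp=1.5, minDist=30, Canny threshold 300,
    # circle "perfectness" 0.9, minRadius=15, unlimited maxRadius
    circles = cv2.HoughCircles(frame, cv2.HOUGH_GRADIENT_ALT, 1.5, 30,
                               param1=300, param2=0.9, minRadius=15, maxRadius=0)
    if circles is not None:
        for x, y, r in circles[0, :]:
            print(f"detected circle: center=({x:.1f}, {y:.1f}) radius={r:.1f}")


# _circle_detect_demo()  # uncomment to run the sketch standalone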