diff --git a/config.json b/config.json deleted file mode 100644 index 4d97f33..0000000 --- a/config.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "server": { - "port": 2230 - }, - "fps": { - "data": 10, - "video": 0 - }, - "capture": "0", - "targets": { - "0": { - "info": { - "rectangle_area": { - "x": 50, - "y": 230, - "w": 130, - "h": 100 - }, - "threshold": { - "binary": 180, - "gauss": 7 - }, - "radius": 20.0, - "id": 0, - "desc": "0_up", - "base": false - }, - "perspective": [ - [ - 1.02, - -0.277155559, - 109.207622 - ], - [ - 0.0802663708, - 1.00426514, - -34.8436318 - ], - [ - 3.80200276e-05, - 2.664279e-06, - 1.0 - ] - ], - "handler_info": { - "radius_pix": 20.0, - "pix_length": 1.0, - "center_point": { - "x": 125.0, - "y": 272.0 - }, - "center_init": { - "x": 123.5, - "y": 272.0 - }, - "displacement_pix": { - "x": 1.5, - "y": 0.0 - }, - "displacement_phy": { - "x": 1.5, - "y": 0.0 - } - } - } - } -} \ No newline at end of file diff --git a/videoDetection.py b/videoDetection.py deleted file mode 100644 index 1af821a..0000000 --- a/videoDetection.py +++ /dev/null @@ -1,463 +0,0 @@ -from datetime import datetime -import json -import queue -import time -from time import sleep -import cv2 -import numpy as np -import logging - -import configSer -import models.target -import models.sampleMsg -import upload.DataReporter -import utils -import videoPush -from models.msg import Msg - -logging.basicConfig(level=logging.DEBUG) -drawing: bool = False # 是否正在绘制 -is_video_mode: bool = False # 是否采用标靶图覆盖 -# 定义点 -start_point: models.target.Point -end_point: models.target.Point -# 配置 -configObj:configSer.ConfigOperate - -# 鼠标回调函数 -def add_rectangle(event, x, y, flags, param): - global start_point, end_point, drawing - - if event == cv2.EVENT_LBUTTONDOWN: # 左键按下 - logging.info("左键按下") - start_point = models.target.Point(x, y) - end_point = start_point - drawing = True - elif event == cv2.EVENT_MOUSEMOVE: # 鼠标移动 - if drawing: - end_point = models.target.Point(x, y) - elif event == cv2.EVENT_LBUTTONUP: # 左键抬起 - logging.info("左键抬起") - drawing = False - end_point = models.target.Point(x, y) - if start_point == end_point: - return - distance = cv2.norm(tuple(start_point), tuple(end_point), cv2.NORM_L2) - if distance < 20: - logging.info("距离小于20,无效区域") - return - target_id = len(configObj.config_info.targets) - # 圆标靶半径 mm - radius = 20.0 - area=models.target.RectangleArea(int(start_point.x),int(start_point.y), - int(end_point.x-start_point.x),int(end_point.y-start_point.y)) - t_info=models.target.TargetInfo( target_id, - "test add", - area, - radius, - models.target.Threshold(190,9), - False) - new_target = models.target.CircleTarget(t_info,None,None) - logging.info(f"新增区域[{target_id}] => {start_point, end_point}") - configObj.config_info.targets[target_id] = new_target - -def read_target_rectangle(): - return configObj.config_info.targets -class VideoProcessor: - reporter: upload.DataReporter.DataReporter - capture: cv2.VideoCapture - is_opened: bool= False - is_running = True - def __init__(self, reporter:upload.DataReporter.DataReporter): - self.reporter = reporter - - def on_data(self,msg:Msg): - global configObj,is_video_mode - logging.info(f"msg={msg}") - match msg.cmd: - case "getPoints": - targets=configObj.config_info.targets.copy() - for k,v in targets.items(): - targets[k].handler_info=None - - resp_msg = models.msg.Msg(_from="dev", cmd="getPoints", values={"targets": targets}) - resp_json = resp_msg.to_json_() - return resp_json - case "setPoints": - v=msg.values - ts=v["targets"] - - # 清空原配置 - configObj.config_info.targets={} - for _,t in ts.items(): - t_str=json.dumps(t) - new_c_target = models.target.CircleTarget.from_json(t_str) - configObj.config_info.targets[new_c_target.info.id] =new_c_target - - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="setPoints", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "videoFps": - v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"image") - configObj.config_info.fps.video = fps - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="videoFps", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "dataFps": - v = msg.values - fps = v["fps"] - self.reporter.adjust_rate(fps,"data") - configObj.config_info.fps.data=fps - configObj.save2json_file() - resp_msg = models.msg.Msg(_from="dev", cmd="dataFps", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "setCap": - v = msg.values - cap = v["cap"] - self.switch_video(cap) - resp_msg = models.msg.Msg(_from="dev", cmd="setCap", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - case "videoMode": - v = msg.values - is_debug = v["debug"] - is_video_mode=is_debug - resp_msg = models.msg.Msg(_from="dev", cmd="videoMode", values={"operate": True}) - resp_json = resp_msg.to_json() - return resp_json - print("==") - - - def pre_handler_img(self,gray_frame,now_str:str): - # 将灰度图压缩为 JPEG 格式,并存储到内存缓冲区 - img_base64 = utils.frame_to_base64(gray_frame, format="JPEG") - all_img = models.sampleMsg.AllImg(image=img_base64, time=now_str) - self.enqueue_image(all_img) - - def draw_rectangle(self,img): - global configObj,is_video_mode - gray_frame = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - #图像发送 - if configObj.config_info.fps.video > 0: - self.pre_handler_img(gray_frame,now_str) - - - if len(configObj.config_info.targets)==0: return - - # 基点 - base_point_pix:models.target.Point=None - - #上报有新数据的点 - all_upload_data = models.sampleMsg.AllSensorData(data=[], time=now_str) - # 绘图-历史点 - for i, tr in configObj.config_info.targets.items(): - if not hasattr(tr, "info"): - print("====") - _start_point = models.target.Point(tr.info.rectangle_area.x, tr.info.rectangle_area.y) - _end_point = models.target.Point( - tr.info.rectangle_area.x+tr.info.rectangle_area.w, - tr.info.rectangle_area.y+tr.info.rectangle_area.h) - #绘制标靶区域 - blue_color = (255, 0, 0) - if tr.info.base: - blue_color=(200, 0, 200) #紫红色 基点 - - cv2.rectangle(img,tuple(_start_point), tuple(_end_point), blue_color, 2) - label=f"{tr.info.desc},r={tr.info.radius}" - cv2.putText(img, label, (_start_point.x,_start_point.y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, blue_color,1) - #检测 - # 获取图像尺寸 - frame_height, frame_width = gray_frame.shape[:2] - if _end_point.x>frame_width or _end_point.y>frame_height: - print(f"标靶[{tr.info.desc}]sub_image 超出区域") - continue - sub_image = self.extract_sub_image(gray_frame, _start_point, _end_point) - - - ret, sub_binary_frame = cv2.threshold(sub_image, tr.info.threshold.binary, 255, cv2.THRESH_BINARY) - # 高斯滤波 - gauss_size=tr.info.threshold.gauss - sub_binary_frame = cv2.GaussianBlur(sub_binary_frame, (gauss_size, gauss_size), sigmaX=0,sigmaY=0,borderType=cv2.BORDER_REPLICATE) - # sub_binary_frame = cv2.bilateralFilter(sub_binary_frame, 5, 50, 50) - if tr.perspective is not None: - # 宽度 - sub_width = sub_binary_frame.shape[1] - # 高度 - sub_height = sub_binary_frame.shape[0] - - sub_binary_frame = cv2.warpPerspective(sub_binary_frame, tr.perspective, (sub_width, sub_height)) - - - # 覆盖原图 - if is_video_mode: - sub_c_img= cv2.cvtColor(sub_binary_frame, cv2.COLOR_GRAY2BGR) - self.cover_sub_image(img,sub_c_img, _start_point, _end_point) - - if __debug__: - cv2.imshow(f'{tr.info.id}_binaryImg', sub_binary_frame) - - circles = self.circle2_detect(sub_binary_frame) - if len(circles) == 0: - continue - center,radius_pix=self.circle_match(circles,_start_point) - # 纪录圆心位置 - if tr.handler_info is None: - tr.handler_info= models.target.HandlerInfo() - if tr.handler_info.is_init: - tr.handler_info.is_init=False - tr.handler_info.center_init = center - - # 数据平滑处理 - smooth_center = tr.handler_info.enqueue_center_point(center) - # print(f"{tr.info.desc},平滑len={len(tr.handler_info.center_point_queue)},平滑中心点={smooth_center},原始点={center}") - tr.handler_info.center_point=smooth_center - # 原始处理 tr.handler_info.center_point=center - tr.handler_info.radius_pix=radius_pix - tr.circle_displacement_pix() - - # 基点 - if tr.info.base: - base_point_pix=tr.handler_info.displacement_pix - - # 画圆 - self.circle_show(img,smooth_center,radius_pix,_start_point,_end_point) - - # 基于像素点计算 物理距离 - for i, tr in configObj.config_info.targets.items(): - - if tr.handler_info is None: - continue - if tr.handler_info.displacement_pix is None: - continue - # 减去基点偏移 - if base_point_pix is not None: - raw_point=tr.handler_info.displacement_pix - tr.handler_info.displacement_pix=models.target.Point( - x=raw_point.x-base_point_pix.x, - y=raw_point.y-base_point_pix.y) - if not tr.info.base: - # print(f"[{tr.info.id}]{tr.info.desc} 原偏 {raw_point} - 基偏{base_point_pix} ={tr.handler_info.displacement_pix}") - pass - tr.circle_displacement_phy() - if tr.handler_info.displacement_phy is not None: - all_upload_data.data.append( - models.sampleMsg.SensorData( - str(tr.info.id), - tr.handler_info.displacement_phy.x, - tr.handler_info.displacement_phy.y) - ) - tr.handler_info.displacement_pix=None - tr.handler_info.displacement_phy = None - - - #过滤无效空数据 - if len(all_upload_data.data)==0: - return - - if configObj.config_info.fps.data > 0: - self.enqueue_data(all_upload_data) - - - - def circle_match(self,circles, rect_s_point:models.target.Point)-> (models.target.Point,float): - - circle = max(circles, key=lambda c: c[2]) - - # 绘制圆心 - center = (circle[0] + rect_s_point.x, circle[1] + rect_s_point.y) - radius = float(np.round(circle[2], 3)) - cp = models.target.Point(x=center[0], y=center[1]) - return cp,radius - - def circle_show(self, img, center: models.target.Point,radius:float, rect_s_point: models.target.Point,rect_e_point: models.target.Point): - - font = cv2.FONT_HERSHEY_SIMPLEX - color = (255, 0, 0) # 蓝色 - scale = 0.5 - center_int = tuple(int(x) for x in center) - radius_int = int(radius) - cv2.circle(img, center_int, 2, (0, 255, 0), 4) - # 绘制外圆 - cv2.circle(img, center_int, radius_int, (0, 0, 255), 1) - # 打印圆心坐标 - - text1 = f"c:{(center.x,center.y,radius)}" - txt_location = (rect_s_point.x+2,rect_e_point.y-2) - # txt_location = (center_int[0] - radius_int, center_int[1] + radius_int + 10) - cv2.putText(img, text1, txt_location, font, scale, color, 1) - - - def circle2_detect(self,img): - # 圆心距 canny阈值 最小半径 最大半径 - circles_float = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT_ALT, 1, 30, param1=200, param2=0.8, minRadius=15, - maxRadius=0) - # 创建一个0行, 2列的空数组 - if circles_float is not None: - # 提取圆心坐标(保留2位小数) - centers = [(round(float(x),2), round(float(y),2), round(float(r),2)) for x, y, r in circles_float[0, :]] - return centers - else: - return [] - - - def extract_sub_image(self,frame, top_left, bottom_right): - """ - 从帧中截取子区域 - :param frame: 输入的视频帧 - :param top_left: 子图片的左上角坐标 (x1, y1) - :param bottom_right: 子图片的右下角坐标 (x2, y2) - :return: 截取的子图片 - """ - x1, y1 = top_left - x2, y2 = bottom_right - return frame[y1:y2, x1:x2] - - def cover_sub_image(self,frame,sub_frame, top_left, bottom_right): - x1, y1 = top_left - x2, y2 = bottom_right - frame[y1:y2, x1:x2]= sub_frame - return frame - - - def open_video(self,video_id): - sleep(1) - print(f"打开摄像头 -> {video_id}") - self.capture = cv2.VideoCapture(video_id) - frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) - frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) - print(f"默认分辨率= {frame_width}*{frame_height}") - logging.info(f"{video_id}地址->{self.capture}") - if not self.capture.isOpened(): - self.capture.release() - logging.info(f"无法打开摄像头{video_id}, release地址 -> {self.capture}") - return - fps = self.capture.get(cv2.CAP_PROP_FPS) - print(f"fps={fps},video_id={video_id},") - # self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1600) # 宽度 - # self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 900) # 高度 - self.is_opened=True - - def switch_video(self,video_id:str): - print(f"切换摄像头 -> {video_id}") - self.is_opened = False - self.capture.release() - cv2.destroyAllWindows() - if str.isdigit(video_id): - video_id=int(video_id) - self.open_video(video_id) - - - - def show_video(self): - global sigExit,start_point, end_point, drawing - if __debug__: - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while self.is_running: - if not self.is_opened: - print(f"摄像头 标记is_opened={self.is_opened}") - sleep(5) - continue - ret, frame = self.capture.read() - if ret: - self.frame_handle(frame) - else: - logging.info(f"无法读取帧,cap地址- >{self.capture}") - sleep(1) - # self.capture.release() - # self.capture= cv2.VideoCapture(0) # 再次尝试 - cv2.waitKey(1) - # 显示图像 - if frame is not None: - if __debug__: - cv2.imshow('Frame', frame) - #缓存到推流 - videoPush.update_latest_frame(frame) - print("退出VideoProcessor") - def show_image(self,frame): - global start_point, end_point, drawing - if __debug__: - cv2.namedWindow('Frame') - cv2.setMouseCallback('Frame', add_rectangle) - # 读取一帧图像 - while True: - cp_img=frame.copy() - self.frame_handle(cp_img) - - if cv2.waitKey(1) & 0xFF == ord('q'): # 按'q'退出循环 - break - cv2.destroyAllWindows() - - def frame_handle(self,frame): - # 绘图-历史点 - self.draw_rectangle(frame) - # 绘图-实时 - if drawing: - cv2.rectangle(frame, tuple(start_point), tuple(end_point), (0, 200, 200), 4) - # print(f"鼠标位置 {start_point} -> {end_point}") - - def image_mode(self): - img_raw=cv2.imread('images/trans/_4point.jpg')#images/target/rp80max3.jpg - # img_raw = cv2.imread('images/trans/_4point.jpg') # images/target/rp80max3.jpg - # img_raw = cv2.imread('images/target/rp80.jpg') # images/target/rp80max3.jpg - self.show_image(img_raw) - - # 支持 - def video_mode(self,video_id:str): - if str.isdigit(video_id): - video_id=int(video_id) - self.open_video(video_id) - self.show_video() - # 释放摄像头资源并关闭所有窗口 - print("退出 video") - self.capture.release() - cv2.destroyAllWindows() - - def rtsp_mode(self,rtsp_url:str): - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554" - # rtsp_url ="rtsp://admin:123456abc@192.168.1.64:554/h264/ch1/main/av_stream" - self.open_video(rtsp_url) - fps = self.capture.get(cv2.CAP_PROP_FPS) - print(f"rtsp fps={fps}") - self.show_video() - # 释放摄像头资源并关闭所有窗口 - self.capture.release() - cv2.destroyAllWindows() - - def enqueue_data(self,data): - # 获取当前时间戳 - timestamp = time.time() - # 将时间戳转换为 datetime 对象 - dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 - # 放入图片队列(自动丢弃最旧数据当队列满时) - try: - self.reporter.data_queue.put((dt, data), block=False) - except queue.Full: - # self.reporter.data_dropped += 1 - pass - def enqueue_image(self,data): - # 获取当前时间戳 - timestamp = time.time() - # 将时间戳转换为 datetime 对象 - dt = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S%f")[:-3] # 毫秒部分是微秒的前三位 - # 放入图片队列(自动丢弃最旧数据当队列满时) - try: - self.reporter.image_queue.put((dt, data), block=False) - except queue.Full: - pass - #self.reporter.image_dropped += 1 - - def stop(self): - self.is_running=False - self.capture.release() - - - diff --git a/videoPush.py b/videoPush.py deleted file mode 100644 index 48a74e5..0000000 --- a/videoPush.py +++ /dev/null @@ -1,60 +0,0 @@ -from time import sleep - -import cv2 -import threading -import time - -import numpy as np -import requests -from flask import Flask, Response, render_template_string, request - -import utils - -app_flask = Flask(__name__) - -# 全局变量,用于存储最新的帧 -latest_frame:np.ndarray = None -lock = threading.Lock() -is_running = True - -def update_latest_frame(n_bytes:np.ndarray): - global latest_frame - latest_frame=n_bytes - -def generate_mjpeg(): - """生成MJPEG格式的视频流""" - while is_running: - with lock: - if latest_frame is None: - continue - _,latest_frame_bytes = utils.frame_to_img(latest_frame) - frame = latest_frame_bytes.tobytes() - pass - sleep(0.1) - yield (b'--frame\r\n' - b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') - - -@app_flask.route('/video_flow') -def video_feed(): - """视频流路由""" - return Response( - generate_mjpeg(), - mimetype='multipart/x-mixed-replace; boundary=frame' - ) - -def run(): - port=2240 - print(f"推流服务启动,访问端口 127.0.0.1:{port}/video_flow") - app_flask.run(host='0.0.0.0', port=port, threaded=True) - -def video_server_run(): - thread = threading.Thread(target=run) - thread.daemon = True # 设置为守护线程,主线程退出时自动终止 - thread.start() -def stop(): - global is_running - is_running=False - -if __name__ == '__main__': - run() \ No newline at end of file