import cv2
import supervision as sv
from ultralytics import YOLO
import os
import time
import numpy as np
from tqdm import tqdm
from PIL import Image, ImageDraw, ImageFont

def put_chinese_text(img, text, position, font_size=20, color=(0, 0, 255)):
    """Draw text (including Chinese characters) on an OpenCV image via PIL."""
    # Convert the OpenCV (BGR) image to a PIL (RGB) image
    img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    # Load a font that supports Chinese characters
    try:
        # Try the Microsoft YaHei font installed on the system
        font = ImageFont.truetype("msyh.ttc", font_size)
    except OSError:
        # Fall back to the default font if it cannot be found
        font = ImageFont.load_default()
    # Draw the text
    draw.text(position, text, font=font, fill=color)
    # Convert the PIL image back to an OpenCV image
    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
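
# Usage sketch for the helper above (not called elsewhere in this script; the
# label text and position are illustrative only):
#   frame = put_chinese_text(frame, "靠泊检测", (10, 30), font_size=24)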


def process_video_with_segmentation(
    input_video_path: str,
    output_video_path: str,
    model_name: str = "yolov8n-seg.pt",  # segmentation model
    confidence_threshold: float = 0.5,
    classes: list = None,
    show_live: bool = True,
    save_annotated: bool = False
):
    """
    Run YOLO instance segmentation on a video and annotate every frame.

    Args:
        input_video_path: path to the input video file
        output_video_path: path to the output video file
        model_name: YOLO segmentation model name or path (default: yolov8n-seg.pt)
        confidence_threshold: detection confidence threshold (0-1)
        classes: list of class IDs to detect (None means all classes)
        show_live: whether to display frames while processing
        save_annotated: whether to save the annotated video
    """
    # 1. Initialize the segmentation model
    print(f"Loading segmentation model: {model_name}")
    model = YOLO(model_name)

    # 2. Initialize the video reader
    print(f"Opening video file: {input_video_path}")
    video_info = sv.VideoInfo.from_video_path(input_video_path)
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Could not open video file: {input_video_path}")
        return

    # 3. Initialize the video writer (only if the result should be saved)
    if save_annotated:
        output_dir = os.path.dirname(output_video_path)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
            print(f"Created output directory: {output_dir}")
        writer = cv2.VideoWriter(
            output_video_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            video_info.fps,
            (video_info.width, video_info.height)
        )
        print(f"Saving results to: {output_video_path}")

    # 4. Initialize the Supervision annotator - MaskAnnotator instead of BoxAnnotator
    mask_annotator = sv.MaskAnnotator(
        color=sv.Color(r=0, g=255, b=0),  # green mask
        opacity=0.5                       # 50% opacity
    )

    # 5. Progress tracking
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")
    pbar = tqdm(total=total_frames, desc="Processing frames")
    # 6. Process each frame
    frame_count = 0
    processing_times = []
    while cap.isOpened():
        start_time = time.time()

        # Read a frame
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        # Run YOLO instance segmentation on the frame
        results = model(
            frame,
            conf=confidence_threshold,
            classes=classes,
            verbose=False
        )[0]

        # Convert the results to a Supervision Detections object
        detections = sv.Detections.from_ultralytics(results)

        # Overlay the segmentation masks
        segmented_frame = mask_annotator.annotate(
            scene=frame.copy(),
            detections=detections
        )

        # Record the per-frame processing time
        end_time = time.time()
        processing_time = end_time - start_time
        processing_times.append(processing_time)

        # Display the result live
        if show_live:
            cv2.imshow("Segmentation", segmented_frame)
            if cv2.waitKey(1) == 27:  # press ESC to exit
                break

        # Save the annotated frame
        if save_annotated:
            writer.write(segmented_frame)

        # Update the progress bar
        pbar.update(1)
    # 7. Release resources
    cap.release()
    if save_annotated:
        writer.release()
    if show_live:
        cv2.destroyAllWindows()
    pbar.close()

    # Print performance statistics
    if processing_times:
        avg_time = sum(processing_times) / len(processing_times)
        fps = 1 / avg_time if avg_time > 0 else 0
        print(f"\nDone! Average processing time: {avg_time:.4f} s/frame (~{fps:.2f} FPS)")


if __name__ == "__main__":
    # Example usage
    input_video = r"D:\yolo8\新建文件夹\靠泊视频.mp4"  # replace with your video path
    output_video = r"output/segmented_video.mp4"       # output video path

    # Class IDs to detect (e.g. 0 = person, 2 = car)
    selected_classes = [0]  # detect persons only

    process_video_with_segmentation(
        input_video_path=input_video,
        output_video_path=output_video,
        model_name=r"D:\yolo8\model\kaobo\weights\best.pt",  # custom segmentation model
        confidence_threshold=0.3,   # confidence threshold
        classes=selected_classes,   # class IDs to detect
        show_live=True,             # display frames while processing
        save_annotated=True         # save the annotated video
    )