208 lines
6.7 KiB
Python
208 lines
6.7 KiB
Python
|
import cv2
|
||
|
import supervision as sv
|
||
|
from ultralytics import YOLO
|
||
|
import os
|
||
|
import time
|
||
|
import numpy as np
|
||
|
from tqdm import tqdm
|
||
|
from PIL import Image, ImageDraw, ImageFont
|
||
|
import io
|
||
|
|
||
|
def put_chinese_text(img, text, position, font_size=20, color=(0, 0, 255)):
|
||
|
# 将OpenCV图像转换为PIL图像
|
||
|
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
|
||
|
draw = ImageDraw.Draw(img_pil)
|
||
|
|
||
|
# 加载支持中文的字体
|
||
|
try:
|
||
|
# 尝试加载系统中的微软雅黑字体
|
||
|
font = ImageFont.truetype("msyh.ttc", font_size)
|
||
|
except:
|
||
|
# 如果找不到指定字体,使用默认字体
|
||
|
font = ImageFont.load_default()
|
||
|
|
||
|
# 绘制文本
|
||
|
draw.text(position, text, font=font, fill=color)
|
||
|
|
||
|
# 将PIL图像转换回OpenCV图像
|
||
|
return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
|
||
|
|
||
|
def process_video_with_detection(
|
||
|
input_video_path: str,
|
||
|
output_video_path: str,
|
||
|
model_name: str = "yoll11n.pt",
|
||
|
confidence_threshold: float = 0.5,
|
||
|
classes: list = None,
|
||
|
show_live: bool = True,
|
||
|
save_annotated: bool = False
|
||
|
):
|
||
|
"""
|
||
|
参数:
|
||
|
input_video_path: 输入视频文件路径
|
||
|
output_video_path: 输出视频文件路径
|
||
|
model_name: YOLO模型名称或路径 (yoll11n.pt)
|
||
|
confidence_threshold: 检测置信度阈值 (0-1)
|
||
|
classes: 要检测的类别ID列表 (None表示所有类别)
|
||
|
show_live: 是否实时显示处理过程
|
||
|
save_annotated: 是否保存标注后的视频
|
||
|
"""
|
||
|
# 1. 初始化模型
|
||
|
print(f"加载模型: {model_name}")
|
||
|
model = YOLO(model_name)
|
||
|
|
||
|
# 2. 初始化视频读取器
|
||
|
print(f"打开视频文件: {input_video_path}")
|
||
|
video_info = sv.VideoInfo.from_video_path(input_video_path)
|
||
|
cap = cv2.VideoCapture(input_video_path)
|
||
|
|
||
|
if not cap.isOpened():
|
||
|
print(f"无法打开视频文件: {input_video_path}")
|
||
|
return
|
||
|
|
||
|
# 3. 初始化视频写入器 (如果需要保存结果)
|
||
|
if save_annotated:
|
||
|
output_dir = os.path.dirname(output_video_path)
|
||
|
if output_dir and not os.path.exists(output_dir):
|
||
|
os.makedirs(output_dir)
|
||
|
print(f"创建输出目录: {output_dir}")
|
||
|
|
||
|
writer = cv2.VideoWriter(
|
||
|
output_video_path,
|
||
|
cv2.VideoWriter_fourcc(*'mp4v'),
|
||
|
video_info.fps,
|
||
|
(video_info.width, video_info.height)
|
||
|
)
|
||
|
print(f"准备保存结果到: {output_video_path}")
|
||
|
|
||
|
# 4. 初始化Supervision工具
|
||
|
byte_tracker = sv.ByteTrack()
|
||
|
box_annotator = sv.BoxAnnotator(
|
||
|
|
||
|
)
|
||
|
label_annotator = sv.LabelAnnotator()
|
||
|
trace_annotator = sv.TraceAnnotator()
|
||
|
|
||
|
# 5. 处理进度跟踪
|
||
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||
|
print(f"视频总帧数: {total_frames}")
|
||
|
pbar = tqdm(total=total_frames, desc="处理视频帧")
|
||
|
|
||
|
# 6. 处理每一帧
|
||
|
frame_count = 0
|
||
|
processing_times = []
|
||
|
|
||
|
while cap.isOpened():
|
||
|
start_time = time.time()
|
||
|
|
||
|
# 读取帧
|
||
|
ret, frame = cap.read()
|
||
|
if not ret:
|
||
|
break
|
||
|
|
||
|
frame_count += 1
|
||
|
|
||
|
# 使用YOLO进行目标检测
|
||
|
results = model(
|
||
|
frame,
|
||
|
conf=confidence_threshold,
|
||
|
classes=classes,
|
||
|
verbose=False
|
||
|
)[0]
|
||
|
|
||
|
# 转换为Supervision检测对象
|
||
|
detections = sv.Detections.from_ultralytics(results)
|
||
|
|
||
|
|
||
|
# 准备标注
|
||
|
labels = [
|
||
|
f"{results.names[class_id]} {confidence:.2f}"
|
||
|
for class_id, confidence in
|
||
|
zip(detections.class_id, detections.confidence)
|
||
|
]
|
||
|
|
||
|
# 标注边界框
|
||
|
annotated_frame = box_annotator.annotate(
|
||
|
scene=frame.copy(),
|
||
|
detections=detections
|
||
|
)
|
||
|
|
||
|
# 标注标签
|
||
|
annotated_frame = label_annotator.annotate(
|
||
|
scene=annotated_frame,
|
||
|
detections=detections,
|
||
|
labels=labels
|
||
|
)
|
||
|
|
||
|
# 定义多边形点
|
||
|
pts1 = [(1153.11, 273.86), (1146.09, 370.77), (1217.71, 352.51), (1220.52, 257.01)]
|
||
|
pts2 = [(1140.47, 506.99), (1214.90, 504.19), (1212.09, 592.66), (1136.25, 594.07)]
|
||
|
# 转换为整数坐标
|
||
|
pts1 = [(int(x), int(y)) for x, y in pts1]
|
||
|
pts2 = [(int(x), int(y)) for x, y in pts2]
|
||
|
# 绘制多边形
|
||
|
cv2.polylines(annotated_frame, [np.array(pts1)], isClosed=True, color=(0, 255, 0), thickness=2)
|
||
|
cv2.polylines(annotated_frame, [np.array(pts2)], isClosed=True, color=(0, 0, 255), thickness=2)
|
||
|
|
||
|
# 检测目标是否通过多边形
|
||
|
polygon = np.array(pts1, np.int32)
|
||
|
warning_displayed = False
|
||
|
|
||
|
# 遍历所有检测到的目标
|
||
|
for bbox in detections.xyxy:
|
||
|
# 获取目标边界框
|
||
|
x1, y1, x2, y2 = bbox
|
||
|
# 计算中心点
|
||
|
center_x = int((x1 + x2) / 2)
|
||
|
center_y = int((y1 + y2) / 2)
|
||
|
|
||
|
# 检测中心点是否在多边形内
|
||
|
distance = cv2.pointPolygonTest(polygon, (center_x, center_y), False)
|
||
|
|
||
|
# 如果点在多边形内 (distance >= 0)
|
||
|
if distance >= 0:
|
||
|
# 显示警告信息
|
||
|
annotated_frame = put_chinese_text(annotated_frame, "警告: 目标进入区域!", (50, 50), font_size=20, color=(255, 0, 0))
|
||
|
|
||
|
# 计算处理时间
|
||
|
end_time = time.time()
|
||
|
processing_time = end_time - start_time
|
||
|
processing_times.append(processing_time)
|
||
|
|
||
|
# 实时显示处理结果
|
||
|
if show_live:
|
||
|
cv2.imshow("detect", annotated_frame)
|
||
|
if cv2.waitKey(1) == 27: # ESC键退出
|
||
|
break
|
||
|
|
||
|
# 保存处理后的帧
|
||
|
if save_annotated:
|
||
|
writer.write(annotated_frame)
|
||
|
|
||
|
# 更新进度条
|
||
|
pbar.update(1)
|
||
|
|
||
|
# 7. 清理资源
|
||
|
cap.release()
|
||
|
if save_annotated:
|
||
|
writer.release()
|
||
|
if show_live:
|
||
|
cv2.destroyAllWindows()
|
||
|
pbar.close()
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
# 示例用法
|
||
|
input_video = r"D:\yolo8\mp4\12333.mp4" # 替换为您的视频路径
|
||
|
output_video = r"output/annotated_video.mp4" # 输出视频路径
|
||
|
|
||
|
# 选择要检测的类别
|
||
|
selected_classes = [0]
|
||
|
|
||
|
process_video_with_detection(
|
||
|
input_video_path=input_video,
|
||
|
output_video_path=output_video,
|
||
|
model_name=r"D:\yolo8\结果文件\登轮\weights\best.pt", # 使用预训练模型
|
||
|
confidence_threshold=0.5, # 置信度阈值
|
||
|
classes=selected_classes, # 指定检测类别
|
||
|
show_live=True, # 实时显示处理
|
||
|
save_annotated=False # 保存结果视频
|
||
|
)
|