2025-08-05 11:57:14 +08:00

185 lines
5.9 KiB
Python

from datetime import datetime, timezone
from typing import List, Optional, Dict, Any

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from core.database import get_db
from models.device import Device
from models.event import Event
from models.algorithm import Algorithm
# Router instance on which the monitor endpoints in this module are registered.
router = APIRouter()
@router.get("/", summary="获取监控列表")
async def get_monitors(
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    status: Optional[str] = Query(None, description="监控状态"),
    location: Optional[str] = Query(None, description="位置筛选"),
    db: Session = Depends(get_db)
):
    """Return one page of camera devices formatted as monitors.

    Args:
        page: 1-based page number (validated as >= 1).
        size: Number of items per page (validated as 1-100).
        status: When non-empty, keep only devices whose status equals it.
        location: When non-empty, keep only devices whose location contains
            this text as a literal substring.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``monitors`` (the page of monitor dicts), ``total``
        (count of matching devices before pagination), ``page`` and ``size``.

    Raises:
        HTTPException: 500 when the lookup fails for any reason.
    """
    # NOTE(review): the handler is ``async def`` but the Session calls below
    # appear to be synchronous; confirm whether this should be a plain ``def``
    # so the framework can run it off the event loop.
    try:
        query = db.query(Device).filter(Device.device_type == "camera")
        if status:
            query = query.filter(Device.status == status)
        if location:
            # autoescape stops "%" and "_" in user input from acting as
            # LIKE wildcards, so the filter is a literal substring match.
            query = query.filter(
                Device.location.contains(location, autoescape=True)
            )

        # Count before ordering/pagination so it reflects every match.
        total = query.count()

        # Without ORDER BY the database may return rows in any order, which
        # lets pages overlap or skip rows between requests; order by the
        # primary id to make pagination stable.
        skip = (page - 1) * size
        devices = query.order_by(Device.id).offset(skip).limit(size).all()

        # TODO: fetch real-time detection data; online devices currently get
        # one simulated detection box, everything else gets an empty list.
        monitors = [
            {
                "id": device.id,
                "name": device.name,
                "location": device.location,
                "status": device.status,
                # TODO: produce the real video URL.
                "video_url": f"/videos/port-{device.id}.mp4",
                "detections": (
                    [{"type": "person", "x": 25, "y": 35, "width": 40, "height": 80}]
                    if device.status == "online"
                    else []
                ),
            }
            for device in devices
        ]

        return {
            "monitors": monitors,
            "total": total,
            "page": page,
            "size": size
        }
    except HTTPException:
        # Same pattern as the other handlers: never rewrap an HTTP error.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取监控列表失败: {str(e)}") from e
@router.get("/{monitor_id}", summary="获取监控详情")
async def get_monitor_detail(
    monitor_id: int,
    db: Session = Depends(get_db)
):
    """Return the details of a single camera device looked up by ID.

    Args:
        monitor_id: ID of the device to look up.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``id``, ``name``, ``location``, ``status``, ``video_url``,
        ``detections``, ``events`` and ``algorithms``. The last two are
        currently always empty lists.

    Raises:
        HTTPException: 404 when no camera device has this ID; 500 when the
            lookup fails for any other reason.
    """
    try:
        camera_query = db.query(Device).filter(
            Device.id == monitor_id,
            Device.device_type == "camera",
        )
        camera = camera_query.first()
        if not camera:
            raise HTTPException(status_code=404, detail="监控不存在")

        # TODO: fetch real-time detection data. Until then an online camera
        # reports two placeholder boxes and any other status reports none.
        is_online = camera.status == "online"
        detections = [
            {"type": "person", "x": 25, "y": 35, "width": 40, "height": 80},
            {"type": "vehicle", "x": 40, "y": 50, "width": 80, "height": 60},
        ] if is_online else []

        # TODO: fetch related events and related algorithms.
        events = []
        algorithms = []

        detail = {
            "id": camera.id,
            "name": camera.name,
            "location": camera.location,
            "status": camera.status,
            # TODO: produce the real video URL.
            "video_url": f"/videos/port-{camera.id}.mp4",
            "detections": detections,
            "events": events,
            "algorithms": algorithms,
        }
        return detail
    except HTTPException:
        # Let the 404 above propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取监控详情失败: {str(e)}")
@router.get("/{monitor_id}/stream", summary="获取监控视频流")
async def get_monitor_stream(
    monitor_id: int,
    db: Session = Depends(get_db)
):
    """Return the video stream URL for a single camera device.

    Args:
        monitor_id: ID of the device to look up.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``monitor_id``, ``stream_url`` (an ``rtsp://`` URL built
        from the device's ``ip_address`` on port 554) and ``status``.

    Raises:
        HTTPException: 404 when no camera device has this ID; 500 when the
            lookup fails for any other reason.
    """
    try:
        camera_query = db.query(Device).filter(
            Device.id == monitor_id,
            Device.device_type == "camera",
        )
        camera = camera_query.first()
        if not camera:
            raise HTTPException(status_code=404, detail="监控不存在")

        # TODO: generate the real stream URL instead of this fixed template.
        host = camera.ip_address
        stream_info = {
            "monitor_id": monitor_id,
            "stream_url": f"rtsp://{host}:554/stream",
            "status": camera.status,
        }
        return stream_info
    except HTTPException:
        # Let the 404 above propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取视频流失败: {str(e)}")
@router.get("/{monitor_id}/detections", summary="获取监控检测数据")
async def get_monitor_detections(
    monitor_id: int,
    db: Session = Depends(get_db)
):
    """Return the current detection data for a single camera device.

    The detections themselves are still simulated: an online device reports
    two fixed boxes, any other status reports an empty list.

    Args:
        monitor_id: ID of the device to look up.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``monitor_id``, ``detections`` (list of detection dicts,
        each carrying ``type``, ``x``, ``y``, ``width``, ``height``,
        ``confidence`` and ``timestamp``) and a top-level ``timestamp``.
        All timestamps are the UTC time of this response, formatted as
        ``YYYY-MM-DDTHH:MM:SSZ``.

    Raises:
        HTTPException: 404 when no camera device has this ID; 500 when the
            lookup fails for any other reason.
    """
    try:
        device = db.query(Device).filter(
            Device.id == monitor_id,
            Device.device_type == "camera"
        ).first()
        if not device:
            raise HTTPException(status_code=404, detail="监控不存在")

        # Stamp the response with the current UTC time instead of a frozen
        # literal, so clients that sort or filter by freshness are not fed
        # a date in the past. Computed once so every field agrees.
        generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

        # TODO: fetch real detection data; the boxes below are simulated.
        detections = []
        if device.status == "online":
            detections = [
                {
                    "type": "person",
                    "x": 25,
                    "y": 35,
                    "width": 40,
                    "height": 80,
                    "confidence": 0.95,
                    "timestamp": generated_at,
                },
                {
                    "type": "vehicle",
                    "x": 40,
                    "y": 50,
                    "width": 80,
                    "height": 60,
                    "confidence": 0.88,
                    "timestamp": generated_at,
                },
            ]

        return {
            "monitor_id": monitor_id,
            "detections": detections,
            "timestamp": generated_at
        }
    except HTTPException:
        # Let the 404 above propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取检测数据失败: {str(e)}") from e