2025-08-02 12:38:52 +08:00

165 lines
5.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from core.database import get_db
from models.device import Device
from schemas.device import (
DeviceCreate,
DeviceUpdate,
DeviceResponse,
DeviceListResponse
)
router = APIRouter()
@router.post("/", response_model=DeviceResponse, summary="创建设备")
async def create_device(
device: DeviceCreate,
db: Session = Depends(get_db)
):
"""创建新的设备"""
db_device = Device(**device.dict())
db.add(db_device)
db.commit()
db.refresh(db_device)
return db_device
@router.get("/", response_model=DeviceListResponse, summary="获取设备列表")
async def get_devices(
skip: int = Query(0, ge=0, description="跳过记录数"),
limit: int = Query(10, ge=1, le=100, description="返回记录数"),
name: Optional[str] = Query(None, description="设备名称"),
device_type: Optional[str] = Query(None, description="设备类型"),
status: Optional[str] = Query(None, description="设备状态"),
location: Optional[str] = Query(None, description="设备位置"),
is_enabled: Optional[bool] = Query(None, description="是否启用"),
db: Session = Depends(get_db)
):
"""获取设备列表,支持分页和筛选"""
query = db.query(Device)
if name:
query = query.filter(Device.name.contains(name))
if device_type:
query = query.filter(Device.device_type == device_type)
if status:
query = query.filter(Device.status == status)
if location:
query = query.filter(Device.location.contains(location))
if is_enabled is not None:
query = query.filter(Device.is_enabled == is_enabled)
total = query.count()
devices = query.offset(skip).limit(limit).all()
return DeviceListResponse(
devices=devices,
total=total,
page=skip // limit + 1,
size=limit
)
@router.get("/{device_id}", response_model=DeviceResponse, summary="获取设备详情")
async def get_device(
device_id: int,
db: Session = Depends(get_db)
):
"""根据ID获取设备详情"""
device = db.query(Device).filter(Device.id == device_id).first()
if not device:
raise HTTPException(status_code=404, detail="设备不存在")
return device
@router.put("/{device_id}", response_model=DeviceResponse, summary="更新设备")
async def update_device(
device_id: int,
device: DeviceUpdate,
db: Session = Depends(get_db)
):
"""更新设备信息"""
db_device = db.query(Device).filter(Device.id == device_id).first()
if not db_device:
raise HTTPException(status_code=404, detail="设备不存在")
update_data = device.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_device, field, value)
db.commit()
db.refresh(db_device)
return db_device
@router.delete("/{device_id}", summary="删除设备")
async def delete_device(
device_id: int,
db: Session = Depends(get_db)
):
"""删除设备"""
device = db.query(Device).filter(Device.id == device_id).first()
if not device:
raise HTTPException(status_code=404, detail="设备不存在")
db.delete(device)
db.commit()
return {"message": "设备删除成功"}
@router.patch("/{device_id}/status", response_model=DeviceResponse, summary="更新设备状态")
async def update_device_status(
device_id: int,
status: str = Query(..., description="新状态"),
db: Session = Depends(get_db)
):
"""更新设备状态"""
device = db.query(Device).filter(Device.id == device_id).first()
if not device:
raise HTTPException(status_code=404, detail="设备不存在")
device.status = status
db.commit()
db.refresh(device)
return device
@router.patch("/{device_id}/enable", response_model=DeviceResponse, summary="启用/禁用设备")
async def toggle_device_enabled(
device_id: int,
enabled: bool = Query(..., description="是否启用"),
db: Session = Depends(get_db)
):
"""启用或禁用设备"""
device = db.query(Device).filter(Device.id == device_id).first()
if not device:
raise HTTPException(status_code=404, detail="设备不存在")
device.is_enabled = enabled
db.commit()
db.refresh(device)
return device
@router.get("/types/list", summary="获取设备类型列表")
async def get_device_types():
"""获取所有设备类型"""
return {
"types": [
{"value": "camera", "label": "摄像头"},
{"value": "sensor", "label": "传感器"},
{"value": "gate", "label": "门禁"},
{"value": "alarm", "label": "报警器"},
{"value": "other", "label": "其他"}
]
}
@router.get("/status/stats", summary="获取设备状态统计")
async def get_device_status_stats(db: Session = Depends(get_db)):
"""获取设备状态统计信息"""
total = db.query(Device).count()
online = db.query(Device).filter(Device.status == "online").count()
offline = db.query(Device).filter(Device.status == "offline").count()
error = db.query(Device).filter(Device.status == "error").count()
return {
"total": total,
"online": online,
"offline": offline,
"error": error,
"online_rate": round(online / total * 100, 2) if total > 0 else 0
}