2025-08-05 11:57:14 +08:00

236 lines
7.7 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from datetime import datetime
from core.database import get_db
from models.event import Event
router = APIRouter()
@router.get("/", summary="获取告警列表")
async def get_alarms(
page: int = Query(1, ge=1, description="页码"),
size: int = Query(20, ge=1, le=100, description="每页数量"),
severity: Optional[str] = Query(None, description="严重程度"),
status: Optional[str] = Query(None, description="告警状态"),
start_time: Optional[str] = Query(None, description="开始时间"),
end_time: Optional[str] = Query(None, description="结束时间"),
db: Session = Depends(get_db)
):
"""获取告警列表,支持分页和筛选"""
try:
# 构建查询 - 只查询告警事件
query = db.query(Event).filter(Event.is_alert == True)
if severity:
query = query.filter(Event.severity == severity)
if status:
query = query.filter(Event.status == status)
if start_time:
try:
start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
query = query.filter(Event.created_at >= start_dt)
except ValueError:
pass
if end_time:
try:
end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
query = query.filter(Event.created_at <= end_dt)
except ValueError:
pass
# 按创建时间倒序排列
query = query.order_by(Event.created_at.desc())
# 计算总数
total = query.count()
# 分页
skip = (page - 1) * size
events = query.offset(skip).limit(size).all()
# 转换为告警格式
alarms = []
for event in events:
# TODO: 获取设备信息
device_name = f"设备{event.device_id}" # 临时设备名称
alarms.append({
"id": event.id,
"type": event.event_type,
"severity": event.severity,
"status": event.status,
"device": device_name,
"created_at": event.created_at.isoformat(),
"description": event.description
})
return {
"alarms": alarms,
"total": total,
"page": page,
"size": size
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取告警列表失败: {str(e)}")
@router.get("/{alarm_id}", summary="获取告警详情")
async def get_alarm_detail(
alarm_id: int,
db: Session = Depends(get_db)
):
"""根据ID获取告警详情"""
try:
event = db.query(Event).filter(
Event.id == alarm_id,
Event.is_alert == True
).first()
if not event:
raise HTTPException(status_code=404, detail="告警不存在")
# TODO: 获取设备信息
device_name = f"设备{event.device_id}"
return {
"id": event.id,
"type": event.event_type,
"severity": event.severity,
"status": event.status,
"device": device_name,
"created_at": event.created_at.isoformat(),
"description": event.description,
"image_path": event.image_path,
"video_path": event.video_path,
"confidence": event.confidence,
"bbox": event.bbox,
"resolution_notes": event.resolution_notes,
"resolved_at": event.resolved_at.isoformat() if event.resolved_at else None
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取告警详情失败: {str(e)}")
@router.patch("/{alarm_id}/resolve", summary="处理告警")
async def resolve_alarm(
alarm_id: int,
resolution_data: Dict[str, Any],
db: Session = Depends(get_db)
):
"""处理告警"""
try:
event = db.query(Event).filter(
Event.id == alarm_id,
Event.is_alert == True
).first()
if not event:
raise HTTPException(status_code=404, detail="告警不存在")
# 更新告警状态
event.status = "resolved"
event.resolution_notes = resolution_data.get("resolution_notes", "")
event.resolved_at = datetime.now()
db.commit()
return {
"id": event.id,
"status": "resolved",
"resolved_at": event.resolved_at.isoformat(),
"message": "告警已处理"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"处理告警失败: {str(e)}")
@router.get("/stats", summary="获取告警统计")
async def get_alarm_stats(db: Session = Depends(get_db)):
"""获取告警统计数据"""
try:
# 总告警数
total_alarms = db.query(Event).filter(Event.is_alert == True).count()
# 待处理告警数
pending_alarms = db.query(Event).filter(
Event.is_alert == True,
Event.status == "pending"
).count()
# 已处理告警数
resolved_alarms = db.query(Event).filter(
Event.is_alert == True,
Event.status == "resolved"
).count()
# TODO: 按严重程度统计
# 当前返回模拟数据
by_severity = [
{"severity": "high", "count": 12},
{"severity": "medium", "count": 45},
{"severity": "low", "count": 32}
]
return {
"total_alarms": total_alarms,
"pending_alarms": pending_alarms,
"resolved_alarms": resolved_alarms,
"by_severity": by_severity
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取告警统计失败: {str(e)}")
@router.get("/types/list", summary="获取告警类型列表")
async def get_alarm_types():
"""获取告警类型列表"""
try:
# TODO: 从数据库获取告警类型
# 当前返回固定类型
alarm_types = [
"船舶靠泊",
"船舶离泊",
"人员登轮",
"人员离轮",
"电脑弹窗",
"越界检测",
"车辆识别",
"货物识别"
]
return {
"alarm_types": alarm_types
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取告警类型失败: {str(e)}")
@router.get("/trend", summary="获取告警趋势")
async def get_alarm_trend(
days: int = Query(7, ge=1, le=30, description="统计天数"),
db: Session = Depends(get_db)
):
"""获取告警趋势数据"""
try:
# TODO: 实现真实的告警趋势统计
# 当前返回模拟数据
from datetime import timedelta
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days-1)
dates = []
alarm_counts = []
for i in range(days):
current_date = start_date + timedelta(days=i)
dates.append(current_date.strftime("%Y-%m-%d"))
# 模拟数据
alarm_counts.append(10 + (i * 2) % 20)
return {
"dates": dates,
"alarm_counts": alarm_counts
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取告警趋势失败: {str(e)}")