dashboard/backend/crud.py
2025-06-09 14:59:40 +08:00

181 lines
5.8 KiB
Python

from sqlalchemy.orm import Session
import models
import schemas
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import List, Optional, Dict, Any
import datetime
import json
# Password-handling context: a passlib CryptContext configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to ``pwd_context.verify`` and returns its result unchanged.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash *password* with the module-level ``pwd_context`` and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
# User operations
def get_user(db: Session, username: str):
    """Look up a user by exact username.

    Returns the first ``models.User`` row whose ``username`` equals
    *username*, or whatever ``Query.first()`` yields when nothing matches.
    """
    by_username = db.query(models.User).filter(models.User.username == username)
    return by_username.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    *skip* is applied as the query offset and *limit* as the row cap.
    No explicit ordering is applied to the query.
    """
    page = db.query(models.User).offset(skip).limit(limit)
    return page.all()
def create_user(db: Session, username: str, password: str, email: Optional[str] = None, full_name: Optional[str] = None):
    """Create and persist a new user.

    The plaintext *password* is never stored: it is hashed with
    ``get_password_hash`` and only the hash is written to the model.
    The new row is committed and refreshed before being returned.
    """
    new_user = models.User(
        username=username,
        email=email,
        full_name=full_name,
        hashed_password=get_password_hash(password),
    )
    db.add(new_user)
    db.commit()
    # Reload the instance so it reflects the state stored by the commit.
    db.refresh(new_user)
    return new_user
def authenticate_user(db: Session, username: str, password: str):
    """Validate login credentials.

    Returns the matching user object when *username* exists and *password*
    verifies against its stored hash; returns ``False`` otherwise.
    """
    user = get_user(db, username)
    if not user:
        # Spend roughly the time a real verification would take so that the
        # response time does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
# Talent operations
def get_talent(db: Session, talent_id: str):
    """Fetch the first ``models.Talent`` row whose ``id`` equals *talent_id*."""
    by_id = db.query(models.Talent).filter(models.Talent.id == talent_id)
    return by_id.first()
def get_talents(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of talents (*skip* as offset, *limit* as row cap).

    No explicit ordering is applied to the query.
    """
    page = db.query(models.Talent).offset(skip).limit(limit)
    return page.all()
def create_talent(db: Session, talent: schemas.TalentCreate):
    """Persist a new talent built from the fields of the *talent* schema.

    The schema is converted with ``talent.dict()`` and its items are passed
    as keyword arguments to ``models.Talent``. The row is committed and
    refreshed before being returned.
    """
    fields = talent.dict()
    record = models.Talent(**fields)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
def update_talent(db: Session, talent_id: str, talent_data: Dict[str, Any]):
    """Apply the key/value pairs of *talent_data* to an existing talent.

    Each key is set as an attribute on the record via ``setattr``; the change
    is then committed and the record refreshed. When no talent matches
    *talent_id*, nothing is written and the falsy lookup result is returned.
    """
    record = get_talent(db, talent_id)
    if not record:
        return record
    for field, new_value in talent_data.items():
        setattr(record, field, new_value)
    db.commit()
    db.refresh(record)
    return record
def delete_talent(db: Session, talent_id: str):
    """Delete the talent identified by *talent_id*.

    Returns ``True`` when a record was found, deleted and committed;
    ``False`` when no such record exists.
    """
    record = get_talent(db, talent_id)
    if not record:
        return False
    db.delete(record)
    db.commit()
    return True
# Engineering research center (lab) operations
def get_lab(db: Session, lab_id: str):
    """Fetch the first ``models.Lab`` row whose ``id`` equals *lab_id*."""
    by_id = db.query(models.Lab).filter(models.Lab.id == lab_id)
    return by_id.first()
def get_labs(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of labs (*skip* as offset, *limit* as row cap).

    No explicit ordering is applied to the query.
    """
    page = db.query(models.Lab).offset(skip).limit(limit)
    return page.all()
def create_lab(db: Session, lab: schemas.LabCreate):
    """Persist a new lab built from the fields of the *lab* schema.

    The schema is converted with ``lab.dict()`` and its items are passed as
    keyword arguments to ``models.Lab``. The row is committed and refreshed
    before being returned.
    """
    fields = lab.dict()
    record = models.Lab(**fields)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
def update_lab(db: Session, lab_id: str, lab_data: Dict[str, Any]):
    """Apply the key/value pairs of *lab_data* to an existing lab.

    Each key is set as an attribute on the record via ``setattr``; the change
    is then committed and the record refreshed. When no lab matches *lab_id*,
    nothing is written and the falsy lookup result is returned.
    """
    record = get_lab(db, lab_id)
    if not record:
        return record
    for field, new_value in lab_data.items():
        setattr(record, field, new_value)
    db.commit()
    db.refresh(record)
    return record
# Dashboard data operations
def get_dashboard(db: Session):
    """Return the first ``models.DashboardData`` row found by an unfiltered query."""
    return db.query(models.DashboardData).first()
def get_news(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of news items (*skip* as offset, *limit* as row cap).

    No explicit ordering is applied to the query.
    """
    page = db.query(models.News).offset(skip).limit(limit)
    return page.all()
def create_news(db: Session, title: str, date: str, dashboard_id: int):
    """Create a news item attached to the dashboard with id *dashboard_id*.

    The row is added, committed and refreshed before being returned.
    """
    news_item = models.News(
        title=title,
        date=date,
        dashboard_id=dashboard_id,
    )
    db.add(news_item)
    db.commit()
    db.refresh(news_item)
    return news_item
def update_dashboard(db: Session, dashboard_data: Dict[str, Any]):
    """Update the dashboard row and, optionally, replace its news items.

    Every key of *dashboard_data* except ``"newsData"`` is set as an
    attribute on the dashboard. If ``"newsData"`` is present, all existing
    news rows for this dashboard are deleted and one new row is created per
    entry, reading each entry's ``"title"`` and ``"date"`` keys.

    When no dashboard row exists, nothing is written and the falsy lookup
    result is returned.
    """
    dashboard = get_dashboard(db)
    if not dashboard:
        return dashboard

    # Simple fields: the news payload is skipped here and handled separately below.
    for field, new_value in dashboard_data.items():
        if field == "newsData":
            continue
        setattr(dashboard, field, new_value)

    if "newsData" in dashboard_data:
        # Replace rather than merge: remove every news row of this dashboard first.
        existing_news = db.query(models.News).filter(models.News.dashboard_id == dashboard.id)
        existing_news.delete()
        # Then insert the incoming entries.
        for entry in dashboard_data["newsData"]:
            db.add(
                models.News(
                    title=entry["title"],
                    date=entry["date"],
                    dashboard_id=dashboard.id,
                )
            )

    db.commit()
    db.refresh(dashboard)
    return dashboard
# Dimension operations
def get_dimension(db: Session, dimension_id: int):
    """Fetch the first ``models.Dimension`` row whose ``id`` equals *dimension_id*."""
    by_id = db.query(models.Dimension).filter(models.Dimension.id == dimension_id)
    return by_id.first()
def get_dimensions_by_category(db: Session, category: str):
    """Return every ``models.Dimension`` row whose ``category`` equals *category*."""
    in_category = db.query(models.Dimension).filter(models.Dimension.category == category)
    return in_category.all()
def get_all_dimensions(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of dimensions (*skip* as offset, *limit* as row cap).

    No explicit ordering is applied to the query.
    """
    page = db.query(models.Dimension).offset(skip).limit(limit)
    return page.all()
def create_dimension(
    db: Session,
    name: str,
    weight: float = 1.0,
    category: Optional[str] = None,
    description: Optional[str] = None,
):
    """Create and persist a new dimension.

    Args:
        db: Session used to add, commit and refresh the new row.
        name: Dimension name.
        weight: Dimension weight; defaults to 1.0.
        category: Optional category, stored as given (``None`` when omitted).
        description: Optional description, stored as given (``None`` when omitted).

    Returns:
        The newly created ``models.Dimension`` instance, refreshed after commit.
    """
    db_dimension = models.Dimension(
        name=name,
        weight=weight,
        category=category,
        description=description,
    )
    db.add(db_dimension)
    db.commit()
    db.refresh(db_dimension)
    return db_dimension
def update_dimension(db: Session, dimension_id: int, dimension_data: Dict[str, Any]):
    """Apply the key/value pairs of *dimension_data* to an existing dimension.

    Each key is set as an attribute on the record via ``setattr``; the change
    is then committed and the record refreshed. When no dimension matches
    *dimension_id*, nothing is written and the falsy lookup result is returned.
    """
    record = get_dimension(db, dimension_id)
    if not record:
        return record
    for field, new_value in dimension_data.items():
        setattr(record, field, new_value)
    db.commit()
    db.refresh(record)
    return record
def delete_dimension(db: Session, dimension_id: int):
    """Delete the dimension identified by *dimension_id*.

    Returns ``True`` when a record was found, deleted and committed;
    ``False`` when no such record exists.
    """
    record = get_dimension(db, dimension_id)
    if not record:
        return False
    db.delete(record)
    db.commit()
    return True