from sqlalchemy.orm import Session import models import schemas from jose import JWTError, jwt from passlib.context import CryptContext from typing import List, Optional, Dict, Any import datetime import json # 密码处理上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 验证密码 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 获取密码哈希 def get_password_hash(password): return pwd_context.hash(password) # 用户相关操作 def get_user(db: Session, username: str): return db.query(models.User).filter(models.User.username == username).first() def get_users(db: Session, skip: int = 0, limit: int = 100): return db.query(models.User).offset(skip).limit(limit).all() def create_user(db: Session, username: str, password: str, email: Optional[str] = None, full_name: Optional[str] = None): hashed_password = get_password_hash(password) db_user = models.User( username=username, email=email, full_name=full_name, hashed_password=hashed_password ) db.add(db_user) db.commit() db.refresh(db_user) return db_user def authenticate_user(db: Session, username: str, password: str): user = get_user(db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user # 人才相关操作 def get_talent(db: Session, talent_id: str): return db.query(models.Talent).filter(models.Talent.id == talent_id).first() def get_talents(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Talent).offset(skip).limit(limit).all() def create_talent(db: Session, talent: schemas.TalentCreate): db_talent = models.Talent(**talent.dict()) db.add(db_talent) db.commit() db.refresh(db_talent) return db_talent def update_talent(db: Session, talent_id: str, talent_data: Dict[str, Any]): db_talent = get_talent(db, talent_id) if db_talent: for key, value in talent_data.items(): setattr(db_talent, key, value) db.commit() db.refresh(db_talent) return db_talent def delete_talent(db: Session, talent_id: str): db_talent = get_talent(db, talent_id) if db_talent: db.delete(db_talent) db.commit() return True return False # 工程研究中心相关操作 def get_lab(db: Session, lab_id: str): return db.query(models.Lab).filter(models.Lab.id == lab_id).first() def get_labs(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Lab).offset(skip).limit(limit).all() def create_lab(db: Session, lab: schemas.LabCreate): db_lab = models.Lab(**lab.dict()) db.add(db_lab) db.commit() db.refresh(db_lab) return db_lab def update_lab(db: Session, lab_id: str, lab_data: Dict[str, Any]): db_lab = get_lab(db, lab_id) if db_lab: for key, value in lab_data.items(): setattr(db_lab, key, value) db.commit() db.refresh(db_lab) return db_lab # 仪表盘数据相关操作 def get_dashboard(db: Session): dashboard = db.query(models.DashboardData).first() return dashboard def get_news(db: Session, skip: int = 0, limit: int = 100): return db.query(models.News).offset(skip).limit(limit).all() def create_news(db: Session, title: str, date: str, dashboard_id: int): db_news = models.News(title=title, date=date, dashboard_id=dashboard_id) db.add(db_news) db.commit() db.refresh(db_news) return db_news def update_dashboard(db: Session, dashboard_data: Dict[str, Any]): dashboard = get_dashboard(db) if dashboard: # 更新简单字段 for key, value in dashboard_data.items(): if key != "newsData": # 新闻数据单独处理 setattr(dashboard, key, value) # 如果有新闻数据需要更新 if "newsData" in dashboard_data: # 删除所有旧新闻 db.query(models.News).filter(models.News.dashboard_id == dashboard.id).delete() # 添加新的新闻 for news_item in dashboard_data["newsData"]: db_news = models.News( title=news_item["title"], date=news_item["date"], dashboard_id=dashboard.id ) db.add(db_news) db.commit() db.refresh(dashboard) return dashboard # 维度相关操作 def get_dimension(db: Session, dimension_id: int): return db.query(models.Dimension).filter(models.Dimension.id == dimension_id).first() def get_dimensions_by_category(db: Session, category: str): return db.query(models.Dimension).filter(models.Dimension.category == category).all() def get_all_dimensions(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Dimension).offset(skip).limit(limit).all() def create_dimension(db: Session, name: str, weight: float = 1.0, category: str = None, description: str = None): db_dimension = models.Dimension( name=name, weight=weight, category=category, description=description ) db.add(db_dimension) db.commit() db.refresh(db_dimension) return db_dimension def update_dimension(db: Session, dimension_id: int, dimension_data: Dict[str, Any]): db_dimension = get_dimension(db, dimension_id) if db_dimension: for key, value in dimension_data.items(): setattr(db_dimension, key, value) db.commit() db.refresh(db_dimension) return db_dimension def delete_dimension(db: Session, dimension_id: int): db_dimension = get_dimension(db, dimension_id) if db_dimension: db.delete(db_dimension) db.commit() return True return False