181 lines
5.8 KiB
Python
Raw Permalink Normal View History

2025-06-09 14:59:40 +08:00
from sqlalchemy.orm import Session
import models
import schemas
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import List, Optional, Dict, Any
import datetime
import json
# 密码处理上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 验证密码
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 获取密码哈希
def get_password_hash(password):
return pwd_context.hash(password)
# 用户相关操作
def get_user(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, username: str, password: str, email: Optional[str] = None, full_name: Optional[str] = None):
hashed_password = get_password_hash(password)
db_user = models.User(
username=username,
email=email,
full_name=full_name,
hashed_password=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 人才相关操作
def get_talent(db: Session, talent_id: str):
return db.query(models.Talent).filter(models.Talent.id == talent_id).first()
def get_talents(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Talent).offset(skip).limit(limit).all()
def create_talent(db: Session, talent: schemas.TalentCreate):
db_talent = models.Talent(**talent.dict())
db.add(db_talent)
db.commit()
db.refresh(db_talent)
return db_talent
def update_talent(db: Session, talent_id: str, talent_data: Dict[str, Any]):
db_talent = get_talent(db, talent_id)
if db_talent:
for key, value in talent_data.items():
setattr(db_talent, key, value)
db.commit()
db.refresh(db_talent)
return db_talent
def delete_talent(db: Session, talent_id: str):
db_talent = get_talent(db, talent_id)
if db_talent:
db.delete(db_talent)
db.commit()
return True
return False
# 工程研究中心相关操作
def get_lab(db: Session, lab_id: str):
return db.query(models.Lab).filter(models.Lab.id == lab_id).first()
def get_labs(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Lab).offset(skip).limit(limit).all()
def create_lab(db: Session, lab: schemas.LabCreate):
db_lab = models.Lab(**lab.dict())
db.add(db_lab)
db.commit()
db.refresh(db_lab)
return db_lab
def update_lab(db: Session, lab_id: str, lab_data: Dict[str, Any]):
db_lab = get_lab(db, lab_id)
if db_lab:
for key, value in lab_data.items():
setattr(db_lab, key, value)
db.commit()
db.refresh(db_lab)
return db_lab
# 仪表盘数据相关操作
def get_dashboard(db: Session):
dashboard = db.query(models.DashboardData).first()
return dashboard
def get_news(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.News).offset(skip).limit(limit).all()
def create_news(db: Session, title: str, date: str, dashboard_id: int):
db_news = models.News(title=title, date=date, dashboard_id=dashboard_id)
db.add(db_news)
db.commit()
db.refresh(db_news)
return db_news
def update_dashboard(db: Session, dashboard_data: Dict[str, Any]):
dashboard = get_dashboard(db)
if dashboard:
# 更新简单字段
for key, value in dashboard_data.items():
if key != "newsData": # 新闻数据单独处理
setattr(dashboard, key, value)
# 如果有新闻数据需要更新
if "newsData" in dashboard_data:
# 删除所有旧新闻
db.query(models.News).filter(models.News.dashboard_id == dashboard.id).delete()
# 添加新的新闻
for news_item in dashboard_data["newsData"]:
db_news = models.News(
title=news_item["title"],
date=news_item["date"],
dashboard_id=dashboard.id
)
db.add(db_news)
db.commit()
db.refresh(dashboard)
return dashboard
# 维度相关操作
def get_dimension(db: Session, dimension_id: int):
return db.query(models.Dimension).filter(models.Dimension.id == dimension_id).first()
def get_dimensions_by_category(db: Session, category: str):
return db.query(models.Dimension).filter(models.Dimension.category == category).all()
def get_all_dimensions(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Dimension).offset(skip).limit(limit).all()
def create_dimension(db: Session, name: str, weight: float = 1.0, category: str = None, description: str = None):
db_dimension = models.Dimension(
name=name,
weight=weight,
category=category,
description=description
)
db.add(db_dimension)
db.commit()
db.refresh(db_dimension)
return db_dimension
def update_dimension(db: Session, dimension_id: int, dimension_data: Dict[str, Any]):
db_dimension = get_dimension(db, dimension_id)
if db_dimension:
for key, value in dimension_data.items():
setattr(db_dimension, key, value)
db.commit()
db.refresh(db_dimension)
return db_dimension
def delete_dimension(db: Session, dimension_id: int):
db_dimension = get_dimension(db, dimension_id)
if db_dimension:
db.delete(db_dimension)
db.commit()
return True
return False