from fastapi import FastAPI, Depends, HTTPException, status, Body, UploadFile, File from fastapi.middleware.cors import CORSMiddleware from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.staticfiles import StaticFiles from jose import JWTError, jwt from passlib.context import CryptContext from datetime import datetime, timedelta from pydantic import BaseModel from typing import List, Optional, Dict, Any import json import requests from bs4 import BeautifulSoup from fastapi.responses import JSONResponse import os import pickle import random import urllib.parse import uuid import shutil from sqlalchemy.orm import Session import base64 import docx # 用于解析Word文档 import re # 用于正则表达式匹配 # 导入数据库相关模块 from database import get_db, engine import models import schemas import crud # JWT配置 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 # 创建FastAPI应用 app = FastAPI(title="科研评估管理系统API") # 配置CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在生产环境应该设置为具体的前端域名 allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 创建静态文件目录 STATIC_DIR = "static" IMAGES_DIR = os.path.join(STATIC_DIR, "images") os.makedirs(IMAGES_DIR, exist_ok=True) # 挂载静态文件目录 app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") # 密码处理上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 数据模型 class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str class Talent(BaseModel): id: str idcode: Optional[str] = None name: str gender: Optional[str] = None birthDate: Optional[str] = None title: Optional[str] = None position: Optional[str] = None education: Optional[str] = None 
address: Optional[str] = None academicDirection: Optional[str] = None talentPlan: Optional[str] = None officeLocation: Optional[str] = None email: Optional[str] = None phone: Optional[str] = None tutorType: Optional[str] = None papers: Optional[str] = None projects: Optional[str] = None photo: Optional[str] = None eduWorkHistory: Optional[str] = None researchDirection: Optional[str] = None recentProjects: Optional[str] = None representativePapers: Optional[str] = None patents: Optional[str] = None evaluationData: Optional[List[float]] = None class Lab(BaseModel): id: str idcode: Optional[str] = None name: str personnel: Optional[str] = None nationalProjects: Optional[str] = None otherProjects: Optional[str] = None achievements: Optional[str] = None labAchievements: Optional[str] = None image: Optional[str] = None score: Optional[int] = None evaluationData: Optional[List[float]] = None class DashboardData(BaseModel): paperCount: int patentCount: int highImpactPapers: int keyProjects: int fundingAmount: str researcherStats: dict newsData: List[dict] # URL抓取模型 class ScrapeRequest(BaseModel): url: str # 保存评估数据模型 class SaveDataRequest(BaseModel): data_type: str # "talent" 或 "lab" data: dict # 文件存储路径 DATA_DIR = "data" TALENT_DATA_FILE = os.path.join(DATA_DIR, "talents.pkl") LAB_DATA_FILE = os.path.join(DATA_DIR, "labs.pkl") # 确保数据目录存在 os.makedirs(DATA_DIR, exist_ok=True) # 从文件加载数据 def load_data(file_path, default_data): if os.path.exists(file_path): try: with open(file_path, 'rb') as f: return pickle.load(f) except Exception as e: print(f"Error loading data from {file_path}: {e}") return default_data # 保存数据到文件 def save_data(file_path, data): try: with open(file_path, 'wb') as f: pickle.dump(data, f) return True except Exception as e: print(f"Error saving data to {file_path}: {e}") return False # 加载已保存的数据 talents = load_data(TALENT_DATA_FILE, []) labs = load_data(LAB_DATA_FILE, []) # 模拟数据库 fake_users_db = { "admin": { "username": "admin", "full_name": "管理员", "email": 
"admin@example.com", "hashed_password": pwd_context.hash("123456"), "disabled": False, } } # 模拟人才数据 talents = [ { "id": "BLG45187", "name": "张三", "gender": "男", "birthDate": "1980.01.01", "title": "教授", "position": "副院长", "education": "博士", "address": "北京市海淀区中关村南大街5号", "academicDirection": "人工智能", "talentPlan": "国家杰出青年", "officeLocation": "理工大厦A座201", "email": "zhangsan@example.com", "phone": "13800138000", "tutorType": "博士生导师", "papers": "15篇", "projects": "23项", "photo": "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='100%25' height='100%25' viewBox='0 0 100 100'%3E%3Crect width='100' height='100' fill='%234986ff' opacity='0.2'/%3E%3Ccircle cx='50' cy='40' r='20' fill='%234986ff' opacity='0.5'/%3E%3Cpath d='M30,80 Q50,60 70,80 L70,100 L30,100 Z' fill='%234986ff' opacity='0.5'/%3E%3C/svg%3E", "eduWorkHistory": "2005年毕业于北京理工大学,获博士学位\n2005-2010年在清华大学从事博士后研究\n2010年至今在北京理工大学任教", "researchDirection": "机器学习、深度学习、计算机视觉", "recentProjects": "国家自然科学基金重点项目:深度学习在计算机视觉中的应用研究\n国家重点研发计划项目:智能机器人视觉感知系统研发", "representativePapers": "[1] 机器学习在自动化控制中的应用\n[2] 深度强化学习研究进展", "patents": "一种基于深度学习的图像识别方法\n一种智能控制系统及其控制方法", "evaluationData": [85, 90, 78, 82, 76, 88] }, { "id": "BLG45188", "name": "李四", "gender": "男", "birthDate": "1982.05.15", "title": "副教授", "position": "系主任", "education": "博士", "address": "北京市海淀区中关村南大街5号", "academicDirection": "材料科学", "talentPlan": "青年千人计划", "officeLocation": "理工大厦B座305", "email": "lisi@example.com", "phone": "13900139000", "tutorType": "硕士生导师", "papers": "12篇", "projects": "18项", "photo": "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='100%25' height='100%25' viewBox='0 0 100 100'%3E%3Crect width='100' height='100' fill='%234986ff' opacity='0.2'/%3E%3Ccircle cx='50' cy='40' r='20' fill='%234986ff' opacity='0.5'/%3E%3Cpath d='M30,80 Q50,60 70,80 L70,100 L30,100 Z' fill='%234986ff' opacity='0.5'/%3E%3C/svg%3E", "eduWorkHistory": "2008年毕业于中国科学院,获博士学位\n2008-2013年在美国麻省理工学院从事博士后研究\n2013年至今在北京理工大学任教", "researchDirection": 
"新能源材料、纳米材料、催化材料", "recentProjects": "国家自然科学基金面上项目:高性能催化材料的设计与合成\n企业合作项目:新型锂电池材料开发", "representativePapers": "[1] 高性能催化材料的设计与合成\n[2] 纳米材料在新能源领域的应用", "patents": "一种高效催化材料的制备方法\n一种纳米材料的合成工艺", "evaluationData": [92, 85, 76, 89, 78, 82] } ] # 模拟工程研究中心数据 labs = [ { "id": "BLG45187", "name": "基础力学教学实验中心", "personnel": "30人", "nationalProjects": "10项", "otherProjects": "46项", "achievements": "28项", "labAchievements": "中心面向全校本科生开设力学类实验课程,获国家级教学成果奖1项,省部级教学成果奖2项。开发研制教学实验装置20余台套,获得国家专利15项。建设国家级精品课程2门,国家级精品资源共享课1门。", "image": "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='100%25' height='100%25' viewBox='0 0 300 200'%3E%3Crect width='300' height='200' fill='%234986ff' opacity='0.2'/%3E%3Crect x='20' y='40' width='260' height='120' fill='%234986ff' opacity='0.3'/%3E%3Ccircle cx='70' cy='70' r='20' fill='%234986ff' opacity='0.5'/%3E%3Crect x='120' y='50' width='140' height='40' fill='%234986ff' opacity='0.4'/%3E%3Crect x='120' y='110' width='140' height='30' fill='%234986ff' opacity='0.4'/%3E%3C/svg%3E", "score": 98, "evaluationData": [85, 90, 78, 82, 76, 88] }, { "id": "BLG45188", "name": "高性能计算工程研究中心", "personnel": "25人", "nationalProjects": "8项", "otherProjects": "37项", "achievements": "22项", "labAchievements": "工程研究中心围绕高性能计算、并行计算、分布式系统等方向开展研究。建有超级计算机集群,计算能力达到100 TFLOPS。在国际顶级期刊和会议发表论文50余篇,获国家发明专利12项。与多家知名IT企业建立了产学研合作关系。", "image": "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='100%25' height='100%25' viewBox='0 0 300 200'%3E%3Crect width='300' height='200' fill='%234986ff' opacity='0.1'/%3E%3Cpath d='M60,20 L240,20 L240,180 L60,180 Z' fill='%234986ff' opacity='0.2'/%3E%3Cpath d='M80,40 L140,40 L140,160 L80,160 Z' fill='%234986ff' opacity='0.3'/%3E%3Cpath d='M160,40 L220,40 L220,160 L160,160 Z' fill='%234986ff' opacity='0.3'/%3E%3Cline x1='80' y1='60' x2='140' y2='60' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='80' y1='80' x2='140' y2='80' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='80' y1='100' 
x2='140' y2='100' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='80' y1='120' x2='140' y2='120' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='80' y1='140' x2='140' y2='140' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='160' y1='60' x2='220' y2='60' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='160' y1='80' x2='220' y2='80' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='160' y1='100' x2='220' y2='100' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='160' y1='120' x2='220' y2='120' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3Cline x1='160' y1='140' x2='220' y2='140' stroke='%23fff' stroke-width='1' opacity='0.5' /%3E%3C/svg%3E", "score": 94, "evaluationData": [92, 85, 76, 89, 78, 82] } ] # 模拟仪表盘数据 dashboard_data = { "paperCount": 3500, "patentCount": 2000, "highImpactPapers": 100, "keyProjects": 50, "fundingAmount": "500万元", "researcherStats": { "professor": 120, "associateProfessor": 180, "assistantProfessor": 150, "postdoc": 90, "phd": 250, "master": 400 }, "newsData": [ {"title": "北京理工大学获批国家重点研发计划项目", "date": "2023-09-15"}, {"title": "我校教授在Nature期刊发表重要研究成果", "date": "2023-08-30"}, {"title": "北京理工大学举办2023年学术科技节", "date": "2023-08-20"}, {"title": "我校研究团队在量子计算领域取得突破性进展", "date": "2023-08-10"}, {"title": "北京理工大学与华为公司签署战略合作协议", "date": "2023-07-25"}, {"title": "北京理工大学新增两个国家重点工程研究中心", "date": "2023-07-15"}, {"title": "我校研究生在国际大赛中获一等奖", "date": "2023-07-01"}, {"title": "北京理工大学成功研发新型纳米材料", "date": "2023-06-20"}, {"title": "我校科研团队获2023年度国家科技进步奖提名", "date": "2023-06-10"}, {"title": "北京理工大学举办人工智能与未来教育论坛", "date": "2023-05-25"} ] } # 工具函数 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) return None def authenticate_user(fake_db, username: 
str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = crud.get_user(db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: models.User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user # 下载并保存图片 def download_and_save_image(image_url, url_base=""): try: # 设置请求头,模拟浏览器访问 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Connection': 'keep-alive', 'Referer': url_base } # 如果URL是相对路径,则与基础URL合并 if image_url.startswith('../') or image_url.startswith('./'): image_url = urllib.parse.urljoin(url_base, image_url) # 发送请求获取图片 response = requests.get(image_url, stream=True, headers=headers, timeout=30) response.raise_for_status() # 获取图片扩展名 content_type = 
response.headers.get('Content-Type', '') if 'image/jpeg' in content_type or image_url.lower().endswith('.jpg') or image_url.lower().endswith('.jpeg'): ext = '.jpg' elif 'image/png' in content_type or image_url.lower().endswith('.png'): ext = '.png' elif 'image/gif' in content_type or image_url.lower().endswith('.gif'): ext = '.gif' else: ext = '.png' # 默认使用PNG格式 # 生成唯一文件名 image_filename = f"{uuid.uuid4().hex}{ext}" image_path = os.path.join(IMAGES_DIR, image_filename) # 保存图片到本地 with open(image_path, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) # 返回图片的静态访问地址 return f"/static/images/{image_filename}" except Exception as e: print(f"Error downloading image: {e}") return None # 路由 @app.post("/token", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ): user = crud.authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user @app.get("/dashboard") async def get_dashboard(db: Session = Depends(get_db)): dashboard = crud.get_dashboard(db) if dashboard: # 获取相关新闻 news = db.query(models.News).filter(models.News.dashboard_id == dashboard.id).all() # 构建响应 result = { "paperCount": dashboard.paperCount, "patentCount": dashboard.patentCount, "highImpactPapers": dashboard.highImpactPapers, "keyProjects": dashboard.keyProjects, "fundingAmount": dashboard.fundingAmount, "researcherStats": dashboard.researcherStats, "newsData": [{"title": n.title, "date": 
n.date} for n in news] } return result raise HTTPException(status_code=404, detail="Dashboard data not found") @app.get("/talents", response_model=List[schemas.Talent]) async def get_talents(db: Session = Depends(get_db)): talents = crud.get_talents(db) return talents @app.post("/talents", response_model=schemas.Talent) async def create_talent( talent: schemas.TalentCreate, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): return crud.create_talent(db, talent) @app.get("/labs", response_model=List[schemas.Lab]) async def get_labs(db: Session = Depends(get_db)): labs = crud.get_labs(db) return labs @app.post("/labs", response_model=schemas.Lab) async def create_lab( lab: schemas.LabCreate, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): return crud.create_lab(db, lab) # 健康检查接口 @app.get("/health") async def health_check(): return {"status": "healthy"} # 调试接口 - 检查用户状态 @app.get("/debug/users") async def debug_users(db: Session = Depends(get_db)): try: users = db.query(models.User).all() user_list = [] for user in users: user_list.append({ "username": user.username, "email": user.email, "full_name": user.full_name, "disabled": user.disabled, "has_password": bool(user.hashed_password) }) return { "total_users": len(users), "users": user_list } except Exception as e: return {"error": str(e)} # 调试接口 - 重新创建默认用户 @app.post("/debug/create-admin") async def debug_create_admin(db: Session = Depends(get_db)): try: # 检查是否已经存在admin用户 existing_user = db.query(models.User).filter(models.User.username == "admin").first() if existing_user: return {"message": "Admin用户已存在", "user": existing_user.username} # 创建admin用户 hashed_password = get_password_hash("123456") default_user = models.User( username="admin", email="admin@example.com", full_name="系统管理员", hashed_password=hashed_password, disabled=False ) db.add(default_user) db.commit() db.refresh(default_user) return {"message": "Admin用户创建成功", 
"user": default_user.username} except Exception as e: db.rollback() return {"error": str(e)} # URL抓取接口 - 更新版本 @app.post("/api/scrape-url") async def scrape_url(request: schemas.ScrapeRequest): try: # 设置请求头,模拟浏览器访问 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': 'max-age=0' } # 发送HTTP请求获取页面内容,增加超时时间 response = requests.get(request.url, headers=headers, timeout=30) response.raise_for_status() # 如果请求失败,抛出异常 # 设置编码以正确处理中文字符 response.encoding = 'utf-8' # 解析HTML soup = BeautifulSoup(response.text, 'html.parser') # 获取基础URL用于解析相对路径 url_parts = urllib.parse.urlparse(request.url) base_url = f"{url_parts.scheme}://{url_parts.netloc}" # 初始化数据字典 teacher_data = { "id": f"BLG{random.randint(10000, 99999)}", "photo": f"data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='100' height='120' viewBox='0 0 100 120'%3E%3Crect width='100' height='120' fill='%234986ff' opacity='0.3'/%3E%3Ccircle cx='50' cy='45' r='25' fill='%234986ff' opacity='0.6'/%3E%3Ccircle cx='50' cy='95' r='35' fill='%234986ff' opacity='0.6'/%3E%3C/svg%3E", "evaluationData": [ round(min(100, max(60, 70 + 20 * (0.5 - random.random())))) for _ in range(6) ] } # 从教师信息表提取基本信息 info_table = soup.find('div', class_='wz_teacher') if info_table: table = info_table.find('table') if table: rows = table.find_all('tr') # 提取姓名、性别、出生年月 if len(rows) > 0: cells = rows[0].find_all('td') if len(cells) >= 6: teacher_data["name"] = cells[1].text.strip() teacher_data["gender"] = cells[3].text.strip() teacher_data["birthDate"] = cells[5].text.strip() # 提取职称、职务、最高学历 if len(rows) > 1: cells = rows[1].find_all('td') if len(cells) >= 6: teacher_data["title"] = cells[1].text.strip() position = cells[3].text.strip() 
teacher_data["position"] = position if position else "" teacher_data["education"] = cells[5].text.strip() # 提取学科方向 if len(rows) > 2: cells = rows[2].find_all('td') if len(cells) >= 2: teacher_data["academicDirection"] = cells[1].text.strip() # 提取人才计划和办公地点 if len(rows) > 3: cells = rows[3].find_all('td') if len(cells) >= 6: talent_plan = cells[1].text.strip() teacher_data["talentPlan"] = talent_plan if talent_plan else "" teacher_data["officeLocation"] = cells[5].text.strip() # 提取电子邮件和联系方式 if len(rows) > 4: cells = rows[4].find_all('td') if len(cells) >= 6: email = cells[1].text.strip() teacher_data["email"] = email if email else "" phone = cells[5].text.strip() teacher_data["phone"] = phone if phone else "" # 提取通讯地址 if len(rows) > 5: cells = rows[5].find_all('td') if len(cells) >= 2: teacher_data["address"] = cells[1].text.strip() # 提取导师类型 if len(rows) > 6: cells = rows[6].find_all('td') if len(cells) >= 2: teacher_data["tutorType"] = cells[1].text.strip() # 提取照片 photo_element = soup.select_one('.teacherInfo .img img') if photo_element and photo_element.get('src'): img_src = photo_element['src'] # 处理相对路径,构建完整的图片URL if img_src.startswith('../../../'): # 移除 '../../../' 前缀,直接拼接到基础URL img_relative = img_src[9:] # 移除 '../../../' img_url = f"{base_url}/{img_relative}" elif img_src.startswith('../../'): # 处理 '../../' 相对路径 img_relative = img_src[6:] # 移除 '../../' img_url = f"{base_url}/{img_relative}" elif img_src.startswith('../'): # 处理 '../' 相对路径 img_relative = img_src[3:] # 移除 '../' img_url = f"{base_url}/{img_relative}" else: img_url = urllib.parse.urljoin(base_url, img_src) # 直接保存完整的图片URL,不下载到本地 teacher_data["photo"] = img_url # 提取详细信息部分 content_divs = soup.select('.con01_t') for div in content_divs: heading = div.find('h3') if not heading: continue heading_text = heading.text.strip() # 获取该部分的所有段落文本 paragraphs = [p.text.strip() for p in div.find_all('p') if p.text.strip()] section_content = '\n'.join(paragraphs) # 根据标题将内容映射到相应字段 if '教育与工作经历' in heading_text: 
teacher_data["eduWorkHistory"] = section_content elif '研究方向' in heading_text: teacher_data["researchDirection"] = section_content elif '近5年承担的科研项目' in heading_text or '近五年承担的科研项目' in heading_text: teacher_data["recentProjects"] = section_content # 计算项目数量 project_count = len([p for p in paragraphs if p.strip().startswith(str(len(paragraphs) - paragraphs.index(p))+".")]) if project_count > 0: teacher_data["projects"] = f"{project_count}项" else: teacher_data["projects"] = f"{len(paragraphs)}项" elif '代表性学术论文' in heading_text: teacher_data["representativePapers"] = section_content # 计算论文数量 paper_count = len([p for p in paragraphs if p.strip().startswith("[")]) if paper_count > 0: teacher_data["papers"] = f"{paper_count}篇" else: teacher_data["papers"] = f"{len(paragraphs)}篇" elif '授权国家发明专利' in heading_text or '专利' in heading_text: teacher_data["patents"] = section_content return teacher_data except Exception as e: print(f"抓取错误: {str(e)}") return JSONResponse( status_code=500, content={"error": f"抓取网页失败: {str(e)}"}, ) # 保存评估数据接口 @app.post("/api/save-data") async def save_evaluation_data( request: schemas.SaveDataRequest, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): try: if request.data_type == "talent": # 获取人才ID talent_id = request.data.get("id") # 如果id不存在、为空字符串或为null,执行新增操作 if not talent_id: # 确保有必要的字段 if not request.data.get("name"): return JSONResponse( status_code=400, content={"error": "缺少必要字段:name"}, ) # 生成新ID new_id = f"BLG{random.randint(10000, 99999)}" # 复制数据并添加ID talent_data = {**request.data, "id": new_id} # 创建Talent对象并保存到数据库 # 过滤掉数据库模型中不存在的字段 valid_fields = { 'id', 'idcode', 'name', 'gender', 'birthDate', 'title', 'position', 'education', 'educationBackground', 'address', 'academicDirection', 'talentPlan', 'officeLocation', 'email', 'phone', 'tutorType', 'papers', 'projects', 'photo', 'eduWorkHistory', 'researchDirection', 'recentProjects', 'representativePapers', 'patents', 'evaluationData' } 
filtered_talent_data = {k: v for k, v in talent_data.items() if k in valid_fields} db_talent = models.Talent(**filtered_talent_data) db.add(db_talent) db.commit() db.refresh(db_talent) return {"success": True, "message": "人才评估数据已新增", "id": new_id} else: # 执行更新操作 talent = crud.update_talent(db, talent_id, request.data) if not talent: return JSONResponse( status_code=404, content={"error": f"未找到ID为 {talent_id} 的人才"}, ) return {"success": True, "message": "人才评估数据已更新"} elif request.data_type == "lab": # 获取工程研究中心ID lab_id = request.data.get("id") # 如果id不存在、为空字符串或为null,执行新增操作 if not lab_id: # 确保有必要的字段 if not request.data.get("name"): return JSONResponse( status_code=400, content={"error": "缺少必要字段:name"}, ) # 生成新ID new_id = f"BLG{random.randint(10000, 99999)}" # 复制数据并添加ID lab_data = {**request.data, "id": new_id} # 创建Lab对象并保存到数据库 db_lab = models.Lab(**lab_data) db.add(db_lab) db.commit() db.refresh(db_lab) return {"success": True, "message": "工程研究中心评估数据已新增", "id": new_id} else: # 执行更新操作 lab = crud.update_lab(db, lab_id, request.data) if not lab: return JSONResponse( status_code=404, content={"error": f"未找到ID为 {lab_id} 的工程研究中心"}, ) return {"success": True, "message": "工程研究中心评估数据已更新"} else: return JSONResponse( status_code=400, content={"error": f"不支持的数据类型: {request.data_type}"}, ) except Exception as e: return JSONResponse( status_code=500, content={"error": f"保存数据失败: {str(e)}"}, ) # 添加抓取教师信息和头像的新接口 @app.post("/api/fetch-teacher-data") async def fetch_teacher_data(db: Session = Depends(get_db)): try: # 抓取网页内容 url = "https://ac.bit.edu.cn/szdw/jsml/kzllykzgcyjs1/c6533e24f85749578699deca43c38b40.htm" # 设置请求头,模拟浏览器访问 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': 
'max-age=0' } response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() # 设置编码以正确处理中文字符 response.encoding = 'utf-8' # 解析HTML soup = BeautifulSoup(response.text, 'html.parser') # 获取基础URL用于解析相对路径 url_parts = urllib.parse.urlparse(url) base_url = f"{url_parts.scheme}://{url_parts.netloc}" # 初始化教师数据 teacher_data = { "id": f"BLG{random.randint(10000, 99999)}", "evaluationData": [ round(min(100, max(60, 70 + 20 * (0.5 - random.random())))) for _ in range(6) ] } # 从教师信息表提取基本信息 info_table = soup.find('div', class_='wz_teacher') if info_table: table = info_table.find('table') if table: rows = table.find_all('tr') # 提取姓名、性别、出生年月 if len(rows) > 0: cells = rows[0].find_all('td') if len(cells) >= 6: teacher_data["name"] = cells[1].text.strip() teacher_data["gender"] = cells[3].text.strip() teacher_data["birthDate"] = cells[5].text.strip() # 提取职称、职务、最高学历 if len(rows) > 1: cells = rows[1].find_all('td') if len(cells) >= 6: teacher_data["title"] = cells[1].text.strip() position = cells[3].text.strip() teacher_data["position"] = position if position else "" teacher_data["education"] = cells[5].text.strip() # 提取学科方向 if len(rows) > 2: cells = rows[2].find_all('td') if len(cells) >= 2: teacher_data["academicDirection"] = cells[1].text.strip() # 提取人才计划和办公地点 if len(rows) > 3: cells = rows[3].find_all('td') if len(cells) >= 6: talent_plan = cells[1].text.strip() teacher_data["talentPlan"] = talent_plan if talent_plan else "" teacher_data["officeLocation"] = cells[5].text.strip() # 提取电子邮件和联系方式 if len(rows) > 4: cells = rows[4].find_all('td') if len(cells) >= 6: email = cells[1].text.strip() teacher_data["email"] = email if email else "" phone = cells[5].text.strip() teacher_data["phone"] = phone if phone else "" # 提取通讯地址 if len(rows) > 5: cells = rows[5].find_all('td') if len(cells) >= 2: teacher_data["address"] = cells[1].text.strip() # 提取导师类型 if len(rows) > 6: cells = rows[6].find_all('td') if len(cells) >= 2: teacher_data["tutorType"] = cells[1].text.strip() 
# 提取详细信息部分 content_divs = soup.select('.con01_t') for div in content_divs: heading = div.find('h3') if not heading: continue heading_text = heading.text.strip() # 获取该部分的所有段落文本 paragraphs = [p.text.strip() for p in div.find_all('p') if p.text.strip()] section_content = '\n'.join(paragraphs) # 根据标题将内容映射到相应字段 if '教育与工作经历' in heading_text: teacher_data["eduWorkHistory"] = section_content elif '研究方向' in heading_text: teacher_data["researchDirection"] = section_content elif '近5年承担的科研项目' in heading_text or '近五年承担的科研项目' in heading_text: teacher_data["recentProjects"] = section_content # 计算项目数量 project_count = len([p for p in paragraphs if p.strip().startswith(str(len(paragraphs) - paragraphs.index(p))+".")]) if project_count > 0: teacher_data["projects"] = f"{project_count}项" else: teacher_data["projects"] = f"{len(paragraphs)}项" elif '代表性学术论文' in heading_text: teacher_data["representativePapers"] = section_content # 计算论文数量 paper_count = len([p for p in paragraphs if p.strip().startswith("[")]) if paper_count > 0: teacher_data["papers"] = f"{paper_count}篇" else: teacher_data["papers"] = f"{len(paragraphs)}篇" elif '授权国家发明专利' in heading_text or '专利' in heading_text: teacher_data["patents"] = section_content # 提取照片 photo_element = soup.select_one('.teacherInfo .img img') if photo_element and photo_element.get('src'): img_src = photo_element['src'] # 处理相对路径,构建完整的图片URL if img_src.startswith('../../../'): # 从URL获取基础路径(移除文件名和最后两级目录) url_parts = url.split('/') if len(url_parts) >= 4: base_path = '/'.join(url_parts[:-3]) img_url = f"{base_path}/{img_src[9:]}" # 移除 '../../../' else: img_url = urllib.parse.urljoin(base_url, img_src) else: img_url = urllib.parse.urljoin(base_url, img_src) # 直接保存完整的图片URL,不下载到本地 teacher_data["photo"] = img_url # 保存到数据库 db_talent = models.Talent(**teacher_data) db.add(db_talent) db.commit() db.refresh(db_talent) # 返回结果 return {"success": True, "message": "教师数据已成功抓取并保存", "data": teacher_data} except Exception as e: print(f"抓取教师数据错误: {str(e)}") return 
JSONResponse( status_code=500, content={"error": f"抓取失败: {str(e)}"}, ) # 添加获取单个教师详情的接口 @app.get("/talents/{talent_id}", response_model=schemas.Talent) async def get_talent_detail(talent_id: str, db: Session = Depends(get_db)): talent = crud.get_talent(db, talent_id) if talent is None: raise HTTPException(status_code=404, detail="教师信息不存在") return talent # 删除单个人才 @app.delete("/talents/{talent_id}") async def delete_talent( talent_id: str, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): # 查找要删除的人才 talent = crud.get_talent(db, talent_id) if talent is None: raise HTTPException(status_code=404, detail="教师信息不存在") # 删除人才记录 success = crud.delete_talent(db, talent_id) if not success: raise HTTPException(status_code=500, detail="删除失败") return {"success": True, "message": "删除成功"} # 添加获取单个工程研究中心详情的接口 @app.get("/labs/{lab_id}", response_model=schemas.Lab) async def get_lab_detail(lab_id: str, db: Session = Depends(get_db)): lab = crud.get_lab(db, lab_id) if lab is None: raise HTTPException(status_code=404, detail="工程研究中心信息不存在") return lab # 获取所有维度 @app.get("/dimensions", response_model=List[schemas.Dimension]) async def get_dimensions(db: Session = Depends(get_db)): dimensions = crud.get_all_dimensions(db) return dimensions # 获取特定类别的维度 @app.get("/dimensions/{category}", response_model=List[schemas.Dimension]) async def get_dimensions_by_category(category: str, db: Session = Depends(get_db)): dimensions = db.query(models.Dimension).filter(models.Dimension.category == category).all() # 处理返回数据,添加subDimensions字段 result = [] for dim in dimensions: dim_dict = { "id": dim.id, "name": dim.name, "weight": dim.weight, "category": dim.category, "description": dim.description, "sub_dimensions": dim.sub_dimensions, "subDimensions": dim.sub_dimensions # 添加subDimensions与sub_dimensions内容相同 } result.append(dim_dict) return result # 创建新维度 @app.post("/dimensions", response_model=schemas.Dimension) async def create_dimension( dimension: 
schemas.DimensionCreate, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): return crud.create_dimension( db=db, name=dimension.name, weight=dimension.weight, category=dimension.category, description=dimension.description ) @app.put("/dimensions/{dimension_id}", response_model=schemas.Dimension) async def update_dimension( dimension_id: int, dimension: schemas.DimensionCreate, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): db_dimension = crud.get_dimension(db, dimension_id) if not db_dimension: raise HTTPException(status_code=404, detail="Dimension not found") dimension_data = dimension.dict(exclude_unset=True) return crud.update_dimension(db, dimension_id, dimension_data) @app.delete("/dimensions/{dimension_id}", response_model=dict) async def delete_dimension( dimension_id: int, current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): success = crud.delete_dimension(db, dimension_id) if not success: raise HTTPException(status_code=404, detail="Dimension not found") return {"success": True} # 添加新的API端点用于保存维度数据 @app.post("/api/save-dimensions") async def save_dimensions( request: dict = Body(...), current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): try: dimensions = request.get("dimensions", []) category = request.get("category", "") if not dimensions: return JSONResponse( status_code=400, content={"success": False, "message": "未提供维度数据"} ) if not category: return JSONResponse( status_code=400, content={"success": False, "message": "未提供分类信息"} ) # 根据category删除现有维度并重新创建 # 首先删除该类别的所有现有维度 existing_dimensions = db.query(models.Dimension).filter(models.Dimension.category == category).all() for dim in existing_dimensions: db.delete(dim) # 添加新的维度 for dim_data in dimensions: new_dimension = models.Dimension( name=dim_data.get("name", ""), weight=dim_data.get("weight", 1.0), category=category, 
description=dim_data.get("description", "") ) db.add(new_dimension) db.commit() return {"success": True, "message": "维度数据保存成功"} except Exception as e: db.rollback() return JSONResponse( status_code=500, content={"success": False, "message": f"保存维度数据失败: {str(e)}"} ) # 添加处理人才文档的新API @app.post("/api/upload-talent-document") async def upload_talent_document( file: UploadFile = File(...), current_user: models.User = Depends(get_current_active_user) ): # 检查文件是否为支持的文档格式 if not file.filename.endswith(('.docx', '.pdf', '.doc')): raise HTTPException(status_code=400, detail="只支持.docx、.pdf、.doc格式的文档") try: # 创建临时文件保存上传的文档 temp_file_path = f"temp_{uuid.uuid4()}.{file.filename.split('.')[-1]}" with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # 根据文件类型解析文档 if file.filename.endswith('.docx'): talent_data = extract_talent_info_from_docx(temp_file_path) else: # 对于PDF和DOC文件,暂时返回基础模板数据 talent_data = get_default_talent_data() talent_data["name"] = file.filename.split('.')[0] # 使用文件名作为姓名 # 删除临时文件 if os.path.exists(temp_file_path): os.remove(temp_file_path) return JSONResponse( status_code=200, content={"success": True, "data": talent_data} ) except Exception as e: # 确保删除任何临时文件 if 'temp_file_path' in locals() and os.path.exists(temp_file_path): os.remove(temp_file_path) print(f"处理人才文档时发生错误: {str(e)}") raise HTTPException(status_code=500, detail=f"处理文档时发生错误: {str(e)}") finally: # 关闭文件 await file.close() # 添加处理Word文档的新API @app.post("/api/upload-lab-document") async def upload_lab_document( file: UploadFile = File(...), current_user: models.User = Depends(get_current_active_user) ): # 检查文件是否为Word文档 if not file.filename.endswith(('.docx')): raise HTTPException(status_code=400, detail="只支持.docx格式的Word文档") try: # 创建临时文件保存上传的文档 temp_file_path = f"temp_{uuid.uuid4()}.docx" with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # 解析Word文档 lab_data = extract_lab_info_from_docx(temp_file_path) # 删除临时文件 if os.path.exists(temp_file_path): 
            os.remove(temp_file_path)

        return JSONResponse(
            status_code=200,
            content={"success": True, "data": lab_data}
        )
    except Exception as e:
        # Make sure any temporary file is removed before re-raising
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        print(f"处理文档时发生错误: {str(e)}")
        raise HTTPException(status_code=500, detail=f"处理文档时发生错误: {str(e)}")
    finally:
        # Close the uploaded file
        await file.close()

def extract_talent_info_from_docx(file_path):
    """Extract talent (researcher) information from a Word document.

    Scans the document's paragraph text with label-based regexes
    ("姓名:", "编号:", ...) to fill the default talent template, then derives
    the six-value evaluation radar data from the extracted counts.
    """
    doc = docx.Document(file_path)
    full_text = []

    # Collect all non-empty paragraph texts
    for para in doc.paragraphs:
        if para.text.strip():
            full_text.append(para.text.strip())

    # Join into a single string for regex scanning
    text_content = "\n".join(full_text)

    # Start from the default template
    talent_data = get_default_talent_data()

    # Name heuristic: a short first line without a colon / resume keyword
    if full_text:
        # Try to take the name from the first line
        first_line = full_text[0]
        if len(first_line) <= 10 and not any(char in first_line for char in [':', ':', '简历', '履历']):
            talent_data["name"] = first_line

    # An explicit "姓名:" (name) label, if present, overrides the heuristic
    name_pattern = re.compile(r'姓名[::]\s*([^\n\r]+)')
    name_match = name_pattern.search(text_content)
    if name_match:
        talent_data["name"] = name_match.group(1).strip()

    # Extract the id code
    id_pattern = re.compile(r'编号[::]\s*([A-Za-z0-9]+)')
    id_match = id_pattern.search(text_content)
    if id_match:
        talent_data["idcode"] = id_match.group(1)

    # Extract gender
    gender_pattern = re.compile(r'性别[::]\s*([男女])')
    gender_match = gender_pattern.search(text_content)
    if gender_match:
        talent_data["gender"] = gender_match.group(1)

    # Extract birth date (year-month-day or year-month forms)
    birth_pattern = re.compile(r'出生[日期年月]*[::]\s*(\d{4}[年.-]\d{1,2}[月.-]\d{1,2}|\d{4}[年.-]\d{1,2})')
    birth_match = birth_pattern.search(text_content)
    if birth_match:
        talent_data["birthDate"] = birth_match.group(1)

    # Extract the professional title
    title_pattern = re.compile(r'职称[::]\s*([^\n\r]+)')
    title_match = title_pattern.search(text_content)
    if title_match:
        talent_data["title"] = title_match.group(1).strip()

    # Extract the education level
    education_pattern = re.compile(r'学历[::]\s*([^\n\r]+)')
    education_match = education_pattern.search(text_content)
    if education_match:
        talent_data["education"] = education_match.group(1).strip()

    # Extract the email address
    email_pattern = re.compile(r'邮箱[::]?\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')
    email_match = email_pattern.search(text_content)
    if email_match:
        talent_data["email"] = email_match.group(1)

    # Extract the phone number (11-digit mobile or area-code landline)
    phone_pattern = re.compile(r'电话[::]?\s*(\d{11}|\d{3,4}-\d{7,8})')
    phone_match = phone_pattern.search(text_content)
    if phone_match:
        talent_data["phone"] = phone_match.group(1)

    # Extract the research direction
    research_pattern = re.compile(r'研究方向[::]\s*([^\n\r]+)')
    research_match = research_pattern.search(text_content)
    if research_match:
        talent_data["researchDirection"] = research_match.group(1).strip()

    # Work/education history: take the first paragraph containing one of the
    # keywords plus up to the 4 following lines
    work_keywords = ['工作经历', '教育经历', '学习经历', '履历']
    for i, line in enumerate(full_text):
        if any(keyword in line for keyword in work_keywords):
            work_history = []
            for j in range(i, min(i + 5, len(full_text))):
                if full_text[j].strip():
                    work_history.append(full_text[j].strip())
            talent_data["eduWorkHistory"] = "\n".join(work_history)
            break

    # Extract the paper count
    papers_pattern = re.compile(r'论文[数量]*[::]\s*(\d+)')
    papers_match = papers_pattern.search(text_content)
    if papers_match:
        talent_data["papers"] = f"{papers_match.group(1)}篇"

    # Extract the project count
    projects_pattern = re.compile(r'项目[数量]*[::]\s*(\d+)')
    projects_match = projects_pattern.search(text_content)
    if projects_match:
        talent_data["projects"] = f"{projects_match.group(1)}项"

    # Derive the evaluation radar data from the extracted figures; any error
    # (e.g. no digits found, so re.search returns None) falls through to the
    # except branch and the template's default scores are kept.
    try:
        papers_num = int(re.search(r'\d+', talent_data["papers"] or "0").group(0) or 0)
        projects_num = int(re.search(r'\d+', talent_data["projects"] or "0").group(0) or 0)

        # Simple scoring heuristics, each capped at 100
        work_score = min(100, 60 + len(talent_data["eduWorkHistory"]) // 10)
        research_score = min(100, 60 + papers_num * 2)
        project_score = min(100, 60 + projects_num * 3)
        paper_score = min(100, 60 + papers_num * 2.5)
        patent_score = min(100, 60 + random.randint(0, 20))  # patent score is randomised
        impact_score = min(100, (research_score + project_score + paper_score) // 3)

        talent_data["evaluationData"] = [
            work_score,      # work history
            research_score,  # research direction
            project_score,   # research projects
            paper_score,     # academic papers
            patent_score,    # patents / monographs
            impact_score     # academic impact
        ]
    except Exception as e:
        print(f"计算人才评分时出错: {str(e)}")
        # Keep the default scores on failure

    return talent_data

def get_default_talent_data():
    """Return the default talent data template (all fields empty, neutral scores)."""
    # NOTE(review): "educationBackground" has no counterpart in the Talent
    # pydantic model above — presumably consumed by the frontend; confirm.
    return {
        "idcode": "",
        "name": "",
        "gender": "",
        "birthDate": "",
        "title": "",
        "position": "",
        "education": "",
        "educationBackground": "",
        "address": "",
        "academicDirection": "",
        "talentPlan": "",
        "officeLocation": "",
        "email": "",
        "phone": "",
        "tutorType": "",
        "papers": "",
        "projects": "",
        "photo": "",
        "eduWorkHistory": "",
        "researchDirection": "",
        "recentProjects": "",
        "representativePapers": "",
        "patents": "",
        "evaluationData": [60, 60, 60, 60, 60, 60]  # default evaluation scores
    }

def extract_lab_info_from_docx(file_path):
    """Extract engineering research centre information from a Word document."""
    doc = docx.Document(file_path)
    full_text = []

    # Collect all non-empty paragraph texts
    for para in doc.paragraphs:
        if para.text.strip():
            full_text.append(para.text.strip())

    # Join into a single string for regex scanning
    text_content = "\n".join(full_text)

    # Initialise the centre data template
    lab_data = {
        "name": "",
        "idcode": "",
        "personnel": "",
        "nationalProjects": "",
        "otherProjects": "",
        "achievements": "",
        "labAchievements": "",
        "evaluationData": [60, 60, 60, 60, 60, 60]  # default evaluation scores
    }

    # Centre name: assume the first line is the title
    if full_text:
        lab_data["name"] = full_text[0]

    # Extract the centre id code (line carrying "编号")
    id_pattern = re.compile(r'编号[::]\s*([A-Za-z0-9]+)')
    id_matches = id_pattern.search(text_content)
    if id_matches:
        lab_data["idcode"] = id_matches.group(1)

    # Extract the headcount
    personnel_pattern = re.compile(r'人员[数量]*[::]\s*(\d+)')
    personnel_matches = personnel_pattern.search(text_content)
    if personnel_matches:
        lab_data["personnel"] = f"{personnel_matches.group(1)}人"

    # Extract the national-level project count
    national_projects_pattern = re.compile(r'国家级项目[::]\s*(\d+)')
    np_matches = national_projects_pattern.search(text_content)
    if np_matches:
        lab_data["nationalProjects"] = f"{np_matches.group(1)}项"

    # Extract the other-project count (group 2 holds the number;
    # group 1 only matches the "其他/其它" spelling variants)
    other_projects_pattern = re.compile(r'(其他|其它)项目[::]\s*(\d+)')
    op_matches = other_projects_pattern.search(text_content)
    if op_matches:
        lab_data["otherProjects"] = f"{op_matches.group(2)}项"

    # Extract the achievement count
achievements_pattern = re.compile(r'成果[数量]*[::]\s*(\d+)') ach_matches = achievements_pattern.search(text_content) if ach_matches: lab_data["achievements"] = f"{ach_matches.group(1)}项" # 提取工程研究中心成就信息(取文本的中间部分作为工程研究中心成就) if len(full_text) > 2: # 跳过第一行(标题)和最后一行,取中间的文本作为成就描述 lab_data["labAchievements"] = "\n".join(full_text[1:-1]) # 根据提取到的信息,给出一个评估评分 # 这里可以编写更复杂的评分算法,示例中使用简单方法 try: # 解析数字 personnel_num = int(re.search(r'\d+', lab_data["personnel"] or "0").group(0) or 0) national_num = int(re.search(r'\d+', lab_data["nationalProjects"] or "0").group(0) or 0) other_num = int(re.search(r'\d+', lab_data["otherProjects"] or "0").group(0) or 0) ach_num = int(re.search(r'\d+', lab_data["achievements"] or "0").group(0) or 0) # 简单计算评分 (仅示例) innovation_score = min(100, 50 + national_num * 5) research_score = min(100, 50 + (national_num + other_num) * 2) transform_score = min(100, 50 + ach_num * 2) discipline_score = min(100, 50 + personnel_num * 2) contribution_score = min(100, 50 + (national_num + ach_num) * 1.5) potential_score = min(100, (innovation_score + research_score + transform_score) / 3) lab_data["evaluationData"] = [ innovation_score, # 创新水平 research_score, # 研究能力 transform_score, # 成果转化 discipline_score, # 学科建设 contribution_score, # 行业贡献 potential_score # 发展潜力 ] except Exception as e: print(f"计算评分时出错: {str(e)}") # 出错时保留默认评分 return lab_data # 添加新的API端点用于清空所有人才和工程研究中心信息 @app.post("/api/clear-all-data") async def clear_all_data( current_user: models.User = Depends(get_current_active_user), db: Session = Depends(get_db) ): try: # 删除所有人才数据 db.query(models.Talent).delete() # 删除所有工程研究中心数据 db.query(models.Lab).delete() # 提交事务 db.commit() return {"success": True, "message": "所有数据已清空"} except Exception as e: db.rollback() return JSONResponse( status_code=500, content={"success": False, "message": f"清空数据失败: {str(e)}"} ) # 添加端点用于保存二级维度结构 @app.post("/dimensions/save") async def save_dimensions_structure( request: schemas.SaveDimensionsRequest, current_user: models.User = 
Depends(get_current_active_user), db: Session = Depends(get_db) ): try: category = request.category new_dimensions = request.dimensions # 先删除该类别的所有现有维度 existing_dimensions = db.query(models.Dimension).filter(models.Dimension.category == category).all() for dim in existing_dimensions: db.delete(dim) # 添加新的维度结构 for dimension in new_dimensions: sub_dimensions_data = None # 检查是否有子维度数据,优先使用subDimensions字段(前端使用的字段名) if hasattr(dimension, 'subDimensions') and dimension.subDimensions: sub_dimensions_data = [ {"name": sub.name, "weight": sub.weight, "description": getattr(sub, 'description', None)} for sub in dimension.subDimensions ] # 兼容处理sub_dimensions字段 elif dimension.sub_dimensions: sub_dimensions_data = [ {"name": sub.name, "weight": sub.weight, "description": getattr(sub, 'description', None)} for sub in dimension.sub_dimensions ] db_dimension = models.Dimension( name=dimension.name, category=category, weight=0.0, # 一级维度不需要权重 description=dimension.description, sub_dimensions=sub_dimensions_data ) db.add(db_dimension) db.commit() return {"success": True, "message": f"已成功保存{len(new_dimensions)}个维度及其子维度"} except Exception as e: db.rollback() return JSONResponse( status_code=500, content={"success": False, "message": f"保存维度失败: {str(e)}"} ) # 启动时初始化数据库 @app.on_event("startup") async def startup_db_client(): # 创建表(如果不存在) models.Base.metadata.create_all(bind=engine) # 运行数据库表结构修改脚本 try: from alter_table import check_and_alter_table check_and_alter_table() print("数据库表结构检查完成") except Exception as e: print(f"运行表结构修改脚本时出错: {e}") # 初始化维度数据 from init_dimensions import init_dimensions init_dimensions() # 初始化其他数据 from database import SessionLocal db = SessionLocal() try: # 检查是否已经存在用户数据 existing_user = db.query(models.User).filter(models.User.username == "admin").first() if not existing_user: print("初始化默认用户...") hashed_password = get_password_hash("123456") default_user = models.User( username="admin", email="admin@example.com", full_name="系统管理员", hashed_password=hashed_password ) 
db.add(default_user) db.commit() print("默认用户已创建: admin/123456") finally: db.close() if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)