import json
from datetime import timedelta
from typing import Any

from fastapi import APIRouter, Body, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

import crud
import models
import schemas
from api import deps
from core import security
from core.config import settings
from core.security import get_password_hash
from utils import verify_password_reset_token

router = APIRouter()


@router.post("/login")
def login(
    data: schemas.UserLogin,
    # Alternative signature kept for reference:
    #   data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(deps.get_db),
) -> Any:
    """Log a user in and issue an access token for subsequent requests.

    The submitted ``username`` and ``password`` are handed to
    ``crud.user.authenticate``. On success the response is an envelope with
    ``code``/``msg`` fields and a ``data`` object holding the user's name,
    e-mail and a new access token.

    The token lifetime is ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Raises:
        HTTPException: 400 when authentication yields no user, and 400 when
            ``crud.user.is_active`` reports the user as inactive.
    """
    account = crud.user.authenticate(
        db,
        name=data.username,
        password=data.password,
    )

    # Guard clauses: reject failed authentication first, then inactive users.
    if not account:
        raise HTTPException(status_code=400, detail="Incorrect name or password")
    if not crud.user.is_active(account):
        raise HTTPException(status_code=400, detail="Inactive user")

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Build the payload step by step; the token carries the user's identity
    # and status flags as extra keyword arguments.
    payload = {
        'name': account.name,
        'email': account.email,
    }
    payload['token'] = security.create_access_token(
        expires_delta=lifetime,
        id=account.id,
        email=account.email,
        is_active=account.is_active,
        is_superuser=account.is_superuser,
        name=account.name,
    )

    return {
        'data': payload,
        'code': 0,
        'msg': 'success',
    }


@router.get("/me", response_model=schemas.UserDBBase)
def me(current_user: models.User = Depends(deps.get_current_active_user)) -> Any:
    """Return the user resolved by ``deps.get_current_active_user``.

    Handy for checking that an access token is accepted: the dependency does
    all the work and this handler simply echoes its result, serialized through
    ``schemas.UserDBBase``.
    """
    return current_user


@router.post("/reset-password", response_model=schemas.Msg)
def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(deps.get_db),
) -> Any:
    """Reset a user's password using a password-reset token.

    The token is checked with ``verify_password_reset_token``; its result is
    used as the key for ``crud.user.get``. The new password is hashed with
    ``get_password_hash`` and stored on the user before committing.

    Raises:
        HTTPException: 400 when the token does not verify, 404 when no user
            is found for it, and 400 when the user is inactive.
    """
    subject = verify_password_reset_token(token)
    if not subject:
        raise HTTPException(status_code=400, detail="Invalid token")

    account = crud.user.get(db, subject)
    if not account:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )
    if not crud.user.is_active(account):
        raise HTTPException(status_code=400, detail="Inactive user")

    # Only the hash is ever stored; the plain password is not persisted.
    account.hashed_password = get_password_hash(new_password)
    db.add(account)
    db.commit()

    return {"msg": "Password updated successfully"}