xbackend/api/api_v1/endpoints/login.py
2021-04-30 18:52:30 +08:00

94 lines
2.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import crud, models, schemas
from api import deps
from core import security
from core.config import settings
from core.security import get_password_hash
from utils import (
verify_password_reset_token,
)
# Router on which this module's endpoints are registered via the @router.* decorators.
router = APIRouter()
@router.post("/login")
def login(
    data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(deps.get_db),
) -> Any:
    """
    OAuth2-compatible token login: exchange credentials for an access token.

    Args:
        data: OAuth2 password form; ``username`` and ``password`` are read.
        db: Database session supplied by ``deps.get_db``.

    Returns:
        A dict with ``code``/``msg`` status fields, a ``data`` envelope
        holding the user's name, email and the bearer token, and the same
        ``access_token``/``token_type`` pair repeated at the top level.

    Raises:
        HTTPException: 400 when authentication fails or the user is inactive.
    """
    user = crud.user.authenticate(
        db, name=data.username, password=data.password
    )
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect name or password")
    if not crud.user.is_active(user):
        raise HTTPException(status_code=400, detail="Inactive user")

    # Build the token exactly once so both places in the response carry the
    # very same value (previously it was generated twice with identical
    # arguments, doing the work twice).
    access_token = security.create_access_token(
        expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
        id=user.id,
        email=user.email,
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        name=user.name,
    )

    # NOTE(review): the token is exposed both inside 'data' and at the top
    # level; presumably the top-level pair serves standard OAuth2 clients
    # while 'data' serves the project's own frontend -- confirm with callers.
    return {
        'data': {
            'name': user.name,
            'email': user.email,
            'access_token': access_token,
            "token_type": "bearer",
        },
        'access_token': access_token,
        "token_type": "bearer",
        'code': 0,
        'msg': 'success',
    }
@router.get("/me", response_model=schemas.UserDBBase)
def me(
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Return the user resolved by the ``get_current_active_user`` dependency.

    Useful as a quick check that an access token is accepted.
    """
    return current_user
@router.post("/reset-password", response_model=schemas.Msg)
def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(deps.get_db),
) -> Any:
    """
    Reset a user's password using a password-reset token.

    The token is passed to ``verify_password_reset_token``; its result is
    used to look the user up, whose stored hash is then replaced with the
    hash of ``new_password`` and committed.

    Raises:
        HTTPException: 400 if the token does not verify, 404 if no user is
            found for the value it yields, 400 if that user is inactive.
    """
    account_id = verify_password_reset_token(token)
    if not account_id:
        raise HTTPException(status_code=400, detail="Invalid token")

    account = crud.user.get(db, account_id)
    if not account:
        raise HTTPException(
            status_code=404,
            detail="The user with this username does not exist in the system.",
        )
    if not crud.user.is_active(account):
        raise HTTPException(status_code=400, detail="Inactive user")

    # Store only the hash of the new password, then persist the change.
    account.hashed_password = get_password_hash(new_password)
    db.add(account)
    db.commit()
    return {"msg": "Password updated successfully"}