import binascii import time import uvicorn from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from starlette.middleware.cors import CORSMiddleware from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials, BaseUser, SimpleUser from starlette.middleware.authentication import AuthenticationMiddleware from starlette.requests import HTTPConnection from starlette.responses import Response, JSONResponse import crud import schemas from db.redisdb import get_redis_pool from middleware import CasbinMiddleware from db import connect_to_mongo, close_mongo_connection, get_database from db.ckdb_utils import connect_to_ck, close_ck_connection from db.redisdb_utils import connect_to_redis, close_redis_connection from utils import * from api.api_v1.api import api_router from core.config import settings from api.deps import get_current_user2 app = FastAPI(title=settings.PROJECT_NAME) app.include_router(api_router, prefix=settings.API_V1_STR) app.add_event_handler("startup", connect_to_mongo) app.add_event_handler("startup", connect_to_redis) app.add_event_handler("startup", connect_to_ck) app.add_event_handler("shutdown", close_mongo_connection) app.add_event_handler("shutdown", close_redis_connection) app.add_event_handler("shutdown", close_ck_connection) class CurrentUser(BaseUser): def __init__(self, username: str, user_id: str) -> None: self.username = username self.id = user_id @property def is_authenticated(self) -> bool: return True @property def display_name(self) -> str: return self.username @property def identity(self) -> str: return '' class BasicAuth(AuthenticationBackend): async def authenticate(self, request): if "Authorization" not in request.headers or request.scope.get('path') == '/api/v1/user/login': return None auth = request.headers["Authorization"] if len(auth) < 20: return None try: user = get_current_user2(auth.split(' ')[1]) except (ValueError, UnicodeDecodeError, binascii.Error): raise AuthenticationError("身份验证失败,请重新登录") return AuthCredentials(["authenticated"]), CurrentUser(user.name, user.id) def login_expired(conn: HTTPConnection, exc: Exception) -> Response: return JSONResponse(schemas.Msg(code=-5, msg='请重新登录').dict(), status_code=200) #处理路由权限问题 @app.middleware("http") async def panduan_quanxian_url(request: Request, call_next): #user_id=request.user.id #user=request.user.username start_time = int(time.time() * 1000) response = await call_next(request) process_time = int(time.time() * 1000) - start_time response.headers["X-Process-Time"] = str(process_time) url=request.url.path if 'docs' in url or 'openapi.json' in url: return response if url in ['/api/v1/user/login','/api/v1/user/send_auth_code']: return response game=request.url.query.split('=')[1] if 'undefined' in game: return response if '&' in game: game=game.split('&')[0] judge_url = await crud.user_url.get_quanxian(get_database(), schemas.Url_quanxian(user_id=request.user.id)) if judge_url == {}: # data='没有匹配这个游戏' return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json()) if game not in judge_url['game']: #data='没有匹配这个游戏' return Response(schemas.Msg(code=0, msg='没有操作权限',data='' ).json()) quanxian_dict={} for i in range(len(judge_url['game'])): quanxian_dict[judge_url['game'][i]]=judge_url['quanxian'][i] user_list=await crud.url_list.get_url(get_database(),schemas.Url_list(name=quanxian_dict[game])) api_list=[] state_list=[] api_dict={} for i in user_list: for api in i['api_list']: api_list.append(api) for quanxian in i['state']: state_list.append(quanxian) for i in range(len(api_list)): api_dict[api_list[i]]=state_list[i] if url not in api_list: # data='没有对应路由' return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json()) elif api_dict[url] != True: # data='路由为False' return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json()) return response #app.add_middleware(CasbinMiddleware, enforcer=casbin_enforcer) app.add_middleware(AuthenticationMiddleware, backend=BasicAuth(), on_error=login_expired) app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.exception_handler(RequestValidationError) async def validation_exception_handler(request, exc): return Response(schemas.Msg(code=-4, msg='请求错误', data=str(exc)).json(), status_code=400) @app.exception_handler(Exception) async def http_exception_handler(request, exc): return Response(schemas.Msg(code=-3, msg='服务器错误').json(), status_code=500) @app.middleware("http") async def add_process_time_header(request: Request, call_next): start_time = int(time.time() * 1000) response = await call_next(request) process_time = int(time.time() * 1000) - start_time response.headers["X-Process-Time"] = str(process_time) user_id = 'anonymous' try: user_id = request.user.id except: pass await crud.api_log.insert_log(get_database(), schemas.ApiLogInsert( api=str(request.url), ms=process_time, user_id=user_id )) return response if __name__ == '__main__': #uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True) uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True)