xbackend/middleware/casbin.py
2021-08-05 21:17:27 +08:00

72 lines
2.0 KiB
Python

from utils.casbin.enforcer import Enforcer
from fastapi import HTTPException
from starlette.authentication import BaseUser
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import HTTP_403_FORBIDDEN
from starlette.types import ASGIApp, Receive, Scope, Send
import schemas
class CasbinMiddleware:
"""
Middleware for Casbin
"""
def __init__(
self,
app: ASGIApp,
enforcer: Enforcer,
) -> None:
"""
Configure Casbin Middleware
:param app:Retain for ASGI.
:param enforcer:Casbin Enforcer, must be initialized before FastAPI start.
"""
self.app = app
self.enforcer = enforcer
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] not in ("http", "websocket"):
await self.app(scope, receive, send)
return
if await self._enforce(scope, receive):
await self.app(scope, receive, send)
return
else:
response = JSONResponse(
status_code=HTTP_403_FORBIDDEN,
content="没有操作权限"
)
await response(scope, receive, send)
return
async def _enforce(self, scope: Scope, receive: Receive) -> bool:
"""
Enforce a request
:param user: user will be sent to enforcer
:param request: ASGI Request
:return: Enforce Result
"""
request = Request(scope, receive)
path = request.url.path
method = request.method
if 'user' not in scope:
raise RuntimeError("Casbin Middleware must work with an Authentication Middleware")
assert isinstance(request.user, BaseUser)
user = request.user.display_name if request.user.is_authenticated else 'anonymous'
dom = request.query_params.get('game', '0')
print(user, dom, path, method)
return self.enforcer.enforce(user, dom, path, method)