Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: split policy router into multiple files #21

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 0 additions & 239 deletions bento_authorization_service/routers/policy.py

This file was deleted.

4 changes: 4 additions & 0 deletions bento_authorization_service/routers/policy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .router import policy_router
from . import evaluation, list_permissions

__all__ = ["policy_router"]
63 changes: 63 additions & 0 deletions bento_authorization_service/routers/policy/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
import jwt

from bento_lib.auth.permissions import P_VIEW_PERMISSIONS
from fastapi import HTTPException, Request, status
from pydantic import BaseModel
from typing import Awaitable, Callable, TypeVar

from bento_authorization_service.db import Database
from bento_authorization_service.dependencies import OptionalBearerToken
from bento_authorization_service.idp_manager import IdPManager
from bento_authorization_service.logger import logger
from bento_authorization_service.models import ResourceModel
from bento_authorization_service.policy_engine.evaluation import TokenData
from ..utils import MarkAuthzDone, require_permission_and_flag

__all__ = [
"ResponseType",
"check_non_bearer_token_data_use",
"use_token_data_or_return_error_state",
]

ResponseType = TypeVar("ResponseType", dict, BaseModel)


async def check_non_bearer_token_data_use(
token_data: TokenData | None,
resources: tuple[ResourceModel, ...],
request: Request,
authorization: OptionalBearerToken,
db: Database,
idp_manager: IdPManager,
) -> None:
if token_data is None:
# Using our own token, so this becomes a public endpoint.
MarkAuthzDone.mark_authz_done(request)
return

async def req_inner(r: ResourceModel):
await require_permission_and_flag(r, P_VIEW_PERMISSIONS, request, authorization, db, idp_manager)

await asyncio.gather(*map(req_inner, resources))


async def use_token_data_or_return_error_state(
authorization: OptionalBearerToken,
idp_manager: IdPManager,
err_state: dict,
create_response: Callable[[dict | None], Awaitable[ResponseType]],
) -> ResponseType:
try:
token_data = (await idp_manager.decode(authorization.credentials)) if authorization is not None else None
except jwt.InvalidAudienceError as e:
logger.warning(f"Got token with bad audience (exception: {repr(e)})")
return err_state
except jwt.ExpiredSignatureError:
logger.warning(f"Got expired token")
return err_state
except jwt.DecodeError:
# Actually throw an HTTP error for this one
raise HTTPException(detail="Bearer token must be a valid JWT", status_code=status.HTTP_400_BAD_REQUEST)

return await create_response(token_data)
Loading