Skip to content

Commit

Permalink
Merge pull request #819 from Aiven-Open/jjaakola-aiven-optimize-get-s…
Browse files Browse the repository at this point in the history
…chema-by-id-auth-check

refactor: optimize the authorization check for get schema by id
  • Loading branch information
aiven-anton authored Feb 20, 2024
2 parents 2e520f6 + 8eb8fe2 commit 119fd58
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 45 deletions.
90 changes: 61 additions & 29 deletions karapace/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from base64 import b64encode
from dataclasses import dataclass, field
from enum import Enum, unique
Expand All @@ -10,7 +12,6 @@
from karapace.rapu import JSON_CONTENT_TYPE
from karapace.statsd import StatsClient
from karapace.utils import json_decode, json_encode
from typing import List, Optional
from typing_extensions import TypedDict
from watchfiles import awatch, Change

Expand Down Expand Up @@ -90,15 +91,68 @@ class ACLEntryData(TypedDict):


class AuthData(TypedDict):
users: List[UserData]
permissions: List[ACLEntryData]
users: list[UserData]
permissions: list[ACLEntryData]


class ACLAuthorizer:
def __init__(self, *, user_db: dict[str, User] | None = None, permissions: list[ACLEntry] | None = None) -> None:
self.user_db = user_db or {}
self.permissions = permissions or []

def get_user(self, username: str) -> User | None:
return self.user_db.get(username)

def _check_resources(self, resources: list[str], aclentry: ACLEntry) -> bool:
for resource in resources:
if aclentry.resource.match(resource) is not None:
return True
return False

def _check_operation(self, operation: Operation, aclentry: ACLEntry) -> bool:
"""Does ACL entry allow given operation.
An entry at minimum gives Read permission. Write permission implies Read."""
return operation == Operation.Read or aclentry.operation == Operation.Write

def check_authorization(self, user: User | None, operation: Operation, resource: str) -> bool:
if user is None:
return False

for aclentry in self.permissions:
if (
aclentry.username == user.username
and self._check_operation(operation, aclentry)
and self._check_resources([resource], aclentry)
):
return True
return False

def check_authorization_any(self, user: User | None, operation: Operation, resources: list[str]) -> bool:
"""Checks that user is authorized to one of the resources in the list.
If any resource in the list matches the permission the function returns True. This indicates only that
one resource matches the permission and other resources may not.
"""
if user is None:
return False

class HTTPAuthorizer:
for aclentry in self.permissions:
if (
aclentry.username == user.username
and self._check_operation(operation, aclentry)
and self._check_resources(resources, aclentry)
):
return True
return False


class HTTPAuthorizer(ACLAuthorizer):
def __init__(self, filename: str) -> None:
super().__init__()
self._auth_filename: str = filename
self._auth_mtime: float = -1
self._refresh_auth_task: Optional[asyncio.Task] = None
self._refresh_auth_task: asyncio.Task | None = None
self._refresh_auth_awatch_stop_event = asyncio.Event()
# Once first, can raise if file not valid
self._load_authfile()
Expand Down Expand Up @@ -158,7 +212,7 @@ def _load_authfile(self) -> None:
ACLEntry(entry["username"], Operation(entry["operation"]), re.compile(entry["resource"]))
for entry in authdata["permissions"]
]
self.userdb = users
self.user_db = users
log.info(
"Loaded schema registry users: %s",
users,
Expand All @@ -172,28 +226,6 @@ def _load_authfile(self) -> None:
except Exception as ex:
raise InvalidConfiguration("Failed to load auth file") from ex

def check_authorization(self, user: Optional[User], operation: Operation, resource: str) -> bool:
if user is None:
return False

def check_operation(operation: Operation, aclentry: ACLEntry) -> bool:
"""Does ACL entry allow given operation.
An entry at minimum gives Read permission. Write permission implies Read."""
return operation == Operation.Read or aclentry.operation == Operation.Write

def check_resource(resource: str, aclentry: ACLEntry) -> bool:
return aclentry.resource.match(resource) is not None

for aclentry in self.permissions:
if (
aclentry.username == user.username
and check_operation(operation, aclentry)
and check_resource(resource, aclentry)
):
return True
return False

def authenticate(self, request: aiohttp.web.Request) -> User:
auth_header = request.headers.get("Authorization")
if auth_header is None:
Expand All @@ -211,7 +243,7 @@ def authenticate(self, request: aiohttp.web.Request) -> User:
text='{"message": "Unauthorized"}',
content_type=JSON_CONTENT_TYPE,
)
user = self.userdb.get(auth.login)
user = self.get_user(auth.login)
if user is None or not user.compare_password(auth.password):
raise aiohttp.web.HTTPUnauthorized(
headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'},
Expand Down
33 changes: 17 additions & 16 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,29 +493,30 @@ async def schemas_get(
)

include_subjects = request.query.get("includeSubjects", "false").lower() == "true"
fetch_max_id = request.query.get("fetchMaxId", "false").lower() == "true"
schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id)

def _has_subject_with_id() -> bool:
schema_versions = self.schema_registry.database.find_schemas(include_deleted=True, latest_only=False)
for subject, schema_versions in schema_versions.items():
if not schema_versions:
continue
for schema_version in schema_versions:
if (
schema_version.schema_id == parsed_schema_id
and not schema_version.deleted
and self._auth is not None
and self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}")
):
return True
return False
# Fast path
if self._auth is None or self._auth.check_authorization(user, Operation.Read, "Subject:*"):
return True

subjects = self.schema_registry.database.subjects_for_schema(schema_id=parsed_schema_id)
resources = [f"Subject:{subject}" for subject in subjects]
return self._auth.check_authorization_any(user=user, operation=Operation.Read, resources=resources)

if self._auth:
has_subject = _has_subject_with_id()
if not has_subject:
schema = None
self.r(
body={
"error_code": SchemaErrorCodes.SCHEMA_NOT_FOUND.value,
"message": "Schema not found",
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
)

fetch_max_id = request.query.get("fetchMaxId", "false").lower() == "true"
schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id)
if not schema:
self.r(
body={
Expand Down
Loading

0 comments on commit 119fd58

Please sign in to comment.