From 8eb8fe275c9f6e5bb3d828c00232456c1a32d119 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Fri, 16 Feb 2024 16:14:26 +0200 Subject: [PATCH] refactor: optimize the authorization check for get schema by id Load the subjects associated with the schema id from in-memory database. This list can be used as source for subjects to match user ACLs. The ACL authorizator is added which can operate only on the user dictionary and permission list. HTTP authorizer inherits this class and add the file loading and HTTP authorization handling on top. Unit tests for ACL authorizer are added. --- karapace/auth.py | 90 +++++++++---- karapace/schema_registry_apis.py | 33 ++--- tests/unit/test_auth.py | 224 +++++++++++++++++++++++++++++++ 3 files changed, 302 insertions(+), 45 deletions(-) create mode 100644 tests/unit/test_auth.py diff --git a/karapace/auth.py b/karapace/auth.py index 071c618ad..133b78362 100644 --- a/karapace/auth.py +++ b/karapace/auth.py @@ -2,6 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from base64 import b64encode from dataclasses import dataclass, field from enum import Enum, unique @@ -10,7 +12,6 @@ from karapace.rapu import JSON_CONTENT_TYPE from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode -from typing import List, Optional from typing_extensions import TypedDict from watchfiles import awatch, Change @@ -90,15 +91,68 @@ class ACLEntryData(TypedDict): class AuthData(TypedDict): - users: List[UserData] - permissions: List[ACLEntryData] + users: list[UserData] + permissions: list[ACLEntryData] + + +class ACLAuthorizer: + def __init__(self, *, user_db: dict[str, User] | None = None, permissions: list[ACLEntry] | None = None) -> None: + self.user_db = user_db or {} + self.permissions = permissions or [] + + def get_user(self, username: str) -> User | None: + return self.user_db.get(username) + + def _check_resources(self, resources: list[str], aclentry: ACLEntry) -> bool: + for resource in resources: + if aclentry.resource.match(resource) is not None: + return True + return False + + def _check_operation(self, operation: Operation, aclentry: ACLEntry) -> bool: + """Does ACL entry allow given operation. + + An entry at minimum gives Read permission. Write permission implies Read.""" + return operation == Operation.Read or aclentry.operation == Operation.Write + + def check_authorization(self, user: User | None, operation: Operation, resource: str) -> bool: + if user is None: + return False + for aclentry in self.permissions: + if ( + aclentry.username == user.username + and self._check_operation(operation, aclentry) + and self._check_resources([resource], aclentry) + ): + return True + return False + + def check_authorization_any(self, user: User | None, operation: Operation, resources: list[str]) -> bool: + """Checks that user is authorized to one of the resources in the list. + + If any resource in the list matches the permission the function returns True. This indicates only that + one resource matches the permission and other resources may not. + """ + if user is None: + return False -class HTTPAuthorizer: + for aclentry in self.permissions: + if ( + aclentry.username == user.username + and self._check_operation(operation, aclentry) + and self._check_resources(resources, aclentry) + ): + return True + return False + + +class HTTPAuthorizer(ACLAuthorizer): def __init__(self, filename: str) -> None: + super().__init__() self._auth_filename: str = filename self._auth_mtime: float = -1 - self._refresh_auth_task: Optional[asyncio.Task] = None + self._refresh_auth_task: asyncio.Task | None = None self._refresh_auth_awatch_stop_event = asyncio.Event() # Once first, can raise if file not valid self._load_authfile() @@ -158,7 +212,7 @@ def _load_authfile(self) -> None: ACLEntry(entry["username"], Operation(entry["operation"]), re.compile(entry["resource"])) for entry in authdata["permissions"] ] - self.userdb = users + self.user_db = users log.info( "Loaded schema registry users: %s", users, @@ -172,28 +226,6 @@ def _load_authfile(self) -> None: except Exception as ex: raise InvalidConfiguration("Failed to load auth file") from ex - def check_authorization(self, user: Optional[User], operation: Operation, resource: str) -> bool: - if user is None: - return False - - def check_operation(operation: Operation, aclentry: ACLEntry) -> bool: - """Does ACL entry allow given operation. - - An entry at minimum gives Read permission. Write permission implies Read.""" - return operation == Operation.Read or aclentry.operation == Operation.Write - - def check_resource(resource: str, aclentry: ACLEntry) -> bool: - return aclentry.resource.match(resource) is not None - - for aclentry in self.permissions: - if ( - aclentry.username == user.username - and check_operation(operation, aclentry) - and check_resource(resource, aclentry) - ): - return True - return False - def authenticate(self, request: aiohttp.web.Request) -> User: auth_header = request.headers.get("Authorization") if auth_header is None: @@ -211,7 +243,7 @@ def authenticate(self, request: aiohttp.web.Request) -> User: text='{"message": "Unauthorized"}', content_type=JSON_CONTENT_TYPE, ) - user = self.userdb.get(auth.login) + user = self.get_user(auth.login) if user is None or not user.compare_password(auth.password): raise aiohttp.web.HTTPUnauthorized( headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index da0e587a8..85560ec5a 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -493,29 +493,30 @@ async def schemas_get( ) include_subjects = request.query.get("includeSubjects", "false").lower() == "true" - fetch_max_id = request.query.get("fetchMaxId", "false").lower() == "true" - schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id) def _has_subject_with_id() -> bool: - schema_versions = self.schema_registry.database.find_schemas(include_deleted=True, latest_only=False) - for subject, schema_versions in schema_versions.items(): - if not schema_versions: - continue - for schema_version in schema_versions: - if ( - schema_version.schema_id == parsed_schema_id - and not schema_version.deleted - and self._auth is not None - and self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}") - ): - return True - return False + # Fast path + if self._auth is None or self._auth.check_authorization(user, Operation.Read, "Subject:*"): + return True + + subjects = self.schema_registry.database.subjects_for_schema(schema_id=parsed_schema_id) + resources = [f"Subject:{subject}" for subject in subjects] + return self._auth.check_authorization_any(user=user, operation=Operation.Read, resources=resources) if self._auth: has_subject = _has_subject_with_id() if not has_subject: - schema = None + self.r( + body={ + "error_code": SchemaErrorCodes.SCHEMA_NOT_FOUND.value, + "message": "Schema not found", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + fetch_max_id = request.query.get("fetchMaxId", "false").lower() == "true" + schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id) if not schema: self.r( body={ diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py new file mode 100644 index 000000000..24786023d --- /dev/null +++ b/tests/unit/test_auth.py @@ -0,0 +1,224 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from karapace.auth import ACLAuthorizer, ACLEntry, hash_password, HashAlgorithm, Operation, User + +import re + + +def test_empty_acl_authorizer() -> None: + authorizer = ACLAuthorizer() + admin_password_hash = hash_password(algorithm=HashAlgorithm.SHA256, salt="salt", plaintext_password="password") + assert False is authorizer.check_authorization( + user=User(username="admin", algorithm=HashAlgorithm.SHA256, salt="salt", password_hash=admin_password_hash), + operation=Operation.Read, + resource="Subject:*", + ) + + +def test_acl_authorizer() -> None: + admin_password_hash = hash_password( + algorithm=HashAlgorithm.SHA256, + salt="salt", + plaintext_password="admin_password", + ) + read_user_password_hash = hash_password( + algorithm=HashAlgorithm.SHA256, + salt="salt", + plaintext_password="read_password", + ) + write_user_password_hash = hash_password( + algorithm=HashAlgorithm.SHA256, + salt="salt", + plaintext_password="write_password", + ) + readwrite_user_password_hash = hash_password( + algorithm=HashAlgorithm.SHA256, + salt="salt", + plaintext_password="readwrite_password", + ) + + admin_user = User( + username="admin", + algorithm=HashAlgorithm.SHA256, + salt="salt", + password_hash=admin_password_hash, + ) + read_user = User( + username="read", + algorithm=HashAlgorithm.SHA256, + salt="salt", + password_hash=read_user_password_hash, + ) + write_user = User( + username="write", + algorithm=HashAlgorithm.SHA256, + salt="salt", + password_hash=write_user_password_hash, + ) + readwrite_user = User( + username="readwrite", + algorithm=HashAlgorithm.SHA256, + salt="salt", + password_hash=readwrite_user_password_hash, + ) + + authorizer = ACLAuthorizer( + user_db={ + "admin": admin_user, + "read": read_user, + "write": write_user, + "readwrite": readwrite_user, + }, + permissions=[ + ACLEntry("admin", Operation.Read, re.compile("Subject:*")), + ACLEntry("admin", Operation.Write, re.compile("Subject:*")), + ACLEntry("admin", Operation.Read, re.compile("Config:*")), + ACLEntry("admin", Operation.Write, re.compile("Config:*")), + ACLEntry("read", Operation.Read, re.compile("Subject:read_subject")), + ACLEntry("write", Operation.Write, re.compile("Subject:write_subject")), + ACLEntry("readwrite", Operation.Read, re.compile("Subject:readwrite_subject")), + ACLEntry("readwrite", Operation.Write, re.compile("Subject:readwrite_subject")), + ], + ) + + assert True is authorizer.check_authorization( + user=admin_user, + operation=Operation.Read, + resource="Subject:any_subject", + ) + assert True is authorizer.check_authorization( + user=admin_user, + operation=Operation.Read, + resource="Config:any_config", + ) + assert True is authorizer.check_authorization_any( + user=admin_user, + operation=Operation.Read, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + ], + ) + + assert True is authorizer.check_authorization( + user=read_user, + operation=Operation.Read, + resource="Subject:read_subject", + ) + assert False is authorizer.check_authorization( + user=read_user, + operation=Operation.Read, + resource="Subject:any_subject", + ) + assert False is authorizer.check_authorization( + user=read_user, + operation=Operation.Write, + resource="Subject:read_subject", + ) + assert False is authorizer.check_authorization( + user=read_user, + operation=Operation.Write, + resource="Subject:write_subject", + ) + assert False is authorizer.check_authorization_any( + user=read_user, + operation=Operation.Read, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + ], + ) + assert True is authorizer.check_authorization_any( + user=read_user, + operation=Operation.Read, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + "Subject:read_subject", + ], + ) + + assert True is authorizer.check_authorization( + user=write_user, + operation=Operation.Write, + resource="Subject:write_subject", + ) + assert False is authorizer.check_authorization( + user=write_user, + operation=Operation.Write, + resource="Subject:any_subject", + ) + assert False is authorizer.check_authorization( + user=write_user, + operation=Operation.Write, + resource="Subject:read_subject", + ) + assert False is authorizer.check_authorization( + user=write_user, + operation=Operation.Read, + resource="Subject:read_subject", + ) + assert False is authorizer.check_authorization_any( + user=write_user, + operation=Operation.Write, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + ], + ) + assert True is authorizer.check_authorization_any( + user=write_user, + operation=Operation.Write, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + "Subject:write_subject", + ], + ) + + assert True is authorizer.check_authorization( + user=readwrite_user, + operation=Operation.Write, + resource="Subject:readwrite_subject", + ) + assert True is authorizer.check_authorization( + user=readwrite_user, + operation=Operation.Read, + resource="Subject:readwrite_subject", + ) + assert False is authorizer.check_authorization( + user=readwrite_user, + operation=Operation.Write, + resource="Subject:any_subject", + ) + assert False is authorizer.check_authorization( + user=readwrite_user, + operation=Operation.Write, + resource="Subject:read_subject", + ) + assert False is authorizer.check_authorization_any( + user=readwrite_user, + operation=Operation.Write, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + ], + ) + assert True is authorizer.check_authorization_any( + user=readwrite_user, + operation=Operation.Write, + resources=[ + "Config:any_config", + "Subject:any_subject", + "Unknown:resource", + "Subject:readwrite_subject", + ], + )