diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 88ecf0a27..79586cbcd 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -462,6 +462,10 @@ def num_consumers(self) -> int: return len(self.consumer_manager.consumers) async def _maybe_create_async_producer(self) -> AsyncKafkaProducer: + """ + :raises NoBrokersAvailable: + :raises AuthenticationFailedError: + """ if self._async_producer is not None: return self._async_producer @@ -672,6 +676,10 @@ async def aclose(self) -> None: self.consumer_manager = None async def publish(self, topic: str, partition_id: Optional[str], content_type: str, request: HTTPRequest) -> None: + """ + :raises NoBrokersAvailable: + :raises AuthenticationFailedError: + """ formats: dict = request.content_type data: dict = request.json _ = await self.get_topic_info(topic, content_type) @@ -725,11 +733,25 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s async def partition_publish(self, topic: str, partition_id: str, content_type: str, *, request: HTTPRequest) -> None: log.debug("Executing partition publish on topic %s and partition %s", topic, partition_id) - await self.publish(topic, partition_id, content_type, request) + try: + await self.publish(topic, partition_id, content_type, request) + except (NoBrokersAvailable, AuthenticationFailedError): + KafkaRest.service_unavailable( + message="Service unavailable", + content_type=content_type, + sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value, + ) async def topic_publish(self, topic: str, content_type: str, *, request: HTTPRequest) -> None: log.debug("Executing topic publish on topic %s", topic) - await self.publish(topic, None, content_type, request) + try: + await self.publish(topic, None, content_type, request) + except (NoBrokersAvailable, AuthenticationFailedError): + KafkaRest.service_unavailable( + message="Service unavailable", + content_type=content_type, + sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value, + ) @staticmethod def validate_partition_id(partition_id: str, content_type: str) -> int: @@ -1038,6 +1060,10 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte ) async def produce_messages(self, *, topic: str, prepared_records: List) -> List: + """ + :raises NoBrokersAvailable: + :raises AuthenticationFailedError: + """ producer = await self._maybe_create_async_producer() produce_futures = [] diff --git a/karapace/kafka_rest_apis/error_codes.py b/karapace/kafka_rest_apis/error_codes.py index 7ff0ac4df..f57a10f3c 100644 --- a/karapace/kafka_rest_apis/error_codes.py +++ b/karapace/kafka_rest_apis/error_codes.py @@ -11,6 +11,7 @@ class RESTErrorCodes(Enum): HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value HTTP_INTERNAL_SERVER_ERROR = HTTPStatus.INTERNAL_SERVER_ERROR.value HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value + HTTP_SERVICE_UNAVAILABLE = HTTPStatus.SERVICE_UNAVAILABLE.value TOPIC_NOT_FOUND = 40401 PARTITION_NOT_FOUND = 40402 CONSUMER_NOT_FOUND = 40403 diff --git a/karapace/karapace.py b/karapace/karapace.py index 36c589cb9..28e26cf91 100644 --- a/karapace/karapace.py +++ b/karapace/karapace.py @@ -76,6 +76,17 @@ def not_found(message: str, sub_code: int, content_type: str) -> NoReturn: content_type=content_type, status=HTTPStatus.NOT_FOUND, body={"message": message, "error_code": sub_code} ) + @staticmethod + def service_unavailable(message: str, sub_code: int, content_type: str) -> NoReturn: + KarapaceBase.r( + content_type=content_type, + status=HTTPStatus.SERVICE_UNAVAILABLE, + body={ + "message": message, + "error_code": sub_code, + }, + ) + async def root_get(self) -> NoReturn: self.r({}, "application/json")