diff --git a/karapace/instrumentation/__init__.py b/karapace/instrumentation/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/karapace/instrumentation/prometheus.py b/karapace/instrumentation/prometheus.py
new file mode 100644
index 000000000..d4b168d35
--- /dev/null
+++ b/karapace/instrumentation/prometheus.py
@@ -0,0 +1,132 @@
+"""
+karapace - prometheus instrumentation
+
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+
+from __future__ import annotations
+
+from aiohttp.web import middleware, Request, RequestHandler, Response
+from karapace.rapu import RestApp
+from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram
+from typing import ClassVar
+
+import logging
+import time
+
+LOG = logging.getLogger(__name__)
+
+
+class PrometheusInstrumentation:
+    """Prometheus metrics for HTTP requests, plus the endpoint that serves them.
+
+    All state is class-level: the registry and the three metrics are created
+    once, when the class body runs, and every method is a classmethod.
+    """
+
+    METRICS_ENDPOINT_PATH: ClassVar[str] = "/metrics"
+    START_TIME_REQUEST_KEY: ClassVar[str] = "start_time"
+
+    # Dedicated registry: `serve_metrics` renders exactly what is registered on it.
+    registry: ClassVar[CollectorRegistry] = CollectorRegistry()
+
+    karapace_http_requests_total: ClassVar[Counter] = Counter(
+        registry=registry,
+        name="karapace_http_requests_total",
+        documentation="Total Request Count for HTTP/TCP Protocol",
+        labelnames=("method", "path", "status"),
+    )
+
+    karapace_http_requests_latency_seconds: ClassVar[Histogram] = Histogram(
+        registry=registry,
+        name="karapace_http_requests_latency_seconds",
+        documentation="Request Duration for HTTP/TCP Protocol",
+        labelnames=("method", "path"),
+    )
+
+    karapace_http_requests_in_progress: ClassVar[Gauge] = Gauge(
+        registry=registry,
+        name="karapace_http_requests_in_progress",
+        documentation="In-progress Requests for HTTP/TCP Protocol",
+        labelnames=("method", "path"),
+    )
+
+    @classmethod
+    def setup_metrics(cls: type[PrometheusInstrumentation], *, app: RestApp) -> None:
+        """Register the metrics endpoint and request middleware on *app*.
+
+        Adds a `GET` route at `METRICS_ENDPOINT_PATH` served by `serve_metrics`,
+        inserts the metrics middleware at the front of the aiohttp middleware
+        list, and stores each metric on the aiohttp application keyed by the
+        metric object itself, which is where the middleware looks it up.
+        """
+        LOG.info("Setting up prometheus metrics")
+        app.route(
+            cls.METRICS_ENDPOINT_PATH,
+            callback=cls.serve_metrics,
+            method="GET",
+            schema_request=False,
+            with_request=False,
+            json_body=False,
+            auth=None,
+        )
+        app.app.middlewares.insert(0, cls.http_request_metrics_middleware)
+        app.app[cls.karapace_http_requests_total] = cls.karapace_http_requests_total
+        app.app[cls.karapace_http_requests_latency_seconds] = cls.karapace_http_requests_latency_seconds
+        app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress
+
+    @classmethod
+    async def serve_metrics(cls: type[PrometheusInstrumentation]) -> bytes:
+        """Return the registry's current metrics as rendered by `generate_latest`."""
+        return generate_latest(cls.registry)
+
+    @classmethod
+    @middleware
+    async def http_request_metrics_middleware(
+        cls: type[PrometheusInstrumentation],
+        request: Request,
+        handler: RequestHandler,
+    ) -> Response:
+        """Record in-progress, latency and total-count metrics around *handler*.
+
+        The bookkeeping runs in a `finally` block, so a handler that raises
+        still has its latency observed and the in-progress gauge decremented.
+        A raised exception is counted under its `status` attribute when it has
+        one (aiohttp's HTTP exceptions do) and under 500 otherwise; it is then
+        re-raised unchanged.
+        """
+        request[cls.START_TIME_REQUEST_KEY] = time.time()
+
+        # Extract request labels
+        # NOTE(review): the raw path becomes a label value, so every distinct
+        # URL yields its own time series - consider the route template instead.
+        path = request.path
+        method = request.method
+
+        # Increment requests in progress before handler
+        request.app[cls.karapace_http_requests_in_progress].labels(method, path).inc()
+
+        # Stays None when the handler is interrupted by a non-`Exception`
+        # (e.g. task cancellation): there is no response status to count then.
+        status: int | None = None
+        try:
+            # Call request handler
+            response: Response = await handler(request)
+            status = response.status
+            return response
+        except Exception as ex:
+            status = getattr(ex, "status", 500)
+            raise
+        finally:
+            # Instrument request latency
+            request.app[cls.karapace_http_requests_latency_seconds].labels(method, path).observe(
+                time.time() - request[cls.START_TIME_REQUEST_KEY]
+            )
+
+            # Instrument total requests
+            if status is not None:
+                request.app[cls.karapace_http_requests_total].labels(method, path, status).inc()
+
+            # Decrement requests in progress after handler
+            request.app[cls.karapace_http_requests_in_progress].labels(method, path).dec()
diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py
index 353c5021a..620928428 100644
--- a/karapace/karapace_all.py
+++ b/karapace/karapace_all.py
@@ -6,6 +6,7 @@
 from contextlib import closing
 from karapace import version as karapace_version
 from karapace.config import read_config
+from karapace.instrumentation.prometheus import PrometheusInstrumentation
 from karapace.kafka_rest_apis import KafkaRest
 from karapace.rapu import RestApp
 from karapace.schema_registry_apis import KarapaceSchemaRegistryController
@@ -63,6 +64,7 @@ def main() -> int:
 
     try:
         # `close` will be called by the callback `close_by_app` set by `KarapaceBase`
+        PrometheusInstrumentation.setup_metrics(app=app)
         app.run()
     except Exception as ex:  # pylint: disable-broad-except
         app.stats.unexpected_exception(ex=ex, where="karapace")
diff --git a/tests/unit/instrumentation/test_prometheus.py b/tests/unit/instrumentation/test_prometheus.py
new file mode 100644
index 000000000..7fcbf4f19
--- /dev/null
+++ b/tests/unit/instrumentation/test_prometheus.py
@@ -0,0 +1,149 @@
+"""
+karapace - prometheus instrumentation tests
+
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+
+from _pytest.logging import LogCaptureFixture
+from karapace.instrumentation.prometheus import PrometheusInstrumentation
+from karapace.rapu import RestApp
+from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
+from unittest.mock import AsyncMock, call, MagicMock, patch
+
+import aiohttp.web
+import logging
+import pytest
+
+
+class TestPrometheusInstrumentation:
+    @pytest.fixture
+    def prometheus(self) -> PrometheusInstrumentation:
+        return PrometheusInstrumentation()
+
+    def test_constants(self, prometheus: PrometheusInstrumentation) -> None:
+        assert prometheus.METRICS_ENDPOINT_PATH == "/metrics"
+        assert prometheus.START_TIME_REQUEST_KEY == "start_time"
+        assert isinstance(prometheus.registry, CollectorRegistry)
+
+    def test_metric_types(self, prometheus: PrometheusInstrumentation) -> None:
+        assert isinstance(prometheus.karapace_http_requests_total, Counter)
+        assert isinstance(prometheus.karapace_http_requests_latency_seconds, Histogram)
+        assert isinstance(prometheus.karapace_http_requests_in_progress, Gauge)
+
+    def test_metric_values(self, prometheus: PrometheusInstrumentation) -> None:
+        # `_total` suffix is stripped off the metric name for `Counters`, but needed for clarity.
+        assert repr(prometheus.karapace_http_requests_total) == "prometheus_client.metrics.Counter(karapace_http_requests)"
+        assert (
+            repr(prometheus.karapace_http_requests_latency_seconds)
+            == "prometheus_client.metrics.Histogram(karapace_http_requests_latency_seconds)"
+        )
+        assert (
+            repr(prometheus.karapace_http_requests_in_progress)
+            == "prometheus_client.metrics.Gauge(karapace_http_requests_in_progress)"
+        )
+
+    def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusInstrumentation) -> None:
+        app = AsyncMock(spec=RestApp, app=AsyncMock(spec=aiohttp.web.Application))
+
+        with caplog.at_level(logging.INFO, logger="karapace.instrumentation.prometheus"):
+            prometheus.setup_metrics(app=app)
+
+        app.route.assert_called_once_with(
+            prometheus.METRICS_ENDPOINT_PATH,
+            callback=prometheus.serve_metrics,
+            method="GET",
+            schema_request=False,
+            with_request=False,
+            json_body=False,
+            auth=None,
+        )
+        app.app.middlewares.insert.assert_called_once_with(0, prometheus.http_request_metrics_middleware)
+        app.app.__setitem__.assert_has_calls(
+            [
+                call(prometheus.karapace_http_requests_total, prometheus.karapace_http_requests_total),
+                call(
+                    prometheus.karapace_http_requests_latency_seconds,
+                    prometheus.karapace_http_requests_latency_seconds,
+                ),
+                call(prometheus.karapace_http_requests_in_progress, prometheus.karapace_http_requests_in_progress),
+            ]
+        )
+        # Compare the whole list so the check cannot pass vacuously when nothing was logged.
+        assert [(log.name, log.levelname, log.message) for log in caplog.records] == [
+            ("karapace.instrumentation.prometheus", "INFO", "Setting up prometheus metrics"),
+        ]
+
+    @patch("karapace.instrumentation.prometheus.generate_latest")
+    async def test_serve_metrics(self, generate_latest: MagicMock, prometheus: PrometheusInstrumentation) -> None:
+        assert await prometheus.serve_metrics() is generate_latest.return_value
+        generate_latest.assert_called_once_with(prometheus.registry)
+
+    @patch("karapace.instrumentation.prometheus.time")
+    async def test_http_request_metrics_middleware(
+        self,
+        mock_time: MagicMock,
+        prometheus: PrometheusInstrumentation,
+    ) -> None:
+        mock_time.time.return_value = 10
+        request = AsyncMock(
+            spec=aiohttp.web.Request, app=AsyncMock(spec=aiohttp.web.Application), path="/path", method="GET"
+        )
+        handler = AsyncMock(spec=aiohttp.web.RequestHandler, return_value=MagicMock(status=200))
+
+        await prometheus.http_request_metrics_middleware(request=request, handler=handler)
+
+        request.__setitem__.assert_called_once_with(prometheus.START_TIME_REQUEST_KEY, 10)
+        request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls(
+            [
+                call("GET", "/path"),
+                call().inc(),
+            ]
+        )
+        request.app[prometheus.karapace_http_requests_latency_seconds].labels.assert_has_calls(
+            [
+                call("GET", "/path"),
+                call().observe(request.__getitem__.return_value.__rsub__.return_value),
+            ]
+        )
+        request.app[prometheus.karapace_http_requests_total].labels.assert_has_calls(
+            [
+                call("GET", "/path", 200),
+                call().inc(),
+            ]
+        )
+        request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls(
+            [
+                call("GET", "/path"),
+                call().dec(),
+            ]
+        )
+
+    @patch("karapace.instrumentation.prometheus.time")
+    async def test_http_request_metrics_middleware_handler_raises(
+        self,
+        mock_time: MagicMock,
+        prometheus: PrometheusInstrumentation,
+    ) -> None:
+        mock_time.time.return_value = 10
+        request = AsyncMock(
+            spec=aiohttp.web.Request, app=AsyncMock(spec=aiohttp.web.Application), path="/path", method="GET"
+        )
+        handler = AsyncMock(spec=aiohttp.web.RequestHandler, side_effect=aiohttp.web.HTTPNotFound())
+
+        with pytest.raises(aiohttp.web.HTTPNotFound):
+            await prometheus.http_request_metrics_middleware(request=request, handler=handler)
+
+        # The failed request is still counted (under the exception's status) and the gauge released.
+        request.app[prometheus.karapace_http_requests_total].labels.assert_has_calls(
+            [
+                call("GET", "/path", 404),
+                call().inc(),
+            ]
+        )
+        request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls(
+            [
+                call("GET", "/path"),
+                call().dec(),
+            ]
+        )