From 9e4aca8dbc7f472183f7358ba9e0b8509c836a9f Mon Sep 17 00:00:00 2001 From: Quan Tran Date: Thu, 31 Oct 2024 14:16:00 +0700 Subject: [PATCH] [Metrics] Add a few WebSocket metrics - open connections count - requests count --- .../java/org/apache/james/metrics/api/Metric.java | 9 +++++++++ .../james/metrics/dropwizard/DropWizardMetric.java | 5 +++++ .../apache/james/jmap/routes/WebSocketRoutes.scala | 14 ++++++++++---- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java index 87efdd9843d..131fc6bc2f3 100644 --- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java +++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java @@ -46,4 +46,13 @@ public interface Metric { default double movingAverage() { return Long.valueOf(getCount()).doubleValue(); } + + /** + * Mean rate of the events happen in one second. + * + * Default to count (naive implementation with period starting at boot time) + */ + default double meanRate() { + return Long.valueOf(getCount()).doubleValue(); + } } diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java index 02d6fa55ad2..fc271917c78 100644 --- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java +++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java @@ -72,4 +72,9 @@ public long getCount() { public double movingAverage() { return meter.getFiveMinuteRate(); } + + @Override + public double meanRate() { + return meter.getMeanRate(); + } } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala index f5965088974..a861a99be18 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -41,6 +41,7 @@ import org.apache.james.jmap.http.{Authenticator, UserProvisioning} import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer} import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys} import org.apache.james.mailbox.MailboxSession +import org.apache.james.metrics.api.{Metric, MetricFactory} import org.apache.james.user.api.DelegationStore import org.slf4j.{Logger, LoggerFactory} import play.api.libs.json.Json @@ -79,7 +80,10 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato emailChangeRepository: EmailChangeRepository, pushSerializer: PushSerializer, typeStateFactory: TypeStateFactory, - delegationStore: DelegationStore) extends JMAPRoutes { + delegationStore: DelegationStore, + metricFactory: MetricFactory) extends JMAPRoutes { + private val openingConnectionsMetric: Metric = metricFactory.generate("jmap_websocket_opening_connections_count") + private val requestCountMetric: Metric = metricFactory.generate("jmap_websocket_requests_count") override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( JMAPRoute.builder @@ -91,16 +95,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .action(JMAPRoutes.CORS_CONTROL) .corsHeaders()) - private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = { + private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = SMono(authenticator.authenticate(httpServerRequest)) .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession) .`then` .`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "jmap") - .sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))))) + .sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))) + .doOnSubscribe(_ => openingConnectionsMetric.increment()) + .doOnTerminate(() => openingConnectionsMetric.decrement()))) .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse)) .asJava() .`then`() - } private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = { val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer() @@ -113,6 +118,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato frame.content().readBytes(bytes) new String(bytes, StandardCharsets.UTF_8) }) + .doOnNext(_ => requestCountMetric.increment()) .flatMap(message => handleClientMessages(context)(message)) .doOnTerminate(context.clean) .doOnCancel(context.clean)