Skip to content

Commit

Permalink
[Metrics] Add a few WebSocket metrics
Browse files Browse the repository at this point in the history
- open connections count
- requests count
  • Loading branch information
quantranhong1999 authored and Arsnael committed Nov 1, 2024
1 parent 85f69b3 commit 9e4aca8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ public interface Metric {
default double movingAverage() {
return Long.valueOf(getCount()).doubleValue();
}

/**
* Mean rate of the events happen in one second.
*
* Default to count (naive implementation with period starting at boot time)
*/
default double meanRate() {
return Long.valueOf(getCount()).doubleValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public long getCount() {
public double movingAverage() {
return meter.getFiveMinuteRate();
}

@Override
public double meanRate() {
return meter.getMeanRate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
import org.apache.james.metrics.api.{Metric, MetricFactory}
import org.apache.james.user.api.DelegationStore
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
Expand Down Expand Up @@ -79,7 +80,10 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
emailChangeRepository: EmailChangeRepository,
pushSerializer: PushSerializer,
typeStateFactory: TypeStateFactory,
delegationStore: DelegationStore) extends JMAPRoutes {
delegationStore: DelegationStore,
metricFactory: MetricFactory) extends JMAPRoutes {
private val openingConnectionsMetric: Metric = metricFactory.generate("jmap_websocket_opening_connections_count")
private val requestCountMetric: Metric = metricFactory.generate("jmap_websocket_requests_count")

override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
JMAPRoute.builder
Expand All @@ -91,16 +95,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
.action(JMAPRoutes.CORS_CONTROL)
.corsHeaders())

private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = {
private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] =
SMono(authenticator.authenticate(httpServerRequest))
.flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession)
.`then`
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "jmap")
.sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out)))))
.sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out)))
.doOnSubscribe(_ => openingConnectionsMetric.increment())
.doOnTerminate(() => openingConnectionsMetric.decrement())))
.onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse))
.asJava()
.`then`()
}

private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
Expand All @@ -113,6 +118,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
frame.content().readBytes(bytes)
new String(bytes, StandardCharsets.UTF_8)
})
.doOnNext(_ => requestCountMetric.increment())
.flatMap(message => handleClientMessages(context)(message))
.doOnTerminate(context.clean)
.doOnCancel(context.clean)
Expand Down

0 comments on commit 9e4aca8

Please sign in to comment.