diff --git a/rxnetty-examples/src/test/java/io/reactivex/netty/spectator/internal/LatencyMetricTest.java b/rxnetty-examples/src/test/java/io/reactivex/netty/spectator/internal/LatencyMetricTest.java deleted file mode 100644 index f88d971a..00000000 --- a/rxnetty-examples/src/test/java/io/reactivex/netty/spectator/internal/LatencyMetricTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package io.reactivex.netty.spectator.internal; - -import com.netflix.spectator.api.DefaultRegistry; -import com.netflix.spectator.api.Id; -import com.netflix.spectator.api.Measurement; -import com.netflix.spectator.api.Meter; -import io.reactivex.netty.spectator.internal.LatencyMetrics.Percentile; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExternalResource; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -import java.util.Iterator; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; - -public class LatencyMetricTest { - - @Rule - public final LatencyMetricRule metricRule = new LatencyMetricRule(); - - @Test(timeout = 60000) - public void testPercentiles() throws Exception { - metricRule.populateMetrics(); - for (Percentile percentile : Percentile.values()) { - Meter spectatorMetric = metricRule.getSpectatorMetric(percentile); - double p = metricRule.metrics.getPercentileHolder().getPercentile(percentile.getValue()); - Iterator measure = spectatorMetric.measure().iterator(); - assertThat("Spectator metric measure is empty.", measure.hasNext()); - assertThat("Unexpected value of percentile: " + percentile.getTag(), measure.next().value(), - is(p)); - } - } - - public static class LatencyMetricRule extends ExternalResource { - - private static final Random random = new Random(); - - private LatencyMetrics metrics; - private String monitorId; - private String metricName; - private DefaultRegistry registry; - - @Override - public Statement apply(final Statement base, Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - metricName = "latency-test"; - monitorId = "latency-test-" + random.nextInt(100); - registry = new DefaultRegistry(); - metrics = new LatencyMetrics(metricName, monitorId, registry); - base.evaluate(); - } - }; - } - - public Meter getSpectatorMetric(Percentile percentile) { - Id id = registry.createId(metricName, "id", monitorId, "percentile", percentile.getTag()); - return registry.get(id); - } - - public void populateMetrics() { - for (int i = 0; i < 100; i++) { - metrics.record(1, TimeUnit.SECONDS); - } - } - } -} diff --git a/rxnetty-spectator-http/build.gradle b/rxnetty-spectator-http/build.gradle index 4199e3a4..c0fd107b 100644 --- a/rxnetty-spectator-http/build.gradle +++ b/rxnetty-spectator-http/build.gradle @@ -23,5 +23,4 @@ dependencies { compile project(':rxnetty-http') compile project(':rxnetty-spectator-tcp') compile project(':rxnetty-common') - compile 'com.netflix.spectator:spectator-api:0.40.0' } diff --git a/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpClientListener.java b/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpClientListener.java index 32f2e408..e441ce95 100644 --- a/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpClientListener.java +++ b/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpClientListener.java @@ -17,44 +17,32 @@ package io.reactivex.netty.spectator.http; -import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; import io.reactivex.netty.protocol.http.client.events.HttpClientEventsListener; import io.reactivex.netty.spectator.http.internal.ResponseCodesHolder; -import io.reactivex.netty.spectator.internal.LatencyMetrics; +import io.reactivex.netty.spectator.internal.EventMetric; import io.reactivex.netty.spectator.tcp.TcpClientListener; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static io.reactivex.netty.spectator.internal.SpectatorUtils.*; /** * HttpClientListener. */ public class HttpClientListener extends HttpClientEventsListener { - private final AtomicInteger requestBacklog; - private final AtomicInteger inflightRequests; - private final Counter processedRequests; - private final Counter requestWriteFailed; - private final Counter failedResponses; + private final EventMetric requestWrite; + private final EventMetric requestProcessing; + private final EventMetric response; + private final ResponseCodesHolder responseCodesHolder; - private final LatencyMetrics requestWriteTimes; - private final LatencyMetrics responseReadTimes; - private final LatencyMetrics requestProcessingTimes; private final TcpClientListener tcpDelegate; public HttpClientListener(Registry registry, String monitorId) { - requestBacklog = newGauge(registry, "requestBacklog", monitorId, new AtomicInteger()); - inflightRequests = newGauge(registry, "inflightRequests", monitorId, new AtomicInteger()); - requestWriteTimes = new LatencyMetrics("requestWriteTimes", monitorId, registry); - responseReadTimes = new LatencyMetrics("responseReadTimes", monitorId, registry); - processedRequests = newCounter(registry, "processedRequests", monitorId); - requestWriteFailed = newCounter(registry, "requestWriteFailed", monitorId); - failedResponses = newCounter(registry, "failedResponses", monitorId); - requestProcessingTimes = new LatencyMetrics("requestProcessingTimes", monitorId, registry); + requestWrite = new EventMetric(registry, "request", monitorId, "action", "write"); + requestProcessing = new EventMetric(registry, "request", monitorId, "action", "processing"); + response = new EventMetric(registry, "response", monitorId, "action", "read"); + responseCodesHolder = new ResponseCodesHolder(registry, monitorId); tcpDelegate = new TcpClientListener(registry, monitorId); } @@ -63,49 +51,9 @@ public HttpClientListener(String monitorId) { this(Spectator.globalRegistry(), monitorId); } - public long getRequestBacklog() { - return requestBacklog.get(); - } - - public long getInflightRequests() { - return inflightRequests.get(); - } - - public long getProcessedRequests() { - return processedRequests.count(); - } - - public long getRequestWriteFailed() { - return requestWriteFailed.count(); - } - - public long getFailedResponses() { - return failedResponses.count(); - } - - public long getResponse1xx() { - return responseCodesHolder.getResponse1xx(); - } - - public long getResponse2xx() { - return responseCodesHolder.getResponse2xx(); - } - - public long getResponse3xx() { - return responseCodesHolder.getResponse3xx(); - } - - public long getResponse4xx() { - return responseCodesHolder.getResponse4xx(); - } - - public long getResponse5xx() { - return responseCodesHolder.getResponse5xx(); - } - @Override public void onRequestProcessingComplete(long duration, TimeUnit timeUnit) { - requestProcessingTimes.record(duration, timeUnit); + requestProcessing.success(duration, timeUnit); } @Override @@ -115,38 +63,32 @@ public void onResponseHeadersReceived(int responseCode, long duration, TimeUnit @Override public void onResponseReceiveComplete(long duration, TimeUnit timeUnit) { - inflightRequests.decrementAndGet(); - processedRequests.increment(); - responseReadTimes.record(duration, timeUnit); + response.success(duration, timeUnit); } @Override public void onRequestWriteStart() { - requestBacklog.decrementAndGet(); + requestWrite.start(); } @Override public void onResponseFailed(Throwable throwable) { - inflightRequests.decrementAndGet(); - processedRequests.increment(); - failedResponses.increment(); + response.failure(); } @Override public void onRequestWriteComplete(long duration, TimeUnit timeUnit) { - requestWriteTimes.record(duration, timeUnit); + requestWrite.success(duration, timeUnit); } @Override public void onRequestWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - inflightRequests.decrementAndGet(); - requestWriteFailed.increment(); + requestWrite.failure(duration, timeUnit); } @Override public void onRequestSubmitted() { - requestBacklog.incrementAndGet(); - inflightRequests.incrementAndGet(); + requestProcessing.start(); } @Override @@ -256,84 +198,4 @@ public void onConnectSuccess(long duration, TimeUnit timeUnit) { public void onConnectStart() { tcpDelegate.onConnectStart(); } - - public long getLiveConnections() { - return tcpDelegate.getLiveConnections(); - } - - public long getConnectionCount() { - return tcpDelegate.getConnectionCount(); - } - - public long getPendingConnects() { - return tcpDelegate.getPendingConnects(); - } - - public long getFailedConnects() { - return tcpDelegate.getFailedConnects(); - } - - public long getPendingConnectionClose() { - return tcpDelegate.getPendingConnectionClose(); - } - - public long getFailedConnectionClose() { - return tcpDelegate.getFailedConnectionClose(); - } - - public long getPendingPoolAcquires() { - return tcpDelegate.getPendingPoolAcquires(); - } - - public long getFailedPoolAcquires() { - return tcpDelegate.getFailedPoolAcquires(); - } - - public long getPendingPoolReleases() { - return tcpDelegate.getPendingPoolReleases(); - } - - public long getFailedPoolReleases() { - return tcpDelegate.getFailedPoolReleases(); - } - - public long getPoolEvictions() { - return tcpDelegate.getPoolEvictions(); - } - - public long getPoolReuse() { - return tcpDelegate.getPoolReuse(); - } - - public long getPendingWrites() { - return tcpDelegate.getPendingWrites(); - } - - public long getPendingFlushes() { - return tcpDelegate.getPendingFlushes(); - } - - public long getBytesWritten() { - return tcpDelegate.getBytesWritten(); - } - - public long getBytesRead() { - return tcpDelegate.getBytesRead(); - } - - public long getFailedWrites() { - return tcpDelegate.getFailedWrites(); - } - - public long getFailedFlushes() { - return tcpDelegate.getFailedFlushes(); - } - - public long getPoolAcquires() { - return tcpDelegate.getPoolAcquires(); - } - - public long getPoolReleases() { - return tcpDelegate.getPoolReleases(); - } } diff --git a/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpServerListener.java b/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpServerListener.java index 87f49225..15c9a3c7 100644 --- a/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpServerListener.java +++ b/rxnetty-spectator-http/src/main/java/io/reactivex/netty/spectator/http/HttpServerListener.java @@ -17,34 +17,25 @@ package io.reactivex.netty.spectator.http; -import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; import io.reactivex.netty.protocol.http.server.events.HttpServerEventsListener; import io.reactivex.netty.spectator.http.internal.ResponseCodesHolder; -import io.reactivex.netty.spectator.internal.LatencyMetrics; +import io.reactivex.netty.spectator.internal.EventMetric; import io.reactivex.netty.spectator.tcp.TcpServerListener; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static io.reactivex.netty.spectator.internal.SpectatorUtils.*; /** * HttpServerListener. */ public class HttpServerListener extends HttpServerEventsListener { - private final AtomicInteger requestBacklog; - private final AtomicInteger inflightRequests; - private final Counter processedRequests; - private final ResponseCodesHolder responseCodesHolder; - private final Counter failedRequests; - private final Counter responseWriteFailed; - private final LatencyMetrics responseWriteTimes; - private final LatencyMetrics requestReadTimes; - private final LatencyMetrics requestProcessingTimes; + private final EventMetric requestRead; + private final EventMetric requestProcessing; + private final EventMetric responseWrite; + private final ResponseCodesHolder responseCodesHolder; private final TcpServerListener tcpDelegate; public HttpServerListener(String monitorId) { @@ -52,102 +43,55 @@ public HttpServerListener(String monitorId) { } public HttpServerListener(Registry registry, String monitorId) { - requestBacklog = newGauge(registry, "requestBacklog", monitorId, new AtomicInteger()); - inflightRequests = newGauge(registry, "inflightRequests", monitorId, new AtomicInteger()); - responseWriteTimes = new LatencyMetrics("responseWriteTimes", monitorId, registry); - requestReadTimes = new LatencyMetrics("requestReadTimes", monitorId, registry); - requestProcessingTimes = new LatencyMetrics("requestProcessingTimes", monitorId, registry); - processedRequests = newCounter(registry, "processedRequests", monitorId); - failedRequests = newCounter(registry, "failedRequests", monitorId); - responseWriteFailed = newCounter(registry, "responseWriteFailed", monitorId); + requestRead = new EventMetric(registry, "request", monitorId, "action", "read"); + requestProcessing = new EventMetric(registry, "request", monitorId, "action", "processing"); + responseWrite = new EventMetric(registry, "response", monitorId, "action", "write"); responseCodesHolder = new ResponseCodesHolder(registry, monitorId); tcpDelegate = new TcpServerListener(registry, monitorId); } - public long getRequestBacklog() { - return requestBacklog.get(); - } - - public long getInflightRequests() { - return inflightRequests.get(); - } - - public long getProcessedRequests() { - return processedRequests.count(); - } - - public long getFailedRequests() { - return failedRequests.count(); - } - - public long getResponseWriteFailed() { - return responseWriteFailed.count(); - } - - public long getResponse1xx() { - return responseCodesHolder.getResponse1xx(); - } - - public long getResponse2xx() { - return responseCodesHolder.getResponse2xx(); - } - - public long getResponse3xx() { - return responseCodesHolder.getResponse3xx(); - } - - public long getResponse4xx() { - return responseCodesHolder.getResponse4xx(); - } - - public long getResponse5xx() { - return responseCodesHolder.getResponse5xx(); - } - public static HttpServerListener newHttpListener(String monitorId) { return new HttpServerListener(monitorId); } @Override public void onRequestHandlingFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - processedRequests.increment(); - inflightRequests.decrementAndGet(); - failedRequests.increment(); - requestProcessingTimes.record(duration, timeUnit); + requestProcessing.failure(duration, timeUnit); } @Override public void onRequestHandlingSuccess(long duration, TimeUnit timeUnit) { - inflightRequests.decrementAndGet(); - processedRequests.increment(); - requestProcessingTimes.record(duration, timeUnit); + requestProcessing.success(duration, timeUnit); } @Override public void onResponseWriteSuccess(long duration, TimeUnit timeUnit, int responseCode) { - responseCodesHolder.update(responseCode); - responseWriteTimes.record(duration, timeUnit); + responseWrite.success(duration, timeUnit); } @Override public void onResponseWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - responseWriteFailed.increment(); + responseWrite.failure(duration, timeUnit); } @Override public void onRequestReceiveComplete(long duration, TimeUnit timeUnit) { - requestReadTimes.record(duration, timeUnit); + requestRead.success(duration, timeUnit); } @Override public void onRequestHandlingStart(long duration, TimeUnit timeUnit) { - requestBacklog.decrementAndGet(); + requestProcessing.start(duration, timeUnit); + } + + @Override + public void onRequestHeadersReceived() { + requestRead.start(); } @Override - public void onNewRequestReceived() { - requestBacklog.incrementAndGet(); - inflightRequests.incrementAndGet(); + public void onResponseWriteStart() { + responseWrite.start(); } @Override @@ -221,40 +165,4 @@ public void onWriteSuccess(long duration, TimeUnit timeUnit) { public void onWriteStart() { tcpDelegate.onWriteStart(); } - - public long getLiveConnections() { - return tcpDelegate.getLiveConnections(); - } - - public long getInflightConnections() { - return tcpDelegate.getInflightConnections(); - } - - public long getFailedConnections() { - return tcpDelegate.getFailedConnections(); - } - - public long getPendingWrites() { - return tcpDelegate.getPendingWrites(); - } - - public long getPendingFlushes() { - return tcpDelegate.getPendingFlushes(); - } - - public long getBytesWritten() { - return tcpDelegate.getBytesWritten(); - } - - public long getBytesRead() { - return tcpDelegate.getBytesRead(); - } - - public long getFailedWrites() { - return tcpDelegate.getFailedWrites(); - } - - public long getFailedFlushes() { - return tcpDelegate.getFailedFlushes(); - } } diff --git a/rxnetty-spectator-tcp/build.gradle b/rxnetty-spectator-tcp/build.gradle index b9bb9f68..52ac3b95 100644 --- a/rxnetty-spectator-tcp/build.gradle +++ b/rxnetty-spectator-tcp/build.gradle @@ -22,6 +22,6 @@ targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':rxnetty-tcp') compile project(':rxnetty-common') - compile 'com.netflix.spectator:spectator-api:0.40.0' + compile 'com.netflix.spectator:spectator-api:0.50.0' compile 'com.netflix.numerus:numerus:1.1' } diff --git a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/EventMetric.java b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/EventMetric.java new file mode 100644 index 00000000..526489e4 --- /dev/null +++ b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/EventMetric.java @@ -0,0 +1,60 @@ +package io.reactivex.netty.spectator.internal; + +import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.histogram.PercentileTimer; + +import java.util.concurrent.TimeUnit; + +import static io.reactivex.netty.spectator.internal.SpectatorUtils.*; + +public class EventMetric { + + private final Counter start; + private final PercentileTimer startLatency; + + private final Counter success; + private final PercentileTimer successLatency; + + private final Counter failed; + private final PercentileTimer failureLatency; + + public EventMetric(Registry registry, String name, String monitorId, String... tags) { + start = newCounter(registry, name, monitorId, mergeTags(tags, "rtype", "count", "state", "start")); + startLatency = newPercentileTimer(registry, name, monitorId, mergeTags(tags, "rtype", "latency", + "state", "start")); + success = newCounter(registry, name, monitorId, mergeTags(tags, "rtype", "count", "state", "success")); + successLatency = newPercentileTimer(registry, name, monitorId, mergeTags(tags, "rtype", "latency", + "state", "success")); + failed = newCounter(registry, name, monitorId, mergeTags(tags, "rtype", "count", "state", "failed")); + failureLatency = newPercentileTimer(registry, name, monitorId, mergeTags(tags, "rtype", "latency", + "state", "failed")); + } + + public void start() { + start.increment(); + } + + public void start(long duration, TimeUnit timeUnit) { + start.increment(); + startLatency.record(duration, timeUnit); + } + + public void success() { + success.increment(); + } + + public void success(long duration, TimeUnit timeUnit) { + success.increment(); + successLatency.record(duration, timeUnit); + } + + public void failure() { + failed.increment(); + } + + public void failure(long duration, TimeUnit timeUnit) { + failed.increment(); + failureLatency.record(duration, timeUnit); + } +} diff --git a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/LatencyMetrics.java b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/LatencyMetrics.java deleted file mode 100644 index 8105c28b..00000000 --- a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/LatencyMetrics.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.reactivex.netty.spectator.internal; - -import com.netflix.numerus.NumerusProperty; -import com.netflix.numerus.NumerusRollingPercentile; -import com.netflix.spectator.api.Id; -import com.netflix.spectator.api.Registry; -import com.netflix.spectator.api.Spectator; - -import java.util.concurrent.TimeUnit; - -import static com.netflix.numerus.NumerusProperty.Factory.*; - -/** - * A latency metric for publishing various percentiles of latency which spectator does not. - */ -public class LatencyMetrics { - - public enum Percentile { - P5(5.0, "5"), - P25(25.0, "25"), - P50(50.0, "50"), - P75(75.0, "75"), - P90(90.0, "90"), - P99(99.0, "99"), - P99_5(99.5, "99.5"), - MAX(100.0, "Max"), - Min(0.0, "Min"), - Mean(50.0, "Mean"), - ; - - private final double v; - private final String tag; - - Percentile(double v, String tag) { - this.v = v; - this.tag = tag; - } - - public double getValue() { - return v; - } - - public String getTag() { - return tag; - } - } - - private static final NumerusProperty latency_timeInMilliseconds = asProperty(60000); - private static final NumerusProperty latency_numberOfBuckets = asProperty(12); // 12 buckets at 5000ms each - private static final NumerusProperty latency_bucketDataLength = asProperty(1000); - private static final NumerusProperty latency_enabled = asProperty(true); - - private final NumerusRollingPercentile p; - - public LatencyMetrics(String metricName, String monitorId) { - this(metricName, monitorId, Spectator.globalRegistry()); - } - - public LatencyMetrics(String metricName, String monitorId, Registry registry) { - p = new NumerusRollingPercentile(latency_timeInMilliseconds, latency_numberOfBuckets, latency_bucketDataLength, - latency_enabled); - - for (Percentile percentile : Percentile.values()) { - registerPercentileGauge(metricName, monitorId, registry, percentile.v, percentile.getTag()); - } - } - - public void record(long duration, TimeUnit timeUnit) { - p.addValue((int) TimeUnit.MILLISECONDS.convert(duration, timeUnit)); - } - - /*Visible for testing*/NumerusRollingPercentile getPercentileHolder() { - return p; - } - - private void registerPercentileGauge(String metricName, String monitorId, Registry registry, - final double percentile, String percentileTagValue) { - Id id = registry.createId(metricName, "id", monitorId, "percentile", percentileTagValue); - registry.gauge(id, p, value -> value.getPercentile(percentile)); - } -} diff --git a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/SpectatorUtils.java b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/SpectatorUtils.java index 0eaf71c5..719ba733 100644 --- a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/SpectatorUtils.java +++ b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/internal/SpectatorUtils.java @@ -20,6 +20,7 @@ import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Timer; +import com.netflix.spectator.api.histogram.PercentileTimer; public final class SpectatorUtils { private SpectatorUtils() { @@ -52,6 +53,25 @@ public static T newGauge(Registry registry, String name, Stri return registry.gauge(gaugeId, number); } + public static PercentileTimer newPercentileTimer(Registry registry, String name, String id, String... tags) { + Id timerId = registry.createId(name, getTagsWithId(id, tags)); + return PercentileTimer.get(registry, timerId); + } + + public static String[] mergeTags(String[] tags1, String... tags2) { + if (tags1.length == 0) { + return tags2; + } + if (tags2.length == 0) { + return tags1; + } + + String[] toReturn = new String[tags1.length + tags2.length]; + System.arraycopy(tags1, 0, toReturn, 0, tags1.length); + System.arraycopy(tags2, 0, toReturn, tags1.length, tags2.length); + return toReturn; + } + private static String[] getTagsWithId(String id, String[] tags) { String[] allTags = new String[tags.length + 2]; System.arraycopy(tags, 0, allTags, 0, tags.length); diff --git a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpClientListener.java b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpClientListener.java index 35b6b0d0..2297a364 100644 --- a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpClientListener.java +++ b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpClientListener.java @@ -21,10 +21,9 @@ import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; import io.reactivex.netty.protocol.tcp.client.events.TcpClientEventListener; -import io.reactivex.netty.spectator.internal.LatencyMetrics; +import io.reactivex.netty.spectator.internal.EventMetric; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static io.reactivex.netty.spectator.internal.SpectatorUtils.*; @@ -33,66 +32,33 @@ */ public class TcpClientListener extends TcpClientEventListener { - private final AtomicInteger liveConnections; - private final Counter connectionCount; - private final AtomicInteger pendingConnects; - private final Counter failedConnects; - private final LatencyMetrics connectionTimes; - - private final AtomicInteger pendingConnectionClose; - private final Counter failedConnectionClose; - - private final AtomicInteger pendingPoolAcquires; - private final Counter failedPoolAcquires; - private final LatencyMetrics poolAcquireTimes; - - private final AtomicInteger pendingPoolReleases; - private final Counter failedPoolReleases; - private final LatencyMetrics poolReleaseTimes; - - private final Counter poolAcquires; + private final EventMetric connection; + private final EventMetric connectionClose; + private final EventMetric poolAcquire; + private final EventMetric poolRelease; private final Counter poolEvictions; private final Counter poolReuse; - private final Counter poolReleases; - private final AtomicInteger pendingWrites; - private final AtomicInteger pendingFlushes; + private final EventMetric write; + private final EventMetric flush; - private final Counter bytesWritten; - private final LatencyMetrics writeTimes; private final Counter bytesRead; - private final Counter failedWrites; - private final Counter failedFlushes; - private final LatencyMetrics flushTimes; + private final Counter bytesWritten; public TcpClientListener(Registry registry, String monitorId) { - liveConnections = newGauge(registry, "liveConnections", monitorId, new AtomicInteger()); - connectionCount = newCounter(registry, "connectionCount", monitorId); - pendingConnects = newGauge(registry, "pendingConnects", monitorId, new AtomicInteger()); - failedConnects = newCounter(registry, "failedConnects", monitorId); - connectionTimes = new LatencyMetrics("connectionTimes", monitorId, registry); - pendingConnectionClose = newGauge(registry, "pendingConnectionClose", monitorId, new AtomicInteger()); - failedConnectionClose = newCounter(registry, "failedConnectionClose", monitorId); - pendingPoolAcquires = newGauge(registry, "pendingPoolAcquires", monitorId, new AtomicInteger()); - poolAcquireTimes = new LatencyMetrics("poolAcquireTimes", monitorId, registry); - failedPoolAcquires = newCounter(registry, "failedPoolAcquires", monitorId); - pendingPoolReleases = newGauge(registry, "pendingPoolReleases", monitorId, new AtomicInteger()); - poolReleaseTimes = new LatencyMetrics("poolReleaseTimes", monitorId, registry); - failedPoolReleases = newCounter(registry, "failedPoolReleases", monitorId); - poolAcquires = newCounter(registry, "poolAcquires", monitorId); - poolEvictions = newCounter(registry, "poolEvictions", monitorId); - poolReuse = newCounter(registry, "poolReuse", monitorId); - poolReleases = newCounter(registry, "poolReleases", monitorId); - - pendingWrites = newGauge(registry, "pendingWrites", monitorId, new AtomicInteger()); - pendingFlushes = newGauge(registry, "pendingFlushes", monitorId, new AtomicInteger()); - - bytesWritten = newCounter(registry, "bytesWritten", monitorId); - writeTimes = new LatencyMetrics("writeTimes", monitorId, registry); - bytesRead = newCounter(registry, "bytesRead", monitorId); - failedWrites = newCounter(registry, "failedWrites", monitorId); - failedFlushes = newCounter(registry, "failedFlushes", monitorId); - flushTimes = new LatencyMetrics("flushTimes", monitorId, registry); + + connection = new EventMetric(registry, "connection", monitorId, "action", "connect"); + connectionClose = new EventMetric(registry, "connection", monitorId, "action", "handle"); + poolAcquire = new EventMetric(registry, "connection.pool", monitorId, "action", "acquire"); + poolRelease = new EventMetric(registry, "connection.pool", monitorId, "action", "release"); + poolEvictions = newCounter(registry, "connection.pool", monitorId, "action", "evict"); + poolReuse = newCounter(registry, "connection.pool", monitorId, "action", "reuse"); + + write = new EventMetric(registry, "writes", monitorId, "action", "write"); + flush = new EventMetric(registry, "writes", monitorId, "action", "flush"); + + bytesWritten = newCounter(registry, "bytes", monitorId, "rtype", "count", "action", "write"); + bytesRead = newCounter(registry, "bytes", monitorId, "rtype", "count", "action", "read"); } public TcpClientListener(String monitorId) { @@ -111,49 +77,42 @@ public void onByteWritten(long bytesWritten) { @Override public void onFlushComplete(long duration, TimeUnit timeUnit) { - pendingFlushes.decrementAndGet(); - flushTimes.record(duration, timeUnit); + flush.success(duration, timeUnit); } @Override public void onFlushStart() { - pendingFlushes.incrementAndGet(); + flush.start(); } @Override public void onWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - pendingWrites.decrementAndGet(); - failedWrites.increment(); + write.failure(duration, timeUnit); } @Override public void onWriteSuccess(long duration, TimeUnit timeUnit) { - pendingWrites.decrementAndGet(); - writeTimes.record(duration, timeUnit); + write.success(duration, timeUnit); } @Override public void onWriteStart() { - pendingWrites.incrementAndGet(); + write.start(); } @Override public void onPoolReleaseFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - pendingPoolReleases.decrementAndGet(); - poolReleases.increment(); - failedPoolReleases.increment(); + poolRelease.failure(duration, timeUnit); } @Override public void onPoolReleaseSuccess(long duration, TimeUnit timeUnit) { - pendingPoolReleases.decrementAndGet(); - poolReleases.increment(); - poolReleaseTimes.record(duration, timeUnit); + poolRelease.success(duration, timeUnit); } @Override public void onPoolReleaseStart() { - pendingPoolReleases.incrementAndGet(); + poolRelease.start(); } @Override @@ -168,137 +127,46 @@ public void onPooledConnectionReuse() { @Override public void onPoolAcquireFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - pendingPoolAcquires.decrementAndGet(); - poolAcquires.increment(); - failedPoolAcquires.increment(); + poolAcquire.failure(duration, timeUnit); } @Override public void onPoolAcquireSuccess(long duration, TimeUnit timeUnit) { - pendingPoolAcquires.decrementAndGet(); - poolAcquires.increment(); - poolAcquireTimes.record(duration, timeUnit); + poolAcquire.success(duration, timeUnit); } @Override public void onPoolAcquireStart() { - pendingPoolAcquires.incrementAndGet(); + poolAcquire.start(); } @Override public void onConnectionCloseFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - liveConnections.decrementAndGet(); // Even though the close failed, the connection isn't live. - pendingConnectionClose.decrementAndGet(); - failedConnectionClose.increment(); + connectionClose.failure(duration, timeUnit); } @Override public void onConnectionCloseSuccess(long duration, TimeUnit timeUnit) { - liveConnections.decrementAndGet(); - pendingConnectionClose.decrementAndGet(); + connectionClose.success(duration, timeUnit); } @Override public void onConnectionCloseStart() { - pendingConnectionClose.incrementAndGet(); + connectionClose.start(); } @Override public void onConnectFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - pendingConnects.decrementAndGet(); - failedConnects.increment(); + connection.failure(duration, timeUnit); } @Override public void onConnectSuccess(long duration, TimeUnit timeUnit) { - pendingConnects.decrementAndGet(); - liveConnections.incrementAndGet(); - connectionCount.increment(); - connectionTimes.record(duration, timeUnit); + connection.success(duration, timeUnit); } @Override public void onConnectStart() { - pendingConnects.incrementAndGet(); - } - - public long getLiveConnections() { - return liveConnections.get(); - } - - public long getConnectionCount() { - return connectionCount.count(); - } - - public long getPendingConnects() { - return pendingConnects.get(); - } - - public long getFailedConnects() { - return failedConnects.count(); - } - - public long getPendingConnectionClose() { - return pendingConnectionClose.get(); - } - - public long getFailedConnectionClose() { - return failedConnectionClose.count(); - } - - public long getPendingPoolAcquires() { - return pendingPoolAcquires.get(); - } - - public long getFailedPoolAcquires() { - return failedPoolAcquires.count(); - } - - public long getPendingPoolReleases() { - return pendingPoolReleases.get(); - } - - public long getFailedPoolReleases() { - return failedPoolReleases.count(); - } - - public long getPoolEvictions() { - return poolEvictions.count(); - } - - public long getPoolReuse() { - return poolReuse.count(); - } - - public long getPendingWrites() { - return pendingWrites.get(); - } - - public long getPendingFlushes() { - return pendingFlushes.get(); - } - - public long getBytesWritten() { - return bytesWritten.count(); - } - - public long getBytesRead() { - return bytesRead.count(); - } - - public long getFailedWrites() { - return failedWrites.count(); - } - - public long getFailedFlushes() { - return failedFlushes.count(); - } - - public long getPoolAcquires() { - return poolAcquires.count(); - } - - public long getPoolReleases() { - return poolReleases.count(); + connection.start(); } } diff --git a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpServerListener.java b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpServerListener.java index 6682d9fa..11cb8689 100644 --- a/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpServerListener.java +++ b/rxnetty-spectator-tcp/src/main/java/io/reactivex/netty/spectator/tcp/TcpServerListener.java @@ -21,10 +21,9 @@ import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; import io.reactivex.netty.protocol.tcp.server.events.TcpServerEventListener; -import io.reactivex.netty.spectator.internal.LatencyMetrics; +import io.reactivex.netty.spectator.internal.EventMetric; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static io.reactivex.netty.spectator.internal.SpectatorUtils.*; @@ -33,42 +32,27 @@ */ public class TcpServerListener extends TcpServerEventListener { - private final AtomicInteger liveConnections; - private final AtomicInteger inflightConnections; - private final Counter failedConnections; - private final LatencyMetrics connectionProcessingTimes; - private final AtomicInteger pendingConnectionClose; - private final Counter failedConnectionClose; - private final LatencyMetrics connectionCloseTimes; + private final Counter connectionAccept; - private final AtomicInteger pendingWrites; - private final AtomicInteger pendingFlushes; + private final EventMetric connectionHandling; + private final EventMetric connectionClose; + private final EventMetric write; + private final EventMetric flush; - private final Counter bytesWritten; - private final LatencyMetrics writeTimes; private final Counter bytesRead; - private final Counter failedWrites; - private final Counter failedFlushes; - private final LatencyMetrics flushTimes; + private final Counter bytesWritten; public TcpServerListener(Registry registry, String monitorId) { - liveConnections = newGauge(registry, "liveConnections", monitorId, new AtomicInteger()); - inflightConnections = newGauge(registry, "inflightConnections", monitorId, new AtomicInteger()); - pendingConnectionClose = newGauge(registry, "pendingConnectionClose", monitorId, new AtomicInteger()); - failedConnectionClose = newCounter(registry, "failedConnectionClose", monitorId); - failedConnections = newCounter(registry, "failedConnections", monitorId); - connectionProcessingTimes = new LatencyMetrics("connectionProcessingTimes", monitorId, registry); - connectionCloseTimes = new LatencyMetrics("connectionCloseTimes", monitorId, registry); + connectionAccept = newCounter(registry, "connection", monitorId, "rtype", "count", + "action", "accept"); + connectionHandling = new EventMetric(registry, "connection", monitorId, "action", "handle"); + connectionClose = new EventMetric(registry, "connection", monitorId, "action", "close"); - pendingWrites = newGauge(registry, "pendingWrites", monitorId, new AtomicInteger()); - pendingFlushes = newGauge(registry, "pendingFlushes", monitorId, new AtomicInteger()); + write = new EventMetric(registry, "writes", monitorId, "action", "write"); + flush = new EventMetric(registry, "writes", monitorId, "action", "flush"); - bytesWritten = newCounter(registry, "bytesWritten", monitorId); - writeTimes = new LatencyMetrics("writeTimes", monitorId, registry); - bytesRead = newCounter(registry, "bytesRead", monitorId); - failedWrites = newCounter(registry, "failedWrites", monitorId); - failedFlushes = newCounter(registry, "failedFlushes", monitorId); - flushTimes = new LatencyMetrics("flushTimes", monitorId, registry); + bytesWritten = newCounter(registry, "bytes", monitorId, "rtype", "count", "action", "write"); + bytesRead = newCounter(registry, "bytes", monitorId, "rtype", "count", "action", "read"); } public TcpServerListener(String monitorId) { @@ -77,44 +61,37 @@ public TcpServerListener(String monitorId) { @Override public void onConnectionHandlingFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - inflightConnections.decrementAndGet(); - failedConnections.increment(); + connectionHandling.failure(duration, timeUnit); } @Override public void onConnectionHandlingSuccess(long duration, TimeUnit timeUnit) { - inflightConnections.decrementAndGet(); - connectionProcessingTimes.record(duration, timeUnit); + connectionHandling.success(duration, timeUnit); } @Override public void onConnectionHandlingStart(long duration, TimeUnit timeUnit) { - inflightConnections.incrementAndGet(); + connectionHandling.start(duration, timeUnit); } @Override public void onConnectionCloseStart() { - pendingConnectionClose.incrementAndGet(); + connectionClose.start(); } @Override public void onConnectionCloseSuccess(long duration, TimeUnit timeUnit) { - liveConnections.decrementAndGet(); - pendingConnectionClose.decrementAndGet(); - connectionCloseTimes.record(duration, timeUnit); + connectionClose.success(duration, timeUnit); } @Override public void onConnectionCloseFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - liveConnections.decrementAndGet(); - pendingConnectionClose.decrementAndGet(); - connectionCloseTimes.record(duration, timeUnit); - failedConnectionClose.increment(); + connectionClose.failure(duration, timeUnit); } @Override public void onNewClientConnected() { - liveConnections.incrementAndGet(); + connectionAccept.increment(); } @Override @@ -129,65 +106,26 @@ public void onByteWritten(long bytesWritten) { @Override public void onFlushComplete(long duration, TimeUnit timeUnit) { - pendingFlushes.decrementAndGet(); - flushTimes.record(duration, timeUnit); + flush.success(duration, timeUnit); } @Override public void onFlushStart() { - pendingFlushes.incrementAndGet(); + flush.start(); } @Override public void onWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) { - pendingWrites.decrementAndGet(); - failedWrites.increment(); + write.failure(duration, timeUnit); } @Override public void onWriteSuccess(long duration, TimeUnit timeUnit) { - pendingWrites.decrementAndGet(); - writeTimes.record(duration, timeUnit); + write.success(duration, timeUnit); } @Override public void onWriteStart() { - pendingWrites.incrementAndGet(); - } - - public long getLiveConnections() { - return liveConnections.get(); - } - - public long getInflightConnections() { - return inflightConnections.get(); - } - - public long getFailedConnections() { - return failedConnections.count(); - } - - public long getPendingWrites() { - return pendingWrites.get(); - } - - public long getPendingFlushes() { - return pendingFlushes.get(); - } - - public long getBytesWritten() { - return bytesWritten.count(); - } - - public long getBytesRead() { - return bytesRead.count(); - } - - public long getFailedWrites() { - return failedWrites.count(); - } - - public long getFailedFlushes() { - return failedFlushes.count(); + write.start(); } }