Skip to content

Commit

Permalink
Add metrics to SingleThreadedChunker
Browse files Browse the repository at this point in the history
Using a similar set of metrics to what the `TimedChunker` uses.  It's
helpful for diagnosing queue depth issues.  When we switched to the SPSC
queue, we were surprised the metrics were missing so I figured we could
add them in.
  • Loading branch information
timmartin-stripe committed Jan 7, 2025
1 parent e886a0c commit 99570ee
Showing 1 changed file with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package io.reactivex.mantis.network.push;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -33,6 +37,11 @@ public class SingleThreadedChunker<T> implements Callable<Void> {
int iteration = 0;
private int index = 0;

private final Counter numEventsDrained;
private final Counter drainTriggeredByTimer;
private final Counter drainTriggeredByBatch;


public SingleThreadedChunker(ChunkProcessor<T> processor, MonitoredQueue<T> iQ, int chunkSize, long maxChunkInterval, ConnectionManager<T> connMgr) {
this.inputQueue = iQ;
this.chunkSize = chunkSize;
Expand All @@ -41,6 +50,17 @@ public SingleThreadedChunker(ChunkProcessor<T> processor, MonitoredQueue<T> iQ,
this.connectionManager = connMgr;
chunk = new Object[this.chunkSize];

MetricGroupId metricsGroup = new MetricGroupId("SingleThreadedChunker");
Metrics metrics = new Metrics.Builder()
.id(metricsGroup)
.addCounter("numEventsDrained")
.addCounter("drainTriggeredByTimer")
.addCounter("drainTriggeredByBatch")
.build();
numEventsDrained = metrics.getCounter("numEventsDrained");
drainTriggeredByTimer = metrics.getCounter("drainTriggeredByTimer");
drainTriggeredByBatch = metrics.getCounter("drainTriggeredByBatch");
MetricsRegistry.getInstance().registerAndGet(metrics);
}

private boolean stopCondition() {
Expand All @@ -58,6 +78,7 @@ public Void call() throws Exception {

long currTime = System.currentTimeMillis();
if (currTime - maxChunkInterval > chunkStartTime) {
drainTriggeredByTimer.increment();
drain();
}
iteration = 0;
Expand All @@ -73,6 +94,7 @@ public Void call() throws Exception {
chunk[index++] = ele;
}
} else {
drainTriggeredByBatch.increment();
drain();
chunkStartTime = System.currentTimeMillis();
if (stopCondition()) {
Expand All @@ -93,6 +115,7 @@ private void drain() {
}

processor.process(connectionManager, copy);
numEventsDrained.increment(copy.size());
index = 0;
}
}
Expand Down

0 comments on commit 99570ee

Please sign in to comment.