Skip to content

Commit

Permalink
Add a rebatch call to RemoteObservable connections between stages.
Browse files Browse the repository at this point in the history
  • Loading branch information
crioux-stripe committed Jan 10, 2025
1 parent a18a200 commit 7e61424
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.channel.ObservableConnection;
Expand Down Expand Up @@ -72,6 +73,10 @@ public class RemoteObservable {
private static boolean enableNettyLogging = false;
private static boolean enableCompression = true;
private static int maxFrameLength = 5242880; // 5 MB max frame
private static int bufferSize = 0;
private static final String DEFAULT_BUFFER_SIZE_STR = "0";



// NJ
static {
Expand Down Expand Up @@ -106,6 +111,9 @@ private static void loadFastProperties() {
if (maxFrameLengthStr != null && maxFrameLengthStr.length() > 0) {
maxFrameLength = Integer.parseInt(maxFrameLengthStr);
}

String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantisClient.buffer.size", DEFAULT_BUFFER_SIZE_STR);
bufferSize = Integer.parseInt(Optional.ofNullable(bufferSizeStr).orElse(DEFAULT_BUFFER_SIZE_STR));
}

private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryLogic(final
Expand Down Expand Up @@ -249,7 +257,8 @@ public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent,
connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters())); // send subscribe event to server
remoteUnsubscribe.setConnection(connection);
return connection.getInput()
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"));
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"))
.rebatchRequests(bufferSize <= 0 ? 1 : bufferSize);
}
})
.doOnCompleted(new Action0() {
Expand Down Expand Up @@ -394,7 +403,8 @@ public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent,
connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters())); // send subscribe event to server
remoteUnsubscribe.setConnection(connection);
return connection.getInput()
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"));
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"))
.rebatchRequests(bufferSize <= 0 ? 1 : bufferSize);
}
})
.doOnCompleted(new Action0() {
Expand Down Expand Up @@ -518,7 +528,8 @@ public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent,
connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters())); // send subscribe event to server
remoteUnsubscribe.setConnection(connection);
return connection.getInput()
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer"));
.lift(new DropOperator<RemoteRxEvent>("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer"))
.rebatchRequests(bufferSize <= 0 ? 1 : bufferSize);
}
})
.doOnCompleted(new Action0() {
Expand Down

0 comments on commit 7e61424

Please sign in to comment.