Skip to content

Commit

Permalink
Reactor Optimizations
Browse files Browse the repository at this point in the history
This change takes advantage of some new features in Reactor Netty.
  • Loading branch information
nebhale committed May 6, 2016
1 parent 6a84057 commit 945caec
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package org.cloudfoundry.reactor.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import org.cloudfoundry.client.v2.CloudFoundryException;
import org.springframework.web.client.HttpStatusCodeException;
import reactor.core.publisher.Mono;
import reactor.core.util.Exceptions;
import reactor.io.netty.http.HttpException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

public final class CloudFoundryExceptionBuilder {
Expand Down Expand Up @@ -65,12 +62,9 @@ public static CloudFoundryException build(HttpStatusCodeException cause) { // T
*/
@SuppressWarnings("unchecked")
public static <T> Mono<T> build(HttpException cause) {
return cause.getChannel().receive()
.reduceWith(Unpooled::compositeBuffer, (prev, next) -> prev.addComponent(next.retain()))
.then(byteBufs -> {
byteBufs.setIndex(0, byteBufs.capacity());

try (InputStream in = new ByteBufInputStream(byteBufs)) {
return cause.getChannel().receive().aggregate().toInputStream()
.then(in -> {
try {
Map<String, ?> response = OBJECT_MAPPER.readValue(in, Map.class);
Integer code = (Integer) response.get("code");
String description = (String) response.get("description");
Expand All @@ -79,8 +73,6 @@ public static <T> Mono<T> build(HttpException cause) {
return Mono.error(new CloudFoundryException(code, description, errorCode, cause));
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
byteBufs.release();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import reactor.core.publisher.Mono;
import reactor.core.tuple.Tuple;
import reactor.core.tuple.Tuple2;
import reactor.io.netty.common.NettyInbound;
import reactor.io.netty.http.HttpClient;
import reactor.io.netty.http.HttpInbound;
import reactor.io.netty.http.HttpOutbound;
Expand Down Expand Up @@ -66,18 +65,19 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doDelete(REQ request, C
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
.doOnSubscribe(s -> this.requestLogger.debug("DELETE {}", uri))
.then(o -> o.send(Mono.just(validRequest)
.as(JsonCodec.encode(this.objectMapper, o)))))
.where(req -> this.objectMapper.canSerialize(req.getClass()))
.map(JsonCodec.encode(this.objectMapper, o)))))
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
.flatMap(NettyInbound::receive)
.as(JsonCodec.decode(this.objectMapper, responseType));
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(this.objectMapper, responseType));
}

protected final <REQ extends Validatable, RSP> Mono<RSP> doGet(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
Function<Tuple2<HttpOutbound, REQ>, HttpOutbound> requestTransformer) {
return doGet(request, uriTransformer, requestTransformer)
.flatMap(NettyInbound::receive)
.as(JsonCodec.decode(this.objectMapper, responseType));
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(this.objectMapper, responseType));
}

protected final <REQ extends Validatable> Mono<HttpInbound> doGet(REQ request, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
Expand All @@ -104,11 +104,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPatch(REQ request, Cl
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
.doOnSubscribe(s -> this.requestLogger.debug("PATCH {}", uri))
.then(o -> o.send(Mono.just(validRequest)
.as(JsonCodec.encode(this.objectMapper, o)))))
.where(req -> this.objectMapper.canSerialize(req.getClass()))
.map(JsonCodec.encode(this.objectMapper, o)))))
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
.flatMap(NettyInbound::receive)
.as(JsonCodec.decode(this.objectMapper, responseType));
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(this.objectMapper, responseType));
}

protected final <REQ extends Validatable, RSP> Mono<RSP> doPost(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
Expand All @@ -121,11 +122,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPost(REQ request, Cla
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
.doOnSubscribe(s -> this.requestLogger.debug("POST {}", uri))
.then(o -> o.send(Mono.just(validRequest)
.as(JsonCodec.encode(this.objectMapper, o)))))
.where(req -> this.objectMapper.canSerialize(req.getClass()))
.map(JsonCodec.encode(this.objectMapper, o)))))
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
.flatMap(NettyInbound::receive)
.as(JsonCodec.decode(this.objectMapper, responseType));
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(this.objectMapper, responseType));
}

protected final <REQ extends Validatable, RSP> Mono<RSP> doPut(REQ request, Class<RSP> responseType, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
Expand All @@ -138,11 +140,12 @@ protected final <REQ extends Validatable, RSP> Mono<RSP> doPut(REQ request, Clas
.map(o -> requestTransformer.apply(Tuple.of(o, validRequest)))
.doOnSubscribe(s -> this.requestLogger.debug("PUT {}", uri))
.then(o -> o.send(Mono.just(validRequest)
.as(JsonCodec.encode(this.objectMapper, o)))))
.where(req -> this.objectMapper.canSerialize(req.getClass()))
.map(JsonCodec.encode(this.objectMapper, o)))))
.doOnSuccess(inbound -> this.responseLogger.debug("{} {}", inbound.status().code(), uri))
.doOnSuccess(inbound -> printWarnings(inbound, this.responseLogger, uri))))
.flatMap(NettyInbound::receive)
.as(JsonCodec.decode(this.objectMapper, responseType));
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(this.objectMapper, responseType));
}

protected final <REQ extends Validatable> Mono<HttpInbound> doWs(REQ request, Function<Tuple2<UriComponentsBuilder, REQ>, UriComponentsBuilder> uriTransformer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@

package org.cloudfoundry.reactor.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AsciiString;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSource;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.Exceptions;
import reactor.io.netty.http.HttpOutbound;

Expand All @@ -41,177 +33,27 @@ final class JsonCodec {

private static final AsciiString APPLICATION_JSON = new AsciiString("application/json");

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

static <T> Function<Flux<ByteBuf>, Mono<T>> decode(Class<T> type) {
return decode(OBJECT_MAPPER, type);
}

static <T> Function<Flux<ByteBuf>, Mono<T>> decode(ObjectMapper objectMapper, Class<T> type) {
return source -> new JsonDecoder<>(source, objectMapper, type);
}

static <T> Function<Mono<T>, Mono<ByteBuf>> encode(HttpOutbound httpOutbound) {
return encode(OBJECT_MAPPER, httpOutbound);
}

static <T> Function<Mono<T>, Mono<ByteBuf>> encode(ObjectMapper objectMapper, HttpOutbound httpOutbound) {
httpOutbound.header(CONTENT_TYPE, APPLICATION_JSON);
return source -> new JsonEncoder<>(source, objectMapper);
}

private static final class JsonDecoder<T> extends MonoSource<ByteBuf, T> {

private final ObjectMapper objectMapper;

private final Class<T> type;

private JsonDecoder(Publisher<? extends ByteBuf> source, ObjectMapper objectMapper, Class<T> type) {
super(source);
this.objectMapper = objectMapper;
this.type = type;
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
this.source.subscribe(new JsonDecoderSubscriber<>(subscriber, this.objectMapper, this.type));
}
}

private static final class JsonDecoderSubscriber<T> extends SubscriberBarrier<ByteBuf, T> {

private final ObjectMapper objectMapper;

private final Class<T> type;

private CompositeByteBuf byteBuf;

private boolean done = false;

private JsonDecoderSubscriber(Subscriber<? super T> subscriber, ObjectMapper objectMapper, Class<T> type) {
super(subscriber);
this.objectMapper = objectMapper;
this.type = type;
}

@Override
protected void doComplete() {
if (this.done) {
return;
}

if (this.byteBuf != null) {
this.byteBuf.setIndex(0, this.byteBuf.capacity());

try (InputStream in = new ByteBufInputStream(this.byteBuf)) {
this.subscriber.onNext(this.objectMapper.readValue(in, this.type));
} catch (IOException e) {
onError(e);
} finally {
this.byteBuf.release();
}
}

this.done = true;
this.subscriber.onComplete();
}

@Override
protected void doError(Throwable throwable) {
if (this.done) {
Exceptions.onErrorDropped(throwable);
return;
}

if (this.byteBuf != null) {
this.byteBuf.release();
}

this.done = true;
this.subscriber.onError(throwable);
}

@Override
protected void doNext(ByteBuf byteBuf) {
if (this.done) {
Exceptions.onNextDropped(byteBuf);
return;
}

if (this.byteBuf == null) {
this.byteBuf = Unpooled.compositeBuffer();
static <T> Function<InputStream, T> decode(ObjectMapper objectMapper, Class<T> type) {
return in -> {
try {
return objectMapper.readValue(in, type);
} catch (IOException e) {
throw Exceptions.propagate(e);
}

this.byteBuf.addComponent(byteBuf.retain());
}

};
}

private static final class JsonEncoder<T> extends MonoSource<T, ByteBuf> {

private final ObjectMapper objectMapper;

private JsonEncoder(Publisher<? extends T> source, ObjectMapper objectMapper) {
super(source);
this.objectMapper = objectMapper;
}

@Override
public void subscribe(Subscriber<? super ByteBuf> subscriber) {
this.source.subscribe(new JsonEncoderSubscriber<>(subscriber, this.objectMapper));
}
}

private static final class JsonEncoderSubscriber<T> extends SubscriberBarrier<T, ByteBuf> {

private final ObjectMapper objectMapper;

private boolean done = false;

private JsonEncoderSubscriber(Subscriber<? super ByteBuf> subscriber, ObjectMapper objectMapper) {
super(subscriber);
this.objectMapper = objectMapper;
}

@Override
protected void doComplete() {
if (this.done) {
return;
}

this.done = true;
this.subscriber.onComplete();
}

@Override
protected void doError(Throwable throwable) {
if (this.done) {
Exceptions.onErrorDropped(throwable);
return;
}

this.done = true;
this.subscriber.onError(throwable);
}

@Override
protected void doNext(T t) {
if (this.done) {
Exceptions.onNextDropped(t);
return;
}

if (!this.objectMapper.canSerialize(t.getClass())) {
return;
}
static <T> Function<T, ByteBuf> encode(ObjectMapper objectMapper, HttpOutbound httpOutbound) {
httpOutbound.header(CONTENT_TYPE, APPLICATION_JSON);

return source -> {
try {
this.subscriber.onNext(Unpooled.wrappedBuffer(this.objectMapper.writeValueAsBytes(t)));
} catch (IOException e) {
onError(e);
return httpOutbound.delegate().alloc().directBuffer()
.writeBytes(objectMapper.writeValueAsBytes(source));
} catch (JsonProcessingException e) {
throw Exceptions.propagate(e);
}
}

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import reactor.core.publisher.Mono;
import reactor.io.netty.config.ClientOptions;
import reactor.io.netty.http.HttpClient;
import reactor.io.netty.http.HttpInbound;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -54,8 +53,8 @@ public final class DefaultConnectionContext implements ConnectionContext {
this.sslCertificateTruster = createSslCertificateTruster(trustCertificates);
this.httpClient = createHttpClient(this.sslCertificateTruster);
this.root = getRoot(host, port, this.sslCertificateTruster);
this.info = getInfo(this.httpClient, this.root);
this.objectMapper = getObjectMapper(objectMapper);
this.info = getInfo(this.httpClient, this.objectMapper, this.root);
}

@Override
Expand Down Expand Up @@ -102,12 +101,12 @@ private static Optional<SslCertificateTruster> createSslCertificateTruster(Boole
}

@SuppressWarnings("unchecked")
private static Mono<Map<String, String>> getInfo(HttpClient httpClient, Mono<String> root) {
private static Mono<Map<String, String>> getInfo(HttpClient httpClient, ObjectMapper objectMapper, Mono<String> root) {
return root
.map(uri -> UriComponentsBuilder.fromUriString(uri).pathSegment("v2", "info").build().toUriString())
.then(httpClient::get)
.flatMap(HttpInbound::receive)
.as(JsonCodec.decode(Map.class))
.then(inbound -> inbound.receive().aggregate().toInputStream())
.map(JsonCodec.decode(objectMapper, Map.class))
.map(m -> (Map<String, String>) m)
.cache();
}
Expand Down

0 comments on commit 945caec

Please sign in to comment.