From e3927c4ad3764cc6a8784bda3a6112cf2d97f8ba Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Tue, 20 Oct 2015 09:16:43 -0700 Subject: [PATCH] helper utilities for Observable Currently provides transformers that can be used with Observable.compose for: * gzip compression/decompression * aggregating to byte array * aggregating to String We'll look at an SSE decoder based on RxNetty's version later. --- .../java/com/netflix/iep/http/ByteBufs.java | 189 ++++++++++++++++++ .../com/netflix/iep/http/ByteBufsTest.java | 97 +++++++++ 2 files changed, 286 insertions(+) create mode 100644 iep-rxhttp/src/main/java/com/netflix/iep/http/ByteBufs.java create mode 100644 iep-rxhttp/src/test/java/com/netflix/iep/http/ByteBufsTest.java diff --git a/iep-rxhttp/src/main/java/com/netflix/iep/http/ByteBufs.java b/iep-rxhttp/src/main/java/com/netflix/iep/http/ByteBufs.java new file mode 100644 index 00000000..9341876e --- /dev/null +++ b/iep-rxhttp/src/main/java/com/netflix/iep/http/ByteBufs.java @@ -0,0 +1,189 @@ +/* + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.iep.http; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.compression.JdkZlibDecoder; +import io.netty.handler.codec.compression.JdkZlibEncoder; +import io.netty.handler.codec.compression.ZlibWrapper; +import io.netty.handler.codec.json.JsonObjectDecoder; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; +import rx.functions.Func2; + +import java.io.UnsupportedEncodingException; + +/** + * Helper operations for working with {@code Observable} objects. + */ +public final class ByteBufs { + + private ByteBufs() { + } + + private static Observable encode(final Observable input, final EmbeddedChannel channel) { + return Observable.create(new Observable.OnSubscribe() { + @Override public void call(final Subscriber subscriber) { + input.subscribe(new EncoderSubscriber(subscriber, channel)); + } + }); + } + + private static Observable decode(final Observable input, final EmbeddedChannel channel) { + return Observable.create(new Observable.OnSubscribe() { + @Override public void call(final Subscriber subscriber) { + input.subscribe(new DecoderSubscriber(subscriber, channel)); + } + }); + } + + /** + * Map to a GZIP compressed sequence of ByteBuf. + */ + public static Observable.Transformer gzip() { + return input -> encode(input, new EmbeddedChannel(new JdkZlibEncoder(ZlibWrapper.GZIP))); + } + + /** + * Map a GZIP compressed sequence of ByteBufs to a decompressed sequence. + */ + public static Observable.Transformer gunzip() { + return input -> decode(input, new EmbeddedChannel(new JdkZlibDecoder(ZlibWrapper.GZIP))); + } + + /** + * Process json data encoded as a sequence of ByteBufs so that each object within an array + * will be a single ByteBuf in the output. + */ + public static Observable.Transformer json() { + return input -> decode(input, new EmbeddedChannel(new JsonObjectDecoder(true))); + } + + /** + * Map a ByteBuf to a byte array. + */ + public static Func1 toByteArray() { + return bufs -> { + byte[] output = new byte[bufs.readableBytes()]; + bufs.readBytes(output, 0, bufs.readableBytes()); + return output; + }; + } + + private static Func2 append() { + return (bufs, buf) -> { + bufs.addComponent(buf); + bufs.writerIndex(bufs.writerIndex() + buf.readableBytes()); + return bufs; + }; + } + + /** + * Create an aggregated byte array with all data from the ByteBufs in the input observable. + */ + public static Observable.Transformer aggrByteArray() { + return input -> input.reduce(Unpooled.compositeBuffer(), append()).map(toByteArray()); + } + + private static String newString(byte[] buf, String enc) { + try { + return new String(buf, enc); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Create a string using the aggregated content from the ByteBufs in the input observable. + */ + public static Observable.Transformer toString(String enc) { + return input -> input.compose(aggrByteArray()).map(buf -> newString(buf, enc)); + } + + private static class EncoderSubscriber extends Subscriber { + + private final Subscriber consumer; + private final EmbeddedChannel channel; + + public EncoderSubscriber(Subscriber consumer, EmbeddedChannel channel) { + this.consumer = consumer; + this.channel = channel; + } + + @Override + public void onNext(ByteBuf buf) { + channel.writeOutbound(buf); + ByteBuf msg; + while ((msg = channel.readOutbound()) != null) { + consumer.onNext(msg); + } + } + + @Override + public void onCompleted() { + channel.finish(); + ByteBuf msg; + while ((msg = channel.readOutbound()) != null) { + consumer.onNext(msg); + } + consumer.onCompleted(); + } + + @Override + public void onError(Throwable throwable) { + consumer.onError(throwable); + } + } + + private static class DecoderSubscriber extends Subscriber { + + private final Subscriber consumer; + private final EmbeddedChannel channel; + + public DecoderSubscriber(Subscriber consumer, EmbeddedChannel channel) { + this.consumer = consumer; + this.channel = channel; + } + + @Override + public void onNext(ByteBuf buf) { + channel.writeInbound(buf); + T msg; + while ((msg = channel.readInbound()) != null) { + consumer.onNext(msg); + } + } + + @Override + public void onCompleted() { + channel.finish(); + T msg; + while ((msg = channel.readInbound()) != null) { + consumer.onNext(msg); + } + consumer.onCompleted(); + } + + @Override + public void onError(Throwable throwable) { + consumer.onError(throwable); + } + } +} \ No newline at end of file diff --git a/iep-rxhttp/src/test/java/com/netflix/iep/http/ByteBufsTest.java b/iep-rxhttp/src/test/java/com/netflix/iep/http/ByteBufsTest.java new file mode 100644 index 00000000..dfcb8e91 --- /dev/null +++ b/iep-rxhttp/src/test/java/com/netflix/iep/http/ByteBufsTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.iep.http; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import rx.Observable; +import rx.functions.Action1; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; + +@RunWith(JUnit4.class) +public class ByteBufsTest { + + private byte[] gzipDecompress(byte[] data) throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + try (GZIPInputStream in = new GZIPInputStream(bais)) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buf = new byte[4096]; + int length; + while ((length = in.read(buf)) > 0) { + baos.write(buf, 0, length); + } + return baos.toByteArray(); + } + } + + private Observable obs(byte[] data) { + return Observable.just(Unpooled.wrappedBuffer(data)); + } + + @Test + public void gzip() throws Exception { + byte[] data = "foo".getBytes("UTF-8"); + byte[] compressed = obs(data) + .compose(ByteBufs.gzip()) + .compose(ByteBufs.aggrByteArray()) + .toBlocking().first(); + Assert.assertArrayEquals(data, gzipDecompress(compressed)); + } + + @Test + public void gunzip() throws Exception { + byte[] data = "foo".getBytes("UTF-8"); + byte[] decompressed = obs(data) + .compose(ByteBufs.gzip()) + .compose(ByteBufs.gunzip()) + .compose(ByteBufs.aggrByteArray()) + .toBlocking().first(); + Assert.assertArrayEquals(data, decompressed); + } + + @Test + public void json() throws Exception { + byte[] data = "[{\"a\":1},{\"a\":2},{\"a\":3},{\"a\":4},{\"a\":5}]".getBytes("UTF-8"); + obs(data).compose(ByteBufs.json()).toBlocking().forEach(new Action1() { + private int i = 0; + @Override + public void call(ByteBuf byteBuf) { + String obj = byteBuf.toString(Charset.forName("UTF-8")); + Assert.assertEquals(String.format("{\"a\":%d}", ++i), obj); + } + }); + } + + @Test + public void bufToString() throws Exception { + String result = Observable.merge( + obs("foo".getBytes("UTF-8")), + obs("bar".getBytes("UTF-8")), + obs("baz".getBytes("UTF-8"))) + .compose(ByteBufs.toString("UTF-8")) + .toBlocking() + .first(); + Assert.assertEquals("foobarbaz", result); + } +} \ No newline at end of file