Skip to content

Commit

Permalink
Merge pull request #183 from brharrington/bytebufs
Browse files Browse the repository at this point in the history
helper utilities for Observable<ByteBuf>
  • Loading branch information
brharrington committed Oct 20, 2015
2 parents 0fc9f93 + e3927c4 commit c8373bb
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
189 changes: 189 additions & 0 deletions iep-rxhttp/src/main/java/com/netflix/iep/http/ByteBufs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.iep.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.json.JsonObjectDecoder;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;

import java.io.UnsupportedEncodingException;

/**
* Helper operations for working with {@code Observable<ByteBuf>} objects.
*/
public final class ByteBufs {

private ByteBufs() {
}

private static Observable<ByteBuf> encode(final Observable<ByteBuf> input, final EmbeddedChannel channel) {
return Observable.create(new Observable.OnSubscribe<ByteBuf>() {
@Override public void call(final Subscriber<? super ByteBuf> subscriber) {
input.subscribe(new EncoderSubscriber(subscriber, channel));
}
});
}

private static <T> Observable<T> decode(final Observable<ByteBuf> input, final EmbeddedChannel channel) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override public void call(final Subscriber<? super T> subscriber) {
input.subscribe(new DecoderSubscriber<T>(subscriber, channel));
}
});
}

/**
* Map to a GZIP compressed sequence of ByteBuf.
*/
public static Observable.Transformer<ByteBuf, ByteBuf> gzip() {
return input -> encode(input, new EmbeddedChannel(new JdkZlibEncoder(ZlibWrapper.GZIP)));
}

/**
* Map a GZIP compressed sequence of ByteBufs to a decompressed sequence.
*/
public static Observable.Transformer<ByteBuf, ByteBuf> gunzip() {
return input -> decode(input, new EmbeddedChannel(new JdkZlibDecoder(ZlibWrapper.GZIP)));
}

/**
* Process json data encoded as a sequence of ByteBufs so that each object within an array
* will be a single ByteBuf in the output.
*/
public static Observable.Transformer<ByteBuf, ByteBuf> json() {
return input -> decode(input, new EmbeddedChannel(new JsonObjectDecoder(true)));
}

/**
* Map a ByteBuf to a byte array.
*/
public static Func1<ByteBuf, byte[]> toByteArray() {
return bufs -> {
byte[] output = new byte[bufs.readableBytes()];
bufs.readBytes(output, 0, bufs.readableBytes());
return output;
};
}

private static Func2<CompositeByteBuf, ByteBuf, CompositeByteBuf> append() {
return (bufs, buf) -> {
bufs.addComponent(buf);
bufs.writerIndex(bufs.writerIndex() + buf.readableBytes());
return bufs;
};
}

/**
* Create an aggregated byte array with all data from the ByteBufs in the input observable.
*/
public static Observable.Transformer<ByteBuf, byte[]> aggrByteArray() {
return input -> input.reduce(Unpooled.compositeBuffer(), append()).map(toByteArray());
}

private static String newString(byte[] buf, String enc) {
try {
return new String(buf, enc);
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Create a string using the aggregated content from the ByteBufs in the input observable.
*/
public static Observable.Transformer<ByteBuf, String> toString(String enc) {
return input -> input.compose(aggrByteArray()).map(buf -> newString(buf, enc));
}

private static class EncoderSubscriber extends Subscriber<ByteBuf> {

private final Subscriber<? super ByteBuf> consumer;
private final EmbeddedChannel channel;

public EncoderSubscriber(Subscriber<? super ByteBuf> consumer, EmbeddedChannel channel) {
this.consumer = consumer;
this.channel = channel;
}

@Override
public void onNext(ByteBuf buf) {
channel.writeOutbound(buf);
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
consumer.onNext(msg);
}
}

@Override
public void onCompleted() {
channel.finish();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
consumer.onNext(msg);
}
consumer.onCompleted();
}

@Override
public void onError(Throwable throwable) {
consumer.onError(throwable);
}
}

private static class DecoderSubscriber<T> extends Subscriber<ByteBuf> {

private final Subscriber<? super T> consumer;
private final EmbeddedChannel channel;

public DecoderSubscriber(Subscriber<? super T> consumer, EmbeddedChannel channel) {
this.consumer = consumer;
this.channel = channel;
}

@Override
public void onNext(ByteBuf buf) {
channel.writeInbound(buf);
T msg;
while ((msg = channel.readInbound()) != null) {
consumer.onNext(msg);
}
}

@Override
public void onCompleted() {
channel.finish();
T msg;
while ((msg = channel.readInbound()) != null) {
consumer.onNext(msg);
}
consumer.onCompleted();
}

@Override
public void onError(Throwable throwable) {
consumer.onError(throwable);
}
}
}
97 changes: 97 additions & 0 deletions iep-rxhttp/src/test/java/com/netflix/iep/http/ByteBufsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.iep.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import rx.Observable;
import rx.functions.Action1;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;

@RunWith(JUnit4.class)
public class ByteBufsTest {

private byte[] gzipDecompress(byte[] data) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
try (GZIPInputStream in = new GZIPInputStream(bais)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buf = new byte[4096];
int length;
while ((length = in.read(buf)) > 0) {
baos.write(buf, 0, length);
}
return baos.toByteArray();
}
}

private Observable<ByteBuf> obs(byte[] data) {
return Observable.just(Unpooled.wrappedBuffer(data));
}

@Test
public void gzip() throws Exception {
byte[] data = "foo".getBytes("UTF-8");
byte[] compressed = obs(data)
.compose(ByteBufs.gzip())
.compose(ByteBufs.aggrByteArray())
.toBlocking().first();
Assert.assertArrayEquals(data, gzipDecompress(compressed));
}

@Test
public void gunzip() throws Exception {
byte[] data = "foo".getBytes("UTF-8");
byte[] decompressed = obs(data)
.compose(ByteBufs.gzip())
.compose(ByteBufs.gunzip())
.compose(ByteBufs.aggrByteArray())
.toBlocking().first();
Assert.assertArrayEquals(data, decompressed);
}

@Test
public void json() throws Exception {
byte[] data = "[{\"a\":1},{\"a\":2},{\"a\":3},{\"a\":4},{\"a\":5}]".getBytes("UTF-8");
obs(data).compose(ByteBufs.json()).toBlocking().forEach(new Action1<ByteBuf>() {
private int i = 0;
@Override
public void call(ByteBuf byteBuf) {
String obj = byteBuf.toString(Charset.forName("UTF-8"));
Assert.assertEquals(String.format("{\"a\":%d}", ++i), obj);
}
});
}

@Test
public void bufToString() throws Exception {
String result = Observable.merge(
obs("foo".getBytes("UTF-8")),
obs("bar".getBytes("UTF-8")),
obs("baz".getBytes("UTF-8")))
.compose(ByteBufs.toString("UTF-8"))
.toBlocking()
.first();
Assert.assertEquals("foobarbaz", result);
}
}

0 comments on commit c8373bb

Please sign in to comment.