ClosedChannelException while TcpClient reads from TcpServer #598
Comments
Hi Jonas, it looks like
Here is an example that shows how to suppress (and test for) the exception. I've also fixed a few bugs that were in your original code.
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import org.testng.annotations.Test;
import rx.Observable;
import rx.observables.StringObservable;
import rx.observers.TestSubscriber;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.stream.Stream;
public class TcpTest {

    @Test
    public void testSinkClosedChannelException() throws Exception {
        int count = 100;
        TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
            .start(
                c ->
                    c.writeString(
                        Observable.range(0, count)
                            .map(i -> i + ",")
                    )
            );
        TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
        TcpClient.newClient("127.0.0.1", server.getServerPort())
            .createConnectionRequest()
            .flatMap(Connection::getInput)
            .map(
                b -> {
                    // must free the ByteBuf
                    String s = b.toString(Charset.defaultCharset());
                    b.release();
                    return s;
                }
            )
            // can't guarantee that the block boundaries will line up so we force split
            .compose(o -> StringObservable.split(o, ","))
            .map(Integer::parseInt)
            .onErrorResumeNext(
                e -> {
                    if (e instanceof ClosedChannelException) {
                        // sink ClosedChannelException to nothing
                        return Observable.empty();
                    }
                    // 'rethrow' all other errors
                    return Observable.error(e);
                }
            )
            .subscribe(testSubscriber);
        testSubscriber.awaitTerminalEvent();
        testSubscriber.assertValueCount(count);
        testSubscriber.assertValues(
            Stream.iterate(0, i -> i + 1).limit(count).toArray(Integer[]::new)
        );
        testSubscriber.assertNoErrors();
    }
}
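The comment about block boundaries is the reason for the forced split: TCP delivers bytes in arbitrary chunks, so a number such as `10` can arrive as `1` in one buffer and `0,` in the next. Below is a minimal plain-Java sketch of that buffering idea. `ChunkSplitter` is a hypothetical helper written only for illustration; it is not part of RxNetty or RxJava and is not a description of how `StringObservable.split` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: re-assembles delimiter-separated tokens from arbitrary chunks. */
public class ChunkSplitter {
    private final String delimiter;
    // Holds the tail of the previous chunk that has not yet seen a delimiter.
    private final StringBuilder pending = new StringBuilder();

    public ChunkSplitter(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Accepts one chunk and returns every token that this chunk completes. */
    public List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> tokens = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            tokens.add(pending.substring(0, end));
            pending.delete(0, end + delimiter.length());
        }
        return tokens;
    }

    /** Returns whatever is still buffered once the stream ends (may be empty). */
    public String remainder() {
        return pending.toString();
    }

    public static void main(String[] args) {
        ChunkSplitter splitter = new ChunkSplitter(",");
        List<String> all = new ArrayList<>();
        // The token "10" is cut in half by the chunk boundary.
        for (String chunk : new String[] {"8,9,1", "0,11,"}) {
            all.addAll(splitter.feed(chunk));
        }
        System.out.println(all); // [8, 9, 10, 11]
    }
}
```

Mapping each buffer straight to `Integer::parseInt` without this kind of re-assembly would fail as soon as a chunk boundary fell inside a number.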
Thank you for the quick reply! Catching the ClosedChannelException and returning empty will work, but I was more interested in the reasoning behind treating a connection closed by the server as an error. I tried to dig through the commits for some clues, but there is just one large commit by @NiteshKant (9da1977#diff-cb86b4ce0a39c9fea0a3174ae12680feR99) where this was changed from onCompleted to onError. I will try to understand why this does not happen when using HttpServer + HttpClient.
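For reuse across several clients, the `instanceof` check from the workaround could be pulled into a small predicate. The sketch below also walks the cause chain, on the assumption (not confirmed anywhere in this thread) that the exception might arrive wrapped in another one. `CloseErrors` and `isChannelClosed` are hypothetical names, not RxNetty API.

```java
import java.nio.channels.ClosedChannelException;

/** Hypothetical helper for deciding whether an error only means "peer closed the channel". */
public class CloseErrors {

    /** Returns true if the error, or any of its causes, is a ClosedChannelException. */
    public static boolean isChannelClosed(Throwable error) {
        int depth = 0; // bound the walk in case of a cyclic cause chain
        for (Throwable t = error; t != null && depth < 16; t = t.getCause(), depth++) {
            if (t instanceof ClosedChannelException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isChannelClosed(new ClosedChannelException()));
        System.out.println(isChannelClosed(new RuntimeException(new ClosedChannelException())));
        System.out.println(isChannelClosed(new IllegalStateException("other failure")));
    }
}
```

Inside `onErrorResumeNext`, the lambda could then return `Observable.empty()` when the predicate is true and `Observable.error(e)` otherwise, exactly as in the test above.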
Yeah, beyond that comment I have no idea. Nothing struck me in the commit either. I had wondered if it is to signal client close vs server close (which would otherwise be lost) because Netflix had a socket client that needed to know this, but that's pure speculation.
I am getting java.nio.channels.ClosedChannelException in my TcpClient when the server closes the connection. I would expect the getInput() Observable to complete when the client notices a closed connection, rather than to return an error.
Consider the following test:
The test will output:
Are my expectations wrong? How can I modify the test so that the client can read the input to the end? Examples in the repository use take(x), which unsubscribes after receiving the expected number of results, but how can I write an example with an arbitrary number of results?