From 43750ae299f5515a2560ae3320aca508924b01c3 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 3 Jan 2025 11:49:59 +0100 Subject: [PATCH] Unix domain socket support (#77) Closes #76 Signed-off-by: Thomas Segismont --- src/main/asciidoc/index.adoc | 10 +- src/main/java/examples/TCPBridgeExamples.java | 13 ++- .../bridge/tcp/TcpEventBusBridge.java | 27 ++++-- .../tcp/impl/TcpEventBusBridgeImpl.java | 6 ++ .../bridge/tcp/UnixDomainSocketTest.java | 95 +++++++++++++++++++ 5 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 src/test/java/io/vertx/ext/eventbus/bridge/tcp/UnixDomainSocketTest.java diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index ddb4208..55a1964 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -1,5 +1,4 @@ = Vert.x TCP EventBus bridge -:toc: left Vert.x TCP EventBus bridge is a TCP bridge to Vert.x EventBus. To use this project, add the following dependency to the _dependencies_ section of your build descriptor: @@ -71,3 +70,12 @@ An example on how to get started with this bridge could be: ---- {@link examples.TCPBridgeExamples#example1} ---- + +== Listening to Unix domain sockets + +When running on JDK 16+, or using a https://vertx.io/docs/vertx-core/java/#_native_transports[native transport], a server can listen to Unix domain sockets: + +[source,$lang] +---- +{@link examples.TCPBridgeExamples#serverWithDomainSockets} +---- diff --git a/src/main/java/examples/TCPBridgeExamples.java b/src/main/java/examples/TCPBridgeExamples.java index b4892ef..c1ef59a 100644 --- a/src/main/java/examples/TCPBridgeExamples.java +++ b/src/main/java/examples/TCPBridgeExamples.java @@ -17,6 +17,7 @@ package examples; import io.vertx.core.Vertx; +import io.vertx.core.net.SocketAddress; import io.vertx.docgen.Source; import io.vertx.ext.bridge.BridgeOptions; import io.vertx.ext.bridge.PermittedOptions; @@ -30,7 +31,6 @@ public class TCPBridgeExamples { public void example1(Vertx vertx) { - TcpEventBusBridge bridge = TcpEventBusBridge.create( vertx, new BridgeOptions() @@ -44,6 +44,17 @@ public void example1(Vertx vertx) { // fail... } }); + } + + public void serverWithDomainSockets(TcpEventBusBridge bridge) { + SocketAddress domainSocketAddress = SocketAddress.domainSocketAddress("/var/tmp/bridge.sock"); + bridge.listen(domainSocketAddress).onComplete(res -> { + if (res.succeeded()) { + // succeed... + } else { + // fail... + } + }); } } diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java index 5f96787..2cc9b0f 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java @@ -15,13 +15,12 @@ */ package io.vertx.ext.eventbus.bridge.tcp; -import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.bridge.BridgeOptions; import io.vertx.ext.eventbus.bridge.tcp.impl.TcpEventBusBridgeImpl; @@ -44,35 +43,47 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options) { static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions) { return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,null); } + static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions,Handler eventHandler) { return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,eventHandler); } + /** - * Listen on default port 7000 + * Start listening on the port and host as configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server. * * @return a future of the result */ Future listen(); /** - * Listen on specific port and bind to specific address + * Start listening on the specified port and host, ignoring port and host configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server. * - * @param port tcp port - * @param address tcp address to the bind + * @param port the tcp port + * @param address the local address * * @return a future of the result */ Future listen(int port, String address); /** - * Listen on specific port + * Start listening on the specified port and host "0.0.0.0", ignoring port and host configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server. * - * @param port tcp port + * @param port the TCP port * * @return a future of the result */ Future listen(int port); + /** + * Start listening on the specified local address, ignoring port and host configured in the {@link NetServerOptions} used when creating the server. + * + * @param localAddress the local address to listen on + * @return a future of the result + */ + default Future listen(SocketAddress localAddress) { + return Future.failedFuture("Not supported"); + } + /** * Close the current socket. * diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java index 87d45de..fe883a8 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java @@ -26,6 +26,7 @@ import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetSocket; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.bridge.BridgeEventType; import io.vertx.ext.bridge.BridgeOptions; import io.vertx.ext.bridge.PermittedOptions; @@ -81,6 +82,11 @@ public Future listen(int port) { return server.listen(port).map(this); } + @Override + public Future listen(SocketAddress localAddress) { + return server.listen(localAddress).map(this); + } + @Override public Future listen(int port, String address) { return server.listen(port, address).map(this); diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/UnixDomainSocketTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/UnixDomainSocketTest.java new file mode 100644 index 0000000..4b42b3b --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/UnixDomainSocketTest.java @@ -0,0 +1,95 @@ +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.Vertx; +import io.vertx.core.internal.VertxInternal; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetSocket; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper; +import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; + +import java.io.File; + +import static org.junit.Assume.assumeTrue; + +@RunWith(VertxUnitRunner.class) +public class UnixDomainSocketTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + private VertxInternal vertx; + private SocketAddress domainSocketAddress; + + @Before + public void before(TestContext context) throws Exception { + vertx = (VertxInternal) Vertx.vertx(); + assumeTrue("Domain sockets not supported on this platform", vertx.transport().supportsDomainSockets()); + + domainSocketAddress = SocketAddress.domainSocketAddress(new File(tmp.newFolder(), "bridge.sock").getAbsolutePath()); + + Async async = context.async(); + + TcpEventBusBridge bridge = TcpEventBusBridge.create( + vertx, + new BridgeOptions() + .addInboundPermitted(new PermittedOptions()) + .addOutboundPermitted(new PermittedOptions())); + + bridge.listen(domainSocketAddress).onComplete(res -> { + context.assertTrue(res.succeeded()); + async.complete(); + }); + } + + @After + public void after(TestContext context) { + vertx.close().onComplete(context.asyncAssertSuccess()); + } + + @Test + public void testRegister(TestContext context) { + // Send a request and get a response + NetClient client = vertx.createNetClient(); + final Async async = context.async(); + + client.connect(domainSocketAddress).onComplete(conn -> { + context.assertFalse(conn.failed()); + + NetSocket socket = conn.result(); + + // 1 reply will arrive + // MESSAGE for echo + final FrameParser parser = new FrameParser(parse -> { + context.assertTrue(parse.succeeded()); + JsonObject frame = parse.result(); + + context.assertNotEquals("err", frame.getString("type")); + context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value")); + client.close(); + async.complete(); + }); + + socket.handler(parser); + + FrameHelper.sendFrame("register", "echo", null, socket); + + // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this + // remote consumer + + FrameHelper.sendFrame("publish", "echo", new JsonObject().put("value", "Vert.x"), socket); + }); + } +}