Skip to content

Commit

Permalink
Unix domain socket support (#77)
Browse files Browse the repository at this point in the history
Closes #76

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont authored Jan 3, 2025
1 parent b2da0b6 commit 43750ae
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
10 changes: 9 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
= Vert.x TCP EventBus bridge
:toc: left

Vert.x TCP EventBus bridge is a TCP bridge to Vert.x EventBus.
To use this project, add the following dependency to the _dependencies_ section of your build descriptor:
Expand Down Expand Up @@ -71,3 +70,12 @@ An example on how to get started with this bridge could be:
----
{@link examples.TCPBridgeExamples#example1}
----

== Listening to Unix domain sockets

When running on JDK 16+, or using a https://vertx.io/docs/vertx-core/java/#_native_transports[native transport], a server can listen to Unix domain sockets:

[source,$lang]
----
{@link examples.TCPBridgeExamples#serverWithDomainSockets}
----
13 changes: 12 additions & 1 deletion src/main/java/examples/TCPBridgeExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package examples;

import io.vertx.core.Vertx;
import io.vertx.core.net.SocketAddress;
import io.vertx.docgen.Source;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
Expand All @@ -30,7 +31,6 @@
public class TCPBridgeExamples {

public void example1(Vertx vertx) {

TcpEventBusBridge bridge = TcpEventBusBridge.create(
vertx,
new BridgeOptions()
Expand All @@ -44,6 +44,17 @@ public void example1(Vertx vertx) {
// fail...
}
});
}

public void serverWithDomainSockets(TcpEventBusBridge bridge) {
SocketAddress domainSocketAddress = SocketAddress.domainSocketAddress("/var/tmp/bridge.sock");

bridge.listen(domainSocketAddress).onComplete(res -> {
if (res.succeeded()) {
// succeed...
} else {
// fail...
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/
package io.vertx.ext.eventbus.bridge.tcp;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.eventbus.bridge.tcp.impl.TcpEventBusBridgeImpl;

Expand All @@ -44,35 +43,47 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options) {
static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions) {
return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,null);
}

static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions,Handler<BridgeEvent> eventHandler) {
return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,eventHandler);
}

/**
* Listen on default port 7000
* Start listening on the port and host as configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server.
*
* @return a future of the result
*/
Future<TcpEventBusBridge> listen();

/**
* Listen on specific port and bind to specific address
* Start listening on the specified port and host, ignoring port and host configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server.
*
* @param port tcp port
* @param address tcp address to the bind
* @param port the tcp port
* @param address the local address
*
* @return a future of the result
*/
Future<TcpEventBusBridge> listen(int port, String address);

/**
* Listen on specific port
* Start listening on the specified port and host "0.0.0.0", ignoring port and host configured in the {@link io.vertx.core.net.NetServerOptions} used when creating the server.
*
* @param port tcp port
* @param port the TCP port
*
* @return a future of the result
*/
Future<TcpEventBusBridge> listen(int port);

/**
* Start listening on the specified local address, ignoring port and host configured in the {@link NetServerOptions} used when creating the server.
*
* @param localAddress the local address to listen on
* @return a future of the result
*/
default Future<TcpEventBusBridge> listen(SocketAddress localAddress) {
return Future.failedFuture("Not supported");
}

/**
* Close the current socket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.bridge.BridgeEventType;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
Expand Down Expand Up @@ -81,6 +82,11 @@ public Future<TcpEventBusBridge> listen(int port) {
return server.listen(port).map(this);
}

@Override
public Future<TcpEventBusBridge> listen(SocketAddress localAddress) {
return server.listen(localAddress).map(this);
}

@Override
public Future<TcpEventBusBridge> listen(int port, String address) {
return server.listen(port, address).map(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.vertx.ext.eventbus.bridge.tcp;

import io.vertx.core.Vertx;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

import java.io.File;

import static org.junit.Assume.assumeTrue;

@RunWith(VertxUnitRunner.class)
public class UnixDomainSocketTest {

@Rule
public TemporaryFolder tmp = new TemporaryFolder();

private VertxInternal vertx;
private SocketAddress domainSocketAddress;

@Before
public void before(TestContext context) throws Exception {
vertx = (VertxInternal) Vertx.vertx();
assumeTrue("Domain sockets not supported on this platform", vertx.transport().supportsDomainSockets());

domainSocketAddress = SocketAddress.domainSocketAddress(new File(tmp.newFolder(), "bridge.sock").getAbsolutePath());

Async async = context.async();

TcpEventBusBridge bridge = TcpEventBusBridge.create(
vertx,
new BridgeOptions()
.addInboundPermitted(new PermittedOptions())
.addOutboundPermitted(new PermittedOptions()));

bridge.listen(domainSocketAddress).onComplete(res -> {
context.assertTrue(res.succeeded());
async.complete();
});
}

@After
public void after(TestContext context) {
vertx.close().onComplete(context.asyncAssertSuccess());
}

@Test
public void testRegister(TestContext context) {
// Send a request and get a response
NetClient client = vertx.createNetClient();
final Async async = context.async();

client.connect(domainSocketAddress).onComplete(conn -> {
context.assertFalse(conn.failed());

NetSocket socket = conn.result();

// 1 reply will arrive
// MESSAGE for echo
final FrameParser parser = new FrameParser(parse -> {
context.assertTrue(parse.succeeded());
JsonObject frame = parse.result();

context.assertNotEquals("err", frame.getString("type"));
context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value"));
client.close();
async.complete();
});

socket.handler(parser);

FrameHelper.sendFrame("register", "echo", null, socket);

// now try to publish a message so it gets delivered both to the consumer registred on the startup and to this
// remote consumer

FrameHelper.sendFrame("publish", "echo", new JsonObject().put("value", "Vert.x"), socket);
});
}
}

0 comments on commit 43750ae

Please sign in to comment.