Skip to content

Commit

Permalink
Update old unit tests + added new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
corcoja committed Apr 5, 2022
1 parent f9a32a8 commit f22ceed
Show file tree
Hide file tree
Showing 4 changed files with 861 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -93,19 +95,18 @@ void testPubSubPushConsumerOneMessage() throws Exception {
testMessage));

// Subscribe to a push-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
testMatch.set(testSubjects.stream().anyMatch((testSubject) -> subject.equals(testSubject))
&& testMessage.equals(message));
testMatch
.set(testSubjects.stream().anyMatch(subject::equals) && testMessage.equals(message));
}
}));
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatch.get());

assertTrue(testMatch.get());
.until(testMatch::get);

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
Expand Down Expand Up @@ -134,19 +135,18 @@ void testPubSubPullConsumerOneMessage() throws Exception {
testMessage));

// Subscribe to a pull-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
testMatch.set(testSubjects.stream().anyMatch((testSubject) -> subject.equals(testSubject))
&& testMessage.equals(message));
testMatch
.set(testSubjects.stream().anyMatch(subject::equals) && testMessage.equals(message));
}
}));
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatch.get());

assertTrue(testMatch.get());
.until(testMatch::get);

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
Expand Down Expand Up @@ -182,22 +182,21 @@ void testPubSubPushConsumerManyMessages() throws Exception {
}

// Subscribe to a push-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
IntStream.range(0, matches.size())
.filter(idx -> messages.get(idx).getKey().equals(subject)
&& messages.get(idx).getValue().equals(message))
.findFirst().ifPresent(idx -> matches.set(idx, true));
}
}));
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received all the published message
Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500))
.until(() -> matches.stream().allMatch(Boolean::valueOf));

assertTrue(matches.stream().allMatch(Boolean::valueOf));

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}
Expand Down Expand Up @@ -231,21 +230,133 @@ void testPubSubPullConsumerManyMessages() throws Exception {
}

// Subscribe to a pull-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
IntStream.range(0, matches.size())
.filter(idx -> messages.get(idx).getKey().equals(subject)
&& messages.get(idx).getValue().equals(message))
.findFirst().ifPresent(idx -> matches.set(idx, true));
}
}));
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received all the published message
Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500))
.until(() -> matches.stream().allMatch(Boolean::valueOf));

assertTrue(matches.stream().allMatch(Boolean::valueOf));
assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}

@Test
void testPubSubPushConsumerUnsubscribe() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
String testMessageWhenSubscribed = "Test message while subscribed!";
String testMessageNotSubscribed = "Test message not subscribed!";

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
new ConsumerConfiguration.Builder().durable("test-consumer-push")
.deliverSubject("delivery.test").build(),
new PubSubConfiguration.Builder().automaticallyCreateStream(true)
.automaticallyCreateConsumer(true).build());

AtomicInteger testMatches = new AtomicInteger(0);

// Publish one message
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessageWhenSubscribed));

// Subscribe to a push-based consumer
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
Boolean subjectMatch = testSubjects.stream().anyMatch(subject::equals);
Boolean messageMatch = List.of(testMessageWhenSubscribed, testMessageNotSubscribed).stream()
.anyMatch(message::equals);

if (subjectMatch && messageMatch) {
testMatches.incrementAndGet();
}
}
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatches.get() == 1);

// Unsubscribe, send another message and wait a few seconds
pubSubTransceiver.unsubscribe();
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessageNotSubscribed));
TimeUnit.SECONDS.sleep(2);

// The matches count still must be one
assertTrue(testMatches.get() == 1);

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}

@Test
void testPubSubPullConsumerUnsubscribe() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
String testMessageWhenSubscribed = "Test message while subscribed!";
String testMessageNotSubscribed = "Test message not subscribed!";

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
new ConsumerConfiguration.Builder().durable("test-consumer-pull").build(),
new PubSubConfiguration.Builder().automaticallyCreateStream(true)
.automaticallyCreateConsumer(true).build());

AtomicInteger testMatches = new AtomicInteger(0);

// Publish one message
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessageWhenSubscribed));

// Subscribe to a pull-based consumer
SubHandler subHandler = new SubHandler() {
@Override
public void newMessageReceived(SubOnlyTunnel tunnel, String subject, String message) {
Boolean subjectMatch = testSubjects.stream().anyMatch(subject::equals);
Boolean messageMatch = List.of(testMessageWhenSubscribed, testMessageNotSubscribed).stream()
.anyMatch(message::equals);

if (subjectMatch && messageMatch) {
testMatches.incrementAndGet();
}
}
};
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(subHandler));

// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatches.get() == 1);

// Unsubscribe, send another message and wait a few seconds
pubSubTransceiver.unsubscribe();
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessageNotSubscribed));
TimeUnit.SECONDS.sleep(2);

// The matches count still must be one
assertTrue(testMatches.get() == 1);

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
Expand Down
Loading

0 comments on commit f22ceed

Please sign in to comment.