Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky test pass1 #855

Merged
merged 5 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on

@Override
public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
connectedFuture.completeExceptionally(new Exception(
"[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode()
));
// failing the connected future here is not valid from a race condition standpoint. It is possible that
// the interrupting client itself gets interrupted and fails to fully connect due to the original client
// interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id
// with increasing reconnect backoff.
}

@Override
Expand Down Expand Up @@ -2083,6 +2084,9 @@ public void Op_UC4() {
eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish.
Thread.sleep(2000);

publisher.stop(disconnectPacketBuilder.build());

publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2171,11 +2175,6 @@ public void Op_SharedSubscription() {
// Wait a little longer just to ensure that no packets beyond expectations are arrived.
publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Check that both clients received packets.
// PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough
// to ensure that AWS IoT Core sent different packets to different subscribers.
assertTrue(publishEvents.clientsReceived.size() == 2);

subscriberOneClient.stop();
subscriberTwoClient.stop();
publisherClient.stop();
Expand Down
19 changes: 0 additions & 19 deletions src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public SelfPubSubTest() {
static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString();
static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!";

int pubsAcked = 0;
int subsAcked = 0;

@Test
public void testPubSub() {
skipIfNetworkUnavailable();
Expand Down Expand Up @@ -65,27 +62,21 @@ public void testPubSub() {

CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE,
messageHandler);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE,
false);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

MqttMessage received = receivedFuture.get();
assertEquals("Received", message.getTopic(), received.getTopic());
Expand All @@ -94,11 +85,9 @@ public void testPubSub() {
assertEquals("Received", message.getRetain(), received.getRetain());

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down Expand Up @@ -142,33 +131,25 @@ public void testPubSubOnMessage() {

try {
CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down
Loading