diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index fda1139f5..1fc36ea73 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on @Override public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { - connectedFuture.completeExceptionally(new Exception( - "[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode() - )); + // failing the connected future here is not valid from a race condition standpoint. It is possible that + // the interrupting client itself gets interrupted and fails to fully connect due to the original client + // interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id + // with increasing reconnect backoff. } @Override @@ -2083,6 +2084,9 @@ public void Op_UC4() { eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + // Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish. + Thread.sleep(2000); + publisher.stop(disconnectPacketBuilder.build()); publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); @@ -2171,11 +2175,6 @@ public void Op_SharedSubscription() { // Wait a little longer just to ensure that no packets beyond expectations are arrived. publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Check that both clients received packets. - // PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough - // to ensure that AWS IoT Core sent different packets to different subscribers. - assertTrue(publishEvents.clientsReceived.size() == 2); - subscriberOneClient.stop(); subscriberTwoClient.stop(); publisherClient.stop(); diff --git a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java index 18906a2f8..5de86f2ed 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java @@ -34,9 +34,6 @@ public SelfPubSubTest() { static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString(); static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!"; - int pubsAcked = 0; - int subsAcked = 0; - @Test public void testPubSub() { skipIfNetworkUnavailable(); @@ -65,27 +62,21 @@ public void testPubSub() { CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE, messageHandler); - subscribed.thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE, false); CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); MqttMessage received = receivedFuture.get(); assertEquals("Received", message.getTopic(), received.getTopic()); @@ -94,11 +85,9 @@ public void testPubSub() { assertEquals("Received", message.getRetain(), received.getRetain()); CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); } @@ -142,33 +131,25 @@ public void testPubSubOnMessage() { try { CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE); - subscribed.thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE); CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); }