Skip to content

Commit

Permalink
Streaming tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Dec 12, 2024
1 parent bb9e272 commit aa15b48
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 10 deletions.
11 changes: 11 additions & 0 deletions src/native/mqtt_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,8 @@ static void s_aws_streaming_operation_binding_destroy(struct aws_streaming_opera
(*env)->DeleteGlobalRef(env, binding->java_subscription_status_event_callback);
}

aws_jni_release_thread_env(binding->jvm, env);

done:

aws_mem_release(binding->allocator, binding);
Expand Down Expand Up @@ -836,6 +838,11 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBas

struct aws_streaming_operation_binding *binding =
(struct aws_streaming_operation_binding *)jni_streaming_operation_handle;
if (binding == NULL) {
aws_jni_throw_runtime_exception(env, "streamingOperationOpen - stream already closed");
return;
}

struct aws_mqtt_rr_client_operation *stream = binding->stream;

if (aws_mqtt_rr_client_operation_activate(stream)) {
Expand All @@ -853,6 +860,10 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_StreamingOperationBas

struct aws_streaming_operation_binding *binding =
(struct aws_streaming_operation_binding *)jni_streaming_operation_handle;
if (binding == NULL) {
return;
}

struct aws_mqtt_rr_client_operation *stream = binding->stream;

binding->stream = NULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,13 @@
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.TlsContextOptions;
import software.amazon.awssdk.crt.iot.MqttRequestResponse;
import software.amazon.awssdk.crt.iot.MqttRequestResponseClient;
import software.amazon.awssdk.crt.iot.MqttRequestResponseClientBuilder;
import software.amazon.awssdk.crt.iot.RequestResponseOperation;
import software.amazon.awssdk.crt.iot.ResponsePath;
import software.amazon.awssdk.crt.iot.*;
import software.amazon.awssdk.crt.mqtt.*;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
import software.amazon.awssdk.crt.mqtt5.OnAttemptingConnectReturn;
import software.amazon.awssdk.crt.mqtt5.*;
import software.amazon.awssdk.crt.mqtt5.OnConnectionFailureReturn;
import software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn;
import software.amazon.awssdk.crt.mqtt5.OnDisconnectionReturn;
import software.amazon.awssdk.crt.mqtt5.OnStoppedReturn;
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
Expand Down Expand Up @@ -649,4 +642,205 @@ public void GetNamedShadowFailureTimeoutMqtt311() {

doGetNamedShadowFailureTimeoutTest(MqttVersion.Mqtt311);
}

void doStreamingOperationOpenClosedTest(MqttVersion version) {
this.context = new TestContext(version, null);
CompletableFuture<Boolean> subscribed = new CompletableFuture<>();

StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
.withTopic("$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/delta")
.withSubscriptionStatusEventCallback((event) -> {
if (event.getType() == SubscriptionStatusEventType.SUBSCRIPTION_ESTABLISHED) {
subscribed.complete(true);
}
})
.build();

StreamingOperationBase stream = this.context.rrClient.createStream(streamingOptions);
stream.open();

try {
subscribed.get();
} catch (Exception ex) {
Assert.assertTrue(false);
}

stream.close();
}

@Test
public void ShadowUpdatedStreamingOperationOpenCloseMqtt5() {
skipIfNetworkUnavailable();

doStreamingOperationOpenClosedTest(MqttVersion.Mqtt5);
}

@Test
public void ShadowUpdatedStreamingOperationOpenCloseMqtt311() {
skipIfNetworkUnavailable();

doStreamingOperationOpenClosedTest(MqttVersion.Mqtt311);
}

void doStreamingOperationIncomingPublishTest(MqttVersion version) {
this.context = new TestContext(version, null);
CompletableFuture<Boolean> subscribed = new CompletableFuture<>();
CompletableFuture<IncomingPublishEvent> publishEvent = new CompletableFuture<>();

String fakeShadowTopic = "not/a/shadow/topic/" + (UUID.randomUUID()).toString();
StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
.withTopic(fakeShadowTopic)
.withSubscriptionStatusEventCallback((event) -> {
if (event.getType() == SubscriptionStatusEventType.SUBSCRIPTION_ESTABLISHED) {
subscribed.complete(true);
}
})
.withIncomingPublishEventCallback((event) -> {
publishEvent.complete(event);
})
.build();

StreamingOperationBase stream = this.context.rrClient.createStream(streamingOptions);
stream.open();

try {
subscribed.get();
} catch (Exception ex) {
Assert.assertTrue(false);
}

String originalPayloadAsString = "IncomingPublishTest";
byte[] originalPayload = originalPayloadAsString.getBytes(StandardCharsets.UTF_8);

if (version == MqttVersion.Mqtt5) {
PublishPacket publishPacket = new PublishPacket.PublishPacketBuilder()
.withPayload(originalPayload)
.withTopic(fakeShadowTopic)
.withQOS(QOS.AT_LEAST_ONCE)
.build();
this.context.mqtt5Client.publish(publishPacket);
} else {
this.context.mqtt311Client.publish(new MqttMessage(fakeShadowTopic, originalPayload, QualityOfService.AT_LEAST_ONCE));
}

try {
IncomingPublishEvent event = publishEvent.get();
Assert.assertNotNull(event);
String payloadAsString = new String(event.getPayload(), StandardCharsets.UTF_8);
Assert.assertEquals(originalPayloadAsString, payloadAsString);
} catch (Exception ex) {
Assert.assertTrue(false);
}

stream.close();
}

@Test
public void StreamingOperationIncomingPublishMqtt5() {
skipIfNetworkUnavailable();

doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt5);
}

@Test
public void StreamingOperationIncomingPublishMqtt311() {
skipIfNetworkUnavailable();

doStreamingOperationIncomingPublishTest(MqttVersion.Mqtt311);
}

void doStreamingOperationReopenFailureTest(MqttVersion version) {
this.context = new TestContext(version, null);
CompletableFuture<Boolean> subscribed = new CompletableFuture<>();

StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
.withTopic("$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/delta")
.withSubscriptionStatusEventCallback((event) -> {
if (event.getType() == SubscriptionStatusEventType.SUBSCRIPTION_ESTABLISHED) {
subscribed.complete(true);
}
})
.build();

StreamingOperationBase stream = this.context.rrClient.createStream(streamingOptions);
stream.open();

try {
subscribed.get();
} catch (Exception ex) {
Assert.assertTrue(false);
}

stream.close();

stream.open();
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationReopenFailureMqtt5() {
skipIfNetworkUnavailable();

doStreamingOperationReopenFailureTest(MqttVersion.Mqtt5);
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationReopenFailureMqtt311() {
skipIfNetworkUnavailable();

doStreamingOperationReopenFailureTest(MqttVersion.Mqtt311);
}

void doStreamingOperationCreateFailureClosedClientTest(MqttVersion version) {
this.context = new TestContext(version, null);

this.context.rrClient.close();

StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
.withTopic("$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/delta")
.build();

try {
StreamingOperationBase stream = this.context.rrClient.createStream(streamingOptions);
Assert.assertTrue(false);
} finally {
this.context.rrClient = null;
}
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationCreateFailureClosedClientMqtt5() {
skipIfNetworkUnavailable();

doStreamingOperationCreateFailureClosedClientTest(MqttVersion.Mqtt5);
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationCreateFailureClosedClientMqtt311() {
skipIfNetworkUnavailable();

doStreamingOperationCreateFailureClosedClientTest(MqttVersion.Mqtt311);
}

void doStreamingOperationCreateFailureNullTopicTest(MqttVersion version) {
this.context = new TestContext(version, null);

StreamingOperationOptions streamingOptions = StreamingOperationOptions.builder()
.build();

StreamingOperationBase stream = this.context.rrClient.createStream(streamingOptions);
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationCreateFailureNullTopicMqtt5() {
skipIfNetworkUnavailable();

doStreamingOperationCreateFailureNullTopicTest(MqttVersion.Mqtt5);
}

@Test(expected = CrtRuntimeException.class)
public void StreamingOperationCreateFailureNullTopicMqtt311() {
skipIfNetworkUnavailable();

doStreamingOperationCreateFailureNullTopicTest(MqttVersion.Mqtt311);
}
}

0 comments on commit aa15b48

Please sign in to comment.