From 5bf5f7a54af74b2d137dace37d671f162ec0fe68 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 5 Dec 2024 13:12:51 -0800 Subject: [PATCH] Pre-testing checkpoint --- .../awssdk/crt/iot/MqttRequestResponse.java | 8 +- .../crt/aws-crt/jni-config.json | 27 +++ src/native/java_class_ids.c | 19 ++ src/native/java_class_ids.h | 9 + src/native/mqtt_request_response.c | 223 ++++++++++++++++-- 5 files changed, 263 insertions(+), 23 deletions(-) diff --git a/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java index d435b1c80..e9f40a89f 100644 --- a/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java +++ b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java @@ -10,12 +10,10 @@ */ public class MqttRequestResponse { - private final String topic; - private final byte[] payload; + private String topic; + private byte[] payload; - private MqttRequestResponse(String topic, byte []payload) { - this.topic = topic; - this.payload = payload; + private MqttRequestResponse() { } /** diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index f623c2695..c5601a0d1 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -135,6 +135,16 @@ { "name": "", "parameterTypes": [] + }, + { + "name": "get", + "parameterTypes": [ + "int" + ] + }, + { + "name": "size", + "parameterTypes": [] } ] }, @@ -956,6 +966,23 @@ } ] }, + { + "name": "software.amazon.awssdk.crt.iot.MqttRequestResponse", + "methods": [ + { + "name": "", + "parameterTypes": [] + } + ], + "fields": [ + { + "name": "topic" + }, + { + "name": "payload" + } + ] + }, { "name": "software.amazon.awssdk.crt.iot.RequestResponseOperation", "fields": [ diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index b376e9562..de8cb676b 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -2375,6 +2375,24 @@ static void s_cache_request_response_operation_properties(JNIEnv *env) { AWS_FATAL_ASSERT(request_response_operation_properties.correlation_token_field_id); } +struct java_mqtt_request_response_properties mqtt_request_response_properties; + +static void s_cache_mqtt_request_response_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/iot/MqttRequestResponse"); + AWS_FATAL_ASSERT(cls); + + mqtt_request_response_properties.mqtt_request_response_class = (*env)->NewGlobalRef(env, cls); + + mqtt_request_response_properties.constructor_method_id = + (*env)->GetMethodID(env, mqtt_request_response_properties.mqtt_request_response_class, "", "()V"); + + mqtt_request_response_properties.topic_field_id = (*env)->GetFieldID(env, cls, "topic", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(mqtt_request_response_properties.topic_field_id); + + mqtt_request_response_properties.payload_field_id = (*env)->GetFieldID(env, cls, "payload", "[B"); + AWS_FATAL_ASSERT(mqtt_request_response_properties.payload_field_id); +} + static void s_cache_java_class_ids(void *user_data) { JNIEnv *env = user_data; s_cache_http_request_body_stream(env); @@ -2482,6 +2500,7 @@ static void s_cache_java_class_ids(void *user_data) { s_cache_topic_aliasing_options(env); s_cache_response_path_properties(env); s_cache_request_response_operation_properties(env); + s_cache_mqtt_request_response_properties(env); } static aws_thread_once s_cache_once_init = AWS_THREAD_ONCE_STATIC_INIT; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 710bcb842..63b66de9f 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -991,6 +991,15 @@ struct java_request_response_operation_properties { }; extern struct java_request_response_operation_properties request_response_operation_properties; +/* MqttRequestResponse */ +struct java_mqtt_request_response_properties { + jclass mqtt_request_response_class; + jmethodID constructor_method_id; + jfieldID topic_field_id; + jfieldID payload_field_id; +}; +extern struct java_mqtt_request_response_properties mqtt_request_response_properties; + /** * All functions bound to JNI MUST call this before doing anything else. * This caches all JNI IDs the first time it is called. Any further calls are no-op; it is thread-safe. diff --git a/src/native/mqtt_request_response.c b/src/native/mqtt_request_response.c index 270ac190c..e310dec74 100644 --- a/src/native/mqtt_request_response.c +++ b/src/native/mqtt_request_response.c @@ -215,9 +215,27 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_MqttRequestResponseCl struct aws_request_response_operation_binding { struct aws_allocator *allocator; - jweak operation_future; + JavaVM *jvm; + + jobject operation_future; }; +static void s_aws_request_response_operation_binding_destroy(struct aws_request_response_operation_binding *binding) { + if (!binding) { + return; + } + + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + + if (env && binding->operation_future) { + (*env)->DeleteGlobalRef(env, binding->operation_future); + } + + aws_jni_release_thread_env(binding->jvm, env); + + aws_mem_release(binding->allocator, binding); +} + /* All cursors in these structures are from calls to aws_jni_byte_cursor_from_jstring_acquire and the like which means non-null ptrs must be released with the appropriate JNI call, hence the cleanup implementation. @@ -250,8 +268,8 @@ struct aws_request_response_operation_jni_owned_parameters { struct aws_byte_cursor correlation_token; }; -static int s_aws_request_response_operation_parameters_init( - struct aws_request_response_operation_parameters *params, +static void s_aws_request_response_operation_parameters_init( + struct aws_request_response_operation_jni_owned_parameters *params, struct aws_allocator *allocator) { AWS_ZERO_STRUCT(*params); @@ -259,16 +277,16 @@ static int s_aws_request_response_operation_parameters_init( aws_array_list_init_dynamic(¶ms->subscriptions, allocator, 2, sizeof(struct aws_jni_subscription)); } -static void s_aws_request_response_operation_parameters_clean_up( - struct aws_request_response_operation_parameters *params, +static void s_aws_request_response_operation_jni_owned_parameters_clean_up( + struct aws_request_response_operation_jni_owned_parameters *params, JNIEnv *env) { if (!params) { - return NULL; + return; } for (size_t i = 0; i < aws_array_list_length(¶ms->response_paths); ++i) { struct aws_jni_response_path response_path; - AWS_ZERO_STRUCT(aws_jni_response_path); + AWS_ZERO_STRUCT(response_path); aws_array_list_get_at(¶ms->response_paths, &response_path, i); @@ -287,9 +305,9 @@ static void s_aws_request_response_operation_parameters_clean_up( } aws_array_list_clean_up(¶ms->subscriptions); - aws_jni_byte_cursor_from_jstring_release(env, params->java_publish_topic, ¶ms->publish_topic); - aws_jni_byte_cursor_from_jbyteArray_release(env, params->java_payload, ¶ms->payload); - aws_jni_byte_cursor_from_jstring_release(env, params->java_correlation_token, ¶ms->correlation_token); + aws_jni_byte_cursor_from_jstring_release(env, params->java_publish_topic, params->publish_topic); + aws_jni_byte_cursor_from_jbyteArray_release(env, params->java_payload, params->payload); + aws_jni_byte_cursor_from_jstring_release(env, params->java_correlation_token, params->correlation_token); } static int s_aws_request_response_operation_jni_owned_parameters_init_from_jobject( @@ -299,14 +317,54 @@ static int s_aws_request_response_operation_jni_owned_parameters_init_from_jobje JNIEnv *env) { s_aws_request_response_operation_parameters_init(params, allocator); - if (!java_request_response_operations) { + if (!java_request_response_operation) { aws_jni_throw_runtime_exception( env, "mqttRequestResponseClientSubmitRequest - request response options are null"); return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); } // responsePaths - ? ? ; + jobject java_response_paths = (jobject)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.response_paths_field_id); + jint response_path_count = + (*env)->CallIntMethod(env, java_response_paths, boxed_array_list_properties.size_method_id); + if (response_path_count <= 0) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - response paths is empty"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + for (size_t i = 0; i < (size_t)response_path_count; ++i) { + jstring java_response_path = + (*env)->CallObjectMethod(env, java_response_paths, boxed_array_list_properties.get_method_id, (jint)i); + if (!java_response_path) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - null response path"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + jstring java_response_topic = + (jstring)(*env)->GetObjectField(env, java_response_path, response_path_properties.response_topic_field_id); + if (!java_response_topic) { + aws_jni_throw_runtime_exception( + env, "mqttRequestResponseClientSubmitRequest - null response path response topic"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + struct aws_jni_response_path response_path = { + .java_response_topic = java_response_topic, + .response_topic = aws_jni_byte_cursor_from_jstring_acquire(env, java_response_topic), + }; + + jstring java_correlation_token_json_path = (jstring)(*env)->GetObjectField( + env, java_response_path, response_path_properties.correlation_token_json_path_field_id); + + if (java_correlation_token_json_path) { + response_path.java_correlation_token_json_path = java_correlation_token_json_path; + response_path.correlation_token_json_path = + aws_jni_byte_cursor_from_jstring_acquire(env, java_correlation_token_json_path); + } + + aws_array_list_push_back(¶ms->response_paths, &response_path); + } // subscriptions jobject java_subscriptions = (jobject)(*env)->GetObjectField( @@ -319,6 +377,19 @@ static int s_aws_request_response_operation_jni_owned_parameters_init_from_jobje } for (size_t i = 0; i < (size_t)subscription_count; ++i) { + jstring java_subscription_topic_filter = + (*env)->CallObjectMethod(env, java_subscriptions, boxed_array_list_properties.get_method_id, (jint)i); + if (!java_subscription_topic_filter) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - null subscription"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + struct aws_jni_subscription subscription = { + .java_topic_filter = java_subscription_topic_filter, + .topic_filter = aws_jni_byte_cursor_from_jstring_acquire(env, java_subscription_topic_filter), + }; + + aws_array_list_push_back(¶ms->subscriptions, &subscription); } // publishTopic @@ -349,6 +420,55 @@ static int s_aws_request_response_operation_jni_owned_parameters_init_from_jobje return AWS_OP_SUCCESS; } +static void s_on_request_response_operation_completion( + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload, + int error_code, + void *user_data) { + + struct aws_request_response_operation_binding *binding = user_data; + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + if (!env) { + goto done; + } + + jobject java_result = NULL; + + if (error_code == AWS_ERROR_SUCCESS) { + java_result = (*env)->NewObject( + env, + mqtt_request_response_properties.mqtt_request_response_class, + mqtt_request_response_properties.constructor_method_id); + if (java_result != NULL) { + jstring java_topic = aws_jni_string_from_cursor(env, response_topic); + (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.topic_field_id, java_topic); + + jbyteArray java_payload = aws_jni_byte_array_from_cursor(env, payload); + (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.payload_field_id, java_payload); + } + } + + if (java_result != NULL) { + (*env)->CallBooleanMethod( + env, binding->operation_future, completable_future_properties.complete_method_id, java_result); + } else { + int final_error_code = (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_UNKNOWN : error_code; + jobject crt_exception = aws_jni_new_crt_exception_from_error_code(env, final_error_code); + + (*env)->CallBooleanMethod( + env, + binding->operation_future, + completable_future_properties.complete_exceptionally_method_id, + crt_exception); + } + +done: + + aws_jni_release_thread_env(binding->jvm, env); + + s_aws_request_response_operation_binding_destroy(binding); +} + JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_MqttRequestResponseClient_mqttRequestResponseClientSubmitRequest( JNIEnv *env, @@ -359,15 +479,82 @@ JNIEXPORT void JNICALL (void)jni_class; - struct aws_request_response_operation_binding *binding = - aws_mem_calloc(allocator, 1, sizeof(struct aws_request_response_operation_binding)); + struct aws_mqtt_request_response_client *rr_client = + (struct aws_mqtt_request_response_client *)jni_mqtt_request_response_client_handle; + if (!rr_client || !java_request || !java_result_future) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest: null parameter"); + return; + } - (void)env; - (void)jni_mqtt_request_response_client_handle; - (void)java_request; - (void)java_result_future; + JavaVM *jvm = NULL; + jint jvmresult = (*env)->GetJavaVM(env, &jvm); + if (jvmresult != 0) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest: failed to get JVM"); + return; + } + + struct aws_request_response_operation_binding *binding = NULL; + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct aws_request_response_operation_jni_owned_parameters request_params; + if (s_aws_request_response_operation_jni_owned_parameters_init_from_jobject( + &request_params, allocator, java_request, env)) { + s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env); + return; + } + + binding = aws_mem_calloc(allocator, 1, sizeof(struct aws_request_response_operation_binding)); + binding->allocator = allocator; + binding->operation_future = (*env)->NewGlobalRef(env, java_result_future); + binding->jvm = jvm; + + struct aws_mqtt_request_operation_options request_options; + AWS_ZERO_STRUCT(request_options); + + size_t subscription_count = aws_array_list_length(&request_params.subscriptions); + struct aws_byte_cursor subscriptions[subscription_count]; + for (size_t i = 0; i < subscription_count; ++i) { + struct aws_jni_subscription subscription; + AWS_ZERO_STRUCT(subscription); + aws_array_list_get_at(&request_params.subscriptions, &subscription, i); + + subscriptions[i] = subscription.topic_filter; + } + + size_t response_path_count = aws_array_list_length(&request_params.response_paths); + struct aws_mqtt_request_operation_response_path response_paths[response_path_count]; + for (size_t i = 0; i < response_path_count; ++i) { + struct aws_jni_response_path response_path; + AWS_ZERO_STRUCT(response_path); + aws_array_list_get_at(&request_params.response_paths, &response_path, i); + + response_paths[i].topic = response_path.response_topic; + response_paths[i].correlation_token_json_path = response_path.correlation_token_json_path; + } + + request_options.subscription_topic_filters = subscriptions; + request_options.subscription_topic_filter_count = subscription_count; + request_options.response_paths = response_paths; + request_options.response_path_count = response_path_count; + request_options.publish_topic = request_params.publish_topic; + request_options.serialized_request = request_params.payload; + request_options.correlation_token = request_params.correlation_token; + request_options.completion_callback = s_on_request_response_operation_completion; + request_options.user_data = binding; + + if (aws_mqtt_request_response_client_submit_request(rr_client, &request_options)) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - failed to submit request"); + goto error; + } + + goto done; error: + + s_aws_request_response_operation_binding_destroy(binding); + +done: + + s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env); } #if UINTPTR_MAX == 0xffffffff