blob: df7ca7be15cab6eebc1a7ffffb08ec4f62161248 [file] [log] [blame] [edit]
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/mqtt/v5/mqtt5_client.h>
#include <aws/http/proxy.h>
#include <aws/io/event_loop.h>
#include <aws/io/socket.h>
#include <aws/io/tls_channel_handler.h>
#include <crt.h>
#include <http_request_utils.h>
#include <java_class_ids.h>
#include <jni.h>
#include <mqtt5_client_jni.h>
#include <mqtt5_packets.h>
#include <mqtt5_utils.h>
/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(push)
# pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
# else
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
# pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
# endif
#endif
/*******************************************************************************
* CLIENT ONLY STRUCTS
******************************************************************************/
/**
 * Per-operation context received by s_aws_mqtt5_client_java_publish_completion as its user_data.
 * Released by s_aws_mqtt5_client_java_publish_callback_destructor.
 */
struct aws_mqtt5_client_publish_return_data {
/* Client the operation belongs to; the completion callback reads its jvm to acquire a JNIEnv */
struct aws_mqtt5_client_java_jni *java_client;
/* Java future completed by the completion callback; the destructor deletes it as a JNI global reference */
jobject jni_publish_future;
};
/**
 * Per-operation context received by s_aws_mqtt5_client_java_subscribe_completion as its user_data.
 * Released by s_aws_mqtt5_client_java_subscribe_callback_destructor.
 */
struct aws_mqtt5_client_subscribe_return_data {
/* Client the operation belongs to; the completion callback reads its jvm to acquire a JNIEnv */
struct aws_mqtt5_client_java_jni *java_client;
/* Java future completed by the completion callback; the destructor deletes it as a JNI global reference */
jobject jni_subscribe_future;
};
/**
 * Per-operation context received by s_aws_mqtt5_client_java_unsubscribe_completion as its user_data.
 * Released by s_aws_mqtt5_client_java_unsubscribe_callback_destructor.
 */
struct aws_mqtt5_client_unsubscribe_return_data {
/* Client the operation belongs to; the completion callback reads its jvm to acquire a JNIEnv */
struct aws_mqtt5_client_java_jni *java_client;
/* Java future for this unsubscribe; the destructor deletes it as a JNI global reference */
jobject jni_unsubscribe_future;
};
/**
 * Native HTTP proxy options together with the storage that backs their string fields.
 * Built by s_aws_mqtt5_http_proxy_options_create_from_java and torn down by
 * s_aws_mqtt5_http_proxy_options_java_destroy.
 *
 * Each *_buf holds a native copy of a Java string; the matching *_cursor points into that
 * buffer and is what gets assigned to the corresponding field of `options`.
 */
struct aws_http_proxy_options_java_jni {
/* The options structure itself (connection type, host, port, TLS options, auth settings) */
struct aws_http_proxy_options options;
/* Backing storage and view for options.host */
struct aws_byte_buf proxy_host_buf;
struct aws_byte_cursor proxy_host_cursor;
/* Backing storage and view for options.auth_username */
struct aws_byte_buf authorization_username_buf;
struct aws_byte_cursor authorization_username_cursor;
/* Backing storage and view for options.auth_password */
struct aws_byte_buf authorization_password_buf;
struct aws_byte_cursor authorization_password_cursor;
};
/*******************************************************************************
* HELPER FUNCTION (LOGGING)
******************************************************************************/
/**
 * Reports a failure in three steps, in this order: logs it at error level under
 * AWS_LS_MQTT5_CLIENT, raises error_code through aws_raise_error, and throws a runtime
 * exception on env carrying the same formatted text.
 *
 * @param env        JNI environment the exception is thrown on
 * @param message    human readable description of the failure
 * @param error_code error code appended to the message and raised
 */
static void s_aws_mqtt5_client_log_and_throw_exception(JNIEnv *env, const char *message, int error_code) {
    AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "%s - error code: %i", message, error_code);

    /* Raising the error is what updates the "last_error_code" */
    aws_raise_error(error_code);

    aws_jni_throw_runtime_exception(env, "%s - error code: %i", message, error_code);
}
/*******************************************************************************
* HTTP PROXY FUNCTIONS
******************************************************************************/
/**
 * Releases a proxy options wrapper: cleans up each of its three string buffers (when the
 * buffer passes aws_byte_buf_is_valid) and then frees the wrapper with `allocator`.
 *
 * @param env          unused
 * @param allocator    allocator the wrapper itself is released with
 * @param http_options wrapper to destroy; NULL is accepted and ignored
 */
static void s_aws_mqtt5_http_proxy_options_java_destroy(
    JNIEnv *env,
    struct aws_allocator *allocator,
    struct aws_http_proxy_options_java_jni *http_options) {

    (void)env;

    if (http_options == NULL) {
        return;
    }

    AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "id=%p: Destroying JavaHttpProxyOptions", (void *)http_options);

    /* Same order as the original: host, then username, then password */
    struct aws_byte_buf *owned_buffers[] = {
        &http_options->proxy_host_buf,
        &http_options->authorization_username_buf,
        &http_options->authorization_password_buf,
    };
    const size_t owned_buffer_count = sizeof(owned_buffers) / sizeof(owned_buffers[0]);

    for (size_t idx = 0; idx < owned_buffer_count; ++idx) {
        if (aws_byte_buf_is_valid(owned_buffers[idx])) {
            aws_byte_buf_clean_up(owned_buffers[idx]);
        }
    }

    /* Finally free the wrapper itself */
    aws_mem_release(allocator, http_options);
}
/**
 * Builds a native proxy options wrapper from a Java HTTP proxy options object.
 *
 * Reads, through the cached JNI method ids, the connection type, host, port, TLS context,
 * authorization type, username and password. Strings are copied into buffers owned by the
 * returned wrapper, and the `options` cursors point at those copies.
 *
 * When the Java object supplies a TLS context with a non-NULL native handle,
 * java_client->http_proxy_tls_options is initialized from it and options.tls_options is
 * pointed at it, so that storage belongs to java_client rather than to the returned wrapper.
 *
 * @param env                     JNI environment of the calling thread
 * @param allocator               allocator for the wrapper and the TLS server name
 * @param java_client             client whose http_proxy_tls_options may be initialized
 * @param java_http_proxy_options Java object to read from
 * @return the new wrapper, to be released with s_aws_mqtt5_http_proxy_options_java_destroy,
 *         or NULL when a Java call raised an exception or the connection type is out of
 *         range. On failure the partially built wrapper has already been destroyed.
 */
static struct aws_http_proxy_options_java_jni *s_aws_mqtt5_http_proxy_options_create_from_java(
    JNIEnv *env,
    struct aws_allocator *allocator,
    struct aws_mqtt5_client_java_jni *java_client,
    jobject java_http_proxy_options) {

    /* Cannot fail */
    struct aws_http_proxy_options_java_jni *http_options =
        aws_mem_calloc(allocator, 1, sizeof(struct aws_http_proxy_options_java_jni));
    AWS_LOGF_DEBUG(
        AWS_LS_MQTT5_CLIENT, "JavaHttpProxyOptions=%p: Created new JavaHttpProxyOptions", (void *)http_options);

    /* ---- Connection type ---- */
    jobject jni_proxy_connection_type = (*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_connection_type_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_connection_type) {
        jint jni_proxy_connection_type_value = (*env)->CallIntMethod(
            env, jni_proxy_connection_type, http_proxy_connection_type_properties.proxy_get_value_id);
        if (aws_jni_check_and_clear_exception(env)) {
            goto on_error;
        }
        /* Zero needs no handling: the wrapper was calloc'ed, so connection_type is already 0 */
        if (jni_proxy_connection_type_value) {
            int32_t jni_proxy_connection_type_value_check = (int32_t)jni_proxy_connection_type_value;
            if (jni_proxy_connection_type_value_check < 0) {
                AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "HTTP Proxy Options connection type is less than 0");
                goto on_error;
            } else if (jni_proxy_connection_type_value_check > AWS_HPCT_HTTP_TUNNEL) {
                /* AWS_HPCT_HTTP_TUNNEL is the (current) maximum enum value */
                AWS_LOGF_ERROR(
                    AWS_LS_MQTT5_CLIENT, "HTTP Proxy Options connection type is more than maximum allowed value");
                goto on_error;
            } else {
                http_options->options.connection_type =
                    (enum aws_http_proxy_connection_type)jni_proxy_connection_type_value;
            }
        }
    }

    /* ---- Host ---- */
    jstring jni_proxy_host = (jstring)(*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_proxy_host_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_host) {
        /* Get the data, copy it, and then release the JNI stuff */
        struct aws_byte_cursor tmp_cursor = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_host);
        aws_byte_buf_init_copy_from_cursor(&http_options->proxy_host_buf, aws_jni_get_allocator(), tmp_cursor);
        http_options->proxy_host_cursor = aws_byte_cursor_from_buf(&http_options->proxy_host_buf);
        aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_host, tmp_cursor);
        http_options->options.host = http_options->proxy_host_cursor;
    }

    /* ---- Port ---- */
    jint jni_proxy_port =
        (*env)->CallIntMethod(env, java_http_proxy_options, http_proxy_options_properties.proxy_get_proxy_port_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_port) {
        http_options->options.port = (uint32_t)jni_proxy_port;
    }

    /* ---- TLS context ---- */
    jobject jni_proxy_tls_context = (*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_proxy_tls_context_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_tls_context) {
        jlong jni_proxy_tls_context_long =
            (*env)->CallLongMethod(env, jni_proxy_tls_context, crt_resource_properties.get_native_handle_method_id);
        /*
         * Check this call like every other Java call in this function: a pending exception
         * must not leak into the JNI calls that follow, and the returned handle is
         * meaningless when the getter threw.
         */
        if (aws_jni_check_and_clear_exception(env)) {
            goto on_error;
        }
        struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_proxy_tls_context_long;
        if (tls_ctx) {
            aws_tls_connection_options_init_from_ctx(&java_client->http_proxy_tls_options, tls_ctx);
            aws_tls_connection_options_set_server_name(
                &java_client->http_proxy_tls_options, allocator, &http_options->options.host);
            http_options->options.tls_options = &java_client->http_proxy_tls_options;
        }
    }

    /* ---- Authorization type ---- */
    jobject jni_proxy_authorization_type = (*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_proxy_authorization_type_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_authorization_type) {
        /*
         * NOTE(review): the value is read with the method id cached for the *connection type*
         * class (http_proxy_connection_type_properties). Confirm that id is valid for the
         * authorization type object, or switch to a method id cached for that class.
         */
        jint jni_proxy_authorization_type_value = (*env)->CallIntMethod(
            env, jni_proxy_authorization_type, http_proxy_connection_type_properties.proxy_get_value_id);
        if (aws_jni_check_and_clear_exception(env)) {
            goto on_error;
        }
        http_options->options.auth_type = (enum aws_http_proxy_authentication_type)jni_proxy_authorization_type_value;
    }

    /* ---- Authorization username ---- */
    jstring jni_proxy_authorization_username = (jstring)(*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_authorization_username_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_authorization_username) {
        /* Get the data, copy it, and then release the JNI stuff */
        struct aws_byte_cursor tmp_cursor =
            aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_username);
        aws_byte_buf_init_copy_from_cursor(
            &http_options->authorization_username_buf, aws_jni_get_allocator(), tmp_cursor);
        http_options->authorization_username_cursor =
            aws_byte_cursor_from_buf(&http_options->authorization_username_buf);
        aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_username, tmp_cursor);
        http_options->options.auth_username = http_options->authorization_username_cursor;
    }

    /* ---- Authorization password ---- */
    jstring jni_proxy_authorization_password = (jstring)(*env)->CallObjectMethod(
        env, java_http_proxy_options, http_proxy_options_properties.proxy_get_authorization_password_id);
    if (aws_jni_check_and_clear_exception(env)) {
        goto on_error;
    }
    if (jni_proxy_authorization_password) {
        /* Get the data, copy it, and then release the JNI stuff */
        struct aws_byte_cursor tmp_cursor =
            aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_password);
        aws_byte_buf_init_copy_from_cursor(
            &http_options->authorization_password_buf, aws_jni_get_allocator(), tmp_cursor);
        http_options->authorization_password_cursor =
            aws_byte_cursor_from_buf(&http_options->authorization_password_buf);
        aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_password, tmp_cursor);
        http_options->options.auth_password = http_options->authorization_password_cursor;
    }

    return http_options;

on_error:
    s_aws_mqtt5_http_proxy_options_java_destroy(env, allocator, http_options);
    return NULL;
}
/*******************************************************************************
* HELPER FUNCTIONS
******************************************************************************/
/**
 * Tears down the native side of a Java MQTT5 client: deletes the global references it
 * holds (client, publish events, lifecycle events), cleans up both TLS connection option
 * structures and frees the structure itself.
 *
 * @param env         JNI environment used to delete the global references
 * @param allocator   allocator java_client is released with
 * @param java_client structure to destroy; asserted non-NULL, and ignored when NULL
 */
static void aws_mqtt5_client_java_destroy(
    JNIEnv *env,
    struct aws_allocator *allocator,
    struct aws_mqtt5_client_java_jni *java_client) {

    AWS_PRECONDITION(java_client);
    if (java_client == NULL) {
        return;
    }

    AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "java_client=%p: Destroying MQTT5 client", (void *)java_client);

    /* Global references in the order they are released; unset (NULL) entries are skipped */
    jobject held_refs[] = {
        java_client->jni_client,
        java_client->jni_publish_events,
        java_client->jni_lifecycle_events,
    };
    for (size_t idx = 0; idx < sizeof(held_refs) / sizeof(held_refs[0]); ++idx) {
        if (held_refs[idx] != NULL) {
            (*env)->DeleteGlobalRef(env, held_refs[idx]);
        }
    }

    aws_tls_connection_options_clean_up(&java_client->tls_options);
    aws_tls_connection_options_clean_up(&java_client->http_proxy_tls_options);

    /* Release the structure last, once nothing reads from it any more */
    aws_mem_release(allocator, java_client);
}
/**
 * Completes a Java future exceptionally with a CRT exception built from error_code.
 * Any Java exception raised by the completion call is checked and cleared, and the local
 * reference to the exception object is deleted before returning.
 *
 * @param env        JNI environment; nothing is done when NULL
 * @param future     address of the future reference; nothing is done when NULL
 * @param error_code error code the CRT exception is created from
 */
static void s_complete_future_with_exception(JNIEnv *env, jobject *future, int error_code) {
    if (env == NULL || future == NULL) {
        return;
    }

    jobject exception_object = aws_jni_new_crt_exception_from_error_code(env, error_code);

    jmethodID complete_exceptionally_id = completable_future_properties.complete_exceptionally_method_id;
    (*env)->CallBooleanMethod(env, *future, complete_exceptionally_id, exception_object);
    aws_jni_check_and_clear_exception(env);

    (*env)->DeleteLocalRef(env, exception_object);
}
/**
 * Adds one to *counter when `pointer` is non-NULL; leaves it untouched otherwise.
 * Used to count how many optional packet fields are present.
 *
 * @param pointer optional value to test for presence
 * @param counter running total to increment; must not be NULL
 */
static void s_aws_count_allocation(const void *pointer, size_t *counter) {
    if (pointer == NULL) {
        return;
    }
    ++(*counter);
}
static char s_client_string[] = "MQTT5 Client";
/*******************************************************************************
* MQTT5 CALLBACK FUNCTIONS
******************************************************************************/
/**
 * Native lifecycle event handler: converts the event into Java objects and invokes the
 * matching method on the Java lifecycle events object stored in the client.
 *
 * Steps:
 *  1. Take the client from event->user_data and acquire a JNIEnv for the current thread.
 *  2. Estimate how many local references the conversion needs and push a local frame of
 *     that capacity.
 *  3. Build the Java ConnAck / Disconnect / negotiated settings objects for whichever of
 *     them the event carries.
 *  4. Create the per-event "return" object and call the lifecycle callback selected by
 *     event->event_type. For connection success and disconnection, the Java client's
 *     is-connected setter is called first.
 *  5. Clear any pending Java exception, pop the local frame and release the JNIEnv.
 *
 * @param event lifecycle event; its user_data is read as a struct aws_mqtt5_client_java_jni *
 */
static void s_aws_mqtt5_client_java_lifecycle_event(const struct aws_mqtt5_client_lifecycle_event *event) {
struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)event->user_data;
if (!java_client) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "LifecycleEvent: invalid client");
return;
}
/********** JNI ENV ACQUIRE **********/
JavaVM *jvm = java_client->jvm;
JNIEnv *env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "LifecycleEvent: could not get env");
return;
}
/* Calculate the number of references needed (1 is always needed for the return struct) */
size_t references_needed = 1;
if (event->connack_data != NULL) {
/* A ConnAck packet will need 2 references at minimum */
references_needed += 2;
/* Optionals: each field that is present (non-NULL) adds one reference */
s_aws_count_allocation(event->connack_data->assigned_client_identifier, &references_needed);
s_aws_count_allocation(event->connack_data->authentication_data, &references_needed);
s_aws_count_allocation(event->connack_data->authentication_method, &references_needed);
s_aws_count_allocation(event->connack_data->maximum_packet_size, &references_needed);
s_aws_count_allocation(event->connack_data->maximum_qos, &references_needed);
s_aws_count_allocation(event->connack_data->reason_string, &references_needed);
s_aws_count_allocation(event->connack_data->receive_maximum, &references_needed);
s_aws_count_allocation(event->connack_data->response_information, &references_needed);
s_aws_count_allocation(event->connack_data->retain_available, &references_needed);
s_aws_count_allocation(event->connack_data->server_keep_alive, &references_needed);
s_aws_count_allocation(event->connack_data->server_reference, &references_needed);
s_aws_count_allocation(event->connack_data->session_expiry_interval, &references_needed);
s_aws_count_allocation(event->connack_data->shared_subscriptions_available, &references_needed);
s_aws_count_allocation(event->connack_data->subscription_identifiers_available, &references_needed);
s_aws_count_allocation(event->connack_data->topic_alias_maximum, &references_needed);
s_aws_count_allocation(event->connack_data->wildcard_subscriptions_available, &references_needed);
/* Add user properties (two references per property) */
references_needed += event->connack_data->user_property_count * 2;
references_needed += 2; /* Add 2 for arrays to hold user properties */
}
if (event->disconnect_data != NULL) {
/* A Disconnect packet will need 1 reference at a minimum */
references_needed += 1;
/* Optionals: each field that is present (non-NULL) adds one reference */
s_aws_count_allocation(event->disconnect_data->reason_string, &references_needed);
s_aws_count_allocation(event->disconnect_data->server_reference, &references_needed);
s_aws_count_allocation(event->disconnect_data->session_expiry_interval_seconds, &references_needed);
/* Add user properties (two references per property) */
references_needed += event->disconnect_data->user_property_count * 2;
references_needed += 2; /* Add 2 for arrays to hold user properties */
}
if (event->settings != NULL) {
/* Negotiated settings only need 2 references, one for the ClientID and another for the object */
references_needed += 2;
}
/* Make a local frame so we can clean memory */
jint local_frame_result = (*env)->PushLocalFrame(env, (jint)references_needed);
if (local_frame_result != 0) {
/* No frame was pushed, so this path releases the env and returns without popping one */
s_aws_mqtt5_client_log_and_throw_exception(
env, "LifecycleEvent: could not push local JNI frame with 14 allocation minimum!", AWS_ERROR_INVALID_STATE);
aws_jni_release_thread_env(jvm, env);
return;
}
/* Java ConnAck packet; stays NULL when the event carries no ConnAck data */
jobject connack_data = NULL;
if (event->connack_data != NULL) {
connack_data = s_aws_mqtt5_client_create_jni_connack_packet_from_native(env, event->connack_data);
if (connack_data == NULL) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "LifecycleEvent: creating ConnAck packet failed!");
goto clean_up;
}
}
/* Java Disconnect packet; stays NULL when the event carries no Disconnect data */
jobject disconnect_data = NULL;
if (event->disconnect_data != NULL) {
disconnect_data = s_aws_mqtt5_client_create_jni_disconnect_packet_from_native(env, event->disconnect_data);
if (disconnect_data == NULL) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "LifecycleEvent: creating Disconnect packet failed!");
goto clean_up;
}
}
/* Java negotiated settings; unlike the two packets above, the result is not checked for NULL */
jobject negotiated_settings_data = NULL;
if (event->settings != NULL) {
negotiated_settings_data = s_aws_mqtt5_client_create_jni_negotiated_settings_from_native(env, event->settings);
}
jobject jni_lifecycle_events = java_client->jni_lifecycle_events;
if (!jni_lifecycle_events) {
/* The exception thrown here is cleared again at clean_up below */
s_aws_mqtt5_client_log_and_throw_exception(
env, "LifecycleEvent: no lifecycle events found!", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
/* Assigned in every case below except `default`, which does not read it */
jobject java_lifecycle_return_data;
switch (event->event_type) {
case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:
/* Make the OnAttemptingConnectReturn struct */
java_lifecycle_return_data = (*env)->NewObject(
env,
mqtt5_on_attempting_connect_return_properties.return_class,
mqtt5_on_attempting_connect_return_properties.return_constructor_id);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
(*env)->CallVoidMethod(
env,
jni_lifecycle_events,
mqtt5_lifecycle_events_properties.lifecycle_attempting_connect_id,
java_client->jni_client,
java_lifecycle_return_data);
break;
case AWS_MQTT5_CLET_CONNECTION_SUCCESS:
/* Make the OnConnectionSuccessReturn struct */
java_lifecycle_return_data = (*env)->NewObject(
env,
mqtt5_on_connection_success_return_properties.return_class,
mqtt5_on_connection_success_return_properties.return_constructor_id,
connack_data,
negotiated_settings_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
/* Set OnConnected BEFORE calling the callback so it is accurate in the callback itself. */
(*env)->CallVoidMethod(env, java_client->jni_client, mqtt5_client_properties.client_set_is_connected, true);
(*env)->CallVoidMethod(
env,
jni_lifecycle_events,
mqtt5_lifecycle_events_properties.lifecycle_connection_success_id,
java_client->jni_client,
java_lifecycle_return_data);
break;
case AWS_MQTT5_CLET_CONNECTION_FAILURE: {
jint error_code = (jint)event->error_code;
/* Make the OnConnectionFailureReturn struct */
java_lifecycle_return_data = (*env)->NewObject(
env,
mqtt5_on_connection_failure_return_properties.return_class,
mqtt5_on_connection_failure_return_properties.return_constructor_id,
error_code,
connack_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
(*env)->CallVoidMethod(
env,
jni_lifecycle_events,
mqtt5_lifecycle_events_properties.lifecycle_connection_failure_id,
java_client->jni_client,
java_lifecycle_return_data);
break;
}
case AWS_MQTT5_CLET_DISCONNECTION: {
jint error_code = (jint)event->error_code;
/* Make the OnDisconnectionReturn struct */
java_lifecycle_return_data = (*env)->NewObject(
env,
mqtt5_on_disconnection_return_properties.return_class,
mqtt5_on_disconnection_return_properties.return_constructor_id,
error_code,
disconnect_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
/* Set OnConnected BEFORE calling the callback so it is accurate in the callback itself. */
(*env)->CallVoidMethod(
env, java_client->jni_client, mqtt5_client_properties.client_set_is_connected, false);
(*env)->CallVoidMethod(
env,
jni_lifecycle_events,
mqtt5_lifecycle_events_properties.lifecycle_disconnection_id,
java_client->jni_client,
java_lifecycle_return_data);
break;
}
case AWS_MQTT5_CLET_STOPPED:
/* Make the OnStopped struct */
java_lifecycle_return_data = (*env)->NewObject(
env,
mqtt5_on_stopped_return_properties.return_class,
mqtt5_on_stopped_return_properties.return_constructor_id);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
(*env)->CallVoidMethod(
env,
jni_lifecycle_events,
mqtt5_lifecycle_events_properties.lifecycle_stopped_id,
java_client->jni_client,
java_lifecycle_return_data);
break;
default:
/* Unknown event types are only logged; no Java callback is invoked */
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "LifecycleEvent: unsupported event type: %i", event->event_type);
}
goto clean_up;
clean_up:
/* Every path that reaches this label has pushed the local frame, so the pop is balanced */
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
(*env)->PopLocalFrame(env, NULL);
/********** JNI ENV RELEASE **********/
aws_jni_release_thread_env(jvm, env);
}
/**
 * Native "publish received" handler: converts an incoming publish packet into its Java
 * representation and hands it to the Java publish events object, when the client has one.
 *
 * All local references created during the conversion live in a dedicated local frame that
 * is popped before returning. If that frame cannot be pushed, the failure is logged and
 * thrown, and the function returns without popping a frame, since none was pushed.
 *
 * @param publish   view of the received publish packet; logged and ignored when NULL
 * @param user_data the struct aws_mqtt5_client_java_jni * of the receiving client; logged
 *                  and ignored when NULL
 */
static void s_aws_mqtt5_client_java_publish_received(
    const struct aws_mqtt5_packet_publish_view *publish,
    void *user_data) {

    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)user_data;
    if (!java_client) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "publishReceived function: invalid client");
        return;
    }
    if (!publish) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "publishReceived function: invalid publish packet");
        return;
    }

    /********** JNI ENV ACQUIRE **********/
    JavaVM *jvm = java_client->jvm;
    JNIEnv *env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "publishReceived function: could not get env");
        return;
    }

    /* Calculate the number of references needed */
    size_t references_needed = 0;
    {
        /* One reference is needed for the PublishReturn */
        references_needed += 1;
        /* A Publish packet will need 5 references at minimum */
        references_needed += 5;
        /* Optionals: each field that is present (non-NULL) adds one reference */
        s_aws_count_allocation(publish->content_type, &references_needed);
        s_aws_count_allocation(publish->correlation_data, &references_needed);
        s_aws_count_allocation(publish->message_expiry_interval_seconds, &references_needed);
        s_aws_count_allocation(publish->response_topic, &references_needed);
        s_aws_count_allocation(publish->topic_alias, &references_needed);
        s_aws_count_allocation(publish->payload_format, &references_needed);
        /* Add user properties and subscription identifiers */
        references_needed += publish->user_property_count * 2;
        references_needed += 1; /* Add 1 for array to hold user properties */
        if (publish->subscription_identifier_count > 0) {
            references_needed += publish->subscription_identifier_count;
            references_needed += 1; /* Add 1 for array */
        }
    }

    /**
     * Push a new local frame so any local references we make are tied to it. Then we can pop it to free memory.
     */
    jint local_frame_result = (*env)->PushLocalFrame(env, (jint)references_needed);
    if (local_frame_result != 0) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env,
            "publishReceived function: could not push local JNI frame with 12 allocation minimum!",
            AWS_ERROR_INVALID_STATE);
        /*
         * The frame was NOT pushed, so it must not be popped: popping here would discard a
         * frame this function does not own. Release the env and leave, exactly as the
         * lifecycle event handler does on the same failure.
         */
        aws_jni_release_thread_env(jvm, env);
        return;
    }

    /* The return result */
    jobject publish_packet_return_data;

    /* Make the PublishPacket */
    jobject publish_packet_data = s_aws_mqtt5_client_create_jni_publish_packet_from_native(env, publish);
    if (publish_packet_data == NULL) {
        goto clean_up;
    }

    /* Make the PublishReturn struct that will hold all of the data that is passed to Java */
    publish_packet_return_data = (*env)->NewObject(
        env,
        mqtt5_publish_return_properties.return_class,
        mqtt5_publish_return_properties.return_constructor_id,
        publish_packet_data);
    aws_jni_check_and_clear_exception(env); /* To hide JNI warning */

    if (java_client->jni_publish_events) {
        (*env)->CallVoidMethod(
            env,
            java_client->jni_publish_events,
            mqtt5_publish_events_properties.publish_events_publish_received_id,
            java_client->jni_client,
            publish_packet_return_data);
        aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
    }

clean_up:
    /* Only reached with the frame pushed. Do not leave an exception pending on this thread. */
    aws_jni_check_and_clear_exception(env);
    (*env)->PopLocalFrame(env, NULL);
    /********** JNI ENV RELEASE **********/
    aws_jni_release_thread_env(jvm, env);
}
/**
 * Frees the per-publish callback context. When a JNI environment is available, the global
 * reference to the publish future is deleted first; the context is then released with the
 * JNI allocator.
 *
 * @param env                  JNI environment, or NULL (the global reference is then left alone)
 * @param callback_return_data context to free; NULL is accepted and ignored
 */
static void s_aws_mqtt5_client_java_publish_callback_destructor(
    JNIEnv *env,
    struct aws_mqtt5_client_publish_return_data *callback_return_data) {

    struct aws_allocator *allocator = aws_jni_get_allocator();
    if (callback_return_data == NULL) {
        return;
    }

    jobject future_ref = callback_return_data->jni_publish_future;
    if (future_ref && env != NULL) {
        (*env)->DeleteGlobalRef(env, future_ref);
    }

    aws_mem_release(allocator, callback_return_data);
}
/**
 * Native publish completion callback: resolves the Java future associated with one publish.
 *
 * On success the future is completed with a publish result object: an empty result for
 * AWS_MQTT5_PT_NONE (QoS 0), or one wrapping a Java PubAck packet for AWS_MQTT5_PT_PUBACK
 * (QoS 1). On any failure the future is completed exceptionally with a CRT exception
 * carrying a non-success error code.
 *
 * The callback context (user_data) is always released before returning, except when
 * user_data itself is NULL.
 *
 * @param packet_type type of `packet`
 * @param packet      the acknowledgement packet; read as a PubAck view for AWS_MQTT5_PT_PUBACK
 * @param error_code  AWS_OP_SUCCESS, or the error the publish failed with
 * @param user_data   struct aws_mqtt5_client_publish_return_data * for this publish
 */
static void s_aws_mqtt5_client_java_publish_completion(
    enum aws_mqtt5_packet_type packet_type,
    const void *packet,
    int error_code,
    void *user_data) {

    /* Code the future fails with; starts as the operation's own error code */
    int exception_error_code = error_code;
    JavaVM *jvm = NULL;
    JNIEnv *env = NULL;
    bool has_pushed_frame = false;

    struct aws_mqtt5_client_publish_return_data *return_data = (struct aws_mqtt5_client_publish_return_data *)user_data;
    if (!return_data) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function: invalid return data!");
        return;
    }

    struct aws_mqtt5_client_java_jni *java_client = return_data->java_client;
    if (!java_client) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function: invalid client");
        goto clean_up;
    }

    /********** JNI ENV ACQUIRE **********/
    jvm = java_client->jvm;
    env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function: could not get env");
        goto clean_up;
    }

    /* Get the future for this specific publish and complete right away if there is an error */
    jobject jni_publish_future = return_data->jni_publish_future;
    if (error_code != AWS_OP_SUCCESS) {
        goto exception;
    }

    /* If this result is supposed to have a packet and does not, then error right away */
    if (packet_type != AWS_MQTT5_PT_NONE && packet == NULL) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function: packet type but no packet!");
        /* error_code is success here, so pick a real error instead of failing with "success" */
        exception_error_code = AWS_ERROR_INVALID_STATE;
        goto exception;
    }

    /* Calculate the number of references needed */
    size_t references_needed = 0;
    if (packet_type == AWS_MQTT5_PT_PUBACK) {
        struct aws_mqtt5_packet_puback_view *puback_packet = (struct aws_mqtt5_packet_puback_view *)packet;
        /* A PubAck packet will need 2 references at minimum */
        references_needed += 2;
        /* Optionals */
        s_aws_count_allocation(puback_packet->reason_string, &references_needed);
        /* Add user properties */
        references_needed += puback_packet->user_property_count * 2;
        references_needed += 1; /* Add 1 for array to hold user properties */
    }

    /**
     * Push a new local frame so any local references we make are tied to it. Then we can pop it to free memory.
     */
    jint local_frame_result = (*env)->PushLocalFrame(env, (jint)references_needed);
    if (local_frame_result != 0) {
        AWS_LOGF_ERROR(
            AWS_LS_MQTT5_CLIENT,
            "PublishCompletion function: could not push local JNI frame with 12 allocation minimum");
        exception_error_code = AWS_ERROR_INVALID_STATE;
        goto exception;
    }
    has_pushed_frame = true;

    /* The result */
    jobject publish_packet_result_data;
    if (packet_type == AWS_MQTT5_PT_NONE) {
        /* QoS 0 */
        publish_packet_result_data = (*env)->NewObject(
            env, mqtt5_publish_result_properties.result_class, mqtt5_publish_result_properties.result_constructor_id);
        aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
    } else if (packet_type == AWS_MQTT5_PT_PUBACK) {
        /* QoS 1 */
        struct aws_mqtt5_packet_puback_view *puback_packet = (struct aws_mqtt5_packet_puback_view *)packet;
        /* Make the PubAck packet */
        jobject puback_packet_data = s_aws_mqtt5_client_create_jni_puback_packet_from_native(env, puback_packet);
        if (puback_packet_data == NULL) {
            exception_error_code = AWS_ERROR_INVALID_STATE;
            goto exception;
        }
        /* Make the result and populate it with data made above */
        publish_packet_result_data = (*env)->NewObject(
            env,
            mqtt5_publish_result_properties.result_class,
            mqtt5_publish_result_properties.result_puback_constructor_id,
            puback_packet_data);
        aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
    } else {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function called with unknown packet type!");
        exception_error_code = AWS_ERROR_INVALID_STATE;
        goto exception;
    }

    /* Complete the future */
    (*env)->CallBooleanMethod(
        env, jni_publish_future, completable_future_properties.complete_method_id, publish_packet_result_data);
    if (aws_jni_check_and_clear_exception(env)) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "PublishCompletion function: exception when completing future");
        /* error_code is success on this path too; report a real error */
        exception_error_code = AWS_ERROR_INVALID_STATE;
        goto exception;
    }
    goto clean_up;

exception:
    /* Only reached after env and jni_publish_future have been set */
    s_complete_future_with_exception(env, &jni_publish_future, exception_error_code);
    goto clean_up;

clean_up:
    s_aws_mqtt5_client_java_publish_callback_destructor(env, return_data);
    if (env != NULL) {
        if (has_pushed_frame) {
            (*env)->PopLocalFrame(env, NULL);
        }
        /********** JNI ENV RELEASE **********/
        aws_jni_release_thread_env(jvm, env);
    }
    return;
}
/**
 * Frees the per-subscribe callback context. When a JNI environment is available, the
 * global reference to the subscribe future is deleted first; the context is then released
 * with the JNI allocator.
 *
 * @param env                  JNI environment, or NULL (the global reference is then left alone)
 * @param callback_return_data context to free; NULL is accepted and ignored
 */
static void s_aws_mqtt5_client_java_subscribe_callback_destructor(
    JNIEnv *env,
    struct aws_mqtt5_client_subscribe_return_data *callback_return_data) {

    struct aws_allocator *allocator = aws_jni_get_allocator();
    if (callback_return_data == NULL) {
        return;
    }

    jobject future_ref = callback_return_data->jni_subscribe_future;
    if (future_ref && env != NULL) {
        (*env)->DeleteGlobalRef(env, future_ref);
    }

    aws_mem_release(allocator, callback_return_data);
}
/**
 * Native subscribe completion callback: resolves the Java future associated with one
 * subscribe operation.
 *
 * On success the future is completed with a Java SubAck packet built from `suback` (or
 * with NULL when no SubAck was supplied). On any failure, whether reported by the operation
 * or hit while building the Java packet, the future is completed exceptionally, so the
 * Java side is never left waiting.
 *
 * The callback context (user_data) is always released before returning, except when
 * user_data itself is NULL.
 *
 * @param suback     view of the received SubAck, may be NULL
 * @param error_code AWS_OP_SUCCESS, or the error the subscribe failed with
 * @param user_data  struct aws_mqtt5_client_subscribe_return_data * for this subscribe
 */
static void s_aws_mqtt5_client_java_subscribe_completion(
    const struct aws_mqtt5_packet_suback_view *suback,
    int error_code,
    void *user_data) {

    /* Code the future fails with; starts as the operation's own error code */
    int exception_error_code = error_code;
    JNIEnv *env = NULL;
    JavaVM *jvm = NULL;
    bool has_pushed_frame = false;

    struct aws_mqtt5_client_subscribe_return_data *return_data =
        (struct aws_mqtt5_client_subscribe_return_data *)user_data;
    if (!return_data) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "SubscribeCompletion: invalid return data!");
        return;
    }

    struct aws_mqtt5_client_java_jni *java_client = return_data->java_client;
    if (!java_client) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "SubscribeCompletion: invalid client");
        goto clean_up;
    }

    /********** JNI ENV ACQUIRE **********/
    jvm = java_client->jvm;
    env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "SubscribeCompletion: could not get env");
        goto clean_up;
    }

    /* Get the future for this specific subscribe */
    jobject jni_subscribe_future = return_data->jni_subscribe_future;
    if (error_code != AWS_OP_SUCCESS) {
        /* exception_error_code already holds error_code */
        goto exception;
    }

    /* Calculate the number of references needed */
    size_t references_needed = 0;
    if (suback != NULL) {
        /* A SubAck packet will need 1 references at minimum */
        references_needed += 1;
        /* Optionals */
        s_aws_count_allocation(suback->reason_string, &references_needed);
        /* Add user properties and reason codes */
        references_needed += (suback->user_property_count) * 2;
        references_needed += 1; /* Add 1 for arrays to hold user properties */
        if (suback->reason_code_count > 0) {
            references_needed += suback->reason_code_count;
            references_needed += 1; /* Add 1 for arrays to hold reason codes */
        }
    }

    /**
     * Push a new local frame so any local references we make are tied to it. Then we can pop it to free memory.
     */
    jint local_frame_result = (*env)->PushLocalFrame(env, (jint)references_needed);
    if (local_frame_result != 0) {
        AWS_LOGF_ERROR(
            AWS_LS_MQTT5_CLIENT, "SubscribeCompletion: could not push local JNI frame with 4 allocation minimum");
        exception_error_code = AWS_ERROR_INVALID_STATE;
        goto exception;
    }
    has_pushed_frame = true;

    /* The SubAck to return (if present) */
    jobject suback_packet_data = NULL;
    if (suback != NULL) {
        suback_packet_data = (*env)->NewObject(
            env,
            mqtt5_suback_packet_properties.suback_packet_class,
            mqtt5_suback_packet_properties.suback_constructor_id);
        /* Do not try to populate an object that could not be created */
        if (aws_jni_check_and_clear_exception(env) || suback_packet_data == NULL) {
            AWS_LOGF_ERROR(
                AWS_LS_MQTT5_CLIENT, "Error when creating SubAckPacket from native: Could not create packet");
            exception_error_code = AWS_ERROR_INVALID_STATE;
            goto exception;
        }

        if (s_set_jni_string_field_in_packet(
                env,
                suback->reason_string,
                suback_packet_data,
                mqtt5_suback_packet_properties.suback_reason_string_field_id,
                "reason string",
                true) != AWS_OP_SUCCESS) {
            /* Fail the future instead of silently leaving it pending forever */
            exception_error_code = AWS_ERROR_INVALID_STATE;
            goto exception;
        }

        if (suback->reason_codes != NULL) {
            if (suback->reason_code_count > 0) {
                for (size_t i = 0; i < suback->reason_code_count; ++i) {
                    const enum aws_mqtt5_suback_reason_code *reason_code_data = &suback->reason_codes[i];
                    if (s_set_int_enum_in_packet(
                            env,
                            (int *)reason_code_data,
                            suback_packet_data,
                            mqtt5_suback_packet_properties.suback_native_add_suback_code_id,
                            false) != AWS_OP_SUCCESS) {
                        AWS_LOGF_ERROR(
                            AWS_LS_MQTT5_CLIENT,
                            "Error when creating SubAckPacket from native: Could not set reason code");
                        exception_error_code = AWS_ERROR_INVALID_STATE;
                        goto exception;
                    }
                }
            }
        }

        if (s_set_user_properties_field(
                env,
                suback->user_property_count,
                suback->user_properties,
                suback_packet_data,
                mqtt5_suback_packet_properties.suback_user_properties_field_id) == AWS_OP_ERR) {
            AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "When creating SubAckPacket from native could not add user property!");
            exception_error_code = AWS_ERROR_INVALID_STATE;
            goto exception;
        }
    }

    /* Complete the promise */
    (*env)->CallBooleanMethod(
        env, jni_subscribe_future, completable_future_properties.complete_method_id, suback_packet_data);
    aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
    goto clean_up;

exception:
    /* Only reached after env and jni_subscribe_future have been set */
    s_complete_future_with_exception(env, &jni_subscribe_future, exception_error_code);
    goto clean_up;

clean_up:
    s_aws_mqtt5_client_java_subscribe_callback_destructor(env, return_data);
    if (env != NULL) {
        if (has_pushed_frame) {
            (*env)->PopLocalFrame(env, NULL);
        }
        /********** JNI ENV RELEASE **********/
        aws_jni_release_thread_env(jvm, env);
    }
}
/**
 * Frees the per-unsubscribe callback context. When a JNI environment is available, the
 * global reference to the unsubscribe future is deleted first; the context is then
 * released with the JNI allocator.
 *
 * @param env                  JNI environment, or NULL (the global reference is then left alone)
 * @param callback_return_data context to free; NULL is accepted and ignored
 */
static void s_aws_mqtt5_client_java_unsubscribe_callback_destructor(
    JNIEnv *env,
    struct aws_mqtt5_client_unsubscribe_return_data *callback_return_data) {

    struct aws_allocator *allocator = aws_jni_get_allocator();
    if (callback_return_data == NULL) {
        return;
    }

    jobject future_ref = callback_return_data->jni_unsubscribe_future;
    if (future_ref && env != NULL) {
        (*env)->DeleteGlobalRef(env, future_ref);
    }

    aws_mem_release(allocator, callback_return_data);
}
static void s_aws_mqtt5_client_java_unsubscribe_completion(
const struct aws_mqtt5_packet_unsuback_view *unsuback,
int error_code,
void *user_data) {
int exception_error_code = error_code;
JNIEnv *env = NULL;
JavaVM *jvm = NULL;
bool has_pushed_frame = false;
struct aws_mqtt5_client_unsubscribe_return_data *return_data =
(struct aws_mqtt5_client_unsubscribe_return_data *)user_data;
if (!return_data) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "UnsubscribeCompletion: invalid return data!");
return;
}
struct aws_mqtt5_client_java_jni *java_client = return_data->java_client;
if (!java_client) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "UnsubscribeCompletion: invalid client");
goto clean_up;
}
/********** JNI ENV ACQUIRE **********/
jvm = java_client->jvm;
env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "UnsubscribeCompletion: could not get env");
goto clean_up;
}
/* Get the future for this specific unsubscribe */
jobject jni_unsubscribe_future = return_data->jni_unsubscribe_future;
/* Calculate the number of references needed */
size_t references_needed = 0;
if (unsuback != NULL) {
/* A UnsubAck packet will need 1 reference at minimum */
references_needed += 1;
/* Optionals */
s_aws_count_allocation(unsuback->reason_string, &references_needed);
/* Add user properties and reason codes */
references_needed += (unsuback->user_property_count) * 2;
references_needed += 1; /* Add 1 for array to hold user properties */
if (unsuback->reason_code_count > 0) {
references_needed += unsuback->reason_code_count;
references_needed += 1; /* Add 1 for array to hold reason codes */
}
}
/**
* Push a new local frame so any local allocations we make are tied to it. Then we can pop it to free memory.
*/
jint local_frame_result = (*env)->PushLocalFrame(env, (jint)references_needed);
if (local_frame_result != 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT, "UnsubscribeCompletion: could not push local JNI frame with 4 allocation minimum");
exception_error_code = AWS_ERROR_INVALID_STATE;
goto exception;
}
has_pushed_frame = true;
if (error_code != AWS_OP_SUCCESS) {
exception_error_code = error_code;
goto exception;
}
jobject unsuback_packet_data = (*env)->NewObject(
env,
mqtt5_unsuback_packet_properties.unsuback_packet_class,
mqtt5_unsuback_packet_properties.unsuback_constructor_id);
if (s_set_jni_string_field_in_packet(
env,
unsuback->reason_string,
unsuback_packet_data,
mqtt5_unsuback_packet_properties.unsuback_reason_string_field_id,
"reason string",
true) != AWS_OP_SUCCESS) {
goto clean_up;
}
if (unsuback->reason_codes) {
if (unsuback->reason_code_count > 0) {
for (size_t i = 0; i < unsuback->reason_code_count; ++i) {
const enum aws_mqtt5_unsuback_reason_code *reason_code_data = &unsuback->reason_codes[i];
if (s_set_int_enum_in_packet(
env,
(int *)reason_code_data,
unsuback_packet_data,
mqtt5_unsuback_packet_properties.unsuback_native_add_unsuback_code_id,
false) != AWS_OP_SUCCESS) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT,
"Error when creating UnsubAckPacket from native: Could not set reason code");
exception_error_code = AWS_ERROR_INVALID_STATE;
goto exception;
}
}
}
}
if (s_set_user_properties_field(
env,
unsuback->user_property_count,
unsuback->user_properties,
unsuback_packet_data,
mqtt5_unsuback_packet_properties.unsuback_user_properties_field_id) == AWS_OP_ERR) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "When creating UnsubAckPacket from native could not add user property!");
exception_error_code = AWS_ERROR_INVALID_STATE;
goto exception;
}
/* Complete the promise */
(*env)->CallBooleanMethod(
env, jni_unsubscribe_future, completable_future_properties.complete_method_id, unsuback_packet_data);
aws_jni_check_and_clear_exception(env); /* To hide JNI warning */
goto clean_up;
exception:
s_complete_future_with_exception(env, &jni_unsubscribe_future, exception_error_code);
goto clean_up;
clean_up:
s_aws_mqtt5_client_java_unsubscribe_callback_destructor(env, return_data);
if (env != NULL) {
if (has_pushed_frame) {
(*env)->PopLocalFrame(env, NULL);
}
/********** JNI ENV RELEASE **********/
aws_jni_release_thread_env(jvm, env);
}
}
/**
 * Termination callback for the native MQTT5 client.
 *
 * Tells the Java-side resource to release its references, clears the native client pointer
 * on the binding and then destroys the binding itself.
 *
 * @param complete_ctx struct aws_mqtt5_client_java_jni * for the client that terminated.
 */
static void s_aws_mqtt5_client_java_termination(void *complete_ctx) {
    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)complete_ctx;
    if (!java_client) {
        AWS_LOGF_ERROR(
            AWS_LS_MQTT5_CLIENT, "MQTT5 client termination function in JNI called, but with invalid java_client");
        return;
    }

    /********** JNI ENV ACQUIRE **********/
    JavaVM *jvm = java_client->jvm;
    JNIEnv *env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "MQTT5 client termination function in JNI called, but could not get env");
        return;
    }

    (*env)->CallVoidMethod(env, java_client->jni_client, crt_resource_properties.release_references);
    /*
     * Do not leave a Java exception pending on this thread: the destroy/release calls below
     * go back into JNI, and the other callbacks in this file clear after every upcall too.
     */
    aws_jni_check_and_clear_exception(env);

    java_client->client = NULL;

    struct aws_allocator *allocator = aws_jni_get_allocator();
    aws_mqtt5_client_java_destroy(env, allocator, java_client);

    /********** JNI ENV RELEASE **********/
    aws_jni_release_thread_env(jvm, env);
}
/*******************************************************************************
* MQTT5 CLIENT FUNCTIONS
******************************************************************************/
/**
 * JNI entry point backing Mqtt5Client start.
 *
 * Validates the binding handle and its native client, then asks the native client to start.
 * Every failure is reported to Java by logging and throwing an exception.
 *
 * @param env        JNI environment of the calling thread.
 * @param jni_class  Unused.
 * @param jni_client Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalStart(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_mqtt5_client_java_jni *client_binding = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (client_binding == NULL) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.start: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    if (client_binding->client == NULL) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.start: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    if (aws_mqtt5_client_start(client_binding->client) == AWS_OP_SUCCESS) {
        return;
    }

    /* Start was rejected by the native client; surface the thread's last error to Java. */
    s_aws_mqtt5_client_log_and_throw_exception(
        env, "Mqtt5Client.start: aws_mqtt5_client_start returned a non AWS_OP_SUCCESS code!", aws_last_error());
}
/**
 * JNI entry point backing Mqtt5Client stop.
 *
 * Optionally converts the supplied Java disconnect packet to its native form, asks the native
 * client to stop, and always destroys the temporary native packet afterwards. Failures are
 * reported to Java by logging and throwing an exception.
 *
 * @param env                   JNI environment of the calling thread.
 * @param jni_class             Unused.
 * @param jni_client            Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 * @param jni_disconnect_packet Optional Java disconnect packet; may be NULL.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalStop(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_disconnect_packet) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (!java_client) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.stop: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }
    if (!java_client->client) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.stop: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    struct aws_mqtt5_packet_disconnect_view_java_jni *java_disconnect_packet = NULL;

    if (jni_disconnect_packet) {
        java_disconnect_packet =
            aws_mqtt5_packet_disconnect_view_create_from_java(env, allocator, jni_disconnect_packet);
        if (!java_disconnect_packet) {
            s_aws_mqtt5_client_log_and_throw_exception(
                env, "Mqtt5Client.stop: Invalid/null disconnect packet", aws_last_error());
            goto clean_up;
        }
    }

    /* NOTE(review): relies on the get_packet helper tolerating a NULL wrapper when no packet was given. */
    int return_result = aws_mqtt5_client_stop(
        java_client->client, aws_mqtt5_packet_disconnect_view_get_packet(java_disconnect_packet), NULL);
    if (return_result != AWS_OP_SUCCESS) {
        /*
         * return_result is only the generic failure indicator; the actual reason is the
         * thread's last error, which is what Start reports as well.
         */
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.stop: aws_mqtt5_client_stop returned a non AWS_OP_SUCCESS code!", aws_last_error());
    }

clean_up:
    if (java_disconnect_packet) {
        aws_mqtt5_packet_disconnect_view_java_destroy(env, allocator, java_disconnect_packet);
    }
}
/**
 * JNI entry point backing Mqtt5Client publish.
 *
 * A null client handle or null future is reported by throwing. Every later failure is reported
 * by completing the supplied future exceptionally. On success the native client takes over the
 * per-operation bookkeeping (return_data) and releases it from the completion callback.
 *
 * @param env                JNI environment of the calling thread.
 * @param jni_class          Unused.
 * @param jni_client         Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 * @param jni_publish_packet Java publish packet to convert and send.
 * @param jni_publish_future Java future completed with the publish result.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalPublish(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_publish_packet,
    jobject jni_publish_future) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    struct aws_mqtt5_packet_publish_view_java_jni *java_publish_packet = NULL;
    struct aws_mqtt5_client_publish_return_data *return_data = NULL;
    int error_code = 0;

    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (!java_client) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.publish: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }
    if (!jni_publish_future) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.publish: Invalid/null publish future", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    if (!java_client->client) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.publish: Invalid/null native client");
        goto exception;
    }
    if (!jni_publish_packet) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.publish: Invalid/Null publish packet!");
        goto exception;
    }

    /* Cannot fail */
    return_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_client_publish_return_data));
    return_data->java_client = java_client;
    /* The future must outlive this call, so hold it through a global reference. */
    return_data->jni_publish_future = (*env)->NewGlobalRef(env, jni_publish_future);

    struct aws_mqtt5_publish_completion_options completion_options = {
        .completion_callback = &s_aws_mqtt5_client_java_publish_completion,
        .completion_user_data = return_data,
    };

    java_publish_packet = aws_mqtt5_packet_publish_view_create_from_java(env, allocator, jni_publish_packet);
    if (!java_publish_packet) {
        error_code = aws_last_error();
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.publish: Could not create native publish packet!");
        goto exception;
    }

    int return_result = aws_mqtt5_client_publish(
        java_client->client, aws_mqtt5_packet_publish_view_get_packet(java_publish_packet), &completion_options);
    if (return_result != AWS_OP_SUCCESS) {
        error_code = aws_last_error();
        /* Log the real error; return_result is only the generic failure indicator. */
        AWS_LOGF_ERROR(
            AWS_LS_MQTT5_CLIENT, "Mqtt5Client.publish: Could not publish packet! Error code: %i", error_code);
        goto exception;
    }
    goto clean_up;

exception:
    s_complete_future_with_exception(
        env,
        &jni_publish_future,
        (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_MQTT5_OPERATION_PROCESSING_FAILURE : error_code);
    /* The completion callback will not run for this operation, so release its bookkeeping here. */
    if (return_data) {
        s_aws_mqtt5_client_java_publish_callback_destructor(env, return_data);
    }

clean_up:
    /* The temporary native packet wrapper is destroyed on both the success and failure paths. */
    if (java_publish_packet) {
        aws_mqtt5_packet_publish_view_java_destroy(env, allocator, java_publish_packet);
    }
}
/**
 * JNI entry point backing Mqtt5Client subscribe.
 *
 * A null client handle or null future is reported by throwing. Every later failure is reported
 * by completing the supplied future exceptionally. On success the native client takes over the
 * per-operation bookkeeping (return_data) and releases it from the completion callback.
 *
 * @param env                  JNI environment of the calling thread.
 * @param jni_class            Unused.
 * @param jni_client           Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 * @param jni_subscribe_packet Java subscribe packet to convert and send.
 * @param jni_subscribe_future Java future completed with the subscribe result.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalSubscribe(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_subscribe_packet,
    jobject jni_subscribe_future) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    struct aws_mqtt5_client_subscribe_return_data *return_data = NULL;
    struct aws_mqtt5_packet_subscribe_view_java_jni *java_subscribe_packet = NULL;
    int error_code = AWS_ERROR_SUCCESS;

    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (!java_client) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.subscribe: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }
    if (!jni_subscribe_future) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.subscribe: Invalid/null subscribe future", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    if (!java_client->client) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.subscribe: Invalid/null native client");
        goto exception;
    }
    if (!jni_subscribe_packet) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.subscribe: Invalid/Null subscribe packet!");
        goto exception;
    }

    /* Cannot fail */
    return_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_client_subscribe_return_data));
    return_data->java_client = java_client;
    /* The future must outlive this call, so hold it through a global reference. */
    return_data->jni_subscribe_future = (*env)->NewGlobalRef(env, jni_subscribe_future);

    struct aws_mqtt5_subscribe_completion_options completion_options = {
        .completion_callback = &s_aws_mqtt5_client_java_subscribe_completion,
        .completion_user_data = return_data,
    };

    java_subscribe_packet = aws_mqtt5_packet_subscribe_view_create_from_java(env, allocator, jni_subscribe_packet);
    if (java_subscribe_packet == NULL) {
        error_code = aws_last_error();
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.subscribe: Could not create native subscribe packet!");
        goto exception;
    }

    int return_result = aws_mqtt5_client_subscribe(
        java_client->client, aws_mqtt5_packet_subscribe_view_get_packet(java_subscribe_packet), &completion_options);
    if (return_result != AWS_OP_SUCCESS) {
        error_code = aws_last_error();
        /* Log the real error; return_result is only the generic failure indicator. */
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.subscribe: Subscribe failed! Error code: %i", error_code);
        goto exception;
    }
    goto clean_up;

exception:
    s_complete_future_with_exception(
        env,
        &jni_subscribe_future,
        (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_MQTT5_OPERATION_PROCESSING_FAILURE : error_code);
    /* The completion callback will not run for this operation, so release its bookkeeping here. */
    if (return_data) {
        s_aws_mqtt5_client_java_subscribe_callback_destructor(env, return_data);
    }

clean_up:
    /* The temporary native packet wrapper is destroyed on both the success and failure paths. */
    if (java_subscribe_packet) {
        aws_mqtt5_packet_subscribe_view_java_destroy(env, allocator, java_subscribe_packet);
    }
}
/**
 * JNI entry point backing Mqtt5Client unsubscribe.
 *
 * A null client handle or null future is reported by throwing. Every later failure is reported
 * by completing the supplied future exceptionally. On success the native client takes over the
 * per-operation bookkeeping (return_data) and releases it from the completion callback.
 *
 * @param env                    JNI environment of the calling thread.
 * @param jni_class              Unused.
 * @param jni_client             Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 * @param jni_unsubscribe_packet Java unsubscribe packet to convert and send.
 * @param jni_unsubscribe_future Java future completed with the unsubscribe result.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalUnsubscribe(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_unsubscribe_packet,
    jobject jni_unsubscribe_future) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    struct aws_mqtt5_client_unsubscribe_return_data *return_data = NULL;
    struct aws_mqtt5_packet_unsubscribe_view_java_jni *java_unsubscribe_packet = NULL;
    int error_code = AWS_ERROR_SUCCESS;

    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (!java_client) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.unsubscribe: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }
    if (!jni_unsubscribe_future) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.unsubscribe: Invalid/null unsubscribe future", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    if (!java_client->client) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.unsubscribe: Invalid/null native client");
        goto exception;
    }
    if (!jni_unsubscribe_packet) {
        error_code = AWS_ERROR_INVALID_ARGUMENT;
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.unsubscribe: Invalid/Null unsubscribe packet!");
        goto exception;
    }

    /* Cannot fail */
    return_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_client_unsubscribe_return_data));
    return_data->java_client = java_client;
    /* The future must outlive this call, so hold it through a global reference. */
    return_data->jni_unsubscribe_future = (*env)->NewGlobalRef(env, jni_unsubscribe_future);

    struct aws_mqtt5_unsubscribe_completion_options completion_options = {
        .completion_callback = &s_aws_mqtt5_client_java_unsubscribe_completion,
        .completion_user_data = return_data,
    };

    java_unsubscribe_packet =
        aws_mqtt5_packet_unsubscribe_view_create_from_java(env, allocator, jni_unsubscribe_packet);
    if (!java_unsubscribe_packet) {
        error_code = aws_last_error();
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Mqtt5Client.unsubscribe: Could not create native unsubscribe packet!");
        goto exception;
    }

    int return_result = aws_mqtt5_client_unsubscribe(
        java_client->client,
        aws_mqtt5_packet_unsubscribe_view_get_packet(java_unsubscribe_packet),
        &completion_options);
    if (return_result != AWS_OP_SUCCESS) {
        error_code = aws_last_error();
        /* Log the real error; return_result is only the generic failure indicator. */
        AWS_LOGF_ERROR(
            AWS_LS_MQTT5_CLIENT, "Mqtt5Client.unsubscribe: Unsubscribe failed! Error code: %i", error_code);
        goto exception;
    }
    goto clean_up;

exception:
    s_complete_future_with_exception(
        env,
        &jni_unsubscribe_future,
        (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_MQTT5_OPERATION_PROCESSING_FAILURE : error_code);
    /* The completion callback will not run for this operation, so release its bookkeeping here. */
    if (return_data) {
        s_aws_mqtt5_client_java_unsubscribe_callback_destructor(env, return_data);
    }

clean_up:
    /* The temporary native packet wrapper is destroyed on both the success and failure paths. */
    if (java_unsubscribe_packet) {
        aws_mqtt5_packet_unsubscribe_view_java_destroy(env, allocator, java_unsubscribe_packet);
    }
}
/**
 * JNI entry point that snapshots the native client's operation statistics into a new Java
 * statistics object.
 *
 * @param env        JNI environment of the calling thread.
 * @param jni_class  Unused.
 * @param jni_client Native handle (struct aws_mqtt5_client_java_jni *) carried as a jlong.
 * @return The populated Java statistics object, or NULL when validation, object construction
 *         or any field assignment fails.
 */
JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalGetOperationStatistics(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_mqtt5_client_java_jni *client_binding = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (client_binding == NULL) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.getOperationStatistics: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return NULL;
    }
    if (client_binding->client == NULL) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "Mqtt5Client.getOperationStatistics: Invalid/null native client", AWS_ERROR_INVALID_ARGUMENT);
        return NULL;
    }

    /* Build the Java-side container first; nothing to fill in if that fails. */
    jobject stats_object = (*env)->NewObject(
        env,
        mqtt5_client_operation_statistics_properties.statistics_class,
        mqtt5_client_operation_statistics_properties.statistics_constructor_id);
    if (stats_object == NULL) {
        return NULL;
    }

    /* Take the native snapshot, then convert each counter to a jlong for its Java field. */
    struct aws_mqtt5_client_operation_statistics native_stats;
    aws_mqtt5_client_get_stats(client_binding->client, &native_stats);

    const jlong incomplete_count = (jlong)native_stats.incomplete_operation_count;
    const jlong incomplete_size = (jlong)native_stats.incomplete_operation_size;
    const jlong unacked_count = (jlong)native_stats.unacked_operation_count;
    const jlong unacked_size = (jlong)native_stats.unacked_operation_size;

    /* Incomplete operation count */
    (*env)->SetLongField(
        env,
        stats_object,
        mqtt5_client_operation_statistics_properties.incomplete_operation_count_field_id,
        incomplete_count);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "Mqtt5Client.getOperationStatistics: could not create incomplete operation count");
        return NULL;
    }

    /* Incomplete operation size */
    (*env)->SetLongField(
        env,
        stats_object,
        mqtt5_client_operation_statistics_properties.incomplete_operation_size_field_id,
        incomplete_size);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "Mqtt5Client.getOperationStatistics: could not create incomplete operation size");
        return NULL;
    }

    /* Unacked operation count */
    (*env)->SetLongField(
        env,
        stats_object,
        mqtt5_client_operation_statistics_properties.unacked_operation_count_field_id,
        unacked_count);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "Mqtt5Client.getOperationStatistics: could not create unacked operation count");
        return NULL;
    }

    /* Unacked operation size */
    (*env)->SetLongField(
        env,
        stats_object,
        mqtt5_client_operation_statistics_properties.unacked_operation_size_field_id,
        unacked_size);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "Mqtt5Client.getOperationStatistics: could not create unacked operation size");
        return NULL;
    }

    return stats_object;
}
/*******************************************************************************
* WEBSOCKET FUNCTIONS
******************************************************************************/
/**
 * State carried across the websocket handshake round trip: it is allocated when the native
 * handshake transform is invoked, handed to Java as an opaque value, and released once the
 * handshake-complete entry point has invoked the completion callback.
 */
struct mqtt5_jni_ws_handshake {
/* Binding of the client whose handshake is being transformed */
struct aws_mqtt5_client_java_jni *java_client;
/* Native handshake request; passed back to complete_fn when the transform finishes */
struct aws_http_message *http_request;
/* Completion callback supplied to the transform function */
aws_mqtt5_transform_websocket_handshake_complete_fn *complete_fn;
/* Opaque context passed through to complete_fn */
void *complete_ctx;
/* Allocator this struct was allocated from; used again to release it */
struct aws_allocator *allocator;
};
/**
 * Releases a websocket handshake context back to the allocator recorded inside it.
 * Passing NULL is a no-op.
 */
static void s_ws_handshake_destroy(struct mqtt5_jni_ws_handshake *ws_handshake) {
    if (ws_handshake != NULL) {
        aws_mem_release(ws_handshake->allocator, ws_handshake);
    }
}
/**
 * Websocket handshake transform callback registered with the native client.
 *
 * Wraps the native handshake request in a Java HttpRequest and forwards it, together with an
 * opaque handshake context, to the Java client. The Java side later reports back through the
 * handshake-complete JNI entry point, which invokes complete_fn and frees the context.
 *
 * Every path that does not hand the request to Java invokes complete_fn itself with an error
 * code, so the handshake is never left without a completion.
 *
 * @param request      Native handshake request to transform.
 * @param user_data    struct aws_mqtt5_client_java_jni * of the owning client.
 * @param complete_fn  Callback to invoke once the transform has finished.
 * @param complete_ctx Opaque context to pass to complete_fn.
 */
static void s_aws_mqtt5_client_java_websocket_handshake_transform(
    struct aws_http_message *request,
    void *user_data,
    aws_mqtt5_transform_websocket_handshake_complete_fn *complete_fn,
    void *complete_ctx) {

    struct aws_mqtt5_client_java_jni *java_client = (struct aws_mqtt5_client_java_jni *)user_data;
    if (!java_client) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Websocket handshake function in JNI called without valid client");
        /* Still report completion, as the env-acquire failure path below does. */
        complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
        return;
    }
    if (!java_client->jni_client || !java_client->client) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Websocket handshake function in JNI called with already freed client");
        complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
        return;
    }

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(java_client->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
        complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
        return;
    }

    struct aws_allocator *alloc = aws_jni_get_allocator();
    /* Cannot fail */
    struct mqtt5_jni_ws_handshake *ws_handshake = aws_mem_calloc(alloc, 1, sizeof(struct mqtt5_jni_ws_handshake));
    ws_handshake->java_client = java_client;
    ws_handshake->complete_ctx = complete_ctx;
    ws_handshake->complete_fn = complete_fn;
    ws_handshake->http_request = request;
    ws_handshake->allocator = alloc;

    jobject java_http_request = aws_java_http_request_from_native(env, request, NULL);
    if (!java_http_request) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Could not create a HttpRequest for Java in MQTT5 client");
        aws_raise_error(AWS_ERROR_UNKNOWN);
        goto error;
    }

    /*
     * jni_client was validated as non-NULL above. The handshake context comes back to native
     * code as a jlong, so widen the pointer explicitly: a raw pointer in the variadic argument
     * list has the wrong size on 32-bit platforms.
     */
    (*env)->CallVoidMethod(
        env,
        java_client->jni_client,
        mqtt5_client_properties.client_on_websocket_handshake_id,
        java_http_request,
        (jlong)(intptr_t)ws_handshake);
    AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));

    (*env)->DeleteLocalRef(env, java_http_request);
    aws_jni_release_thread_env(java_client->jvm, env);
    /********** JNI ENV RELEASE SUCCESS PATH **********/
    return;

error:;
    /* Capture the error before the calls below get a chance to overwrite it. */
    int error_code = aws_last_error();
    s_ws_handshake_destroy(ws_handshake);
    complete_fn(request, error_code, complete_ctx);
    aws_jni_release_thread_env(java_client->jvm, env);
    /********** JNI ENV RELEASE FAILURE PATH **********/
}
/**
 * JNI entry point called by Java once it has finished transforming a websocket handshake.
 *
 * Either applies the marshalled Java request back onto the native request or, when Java
 * supplied a throwable, derives an error code from it. In both cases the stored completion
 * callback is invoked and the handshake context is released.
 *
 * @param env                    JNI environment of the calling thread.
 * @param jni_class              Unused.
 * @param jni_connection         Unused.
 * @param jni_marshalled_request Marshalled Java request to apply to the native request.
 * @param jni_throwable          Failure reported by Java, or NULL on success.
 * @param jni_user_data          struct mqtt5_jni_ws_handshake * carried as a jlong.
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientInternalWebsocketHandshakeComplete(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jbyteArray jni_marshalled_request,
    jobject jni_throwable,
    jlong jni_user_data) {

    (void)jni_class;
    (void)jni_connection;
    aws_cache_jni_ids(env);

    struct mqtt5_jni_ws_handshake *handshake = (void *)jni_user_data;
    if (handshake == NULL) {
        AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Websocket handshake complete function in JNI called without handshake");
        return;
    }

    int result_code = AWS_ERROR_SUCCESS;
    if (jni_throwable == NULL) {
        /* Java succeeded: copy its edits onto the native request. */
        if (aws_apply_java_http_request_changes_to_native_request(
                env, jni_marshalled_request, NULL, handshake->http_request)) {
            result_code = aws_last_error();
        }
    } else {
        /* Java failed: prefer the error code carried by a CRT runtime exception. */
        if ((*env)->IsInstanceOf(env, jni_throwable, crt_runtime_exception_properties.crt_runtime_exception_class)) {
            result_code =
                (*env)->GetIntField(env, jni_throwable, crt_runtime_exception_properties.error_code_field_id);
        }
        if (result_code == AWS_ERROR_SUCCESS) {
            result_code = AWS_ERROR_UNKNOWN; /* is there anything more that could be done here? */
        }
    }

    handshake->complete_fn(handshake->http_request, result_code, handshake->complete_ctx);
    s_ws_handshake_destroy(handshake);
}
/**
 * Copies the optional fields of a Java topic aliasing options object into the native struct.
 * Fields that are null on the Java object leave the corresponding native member untouched.
 *
 * @param env                        JNI environment of the calling thread.
 * @param topic_aliasing_options     Native options struct to fill in.
 * @param jni_topic_aliasing_options Java options object to read from.
 * @return AWS_OP_SUCCESS, or the result of aws_raise_error() when an enum value cannot be
 *         read or a cache size is outside the range 0..UINT16_MAX.
 */
static int s_initialize_topic_aliasing_options(
    JNIEnv *env,
    struct aws_mqtt5_client_topic_alias_options *topic_aliasing_options,
    jobject jni_topic_aliasing_options) {

    /* Outbound behavior (enum) */
    jobject outbound_behavior_obj = (*env)->GetObjectField(
        env, jni_topic_aliasing_options, mqtt5_topic_aliasing_options_properties.outbound_behavior_field_id);
    if (outbound_behavior_obj != NULL) {
        jint outbound_behavior = (*env)->CallIntMethod(
            env, outbound_behavior_obj, mqtt5_outbound_topic_alias_behavior_type_properties.get_value_method_id);
        if (aws_jni_check_and_clear_exception(env)) {
            AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Error getting native value from OutboundTopicAliasBehaviorType");
            return aws_raise_error(AWS_ERROR_INVALID_STATE);
        }
        topic_aliasing_options->outbound_topic_alias_behavior =
            (enum aws_mqtt5_client_outbound_topic_alias_behavior_type)outbound_behavior;
    }

    /* Outbound cache size (boxed Integer, must fit in a uint16_t) */
    jobject outbound_cache_size_obj = (*env)->GetObjectField(
        env, jni_topic_aliasing_options, mqtt5_topic_aliasing_options_properties.outbound_cache_max_size_field_id);
    if (outbound_cache_size_obj != NULL) {
        jint outbound_cache_size =
            (*env)->CallIntMethod(env, outbound_cache_size_obj, boxed_integer_properties.integer_get_value_id);
        if (outbound_cache_size < 0 || outbound_cache_size > UINT16_MAX) {
            AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Invalid outbound cache size value: %d", outbound_cache_size);
            return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        }
        topic_aliasing_options->outbound_alias_cache_max_size = (uint16_t)outbound_cache_size;
    }

    /* Inbound behavior (enum) */
    jobject inbound_behavior_obj = (*env)->GetObjectField(
        env, jni_topic_aliasing_options, mqtt5_topic_aliasing_options_properties.inbound_behavior_field_id);
    if (inbound_behavior_obj != NULL) {
        jint inbound_behavior = (*env)->CallIntMethod(
            env, inbound_behavior_obj, mqtt5_inbound_topic_alias_behavior_type_properties.get_value_method_id);
        if (aws_jni_check_and_clear_exception(env)) {
            AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Error getting native value from InboundTopicAliasBehaviorType");
            return aws_raise_error(AWS_ERROR_INVALID_STATE);
        }
        topic_aliasing_options->inbound_topic_alias_behavior =
            (enum aws_mqtt5_client_inbound_topic_alias_behavior_type)inbound_behavior;
    }

    /* Inbound cache size (boxed Integer, must fit in a uint16_t) */
    jobject inbound_cache_size_obj = (*env)->GetObjectField(
        env, jni_topic_aliasing_options, mqtt5_topic_aliasing_options_properties.inbound_cache_max_size_field_id);
    if (inbound_cache_size_obj != NULL) {
        jint inbound_cache_size =
            (*env)->CallIntMethod(env, inbound_cache_size_obj, boxed_integer_properties.integer_get_value_id);
        if (inbound_cache_size < 0 || inbound_cache_size > UINT16_MAX) {
            AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Invalid inbound cache size value: %d", inbound_cache_size);
            return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        }
        topic_aliasing_options->inbound_alias_cache_size = (uint16_t)inbound_cache_size;
    }

    return AWS_OP_SUCCESS;
}
/*******************************************************************************
* JNI FUNCTIONS
******************************************************************************/
/* Create and Destroy
**************************************/
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientNew(
JNIEnv *env,
jclass jni_class,
jobject jni_options,
jobject jni_connect_options,
jobject jni_bootstrap,
jobject jni_client) {
(void)jni_class;
aws_cache_jni_ids(env);
struct aws_allocator *allocator = aws_jni_get_allocator();
struct aws_mqtt5_packet_connect_view_java_jni *connect_options = NULL;
struct aws_mqtt5_client_options client_options;
AWS_ZERO_STRUCT(client_options);
struct aws_http_proxy_options_java_jni *java_http_proxy_options = NULL;
struct aws_byte_buf host_name_buf;
/* Needed to track if optionals are set or not */
bool was_value_set = false;
/**
* Push a new local frame so any local allocations we make are tied to it. Then we can pop it to free memory.
* In Java JNI allocations here, we have 21 allocations so we need at least that many.
* It should expand if we use more.
* (NOTE: We cannot get the exact here because we are pulling from Java objects and we have no way to know how many
* that will need)
*/
jint local_frame_result = (*env)->PushLocalFrame(env, (jint)21);
if (local_frame_result != 0) {
s_aws_mqtt5_client_log_and_throw_exception(
env,
"MQTT5 client new: could not push local JNI frame with 21 allocation minimum",
AWS_ERROR_INVALID_STATE);
return (jlong)NULL;
}
struct aws_mqtt5_client_java_jni *java_client =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_client_java_jni));
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "java_client=%p: Initializing MQTT5 client", (void *)java_client);
if (java_client == NULL) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: could not initialize new client", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (aws_get_string_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_host_name_field_id,
s_client_string,
"Host Name",
&host_name_buf,
&client_options.host_name,
false,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get host name from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
uint32_t port = 0;
if (aws_get_uint32_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_port_field_id,
s_client_string,
"port",
&port,
false,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get port from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.port = port;
}
if (!jni_bootstrap) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: no bootstrap found", AWS_ERROR_INVALID_ARGUMENT);
goto clean_up;
}
jlong jni_bootstrap_pointer =
(*env)->CallLongMethod(env, jni_bootstrap, crt_resource_properties.get_native_handle_method_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: could not get native handle for bootstrap", AWS_ERROR_INVALID_ARGUMENT);
goto clean_up;
}
struct aws_client_bootstrap *bootstrap = (struct aws_client_bootstrap *)jni_bootstrap_pointer;
client_options.bootstrap = bootstrap;
struct aws_socket_options *socket_options = NULL;
struct aws_socket_options tmp_socket_options;
jobject jni_socket_options =
(*env)->CallObjectMethod(env, jni_options, mqtt5_client_options_properties.options_get_socket_options_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting socket options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_socket_options) {
jlong jni_socket_options_pointer =
(*env)->CallLongMethod(env, jni_socket_options, crt_resource_properties.get_native_handle_method_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: could not get native handle for socket options", AWS_ERROR_INVALID_ARGUMENT);
goto clean_up;
}
socket_options = (struct aws_socket_options *)jni_socket_options_pointer;
}
if (socket_options == NULL) {
tmp_socket_options.type = AWS_SOCKET_STREAM;
tmp_socket_options.domain = AWS_SOCKET_IPV4;
tmp_socket_options.connect_timeout_ms = 10000;
client_options.socket_options = &tmp_socket_options;
} else {
client_options.socket_options = socket_options;
}
jobject jni_tls_options =
(*env)->CallObjectMethod(env, jni_options, mqtt5_client_options_properties.options_get_tls_options_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting tls options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_tls_options) {
jlong jni_tls_pointer =
(*env)->CallLongMethod(env, jni_tls_options, crt_resource_properties.get_native_handle_method_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: could not get native handle for tls options", AWS_ERROR_INVALID_ARGUMENT);
goto clean_up;
}
struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_pointer;
if (tls_ctx) {
aws_tls_connection_options_init_from_ctx(&java_client->tls_options, tls_ctx);
aws_tls_connection_options_set_server_name(&java_client->tls_options, allocator, &client_options.host_name);
client_options.tls_options = &java_client->tls_options;
}
} else {
client_options.tls_options = NULL;
}
jobject jni_http_proxy_options =
(*env)->GetObjectField(env, jni_options, mqtt5_client_options_properties.http_proxy_options_field_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting http proxy options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_http_proxy_options) {
java_http_proxy_options =
s_aws_mqtt5_http_proxy_options_create_from_java(env, allocator, java_client, jni_http_proxy_options);
client_options.http_proxy_options = &java_http_proxy_options->options;
if (client_options.http_proxy_options->connection_type != AWS_HPCT_HTTP_TUNNEL) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: http proxy connection type has to be set to tunnel", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
}
if (jni_connect_options) {
connect_options = aws_mqtt5_packet_connect_view_create_from_java(env, allocator, jni_connect_options);
if (connect_options != NULL || aws_jni_check_and_clear_exception(env)) {
client_options.connect_options = aws_mqtt5_packet_connect_view_get_packet(connect_options);
} else {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting connect options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
}
uint32_t session_behavior = UINT32_MAX;
if (aws_get_enum_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_get_session_behavior_id,
s_client_string,
"session behavior",
mqtt5_client_session_behavior_properties.client_get_value_id,
&session_behavior,
true,
&was_value_set) == AWS_OP_ERR) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get session behavior from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.session_behavior = (enum aws_mqtt5_client_session_behavior_type)session_behavior;
}
uint32_t extended_validation_and_flow_control_options = UINT32_MAX;
if (aws_get_enum_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_get_extended_validation_and_flow_control_options_id,
s_client_string,
"extended validation and flow control",
mqtt5_client_extended_validation_and_flow_control_options.client_get_value_id,
&extended_validation_and_flow_control_options,
true,
&was_value_set) == AWS_OP_ERR) {
s_aws_mqtt5_client_log_and_throw_exception(
env,
"MQTT5 client new: Could not extended validation and flow control from options",
AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.extended_validation_and_flow_control_options =
(enum aws_mqtt5_extended_validation_and_flow_control_options)extended_validation_and_flow_control_options;
}
uint32_t offline_queue_enum = UINT32_MAX;
if (aws_get_enum_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_get_offline_queue_behavior_id,
s_client_string,
"offline queue behavior",
mqtt5_client_offline_queue_behavior_type_properties.client_get_value_id,
&offline_queue_enum,
true,
&was_value_set) == AWS_OP_ERR) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get offline queue behavior from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.offline_queue_behavior = (enum aws_mqtt5_client_operation_queue_behavior_type)offline_queue_enum;
}
uint32_t retry_jitter_enum = UINT32_MAX;
if (aws_get_enum_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.options_get_retry_jitter_mode_id,
s_client_string,
"retry jitter mode",
mqtt5_client_jitter_mode_properties.client_get_value_id,
&retry_jitter_enum,
true,
&was_value_set) == AWS_OP_ERR) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get retry jitter mode from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.retry_jitter_mode = (enum aws_exponential_backoff_jitter_mode)retry_jitter_enum;
}
uint64_t min_reconnect_delay_ms = 0;
if (aws_get_uint64_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.min_reconnect_delay_ms_field_id,
s_client_string,
"minimum reconnect delay",
&min_reconnect_delay_ms,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get minimum reconnect delay from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.min_reconnect_delay_ms = min_reconnect_delay_ms;
}
uint64_t max_reconnect_delay_ms = 0;
if (aws_get_uint64_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.max_reconnect_delay_ms_field_id,
s_client_string,
"maximum reconnect delay",
&max_reconnect_delay_ms,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get maximum reconnect delay from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.max_reconnect_delay_ms = max_reconnect_delay_ms;
}
uint64_t min_connected_time_to_reset_reconnect_delay_ms = 0;
if (aws_get_uint64_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.min_connected_time_to_reset_reconnect_delay_ms_field_id,
s_client_string,
"minimum connected time to reset reconnect delay",
&min_connected_time_to_reset_reconnect_delay_ms,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env,
"MQTT5 client new: Could not get minimum connected time to reset reconnect delay from options",
AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.min_connected_time_to_reset_reconnect_delay_ms = min_connected_time_to_reset_reconnect_delay_ms;
}
uint32_t ping_timeout = 0;
if (aws_get_uint32_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.ping_timeout_ms_field_id,
s_client_string,
"ping timeout",
&ping_timeout,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get ping timeout from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.ping_timeout_ms = ping_timeout;
}
uint32_t connack_timeout = 0;
if (aws_get_uint32_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.connack_timeout_ms_field_id,
s_client_string,
"ConnAck timeout",
&connack_timeout,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get ConnAck timeout from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.connack_timeout_ms = connack_timeout;
}
uint32_t ack_timeout = 0;
if (aws_get_uint32_from_jobject(
env,
jni_options,
mqtt5_client_options_properties.ack_timeout_seconds_field_id,
s_client_string,
"Ack timeout",
&ack_timeout,
true,
&was_value_set) != AWS_OP_SUCCESS) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: Could not get Ack timeout from options", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (was_value_set) {
client_options.ack_timeout_seconds = ack_timeout;
}
jint jvmresult = (*env)->GetJavaVM(env, &java_client->jvm);
if (jvmresult != 0) {
s_aws_mqtt5_client_log_and_throw_exception(env, "MQTT5 client new: Unable to get JVM", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
java_client->jni_client = (*env)->NewGlobalRef(env, jni_client);
client_options.lifecycle_event_handler = &s_aws_mqtt5_client_java_lifecycle_event;
client_options.lifecycle_event_handler_user_data = (void *)java_client;
client_options.publish_received_handler = &s_aws_mqtt5_client_java_publish_received;
client_options.publish_received_handler_user_data = (void *)java_client;
/* Are we using websockets? */
jobject jni_websocket_handshake =
(*env)->GetObjectField(env, jni_client, mqtt5_client_properties.websocket_handshake_field_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting websocket handshake transform", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_websocket_handshake) {
client_options.websocket_handshake_transform = &s_aws_mqtt5_client_java_websocket_handshake_transform;
client_options.websocket_handshake_transform_user_data = (void *)java_client;
}
jobject jni_publish_events =
(*env)->GetObjectField(env, jni_options, mqtt5_client_options_properties.publish_events_field_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting publish events", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_publish_events != NULL) {
java_client->jni_publish_events = (*env)->NewGlobalRef(env, jni_publish_events);
}
jobject jni_lifecycle_events =
(*env)->GetObjectField(env, jni_options, mqtt5_client_options_properties.lifecycle_events_field_id);
if (aws_jni_check_and_clear_exception(env)) {
s_aws_mqtt5_client_log_and_throw_exception(
env, "MQTT5 client new: error getting lifecycle events", AWS_ERROR_INVALID_STATE);
goto clean_up;
}
if (jni_lifecycle_events != NULL) {
java_client->jni_lifecycle_events = (*env)->NewGlobalRef(env, jni_lifecycle_events);
}
struct aws_mqtt5_client_topic_alias_options topic_aliasing_options;
AWS_ZERO_STRUCT(topic_aliasing_options);
jobject jni_topic_aliasing_options =
(*env)->GetObjectField(env, jni_options, mqtt5_client_options_properties.topic_aliasing_options_field_id);
if (jni_topic_aliasing_options != NULL) {
if (s_initialize_topic_aliasing_options(env, &topic_aliasing_options, jni_topic_aliasing_options) ==
AWS_OP_SUCCESS) {
client_options.topic_aliasing_options = &topic_aliasing_options;
}
}
client_options.client_termination_handler = &s_aws_mqtt5_client_java_termination;
client_options.client_termination_handler_user_data = (void *)java_client;
/* Make the MQTT5 client */
java_client->client = aws_mqtt5_client_new(allocator, &client_options);
/* Did we successfully make a client? If not, then throw an exception */
if (java_client->client == NULL) {
s_aws_mqtt5_client_log_and_throw_exception(
env,
"MQTT5 client new: Was unable to create client due to option configuration! Enable error logging to see "
"reason",
AWS_ERROR_MQTT5_CLIENT_OPTIONS_VALIDATION);
goto clean_up;
}
goto clean_up;
clean_up:
aws_mqtt5_packet_connect_view_java_destroy(env, allocator, connect_options);
s_aws_mqtt5_http_proxy_options_java_destroy(env, allocator, java_http_proxy_options);
if (aws_byte_buf_is_valid(&host_name_buf)) {
aws_byte_buf_clean_up(&host_name_buf);
}
(*env)->PopLocalFrame(env, NULL);
if (java_client->client != NULL) {
return (jlong)java_client;
}
aws_mqtt5_client_java_destroy(env, allocator, java_client);
return (jlong)NULL;
}
/**
 * JNI entry point for Mqtt5Client.mqtt5ClientDestroy.
 *
 * Interprets jni_mqtt_client as a pointer to the native aws_mqtt5_client_java_jni binding and tears it down:
 *  - a null handle is reported through s_aws_mqtt5_client_log_and_throw_exception with
 *    AWS_ERROR_INVALID_ARGUMENT and nothing else happens;
 *  - if the binding has no native client attached, the binding itself is destroyed right away;
 *  - otherwise only the reference to the native client is released.
 *
 * @param env             JNI environment of the calling thread
 * @param jni_class       unused
 * @param jni_mqtt_client native handle of the binding
 */
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5ClientDestroy(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_mqtt_client) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct aws_mqtt5_client_java_jni *binding = (struct aws_mqtt5_client_java_jni *)jni_mqtt_client;
    if (binding == NULL) {
        s_aws_mqtt5_client_log_and_throw_exception(
            env, "MQTT5 client destroy: Invalid/null client", AWS_ERROR_INVALID_ARGUMENT);
        return;
    }

    struct aws_allocator *allocator = aws_jni_get_allocator();

    if (binding->client == NULL) {
        /* No native client is attached, so the binding is destroyed directly here. */
        aws_mqtt5_client_java_destroy(env, allocator, binding);
        return;
    }

    /*
     * A native client exists: drop our reference and let it shut down normally.
     * NOTE(review): the binding is not destroyed on this path; presumably the client termination
     * callback takes care of that once the native client is gone - confirm.
     */
    aws_mqtt5_client_release(binding->client);
}
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
# else
# pragma GCC diagnostic pop
# endif
#endif