From 60f5852582e4163b33c2d02226b5b5c1b5858762 Mon Sep 17 00:00:00 2001 From: Michael Kuhn Date: Wed, 20 Oct 2021 23:34:50 +0200 Subject: [PATCH] lib/object, server: Check existence for read/write operations (#93) --- lib/object/jdistributed-object.c | 65 ++++++++++++++++++++++++++------ lib/object/jobject.c | 31 ++++++++++----- server/loop.c | 40 +++++++++++++------- 3 files changed, 102 insertions(+), 34 deletions(-) diff --git a/lib/object/jdistributed-object.c b/lib/object/jdistributed-object.c index 479a2a88c..d2f8d2fb8 100644 --- a/lib/object/jdistributed-object.c +++ b/lib/object/jdistributed-object.c @@ -353,6 +353,12 @@ j_distributed_object_read_background_operation(gpointer data) reply_operation_count = j_message_get_count(reply); + if (reply_operation_count == 0) + { + background_data->ret = FALSE; + break; + } + for (guint i = 0; i < reply_operation_count && j_list_iterator_next(it); i++) { JDistributedObjectReadBuffer* buffer = j_list_iterator_get(it); @@ -384,9 +390,7 @@ j_distributed_object_read_background_operation(gpointer data) j_list_unref(background_data->read.buffers); - g_slice_free(JDistributedObjectBackgroundData, background_data); - - return NULL; + return data; } /** @@ -422,14 +426,21 @@ j_distributed_object_write_background_operation(gpointer data) reply = j_message_new_reply(background_data->message); j_message_receive(reply, object_connection); - it = j_list_iterator_new(background_data->write.bytes_written); - - while (j_list_iterator_next(it)) + if (j_message_get_count(reply) > 0) { - guint64* bytes_written = j_list_iterator_get(it); + it = j_list_iterator_new(background_data->write.bytes_written); - nbytes = j_message_get_8(reply); - j_helper_atomic_add(bytes_written, nbytes); + while (j_list_iterator_next(it)) + { + guint64* bytes_written = j_list_iterator_get(it); + + nbytes = j_message_get_8(reply); + j_helper_atomic_add(bytes_written, nbytes); + } + } + else + { + background_data->ret = FALSE; } } @@ -439,9 +450,7 @@ j_distributed_object_write_background_operation(gpointer data) j_list_unref(background_data->write.bytes_written); - g_slice_free(JDistributedObjectBackgroundData, background_data); - - return NULL; + return data; } /** @@ -912,11 +921,27 @@ j_distributed_object_read_exec(JList* operations, JSemantics* semantics) data->operations = NULL; data->semantics = semantics; data->read.buffers = br_lists[i]; + data->ret = TRUE; background_data[i] = data; } j_helper_execute_parallel(j_distributed_object_read_background_operation, background_data, server_count); + + for (guint i = 0; i < server_count; i++) + { + JDistributedObjectBackgroundData* data; + + if (background_data[i] == NULL) + { + continue; + } + + data = background_data[i]; + ret = data->ret && ret; + + g_slice_free(JDistributedObjectBackgroundData, data); + } } else { @@ -1087,11 +1112,27 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics) data->operations = NULL; data->semantics = semantics; data->write.bytes_written = bw_lists[i]; + data->ret = TRUE; background_data[i] = data; } j_helper_execute_parallel(j_distributed_object_write_background_operation, background_data, server_count); + + for (guint i = 0; i < server_count; i++) + { + JDistributedObjectBackgroundData* data; + + if (background_data[i] == NULL) + { + continue; + } + + data = background_data[i]; + ret = data->ret && ret; + + g_slice_free(JDistributedObjectBackgroundData, data); + } } else { diff --git a/lib/object/jobject.c b/lib/object/jobject.c index d6a82b942..52c57819c 100644 --- a/lib/object/jobject.c +++ b/lib/object/jobject.c @@ -540,6 +540,12 @@ j_object_read_exec(JList* operations, JSemantics* semantics) reply_operation_count = j_message_get_count(reply); + if (reply_operation_count == 0) + { + ret = FALSE; + break; + } + for (guint i = 0; i < reply_operation_count && j_list_iterator_next(it); i++) { JObjectOperation* operation = j_list_iterator_get(it); @@ -703,18 +709,25 @@ j_object_write_exec(JList* operations, JSemantics* semantics) reply = j_message_new_reply(message); j_message_receive(reply, object_connection); - it = j_list_iterator_new(operations); - - while (j_list_iterator_next(it)) + if (j_message_get_count(reply) > 0) { - JObjectOperation* operation = j_list_iterator_get(it); - guint64* bytes_written = operation->write.bytes_written; + it = j_list_iterator_new(operations); - nbytes = j_message_get_8(reply); - j_helper_atomic_add(bytes_written, nbytes); - } + while (j_list_iterator_next(it)) + { + JObjectOperation* operation = j_list_iterator_get(it); + guint64* bytes_written = operation->write.bytes_written; + + nbytes = j_message_get_8(reply); + j_helper_atomic_add(bytes_written, nbytes); + } - j_list_iterator_free(it); + j_list_iterator_free(it); + } + else + { + ret = FALSE; + } } j_connection_pool_push(J_BACKEND_TYPE_OBJECT, object->index, object_connection); diff --git a/server/loop.c b/server/loop.c index a80edbe89..33adbc1c1 100644 --- a/server/loop.c +++ b/server/loop.c @@ -192,14 +192,14 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk { JMessage* reply; gpointer object; + gboolean ret; namespace = j_message_get_string(message); path = j_message_get_string(message); reply = j_message_new_reply(message); - /// \todo return value - j_backend_object_open(jd_object_backend, namespace, path, &object); + ret = j_backend_object_open(jd_object_backend, namespace, path, &object); for (i = 0; i < operation_count; i++) { @@ -211,6 +211,11 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk length = j_message_get_8(message); offset = j_message_get_8(message); + if (G_UNLIKELY(!ret)) + { + break; + } + if (length > memory_chunk_size) { /// \todo return proper error @@ -247,7 +252,10 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk j_statistics_add(statistics, J_STATISTICS_BYTES_SENT, bytes_read); } - j_backend_object_close(jd_object_backend, object); + if (ret) + { + j_backend_object_close(jd_object_backend, object); + } j_message_send(reply, connection); j_message_unref(reply); @@ -259,6 +267,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk { g_autoptr(JMessage) reply = NULL; gpointer object; + gboolean ret; if (safety == J_SEMANTICS_SAFETY_NETWORK || safety == J_SEMANTICS_SAFETY_STORAGE) { @@ -268,8 +277,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk namespace = j_message_get_string(message); path = j_message_get_string(message); - /// \todo return value - j_backend_object_open(jd_object_backend, namespace, path, &object); + ret = j_backend_object_open(jd_object_backend, namespace, path, &object); for (i = 0; i < operation_count; i++) { @@ -282,7 +290,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk length = j_message_get_8(message); offset = j_message_get_8(message); - if (length > memory_chunk_size) + if (length > memory_chunk_size && reply != NULL && G_LIKELY(ret)) { /// \todo return proper error j_message_add_operation(reply, sizeof(guint64)); @@ -298,13 +306,16 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk g_input_stream_read_all(input, buf, length, NULL, NULL, NULL); j_statistics_add(statistics, J_STATISTICS_BYTES_RECEIVED, length); - j_backend_object_write(jd_object_backend, object, buf, length, offset, &bytes_written); - j_statistics_add(statistics, J_STATISTICS_BYTES_WRITTEN, bytes_written); - - if (reply != NULL) + if (G_LIKELY(ret)) { - j_message_add_operation(reply, sizeof(guint64)); - j_message_append_8(reply, &bytes_written); + j_backend_object_write(jd_object_backend, object, buf, length, offset, &bytes_written); + j_statistics_add(statistics, J_STATISTICS_BYTES_WRITTEN, bytes_written); + + if (reply != NULL) + { + j_message_add_operation(reply, sizeof(guint64)); + j_message_append_8(reply, &bytes_written); + } } j_memory_chunk_reset(memory_chunk); @@ -316,7 +327,10 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk j_statistics_add(statistics, J_STATISTICS_SYNC, 1); } - j_backend_object_close(jd_object_backend, object); + if (ret) + { + j_backend_object_close(jd_object_backend, object); + } if (reply != NULL) {