Skip to content

Commit

Permalink
Fix http2 manual write (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 authored Jan 9, 2023
1 parent 62a03c5 commit 4bbda11
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 11 deletions.
2 changes: 1 addition & 1 deletion include/aws/http/connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct aws_http_connection_manager_options {
* Optional.
* HTTP/2 specific configuration. Check `struct aws_http2_connection_options` for details of each config
*/
struct aws_http2_setting *initial_settings_array;
const struct aws_http2_setting *initial_settings_array;
size_t num_initial_settings;
size_t max_closed_streams;
bool http2_conn_manual_window_management;
Expand Down
4 changes: 2 additions & 2 deletions include/aws/http/http2_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct aws_http2_stream_manager_options {
* - For stream level window control, `enable_read_back_pressure` will enable manual control. The initial window
* size needs to be set through `initial_settings_array`.
*/
struct aws_http2_setting *initial_settings_array;
const struct aws_http2_setting *initial_settings_array;
size_t num_initial_settings;
size_t max_closed_streams;
bool conn_manual_window_management;
Expand Down Expand Up @@ -187,7 +187,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_release(struct aws_htt
AWS_HTTP_API
struct aws_http2_stream_manager *aws_http2_stream_manager_new(
struct aws_allocator *allocator,
struct aws_http2_stream_manager_options *options);
const struct aws_http2_stream_manager_options *options);

/**
* Acquire a stream from stream manager asynchronously.
Expand Down
14 changes: 7 additions & 7 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,6 @@ struct aws_h2_stream *aws_h2_stream_new_request(
stream->base.on_destroy = options->on_destroy;
stream->base.client_data = &stream->base.client_or_server_data.client;
stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN;
struct aws_byte_cursor method;
AWS_ZERO_STRUCT(method);
if (aws_http_message_get_request_method(options->request, &method)) {
goto error;
}
stream->base.request_method = aws_http_str_to_method(method);
aws_linked_list_init(&stream->thread_data.outgoing_writes);
aws_linked_list_init(&stream->synced_data.pending_write_list);

Expand All @@ -276,6 +270,12 @@ struct aws_h2_stream *aws_h2_stream_new_request(
aws_raise_error(AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL);
goto error;
}
struct aws_byte_cursor method;
AWS_ZERO_STRUCT(method);
if (aws_http_message_get_request_method(options->request, &method)) {
goto error;
}
stream->base.request_method = aws_http_str_to_method(method);

/* Init H2 specific stuff */
stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE;
Expand All @@ -289,7 +289,7 @@ struct aws_h2_stream *aws_h2_stream_new_request(
struct aws_h2_stream_data_write *body_write =
aws_mem_calloc(stream->base.alloc, 1, sizeof(struct aws_h2_stream_data_write));
body_write->data_stream = aws_input_stream_acquire(body_stream);
body_write->end_stream = true;
body_write->end_stream = !stream->manual_write;
aws_linked_list_push_back(&stream->thread_data.outgoing_writes, &body_write->node);
}

Expand Down
3 changes: 2 additions & 1 deletion source/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ static void s_make_request_task(struct aws_channel_task *task, void *arg, enum a
.on_complete = s_on_stream_complete,
.on_destroy = s_on_stream_destroy,
.user_data = pending_stream_acquisition,
.http2_use_manual_data_writes = pending_stream_acquisition->options.http2_use_manual_data_writes,
};
/* TODO: we could put the pending acquisition back to the list if the connection is not available for new request.
*/
Expand Down Expand Up @@ -1051,7 +1052,7 @@ void s_stream_manager_on_zero_external_ref(struct aws_http2_stream_manager *stre

struct aws_http2_stream_manager *aws_http2_stream_manager_new(
struct aws_allocator *allocator,
struct aws_http2_stream_manager_options *options) {
const struct aws_http2_stream_manager_options *options) {

AWS_PRECONDITION(allocator);
/* The other options are validated by the aws_http_connection_manager_new */
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ add_test_case(h2_client_error_from_incoming_headers_callback_reset_stream)
add_test_case(h2_client_error_from_incoming_headers_done_callback_reset_stream)
add_test_case(h2_client_error_from_incoming_body_callback_reset_stream)
add_test_case(h2_client_manual_data_write)
add_test_case(h2_client_manual_data_write_with_body)
add_test_case(h2_client_manual_data_write_no_data)
add_test_case(h2_client_manual_data_write_connection_close)

Expand Down
110 changes: 110 additions & 0 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5336,6 +5336,116 @@ TEST_CASE(h2_client_manual_data_write) {
return s_tester_clean_up();
}

TEST_CASE(h2_client_manual_data_write_with_body) {

ASSERT_SUCCESS(s_tester_init(allocator, ctx));
/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
size_t frame_count = h2_decode_tester_frame_count(&s_tester.peer.decode);

struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
struct aws_http_make_request_options request_options = {
.self_size = sizeof(request_options),
.request = request,
.http2_use_manual_data_writes = true,
};
size_t total_length = 0;

/* set request body */
const char *body_src = "hello";
struct aws_byte_cursor body_cursor = aws_byte_cursor_from_c_str(body_src);
struct aws_input_stream *request_body = aws_input_stream_new_from_cursor(allocator, &body_cursor);
aws_http_message_set_body_stream(request, request_body);
int64_t body_length = 0;
ASSERT_SUCCESS(aws_input_stream_get_length(request_body, &body_length));
total_length += (size_t)body_length;
aws_input_stream_release(request_body);

struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options);
ASSERT_NOT_NULL(stream);

aws_http_stream_activate(stream);
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
uint32_t stream_id = aws_http_stream_get_id(stream);

struct aws_byte_buf payload;
aws_byte_buf_init(&payload, allocator, 1024);

struct h2_client_manual_data_write_ctx test_ctx = {
.allocator = allocator,
.data = payload,
};

/* Simulate writes coming in over time */
for (int idx = 0; idx < 1000; ++idx) {
struct aws_input_stream *data_stream = s_h2_client_manual_data_write_generate_data(&test_ctx);
int64_t stream_length = 0;
ASSERT_SUCCESS(aws_input_stream_get_length(data_stream, &stream_length));
total_length += (size_t)stream_length;
struct aws_http2_stream_write_data_options write = {
.data = data_stream,
.on_complete = NULL,
.user_data = NULL,
};
ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &write));
/* fake peer sends WINDOW_UPDATE */
struct aws_h2_frame *peer_frame = aws_h2_frame_new_window_update(allocator, stream_id, (uint32_t)stream_length);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
/* Connection level window update */
peer_frame = aws_h2_frame_new_window_update(allocator, 0, (uint32_t)stream_length);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
if (idx % 10 == 0) {
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
}
aws_input_stream_release(data_stream);
}
struct aws_http2_stream_write_data_options last_write = {.end_stream = true};

ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &last_write));

testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
size_t frame_count2 = h2_decode_tester_frame_count(&s_tester.peer.decode);
/* Peer should received header frame without end_stream and mutiple data frames and combined payload length should
* be the same as total length sent. */
struct h2_decoded_frame *header_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, frame_count);
ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_HEADERS, header_frame->type);
ASSERT_FALSE(header_frame->end_stream);
size_t received_length = 0;
for (size_t i = frame_count + 1; i < frame_count2; i++) {
struct h2_decoded_frame *data_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, i);
ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_DATA, data_frame->type);
received_length += data_frame->data_payload_len;
if (i == frame_count2 - 1) {
ASSERT_TRUE(data_frame->end_stream);
} else {
ASSERT_FALSE(data_frame->end_stream);
}
}
ASSERT_UINT_EQUALS(received_length, total_length);

aws_http_message_release(request);
aws_http_stream_release(stream);

/* close the connection */
aws_http_connection_close(s_tester.connection);

aws_byte_buf_clean_up(&test_ctx.data);

/* clean up */
return s_tester_clean_up();
}

TEST_CASE(h2_client_manual_data_write_no_data) {

ASSERT_SUCCESS(s_tester_init(allocator, ctx));
Expand Down

0 comments on commit 4bbda11

Please sign in to comment.