From 480298bdfaf46ad235db954ae342ee557ba131bc Mon Sep 17 00:00:00 2001 From: yulalenk Date: Thu, 11 Apr 2024 13:32:28 +0300 Subject: [PATCH] Stage 5 - basic implementation --- .../itmo/test/alenkovayulya/ServerImpl.java | 112 ++++++++++++++---- .../itmo/test/alenkovayulya/ServiceImpl.java | 2 +- .../itmo/test/alenkovayulya/ShardRouter.java | 37 ++++-- 3 files changed, 116 insertions(+), 35 deletions(-) diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java index b4db7c56b..4e554a7ca 100644 --- a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java @@ -17,11 +17,15 @@ import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static ru.vk.itmo.test.alenkovayulya.ShardRouter.REDIRECT_HEADER; import static ru.vk.itmo.test.alenkovayulya.ShardRouter.TIMESTAMP_HEADER; @@ -34,6 +38,11 @@ public class ServerImpl extends HttpServer { private final ExecutorService executorService; private final String url; private final ShardSelector shardSelector; + private static final Set ALLOWED_METHODS = Set.of( + Request.METHOD_GET, + Request.METHOD_PUT, + Request.METHOD_DELETE + ); private static final int[] AVAILABLE_GOOD_RESPONSE_CODES = new int[] {200, 201, 202, 404}; public ServerImpl(ServiceConfig serviceConfig, @@ -59,6 +68,11 @@ private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) @Override public void handleRequest(Request request, HttpSession session) throws IOException { + if (!ALLOWED_METHODS.contains(request.getMethod())) { + sendEmptyResponse(Response.METHOD_NOT_ALLOWED, session); + return; + } + String id = request.getParameter("id="); if (isEmptyId(id)) { sendEmptyResponse(Response.BAD_REQUEST, session); @@ -104,7 +118,6 @@ private void handleAsLeader(Request request, HttpSession session, String id) { LOGGER.error("Request rejected by policy", e); sendEmptyResponse(Response.SERVICE_UNAVAILABLE, session); } - } private void collectResponses(Request request, @@ -112,54 +125,104 @@ private void collectResponses(Request request, String id, int from, int ack - ) throws IOException { - List responses = new ArrayList<>(); + ) { + List> asyncResponses = new CopyOnWriteArrayList<>(); long timestamp = System.currentTimeMillis(); int firstOwnerShardIndex = shardSelector.getOwnerShardIndex(id); for (int i = 0; i < from; i++) { + CompletableFuture asyncResponse; int shardIndex = (firstOwnerShardIndex + i) % shardSelector.getClusterSize(); if (isRedirectNeeded(shardIndex)) { - handleRedirect(request, timestamp, shardIndex, responses); + asyncResponse = handleRedirect(request, timestamp, shardIndex); } else { - Response response = handleInternalRequest(request, id, timestamp); - responses.add(response); + asyncResponse = handleInternalRequestAsync(request, id, timestamp); } + asyncResponses.add(asyncResponse); + } - checkReplicasResponsesNumber(request, session, responses, ack); + handleAsyncResponses(session, ack, from, request, asyncResponses); + + } + + private void handleAsyncResponses( + HttpSession session, int ack, int from, Request request, + List> completableFutureResponses + ) { + List validResponses = new CopyOnWriteArrayList<>(); + AtomicBoolean isEnoughValidResponses = new AtomicBoolean(); + AtomicInteger allResponsesCounter = new AtomicInteger(); + + for (CompletableFuture completableFuture : completableFutureResponses) { + completableFuture.whenCompleteAsync((response, throwable) -> { + if (isEnoughValidResponses.get()) { + return; + } + allResponsesCounter.incrementAndGet(); + + if (throwable != null) { + response = new Response(Response.INTERNAL_ERROR); + } + + if (isValidResponse(response)) { + validResponses.add(response); + } + + sendResponseIfEnoughReplicasResponsesNumber(request, + isEnoughValidResponses, + session, + validResponses, + ack); + + if (allResponsesCounter.get() == from && validResponses.size() < ack) { + sendEmptyResponse("504 Not Enough Replicas", session); + } + }, executorService).exceptionally((th) -> new Response(Response.INTERNAL_ERROR)); + } } - private void checkReplicasResponsesNumber( + private void sendResponseIfEnoughReplicasResponsesNumber( Request request, + AtomicBoolean isEnoughValidResponses, HttpSession session, List responses, int ack - ) throws IOException { - if (responses.size() >= ack) { - if (request.getMethod() == Request.METHOD_GET) { - session.sendResponse(getResponseWithMaxTimestamp(responses)); - } else { - session.sendResponse(responses.getFirst()); + ) { + try { + if (responses.size() >= ack) { + isEnoughValidResponses.set(true); + if (request.getMethod() == Request.METHOD_GET) { + session.sendResponse(getResponseWithMaxTimestamp(responses)); + } else { + session.sendResponse(responses.getFirst()); + } } - } else { - sendEmptyResponse("504 Not Enough Replicas", session); + } catch (IOException e) { + LOGGER.error("Exception during send win response: ", e); + sendEmptyResponse(Response.INTERNAL_ERROR, session); + session.close(); } } - private void handleRedirect(Request request, long timestamp, int nodeIndex, List responses) { - Response response = redirectRequest(request.getMethodName(), + private boolean isValidResponse(Response response) { + return Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES) + .anyMatch(code -> code == response.getStatus()); + } + + private CompletableFuture handleRedirect(Request request, long timestamp, int nodeIndex) { + return redirectRequest(request.getMethodName(), request.getParameter("id="), shardSelector.getShardUrlByIndex(nodeIndex), request.getBody() == null ? new byte[0] : request.getBody(), timestamp); - boolean correctRes = Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES) - .anyMatch(code -> code == response.getStatus()); - if (correctRes) { - responses.add(response); - } + } + + private CompletableFuture handleInternalRequestAsync(Request request, String id, long timestamp) { + return CompletableFuture.supplyAsync(() -> + handleInternalRequest(request, id, timestamp), ShardRouter.proxyExecutor); } private Response handleInternalRequest(Request request, String id, long timestamp) { @@ -233,7 +296,6 @@ private void sendEmptyResponse(String response, HttpSession session) { session.sendResponse(emptyRes); } catch (IOException e) { LOGGER.info("Exception during sending the empty response: ", e); - session.close(); } } diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java index 54e74f258..d755191a1 100644 --- a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java @@ -72,7 +72,7 @@ private void shutdownDao() { } } - @ServiceFactory(stage = 4) + @ServiceFactory(stage = 5) public static class Factory implements ServiceFactory.Factory { @Override public Service create(ServiceConfig config) { diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ShardRouter.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ShardRouter.java index 0254cebe8..c26f7b397 100644 --- a/src/main/java/ru/vk/itmo/test/alenkovayulya/ShardRouter.java +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/ShardRouter.java @@ -1,5 +1,6 @@ package ru.vk.itmo.test.alenkovayulya; +import one.nio.async.CustomThreadFactory; import one.nio.http.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -8,6 +9,10 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public final class ShardRouter { @@ -17,10 +22,18 @@ public final class ShardRouter { public static final String REDIRECT_HEADER = "Redirect"; private static final HttpClient client = HttpClient.newHttpClient(); + public static ThreadPoolExecutor proxyExecutor = new ThreadPoolExecutor( + 8, + 8, + 0L, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(128), + new CustomThreadFactory("ShardRouter")); + private ShardRouter() { } - public static Response redirectRequest(String method, + public static CompletableFuture redirectRequest(String method, String id, String ownerShardUrl, byte[] body, @@ -33,26 +46,32 @@ public static Response redirectRequest(String method, .method(method, HttpRequest.BodyPublishers.ofByteArray(body)) .build(); try { - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofByteArray()); - Response shardResponse = new Response(getHttpResponseByCode(response.statusCode()), response.body()); - shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse("")); - return shardResponse; + CompletableFuture> response = client.sendAsync( + request, + HttpResponse.BodyHandlers.ofByteArray()); + return response.thenApplyAsync(ShardRouter::getHttpResponseByCode); } catch (Exception e) { LOGGER.error("Error during sending request by router", e); Thread.currentThread().interrupt(); - return new Response(Response.INTERNAL_ERROR, Response.EMPTY); + return CompletableFuture.completedFuture(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); } } - private static String getHttpResponseByCode(int code) { - return switch (code) { + private static Response getHttpResponseByCode(HttpResponse response) { + String responseCode = switch (response.statusCode()) { case 200 -> Response.OK; case 201 -> Response.CREATED; case 202 -> Response.ACCEPTED; case 400 -> Response.BAD_REQUEST; case 404 -> Response.NOT_FOUND; case 500 -> Response.INTERNAL_ERROR; - default -> throw new IllegalStateException("Not available status code: " + code); + default -> throw new IllegalStateException("Not available status code: " + response.statusCode()); }; + + Response shardResponse = new Response(responseCode, response.body()); + shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse("")); + + return shardResponse; + } }