Skip to content

Commit

Permalink
Stage 5 - basic implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
yulalenk committed Apr 11, 2024
1 parent cfe4755 commit f0d52dc
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 35 deletions.
113 changes: 88 additions & 25 deletions src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,33 @@
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static ru.vk.itmo.test.alenkovayulya.ShardRouter.REDIRECT_HEADER;
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.TIMESTAMP_HEADER;
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.redirectRequest;


public class ServerImpl extends HttpServer {

private static final Logger LOGGER = LoggerFactory.getLogger(ServerImpl.class);
private final Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> referenceDao;
private final ExecutorService executorService;
private final String url;
private final ShardSelector shardSelector;
private static final Set<Integer> ALLOWED_METHODS = Set.of(
Request.METHOD_GET,
Request.METHOD_PUT,
Request.METHOD_DELETE
);
private static final int[] AVAILABLE_GOOD_RESPONSE_CODES = new int[] {200, 201, 202, 404};

public ServerImpl(ServiceConfig serviceConfig,
Expand All @@ -59,6 +69,11 @@ private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig)

@Override
public void handleRequest(Request request, HttpSession session) throws IOException {
if (!ALLOWED_METHODS.contains(request.getMethod())) {
sendEmptyResponse(Response.METHOD_NOT_ALLOWED, session);
return;
}

String id = request.getParameter("id=");
if (isEmptyId(id)) {
sendEmptyResponse(Response.BAD_REQUEST, session);
Expand Down Expand Up @@ -104,62 +119,111 @@ private void handleAsLeader(Request request, HttpSession session, String id) {
LOGGER.error("Request rejected by policy", e);
sendEmptyResponse(Response.SERVICE_UNAVAILABLE, session);
}

}

private void collectResponses(Request request,
HttpSession session,
String id,
int from,
int ack
) throws IOException {
List<Response> responses = new ArrayList<>();
) {
List<CompletableFuture<Response>> asyncResponses = new CopyOnWriteArrayList<>();
long timestamp = System.currentTimeMillis();
int firstOwnerShardIndex = shardSelector.getOwnerShardIndex(id);

for (int i = 0; i < from; i++) {
CompletableFuture<Response> asyncResponse;
int shardIndex = (firstOwnerShardIndex + i) % shardSelector.getClusterSize();

if (isRedirectNeeded(shardIndex)) {
handleRedirect(request, timestamp, shardIndex, responses);
asyncResponse = handleRedirect(request, timestamp, shardIndex);
} else {
Response response = handleInternalRequest(request, id, timestamp);
responses.add(response);
asyncResponse = handleInternalRequestAsync(request, id, timestamp);
}

asyncResponses.add(asyncResponse);

}

checkReplicasResponsesNumber(request, session, responses, ack);
handleAsyncResponses(session, ack, from, request, asyncResponses);

}

private void checkReplicasResponsesNumber(
private void handleAsyncResponses(
HttpSession session, int ack, int from, Request request,
List<CompletableFuture<Response>> completableFutureResponses
) {
List<Response> validResponses = new CopyOnWriteArrayList<>();
AtomicBoolean isEnoughValidResponses = new AtomicBoolean();
AtomicInteger allResponsesCounter = new AtomicInteger();

for (CompletableFuture<Response> completableFuture : completableFutureResponses) {
completableFuture.whenCompleteAsync((response, throwable) -> {
if (isEnoughValidResponses.get()) {
return;
}
allResponsesCounter.incrementAndGet();

if (throwable != null) {
response = new Response(Response.INTERNAL_ERROR);
}

if (isValidResponse(response)) {
validResponses.add(response);
}

sendResponseIfEnoughReplicasResponsesNumber(request,
isEnoughValidResponses,
session,
validResponses,
ack);

if (allResponsesCounter.get() == from && validResponses.size() < ack) {
sendEmptyResponse("504 Not Enough Replicas", session);
}
}, executorService).exceptionally((th) -> new Response(Response.INTERNAL_ERROR));
}
}

private void sendResponseIfEnoughReplicasResponsesNumber(
Request request,
AtomicBoolean isEnoughValidResponses,
HttpSession session,
List<Response> responses,
int ack
) throws IOException {
if (responses.size() >= ack) {
if (request.getMethod() == Request.METHOD_GET) {
session.sendResponse(getResponseWithMaxTimestamp(responses));
} else {
session.sendResponse(responses.getFirst());
) {
try {
if (responses.size() >= ack) {
isEnoughValidResponses.set(true);
if (request.getMethod() == Request.METHOD_GET) {
session.sendResponse(getResponseWithMaxTimestamp(responses));
} else {
session.sendResponse(responses.getFirst());
}
}
} else {
sendEmptyResponse("504 Not Enough Replicas", session);
} catch (IOException e) {
LOGGER.error("Exception during send win response: ", e);
sendEmptyResponse(Response.INTERNAL_ERROR, session);
session.close();
}
}

private void handleRedirect(Request request, long timestamp, int nodeIndex, List<Response> responses) {
Response response = redirectRequest(request.getMethodName(),
private boolean isValidResponse(Response response) {
return Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES)
.anyMatch(code -> code == response.getStatus());
}

private CompletableFuture<Response> handleRedirect(Request request, long timestamp, int nodeIndex) {
return redirectRequest(request.getMethodName(),
request.getParameter("id="),
shardSelector.getShardUrlByIndex(nodeIndex),
request.getBody() == null
? new byte[0] : request.getBody(), timestamp);
boolean correctRes = Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES)
.anyMatch(code -> code == response.getStatus());
if (correctRes) {
responses.add(response);
}
}

private CompletableFuture<Response> handleInternalRequestAsync(Request request, String id, long timestamp) {
return CompletableFuture.supplyAsync(() ->
handleInternalRequest(request, id, timestamp), ShardRouter.proxyExecutor);
}

private Response handleInternalRequest(Request request, String id, long timestamp) {
Expand Down Expand Up @@ -233,7 +297,6 @@ private void sendEmptyResponse(String response, HttpSession session) {
session.sendResponse(emptyRes);
} catch (IOException e) {
LOGGER.info("Exception during sending the empty response: ", e);
session.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private void shutdownDao() {
}
}

@ServiceFactory(stage = 4)
@ServiceFactory(stage = 5)
public static class Factory implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
Expand Down
37 changes: 28 additions & 9 deletions src/main/java/ru/vk/itmo/test/alenkovayulya/ShardRouter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ru.vk.itmo.test.alenkovayulya;

import one.nio.async.CustomThreadFactory;
import one.nio.http.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -8,6 +9,10 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class ShardRouter {

Expand All @@ -17,10 +22,18 @@ public final class ShardRouter {
public static final String REDIRECT_HEADER = "Redirect";
private static final HttpClient client = HttpClient.newHttpClient();

public static ThreadPoolExecutor proxyExecutor = new ThreadPoolExecutor(
8,
8,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(128),
new CustomThreadFactory("ShardRouter"));

private ShardRouter() {
}

public static Response redirectRequest(String method,
public static CompletableFuture<Response> redirectRequest(String method,
String id,
String ownerShardUrl,
byte[] body,
Expand All @@ -33,26 +46,32 @@ public static Response redirectRequest(String method,
.method(method, HttpRequest.BodyPublishers.ofByteArray(body))
.build();
try {
HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
Response shardResponse = new Response(getHttpResponseByCode(response.statusCode()), response.body());
shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse(""));
return shardResponse;
CompletableFuture<HttpResponse<byte[]>> response = client.sendAsync(
request,
HttpResponse.BodyHandlers.ofByteArray());
return response.thenApplyAsync(ShardRouter::getHttpResponseByCode);
} catch (Exception e) {
LOGGER.error("Error during sending request by router", e);
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
return CompletableFuture.completedFuture(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
}

private static String getHttpResponseByCode(int code) {
return switch (code) {
private static Response getHttpResponseByCode(HttpResponse<byte[]> response) {
String responseCode = switch (response.statusCode()) {
case 200 -> Response.OK;
case 201 -> Response.CREATED;
case 202 -> Response.ACCEPTED;
case 400 -> Response.BAD_REQUEST;
case 404 -> Response.NOT_FOUND;
case 500 -> Response.INTERNAL_ERROR;
default -> throw new IllegalStateException("Not available status code: " + code);
default -> throw new IllegalStateException("Not available status code: " + response.statusCode());
};

Response shardResponse = new Response(responseCode, response.body());
shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse(""));

return shardResponse;

}
}

0 comments on commit f0d52dc

Please sign in to comment.