Skip to content

Commit

Permalink
Merge branch 'main' into hw6
Browse files Browse the repository at this point in the history
  • Loading branch information
incubos authored May 11, 2024
2 parents 8ff70c9 + 8028e6a commit aecc716
Show file tree
Hide file tree
Showing 261 changed files with 412,768 additions and 968 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ public final class HeaderConstraints {

public static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
public static final String TIMESTAMP_MILLIS_HEADER_NORMAL = "X-TIMESTAMP-MILLIS";
public static final String FROM_HEADER = "X-FROM";
public static final String FROM_HEADER_NORMAL = "X-FROM";
public static final String FROM_HEADER = "X-FROM: ";

private HeaderConstraints() {
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.concurrent.CompletableFuture;

public class SelfRequestHandler {

private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
private final Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private final Utils utils;
private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";

public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao, Utils utils) {
this.dao = dao;
Expand All @@ -23,14 +24,22 @@ public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao,

public Response handleRequest(Request request) {
String id = utils.getIdParameter(request);
return switch (request.getMethodName()) {
case "GET" -> get(id);
case "PUT" -> put(id, request);
case "DELETE" -> delete(id);
return switch (request.getMethod()) {
case Request.METHOD_GET -> get(id);
case Request.METHOD_PUT -> put(id, request);
case Request.METHOD_DELETE -> delete(id);
default -> new Response(Response.NOT_FOUND, Response.EMPTY);
};
}

public CompletableFuture<Response> handleAsyncRequest(Request request) {
return composeFuture(handleRequest(request));
}

private CompletableFuture<Response> composeFuture(Response response) {
return CompletableFuture.completedFuture(response);
}

public Response get(String id) {
try {
if (id == null || id.isEmpty()) {
Expand Down
189 changes: 96 additions & 93 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER_NORMAL;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER_NORMAL;

public class Server extends HttpServer {

private static final Logger logger = LoggerFactory.getLogger(Server.class.getName());
private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{};
private final ExecutorService workerExecutor;
private final Set<Integer> permittedMethods =
Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE);
private final String selfUrl;
private final ShardingPolicy shardingPolicy;
private final HttpClient httpClient;
private static final int REQUEST_TIMEOUT_SEC = 2;
private final List<String> clusterUrls;
private final Utils utils;
private final SelfRequestHandler selfHandler;
Expand All @@ -70,7 +70,7 @@ private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConf
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig};
httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
httpServerConfig.closeSessions = true;

return httpServerConfig;
Expand All @@ -83,7 +83,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
response = new Response(Response.BAD_REQUEST, Response.EMPTY);
} else {
response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);

}
session.sendResponse(response);
}
Expand All @@ -93,74 +92,68 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
try {
workerExecutor.execute(() -> {
try {
if (!permittedMethods.contains(request.getMethod())) {
Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");
if (!permittedMethods.contains(request.getMethod())
|| checkBadRequest(ack, from, request.getMethod())) {
handleDefault(request, session);
return;
}
if (request.getHeader(FROM_HEADER) == null) {
if (request.getHeader(FROM_HEADER_NORMAL) == null) {
request.addHeader(FROM_HEADER + selfUrl);

Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");

Response response = mergeResponses(broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from), request, ack),
ack, from);

session.sendResponse(response);

collectResponsesCallback(
broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from),
request
), ack, from, session);
} else {
Response resp = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(resp);
session.sendResponse(resp);
}

} catch (Exception e) {
logger.error("Unexpected error", e);
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (IOException ex) {
logger.error("Failed to send error response", e);
session.close();
}
solveUnexpectedError(e, session);
}
});
} catch (RejectedExecutionException e) {
logger.error("Service is unavailable", e);
session.sendResponse(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY));
}
}

private void solveUnexpectedError(Exception e, HttpSession session) {
logger.error("Unexpected error", e);
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (Exception exception) {
logger.error("Failed to send error response", exception);
session.scheduleClose();
}
}

private List<Response> broadcast(List<String> nodes, Request request, int ack) {
int internalAck = ack;
List<Response> responses = new ArrayList<>(ack);
Response response;
private boolean checkBadRequest(int ack, int from, int method) {
return !permittedMethods.contains(method) || ack > from || ack <= 0 || from > clusterUrls.size();
}

private List<CompletableFuture<Response>> broadcast(List<String> nodes, Request request) {
List<CompletableFuture<Response>> futureResponses = new ArrayList<>(nodes.size());
CompletableFuture<Response> response;
if (nodes.contains(selfUrl)) {
response = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
response = selfHandler.handleAsyncRequest(request);
futureResponses.add(response);
nodes.remove(selfUrl);
if (--internalAck == 0) {
return responses;
}
}

for (String node : nodes) {
response = handleProxy(utils.getEntryUrl(node, utils.getIdParameter(request)), request);
if (response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
if (--internalAck == 0) {
return responses;
}
}
futureResponses.add(response);
}
return responses;
return futureResponses;
}

private void checkTimestampHeaderExistenceAndSet(Response response) {
Expand All @@ -171,58 +164,68 @@ private void checkTimestampHeaderExistenceAndSet(Response response) {
}
}

private Response mergeResponses(List<Response> responses, int ack, int from) {
if (responses.stream().filter(response -> response.getStatus() == 400).count() == from
|| ack > from
|| from > clusterUrls.size()
|| ack <= 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
private void collectResponsesCallback(List<CompletableFuture<Response>> futureResponses,
int ack, int from, HttpSession session) {
List<Response> responses = new CopyOnWriteArrayList<>();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger exceptionCount = new AtomicInteger(0);
for (CompletableFuture<Response> futureResponse : futureResponses) {
futureResponse.whenCompleteAsync((response, exception) -> {

if (exception == null && response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
successCount.incrementAndGet();
if (successCount.get() == ack) {
sendAsyncResponse(chooseResponse(responses), session);
}
} else {
exceptionCount.incrementAndGet();
if (exceptionCount.get() > from - ack) {
sendAsyncResponse(new Response("504 Not enough replicas", Response.EMPTY), session);
}
}

}, workerExecutor).exceptionally(exception -> {
logger.error("Error happened while collecting responses from nodes", exception);
sendAsyncResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY), session);
return null;
});
}
if (responses.stream().filter(response -> response.getStatus() == 200
|| response.getStatus() == 404
|| response.getStatus() == 202
|| response.getStatus() == 201).count() < ack) {
return new Response("504 Not Enough Replicas", Response.EMPTY);
}

private void sendAsyncResponse(Response resp, HttpSession session) {
try {
session.sendResponse(resp);
} catch (IOException e) {
logger.error("Failed to send error response", e);
session.scheduleClose();
}
}

private Response chooseResponse(List<Response> responses) {
return responses.stream().max((o1, o2) -> {
Long header1 = Long.parseLong(o1.getHeader(TIMESTAMP_MILLIS_HEADER));
Long header2 = Long.parseLong(o2.getHeader(TIMESTAMP_MILLIS_HEADER));
return header1.compareTo(header2);
}).get();
}

public Response handleProxy(String redirectedUrl, Request request) {
try {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? new byte[]{} : request.getBody())
).build();
HttpResponse<byte[]> response = httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.get(REQUEST_TIMEOUT_SEC, TimeUnit.SECONDS);
Response response1 = new Response(String.valueOf(response.statusCode()), response.body());
if (response.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
} else {
response1.addHeader(TIMESTAMP_MILLIS_HEADER
+ response.headers().map().get(
TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst()
);
}

return response1;
} catch (ExecutionException e) {
logger.error("Unexpected error", e);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (TimeoutException e) {
logger.error("Timeout reached", e);
return new Response(Response.REQUEST_TIMEOUT, Response.EMPTY);
} catch (InterruptedException e) {
logger.error("Service is unavailable", e);
Thread.currentThread().interrupt();
return new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY);
}
public CompletableFuture<Response> handleProxy(String redirectedUrl, Request request) {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER_NORMAL, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? EMPTY_BYTE_ARRAY : request.getBody())
).build();
return httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(httpResponse -> {
Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body());
if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
}
return response1;
}, workerExecutor);
}

private Map<String, Integer> getFromAndAck(Request request) {
Expand Down
Loading

0 comments on commit aecc716

Please sign in to comment.