Skip to content

Commit

Permalink
Супрядкина Дарья ИТМО DWS stage 5 (#180)
Browse files Browse the repository at this point in the history
* HW1: add realization

* HW1: fix code style

* HW1: change exception constructor

* HW1: add report

* HW1: add flame graphs description

* HW1: add flame graphs description

* HW1: fix code style

* HW1: fixes according to review

* HW2: first version of HW2

* HW2: fix codestyle

* HW2: fixes according to review

* HW2: change string template to typical string

* HW2: change parameters

* HW2: fix

* HW2: just finish

* HW2: fixes according to review

* HW2: explaining absence of flush

* HW3: add realization

* HW3: fix style

* HW3: add final

* HW3: fix finals

* HW3: change to async thread

* HW3: add shutdown threads

* HW3: fix style

* HW3: add first part of report

* HW3: add full report

* HW3: fix style

* HW3: add explanations about hashCode()

* HW4: add implementation

* HW4: fix style

* HW4: add report

* HW4: fix code style

* HW4: add note

* HW5: add realization

* HW5: add completed future

* HW5: fix tolerance

* HW5: fix codestyle

* HW5: fix codestyle

* HW5: close sessions

* HW5: rewrite to method sendResponseAndCloseSession

* HW5: remove redundant parameter

* HW5: set close sessions

* HW5: change

* HW5: change

* HW5: fix multithreading problem for better profiling

* HW5: fix codestyle

* HW5: add report

* HW5: fixes according to review

* HW5: fix header

* HW5: fixes according to review

* HW5: complete todo

---------

Co-authored-by: Vadim Tsesko <[email protected]>
Co-authored-by: pashchenko8 <[email protected]>
  • Loading branch information
3 people authored May 11, 2024
1 parent 780d69f commit 5b7300e
Show file tree
Hide file tree
Showing 21 changed files with 22,439 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ public final class HeaderConstraints {

public static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
public static final String TIMESTAMP_MILLIS_HEADER_NORMAL = "X-TIMESTAMP-MILLIS";
public static final String FROM_HEADER = "X-FROM";
public static final String FROM_HEADER_NORMAL = "X-FROM";
public static final String FROM_HEADER = "X-FROM: ";

private HeaderConstraints() {
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.concurrent.CompletableFuture;

public class SelfRequestHandler {

private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
private final Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private final Utils utils;
private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";

public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao, Utils utils) {
this.dao = dao;
Expand All @@ -23,14 +24,22 @@ public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao,

public Response handleRequest(Request request) {
String id = utils.getIdParameter(request);
return switch (request.getMethodName()) {
case "GET" -> get(id);
case "PUT" -> put(id, request);
case "DELETE" -> delete(id);
return switch (request.getMethod()) {
case Request.METHOD_GET -> get(id);
case Request.METHOD_PUT -> put(id, request);
case Request.METHOD_DELETE -> delete(id);
default -> new Response(Response.NOT_FOUND, Response.EMPTY);
};
}

public CompletableFuture<Response> handleAsyncRequest(Request request) {
return composeFuture(handleRequest(request));
}

private CompletableFuture<Response> composeFuture(Response response) {
return CompletableFuture.completedFuture(response);
}

public Response get(String id) {
try {
if (id == null || id.isEmpty()) {
Expand Down
189 changes: 96 additions & 93 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER_NORMAL;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER_NORMAL;

public class Server extends HttpServer {

private static final Logger logger = LoggerFactory.getLogger(Server.class.getName());
private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{};
private final ExecutorService workerExecutor;
private final Set<Integer> permittedMethods =
Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE);
private final String selfUrl;
private final ShardingPolicy shardingPolicy;
private final HttpClient httpClient;
private static final int REQUEST_TIMEOUT_SEC = 2;
private final List<String> clusterUrls;
private final Utils utils;
private final SelfRequestHandler selfHandler;
Expand All @@ -70,7 +70,7 @@ private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConf
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig};
httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
httpServerConfig.closeSessions = true;

return httpServerConfig;
Expand All @@ -83,7 +83,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
response = new Response(Response.BAD_REQUEST, Response.EMPTY);
} else {
response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);

}
session.sendResponse(response);
}
Expand All @@ -93,74 +92,68 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
try {
workerExecutor.execute(() -> {
try {
if (!permittedMethods.contains(request.getMethod())) {
Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");
if (!permittedMethods.contains(request.getMethod())
|| checkBadRequest(ack, from, request.getMethod())) {
handleDefault(request, session);
return;
}
if (request.getHeader(FROM_HEADER) == null) {
if (request.getHeader(FROM_HEADER_NORMAL) == null) {
request.addHeader(FROM_HEADER + selfUrl);

Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");

Response response = mergeResponses(broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from), request, ack),
ack, from);

session.sendResponse(response);

collectResponsesCallback(
broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from),
request
), ack, from, session);
} else {
Response resp = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(resp);
session.sendResponse(resp);
}

} catch (Exception e) {
logger.error("Unexpected error", e);
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (IOException ex) {
logger.error("Failed to send error response", e);
session.close();
}
solveUnexpectedError(e, session);
}
});
} catch (RejectedExecutionException e) {
logger.error("Service is unavailable", e);
session.sendResponse(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY));
}
}

private void solveUnexpectedError(Exception e, HttpSession session) {
logger.error("Unexpected error", e);
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (Exception exception) {
logger.error("Failed to send error response", exception);
session.scheduleClose();
}
}

private List<Response> broadcast(List<String> nodes, Request request, int ack) {
int internalAck = ack;
List<Response> responses = new ArrayList<>(ack);
Response response;
private boolean checkBadRequest(int ack, int from, int method) {
return !permittedMethods.contains(method) || ack > from || ack <= 0 || from > clusterUrls.size();
}

private List<CompletableFuture<Response>> broadcast(List<String> nodes, Request request) {
List<CompletableFuture<Response>> futureResponses = new ArrayList<>(nodes.size());
CompletableFuture<Response> response;
if (nodes.contains(selfUrl)) {
response = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
response = selfHandler.handleAsyncRequest(request);
futureResponses.add(response);
nodes.remove(selfUrl);
if (--internalAck == 0) {
return responses;
}
}

for (String node : nodes) {
response = handleProxy(utils.getEntryUrl(node, utils.getIdParameter(request)), request);
if (response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
if (--internalAck == 0) {
return responses;
}
}
futureResponses.add(response);
}
return responses;
return futureResponses;
}

private void checkTimestampHeaderExistenceAndSet(Response response) {
Expand All @@ -171,58 +164,68 @@ private void checkTimestampHeaderExistenceAndSet(Response response) {
}
}

private Response mergeResponses(List<Response> responses, int ack, int from) {
if (responses.stream().filter(response -> response.getStatus() == 400).count() == from
|| ack > from
|| from > clusterUrls.size()
|| ack <= 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
private void collectResponsesCallback(List<CompletableFuture<Response>> futureResponses,
int ack, int from, HttpSession session) {
List<Response> responses = new CopyOnWriteArrayList<>();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger exceptionCount = new AtomicInteger(0);
for (CompletableFuture<Response> futureResponse : futureResponses) {
futureResponse.whenCompleteAsync((response, exception) -> {

if (exception == null && response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
successCount.incrementAndGet();
if (successCount.get() == ack) {
sendAsyncResponse(chooseResponse(responses), session);
}
} else {
exceptionCount.incrementAndGet();
if (exceptionCount.get() > from - ack) {
sendAsyncResponse(new Response("504 Not enough replicas", Response.EMPTY), session);
}
}

}, workerExecutor).exceptionally(exception -> {
logger.error("Error happened while collecting responses from nodes", exception);
sendAsyncResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY), session);
return null;
});
}
if (responses.stream().filter(response -> response.getStatus() == 200
|| response.getStatus() == 404
|| response.getStatus() == 202
|| response.getStatus() == 201).count() < ack) {
return new Response("504 Not Enough Replicas", Response.EMPTY);
}

private void sendAsyncResponse(Response resp, HttpSession session) {
try {
session.sendResponse(resp);
} catch (IOException e) {
logger.error("Failed to send error response", e);
session.scheduleClose();
}
}

private Response chooseResponse(List<Response> responses) {
return responses.stream().max((o1, o2) -> {
Long header1 = Long.parseLong(o1.getHeader(TIMESTAMP_MILLIS_HEADER));
Long header2 = Long.parseLong(o2.getHeader(TIMESTAMP_MILLIS_HEADER));
return header1.compareTo(header2);
}).get();
}

public Response handleProxy(String redirectedUrl, Request request) {
try {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? new byte[]{} : request.getBody())
).build();
HttpResponse<byte[]> response = httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.get(REQUEST_TIMEOUT_SEC, TimeUnit.SECONDS);
Response response1 = new Response(String.valueOf(response.statusCode()), response.body());
if (response.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
} else {
response1.addHeader(TIMESTAMP_MILLIS_HEADER
+ response.headers().map().get(
TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst()
);
}

return response1;
} catch (ExecutionException e) {
logger.error("Unexpected error", e);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (TimeoutException e) {
logger.error("Timeout reached", e);
return new Response(Response.REQUEST_TIMEOUT, Response.EMPTY);
} catch (InterruptedException e) {
logger.error("Service is unavailable", e);
Thread.currentThread().interrupt();
return new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY);
}
public CompletableFuture<Response> handleProxy(String redirectedUrl, Request request) {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER_NORMAL, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? EMPTY_BYTE_ARRAY : request.getBody())
).build();
return httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(httpResponse -> {
Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body());
if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
}
return response1;
}, workerExecutor);
}

private Map<String, Integer> getFromAndAck(Request request) {
Expand Down
Loading

0 comments on commit 5b7300e

Please sign in to comment.