Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Аленькова Юлия ИТМО DWS Stage5 #185

Merged
merged 14 commits into from
May 22, 2024
311 changes: 244 additions & 67 deletions src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,50 @@
import one.nio.http.HttpServer;
import one.nio.http.HttpServerConfig;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.server.AcceptorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.alenkovayulya.dao.BaseEntryWithTimestamp;
import ru.vk.itmo.test.alenkovayulya.dao.Dao;
import ru.vk.itmo.test.alenkovayulya.dao.EntryWithTimestamp;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static ru.vk.itmo.test.alenkovayulya.ShardRouter.REDIRECT_HEADER;
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.TIMESTAMP_HEADER;
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.redirectRequest;

public class ServerImpl extends HttpServer {
public static final String PATH = "/v0/entity";

private static final Logger LOGGER = LoggerFactory.getLogger(ServerImpl.class);
private final Dao<MemorySegment, Entry<MemorySegment>> referenceDao;
private final Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> referenceDao;
private final ExecutorService executorService;
private final String url;
private final ShardSelector shardSelector;
private static final Set<Integer> ALLOWED_METHODS = Set.of(
Request.METHOD_GET,
Request.METHOD_PUT,
Request.METHOD_DELETE
);
private static final int[] AVAILABLE_GOOD_RESPONSE_CODES = new int[] {200, 201, 202, 404};

public ServerImpl(ServiceConfig serviceConfig,
Dao<MemorySegment, Entry<MemorySegment>> referenceDao,
Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> referenceDao,
ExecutorService executorService, ShardSelector shardSelector) throws IOException {
super(createServerConfig(serviceConfig));
this.referenceDao = referenceDao;
Expand All @@ -54,87 +67,251 @@ private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig)
}

@Override
public void handleRequest(Request request, HttpSession session) {
executorService.execute(() -> {
try {
super.handleRequest(request, session);
} catch (Exception e) {
public void handleRequest(Request request, HttpSession session) throws IOException {
if (!ALLOWED_METHODS.contains(request.getMethod())) {
sendEmptyResponse(Response.METHOD_NOT_ALLOWED, session);
return;
}

String id = request.getParameter("id=");
if (isEmptyId(id)) {
sendEmptyResponse(Response.BAD_REQUEST, session);
return;
}

if (request.getHeader(REDIRECT_HEADER) != null) {
long timestamp = resolveTimestamp(request.getHeader(TIMESTAMP_HEADER));
Response response = handleInternalRequest(request, id, timestamp);
session.sendResponse(response);
} else {
handleAsLeader(request, session, id);
}
}

private void handleAsLeader(Request request, HttpSession session, String id) {
String ackS = request.getParameter("ack=");
String fromS = request.getParameter("from=");

int from = isEmptyId(fromS)
? shardSelector.getClusterSize() : Integer.parseInt(fromS);
int ack = isEmptyId(ackS)
? quorum(from) : Integer.parseInt(ackS);

if (ack == 0 || ack > from) {
sendEmptyResponse(Response.BAD_REQUEST, session);
}

try {
executorService.execute(() -> {
try {
session.sendError(Response.BAD_REQUEST, e.getMessage());
} catch (IOException ex) {
LOGGER.info("Exception during sending the response: ", ex);
session.close();
collectResponses(request, session, id, from, ack);
} catch (Exception e) {
try {
session.sendError(Response.BAD_REQUEST, e.getMessage());
} catch (IOException ex) {
LOGGER.info("Exception during sending the response: ", ex);
session.close();
}
}
}
});
});
} catch (RejectedExecutionException e) {
LOGGER.error("Request rejected by policy", e);
sendEmptyResponse(Response.SERVICE_UNAVAILABLE, session);
}
}

@Path(PATH)
@RequestMethod(Request.METHOD_GET)
public Response getEntity(@Param(value = "id", required = true) String id) {
if (isEmptyId(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
private void collectResponses(Request request,
HttpSession session,
String id,
int from,
int ack
) {
List<CompletableFuture<Response>> asyncResponses = new CopyOnWriteArrayList<>();
long timestamp = System.currentTimeMillis();
int firstOwnerShardIndex = shardSelector.getOwnerShardIndex(id);

for (int i = 0; i < from; i++) {
CompletableFuture<Response> asyncResponse;
int shardIndex = (firstOwnerShardIndex + i) % shardSelector.getClusterSize();

if (isRedirectNeeded(shardIndex)) {
asyncResponse = handleRedirect(request, timestamp, shardIndex);
} else {
asyncResponse = handleInternalRequestAsync(request, id, timestamp);
}
String ownerShardUrl = shardSelector.getOwnerShardUrl(id);
if (isRedirectNeeded(ownerShardUrl)) {
return redirectRequest("GET", id, ownerShardUrl, new byte[0]);

asyncResponses.add(asyncResponse);

}

handleAsyncResponses(session, ack, from, request, asyncResponses);

}

private void handleAsyncResponses(
HttpSession session, int ack, int from, Request request,
List<CompletableFuture<Response>> completableFutureResponses
) {
List<Response> validResponses = new CopyOnWriteArrayList<>();
AtomicBoolean isEnoughValidResponses = new AtomicBoolean();
AtomicInteger allResponsesCounter = new AtomicInteger();

for (CompletableFuture<Response> completableFuture : completableFutureResponses) {
completableFuture.whenCompleteAsync((response, throwable) -> {
if (isEnoughValidResponses.get()) {
return;
}
allResponsesCounter.incrementAndGet();

if (throwable != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throwable съедается и не понятно что была какая-то ошибка

response = new Response(Response.INTERNAL_ERROR);
}

if (isValidResponse(response)) {
validResponses.add(response);
}

sendResponseIfEnoughReplicasResponsesNumber(request,
isEnoughValidResponses,
session,
validResponses,
ack);

if (allResponsesCounter.get() == from && validResponses.size() < ack) {
sendEmptyResponse("504 Not Enough Replicas", session);
}
}, executorService).exceptionally((th) -> new Response(Response.INTERNAL_ERROR));
}
}

private void sendResponseIfEnoughReplicasResponsesNumber(
Request request,
AtomicBoolean isEnoughValidResponses,
HttpSession session,
List<Response> responses,
int ack
) {
try {
if (responses.size() >= ack) {
isEnoughValidResponses.set(true);
if (request.getMethod() == Request.METHOD_GET) {
session.sendResponse(getResponseWithMaxTimestamp(responses));
} else {
session.sendResponse(responses.getFirst());
}
}
Entry<MemorySegment> value = referenceDao.get(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
LOGGER.error("Exception during send win response: ", e);
sendEmptyResponse(Response.INTERNAL_ERROR, session);
session.close();
}
}

private boolean isValidResponse(Response response) {
return Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES)
.anyMatch(code -> code == response.getStatus());
}

return value == null ? new Response(Response.NOT_FOUND, Response.EMPTY)
: Response.ok(value.value().toArray(ValueLayout.JAVA_BYTE));
private CompletableFuture<Response> handleRedirect(Request request, long timestamp, int nodeIndex) {
return redirectRequest(request.getMethodName(),
request.getParameter("id="),
shardSelector.getShardUrlByIndex(nodeIndex),
request.getBody() == null
? new byte[0] : request.getBody(), timestamp);
}

@Path(PATH)
@RequestMethod(Request.METHOD_PUT)
public Response putEntity(@Param(value = "id", required = true) String id, Request request) {
if (isEmptyId(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
private CompletableFuture<Response> handleInternalRequestAsync(Request request, String id, long timestamp) {
return CompletableFuture.supplyAsync(() ->
handleInternalRequest(request, id, timestamp), ShardRouter.proxyExecutor);
}

private Response handleInternalRequest(Request request, String id, long timestamp) {
switch (request.getMethod()) {
case Request.METHOD_GET -> {
EntryWithTimestamp<MemorySegment> entry = referenceDao.get(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)));

if (entry == null) {
return new Response(Response.NOT_FOUND, Response.EMPTY);
} else if (entry.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(TIMESTAMP_HEADER + ": " + entry.timestamp());
return response;
} else {
Response response = Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
response.addHeader(TIMESTAMP_HEADER + ": " + entry.timestamp());
return response;
}
}
String ownerShardUrl = shardSelector.getOwnerShardUrl(id);
if (isRedirectNeeded(ownerShardUrl)) {
return redirectRequest("PUT", id, ownerShardUrl, request.getBody());
case Request.METHOD_PUT -> {
referenceDao.upsert(new BaseEntryWithTimestamp<>(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)),
convertBytesToMemorySegment(request.getBody()), timestamp));

return new Response(Response.CREATED, Response.EMPTY);
}
referenceDao.upsert(new BaseEntry<>(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)),
convertBytesToMemorySegment(request.getBody())));
return new Response(Response.CREATED, Response.EMPTY);
}

@Path(PATH)
@RequestMethod(Request.METHOD_DELETE)
public Response deleteEntity(@Param(value = "id", required = true) String id) {
String ownerShardUrl = shardSelector.getOwnerShardUrl(id);
if (isRedirectNeeded(ownerShardUrl)) {
return redirectRequest("DELETE", id, ownerShardUrl, new byte[0]);
case Request.METHOD_DELETE -> {
referenceDao.upsert(new BaseEntryWithTimestamp<>(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)),
null, timestamp));

return new Response(Response.ACCEPTED, Response.EMPTY);
}
if (isEmptyId(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
default -> {
return new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);
}
referenceDao.upsert(new BaseEntry<>(
convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)), null));
return new Response(Response.ACCEPTED, Response.EMPTY);
}

@Override
public void handleDefault(Request request, HttpSession session) throws IOException {
switch (request.getMethodName()) {
case "GET", "PUT", "DELETE" -> session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
default -> session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY));
}
}

private boolean isRedirectNeeded(String ownerUrl) {
return !url.equals(ownerUrl);
private boolean isRedirectNeeded(int shardIndex) {
return !url.equals(shardSelector.getShardUrlByIndex(shardIndex));
}

private boolean isEmptyId(String id) {
return id.isEmpty() && id.isBlank();
return id == null || (id.isEmpty() && id.isBlank());
}

private MemorySegment convertBytesToMemorySegment(byte[] byteArray) {
return MemorySegment.ofArray(byteArray);
}

private int quorum(int from) {
return from / 2 + 1;
}

private long resolveTimestamp(String timestampHeader) {
if (isEmptyId(timestampHeader)) {
return 0L;
}
try {
return Long.parseLong(timestampHeader);
} catch (NumberFormatException e) {
return 0L;
}
}

private void sendEmptyResponse(String response, HttpSession session) {
var emptyRes = new Response(response, Response.EMPTY);
try {
session.sendResponse(emptyRes);
} catch (IOException e) {
LOGGER.info("Exception during sending the empty response: ", e);
}
}

private Response getResponseWithMaxTimestamp(List<Response> responses) {
Response result = responses.getFirst();
long max = 0;
for (Response response : responses) {
String timestampHeader = response.getHeaders()[response.getHeaderCount() - 1];

long timestamp = resolveTimestamp(timestampHeader);
if (max < timestamp) {
max = timestamp;
result = response;
}
}

return result;
}
}
8 changes: 4 additions & 4 deletions src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.ServiceFactory;
import ru.vk.itmo.test.alenkovayulya.dao.Dao;
import ru.vk.itmo.test.alenkovayulya.dao.EntryWithTimestamp;
import ru.vk.itmo.test.alenkovayulya.dao.ReferenceDao;

import java.io.IOException;
Expand All @@ -17,7 +17,7 @@

public class ServiceImpl implements Service {

private Dao<MemorySegment, Entry<MemorySegment>> referenceDao;
private Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> referenceDao;
private ExecutorService executorService;
private ServerImpl server;
private final ServiceConfig config;
Expand Down Expand Up @@ -72,7 +72,7 @@ private void shutdownDao() {
}
}

@ServiceFactory(stage = 3)
@ServiceFactory(stage = 5)
public static class Factory implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
Expand Down
Loading
Loading