diff --git a/README.md b/README.md index 5935b793a..3da67abd2 100644 --- a/README.md +++ b/README.md @@ -163,3 +163,21 @@ HTTP API расширяется query-параметрами `from` и `ack`, с После прохождения модульных тестов, присылайте pull request с изменениями. Наполните БД большим объёмом данных и отпрофилируйте cpu, alloc и lock при получении range всей базы одним запросом curl'ом. Присылайте отчёт с анализом результатов и оптимизаций. + +## Этап 7. Бонусный (hard deadline 2024-05-15 23:59:59 MSK) + +Фичи, которые позволяют получить дополнительные баллы (при условии **добавления набора тестов**, демонстрирующих корректность, где применимо): +* Развёрнутая конструктивная обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению +* Кластерные range-запросы с учётом шардирования и репликации +* Read repair при обнаружении расхождений между репликами +* Expire: возможность указания [времени жизни записей](https://en.wikipedia.org/wiki/Time_to_live) +* Server-side processing: трансформация данных с помощью скрипта, запускаемого на узлах кластера через API +* Нагрузочное тестирование при помощи [Y!CSB](https://github.com/brianfrankcooper/YCSB) +* Нагрузочное тестирование при помощи [Yandex.Tank](https://overload.yandex.net) +* Регулярный автоматический фоновый compaction (модульные и нагрузочные тесты) +* Hinted handoff [по аналогии с Cassandra](https://cassandra.apache.org/doc/latest/operating/hints.html) +* Устранение неконсистентностей между репликами [по аналогии с Cassandra](https://www.datastax.com/blog/advanced-repair-techniques) [nodetool repair](https://docs.datastax.com/en/archived/cassandra/2.0/cassandra/operations/ops_repair_nodes_c.html), например, на основе [Merkle Tree](https://en.wikipedia.org/wiki/Merkle_tree) +* Блочная компрессия данных на основе LZ4/zSTD/... +* Что-нибудь **своё**? + +Перед началом работ продумайте и согласуйте с преподавателем её технический дизайн и получите вспомогательные материалы. diff --git a/src/main/java/ru/vk/itmo/test/reference/ReferenceHttpSession.java b/src/main/java/ru/vk/itmo/test/reference/ReferenceHttpSession.java index df22088af..aaa107a7c 100644 --- a/src/main/java/ru/vk/itmo/test/reference/ReferenceHttpSession.java +++ b/src/main/java/ru/vk/itmo/test/reference/ReferenceHttpSession.java @@ -4,10 +4,20 @@ import one.nio.http.HttpSession; import one.nio.http.Response; import one.nio.net.Socket; +import ru.vk.itmo.test.reference.dao2.ReferenceBaseEntry; import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; public class ReferenceHttpSession extends HttpSession { + private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] LF = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] EMPTY_CHUNK = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8); + Iterator> iterator; + public ReferenceHttpSession(Socket socket, HttpServer server) { super(socket, server); } @@ -21,6 +31,51 @@ public void sendResponseOrClose(Response response) { } } + @Override + protected void processWrite() throws Exception { + super.processWrite(); + + nextChunk(); + } + + public void stream(Iterator> iterator) throws IOException { + this.iterator = iterator; + Response response = new Response(Response.OK); + response.addHeader("Transfer-Encoding: chunked"); + writeResponse(response, false); + + nextChunk(); + } + + private void nextChunk() throws IOException { + while (iterator.hasNext() && queueHead == null) { + ReferenceBaseEntry next = iterator.next(); + ByteBuffer key = next.key().asByteBuffer(); + ByteBuffer value = next.value().asByteBuffer(); + int payloadSize = key.remaining() + value.remaining() + LF.length; + String payloadSizeStr = Integer.toHexString(payloadSize); + byte[] payloadSizeStrBytes = payloadSizeStr.getBytes(StandardCharsets.UTF_8); + write(payloadSizeStrBytes, 0, payloadSizeStrBytes.length); + write(CRLF, 0, CRLF.length); + write(new ReferenceQueueItem(key)); + write(LF, 0, LF.length); + write(new ReferenceQueueItem(value)); + write(CRLF, 0, CRLF.length); + } + + if (!iterator.hasNext()) { + write(EMPTY_CHUNK, 0, EMPTY_CHUNK.length); + + if ((this.handling = pipeline.pollFirst()) != null) { + if (handling == FIN) { + scheduleClose(); + } else { + server.handleRequest(handling, this); + } + } + } + } + public void sendError(Throwable e) { log.error("Exception during handleRequest", e); try { @@ -30,4 +85,22 @@ public void sendError(Throwable e) { scheduleClose(); } } + + static class ReferenceQueueItem extends QueueItem { + private final ByteBuffer buffer; + + ReferenceQueueItem(ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public int remaining() { + return buffer.remaining(); + } + + @Override + public int write(Socket socket) throws IOException { + return socket.write(buffer); + } + } } diff --git a/src/main/java/ru/vk/itmo/test/reference/ReferenceServer.java b/src/main/java/ru/vk/itmo/test/reference/ReferenceServer.java index cb0d54874..aabbed52b 100644 --- a/src/main/java/ru/vk/itmo/test/reference/ReferenceServer.java +++ b/src/main/java/ru/vk/itmo/test/reference/ReferenceServer.java @@ -21,7 +21,9 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -82,7 +84,21 @@ public void handleRequest(Request request, HttpSession sessionI) throws IOExcept if (!(sessionI instanceof ReferenceHttpSession session)) { throw new IllegalArgumentException("this method support only ReferenceHttpSession"); } -// if (config.selfPort() == 8100) { + if ("/v0/entities".equals(request.getPath())) { + executorWork.execute(new Runnable() { + @Override + public void run() { + try { + stream(request, session); + } catch (Exception e) { + session.sendError(e); + } + } + }); + return; + } + + // if (config.selfPort() == 8100) { // return; // } String id = request.getParameter("id="); @@ -158,6 +174,24 @@ public void run() { }); } + private void stream(Request request, HttpSession session) throws IOException { + String startStr = request.getParameter("start="); + if (startStr == null || startStr.isBlank()) { + session.sendError(Response.BAD_REQUEST, "Invalid arguments"); + return; + } + + String endStr = request.getParameter("end="); + if (endStr != null && endStr.isBlank()) { + session.sendError(Response.BAD_REQUEST, "Invalid arguments"); + return; + } + + Iterator> iterator = dao.get(MemorySegment.ofArray(startStr.getBytes(StandardCharsets.UTF_8)), + endStr == null ? null : MemorySegment.ofArray(endStr.getBytes(StandardCharsets.UTF_8))); + ((ReferenceHttpSession)session).stream(iterator); + } + private int getInt(Request request, String param, int defaultValue) throws IOException { int ack; String ackStr = request.getParameter(param); diff --git a/src/main/java/ru/vk/itmo/test/reference/ReferenceService.java b/src/main/java/ru/vk/itmo/test/reference/ReferenceService.java index 039ad7c61..7a77d81ca 100644 --- a/src/main/java/ru/vk/itmo/test/reference/ReferenceService.java +++ b/src/main/java/ru/vk/itmo/test/reference/ReferenceService.java @@ -71,7 +71,7 @@ public static void shutdownAndAwaitTermination(ExecutorService pool) { } } - @ServiceFactory(stage = 5) + @ServiceFactory(stage = 6) public static class Factory implements ServiceFactory.Factory { @Override diff --git a/src/main/java/ru/vk/itmo/test/reference/wrk_scripts/get3.lua b/src/main/java/ru/vk/itmo/test/reference/wrk_scripts/get3.lua new file mode 100644 index 000000000..c2ea60cb7 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/reference/wrk_scripts/get3.lua @@ -0,0 +1,8 @@ +math.randomseed(os.time()) + +function request() + counter = math.random(100000100, 100000000 + 5000000) + headers = {} + headers["Host"] = "localhost:8080" + return wrk.format("GET", "/v0/entities?start=" .. tostring(counter), headers) +end \ No newline at end of file diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java index 3ab5047ef..d0a26feb2 100644 --- a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java @@ -1,56 +1,46 @@ package ru.vk.itmo.test.smirnovandrew; +import one.nio.http.Header; +import one.nio.http.HttpClient; +import one.nio.http.HttpException; import one.nio.http.HttpServer; -import one.nio.http.HttpServerConfig; import one.nio.http.HttpSession; import one.nio.http.Param; import one.nio.http.Path; import one.nio.http.Request; import one.nio.http.RequestMethod; import one.nio.http.Response; -import one.nio.server.AcceptorConfig; +import one.nio.pool.PoolException; import ru.vk.itmo.ServiceConfig; -import ru.vk.itmo.dao.BaseEntry; import ru.vk.itmo.test.reference.dao.ReferenceDao; import java.io.IOException; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.logging.Logger; +import java.util.stream.Collectors; public class MyServer extends HttpServer { - private static final String ROOT = "/v0/entity"; - private static long DURATION = 1000L; - - private static final String ID = "id="; - - private static final long RESPONSE_WAIT = 1; - - private final ReferenceDao dao; - + private static final String X_SENDER_NODE = "X-SenderNode"; + private static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas"; + private static final long DURATION = 1000L; + private static final int OK_STATUS = 300; + private static final int NOT_FOUND_STATUS = 404; + private static final String HEADER_DELIMITER = ": "; + private final MyServerDao dao; private final MyExecutor executor; - private final Logger logger; - - private final HttpClient httpClient; - + private final Map httpClients; private final RendezvousClusterManager rendezvousClustersManager; - - private final String selfUrl; + private final ServiceConfig config; private static final Set METHOD_SET = new HashSet<>(List.of( Request.METHOD_GET, @@ -58,199 +48,217 @@ public class MyServer extends HttpServer { Request.METHOD_DELETE )); - @Override - public void handleRequest(Request request, HttpSession session) throws IOException { - String key = request.getParameter(ID); - if (Objects.isNull(key) || key.isEmpty()) { - logger.info(String.format("There is no id in query: %s", request.getQueryString())); - sendEmpty(session, Response.BAD_REQUEST); - return; - } - - String clusterUrl = rendezvousClustersManager.getCluster(key); - - if (Objects.isNull(clusterUrl)) { - logger.info(String.format("Cluster url is null, request = %s", request.getQueryString())); - sendEmpty(session, Response.BAD_REQUEST); - return; - } - - if (Objects.equals(selfUrl, clusterUrl)) { - handleLocalRequest(request, session); - return; - } - - var clusterRequest = HttpRequest.newBuilder( - URI.create( - String.join("", - clusterUrl, - ROOT, - "?", - ID, - key - )) - ); - - switch (request.getMethod()) { - case Request.METHOD_GET -> clusterRequest.GET(); - case Request.METHOD_DELETE -> clusterRequest.DELETE(); - case Request.METHOD_PUT -> clusterRequest.PUT(HttpRequest.BodyPublishers.ofByteArray(request.getBody())); - default -> session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY)); - } - - try { - var response = httpClient.sendAsync( - clusterRequest.build(), - HttpResponse.BodyHandlers.ofByteArray() - ).get(RESPONSE_WAIT, TimeUnit.SECONDS); - - session.sendResponse( - new Response( - responseFromStatusCode(response.statusCode()), - response.body() - ) - ); - } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { - logger.info(e.getMessage()); - sendEmpty(session, Response.INTERNAL_ERROR); - Thread.currentThread().interrupt(); - } + public MyServer(ServiceConfig config, ReferenceDao dao) throws IOException { + this(config, dao, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()); } - private static String responseFromStatusCode(int statusCode) { - return switch (statusCode) { - case 400 -> Response.BAD_REQUEST; - case 404 -> Response.NOT_FOUND; - case 200 -> Response.OK; - case 202 -> Response.ACCEPTED; - case 201 -> Response.CREATED; - case 503 -> Response.SERVICE_UNAVAILABLE; - default -> Response.INTERNAL_ERROR; - }; + public MyServer( + ServiceConfig config, + ReferenceDao dao, + int corePoolSize, + int availableProcessors + ) throws IOException { + super(MyServerUtil.generateServerConfig(config)); + this.rendezvousClustersManager = new RendezvousClusterManager(config); + this.config = config; + this.dao = new MyServerDao(dao); + this.executor = new MyExecutor(corePoolSize, availableProcessors); + this.logger = Logger.getLogger(MyServer.class.getName()); + this.httpClients = config.clusterUrls().stream() + .filter(url -> !Objects.equals(url, config.selfUrl())) + .collect(Collectors.toMap(s -> s, MyServerUtil::createClient, (c, c1) -> c)); } - private void handleLocalRequest(Request request, HttpSession session) { + @Override + public void handleRequest(Request request, HttpSession session) throws IOException { try { long exp = System.currentTimeMillis() + DURATION; executor.execute(() -> { try { if (System.currentTimeMillis() > exp) { - sendEmpty(session, Response.SERVICE_UNAVAILABLE); + MyServerUtil.sendEmpty(session, logger, Response.SERVICE_UNAVAILABLE); } else { super.handleRequest(request, session); } } catch (IOException e) { logger.info(e.getMessage()); - sendEmpty(session, Response.INTERNAL_ERROR); + MyServerUtil.sendEmpty(session, logger, Response.INTERNAL_ERROR); } catch (Exception e) { logger.info(e.getMessage()); - sendEmpty(session, Response.BAD_REQUEST); + MyServerUtil.sendEmpty(session, logger, Response.BAD_REQUEST); } }); } catch (RejectedExecutionException e) { logger.info(e.getMessage()); - sendEmpty(session, "429 Too Many Requests"); + MyServerUtil.sendEmpty(session, logger, "429 Too Many Requests"); } } - private void sendEmpty(HttpSession session, String message) { + private static int quorum(int from) { + return from / 2 + 1; + } + + private Response sendToAnotherNode( + Request request, + String clusterUrl, + Function operation + ) { + if (Objects.equals(clusterUrl, config.selfUrl())) { + return operation.apply(dao); + } + + var httpClient = httpClients.get(clusterUrl); + try { - session.sendResponse(new Response(message, Response.EMPTY)); - } catch (IOException e) { + return httpClient.invoke(request); + } catch (InterruptedException e) { logger.info(e.getMessage()); + Thread.currentThread().interrupt(); + return new Response(Response.INTERNAL_ERROR, Response.EMPTY); + } catch (HttpException | IOException | PoolException e1) { + logger.info(e1.getMessage()); + return new Response(Response.INTERNAL_ERROR, Response.EMPTY); } } - private static HttpServerConfig generateServerConfig(ServiceConfig config) { - var serverConfig = new HttpServerConfig(); - var acceptorsConfig = new AcceptorConfig(); + private Response handleLocalRequest( + Request request, + String id, + Integer fromParam, + Integer ackParam, + String senderNode, + Function operation + ) { + Integer from = fromParam; + if (Objects.isNull(from)) { + from = config.clusterUrls().size(); + } - acceptorsConfig.port = config.selfPort(); - acceptorsConfig.reusePort = true; + Integer ack = ackParam; + if (Objects.isNull(ack)) { + ack = quorum(from); + } - serverConfig.acceptors = new AcceptorConfig[] {acceptorsConfig}; - serverConfig.closeSessions = true; - return serverConfig; - } + String paramError = getParametersError(id, from, ack); + if (Objects.nonNull(paramError)) { + return new Response(Response.BAD_REQUEST, paramError.getBytes(StandardCharsets.UTF_8)); + } - public MyServer(ServiceConfig config, ReferenceDao dao) throws IOException { - this(config, dao, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()); - } + if (Objects.nonNull(senderNode) && !senderNode.isEmpty()) { + return operation.apply(dao); + } - public MyServer( - ServiceConfig config, - ReferenceDao dao, - int corePoolSize, - int availableProcessors - ) throws IOException { - super(generateServerConfig(config)); - this.rendezvousClustersManager = new RendezvousClusterManager(config); - this.selfUrl = config.selfUrl(); - this.dao = dao; - this.executor = new MyExecutor(corePoolSize, availableProcessors); - this.logger = Logger.getLogger(MyServer.class.getName()); - this.httpClient = HttpClient.newHttpClient(); - } + String clusterUrl = rendezvousClustersManager.getCluster(id); - private boolean isStringInvalid(String param) { - return Objects.isNull(param) || "".equals(param); - } + if (Objects.isNull(clusterUrl)) { + return new Response(Response.BAD_REQUEST, Response.EMPTY); + } + + var sortedNodes = RendezvousClusterManager.getSortedNodes(from, config); + + if (sortedNodes.stream().map(config.clusterUrls()::get).noneMatch(config.selfUrl()::equals)) { + return sendToAnotherNode(request, clusterUrl, operation); + } - private MemorySegment fromString(String data) { - if (data == null) { - return null; + request.addHeader(String.join(HEADER_DELIMITER, X_SENDER_NODE, config.selfUrl())); + var responses = new ArrayList(); + for (int nodeNumber : sortedNodes) { + var r = sendToAnotherNode(request, config.clusterUrls().get(nodeNumber), operation); + if (r.getStatus() < OK_STATUS + || (r.getStatus() == NOT_FOUND_STATUS && request.getMethod() == Request.METHOD_GET)) { + responses.add(r); + } } - return MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8)); + + if (responses.size() < ack) { + return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY); + } + return MyServerUtil.getMaxTimestampResponse(responses); } - @Path(ROOT) - @RequestMethod(Request.METHOD_GET) - public Response get( - @Param(value = "id", required = true) String id - ) { - if (isStringInvalid(id)) { - return new Response(Response.BAD_REQUEST, Response.EMPTY); + private String getParametersError(String id, Integer from, Integer ack) { + if (Objects.isNull(id) || id.isEmpty()) { + return "Invalid id provided"; + } + + if (ack <= 0) { + return "Too small ack"; } - var key = fromString(id); - var got = dao.get(key); + if (from <= 0) { + return "Too small from"; + } - if (Objects.isNull(got)) { - return new Response(Response.NOT_FOUND, Response.EMPTY); + int clusterSize = config.clusterUrls().size(); + if (from > clusterSize) { + return String.format("From is greater than cluster size: from=%d, clusterSize=%d", from, clusterSize); } - return Response.ok(got.value().toArray(ValueLayout.JAVA_BYTE)); + if (ack > from) { + return String.format("Ack is greater than from: ack=%d, from=%d", ack, from); + } + + return null; + } + + @Path(ROOT) + @RequestMethod(Request.METHOD_GET) + public Response get( + @Param(value = "id", required = true) String id, + @Param(value = "from") Integer from, + @Param(value = "ack") Integer ack, + @Header(value = X_SENDER_NODE) String senderNode, + Request request + ) { + return handleLocalRequest( + request, + id, + from, + ack, + senderNode, + d -> d.getEntryFromDao(id) + ); } @Path(ROOT) @RequestMethod(Request.METHOD_DELETE) public Response delete( - @Param(value = "id", required = true) String id + @Param(value = "id", required = true) String id, + @Param(value = "from") Integer from, + @Param(value = "ack") Integer ack, + @Header(value = X_SENDER_NODE) String senderNode, + Request request ) { - if (isStringInvalid(id)) { - return new Response(Response.BAD_REQUEST, Response.EMPTY); - } - - var key = fromString(id); - dao.upsert(new BaseEntry<>(key, null)); - return new Response(Response.ACCEPTED, Response.EMPTY); + return handleLocalRequest( + request, + id, + from, + ack, + senderNode, + d -> d.deleteValueFromDao(id) + ); } @Path(ROOT) @RequestMethod(Request.METHOD_PUT) public Response put( @Param(value = "id", required = true) String id, + @Param(value = "from") Integer from, + @Param(value = "ack") Integer ack, + @Header(value = X_SENDER_NODE) String senderNode, Request request ) { - if (isStringInvalid(id)) { - return new Response(Response.BAD_REQUEST, Response.EMPTY); - } - - var key = fromString(id); - var value = MemorySegment.ofArray(request.getBody()); - - dao.upsert(new BaseEntry<>(key, value)); - return new Response(Response.CREATED, Response.EMPTY); + request.addHeader("Content-Length: " + request.getBody().length); + request.setBody(request.getBody()); + + return handleLocalRequest( + request, + id, + from, + ack, + senderNode, + d -> d.putEntryIntoDao(id, request) + ); } @Override @@ -267,6 +275,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti public synchronized void stop() { this.executor.shutdown(); super.stop(); - httpClient.close(); + httpClients.values().forEach(HttpClient::close); } } diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java new file mode 100644 index 000000000..f63805c8a --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java @@ -0,0 +1,81 @@ +package ru.vk.itmo.test.smirnovandrew; + +import one.nio.http.Request; +import one.nio.http.Response; +import ru.vk.itmo.dao.BaseEntry; +import ru.vk.itmo.dao.Dao; +import ru.vk.itmo.dao.Entry; +import ru.vk.itmo.test.abramovilya.dao.DaoFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; + +public class MyServerDao { + private final Dao> dao; + + public MyServerDao(Dao> dao) { + this.dao = dao; + } + + Response getEntryFromDao(String id) { + Entry entry = dao.get(DaoFactory.fromString(id)); + if (entry == null) { + return new Response(Response.NOT_FOUND, Response.EMPTY); + } + try { + ValWithTime valueWithTimestamp = byteArrayToObject(entry.value().toArray(ValueLayout.JAVA_BYTE)); + if (valueWithTimestamp.value() == null) { + Response response = new Response(Response.NOT_FOUND, Response.EMPTY); + response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp()); + return response; + } + Response response = new Response(Response.OK, valueWithTimestamp.value()); + response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp()); + return response; + } catch (IOException | ClassNotFoundException e) { + return new Response(Response.INTERNAL_ERROR, Response.EMPTY); + } + } + + Response putEntryIntoDao(String id, Request request) { + ValWithTime valueWithTimestamp = new ValWithTime(request.getBody(), System.currentTimeMillis()); + try { + dao.upsert(new BaseEntry<>(DaoFactory.fromString(id), + MemorySegment.ofArray(objToByteArray(valueWithTimestamp)))); + return new Response(Response.CREATED, Response.EMPTY); + } catch (IOException e) { + return new Response(Response.INTERNAL_ERROR, Response.EMPTY); + } + } + + Response deleteValueFromDao(String id) { + ValWithTime valueWithTimestamp = new ValWithTime(null, System.currentTimeMillis()); + try { + dao.upsert(new BaseEntry<>(DaoFactory.fromString(id), + MemorySegment.ofArray(objToByteArray(valueWithTimestamp)))); + } catch (IOException e) { + return new Response(Response.INTERNAL_ERROR, Response.EMPTY); + } + return new Response(Response.ACCEPTED, Response.EMPTY); + } + + private static ValWithTime byteArrayToObject(byte[] bytes) throws IOException, ClassNotFoundException { + var byteArrayInputStream = new ByteArrayInputStream(bytes); + try (var objectInputStream = new ObjectInputStream(byteArrayInputStream)) { + return (ValWithTime) objectInputStream.readObject(); + } + } + + private static byte[] objToByteArray(ValWithTime object) throws IOException { + var byteArrayOutputStream = new ByteArrayOutputStream(); + try (var objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(object); + return byteArrayOutputStream.toByteArray(); + } + } +} diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerUtil.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerUtil.java new file mode 100644 index 000000000..9c6397bbf --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerUtil.java @@ -0,0 +1,69 @@ +package ru.vk.itmo.test.smirnovandrew; + +import one.nio.http.HttpClient; +import one.nio.http.HttpServerConfig; +import one.nio.http.HttpSession; +import one.nio.http.Response; +import one.nio.net.ConnectionString; +import one.nio.server.AcceptorConfig; +import ru.vk.itmo.ServiceConfig; + +import java.io.IOException; +import java.util.List; +import java.util.logging.Logger; + +public final class MyServerUtil { + private static final int CONNECTION_TIMEOUT = 1000; + + public static final String X_TIMESTAMP = "X-TimeStamp: "; + + private MyServerUtil() { + } + + public static HttpServerConfig generateServerConfig(ServiceConfig config) { + var serverConfig = new HttpServerConfig(); + var acceptorsConfig = new AcceptorConfig(); + + acceptorsConfig.port = config.selfPort(); + acceptorsConfig.reusePort = true; + + serverConfig.acceptors = new AcceptorConfig[]{acceptorsConfig}; + serverConfig.closeSessions = true; + return serverConfig; + } + + public static HttpClient createClient(String url) { + var client = new HttpClient(new ConnectionString(url)); + client.setConnectTimeout(CONNECTION_TIMEOUT); + return client; + } + + public static void sendEmpty(HttpSession session, Logger logger, String message) { + try { + session.sendResponse(new Response(message, Response.EMPTY)); + } catch (IOException e) { + logger.info(e.getMessage()); + } + } + + private static long headerTimestampToLong(Response r) { + String header = r.getHeader(X_TIMESTAMP); + if (header == null) { + return Long.MIN_VALUE; + } + return Long.parseLong(header); + } + + public static Response getMaxTimestampResponse(List responses) { + long maxTimestamp = Long.MIN_VALUE; + Response maxResponse = null; + for (var response: responses) { + long timestamp = headerTimestampToLong(response); + if (timestamp >= maxTimestamp) { + maxResponse = response; + maxTimestamp = timestamp; + } + } + return maxResponse; + } +} diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServiceFactory.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServiceFactory.java index 52beefe75..81564c3f4 100644 --- a/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServiceFactory.java +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/MyServiceFactory.java @@ -4,7 +4,7 @@ import ru.vk.itmo.ServiceConfig; import ru.vk.itmo.test.ServiceFactory; -@ServiceFactory(stage = 3) +@ServiceFactory(stage = 4) public class MyServiceFactory implements ServiceFactory.Factory { @Override public Service create(ServiceConfig config) { diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/RendezvousClusterManager.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/RendezvousClusterManager.java index afd8e0721..0f6f40f70 100644 --- a/src/main/java/ru/vk/itmo/test/smirnovandrew/RendezvousClusterManager.java +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/RendezvousClusterManager.java @@ -3,6 +3,7 @@ import one.nio.util.Hash; import ru.vk.itmo.ServiceConfig; +import java.util.ArrayList; import java.util.List; public class RendezvousClusterManager { @@ -17,7 +18,7 @@ public String getCluster(String key) { int resIdx = -1; int maxHash = Integer.MIN_VALUE; for (int i = 0; i < availableClusters.size(); ++i) { - var hash = Hash.murmur3(String.join("", availableClusters.get(i), key)); + var hash = Hash.murmur3(key + availableClusters.get(i)); if (hash > maxHash) { resIdx = i; maxHash = hash; @@ -30,4 +31,12 @@ public String getCluster(String key) { return availableClusters.get(resIdx); } + + public static List getSortedNodes(int amount, ServiceConfig config) { + var result = new ArrayList(); + for (int i = 0; i < config.clusterUrls().size() && result.size() < amount; i++) { + result.add(i); + } + return result; + } } diff --git a/src/main/java/ru/vk/itmo/test/smirnovandrew/ValWithTime.java b/src/main/java/ru/vk/itmo/test/smirnovandrew/ValWithTime.java new file mode 100644 index 000000000..827a75451 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/smirnovandrew/ValWithTime.java @@ -0,0 +1,6 @@ +package ru.vk.itmo.test.smirnovandrew; + +import java.io.Serializable; + +public record ValWithTime(byte[] value, long timestamp) implements Serializable { +}