diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/HeaderConstraints.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/HeaderConstraints.java index 8c81842c0..fc4d99a39 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/HeaderConstraints.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/HeaderConstraints.java @@ -4,7 +4,8 @@ public final class HeaderConstraints { public static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: "; public static final String TIMESTAMP_MILLIS_HEADER_NORMAL = "X-TIMESTAMP-MILLIS"; - public static final String FROM_HEADER = "X-FROM"; + public static final String FROM_HEADER_NORMAL = "X-FROM"; + public static final String FROM_HEADER = "X-FROM: "; private HeaderConstraints() { } diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/NodeThreadPoolExecutor.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/NodeThreadPoolExecutor.java deleted file mode 100644 index 2a1953732..000000000 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/NodeThreadPoolExecutor.java +++ /dev/null @@ -1,45 +0,0 @@ -package ru.vk.itmo.test.dariasupriadkina; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class NodeThreadPoolExecutor extends ThreadPoolExecutor { - - private static final Logger logger = LoggerFactory.getLogger(NodeThreadPoolExecutor.class.getName()); - private final int shutdownTimeoutSec; - - public NodeThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue workQueue, - ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler, - int shutdownTimeoutSec) { - super(corePoolSize, maximumPoolSize, WorkerConfig.KEEP_ALIVE_TIME, - WorkerConfig.KEEP_ALIVE_TIME_SECONDS, workQueue, threadFactory, rejectedExecutionHandler); - this.shutdownTimeoutSec = shutdownTimeoutSec; - } - - // Метод из документации на ExecutorService - public void shutdownAndAwaitTermination() { - this.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!awaitTermination(shutdownTimeoutSec, TimeUnit.SECONDS)) { - shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!this.awaitTermination(shutdownTimeoutSec, TimeUnit.SECONDS)) { - logger.error("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } -} diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/SelfRequestHandler.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/SelfRequestHandler.java index 86cde0b25..3a3405b9a 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/SelfRequestHandler.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/SelfRequestHandler.java @@ -9,12 +9,13 @@ import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; +import java.util.concurrent.CompletableFuture; public class SelfRequestHandler { + private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: "; private final Dao> dao; private final Utils utils; - private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: "; public SelfRequestHandler(Dao> dao, Utils utils) { this.dao = dao; @@ -23,14 +24,22 @@ public SelfRequestHandler(Dao> dao, public Response handleRequest(Request request) { String id = utils.getIdParameter(request); - return switch (request.getMethodName()) { - case "GET" -> get(id); - case "PUT" -> put(id, request); - case "DELETE" -> delete(id); + return switch (request.getMethod()) { + case Request.METHOD_GET -> get(id); + case Request.METHOD_PUT -> put(id, request); + case Request.METHOD_DELETE -> delete(id); default -> new Response(Response.NOT_FOUND, Response.EMPTY); }; } + public CompletableFuture handleAsyncRequest(Request request) { + return composeFuture(handleRequest(request)); + } + + private CompletableFuture composeFuture(Response response) { + return CompletableFuture.completedFuture(response); + } + public Response get(String id) { try { if (id == null || id.isEmpty()) { diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java index dc5108bf6..2f8fc933d 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java @@ -22,30 +22,30 @@ import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER; +import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER_NORMAL; import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER; import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER_NORMAL; public class Server extends HttpServer { private static final Logger logger = LoggerFactory.getLogger(Server.class.getName()); + private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{}; private final ExecutorService workerExecutor; private final Set permittedMethods = Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE); private final String selfUrl; private final ShardingPolicy shardingPolicy; private final HttpClient httpClient; - private static final int REQUEST_TIMEOUT_SEC = 2; private final List clusterUrls; private final Utils utils; private final SelfRequestHandler selfHandler; @@ -70,7 +70,7 @@ private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConf acceptorConfig.port = serviceConfig.selfPort(); acceptorConfig.reusePort = true; - httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig}; + httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig}; httpServerConfig.closeSessions = true; return httpServerConfig; @@ -83,7 +83,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti response = new Response(Response.BAD_REQUEST, Response.EMPTY); } else { response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY); - } session.sendResponse(response); } @@ -93,74 +92,68 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti try { workerExecutor.execute(() -> { try { - if (!permittedMethods.contains(request.getMethod())) { + Map ackFrom = getFromAndAck(request); + int from = ackFrom.get("from"); + int ack = ackFrom.get("ack"); + if (!permittedMethods.contains(request.getMethod()) + || checkBadRequest(ack, from, request.getMethod())) { handleDefault(request, session); + return; } - if (request.getHeader(FROM_HEADER) == null) { + if (request.getHeader(FROM_HEADER_NORMAL) == null) { request.addHeader(FROM_HEADER + selfUrl); - - Map ackFrom = getFromAndAck(request); - int from = ackFrom.get("from"); - int ack = ackFrom.get("ack"); - - Response response = mergeResponses(broadcast( - shardingPolicy.getNodesById(utils.getIdParameter(request), from), request, ack), - ack, from); - - session.sendResponse(response); - + collectResponsesCallback( + broadcast( + shardingPolicy.getNodesById(utils.getIdParameter(request), from), + request + ), ack, from, session); } else { Response resp = selfHandler.handleRequest(request); checkTimestampHeaderExistenceAndSet(resp); session.sendResponse(resp); } - } catch (Exception e) { - logger.error("Unexpected error", e); - try { - if (e.getClass() == HttpException.class) { - session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY)); - } else { - session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); - } - } catch (IOException ex) { - logger.error("Failed to send error response", e); - session.close(); - } + solveUnexpectedError(e, session); } }); } catch (RejectedExecutionException e) { logger.error("Service is unavailable", e); session.sendResponse(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY)); + } + } + private void solveUnexpectedError(Exception e, HttpSession session) { + logger.error("Unexpected error", e); + try { + if (e.getClass() == HttpException.class) { + session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY)); + } else { + session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); + } + } catch (Exception exception) { + logger.error("Failed to send error response", exception); + session.scheduleClose(); } } - private List broadcast(List nodes, Request request, int ack) { - int internalAck = ack; - List responses = new ArrayList<>(ack); - Response response; + private boolean checkBadRequest(int ack, int from, int method) { + return !permittedMethods.contains(method) || ack > from || ack <= 0 || from > clusterUrls.size(); + } + + private List> broadcast(List nodes, Request request) { + List> futureResponses = new ArrayList<>(nodes.size()); + CompletableFuture response; if (nodes.contains(selfUrl)) { - response = selfHandler.handleRequest(request); - checkTimestampHeaderExistenceAndSet(response); - responses.add(response); + response = selfHandler.handleAsyncRequest(request); + futureResponses.add(response); nodes.remove(selfUrl); - if (--internalAck == 0) { - return responses; - } } for (String node : nodes) { response = handleProxy(utils.getEntryUrl(node, utils.getIdParameter(request)), request); - if (response.getStatus() < 500) { - checkTimestampHeaderExistenceAndSet(response); - responses.add(response); - if (--internalAck == 0) { - return responses; - } - } + futureResponses.add(response); } - return responses; + return futureResponses; } private void checkTimestampHeaderExistenceAndSet(Response response) { @@ -171,19 +164,46 @@ private void checkTimestampHeaderExistenceAndSet(Response response) { } } - private Response mergeResponses(List responses, int ack, int from) { - if (responses.stream().filter(response -> response.getStatus() == 400).count() == from - || ack > from - || from > clusterUrls.size() - || ack <= 0) { - return new Response(Response.BAD_REQUEST, Response.EMPTY); + private void collectResponsesCallback(List> futureResponses, + int ack, int from, HttpSession session) { + List responses = new CopyOnWriteArrayList<>(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger exceptionCount = new AtomicInteger(0); + for (CompletableFuture futureResponse : futureResponses) { + futureResponse.whenCompleteAsync((response, exception) -> { + + if (exception == null && response.getStatus() < 500) { + checkTimestampHeaderExistenceAndSet(response); + responses.add(response); + successCount.incrementAndGet(); + if (successCount.get() == ack) { + sendAsyncResponse(chooseResponse(responses), session); + } + } else { + exceptionCount.incrementAndGet(); + if (exceptionCount.get() > from - ack) { + sendAsyncResponse(new Response("504 Not enough replicas", Response.EMPTY), session); + } + } + + }, workerExecutor).exceptionally(exception -> { + logger.error("Error happened while collecting responses from nodes", exception); + sendAsyncResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY), session); + return null; + }); } - if (responses.stream().filter(response -> response.getStatus() == 200 - || response.getStatus() == 404 - || response.getStatus() == 202 - || response.getStatus() == 201).count() < ack) { - return new Response("504 Not Enough Replicas", Response.EMPTY); + } + + private void sendAsyncResponse(Response resp, HttpSession session) { + try { + session.sendResponse(resp); + } catch (IOException e) { + logger.error("Failed to send error response", e); + session.scheduleClose(); } + } + + private Response chooseResponse(List responses) { return responses.stream().max((o1, o2) -> { Long header1 = Long.parseLong(o1.getHeader(TIMESTAMP_MILLIS_HEADER)); Long header2 = Long.parseLong(o2.getHeader(TIMESTAMP_MILLIS_HEADER)); @@ -191,38 +211,21 @@ private Response mergeResponses(List responses, int ack, int from) { }).get(); } - public Response handleProxy(String redirectedUrl, Request request) { - try { - HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl)) - .header(FROM_HEADER, selfUrl) - .method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray( - request.getBody() == null ? new byte[]{} : request.getBody()) - ).build(); - HttpResponse response = httpClient - .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) - .get(REQUEST_TIMEOUT_SEC, TimeUnit.SECONDS); - Response response1 = new Response(String.valueOf(response.statusCode()), response.body()); - if (response.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) { - response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0"); - } else { - response1.addHeader(TIMESTAMP_MILLIS_HEADER - + response.headers().map().get( - TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst() - ); - } - - return response1; - } catch (ExecutionException e) { - logger.error("Unexpected error", e); - return new Response(Response.INTERNAL_ERROR, Response.EMPTY); - } catch (TimeoutException e) { - logger.error("Timeout reached", e); - return new Response(Response.REQUEST_TIMEOUT, Response.EMPTY); - } catch (InterruptedException e) { - logger.error("Service is unavailable", e); - Thread.currentThread().interrupt(); - return new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY); - } + public CompletableFuture handleProxy(String redirectedUrl, Request request) { + HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl)) + .header(FROM_HEADER_NORMAL, selfUrl) + .method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray( + request.getBody() == null ? EMPTY_BYTE_ARRAY : request.getBody()) + ).build(); + return httpClient + .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) + .thenApplyAsync(httpResponse -> { + Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body()); + if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) { + response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0"); + } + return response1; + }, workerExecutor); } private Map getFromAndAck(Request request) { diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java index 6a6eb0270..78c54bdef 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java @@ -1,6 +1,5 @@ package ru.vk.itmo.test.dariasupriadkina; -import one.nio.async.CustomThreadFactory; import ru.vk.itmo.Service; import ru.vk.itmo.ServiceConfig; import ru.vk.itmo.dao.Config; @@ -8,46 +7,42 @@ import ru.vk.itmo.test.dariasupriadkina.dao.ExtendedEntry; import ru.vk.itmo.test.dariasupriadkina.dao.ReferenceDao; import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy; -import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig; -import ru.vk.itmo.test.dariasupriadkina.workers.WorkerThreadPoolExecutor; +import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig; +import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadPoolExecutor; import java.io.IOException; import java.lang.foreign.MemorySegment; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; public class ServiceIml implements Service { - private Server server; - private Dao> dao; private final Config daoConfig; private final ServiceConfig serviceConfig; - private final WorkerConfig workerConfig; - private WorkerThreadPoolExecutor workerThreadPoolExecutor; - private NodeThreadPoolExecutor nodeThreadPoolExecutor; + private final CustomThreadConfig workerConfig; + private final CustomThreadConfig nodeConfig; private final ShardingPolicy shardingPolicy; private final AtomicBoolean stopped = new AtomicBoolean(false); + private Server server; + private Dao> dao; + private CustomThreadPoolExecutor workerThreadPoolExecutor; + private CustomThreadPoolExecutor nodeThreadPoolExecutor; public ServiceIml(ServiceConfig serviceConfig, Config daoConfig, - WorkerConfig workerConfig, ShardingPolicy shardingPolicy) { + CustomThreadConfig workerConfig, ShardingPolicy shardingPolicy, + CustomThreadConfig nodeConfig) { this.daoConfig = daoConfig; this.serviceConfig = serviceConfig; this.workerConfig = workerConfig; this.shardingPolicy = shardingPolicy; + this.nodeConfig = nodeConfig; } @Override public synchronized CompletableFuture start() throws IOException { dao = new ReferenceDao(daoConfig); - workerThreadPoolExecutor = new WorkerThreadPoolExecutor(workerConfig); - // TODO вынести параметры в отдельную конфигурацию для большей гибкости - nodeThreadPoolExecutor = new NodeThreadPoolExecutor(8, - 8, - new ArrayBlockingQueue<>(1024), - new CustomThreadFactory("node-executor", true), - new ThreadPoolExecutor.AbortPolicy(), 30); + workerThreadPoolExecutor = new CustomThreadPoolExecutor(workerConfig); + nodeThreadPoolExecutor = new CustomThreadPoolExecutor(nodeConfig); nodeThreadPoolExecutor.prestartAllCoreThreads(); workerThreadPoolExecutor.prestartAllCoreThreads(); diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceImlFactory.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceImlFactory.java index 27c9a3c73..7e4dee3c9 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceImlFactory.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceImlFactory.java @@ -6,18 +6,14 @@ import ru.vk.itmo.test.ServiceFactory; import ru.vk.itmo.test.dariasupriadkina.sharding.RendezvousHashing; import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy; -import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig; +import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig; import java.nio.file.Path; -@ServiceFactory(stage = 4) +@ServiceFactory(stage = 5) public class ServiceImlFactory implements ServiceFactory.Factory { - private static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024; - private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors(); - private static final int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors(); - private static final int QUEUE_SIZE = 1024; - private static final int SHUTDOWN_TIMEOUT_SEC = 30; + public static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024; @Override public Service create(ServiceConfig serviceConfig) { @@ -25,8 +21,8 @@ public Service create(ServiceConfig serviceConfig) { serviceConfig.clusterUrls() ); Config referenceDaoConfig = new Config(Path.of(serviceConfig.workingDir().toUri()), FLUSH_THRESHOLD_BYTES); - WorkerConfig workerConfig = new WorkerConfig(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, - QUEUE_SIZE, SHUTDOWN_TIMEOUT_SEC); - return new ServiceIml(serviceConfig, referenceDaoConfig, workerConfig, shardingPolicy); + CustomThreadConfig workerConfig = CustomThreadConfig.baseConfig("worker-thread"); + CustomThreadConfig nodeConfig = CustomThreadConfig.baseConfig("node-thread"); + return new ServiceIml(serviceConfig, referenceDaoConfig, workerConfig, shardingPolicy, nodeConfig); } } diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java index 264814dd1..c703186c5 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java @@ -4,7 +4,7 @@ import ru.vk.itmo.dao.Config; import ru.vk.itmo.test.dariasupriadkina.sharding.RendezvousHashing; import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy; -import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig; +import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig; import java.io.IOException; import java.nio.file.Files; @@ -18,10 +18,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static ru.vk.itmo.test.dariasupriadkina.ServiceImlFactory.FLUSH_THRESHOLD_BYTES; + public final class TestServer { - private static final int THREADS = Runtime.getRuntime().availableProcessors(); - private static final int QUEUE_SIZE = 1024; private static final String LOCALHOST_PREFIX = "http://localhost:"; private static final int NODE_AMOUNT = 3; @@ -56,9 +56,9 @@ public static void main(String[] args) throws IOException, ExecutionException, for (ServiceConfig serviceConfig : clusterConfs) { ServiceIml serviceIml = new ServiceIml(serviceConfig, new Config(serviceConfig.workingDir(), - 1024 * 1024), - new WorkerConfig(THREADS, THREADS, QUEUE_SIZE, 30), - shardingPolicy); + FLUSH_THRESHOLD_BYTES), + CustomThreadConfig.baseConfig("worker-thread"), shardingPolicy, + CustomThreadConfig.baseConfig("node-thread")); serviceIml.start().get(2, TimeUnit.SECONDS); } } diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/REPORT.md b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/REPORT.md new file mode 100644 index 000000000..11afa32d9 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/REPORT.md @@ -0,0 +1,248 @@ +# Асинхронное взаимодействие + +При добавлении асинхронного взаимодействия на рассылку по сети, мы ожидаем, что показатели нашей системы улучшатся (повысится rps и уменьшится latency). +Так как теперь мы вместо того, чтобы ждать пока все запросы к другим нодам не будут последовательно обработаны, мы: +- Отправляем запросы параллельно +- Отправляем ответ клиенту со скоростью ответов от ack самых быстрых нод (ранее, мы ждали обработку всех from запросов) + +## Нагрузочное тестирование + +### PUT + +Точка разладки: ≈7000rps + +60s +``` +wrk -d 60 -t 4 -c 64 -R 7000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/upsert.lua http://localhost:8080 +Running 1m test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 33.10ms 47.13ms 201.22ms 80.92% + Req/Sec 1.75k 593.61 3.05k 62.83% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 2.11ms + 75.000% 59.10ms + 90.000% 115.71ms + 99.000% 159.62ms + 99.900% 177.15ms + 99.990% 195.71ms + 99.999% 200.45ms +100.000% 201.34ms +``` + +30s +``` +wrk -d 30 -t 4 -c 64 -R 7000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/upsert.lua http://localhost:8080 +Running 30s test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 33.54ms 46.81ms 200.58ms 80.57% + Req/Sec 1.76k 624.58 3.09k 61.76% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 1.90ms + 75.000% 61.12ms + 90.000% 115.01ms + 99.000% 154.75ms + 99.900% 185.22ms + 99.990% 196.86ms + 99.999% 199.42ms +100.000% 200.70ms +``` + +### GET-random + +Точка разладки: ≈10000rps + +60s +``` +wrk -d 60 -t 4 -c 64 -R 10000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua http://localhost:8080 +Running 1m test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 21.34ms 39.43ms 185.09ms 85.56% + Req/Sec 2.50k 752.75 4.83k 78.54% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 1.99ms + 75.000% 13.81ms + 90.000% 90.11ms + 99.000% 155.52ms + 99.900% 174.08ms + 99.990% 179.71ms + 99.999% 182.53ms +100.000% 185.22ms +``` + +30s +``` +wrk -d 30 -t 4 -c 64 -R 10000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua http://localhost:8080 +Running 30s test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 20.91ms 38.72ms 194.05ms 85.65% + Req/Sec 2.51k 739.50 4.33k 78.71% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 2.00ms + 75.000% 13.31ms + 90.000% 89.28ms + 99.000% 151.68ms + 99.900% 167.17ms + 99.990% 172.41ms + 99.999% 185.60ms +100.000% 194.18ms +``` + +### Результаты с прошлой лабораторной работы: + +Так как в текущей лабораторной работе измерения делались с другими параметрами wrk (64 connections/4threads), +было принято решение перемерить результаты, полученные в предыдущей лабораторной работе + +#### PUT + +Точка разладки ≈4000rps +``` + wrk -d 30 -t 4 -c 64 -R 4000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/upsert.lua http://localhost:8080 +Running 30s test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 27.73ms 40.23ms 176.26ms 81.04% + Req/Sec 1.00k 238.36 1.65k 73.24% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 1.85ms + 75.000% 49.31ms + 90.000% 99.78ms + 99.000% 134.01ms + 99.900% 162.30ms + 99.990% 174.08ms + 99.999% 176.25ms +100.000% 176.38ms + +``` + +#### GET-random + +Точка разладки ≈5000rps +``` +dariasupriadkina@MacBook-Air async-profiler-3.0-macos % wrk -d 60 -t 4 -c 64 -R 5000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua http://localhost:8080 +Running 1m test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 55.25ms 42.87ms 181.12ms 55.09% + Req/Sec 1.25k 154.40 2.09k 81.95% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 51.84ms + 75.000% 91.01ms + 90.000% 116.16ms + 99.000% 144.51ms + 99.900% 168.57ms + 99.990% 176.77ms + 99.999% 179.97ms +100.000% 181.25ms +``` + + +### Сравнение +Latency очень близки друг к другу, однако, выдерживаемый rps увеличился почти в 2 раза, что на get, что на put запросах. + +В прошлой лабораторной, большая часть %CPU уходила на метод handleRequest, в рамках которого +последовательно осуществлялось взаимодействие по сети с другими нодами, сейчас, когда это взаимодействие происходит параллельно, +можно было предполагать, что работа будет происходить быстрее. Сейчас выглядит так, будто из идеи асинхронного +взаимодействия можно было вытянуть еще выгоды. + +Повлиять на производительность могло несколько факторов: +- Неграмотная работа с пулами потоков (слишком мало или слишком много выделенных потоков под конкретную задачу) +- Свич контекст между потоками при выполнении коллбэков + +Проведем профилирование и посмотрим, можем ли мы как-то добиться меньшей latency и большей производительности + + +## Профилироване + +### PUT +Результаты профилирования PUT-запросов доступны по ссылкам: + +[upsert-alloc.html](data%2Fupsert-alloc.html) + +[upsert-cpu.html](data%2Fupsert-cpu.html) + +[upsert-lock.html](data%2Fupsert-lock.html) + +**Из интересного, при рассмотрении профиля CPU**, можно заметить, что теперь часть нашего пользовательского кода исполняется в `ForkJoinWorkerThread`. +Это связано с работой коллбэков в `CompletableFuture`. Не сказать, что они там исполняются часто: + +- apply(), вызванный коллбэком `thenApply()` в данном пуле потоков встречается в 0,35% сэмплов +- accept(), вызванный, вероятно, с помощью `.whenComplete()` в методе `collectResponsesCallback()` встречается в 0,70% сэмплов + +По идее, хоть исполнение этих коллбэков в commonPool и не сыграло глобальной роли в распределении сэмплов, но сам факт того, что +наш код исполняется в рамках разделяемого пула, который мы не можем контролировать, выглядит не очень правильным. +Вероятно, следует рассмотреть возможность замены `whenComplete` на `whenCompleteAsync` и явно указать пул потоков, на котором мы хотим исполнять +коллбэки. + +Также бросается в глаза, что в процентном соотношении метод `handleRequest` стал занимать 12% вместо 18%, как в прошлом этапе, снижение произошло как раз благодаря тому, что мы не ждем ответа всех нод, +а отправляем все запросы параллельно и ждем только `ack` ответов (стоит отметить, что снижении было бы еще более значительным, если бы профиль в 4ой лабораторной был снят, +когда я не экспериментировала с возможностью при получении ack ответов выходить из программы) + +**Профили аллокаций** выглядят похожим образом, на сравнении с профилями предыдущей лабораторной работой, однако, наличие коллбэков, исполняемых в `ForJoinWorkerThread`, добавило +новых аллокаций в этом месте, которых раньше не было + +Изменение метода `handleRequest` также немного повлияло на распределение аллокаций: изменилось с 23% до 30% +(но здесь надо учитывать, что на эту цифру повлияло и добавление абсолютно новых аллокаций в виде аллокаций в +`ForJoinWorkerThread`, о которых было сказано ранее) + +**На профиле локов:** + +В процентном соотношении `SelectorManager.run` и `PayloadThread.run` по-прежнему занимают 16% и 83% соответственно +В рамках работы `SelectorManager.run` заметно увеличилось время ожидания на локах для метода `ConnectionPool.purgeExpiredConnectionsAndReturnNextDeadline`: +в прошлой реализации он занимал 0.93%, а теперь 9.36% + +`ThreadPoolExecutor.getTask` уменьшилось с 8% до 3%, что может говорить о том, что на получение задач наши потоки стали тратить меньше времени, что +может быть вызвано тем фактом, что у нас добавляется количество тасок, которые надо выполнить в виде коллбэков. Коллбэки сами по себе не слишком +долго выполняются, однако могут занять для этого целый поток. + +### GET +Результаты профилирования GET-запросов доступны по ссылкам: + +[getrand-alloc.html](data%2Fgetrand-alloc.html) + +[getrand-cpu.html](data%2Fgetrand-cpu.html) + +[getrand-lock.html](data%2Fgetrand-lock.html) + +С get-запросами все астоит аналогичным образом, что и с put, так как разницы в серверной обработке в моем решении +нет абсолютно никакой, отличается лишь работа, выполняемая в dao. Отличается лишь степень изменения в %. Например, +на профиле локов `ThreadPoolExecutor.getTask` уменьшилось с 11% до 2%, что говорит о том, что в очереди у нас почти всегда есть таски, +которые необходимо обработать, благодаря чему, мы не блокируемся. + +Вообще, такое значение `ThreadPoolExecutor.getTask` и увеличение количества выполняемых тасок в поле, натолкнуло меня на мысль, +что, возможно, имеет смысл увеличить количество потоков в `workerExecutor` и `nodeExecutor`. Раньше, увеличение количества потоков не приводило +меня к каким-то серьезным положительным изменениям, но сейчас это выглядит весьма логично. + +Я увеличила количество потоков в этих пулах в 2 раза: +с 8 (по количеству ядер в машине, на которой выполняется лабораторная работа) до 16. + +И это дало свои результаты. При повторном использовании wrk на get-запросах при 10000rps получилось следующее: + +``` +wrk -d 30 -t 4 -c 64 -R 10000 -L -s /Users/dariasupriadkina/IdeaProjects/2024-highload-dht/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua http://localhost:8080 +Running 30s test @ http://localhost:8080 + Thread Stats Avg Stdev Max +/- Stdev + Latency 2.12ms 2.79ms 28.14ms 90.93% + Req/Sec 2.64k 849.06 8.22k 85.14% + Latency Distribution (HdrHistogram - Recorded Latency) + 50.000% 1.33ms + 75.000% 1.84ms + 90.000% 4.01ms + 99.000% 14.73ms + 99.900% 20.14ms + 99.990% 24.37ms + 99.999% 27.63ms +100.000% 28.16ms +``` + +Latency в разы уменьшилась + +Чтобы обеспечить выполнение коллбэков в наших пулах, я явно указала, что хочу, чтобы эти коллбэки +вызывались в `workerExecutor`'е за счет замены `.thenApply()` и `.whenComplete()` на `.thenApplyAsync()` и +`.whenCompleteAsync()` с указанием `workerExecutor` в качестве параметра. + +Сама производительность вроде как не изменилась, персентили не поменялись, а коллбэки из `ForkJoinWorkerThread` +исчезли: + +[getrand2-cpu.html](data%2Fgetrand2-cpu.html) + +[getrand2-alloc.html](data%2Fgetrand2-alloc.html) + +[getrand2-lock.html](data%2Fgetrand2-lock.html) \ No newline at end of file diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-alloc.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-alloc.html new file mode 100644 index 000000000..f3bb6624d --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-alloc.html @@ -0,0 +1,3153 @@ + + + + + + + +

Allocation profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-cpu.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-cpu.html new file mode 100644 index 000000000..cd7e5aa0c --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-cpu.html @@ -0,0 +1,3413 @@ + + + + + + + +

CPU profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-lock.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-lock.html new file mode 100644 index 000000000..637db51a7 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand-lock.html @@ -0,0 +1,907 @@ + + + + + + + +

Lock profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-alloc.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-alloc.html new file mode 100644 index 000000000..9d576c197 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-alloc.html @@ -0,0 +1,3215 @@ + + + + + + + +

Allocation profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-cpu.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-cpu.html new file mode 100644 index 000000000..1e42e972a --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-cpu.html @@ -0,0 +1,2846 @@ + + + + + + + +

CPU profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-lock.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-lock.html new file mode 100644 index 000000000..b423ff688 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/getrand2-lock.html @@ -0,0 +1,1136 @@ + + + + + + + +

Lock profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-alloc.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-alloc.html new file mode 100644 index 000000000..f9ae283af --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-alloc.html @@ -0,0 +1,3218 @@ + + + + + + + +

Allocation profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-cpu.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-cpu.html new file mode 100644 index 000000000..97d50de4d --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-cpu.html @@ -0,0 +1,3142 @@ + + + + + + + +

CPU profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-lock.html b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-lock.html new file mode 100644 index 000000000..ca2bf3f48 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw5/data/upsert-lock.html @@ -0,0 +1,953 @@ + + + + + + + +

Lock profile

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua b/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua index 7f9c03539..f0c079cec 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/scripts/getrand.lua @@ -5,6 +5,6 @@ headers = {} headers["Host"] = "localhost:8080" request = function() - id = math.random(0, 3500000) + id = math.random(0, 109000) return wrk.format("GET", tostring(url .. id), headers) end \ No newline at end of file diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadConfig.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadConfig.java new file mode 100644 index 000000000..05d9b66e0 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadConfig.java @@ -0,0 +1,64 @@ +package ru.vk.itmo.test.dariasupriadkina.workers; + +import one.nio.async.CustomThreadFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class CustomThreadConfig { + + public static final long KEEP_ALIVE_TIME = 1000L; + public static final TimeUnit KEEP_ALIVE_TIME_SECONDS = TimeUnit.SECONDS; + public static final int QUEUE_SIZE = 1024; + public static final int THREADS = Runtime.getRuntime().availableProcessors(); + public static final int SHUTDOWN_TIMEOUT_SEC = 60; + + private final int corePoolSize; + private final int maximumPoolSize; + private final int shutdownTimeoutSec; + private final ArrayBlockingQueue workQueue; + private final CustomThreadFactory threadFactory; + private final RejectedExecutionHandler handler; + + public CustomThreadConfig(int corePoolSize, int maximumPoolSize, int queueSize, int shutdownTimeoutSec, + String threadName, RejectedExecutionHandler handler) { + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = new ArrayBlockingQueue<>(queueSize); + this.shutdownTimeoutSec = shutdownTimeoutSec; + this.threadFactory = new CustomThreadFactory(threadName, true); + this.handler = handler; + } + + public static CustomThreadConfig baseConfig(String threadName) { + return new CustomThreadConfig(THREADS * 2, THREADS * 2, + QUEUE_SIZE, SHUTDOWN_TIMEOUT_SEC, threadName, new ThreadPoolExecutor.AbortPolicy()); + } + + public int getCorePoolSize() { + return corePoolSize; + } + + public int getMaximumPoolSize() { + return maximumPoolSize; + } + + public BlockingQueue getWorkQueue() { + return workQueue; + } + + public int getShutdownTimeoutSec() { + return shutdownTimeoutSec; + } + + public CustomThreadFactory getThreadFactory() { + return threadFactory; + } + + public RejectedExecutionHandler getHandler() { + return handler; + } +} diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerThreadPoolExecutor.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadPoolExecutor.java similarity index 76% rename from src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerThreadPoolExecutor.java rename to src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadPoolExecutor.java index 26123f689..72bfcb564 100644 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerThreadPoolExecutor.java +++ b/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/CustomThreadPoolExecutor.java @@ -1,22 +1,21 @@ package ru.vk.itmo.test.dariasupriadkina.workers; -import one.nio.async.CustomThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class WorkerThreadPoolExecutor extends ThreadPoolExecutor { +public class CustomThreadPoolExecutor extends ThreadPoolExecutor { - private static final Logger logger = LoggerFactory.getLogger(WorkerThreadPoolExecutor.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class.getName()); private final int shutdownTimeoutSec; - public WorkerThreadPoolExecutor(WorkerConfig workerConfig) { + public CustomThreadPoolExecutor(CustomThreadConfig workerConfig) { super(workerConfig.getCorePoolSize(), workerConfig.getMaximumPoolSize(), - WorkerConfig.KEEP_ALIVE_TIME, WorkerConfig.KEEP_ALIVE_TIME_SECONDS, - workerConfig.getWorkQueue(), new CustomThreadFactory("worker-thread", true), - new AbortPolicy()); + CustomThreadConfig.KEEP_ALIVE_TIME, CustomThreadConfig.KEEP_ALIVE_TIME_SECONDS, + workerConfig.getWorkQueue(), workerConfig.getThreadFactory(), + workerConfig.getHandler()); this.shutdownTimeoutSec = workerConfig.getShutdownTimeoutSec(); } diff --git a/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerConfig.java b/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerConfig.java deleted file mode 100644 index a46b99965..000000000 --- a/src/main/java/ru/vk/itmo/test/dariasupriadkina/workers/WorkerConfig.java +++ /dev/null @@ -1,39 +0,0 @@ -package ru.vk.itmo.test.dariasupriadkina.workers; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -public class WorkerConfig { - - public static final long KEEP_ALIVE_TIME = 1000L; - public static final TimeUnit KEEP_ALIVE_TIME_SECONDS = TimeUnit.SECONDS; - - private final int corePoolSize; - private final int maximumPoolSize; - private final int shutdownTimeoutSec; - private final ArrayBlockingQueue workQueue; - - public WorkerConfig(int corePoolSize, int maximumPoolSize, int queueSize, int shutdownTimeoutSec) { - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.workQueue = new ArrayBlockingQueue<>(queueSize); - this.shutdownTimeoutSec = shutdownTimeoutSec; - } - - public int getCorePoolSize() { - return corePoolSize; - } - - public int getMaximumPoolSize() { - return maximumPoolSize; - } - - public BlockingQueue getWorkQueue() { - return workQueue; - } - - public int getShutdownTimeoutSec() { - return shutdownTimeoutSec; - } -}