Skip to content

Commit

Permalink
Коротких Виктор / ИТМО DWS / Stage 6 (#211)
Browse files Browse the repository at this point in the history
* stage-3: pre-start all threads

* stage-3: implement consistent hashing

* stage-3: remove debug tmpDirs; remove unused imports

* stage-3: fix comments

* stage-3: close resources

* stage-3: add local dev arg parsing

* stage-3: add PUT results

* stage-3: use murmur hashing

* stage-3: minors

* stage-3: trigger build

* stage-3: delete report

* stage-3: add put report

* stage-3: add get results

* stage-3: add report

* stage-4: first try

* stage-4: fix tests

* stage-4: fix codestyle

* stage-4: fix codestyle 2

* stage-4: fix codestyle 3

* stage-4: provide new getting list of replicas algorithm

* stage-4: code refactoring

* stage-4: add report template

* stage-4: alloc optimization

* stage-4: update report

* stage-4: fix comments (part 1)

* stage-4: fix comments (part 2)

* stage-4: fix comments (part 3)

* stage-4: fix codestyle

* stage-5: init

* stage-5: async implementation

* stage-5: minors

* stage-5: fix merge

* stage-5: minors

* stage-5: add put report

* stage-5: add get report

* stage-6: implement streaming

* stage-6: fix codestyle

* stage-5: process local request async

* stage-5: extend report

* stage-5: refactoring

* stage-6: refactoring

* stage-6: refactoring

* stage-6: add report

* stage-6: fix codeclimate

* stage-6: fix comments

* stage-6: fix codeclimate

* stage-6: minor fixes for sending chunks

---------

Co-authored-by: Vadim Tsesko <[email protected]>
  • Loading branch information
vitekkor and incubos authored May 11, 2024
1 parent 8046881 commit 8028e6a
Show file tree
Hide file tree
Showing 37 changed files with 47,536 additions and 80 deletions.
133 changes: 90 additions & 43 deletions src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import ru.vk.itmo.test.viktorkorotkikh.dao.exceptions.LSMDaoOutOfMemoryException;
import ru.vk.itmo.test.viktorkorotkikh.dao.exceptions.TooManyFlushesException;
import ru.vk.itmo.test.viktorkorotkikh.http.LSMCustomSession;
import ru.vk.itmo.test.viktorkorotkikh.http.LSMRangeWriter;
import ru.vk.itmo.test.viktorkorotkikh.http.LSMServerResponseWithMemorySegment;
import ru.vk.itmo.test.viktorkorotkikh.util.ClusterResponseMerger;
import ru.vk.itmo.test.viktorkorotkikh.util.HttpResponseNodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.LSMConstantResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.LsmServerUtil;
import ru.vk.itmo.test.viktorkorotkikh.util.NodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.OneNioNodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.ReplicaEmptyResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.RequestParameters;
Expand All @@ -35,18 +36,14 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SequencedSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
Expand All @@ -66,13 +63,17 @@ public class LSMServerImpl extends HttpServer {
private final String selfUrl;
private final HttpClient clusterClient;
private final ServiceConfig serviceConfig;
private final ExecutorService clusterResponseProcessor;
private final ExecutorService localProcessor;

public LSMServerImpl(
ServiceConfig serviceConfig,
Dao<MemorySegment, TimestampedEntry<MemorySegment>> dao,
ExecutorService executorService,
ConsistentHashingManager consistentHashingManager,
HttpClient clusterClient
HttpClient clusterClient,
ExecutorService clusterResponseProcessor,
ExecutorService localProcessor
) throws IOException {
super(createServerConfig(serviceConfig));
this.dao = dao;
Expand All @@ -81,6 +82,8 @@ public LSMServerImpl(
this.selfUrl = serviceConfig.selfUrl();
this.clusterClient = clusterClient;
this.serviceConfig = serviceConfig;
this.clusterResponseProcessor = clusterResponseProcessor;
this.localProcessor = localProcessor;
}

private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) {
Expand All @@ -103,6 +106,12 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
executorService.execute(() -> {
try {
final String path = request.getPath();

if (path.startsWith("/v0/entities") && request.getMethod() == METHOD_GET) {
handleEntitiesRangeRequest(request, (LSMCustomSession) session);
return;
}

if (path.startsWith("/v0/entity")) {
handleEntityRequest(request, session);
} else {
Expand Down Expand Up @@ -186,15 +195,15 @@ public void handleEntityRequest(Request request, HttpSession session) throws IOE

final SequencedSet<String> replicas = consistentHashingManager.getReplicasSet(from, key);

final Response response = getResponseFromReplicas(
getResponseFromReplicas(
request,
from,
replicas,
key,
id,
ack
ack,
session
);
session.sendResponse(response);
}

private static RequestParameters getRequestParameters(Request request) {
Expand All @@ -220,34 +229,38 @@ private static RequestParameters getRequestParameters(Request request) {
return new RequestParameters(id, ack, from);
}

private Response getResponseFromReplicas(
private void getResponseFromReplicas(
Request request,
Integer from,
int from,
SequencedSet<String> replicas,
byte[] key,
String id,
Integer ack
int ack,
HttpSession session
) {
final List<NodeResponse> responses = new ArrayList<>(from);
final ClusterResponseMerger clusterResponseMerger = new ClusterResponseMerger(ack, from, request, session);
final long requestTimestamp = Instant.now().toEpochMilli();
int i = 0;
for (final String replicaUrl : replicas) {
if (replicaUrl.equals(selfUrl)) {
responses.add(processLocal(request, key, id, requestTimestamp));
processLocalAsync(request, key, id, requestTimestamp, i, clusterResponseMerger);
} else {
responses.add(processRemote(request, replicaUrl, id, requestTimestamp));
processRemote(request, replicaUrl, id, requestTimestamp, i, clusterResponseMerger);
}
i++;
}
return LsmServerUtil.mergeReplicasResponses(request, responses, ack);
}

private NodeResponse processRemote(
private void processRemote(
final Request originalRequest,
final String server,
final String id,
long requestTimestamp
long requestTimestamp,
int index,
ClusterResponseMerger clusterResponseMerge
) {
final HttpRequest request = createClusterRequest(originalRequest, server, id, requestTimestamp);
return sendClusterRequest(request);
sendClusterRequest(request, index, clusterResponseMerge);
}

private static HttpRequest createClusterRequest(
Expand All @@ -259,7 +272,8 @@ private static HttpRequest createClusterRequest(
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(server + ENTITY_V0_PATH_WITH_ID_PARAM + id))
.header(REPLICA_REQUEST_HEADER, "")
.header(LsmServerUtil.TIMESTAMP_HEADER, String.valueOf(requestTimestamp));
.header(LsmServerUtil.TIMESTAMP_HEADER, String.valueOf(requestTimestamp))
.timeout(Duration.ofMillis(CLUSTER_REQUEST_TIMEOUT_MILLISECONDS));
switch (originalRequest.getMethod()) {
case METHOD_GET -> builder.GET();
case METHOD_PUT -> {
Expand All @@ -275,29 +289,47 @@ private static HttpRequest createClusterRequest(
return builder.build();
}

private NodeResponse sendClusterRequest(
final HttpRequest request
private void sendClusterRequest(
final HttpRequest request,
final int index,
final ClusterResponseMerger clusterResponseMerger
) {
try {
return new HttpResponseNodeResponse(
clusterClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.get(CLUSTER_REQUEST_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)
);
} catch (InterruptedException e) {
final String clusterUrl = request.uri().toString();
Thread.currentThread().interrupt();
log.warn("Current thread was interrupted during processing request to cluster {}", clusterUrl);
return new ReplicaEmptyResponse(HTTP_UNAVAILABLE);
} catch (ExecutionException e) {
final String clusterUrl = request.uri().toString();
log.error("Unexpected exception occurred while sending request to cluster {}", clusterUrl, e);
return new ReplicaEmptyResponse(HTTP_UNAVAILABLE);
} catch (TimeoutException e) {
final String clusterUrl = request.uri().toString();
log.warn("Request timeout to cluster server {}", clusterUrl);
return new ReplicaEmptyResponse(HTTP_GATEWAY_TIMEOUT);
}
clusterClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.thenAcceptAsync(response ->
clusterResponseMerger.addToMerge(
index,
new HttpResponseNodeResponse(response)
), clusterResponseProcessor)
.exceptionallyAsync(throwable -> {
if (throwable.getCause() instanceof java.net.http.HttpTimeoutException) {
final String clusterUrl = request.uri().toString();
log.warn("Request timeout to cluster server {}", clusterUrl);
clusterResponseMerger.addToMerge(index, new ReplicaEmptyResponse(HTTP_GATEWAY_TIMEOUT));
} else {
final String clusterUrl = request.uri().toString();
log.error(
"Unexpected exception occurred while sending request to cluster {}",
clusterUrl,
throwable
);
clusterResponseMerger.addToMerge(index, new ReplicaEmptyResponse(HTTP_UNAVAILABLE));
}
return null;
}, clusterResponseProcessor).state();
}

private void processLocalAsync(
Request request,
byte[] key,
String id,
long requestTimestamp,
int i,
ClusterResponseMerger clusterResponseMerger
) {
localProcessor.execute(() ->
clusterResponseMerger.addToMerge(i, processLocal(request, key, id, requestTimestamp))
);
}

private OneNioNodeResponse processLocal(
Expand Down Expand Up @@ -406,6 +438,21 @@ public Response handleCompact(Request request) throws IOException {
return LSMConstantResponse.ok(request);
}

public void handleEntitiesRangeRequest(Request request, LSMCustomSession session) throws IOException {
final String start = request.getParameter("start=");
final String end = request.getParameter("end=");
if (start == null || start.isEmpty() || (end != null && end.isEmpty())) {
log.debug("Bad request: start parameter and end parameter (if it present) should not be empty");
session.sendResponse(LSMConstantResponse.badRequest(request));
return;
}

final MemorySegment startMemorySegment = fromByteArray(start.getBytes(StandardCharsets.UTF_8));
final MemorySegment endMemorySegment = end == null ? null : fromByteArray(end.getBytes(StandardCharsets.UTF_8));
Iterator<TimestampedEntry<MemorySegment>> iterator = dao.get(startMemorySegment, endMemorySegment);
session.sendRangeResponse(new LSMRangeWriter(iterator, LSMConstantResponse.keepAlive(request)));
}

private static MemorySegment fromByteArray(final byte[] data) {
return MemorySegment.ofArray(data);
}
Expand Down
66 changes: 50 additions & 16 deletions src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
public class LSMServiceImpl implements Service {
private static final long FLUSH_THRESHOLD = 1 << 20; // 1 MB
private static final int TERMINATION_TIMEOUT_SECONDS = 20;
private static final int SERVER_EXECUTOR_SERVICE_THREADS_COUNT = 16;
private static final int SERVER_EXECUTOR_SERVICE_QUEUE_SIZE = 1024;
private static final int CLUSTER_HTTP_CLIENT_EXECUTOR_SERVICE_THREADS_COUNT = 16;
private static final int CLUSTER_HTTP_CLIENT_EXECUTOR_SERVICE_QUEUE_SIZE = 1024;
private static final int CLUSTER_RESPONSE_EXECUTOR_SERVICE_THREADS_COUNT = 16;
private static final int CLUSTER_RESPONSE_EXECUTOR_SERVICE_QUEUE_SIZE = 1024;
private static final int LOCAL_REQUEST_EXECUTOR_SERVICE_THREADS_COUNT = 16;
private static final int LOCAL_REQUEST_EXECUTOR_SERVICE_QUEUE_SIZE = 1024;
private final ServiceConfig serviceConfig;
private LSMServerImpl httpServer;
private boolean isRunning;
Expand All @@ -38,6 +46,8 @@ public class LSMServiceImpl implements Service {
private final ConsistentHashingManager consistentHashingManager;
private HttpClient clusterClient;
private ExecutorService clusterClientExecutorService;
private ExecutorService clusterResponseProcessor;
private ExecutorService localProcessor;

public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
Path baseWorkingDir = Path.of("daoWorkingDir");
Expand Down Expand Up @@ -96,14 +106,43 @@ public LSMServiceImpl(ServiceConfig serviceConfig) {
this.consistentHashingManager = new ConsistentHashingManager(10, serviceConfig.clusterUrls());
}

private static LSMServerImpl createServer(
ServiceConfig serviceConfig,
Dao<MemorySegment, TimestampedEntry<MemorySegment>> dao,
ExecutorService executorService,
ConsistentHashingManager consistentHashingManager,
HttpClient clusterClient
private LSMServerImpl createServer(
Dao<MemorySegment, TimestampedEntry<MemorySegment>> dao
) throws IOException {
return new LSMServerImpl(serviceConfig, dao, executorService, consistentHashingManager, clusterClient);
executorService = createExecutorService(
SERVER_EXECUTOR_SERVICE_THREADS_COUNT,
SERVER_EXECUTOR_SERVICE_QUEUE_SIZE,
"worker"
);
clusterClientExecutorService = createExecutorService(
CLUSTER_HTTP_CLIENT_EXECUTOR_SERVICE_THREADS_COUNT,
CLUSTER_HTTP_CLIENT_EXECUTOR_SERVICE_QUEUE_SIZE,
"cluster-request"
);
clusterResponseProcessor = createExecutorService(
CLUSTER_RESPONSE_EXECUTOR_SERVICE_THREADS_COUNT,
CLUSTER_RESPONSE_EXECUTOR_SERVICE_QUEUE_SIZE,
"cluster-response-processor"
);
localProcessor = createExecutorService(
LOCAL_REQUEST_EXECUTOR_SERVICE_THREADS_COUNT,
LOCAL_REQUEST_EXECUTOR_SERVICE_QUEUE_SIZE,
"local-processor"
);

clusterClient = HttpClient.newBuilder()
.executor(clusterClientExecutorService)
.build();

return new LSMServerImpl(
serviceConfig,
dao,
executorService,
consistentHashingManager,
clusterClient,
clusterResponseProcessor,
localProcessor
);
}

private static Dao<MemorySegment, TimestampedEntry<MemorySegment>> createLSMDao(Path workingDir) {
Expand Down Expand Up @@ -146,14 +185,7 @@ public synchronized CompletableFuture<Void> start() throws IOException {
if (isRunning) return CompletableFuture.completedFuture(null);
dao = createLSMDao(serviceConfig.workingDir());

executorService = createExecutorService(16, 1024, "worker");
clusterClientExecutorService = createExecutorService(16, 1024, "cluster-worker");

clusterClient = HttpClient.newBuilder()
.executor(clusterClientExecutorService)
.build();

httpServer = createServer(serviceConfig, dao, executorService, consistentHashingManager, clusterClient);
httpServer = createServer(dao);
httpServer.start();

isRunning = true;
Expand All @@ -167,6 +199,8 @@ public synchronized CompletableFuture<Void> stop() throws IOException {

shutdownHttpClient(clusterClient);
shutdownExecutorService(clusterClientExecutorService);
shutdownExecutorService(clusterResponseProcessor);
shutdownExecutorService(localProcessor);
shutdownExecutorService(executorService);
executorService = null;
clusterClient = null;
Expand Down Expand Up @@ -211,7 +245,7 @@ private static void shutdownHttpClient(HttpClient httpClient) {
}
}

@ServiceFactory(stage = 4)
@ServiceFactory(stage = 6)
public static class LSMServiceFactoryImpl implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.lang.foreign.MemorySegment;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -39,7 +40,12 @@ private Iterator<TimestampedEntry<MemorySegment>> storageIterator(MemorySegment
return storage.tailMap(from).sequencedValues().iterator();
}

return storage.subMap(from, to).sequencedValues().iterator();
try {
return storage.subMap(from, to).sequencedValues().iterator();
} catch (IllegalArgumentException illegalArgumentException) {
// we get inconsistent range error when from > to
return Collections.emptyIterator();
}
}

public MemTableIterator iterator(MemorySegment from, MemorySegment to, int priorityReduction) {
Expand Down
Loading

0 comments on commit 8028e6a

Please sign in to comment.