Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Коротких Виктор / ИТМО DWS / Stage 5 #190

Merged
merged 53 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
29a59e0
stage-3: pre-start all threads
vitekkor Mar 8, 2024
b7db2f0
stage-3: implement consistent hashing
vitekkor Mar 13, 2024
a9e4c0c
stage-3: remove debug tmpDirs; remove unused imports
vitekkor Mar 13, 2024
bcf927f
stage-3: fix comments
vitekkor Mar 13, 2024
1329fca
stage-3: close resources
vitekkor Mar 14, 2024
2953614
stage-3: add local dev arg parsing
vitekkor Mar 20, 2024
b76fcd6
Merge remote-tracking branch 'upstream/main' into stage-3
vitekkor Mar 20, 2024
959c689
stage-3: add PUT results
vitekkor Mar 20, 2024
3b56dac
stage-3: use murmur hashing
vitekkor Mar 20, 2024
ddda31c
stage-3: minors
vitekkor Mar 20, 2024
51e494a
stage-3: trigger build
vitekkor Mar 20, 2024
9501cb6
stage-3: delete report
vitekkor Mar 20, 2024
73e2acd
stage-3: add put report
vitekkor Mar 20, 2024
91c1c4d
stage-3: add get results
vitekkor Mar 20, 2024
518d384
stage-3: add report
vitekkor Mar 20, 2024
e9316eb
Merge remote-tracking branch 'upstream/main' into stage-3
vitekkor Mar 25, 2024
4576c2b
stage-4: first try
vitekkor Mar 28, 2024
833a577
stage-4: fix tests
vitekkor Mar 28, 2024
a2486b9
stage-4: fix codestyle
vitekkor Mar 28, 2024
2d7329a
stage-4: fix codestyle 2
vitekkor Mar 28, 2024
76b6e40
stage-4: fix codestyle 3
vitekkor Mar 28, 2024
192a9c8
stage-4: provide new getting list of replicas algorithm
vitekkor Mar 31, 2024
8eb201d
stage-4: code refactoring
vitekkor Mar 31, 2024
07f5e6e
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 3, 2024
b00aaa1
stage-4: add report template
vitekkor Apr 3, 2024
96d4db4
stage-4: alloc optimization
vitekkor Apr 4, 2024
ffd3aa2
stage-4: update report
vitekkor Apr 4, 2024
f6f7fba
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 4, 2024
1726a36
Merge branch 'main' into stage-4
incubos Apr 7, 2024
9deef23
stage-4: fix comments (part 1)
vitekkor Apr 7, 2024
1b00667
stage-4: fix comments (part 2)
vitekkor Apr 7, 2024
5f60680
stage-4: fix comments (part 3)
vitekkor Apr 11, 2024
8429483
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 11, 2024
615ab86
stage-4: fix codestyle
vitekkor Apr 11, 2024
a9a4546
stage-5: init
vitekkor Apr 11, 2024
545ef88
stage-5: async implementation
vitekkor Apr 11, 2024
c06eab6
stage-5: minors
vitekkor Apr 11, 2024
5509b3d
stage-5: merge upstream
vitekkor Apr 14, 2024
0ac9eec
stage-5: fix merge
vitekkor Apr 14, 2024
af3fe25
stage-5: minors
vitekkor Apr 17, 2024
bd437fc
stage-5: add put report
vitekkor Apr 17, 2024
043e9f7
stage-5: add get report
vitekkor Apr 18, 2024
b7e4a30
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor Apr 21, 2024
b67b9de
stage-5: process local request async
vitekkor May 1, 2024
fcdb149
stage-5: extend report
vitekkor May 1, 2024
70465f7
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor May 1, 2024
0fb152c
stage-5: refactoring
vitekkor May 1, 2024
1c0ed91
Merge branch 'main' into stage-5
atimofeyev May 5, 2024
1e16718
stage-5: replace NodeResponse array with AtomicReferenceArray
vitekkor May 9, 2024
870327b
Merge remote-tracking branch 'origin/stage-5' into stage-5
vitekkor May 9, 2024
362c83a
stage-5: codeclimate fix
vitekkor May 9, 2024
26b5e4c
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor May 14, 2024
cf2c84b
Merge branch 'main' into stage-5
atimofeyev May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 68 additions & 43 deletions src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import ru.vk.itmo.test.viktorkorotkikh.dao.exceptions.TooManyFlushesException;
import ru.vk.itmo.test.viktorkorotkikh.http.LSMCustomSession;
import ru.vk.itmo.test.viktorkorotkikh.http.LSMServerResponseWithMemorySegment;
import ru.vk.itmo.test.viktorkorotkikh.util.ClusterResponseMerger;
import ru.vk.itmo.test.viktorkorotkikh.util.HttpResponseNodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.LSMConstantResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.LsmServerUtil;
import ru.vk.itmo.test.viktorkorotkikh.util.NodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.OneNioNodeResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.ReplicaEmptyResponse;
import ru.vk.itmo.test.viktorkorotkikh.util.RequestParameters;
Expand All @@ -35,18 +35,14 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SequencedSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
Expand All @@ -66,13 +62,17 @@ public class LSMServerImpl extends HttpServer {
private final String selfUrl;
private final HttpClient clusterClient;
private final ServiceConfig serviceConfig;
private final ExecutorService clusterResponseProcessor;
private final ExecutorService localProcessor;

public LSMServerImpl(
ServiceConfig serviceConfig,
Dao<MemorySegment, TimestampedEntry<MemorySegment>> dao,
ExecutorService executorService,
ConsistentHashingManager consistentHashingManager,
HttpClient clusterClient
HttpClient clusterClient,
ExecutorService clusterResponseProcessor,
ExecutorService localProcessor
) throws IOException {
super(createServerConfig(serviceConfig));
this.dao = dao;
Expand All @@ -81,6 +81,8 @@ public LSMServerImpl(
this.selfUrl = serviceConfig.selfUrl();
this.clusterClient = clusterClient;
this.serviceConfig = serviceConfig;
this.clusterResponseProcessor = clusterResponseProcessor;
this.localProcessor = localProcessor;
}

private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) {
Expand Down Expand Up @@ -186,15 +188,15 @@ public void handleEntityRequest(Request request, HttpSession session) throws IOE

final SequencedSet<String> replicas = consistentHashingManager.getReplicasSet(from, key);

final Response response = getResponseFromReplicas(
getResponseFromReplicas(
request,
from,
replicas,
key,
id,
ack
ack,
session
);
session.sendResponse(response);
}

private static RequestParameters getRequestParameters(Request request) {
Expand All @@ -220,34 +222,38 @@ private static RequestParameters getRequestParameters(Request request) {
return new RequestParameters(id, ack, from);
}

private Response getResponseFromReplicas(
private void getResponseFromReplicas(
Request request,
Integer from,
int from,
SequencedSet<String> replicas,
byte[] key,
String id,
Integer ack
int ack,
HttpSession session
) {
final List<NodeResponse> responses = new ArrayList<>(from);
final ClusterResponseMerger clusterResponseMerger = new ClusterResponseMerger(ack, from, request, session);
final long requestTimestamp = Instant.now().toEpochMilli();
int i = 0;
for (final String replicaUrl : replicas) {
if (replicaUrl.equals(selfUrl)) {
responses.add(processLocal(request, key, id, requestTimestamp));
processLocalAsync(request, key, id, requestTimestamp, i, clusterResponseMerger);
} else {
responses.add(processRemote(request, replicaUrl, id, requestTimestamp));
processRemote(request, replicaUrl, id, requestTimestamp, i, clusterResponseMerger);
}
i++;
}
return LsmServerUtil.mergeReplicasResponses(request, responses, ack);
}

private NodeResponse processRemote(
private void processRemote(
final Request originalRequest,
final String server,
final String id,
long requestTimestamp
long requestTimestamp,
int index,
ClusterResponseMerger clusterResponseMerge
) {
final HttpRequest request = createClusterRequest(originalRequest, server, id, requestTimestamp);
return sendClusterRequest(request);
sendClusterRequest(request, index, clusterResponseMerge);
}

private static HttpRequest createClusterRequest(
Expand All @@ -259,7 +265,8 @@ private static HttpRequest createClusterRequest(
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(server + ENTITY_V0_PATH_WITH_ID_PARAM + id))
.header(REPLICA_REQUEST_HEADER, "")
.header(LsmServerUtil.TIMESTAMP_HEADER, String.valueOf(requestTimestamp));
.header(LsmServerUtil.TIMESTAMP_HEADER, String.valueOf(requestTimestamp))
.timeout(Duration.ofMillis(CLUSTER_REQUEST_TIMEOUT_MILLISECONDS));
switch (originalRequest.getMethod()) {
case METHOD_GET -> builder.GET();
case METHOD_PUT -> {
Expand All @@ -275,29 +282,47 @@ private static HttpRequest createClusterRequest(
return builder.build();
}

private NodeResponse sendClusterRequest(
final HttpRequest request
private void sendClusterRequest(
final HttpRequest request,
final int index,
final ClusterResponseMerger clusterResponseMerger
) {
try {
return new HttpResponseNodeResponse(
clusterClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.get(CLUSTER_REQUEST_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)
);
} catch (InterruptedException e) {
final String clusterUrl = request.uri().toString();
Thread.currentThread().interrupt();
log.warn("Current thread was interrupted during processing request to cluster {}", clusterUrl);
return new ReplicaEmptyResponse(HTTP_UNAVAILABLE);
} catch (ExecutionException e) {
final String clusterUrl = request.uri().toString();
log.error("Unexpected exception occurred while sending request to cluster {}", clusterUrl, e);
return new ReplicaEmptyResponse(HTTP_UNAVAILABLE);
} catch (TimeoutException e) {
final String clusterUrl = request.uri().toString();
log.warn("Request timeout to cluster server {}", clusterUrl);
return new ReplicaEmptyResponse(HTTP_GATEWAY_TIMEOUT);
}
clusterClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.thenAcceptAsync(response ->
clusterResponseMerger.addToMerge(
index,
new HttpResponseNodeResponse(response)
), clusterResponseProcessor)
.exceptionallyAsync(throwable -> {
if (throwable.getCause() instanceof java.net.http.HttpTimeoutException) {
final String clusterUrl = request.uri().toString();
log.warn("Request timeout to cluster server {}", clusterUrl);
clusterResponseMerger.addToMerge(index, new ReplicaEmptyResponse(HTTP_GATEWAY_TIMEOUT));
} else {
final String clusterUrl = request.uri().toString();
log.error(
"Unexpected exception occurred while sending request to cluster {}",
clusterUrl,
throwable
);
clusterResponseMerger.addToMerge(index, new ReplicaEmptyResponse(HTTP_UNAVAILABLE));
}
return null;
}, clusterResponseProcessor).state();
}

private void processLocalAsync(
Request request,
byte[] key,
String id,
long requestTimestamp,
int i,
ClusterResponseMerger clusterResponseMerger
) {
localProcessor.execute(() ->
clusterResponseMerger.addToMerge(i, processLocal(request, key, id, requestTimestamp))
);
}

private OneNioNodeResponse processLocal(
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class LSMServiceImpl implements Service {
private final ConsistentHashingManager consistentHashingManager;
private HttpClient clusterClient;
private ExecutorService clusterClientExecutorService;
private ExecutorService clusterResponseProcessor;
private ExecutorService localProcessor;

public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
Path baseWorkingDir = Path.of("daoWorkingDir");
Expand Down Expand Up @@ -101,9 +103,19 @@ private static LSMServerImpl createServer(
Dao<MemorySegment, TimestampedEntry<MemorySegment>> dao,
ExecutorService executorService,
ConsistentHashingManager consistentHashingManager,
HttpClient clusterClient
HttpClient clusterClient,
ExecutorService clusterResponseProcessor,
ExecutorService localProcessor
) throws IOException {
return new LSMServerImpl(serviceConfig, dao, executorService, consistentHashingManager, clusterClient);
return new LSMServerImpl(
serviceConfig,
dao,
executorService,
consistentHashingManager,
clusterClient,
clusterResponseProcessor,
localProcessor
);
}

private static Dao<MemorySegment, TimestampedEntry<MemorySegment>> createLSMDao(Path workingDir) {
Expand Down Expand Up @@ -148,12 +160,22 @@ public synchronized CompletableFuture<Void> start() throws IOException {

executorService = createExecutorService(16, 1024, "worker");
clusterClientExecutorService = createExecutorService(16, 1024, "cluster-worker");
clusterResponseProcessor = createExecutorService(16, 1024, "cluster-response");
localProcessor = createExecutorService(16, 1024, "local-processor");

clusterClient = HttpClient.newBuilder()
.executor(clusterClientExecutorService)
.build();

httpServer = createServer(serviceConfig, dao, executorService, consistentHashingManager, clusterClient);
httpServer = createServer(
serviceConfig,
dao,
executorService,
consistentHashingManager,
clusterClient,
clusterResponseProcessor,
localProcessor
);
httpServer.start();

isRunning = true;
Expand All @@ -167,6 +189,8 @@ public synchronized CompletableFuture<Void> stop() throws IOException {

shutdownHttpClient(clusterClient);
shutdownExecutorService(clusterClientExecutorService);
shutdownExecutorService(clusterResponseProcessor);
shutdownExecutorService(localProcessor);
shutdownExecutorService(executorService);
executorService = null;
clusterClient = null;
Expand Down Expand Up @@ -211,7 +235,7 @@ private static void shutdownHttpClient(HttpClient httpClient) {
}
}

@ServiceFactory(stage = 4)
@ServiceFactory(stage = 5)
public static class LSMServiceFactoryImpl implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
Expand Down
Loading
Loading