Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Коротких Виктор / ИТМО DWS / Stage 5 #190

Merged
merged 53 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
29a59e0
stage-3: pre-start all threads
vitekkor Mar 8, 2024
b7db2f0
stage-3: implement consistent hashing
vitekkor Mar 13, 2024
a9e4c0c
stage-3: remove debug tmpDirs; remove unused imports
vitekkor Mar 13, 2024
bcf927f
stage-3: fix comments
vitekkor Mar 13, 2024
1329fca
stage-3: close resources
vitekkor Mar 14, 2024
2953614
stage-3: add local dev arg parsing
vitekkor Mar 20, 2024
b76fcd6
Merge remote-tracking branch 'upstream/main' into stage-3
vitekkor Mar 20, 2024
959c689
stage-3: add PUT results
vitekkor Mar 20, 2024
3b56dac
stage-3: use murmur hashing
vitekkor Mar 20, 2024
ddda31c
stage-3: minors
vitekkor Mar 20, 2024
51e494a
stage-3: trigger build
vitekkor Mar 20, 2024
9501cb6
stage-3: delete report
vitekkor Mar 20, 2024
73e2acd
stage-3: add put report
vitekkor Mar 20, 2024
91c1c4d
stage-3: add get results
vitekkor Mar 20, 2024
518d384
stage-3: add report
vitekkor Mar 20, 2024
e9316eb
Merge remote-tracking branch 'upstream/main' into stage-3
vitekkor Mar 25, 2024
4576c2b
stage-4: first try
vitekkor Mar 28, 2024
833a577
stage-4: fix tests
vitekkor Mar 28, 2024
a2486b9
stage-4: fix codestyle
vitekkor Mar 28, 2024
2d7329a
stage-4: fix codestyle 2
vitekkor Mar 28, 2024
76b6e40
stage-4: fix codestyle 3
vitekkor Mar 28, 2024
192a9c8
stage-4: provide new getting list of replicas algorithm
vitekkor Mar 31, 2024
8eb201d
stage-4: code refactoring
vitekkor Mar 31, 2024
07f5e6e
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 3, 2024
b00aaa1
stage-4: add report template
vitekkor Apr 3, 2024
96d4db4
stage-4: alloc optimization
vitekkor Apr 4, 2024
ffd3aa2
stage-4: update report
vitekkor Apr 4, 2024
f6f7fba
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 4, 2024
1726a36
Merge branch 'main' into stage-4
incubos Apr 7, 2024
9deef23
stage-4: fix comments (part 1)
vitekkor Apr 7, 2024
1b00667
stage-4: fix comments (part 2)
vitekkor Apr 7, 2024
5f60680
stage-4: fix comments (part 3)
vitekkor Apr 11, 2024
8429483
Merge remote-tracking branch 'upstream/main' into stage-4
vitekkor Apr 11, 2024
615ab86
stage-4: fix codestyle
vitekkor Apr 11, 2024
a9a4546
stage-5: init
vitekkor Apr 11, 2024
545ef88
stage-5: async implementation
vitekkor Apr 11, 2024
c06eab6
stage-5: minors
vitekkor Apr 11, 2024
5509b3d
stage-5: merge upstream
vitekkor Apr 14, 2024
0ac9eec
stage-5: fix merge
vitekkor Apr 14, 2024
af3fe25
stage-5: minors
vitekkor Apr 17, 2024
bd437fc
stage-5: add put report
vitekkor Apr 17, 2024
043e9f7
stage-5: add get report
vitekkor Apr 18, 2024
b7e4a30
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor Apr 21, 2024
b67b9de
stage-5: process local request async
vitekkor May 1, 2024
fcdb149
stage-5: extend report
vitekkor May 1, 2024
70465f7
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor May 1, 2024
0fb152c
stage-5: refactoring
vitekkor May 1, 2024
1c0ed91
Merge branch 'main' into stage-5
atimofeyev May 5, 2024
1e16718
stage-5: replace NodeResponse array with AtomicReferenceArray
vitekkor May 9, 2024
870327b
Merge remote-tracking branch 'origin/stage-5' into stage-5
vitekkor May 9, 2024
362c83a
stage-5: codeclimate fix
vitekkor May 9, 2024
26b5e4c
Merge remote-tracking branch 'upstream/main' into stage-5
vitekkor May 14, 2024
cf2c84b
Merge branch 'main' into stage-5
atimofeyev May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class ClusterResponseMerger {
private static final Logger log = LoggerFactory.getLogger(ClusterResponseMerger.class);
private final int ack;
private final int allowedUnsuccessfulResponses;
private final Request originalRequest;
private final HttpSession session;
private final NodeResponse[] nodeResponses;
private final AtomicReferenceArray<NodeResponse> nodeResponses;
private final AtomicInteger unsuccessfulResponsesCount;
private final AtomicInteger successfulResponsesCount;

Expand All @@ -25,13 +26,14 @@ public ClusterResponseMerger(int ack, int from, Request originalRequest, HttpSes
this.allowedUnsuccessfulResponses = from - ack;
this.originalRequest = originalRequest;
this.session = session;
this.nodeResponses = new NodeResponse[from];
this.nodeResponses = new AtomicReferenceArray<>(from);
this.unsuccessfulResponsesCount = new AtomicInteger();
this.successfulResponsesCount = new AtomicInteger();
}

public void addToMerge(int index, NodeResponse response) {
nodeResponses[index] = response;
// we can write in plain semantic because we read in opaque in mergeReplicasResponses method
nodeResponses.setPlain(index, response);
if (isSuccessfulResponse(response.statusCode())) {
int newSuccessfulResponsesCount = successfulResponsesCount.incrementAndGet();
if (newSuccessfulResponsesCount == ack) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_ENTITY_TOO_LARGE;
Expand All @@ -27,7 +28,7 @@ public static String timestampToHeader(long timestamp) {

public static Response mergeReplicasResponses(
final Request originalRequest,
final NodeResponse[] responses,
final AtomicReferenceArray<NodeResponse> responses,
final int ack
) {
switch (originalRequest.getMethod()) {
Expand All @@ -44,11 +45,16 @@ public static Response mergeReplicasResponses(
}
}

private static Response mergeGetResponses(Request originalRequest, NodeResponse[] responses, int ack) {
private static Response mergeGetResponses(
Request originalRequest,
AtomicReferenceArray<NodeResponse> responses,
int ack
) {
long maxTimestamp = -1;
NodeResponse lastValue = null;
int successfulResponses = 0;
for (NodeResponse response : responses) {
for (int i = 0; i < responses.length(); i++) {
final NodeResponse response = responses.getOpaque(i);
if (response == null) continue;
final long valueTimestamp = getTimestamp(response);
if (valueTimestamp > maxTimestamp) {
Expand Down Expand Up @@ -78,7 +84,7 @@ private static Response mergeGetResponses(Request originalRequest, NodeResponse[

private static Response mergePutResponses(
Request originalRequest,
NodeResponse[] responses,
AtomicReferenceArray<NodeResponse> responses,
int ack
) {
if (hasNotEnoughReplicas(responses, ack)) {
Expand All @@ -89,7 +95,7 @@ private static Response mergePutResponses(

private static Response mergeDeleteResponses(
Request originalRequest,
NodeResponse[] responses,
AtomicReferenceArray<NodeResponse> responses,
int ack
) {
if (hasNotEnoughReplicas(responses, ack)) {
Expand All @@ -98,9 +104,10 @@ private static Response mergeDeleteResponses(
return LSMConstantResponse.accepted(originalRequest);
}

private static boolean hasNotEnoughReplicas(NodeResponse[] responses, int ack) {
private static boolean hasNotEnoughReplicas(AtomicReferenceArray<NodeResponse> responses, int ack) {
int successfulResponses = 0;
for (NodeResponse response : responses) {
for (int i = 0; i < responses.length(); i++) {
final NodeResponse response = responses.getOpaque(i);
if (response == null) continue;
if (response.statusCode() >= 200 && response.statusCode() < 300) {
successfulResponses++;
Expand All @@ -117,8 +124,9 @@ private static long getTimestamp(final NodeResponse response) {
return Long.parseLong(timestamp);
}

private static NodeResponse firstNotNull(NodeResponse[] responses) {
for (NodeResponse response : responses) {
private static NodeResponse firstNotNull(AtomicReferenceArray<NodeResponse> responses) {
for (int i = 0; i < responses.length(); i++) {
final NodeResponse response = responses.getOpaque(i);
if (response != null) return response;
}
throw new NoSuchElementException();
Expand Down
Loading