Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stage 6, Андрей Чешев, Политех #226

Merged
merged 135 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from 120 commits
Commits
Show all changes
135 commits
Select commit Hold shift + click to select a range
852ff3a
rename package
Queenore Feb 16, 2024
9cd508f
8/14 tests
Queenore Feb 17, 2024
7a9d72e
12/14 tests
Queenore Feb 17, 2024
dabe8db
14/14 tests
Queenore Feb 18, 2024
069b2c9
fixes
Queenore Feb 18, 2024
ecb61d3
fixes
Queenore Feb 18, 2024
f947df4
fixes
Queenore Feb 19, 2024
602c01e
+ report
Queenore Feb 19, 2024
71d5f45
codestyle fixes
Queenore Feb 19, 2024
1fbe9c6
codestyle fixes
Queenore Feb 19, 2024
d1ead95
add lua scripts
Queenore Feb 19, 2024
e49e1af
report fix
Queenore Feb 19, 2024
549fb23
HTML to SVG
Queenore Feb 19, 2024
121db27
add THRESHOLD_BYTES const
Queenore Feb 19, 2024
2d64026
Merge branch 'main' of https://github.com/polis-vk/2024-highload-dht …
Queenore Feb 23, 2024
cf39950
lab2 init
Queenore Feb 24, 2024
a768f60
with async result process
Queenore Feb 25, 2024
9297545
done without exception processing
Queenore Feb 25, 2024
4aea0d0
error/exception processing
Queenore Feb 25, 2024
bccbfb6
fixes
Queenore Feb 26, 2024
290721f
MAX_WORKERS_COUNT -> BLOCKING_QUEUE_MAX_SIZE
Queenore Feb 27, 2024
d6180aa
change queue size
Queenore Feb 27, 2024
fd5e8f7
fixes
Queenore Feb 28, 2024
d53aadb
add report
Queenore Feb 28, 2024
25a35c2
code review fixes
Queenore Feb 28, 2024
69fe7e0
Merge branch 'stage-1-Cheshev' into stage-2-Cheshev
Queenore Feb 28, 2024
9ee7e3a
fixes
Queenore Feb 28, 2024
e2612b7
public static -> private (method sendResponse())
Queenore Feb 28, 2024
0450bff
style fixes
Queenore Feb 28, 2024
43d1697
code climate fixes
Queenore Feb 29, 2024
a294fd8
code climate fixes
Queenore Feb 29, 2024
bb3dcaa
code climate fixes
Queenore Feb 29, 2024
edb3865
Merge branch 'main' into stage-2-Cheshev
incubos Feb 29, 2024
45f3dc2
Merge remote-tracking branch 'upstrem/main' into stage-2-Cheshev
Queenore Mar 3, 2024
4bcaa32
pr remark fixes
Queenore Mar 3, 2024
e5ec141
Merge remote-tracking branch 'origin/stage-2-Cheshev' into stage-2-Ch…
Queenore Mar 3, 2024
e7a6962
Merge remote-tracking branch 'origin/stage-2-Cheshev' into stage-2-Ch…
Queenore Mar 3, 2024
d8d68d7
init server with service in ServerStarter.java
Queenore Mar 4, 2024
c78ae4b
fix
Queenore Mar 6, 2024
08b69a8
import static fix
Queenore Mar 6, 2024
504cb3f
fix
Queenore Mar 6, 2024
f261d24
fix exception throw
Queenore Mar 6, 2024
f5060e1
add results
Queenore Mar 6, 2024
1df88f3
Merge remote-tracking branch 'origin/stage-2-Cheshev' into stage-2-Ch…
Queenore Mar 6, 2024
3e78b42
+ report
Queenore Mar 6, 2024
fda0a08
Merge branch 'main_upstream' into stage-3-Cheshev
Queenore Mar 8, 2024
1ba1419
stage 3 init
Queenore Mar 8, 2024
7fbfe06
fixes
Queenore Mar 9, 2024
3a1a33d
remove imports
Queenore Mar 9, 2024
738c049
fix
Queenore Mar 12, 2024
5338206
add timeout exception processing
Queenore Mar 13, 2024
4367dae
log description fix
Queenore Mar 13, 2024
533bcb5
code style fix
Queenore Mar 13, 2024
8ea3e9c
code style fix
Queenore Mar 13, 2024
b084d35
report fixes
Queenore Mar 15, 2024
26b8c37
report fix
Queenore Mar 16, 2024
1638918
Merge remote-tracking branch 'origin/stage-2-Cheshev' into stage-2-Ch…
Queenore Mar 16, 2024
9fd65a5
add results
Queenore Mar 16, 2024
5216084
fix
Queenore Mar 19, 2024
7e05e74
pr fix
Queenore Mar 20, 2024
fee0120
pr fix
Queenore Mar 20, 2024
ae9bacf
add starting cluster
Queenore Mar 20, 2024
ff49ef4
remove singleStart()
Queenore Mar 20, 2024
ec2ab3b
Merge branch 'stage-2-Cheshev' into stage-3-Cheshev
Queenore Mar 20, 2024
632e813
Merge remote-tracking branch 'origin/stage-3-Cheshev' into stage-3-Ch…
Queenore Mar 20, 2024
4c8f136
fix cc
Queenore Mar 20, 2024
dbdec26
fix cc
Queenore Mar 20, 2024
cbc2db1
+ report
Queenore Mar 20, 2024
53b6a4f
+ data
Queenore Mar 20, 2024
d653a8e
+ report explanation
Queenore Mar 21, 2024
e2faef6
+ entry timestamp
Queenore Mar 24, 2024
e723236
Merge remote-tracking branch 'upstrem/main' into stage-4-Cheshev
Queenore Mar 24, 2024
6773e67
stage 4 in process...
Queenore Mar 25, 2024
7be44c0
20/37
Queenore Mar 26, 2024
68a3cb4
in process...
Queenore Mar 26, 2024
e5350bd
tests done
Queenore Mar 27, 2024
bc915cc
fix codestyle
Queenore Mar 27, 2024
29188b0
fix codestyle
Queenore Mar 27, 2024
f4e15bf
fix cluster node request error
Queenore Mar 27, 2024
efa5e9c
Merge branch 'main' into stage-4-Cheshev
Queenore Mar 27, 2024
a7fead2
fix exception handle
Queenore Mar 27, 2024
08394a2
fix exception handle
Queenore Mar 28, 2024
bd3c2b7
fix codestyle
Queenore Mar 28, 2024
f9ae8ef
fix actual nodes for request
Queenore Apr 2, 2024
c287369
Merge remote-tracking branch 'origin/stage-4-Cheshev' into stage-4-Ch…
Queenore Apr 2, 2024
1e0895a
fix parameters parsing
Queenore Apr 2, 2024
f3e8e04
fix parameters parsing
Queenore Apr 2, 2024
f064f4c
fix
Queenore Apr 2, 2024
44b7e4f
Merge remote-tracking branch 'origin/stage-4-Cheshev' into stage-4-Ch…
Queenore Apr 2, 2024
3d2fb7d
fix parameters parsing
Queenore Apr 2, 2024
fc6d10f
fix codeclimate
Queenore Apr 3, 2024
e1197ba
+ report data
Queenore Apr 3, 2024
6b6614f
+ report
Queenore Apr 3, 2024
79a6848
fix report
Queenore Apr 4, 2024
10fe556
fix
Queenore Apr 4, 2024
fcb016a
Merge remote-tracking branch 'upstream/main' into stage-5-Cheshev
Queenore Apr 5, 2024
d468640
5th stage in process...
Queenore Apr 6, 2024
f3f8ec4
5th stage in process...
Queenore Apr 6, 2024
bfb9762
stage 5 done with tests
Queenore Apr 7, 2024
0e1ee69
+ thread naming
Queenore Apr 7, 2024
5970729
+ exception processing
Queenore Apr 7, 2024
5b5411d
Merge branch 'main' into stage-5-Cheshev
Queenore Apr 7, 2024
882f6d2
codeclimate fixes
Queenore Apr 7, 2024
4710474
codeclimate fixes
Queenore Apr 7, 2024
97f0aad
codeclimate fixes
Queenore Apr 7, 2024
9e45aae
fixes
Queenore Apr 8, 2024
2589874
minor fixes
Queenore Apr 10, 2024
b2e0944
minor fixes
Queenore Apr 10, 2024
9296ec9
Merge branch 'main' into stage-5-Cheshev
Queenore Apr 13, 2024
3c0188d
minor fixes
Queenore Apr 13, 2024
d56d514
..
Queenore Apr 18, 2024
cb5a55d
...
Queenore Apr 18, 2024
956e3a8
...
Queenore Apr 18, 2024
6ad1448
+ report
Queenore Apr 19, 2024
1ab7905
Merge remote-tracking branch 'upstream/main' into stage-6-Cheshev
Queenore Apr 23, 2024
46bec55
in process...
Queenore Apr 23, 2024
17efe08
in process...
Queenore Apr 24, 2024
f9219b2
in process...
Queenore Apr 25, 2024
7bb5e44
done + fixes
Queenore Apr 25, 2024
a224289
codeclimate fixes
Queenore Apr 25, 2024
3b784cb
Merge branch 'main' into stage-6-Cheshev
incubos Apr 28, 2024
a666591
fixes
Queenore May 1, 2024
b318a0b
Merge remote-tracking branch 'origin/stage-6-Cheshev' into stage-6-Ch…
Queenore May 1, 2024
e46ce06
fixes
Queenore May 2, 2024
0253c4d
fixes
Queenore May 4, 2024
669b7d4
Merge remote-tracking branch 'origin/stage-6-Cheshev' into stage-6-Ch…
Queenore May 4, 2024
285a574
fix
Queenore May 4, 2024
3c19345
+rep
Queenore May 4, 2024
7ff6971
Merge remote-tracking branch 'origin/stage-6-Cheshev' into stage-6-Ch…
Queenore May 4, 2024
5398f52
fix after merge
Queenore May 4, 2024
17f72cc
fix after merge
Queenore May 4, 2024
a4ffe64
fix report
Queenore May 4, 2024
9654d82
Merge branch 'main' into stage-6-Cheshev
lamtev May 9, 2024
86385b8
Merge branch 'main' into stage-6-Cheshev
Queenore May 19, 2024
c67a074
Merge branch 'main' into stage-6-Cheshev
incubos May 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/AsyncActions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package ru.vk.itmo.test.andreycheshev;

import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class AsyncActions {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncActions.class);

private static final int CPU_THREADS_COUNT = Runtime.getRuntime().availableProcessors();

public static final String FUTURE_CREATION_ERROR = "Error when CompletableFuture creation";

private final Executor internalExecutor = Executors.newFixedThreadPool(
CPU_THREADS_COUNT / 2,
new WorkerThreadFactory("Internal-thread")
);
private final Executor senderExecutor = Executors.newFixedThreadPool(
CPU_THREADS_COUNT / 2,
new WorkerThreadFactory("Sender-thread")
);
private final Executor localCallExecutor = Executors.newFixedThreadPool(
CPU_THREADS_COUNT / 2,
new WorkerThreadFactory("LocalCall-thread")
);
private final Executor remoteCallExecutor = Executors.newFixedThreadPool(
CPU_THREADS_COUNT,
new WorkerThreadFactory("RemoteCall-thread")
);
private final Executor streamingExecutor = Executors.newFixedThreadPool(
CPU_THREADS_COUNT,
new WorkerThreadFactory("Streaming-thread")
);

private final HttpClient httpClient = HttpClient.newBuilder()
.executor(remoteCallExecutor)
.connectTimeout(Duration.ofMillis(500))
.version(HttpClient.Version.HTTP_1_1)
.build();

private final HttpProvider httpProvider;

public AsyncActions(HttpProvider httpProvider) {
this.httpProvider = httpProvider;
}

public CompletableFuture<Void> processLocallyToSend(
String id,
Request request,
long timestamp,
HttpSession session) {

int method = request.getMethod();

return getLocalFuture(method, id, request, timestamp)
.thenAcceptAsync(
elements -> HttpUtils.sendResponse(
HttpUtils.getOneNioResponse(method, elements),
session
),
senderExecutor
)
.exceptionallyAsync(
exception -> {
if (exception.getCause() instanceof UncheckedIOException) {
LOGGER.error("Error when sending a request", exception);
}
return null;
},
internalExecutor
);
}

public CompletableFuture<Void> processLocallyToCollect(
int method,
String id,
Request request,
long timestamp,
ResponseCollector collector) {

CompletableFuture<Void> future = getLocalFuture(method, id, request, timestamp)
.thenApplyAsync(
collector::add,
internalExecutor
)
.exceptionallyAsync(
exception -> {
LOGGER.error("Internal error of the DAO operation", exception);
return collector.incrementResponsesCounter();
},
internalExecutor
)
.thenAcceptAsync(
condition -> {
if (condition) {
collector.sendResponse();
}
},
senderExecutor
);

return withSendingErrorProcessing(future);
}

public CompletableFuture<Void> processRemotelyToCollect(
String node,
Request request,
long timestamp,
ResponseCollector collector) {

HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(node + request.getURI()))
.method(
request.getMethodName(),
request.getBody() == null
? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(request.getBody())
)
.header(HttpUtils.TIMESTAMP_JAVA_NET_HEADER, String.valueOf(timestamp))
.timeout(Duration.ofMillis(1500))
.build();

CompletableFuture<Void> future = httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(
response -> { // java.net response.
return collector.add(
HttpUtils.getElementsFromJavaNetResponse(response)
);
},
internalExecutor
)
.exceptionallyAsync(
exception -> {
LOGGER.error("Error when sending a request to the remote node", exception);
return collector.incrementResponsesCounter();
},
internalExecutor
)
.thenAcceptAsync(
condition -> {
if (condition) {
collector.sendResponse();
}
},
senderExecutor
);

return withSendingErrorProcessing(future);
}

public void stream(Runnable runnable) {
CompletableFuture<Void> future = CompletableFuture.runAsync(
runnable,
streamingExecutor
).exceptionallyAsync(
exception -> {
LOGGER.error("Error while streaming process", exception);
return null;
},
internalExecutor
);

checkFuture(future);
}

private CompletableFuture<ResponseElements> getLocalFuture(
int method,
String id,
Request request,
long timestamp) {

return CompletableFuture.supplyAsync(
() -> switch (method) {
case Request.METHOD_GET -> httpProvider.get(id);
case Request.METHOD_PUT -> httpProvider.put(id, request.getBody(), timestamp);
default -> httpProvider.delete(id, timestamp);
},
localCallExecutor
);
}

private CompletableFuture<Void> withSendingErrorProcessing(
CompletableFuture<Void> future) {

return future
.exceptionallyAsync(
exception -> {
LOGGER.error("Error when sending a request", exception);
return null;
},
internalExecutor
);
}

public void sendAsync(Response response, HttpSession session) {
CompletableFuture<Void> future = withSendingErrorProcessing(
CompletableFuture.runAsync(
() -> HttpUtils.sendResponse(response, session),
senderExecutor
)
);

checkFuture(future);
}

public static boolean checkFuture(CompletableFuture<?> future) {
boolean isFutureNull = future == null;
if (isFutureNull) {
LOGGER.error(FUTURE_CREATION_ERROR);
}
return isFutureNull;
}
}
9 changes: 9 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/HttpProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.vk.itmo.test.andreycheshev;

public interface HttpProvider {
ResponseElements get(String id);

ResponseElements put(String id, byte[] body, long timestamp);

ResponseElements delete(String id, long timestamp);
}
97 changes: 97 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/HttpUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package ru.vk.itmo.test.andreycheshev;

import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.Optional;

public class HttpUtils {
public static final String TIMESTAMP_JAVA_NET_HEADER = "X-Timestamp";
public static final String TIMESTAMP_ONE_NIO_HEADER = TIMESTAMP_JAVA_NET_HEADER + ": ";

private static final String NOT_ENOUGH_REPLICAS_STATUS = "504 Not Enough Replicas";
private static final String TOO_MANY_REQUESTS_STATUS = "429 Too many requests";

public static final int EMPTY_TIMESTAMP = -1;

private static final Map<Integer, String> AVAILABLE_RESPONSES = Map.of(
200, Response.OK,
201, Response.CREATED,
202, Response.ACCEPTED,
404, Response.NOT_FOUND,
410, Response.GONE
); // Immutable map.

private HttpUtils() {

}

public static Response getBadRequest() {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

public static Response getMethodNotAllowed() {
return new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);
}

public static Response getInternalError() {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}

public static Response getNotEnoughReplicas() {
return new Response(NOT_ENOUGH_REPLICAS_STATUS, Response.EMPTY);
}

public static Response getTooManyRequests() {
return new Response(TOO_MANY_REQUESTS_STATUS, Response.EMPTY);
}

public static void sendResponse(Response response, HttpSession session) {
try {
session.sendResponse(response);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static ResponseElements getElementsFromJavaNetResponse(HttpResponse<byte[]> response) {
Optional<String> optTimestamp = response.headers().firstValue(TIMESTAMP_JAVA_NET_HEADER);

long responseTimestamp = optTimestamp.isPresent()
? Long.parseLong(optTimestamp.get())
: EMPTY_TIMESTAMP;

return new ResponseElements(
response.statusCode(),
response.body(),
responseTimestamp
);
}

public static Response getOneNioResponse(int method, ResponseElements elements) {
switch (method) {
case Request.METHOD_GET -> {
int status = elements.getStatus();

Response response = status == 410
? new Response(Response.NOT_FOUND, Response.EMPTY)
: new Response(AVAILABLE_RESPONSES.get(status), elements.getBody());

response.addHeader(TIMESTAMP_ONE_NIO_HEADER + elements.getTimestamp());

return response;
}
case Request.METHOD_PUT -> {
return new Response(Response.CREATED, Response.EMPTY);
}
default -> { // For delete method.
return new Response(Response.ACCEPTED, Response.EMPTY);
}
}
}
}
Loading
Loading