Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Смирнов Андрей ИТМО КТ Stage 5 #181

Merged
merged 53 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
33dec72
added first realization
sandrew-uj Feb 20, 2024
4ed27a3
added wrk2
sandrew-uj Feb 21, 2024
215a47c
remove long line
sandrew-uj Feb 21, 2024
b4630db
style fix
sandrew-uj Feb 21, 2024
62ee00a
style fix1
sandrew-uj Feb 21, 2024
474f3f0
added profiling
sandrew-uj Feb 21, 2024
2cd4f41
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
ea708bc
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
6197f2f
Merge branch 'polis-vk:main' into main
sandrew-uj Feb 28, 2024
11f2f3f
added stage 2
sandrew-uj Feb 29, 2024
8ba7877
Merge branch 'main' into main
incubos Mar 1, 2024
508c025
codeclimate fix + stage2.md
sandrew-uj Mar 1, 2024
a10c6db
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 1, 2024
a0aa716
codeclimate fix1
sandrew-uj Mar 1, 2024
dd51faa
Merge branch 'main' into main
sandrew-uj Mar 7, 2024
a3d2c05
fixes in code provided
sandrew-uj Mar 7, 2024
b4e208a
codeclimate fix + added comparing
sandrew-uj Mar 7, 2024
cf7c014
images in md added
sandrew-uj Mar 7, 2024
d816a3a
Merge branch 'main' into main
sandrew-uj Mar 12, 2024
dcdc8d2
executor fix
sandrew-uj Mar 13, 2024
fabb285
style fix
sandrew-uj Mar 13, 2024
9b5f96d
added first realization
sandrew-uj Mar 14, 2024
3673f33
added better stage readme
sandrew-uj Mar 19, 2024
506da2d
Merge branch 'main' into main
sandrew-uj Mar 19, 2024
7da5b8e
fixes with asyncSend
sandrew-uj Mar 19, 2024
1deba38
Merge branch 'main' into main
sandrew-uj Mar 26, 2024
f916e7e
removed unused implements
sandrew-uj Mar 26, 2024
0023be5
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 26, 2024
fce0383
removed unused import
sandrew-uj Mar 26, 2024
2d05f98
Merge branch 'main' into stage4
sandrew-uj Mar 26, 2024
3895f6e
added realization
sandrew-uj Mar 28, 2024
f4b18fd
fixes
sandrew-uj Mar 28, 2024
31eef05
tests passed
sandrew-uj Mar 29, 2024
7a7bdf5
style fix
sandrew-uj Mar 29, 2024
8b125f3
Merge branch 'main' into stage4
sandrew-uj Mar 29, 2024
0fa4876
style fix1
sandrew-uj Mar 29, 2024
906f9e3
Merge remote-tracking branch 'origin/stage4' into stage4
sandrew-uj Mar 29, 2024
8fd039f
style fix2
sandrew-uj Mar 29, 2024
3ab4bca
flopping
sandrew-uj Mar 29, 2024
a5f3699
flopping1
sandrew-uj Mar 29, 2024
fe005fd
Merge branch 'polis-vk:main' into main
sandrew-uj Mar 29, 2024
359ca1b
Merge branch 'main' into stage4
sandrew-uj Mar 29, 2024
745c99f
added first realization
sandrew-uj Apr 11, 2024
b103ff5
codeclimate fix
sandrew-uj Apr 11, 2024
7a2d010
Merge branch 'main' into s5
sandrew-uj Apr 11, 2024
33303de
codeclimate fix1
sandrew-uj Apr 11, 2024
7b503fa
codeclimate fix2
sandrew-uj Apr 11, 2024
9877d83
Merge branch 'main' into s5
incubos Apr 13, 2024
9007e40
changes provided and readme added
May 1, 2024
fe3810b
codeclimate fix
May 1, 2024
60e8b9e
Merge branch 'main' into s5
sandrew-uj May 1, 2024
63f606d
Merge branch 'main' into s5
sandrew-uj May 13, 2024
4466032
build rollback
May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ compileTestJava {
options.compilerArgs += ["--enable-preview"]
}

application {
mainClass = 'ru.vk.itmo.test.reference.ReferenceService'
run {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо откатить перед мерджем

// mainClass = 'ru.vk.itmo.test.reference.ReferenceService'
mainClass ='ru.vk.itmo.test.smirnovandrew.Main'
applicationDefaultJvmArgs = ['-Xmx128m', '--enable-preview', '-Xlog:gc']
// + ['-XX:+UseSerialGC']
//
Expand Down
128 changes: 66 additions & 62 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java
Original file line number Diff line number Diff line change
@@ -1,44 +1,39 @@
package ru.vk.itmo.test.smirnovandrew;

import one.nio.http.Header;
import one.nio.http.HttpClient;
import one.nio.http.HttpException;
import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.pool.PoolException;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.test.reference.dao.ReferenceDao;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class MyServer extends HttpServer {
private static final String ROOT = "/v0/entity";
private static final String X_SENDER_NODE = "X-SenderNode";
private static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas";
private static final long DURATION = 1000L;
private static final int OK_STATUS = 300;
private static final int NOT_FOUND_STATUS = 404;
private static final String HEADER_DELIMITER = ": ";
private final MyServerDao dao;
private final MyExecutor executor;
private final Logger logger;
private final Map<String, HttpClient> httpClients;
private final HttpClient httpClient;
private final RendezvousClusterManager rendezvousClustersManager;
private final ServiceConfig config;

Expand All @@ -64,15 +59,13 @@ public MyServer(
this.dao = new MyServerDao(dao);
this.executor = new MyExecutor(corePoolSize, availableProcessors);
this.logger = Logger.getLogger(MyServer.class.getName());
this.httpClients = config.clusterUrls().stream()
.filter(url -> !Objects.equals(url, config.selfUrl()))
.collect(Collectors.toMap(s -> s, MyServerUtil::createClient, (c, c1) -> c));
this.httpClient = HttpClient.newHttpClient();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На каком пуле потоков выполняются запросы к другим репликам?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На том же, что задается вот в этой строчке
this.executor = new MyExecutor(corePoolSize, availableProcessors);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Но ведь при создании HttpClient в него не передается никакой executor. Соответственно, будет создан отдельный вот тут
image
Или я что-то не заметил?

}

@Override
public void handleRequest(Request request, HttpSession session) throws IOException {
try {
long exp = System.currentTimeMillis() + DURATION;
long exp = System.currentTimeMillis() + MyServerUtil.DURATION;
executor.execute(() -> {
try {
if (System.currentTimeMillis() > exp) {
Expand All @@ -98,27 +91,31 @@ private static int quorum(int from) {
return from / 2 + 1;
}

private Response sendToAnotherNode(
private HttpRequest toHttpRequest(Request request, String nodeUrl, String params) {
return HttpRequest.newBuilder(URI.create(nodeUrl + MyServerUtil.ROOT + "?" + params))
.method(request.getMethodName(), request.getBody() == null
? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(request.getBody()))
.setHeader(MyServerUtil.X_SENDER_NODE, config.selfUrl())
.build();
}

private CompletableFuture<Response> sendToAnotherNode(
Request request,
int ack,
int from,
String id,
String clusterUrl,
Function<MyServerDao, Response> operation
) {
if (Objects.equals(clusterUrl, config.selfUrl())) {
return operation.apply(dao);
return CompletableFuture.completedFuture(operation.apply(dao));
}

var httpClient = httpClients.get(clusterUrl);
var httpRequest = toHttpRequest(request, clusterUrl, String.format("id=%s&from=%d&ack=%d", id, from, ack));

try {
return httpClient.invoke(request);
} catch (InterruptedException e) {
logger.info(e.getMessage());
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (HttpException | IOException | PoolException e1) {
logger.info(e1.getMessage());
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(MyServerUtil::processingResponse);
}

private Response handleLocalRequest(
sandrew-uj marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -128,15 +125,19 @@ private Response handleLocalRequest(
Integer ackParam,
String senderNode,
Function<MyServerDao, Response> operation
) {
Integer from = fromParam;
if (Objects.isNull(from)) {
) throws ExecutionException, InterruptedException, TimeoutException {
final int from;
if (Objects.isNull(fromParam)) {
from = config.clusterUrls().size();
} else {
from = fromParam;
}

Integer ack = ackParam;
if (Objects.isNull(ack)) {
final int ack;
if (Objects.isNull(ackParam)) {
ack = quorum(from);
} else {
ack = ackParam;
}

String paramError = getParametersError(id, from, ack);
Expand All @@ -154,26 +155,29 @@ private Response handleLocalRequest(
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

var sortedNodes = RendezvousClusterManager.getSortedNodes(from, config);
var sortedNodes = RendezvousClusterManager.getSortedNodes(id, from, config);

if (sortedNodes.stream().map(config.clusterUrls()::get).noneMatch(config.selfUrl()::equals)) {
return sendToAnotherNode(request, clusterUrl, operation);
return sendToAnotherNode(request, ack, from, id, clusterUrl, operation)
.get(MyServerUtil.DURATION, TimeUnit.MILLISECONDS);
}

request.addHeader(String.join(HEADER_DELIMITER, X_SENDER_NODE, config.selfUrl()));
var responses = new ArrayList<Response>();
for (int nodeNumber : sortedNodes) {
var r = sendToAnotherNode(request, config.clusterUrls().get(nodeNumber), operation);
if (r.getStatus() < OK_STATUS
|| (r.getStatus() == NOT_FOUND_STATUS && request.getMethod() == Request.METHOD_GET)) {
responses.add(r);
}
}

if (responses.size() < ack) {
return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY);
}
return MyServerUtil.getMaxTimestampResponse(responses);
var completableResults = sortedNodes.stream()
.map(nodeNumber -> sendToAnotherNode(
request,
ack,
from,
id,
config.clusterUrls().get(nodeNumber),
operation))
.toList();

return MyServerUtil.getResults(
from,
ack,
completableResults,
logger
);
}

private String getParametersError(String id, Integer from, Integer ack) {
Expand Down Expand Up @@ -201,15 +205,15 @@ private String getParametersError(String id, Integer from, Integer ack) {
return null;
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_GET)
public Response get(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -220,15 +224,15 @@ public Response get(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_DELETE)
public Response delete(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -239,15 +243,15 @@ public Response delete(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_PUT)
public Response put(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
request.addHeader("Content-Length: " + request.getBody().length);
request.setBody(request.getBody());

Expand Down Expand Up @@ -275,6 +279,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
public synchronized void stop() {
this.executor.shutdown();
super.stop();
httpClients.values().forEach(HttpClient::close);
httpClient.close();
}
}
4 changes: 2 additions & 2 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ Response getEntryFromDao(String id) {
ValWithTime valueWithTimestamp = byteArrayToObject(entry.value().toArray(ValueLayout.JAVA_BYTE));
if (valueWithTimestamp.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
}
Response response = new Response(Response.OK, valueWithTimestamp.value());
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
} catch (IOException | ClassNotFoundException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
Expand Down
Loading
Loading