Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Андрей Смирнов ИТМО КТ stage6 #219

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
33dec72
added first realization
sandrew-uj Feb 20, 2024
4ed27a3
added wrk2
sandrew-uj Feb 21, 2024
215a47c
remove long line
sandrew-uj Feb 21, 2024
b4630db
style fix
sandrew-uj Feb 21, 2024
62ee00a
style fix1
sandrew-uj Feb 21, 2024
474f3f0
added profiling
sandrew-uj Feb 21, 2024
2cd4f41
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
ea708bc
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
6197f2f
Merge branch 'polis-vk:main' into main
sandrew-uj Feb 28, 2024
11f2f3f
added stage 2
sandrew-uj Feb 29, 2024
8ba7877
Merge branch 'main' into main
incubos Mar 1, 2024
508c025
codeclimate fix + stage2.md
sandrew-uj Mar 1, 2024
a10c6db
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 1, 2024
a0aa716
codeclimate fix1
sandrew-uj Mar 1, 2024
dd51faa
Merge branch 'main' into main
sandrew-uj Mar 7, 2024
a3d2c05
fixes in code provided
sandrew-uj Mar 7, 2024
b4e208a
codeclimate fix + added comparing
sandrew-uj Mar 7, 2024
cf7c014
images in md added
sandrew-uj Mar 7, 2024
d816a3a
Merge branch 'main' into main
sandrew-uj Mar 12, 2024
dcdc8d2
executor fix
sandrew-uj Mar 13, 2024
fabb285
style fix
sandrew-uj Mar 13, 2024
3673f33
added better stage readme
sandrew-uj Mar 19, 2024
506da2d
Merge branch 'main' into main
sandrew-uj Mar 19, 2024
1deba38
Merge branch 'main' into main
sandrew-uj Mar 26, 2024
f916e7e
removed unused implements
sandrew-uj Mar 26, 2024
0023be5
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 26, 2024
fce0383
removed unused import
sandrew-uj Mar 26, 2024
fe005fd
Merge branch 'polis-vk:main' into main
sandrew-uj Mar 29, 2024
8c183e1
Merge branch 'polis-vk:main' into main
sandrew-uj Apr 10, 2024
e50a0e7
Merge branch 'polis-vk:main' into main
sandrew-uj Apr 25, 2024
314cbaa
added solution: try to test on linux
Apr 25, 2024
e5b1014
added solution: local tests passed
Apr 25, 2024
f1df04d
added solution: codeclimate fix
Apr 25, 2024
4505767
added solution: flopping
Apr 25, 2024
fffa6f1
added solution: flopping1
Apr 25, 2024
5c055b1
added report
May 10, 2024
9e190b9
Merge branch 'main' into stage6
sandrew-uj May 10, 2024
096dcac
flopping
May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/ru/vk/itmo/test/smirnovandrew/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static void main(String[] args) throws IOException {
ReferenceDao dao = new ReferenceDao(
new Config(
data,
2 * 1024 * 1024
2L * 1024 * 1024
)
);
String localhost = "http://localhost";
Expand Down
174 changes: 86 additions & 88 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java
Original file line number Diff line number Diff line change
@@ -1,44 +1,39 @@
package ru.vk.itmo.test.smirnovandrew;

import one.nio.http.Header;
import one.nio.http.HttpClient;
import one.nio.http.HttpException;
import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.pool.PoolException;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.test.reference.dao.ReferenceDao;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class MyServer extends HttpServer {
private static final String ROOT = "/v0/entity";
private static final String X_SENDER_NODE = "X-SenderNode";
private static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas";
private static final long DURATION = 1000L;
private static final int OK_STATUS = 300;
private static final int NOT_FOUND_STATUS = 404;
private static final String HEADER_DELIMITER = ": ";
private final MyServerDao dao;
private final MyExecutor executor;
private final Logger logger;
private final Map<String, HttpClient> httpClients;
private final HttpClient httpClient;
private final RendezvousClusterManager rendezvousClustersManager;
private final ServiceConfig config;

Expand All @@ -64,17 +59,20 @@ public MyServer(
this.dao = new MyServerDao(dao);
this.executor = new MyExecutor(corePoolSize, availableProcessors);
this.logger = Logger.getLogger(MyServer.class.getName());
this.httpClients = config.clusterUrls().stream()
.filter(url -> !Objects.equals(url, config.selfUrl()))
.collect(Collectors.toMap(s -> s, MyServerUtil::createClient, (c, c1) -> c));
this.httpClient = HttpClient.newHttpClient();
}

@Override
public void handleRequest(Request request, HttpSession session) throws IOException {
try {
long exp = System.currentTimeMillis() + DURATION;
long exp = System.currentTimeMillis() + MyServerUtil.DURATION;
executor.execute(() -> {
try {
if (Objects.equals(request.getPath(), MyServerUtil.ROOT_ENTITIES)) {
entities(request, session);
return;
}

if (System.currentTimeMillis() > exp) {
MyServerUtil.sendEmpty(session, logger, Response.SERVICE_UNAVAILABLE);
} else {
Expand All @@ -98,27 +96,31 @@ private static int quorum(int from) {
return from / 2 + 1;
}

private Response sendToAnotherNode(
private HttpRequest toHttpRequest(Request request, String nodeUrl, String params) {
return HttpRequest.newBuilder(URI.create(nodeUrl + MyServerUtil.ROOT + "?" + params))
.method(request.getMethodName(), request.getBody() == null
? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(request.getBody()))
.setHeader(MyServerUtil.X_SENDER_NODE, config.selfUrl())
.build();
}

private CompletableFuture<Response> sendToAnotherNode(
Request request,
int ack,
int from,
String id,
String clusterUrl,
Function<MyServerDao, Response> operation
) {
if (Objects.equals(clusterUrl, config.selfUrl())) {
return operation.apply(dao);
return CompletableFuture.completedFuture(operation.apply(dao));
}

var httpClient = httpClients.get(clusterUrl);
var httpRequest = toHttpRequest(request, clusterUrl, String.format("id=%s&from=%d&ack=%d", id, from, ack));

try {
return httpClient.invoke(request);
} catch (InterruptedException e) {
logger.info(e.getMessage());
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (HttpException | IOException | PoolException e1) {
logger.info(e1.getMessage());
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(MyServerUtil::processingResponse);
}

private Response handleLocalRequest(
Expand All @@ -128,18 +130,22 @@ private Response handleLocalRequest(
Integer ackParam,
String senderNode,
Function<MyServerDao, Response> operation
) {
Integer from = fromParam;
if (Objects.isNull(from)) {
) throws ExecutionException, InterruptedException, TimeoutException {
final int from;
if (Objects.isNull(fromParam)) {
from = config.clusterUrls().size();
} else {
from = fromParam;
}

Integer ack = ackParam;
if (Objects.isNull(ack)) {
final int ack;
if (Objects.isNull(ackParam)) {
ack = quorum(from);
} else {
ack = ackParam;
}

String paramError = getParametersError(id, from, ack);
String paramError = MyServerUtil.getParametersError(id, from, ack, config.clusterUrls().size());
if (Objects.nonNull(paramError)) {
return new Response(Response.BAD_REQUEST, paramError.getBytes(StandardCharsets.UTF_8));
}
Expand All @@ -154,62 +160,40 @@ private Response handleLocalRequest(
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

var sortedNodes = RendezvousClusterManager.getSortedNodes(from, config);
var sortedNodes = RendezvousClusterManager.getSortedNodes(id, from, config);

if (sortedNodes.stream().map(config.clusterUrls()::get).noneMatch(config.selfUrl()::equals)) {
return sendToAnotherNode(request, clusterUrl, operation);
}

request.addHeader(String.join(HEADER_DELIMITER, X_SENDER_NODE, config.selfUrl()));
var responses = new ArrayList<Response>();
for (int nodeNumber : sortedNodes) {
var r = sendToAnotherNode(request, config.clusterUrls().get(nodeNumber), operation);
if (r.getStatus() < OK_STATUS
|| (r.getStatus() == NOT_FOUND_STATUS && request.getMethod() == Request.METHOD_GET)) {
responses.add(r);
}
}

if (responses.size() < ack) {
return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY);
}
return MyServerUtil.getMaxTimestampResponse(responses);
}

private String getParametersError(String id, Integer from, Integer ack) {
if (Objects.isNull(id) || id.isEmpty()) {
return "Invalid id provided";
}

if (ack <= 0) {
return "Too small ack";
return sendToAnotherNode(request, ack, from, id, clusterUrl, operation)
.get(MyServerUtil.DURATION, TimeUnit.MILLISECONDS);
}

if (from <= 0) {
return "Too small from";
}

int clusterSize = config.clusterUrls().size();
if (from > clusterSize) {
return String.format("From is greater than cluster size: from=%d, clusterSize=%d", from, clusterSize);
}

if (ack > from) {
return String.format("Ack is greater than from: ack=%d, from=%d", ack, from);
}

return null;
var completableResults = sortedNodes.stream()
.map(nodeNumber -> sendToAnotherNode(
request,
ack,
from,
id,
config.clusterUrls().get(nodeNumber),
operation))
.toList();

return MyServerUtil.getResults(
from,
ack,
completableResults,
logger
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_GET)
public Response get(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -220,15 +204,15 @@ public Response get(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_DELETE)
public Response delete(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -239,15 +223,15 @@ public Response delete(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_PUT)
public Response put(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
request.addHeader("Content-Length: " + request.getBody().length);
request.setBody(request.getBody());

Expand All @@ -261,6 +245,20 @@ public Response put(
);
}

private void entities(
Request request,
HttpSession session
) {
String start = request.getParameter("start=");
String end = request.getParameter("end=");
if (start.isEmpty()) {
MyServerUtil.sendEmpty(session, logger, Response.BAD_REQUEST);
return;
}

MyServerUtil.handleLocalEntitiesRequest(start, end, session, dao, logger);
}

@Override
public void handleDefault(Request request, HttpSession session) throws IOException {
session.sendResponse(
Expand All @@ -275,6 +273,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
public synchronized void stop() {
this.executor.shutdown();
super.stop();
httpClients.values().forEach(HttpClient::close);
httpClient.close();
}
}
11 changes: 8 additions & 3 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.ObjectOutputStream;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Iterator;

public class MyServerDao {
private final Dao<MemorySegment, Entry<MemorySegment>> dao;
Expand All @@ -22,6 +23,10 @@ public MyServerDao(Dao<MemorySegment, Entry<MemorySegment>> dao) {
this.dao = dao;
}

Iterator<Entry<MemorySegment>> getEntriesFromDao(String start, String end) {
return dao.get(DaoFactory.fromString(start), DaoFactory.fromString(end));
}

Response getEntryFromDao(String id) {
Entry<MemorySegment> entry = dao.get(DaoFactory.fromString(id));
if (entry == null) {
Expand All @@ -31,11 +36,11 @@ Response getEntryFromDao(String id) {
ValWithTime valueWithTimestamp = byteArrayToObject(entry.value().toArray(ValueLayout.JAVA_BYTE));
if (valueWithTimestamp.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
}
Response response = new Response(Response.OK, valueWithTimestamp.value());
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
} catch (IOException | ClassNotFoundException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
Expand Down Expand Up @@ -64,7 +69,7 @@ Response deleteValueFromDao(String id) {
return new Response(Response.ACCEPTED, Response.EMPTY);
}

private static ValWithTime byteArrayToObject(byte[] bytes) throws IOException, ClassNotFoundException {
static ValWithTime byteArrayToObject(byte[] bytes) throws IOException, ClassNotFoundException {
var byteArrayInputStream = new ByteArrayInputStream(bytes);
try (var objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (ValWithTime) objectInputStream.readObject();
Expand Down
Loading
Loading