-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Смирнов Андрей ИТМО КТ Stage 5 #181
Changes from 51 commits
33dec72
4ed27a3
215a47c
b4630db
62ee00a
474f3f0
2cd4f41
ea708bc
6197f2f
11f2f3f
8ba7877
508c025
a10c6db
a0aa716
dd51faa
a3d2c05
b4e208a
cf7c014
d816a3a
dcdc8d2
fabb285
9b5f96d
3673f33
506da2d
7da5b8e
1deba38
f916e7e
0023be5
fce0383
2d05f98
3895f6e
f4b18fd
31eef05
7a7bdf5
8b125f3
0fa4876
906f9e3
8fd039f
3ab4bca
a5f3699
fe005fd
359ca1b
745c99f
b103ff5
7a2d010
33303de
7b503fa
9877d83
9007e40
fe3810b
60e8b9e
63f606d
4466032
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,39 @@ | ||
package ru.vk.itmo.test.smirnovandrew; | ||
|
||
import one.nio.http.Header; | ||
import one.nio.http.HttpClient; | ||
import one.nio.http.HttpException; | ||
import one.nio.http.HttpServer; | ||
import one.nio.http.HttpSession; | ||
import one.nio.http.Param; | ||
import one.nio.http.Path; | ||
import one.nio.http.Request; | ||
import one.nio.http.RequestMethod; | ||
import one.nio.http.Response; | ||
import one.nio.pool.PoolException; | ||
import ru.vk.itmo.ServiceConfig; | ||
import ru.vk.itmo.test.reference.dao.ReferenceDao; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.net.http.HttpClient; | ||
import java.net.http.HttpRequest; | ||
import java.net.http.HttpResponse; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Function; | ||
import java.util.logging.Logger; | ||
import java.util.stream.Collectors; | ||
|
||
public class MyServer extends HttpServer { | ||
private static final String ROOT = "/v0/entity"; | ||
private static final String X_SENDER_NODE = "X-SenderNode"; | ||
private static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas"; | ||
private static final long DURATION = 1000L; | ||
private static final int OK_STATUS = 300; | ||
private static final int NOT_FOUND_STATUS = 404; | ||
private static final String HEADER_DELIMITER = ": "; | ||
private final MyServerDao dao; | ||
private final MyExecutor executor; | ||
private final Logger logger; | ||
private final Map<String, HttpClient> httpClients; | ||
private final HttpClient httpClient; | ||
private final RendezvousClusterManager rendezvousClustersManager; | ||
private final ServiceConfig config; | ||
|
||
|
@@ -64,15 +59,13 @@ public MyServer( | |
this.dao = new MyServerDao(dao); | ||
this.executor = new MyExecutor(corePoolSize, availableProcessors); | ||
this.logger = Logger.getLogger(MyServer.class.getName()); | ||
this.httpClients = config.clusterUrls().stream() | ||
.filter(url -> !Objects.equals(url, config.selfUrl())) | ||
.collect(Collectors.toMap(s -> s, MyServerUtil::createClient, (c, c1) -> c)); | ||
this.httpClient = HttpClient.newHttpClient(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. На каком пуле потоков выполняются запросы к другим репликам? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. На том же, что задается вот в этой строчке There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
@Override | ||
public void handleRequest(Request request, HttpSession session) throws IOException { | ||
try { | ||
long exp = System.currentTimeMillis() + DURATION; | ||
long exp = System.currentTimeMillis() + MyServerUtil.DURATION; | ||
executor.execute(() -> { | ||
try { | ||
if (System.currentTimeMillis() > exp) { | ||
|
@@ -98,27 +91,31 @@ private static int quorum(int from) { | |
return from / 2 + 1; | ||
} | ||
|
||
private Response sendToAnotherNode( | ||
private HttpRequest toHttpRequest(Request request, String nodeUrl, String params) { | ||
return HttpRequest.newBuilder(URI.create(nodeUrl + MyServerUtil.ROOT + "?" + params)) | ||
.method(request.getMethodName(), request.getBody() == null | ||
? HttpRequest.BodyPublishers.noBody() | ||
: HttpRequest.BodyPublishers.ofByteArray(request.getBody())) | ||
.setHeader(MyServerUtil.X_SENDER_NODE, config.selfUrl()) | ||
.build(); | ||
} | ||
|
||
private CompletableFuture<Response> sendToAnotherNode( | ||
Request request, | ||
int ack, | ||
int from, | ||
String id, | ||
String clusterUrl, | ||
Function<MyServerDao, Response> operation | ||
) { | ||
if (Objects.equals(clusterUrl, config.selfUrl())) { | ||
return operation.apply(dao); | ||
return CompletableFuture.completedFuture(operation.apply(dao)); | ||
} | ||
|
||
var httpClient = httpClients.get(clusterUrl); | ||
var httpRequest = toHttpRequest(request, clusterUrl, String.format("id=%s&from=%d&ack=%d", id, from, ack)); | ||
|
||
try { | ||
return httpClient.invoke(request); | ||
} catch (InterruptedException e) { | ||
logger.info(e.getMessage()); | ||
Thread.currentThread().interrupt(); | ||
return new Response(Response.INTERNAL_ERROR, Response.EMPTY); | ||
} catch (HttpException | IOException | PoolException e1) { | ||
logger.info(e1.getMessage()); | ||
return new Response(Response.INTERNAL_ERROR, Response.EMPTY); | ||
} | ||
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) | ||
.thenApplyAsync(MyServerUtil::processingResponse); | ||
} | ||
|
||
private Response handleLocalRequest( | ||
sandrew-uj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
@@ -128,15 +125,19 @@ private Response handleLocalRequest( | |
Integer ackParam, | ||
String senderNode, | ||
Function<MyServerDao, Response> operation | ||
) { | ||
Integer from = fromParam; | ||
if (Objects.isNull(from)) { | ||
) throws ExecutionException, InterruptedException, TimeoutException { | ||
final int from; | ||
if (Objects.isNull(fromParam)) { | ||
from = config.clusterUrls().size(); | ||
} else { | ||
from = fromParam; | ||
} | ||
|
||
Integer ack = ackParam; | ||
if (Objects.isNull(ack)) { | ||
final int ack; | ||
if (Objects.isNull(ackParam)) { | ||
ack = quorum(from); | ||
} else { | ||
ack = ackParam; | ||
} | ||
|
||
String paramError = getParametersError(id, from, ack); | ||
|
@@ -154,26 +155,29 @@ private Response handleLocalRequest( | |
return new Response(Response.BAD_REQUEST, Response.EMPTY); | ||
} | ||
|
||
var sortedNodes = RendezvousClusterManager.getSortedNodes(from, config); | ||
var sortedNodes = RendezvousClusterManager.getSortedNodes(id, from, config); | ||
|
||
if (sortedNodes.stream().map(config.clusterUrls()::get).noneMatch(config.selfUrl()::equals)) { | ||
return sendToAnotherNode(request, clusterUrl, operation); | ||
return sendToAnotherNode(request, ack, from, id, clusterUrl, operation) | ||
.get(MyServerUtil.DURATION, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
request.addHeader(String.join(HEADER_DELIMITER, X_SENDER_NODE, config.selfUrl())); | ||
var responses = new ArrayList<Response>(); | ||
for (int nodeNumber : sortedNodes) { | ||
var r = sendToAnotherNode(request, config.clusterUrls().get(nodeNumber), operation); | ||
if (r.getStatus() < OK_STATUS | ||
|| (r.getStatus() == NOT_FOUND_STATUS && request.getMethod() == Request.METHOD_GET)) { | ||
responses.add(r); | ||
} | ||
} | ||
|
||
if (responses.size() < ack) { | ||
return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY); | ||
} | ||
return MyServerUtil.getMaxTimestampResponse(responses); | ||
var completableResults = sortedNodes.stream() | ||
.map(nodeNumber -> sendToAnotherNode( | ||
request, | ||
ack, | ||
from, | ||
id, | ||
config.clusterUrls().get(nodeNumber), | ||
operation)) | ||
.toList(); | ||
|
||
return MyServerUtil.getResults( | ||
from, | ||
ack, | ||
completableResults, | ||
logger | ||
); | ||
} | ||
|
||
private String getParametersError(String id, Integer from, Integer ack) { | ||
|
@@ -201,15 +205,15 @@ private String getParametersError(String id, Integer from, Integer ack) { | |
return null; | ||
} | ||
|
||
@Path(ROOT) | ||
@Path(MyServerUtil.ROOT) | ||
@RequestMethod(Request.METHOD_GET) | ||
public Response get( | ||
@Param(value = "id", required = true) String id, | ||
@Param(value = "from") Integer from, | ||
@Param(value = "ack") Integer ack, | ||
@Header(value = X_SENDER_NODE) String senderNode, | ||
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode, | ||
Request request | ||
) { | ||
) throws ExecutionException, InterruptedException, TimeoutException { | ||
return handleLocalRequest( | ||
request, | ||
id, | ||
|
@@ -220,15 +224,15 @@ public Response get( | |
); | ||
} | ||
|
||
@Path(ROOT) | ||
@Path(MyServerUtil.ROOT) | ||
@RequestMethod(Request.METHOD_DELETE) | ||
public Response delete( | ||
@Param(value = "id", required = true) String id, | ||
@Param(value = "from") Integer from, | ||
@Param(value = "ack") Integer ack, | ||
@Header(value = X_SENDER_NODE) String senderNode, | ||
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode, | ||
Request request | ||
) { | ||
) throws ExecutionException, InterruptedException, TimeoutException { | ||
return handleLocalRequest( | ||
request, | ||
id, | ||
|
@@ -239,15 +243,15 @@ public Response delete( | |
); | ||
} | ||
|
||
@Path(ROOT) | ||
@Path(MyServerUtil.ROOT) | ||
@RequestMethod(Request.METHOD_PUT) | ||
public Response put( | ||
@Param(value = "id", required = true) String id, | ||
@Param(value = "from") Integer from, | ||
@Param(value = "ack") Integer ack, | ||
@Header(value = X_SENDER_NODE) String senderNode, | ||
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode, | ||
Request request | ||
) { | ||
) throws ExecutionException, InterruptedException, TimeoutException { | ||
request.addHeader("Content-Length: " + request.getBody().length); | ||
request.setBody(request.getBody()); | ||
|
||
|
@@ -275,6 +279,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti | |
public synchronized void stop() { | ||
this.executor.shutdown(); | ||
super.stop(); | ||
httpClients.values().forEach(HttpClient::close); | ||
httpClient.close(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Надо откатить перед мерджем