-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Супрядкина Дарья ИТМО DWS stage 5 #180
Changes from 61 commits
2f2080a
a18dc95
528366b
990d153
3587983
75db05e
3c9655b
ad686bc
1d5c62c
825f223
8fe320f
c5a6710
e6673e6
474cc38
c601bb6
fe0ffa5
f1da741
55458dc
7b834f5
022c4d6
7de6005
889bdd7
c00be65
69c9b6d
c852e2f
6e9031e
7529b7b
91ba923
a9c58df
141a869
0f118ed
afd4a3f
e9259b8
b4fae77
9ffd458
6d47409
4f529d4
ab42efe
6625f89
a13db87
0ed611c
6da7828
80a406b
9196595
dc30ab3
6fe0449
629090b
33f5c3a
7523858
d0bd0e9
2ee3bd4
1db3665
dca0f2d
9d65379
1d65f01
6be3d39
3a569d0
71b103d
595df0a
2049803
3788b33
818a350
e9e93eb
b05df36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,14 +25,15 @@ | |
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER; | ||
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER_NORMAL; | ||
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER; | ||
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER_NORMAL; | ||
|
||
|
@@ -45,7 +46,6 @@ public class Server extends HttpServer { | |
private final String selfUrl; | ||
private final ShardingPolicy shardingPolicy; | ||
private final HttpClient httpClient; | ||
private static final int REQUEST_TIMEOUT_SEC = 2; | ||
private final List<String> clusterUrls; | ||
private final Utils utils; | ||
private final SelfRequestHandler selfHandler; | ||
|
@@ -70,7 +70,7 @@ private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConf | |
acceptorConfig.port = serviceConfig.selfPort(); | ||
acceptorConfig.reusePort = true; | ||
|
||
httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig}; | ||
httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig}; | ||
httpServerConfig.closeSessions = true; | ||
|
||
return httpServerConfig; | ||
|
@@ -83,7 +83,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti | |
response = new Response(Response.BAD_REQUEST, Response.EMPTY); | ||
} else { | ||
response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY); | ||
|
||
} | ||
session.sendResponse(response); | ||
} | ||
|
@@ -93,74 +92,68 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti | |
try { | ||
workerExecutor.execute(() -> { | ||
try { | ||
if (!permittedMethods.contains(request.getMethod())) { | ||
Map<String, Integer> ackFrom = getFromAndAck(request); | ||
int from = ackFrom.get("from"); | ||
int ack = ackFrom.get("ack"); | ||
if (!permittedMethods.contains(request.getMethod()) | ||
|| checkBadRequest(ack, from, request.getMethod())) { | ||
handleDefault(request, session); | ||
return; | ||
} | ||
if (request.getHeader(FROM_HEADER) == null) { | ||
if (request.getHeader(FROM_HEADER_NORMAL) == null) { | ||
request.addHeader(FROM_HEADER + selfUrl); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Заголовки обычно в формате There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Зачем мы здесь добавляем заголовок в исходный запрос, если он потом не используется? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. в случае, если у нас запрос пришел от пользователя, мы не имеем заголовка X-FROM, поэтому осуществляем broadcast There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Я понял изначальную идею, но заголовок |
||
|
||
Map<String, Integer> ackFrom = getFromAndAck(request); | ||
int from = ackFrom.get("from"); | ||
int ack = ackFrom.get("ack"); | ||
|
||
Response response = mergeResponses(broadcast( | ||
shardingPolicy.getNodesById(utils.getIdParameter(request), from), request, ack), | ||
ack, from); | ||
|
||
session.sendResponse(response); | ||
|
||
collectResponsesCallback( | ||
broadcast( | ||
shardingPolicy.getNodesById(utils.getIdParameter(request), from), | ||
request | ||
), ack, from, session); | ||
} else { | ||
Response resp = selfHandler.handleRequest(request); | ||
checkTimestampHeaderExistenceAndSet(resp); | ||
session.sendResponse(resp); | ||
} | ||
|
||
} catch (Exception e) { | ||
logger.error("Unexpected error", e); | ||
try { | ||
if (e.getClass() == HttpException.class) { | ||
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY)); | ||
} else { | ||
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); | ||
} | ||
} catch (IOException ex) { | ||
logger.error("Failed to send error response", e); | ||
session.close(); | ||
} | ||
solveUnexpectedError(e, session); | ||
} | ||
}); | ||
} catch (RejectedExecutionException e) { | ||
logger.error("Service is unavailable", e); | ||
session.sendResponse(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY)); | ||
} | ||
} | ||
|
||
private void solveUnexpectedError(Exception e, HttpSession session) { | ||
logger.error("Unexpected error", e); | ||
SuDarina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
if (e.getClass() == HttpException.class) { | ||
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY)); | ||
} else { | ||
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); | ||
} | ||
} catch (Exception exception) { | ||
logger.error("Failed to send error response", exception); | ||
session.scheduleClose(); | ||
} | ||
} | ||
|
||
private List<Response> broadcast(List<String> nodes, Request request, int ack) { | ||
int internalAck = ack; | ||
List<Response> responses = new ArrayList<>(ack); | ||
Response response; | ||
private boolean checkBadRequest(int ack, int from, int method) { | ||
return !permittedMethods.contains(method) || ack > from || ack <= 0 || from > clusterUrls.size(); | ||
} | ||
|
||
private List<CompletableFuture<Response>> broadcast(List<String> nodes, Request request) { | ||
List<CompletableFuture<Response>> futureResponses = new ArrayList<>(nodes.size()); | ||
CompletableFuture<Response> response; | ||
if (nodes.contains(selfUrl)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Почему это не внутри цикла ниже? Зачем отдельный проход по списку и удаление себя, если обнаружили? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Когда мы делаем рассылку по всем нодам, мы должны обработать запрос и на ноде, которая делает этот запрос, она отправляет этот запрос на свой локальный обработчик и работает со своей бд. Чтобы отделить две разные логики обработки, мы делаем отдельную прогонку. В рамках одной прогонки необходимо постоянно сверять текущий url c selfUrl, даже если мы уже обработали selfUrl, постоянное сравнение продолжается, что мне показалось не очень логично. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
response = selfHandler.handleRequest(request); | ||
checkTimestampHeaderExistenceAndSet(response); | ||
responses.add(response); | ||
response = selfHandler.handleAsyncRequest(request); | ||
futureResponses.add(response); | ||
nodes.remove(selfUrl); | ||
if (--internalAck == 0) { | ||
return responses; | ||
} | ||
} | ||
|
||
for (String node : nodes) { | ||
response = handleProxy(utils.getEntryUrl(node, utils.getIdParameter(request)), request); | ||
if (response.getStatus() < 500) { | ||
checkTimestampHeaderExistenceAndSet(response); | ||
responses.add(response); | ||
if (--internalAck == 0) { | ||
return responses; | ||
} | ||
} | ||
futureResponses.add(response); | ||
} | ||
return responses; | ||
return futureResponses; | ||
} | ||
|
||
private void checkTimestampHeaderExistenceAndSet(Response response) { | ||
|
@@ -171,58 +164,75 @@ private void checkTimestampHeaderExistenceAndSet(Response response) { | |
} | ||
} | ||
|
||
private Response mergeResponses(List<Response> responses, int ack, int from) { | ||
if (responses.stream().filter(response -> response.getStatus() == 400).count() == from | ||
|| ack > from | ||
|| from > clusterUrls.size() | ||
|| ack <= 0) { | ||
return new Response(Response.BAD_REQUEST, Response.EMPTY); | ||
private void collectResponsesCallback(List<CompletableFuture<Response>> futureResponses, | ||
SuDarina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int ack, int from, HttpSession session) { | ||
List<Response> responses = new CopyOnWriteArrayList<>(); | ||
AtomicInteger successCount = new AtomicInteger(0); | ||
AtomicInteger exceptionCount = new AtomicInteger(0); | ||
for (CompletableFuture<Response> futureResponse : futureResponses) { | ||
futureResponse.whenCompleteAsync((response, exception) -> { | ||
|
||
if (exception == null && response.getStatus() < 500) { | ||
checkTimestampHeaderExistenceAndSet(response); | ||
responses.add(response); | ||
successCount.incrementAndGet(); | ||
if (successCount.get() == ack) { | ||
sendAsyncResponse(chooseResponse(responses), session); | ||
} | ||
} else { | ||
exceptionCount.incrementAndGet(); | ||
if (exceptionCount.get() > from - ack) { | ||
sendAsyncResponse(new Response("504 Not enough replicas", Response.EMPTY), session); | ||
} | ||
} | ||
|
||
}, workerExecutor).exceptionally(exception -> { | ||
logger.error("Error happened while collecting responses from nodes", exception); | ||
return new Response(Response.INTERNAL_ERROR, Response.EMPTY); | ||
incubos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}); | ||
} | ||
if (responses.stream().filter(response -> response.getStatus() == 200 | ||
|| response.getStatus() == 404 | ||
|| response.getStatus() == 202 | ||
|| response.getStatus() == 201).count() < ack) { | ||
return new Response("504 Not Enough Replicas", Response.EMPTY); | ||
} | ||
|
||
private void sendAsyncResponse(Response resp, HttpSession session) { | ||
try { | ||
session.sendResponse(resp); | ||
} catch (IOException e) { | ||
logger.error("Failed to send error response", e); | ||
session.scheduleClose(); | ||
} | ||
} | ||
|
||
private Response chooseResponse(List<Response> responses) { | ||
return responses.stream().max((o1, o2) -> { | ||
Long header1 = Long.parseLong(o1.getHeader(TIMESTAMP_MILLIS_HEADER)); | ||
Long header2 = Long.parseLong(o2.getHeader(TIMESTAMP_MILLIS_HEADER)); | ||
return header1.compareTo(header2); | ||
}).get(); | ||
} | ||
|
||
public Response handleProxy(String redirectedUrl, Request request) { | ||
try { | ||
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl)) | ||
.header(FROM_HEADER, selfUrl) | ||
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray( | ||
request.getBody() == null ? new byte[]{} : request.getBody()) | ||
).build(); | ||
HttpResponse<byte[]> response = httpClient | ||
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) | ||
.get(REQUEST_TIMEOUT_SEC, TimeUnit.SECONDS); | ||
Response response1 = new Response(String.valueOf(response.statusCode()), response.body()); | ||
if (response.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) { | ||
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0"); | ||
} else { | ||
response1.addHeader(TIMESTAMP_MILLIS_HEADER | ||
+ response.headers().map().get( | ||
public CompletableFuture<Response> handleProxy(String redirectedUrl, Request request) { | ||
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl)) | ||
.header(FROM_HEADER_NORMAL, selfUrl) | ||
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray( | ||
request.getBody() == null ? new byte[]{} : request.getBody()) | ||
incubos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
).build(); | ||
return httpClient | ||
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray()) | ||
.thenApplyAsync(httpResponse -> { | ||
Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body()); | ||
if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) { | ||
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0"); | ||
} else { | ||
response1.addHeader(TIMESTAMP_MILLIS_HEADER | ||
+ httpResponse.headers().map().get( | ||
incubos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst() | ||
); | ||
} | ||
|
||
return response1; | ||
} catch (ExecutionException e) { | ||
logger.error("Unexpected error", e); | ||
return new Response(Response.INTERNAL_ERROR, Response.EMPTY); | ||
} catch (TimeoutException e) { | ||
logger.error("Timeout reached", e); | ||
return new Response(Response.REQUEST_TIMEOUT, Response.EMPTY); | ||
} catch (InterruptedException e) { | ||
logger.error("Service is unavailable", e); | ||
Thread.currentThread().interrupt(); | ||
return new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY); | ||
} | ||
); | ||
} | ||
return response1; | ||
}, workerExecutor).exceptionally(exception -> { | ||
logger.error("Error happened while sending async requests", exception); | ||
return new Response(Response.INTERNAL_ERROR, Response.EMPTY); | ||
}); | ||
} | ||
|
||
private Map<String, Integer> getFromAndAck(Request request) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Почему не просто
composeFuture(handleRequest(request))
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Исправила