Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Решетников Алексей, ИТМО DWS, stage 6 #222

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
24c8e17
stage2
lehatheslayer Feb 29, 2024
76597ad
fix
lehatheslayer Feb 29, 2024
421e4da
codeclimate
lehatheslayer Feb 29, 2024
83f3164
codeclimate #2
lehatheslayer Feb 29, 2024
6cfe8ff
fixes
lehatheslayer Mar 6, 2024
8d43174
report
lehatheslayer Mar 6, 2024
ecb5dae
Merge branch 'main' into stage2
lehatheslayer Mar 6, 2024
2734f6a
Merge branch 'polis-vk:main' into stage2
lehatheslayer Mar 9, 2024
ca7cad6
Merge branch 'polis-vk:main' into stage2
lehatheslayer Mar 14, 2024
4eebbd4
graceful shutdown
lehatheslayer Mar 14, 2024
af4490c
stage3
lehatheslayer Mar 14, 2024
1c23f64
style
lehatheslayer Mar 14, 2024
c3d05ae
style
lehatheslayer Mar 14, 2024
1069c54
style
lehatheslayer Mar 14, 2024
bce25f9
style
lehatheslayer Mar 14, 2024
3467c34
style
lehatheslayer Mar 14, 2024
646f98d
style
lehatheslayer Mar 14, 2024
4b197fd
style
lehatheslayer Mar 14, 2024
f49309c
Merge branch 'main' into stage3
incubos Mar 15, 2024
654b50f
Merge branch 'main' into stage3
lehatheslayer Mar 20, 2024
fb00785
fixes
lehatheslayer Mar 20, 2024
cc2744e
report
lehatheslayer Mar 20, 2024
02f0384
style
lehatheslayer Mar 20, 2024
a0e4d52
Merge branch 'polis-vk:main' into stage3
lehatheslayer Mar 28, 2024
484d7a3
stage 4
lehatheslayer Mar 28, 2024
958958b
style
lehatheslayer Mar 28, 2024
5661ef5
style
lehatheslayer Mar 28, 2024
2c87ae8
style
lehatheslayer Mar 28, 2024
c7ad00d
Merge branch 'polis-vk:main' into stage4
lehatheslayer Apr 3, 2024
20222b0
report
lehatheslayer Apr 3, 2024
5660d92
Merge branch 'main' into stage4
lehatheslayer Apr 11, 2024
2a5a140
stage5
lehatheslayer Apr 11, 2024
082ed1a
style
lehatheslayer Apr 11, 2024
08cb80e
Merge branch 'main' into stage5
incubos Apr 13, 2024
29e0d47
Merge branch 'polis-vk:main' into stage5
lehatheslayer Apr 17, 2024
6d0b52b
stage5
lehatheslayer Apr 17, 2024
f290ebb
update report
lehatheslayer Apr 17, 2024
679cce0
Merge branch 'main' into stage5
lehatheslayer Apr 25, 2024
ae49a5c
stage6
lehatheslayer Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ru.vk.itmo.test.reshetnikovaleksei;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import one.nio.util.ByteArrayBuilder;
import ru.vk.itmo.test.reshetnikovaleksei.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ChunkedResponseBuilder {
private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] DELIMITER = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] HTTP_OK = "HTTP/1.1 200 OK".getBytes(StandardCharsets.UTF_8);
private static final byte[] CONTENT_TYPE = "Content-Type: text/plain".getBytes(StandardCharsets.UTF_8);
private static final byte[] TRANSFER_ENCODING = "Transfer-Encoding: chunked".getBytes(StandardCharsets.UTF_8);
private static final byte[] CONNECTION = "Connection: keep-alive".getBytes(StandardCharsets.UTF_8);

private final ByteArrayBuilder builder;

public ChunkedResponseBuilder() {
this.builder = new ByteArrayBuilder();
}

@CanIgnoreReturnValue
public ChunkedResponseBuilder withHeader() {
builder.append(HTTP_OK, 0, HTTP_OK.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(CONTENT_TYPE, 0, CONTENT_TYPE.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(TRANSFER_ENCODING, 0, TRANSFER_ENCODING.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(CONNECTION, 0, CONNECTION.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(CRLF, 0, CRLF.length);

return this;
}

@CanIgnoreReturnValue
public ChunkedResponseBuilder withData(Iterator<Entry<MemorySegment>> iterator) {
while (iterator.hasNext()) {
Entry<MemorySegment> entry = iterator.next();

byte[] key = entry.key().toArray(ValueLayout.JAVA_BYTE);
byte[] value = entry.value().toArray(ValueLayout.JAVA_BYTE);
byte[] length = Integer.toHexString(key.length + value.length + DELIMITER.length)
.getBytes(StandardCharsets.UTF_8);

builder.append(length, 0, length.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(key, 0, key.length);
builder.append(DELIMITER, 0, DELIMITER.length);
builder.append(value, 0, value.length);
builder.append(CRLF, 0, CRLF.length);
}

return this;
}

@CanIgnoreReturnValue
public ChunkedResponseBuilder withEnd() {
byte[] dataSize = Integer.toHexString(0).getBytes(StandardCharsets.UTF_8);
builder.append(dataSize, 0, dataSize.length);
builder.append(CRLF, 0, CRLF.length);
builder.append(CRLF, 0, CRLF.length);

return this;
}

public byte[] build() {
return builder.toBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
import java.util.concurrent.atomic.AtomicLong;

public class CustomThreadFactory implements ThreadFactory {
private static final String THREAD_NAME_PREFIX = "mega-thread-";
private static final AtomicLong THREAD_COUNTER = new AtomicLong();

private final String threadNamePrefix;

public CustomThreadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
return new Thread(r, threadNamePrefix + THREAD_COUNTER.getAndIncrement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ private ExecutorServiceFactory() {

}

public static ExecutorService createExecutorService() {
public static ExecutorService createExecutorService(String threadNamePrefix) {
return new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME_SECONDS,
UNIT, QUEUE, new CustomThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
UNIT, QUEUE, new CustomThreadFactory(threadNamePrefix), new ThreadPoolExecutor.AbortPolicy()
);
}
}
166 changes: 118 additions & 48 deletions src/main/java/ru/vk/itmo/test/reshetnikovaleksei/HttpServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.server.AcceptorConfig;
import one.nio.util.Utf8;
Expand All @@ -22,8 +23,9 @@
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -39,17 +41,20 @@ public class HttpServerImpl extends HttpServer {

private final Dao<MemorySegment, Entry<MemorySegment>> dao;
private final ExecutorService executorService;
private final ExecutorService localExecutorService;
private final RequestRouter requestRouter;
private final String selfUrl;
private final int clusterSize;

public HttpServerImpl(ServiceConfig config,
Dao<MemorySegment, Entry<MemorySegment>> dao,
ExecutorService executorService,
ExecutorService localExecutorService,
RequestRouter requestRouter) throws IOException {
super(createConfig(config));
this.dao = dao;
this.executorService = executorService;
this.localExecutorService = localExecutorService;
this.requestRouter = requestRouter;
this.selfUrl = config.selfUrl();
this.clusterSize = config.clusterUrls().size();
Expand Down Expand Up @@ -95,11 +100,48 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
}
}

@Path("/v0/entities")
@RequestMethod(Request.METHOD_GET)
public void entities(Request request,
HttpSession session,
@Param(value = "start", required = true) String startId,
@Param(value = "end") String endId) {
if (startId == null || startId.isBlank() || (endId != null && endId.isBlank())) {
try {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} catch (IOException e) {
processIOException(request, session, e);
}

return;
}

Iterator<Entry<MemorySegment>> iterator = dao.get(
MemorySegment.ofArray(startId.getBytes(StandardCharsets.UTF_8)),
endId == null
? null
: MemorySegment.ofArray(endId.getBytes(StandardCharsets.UTF_8))
);

byte[] responseBytes = new ChunkedResponseBuilder()
.withHeader()
.withData(iterator)
.withEnd()
.build();
try {
session.write(responseBytes, 0, responseBytes.length);
} catch (IOException e) {
processIOException(request, session, e);
}
session.scheduleClose();
}

@Path("/v0/entity")
public Response entity(Request request,
@Param(value = "id") String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack) {
@Param(value = "ack") Integer ack
) throws ExecutionException, InterruptedException {
if (id == null || id.isBlank()) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}
Expand All @@ -111,69 +153,71 @@ public Response entity(Request request,
if (ack == null) {
ack = (from + 1) / 2;
}

if (from < 0 || from > clusterSize || from < ack || ack <= 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

if (request.getHeader(REDIRECTED_REQUEST_HEADER_NAME) == null) {
List<Response> not5xxResponses = getNot5xxResponses(
List<Response> notErrorResponses = extractFuturesAndGetNotErrorResponses(
sendRequestsAndGetResponses(
request, id, requestRouter.getNodesByEntityId(id, from)
)
);

if (not5xxResponses.size() >= ack) {
if (request.getMethod() == Request.METHOD_GET) {
not5xxResponses.sort(Comparator.comparingLong(r ->
Long.parseLong(r.getHeader(TIMESTAMP_HEADER_NAME + ": "))));
return not5xxResponses.getLast();
} else {
return not5xxResponses.getFirst();
}
if (notErrorResponses.size() >= ack) {
return request.getMethod() == Request.METHOD_GET
? getLastResponse(notErrorResponses)
: notErrorResponses.getFirst();
}

return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY);
}

return invokeLocal(request, id);
return invokeLocal(request, id).get();
}

private Response invokeLocal(Request request, String id) {
switch (request.getMethod()) {
case Request.METHOD_GET -> {
MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
Entry<MemorySegment> entry = dao.get(key);
if (entry == null || entry.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(TIMESTAMP_HEADER_NAME + ": " + (entry != null ? entry.timestamp() : 0));
private CompletableFuture<Response> invokeLocal(Request request, String id) {
CompletableFuture<Response> completableFuture = new CompletableFuture<>();

return response;
}
localExecutorService.execute(() -> {
switch (request.getMethod()) {
case Request.METHOD_GET -> {
MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
Entry<MemorySegment> entry = dao.get(key);
if (entry == null || entry.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(TIMESTAMP_HEADER_NAME + ": " + (entry != null ? entry.timestamp() : 0));

Response response = Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
response.addHeader(TIMESTAMP_HEADER_NAME + ": " + entry.timestamp());
return response;
}
case Request.METHOD_PUT -> {
MemorySegment key = MemorySegment.ofArray(Utf8.toBytes(id));
MemorySegment value = MemorySegment.ofArray(request.getBody());
dao.upsert(new BaseEntry<>(key, value, System.currentTimeMillis()));
return new Response(Response.CREATED, Response.EMPTY);
}
case Request.METHOD_DELETE -> {
MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
dao.upsert(new BaseEntry<>(key, null, System.currentTimeMillis()));
return new Response(Response.ACCEPTED, Response.EMPTY);
}
default -> {
return new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);
completableFuture.complete(response);
return;
}

Response response = Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
response.addHeader(TIMESTAMP_HEADER_NAME + ": " + entry.timestamp());
completableFuture.complete(response);
}
case Request.METHOD_PUT -> {
MemorySegment key = MemorySegment.ofArray(Utf8.toBytes(id));
MemorySegment value = MemorySegment.ofArray(request.getBody());
dao.upsert(new BaseEntry<>(key, value, System.currentTimeMillis()));
completableFuture.complete(new Response(Response.CREATED, Response.EMPTY));
}
case Request.METHOD_DELETE -> {
MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
dao.upsert(new BaseEntry<>(key, null, System.currentTimeMillis()));
completableFuture.complete(new Response(Response.ACCEPTED, Response.EMPTY));
}
default -> completableFuture.complete(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY));
}
}
});

return completableFuture;
}

private List<Response> sendRequestsAndGetResponses(Request request, String id, List<String> nodes) {
List<Response> responses = new ArrayList<>();
private List<CompletableFuture<Response>> sendRequestsAndGetResponses(
Request request, String id, List<String> nodes
) {
List<CompletableFuture<Response>> responses = new ArrayList<>();
for (String node : nodes) {
if (node.equals(selfUrl)) {
responses.add(invokeLocal(request, id));
Expand All @@ -184,31 +228,57 @@ private List<Response> sendRequestsAndGetResponses(Request request, String id, L
responses.add(requestRouter.redirect(node, request, id));
} catch (TimeoutException e) {
LOGGER.error("timeout while invoking remote node");
responses.add(new Response(Response.REQUEST_TIMEOUT, Response.EMPTY));
responses.add(CompletableFuture.completedFuture(
new Response(Response.REQUEST_TIMEOUT, Response.EMPTY)));
} catch (ExecutionException | IOException e) {
LOGGER.error("I/O exception while calling remote node");
responses.add(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
responses.add(CompletableFuture.completedFuture(
new Response(Response.INTERNAL_ERROR, Response.EMPTY)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Thread interrupted");
responses.add(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY));
responses.add(CompletableFuture.completedFuture(
new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY)));
}
}

return responses;
}

private List<Response> getNot5xxResponses(List<Response> responses) {
private List<Response> extractFuturesAndGetNotErrorResponses(List<CompletableFuture<Response>> futures) {
List<Response> responses = futures.stream().map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Got error while redirecting request: {}", e.getMessage());
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}).toList();

List<Response> not5xxResponses = new ArrayList<>();
for (Response response : responses) {
if (response.getStatus() < 500) {
if (response.getStatus() < 500 && response.getStatus() != 429) {
not5xxResponses.add(response);
}
}

return not5xxResponses;
}

private Response getLastResponse(List<Response> notErrorResponses) {
Response lastResponse = null;
long maxTimestamp = Long.MIN_VALUE;

for (Response response : notErrorResponses) {
long timestamp = Long.parseLong(response.getHeader(TIMESTAMP_HEADER_NAME + ": "));
if (timestamp > maxTimestamp) {
maxTimestamp = timestamp;
lastResponse = response;
}
}
return lastResponse;
}

private void processIOException(Request request, HttpSession session, IOException e) {
LOGGER.error("Failed to send response for request: {} with error: {}", request, e.getMessage());
session.close();
Expand Down
Loading
Loading