Skip to content

Commit

Permalink
Codeclimate
Browse files Browse the repository at this point in the history
  • Loading branch information
persehoney committed Jun 26, 2024
1 parent 3548972 commit 4cf9bf9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 66 deletions.
20 changes: 10 additions & 10 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/CustomSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.concurrent.Flow;

public class CustomSubscriber implements BodySubscriber<byte[]> {
private static final byte delimiter = '\n';
private static final byte DELIMITER = '\n';
volatile CompletableFuture<byte[]> bodyCF;
Flow.Subscription subscription;
List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -59,15 +59,15 @@ private List<byte[]> toArrays(List<ByteBuffer> buffers) {

private boolean containsDelim(byte[] bytes) {
for (int element : bytes) {
if (element == delimiter) {
if (element == DELIMITER) {
return true;
}
}
return false;
}

private boolean startsWithDelim(byte[] bytes) {
return delimiter == bytes[0];
return DELIMITER == bytes[0];
}

private byte[] merge(byte[] src1, byte[] src2) {
Expand All @@ -84,7 +84,7 @@ private byte[] toArray(List<byte[]> bytes) {
for (byte[] src : bytes) {
System.arraycopy(src, 0, dst, offset, src.length);
offset += src.length;
dst[offset] = delimiter;
dst[offset] = DELIMITER;
offset++;
}
return dst;
Expand All @@ -100,20 +100,20 @@ private byte[] toBytes(List<ByteBuffer> buffers) {
byte[] pred = null;
for (int i = 0; i < chunks.size(); i++) {
byte[] cur = chunks.get(i);
byte[] next = i + 1 == chunks.size() ? new byte[0] : chunks.get(i + 1);
if (startsWithDelim(cur) && !predContainsDelim) {
cur = merge(pred, cur);
} else if (!containsDelim(cur) && predEndsWithDelim) {
byte[] next = chunks.get(i + 1);
if (!startsWithDelim(next)) {
cur = merge(pred, cur);
} else {
if (startsWithDelim(next)) {
bytes.add(pred);
} else {
cur = merge(pred, cur);
}
} else if (pred != null) {
bytes.add(pred);
}

predEndsWithDelim = delimiter == cur[cur.length - 1];
predEndsWithDelim = DELIMITER == cur[cur.length - 1];
predContainsDelim = containsDelim(cur);
pred = cur;
}
Expand All @@ -123,4 +123,4 @@ private byte[] toBytes(List<ByteBuffer> buffers) {
}
return toArray(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
public class MergeRangeResult {
private static final Comparator<MemorySegment> comparator = new MemorySegmentComparator();

private MergeRangeResult() {
}

public static Iterator<Entry<MemorySegment>> range(Iterator<Entry<MemorySegment>> firstIterator,
List<Response> responses) {
List<Iterator<Entry<MemorySegment>>> iterators = new ArrayList<>(responses.size() + 1);
Expand All @@ -36,7 +39,7 @@ private static Iterator<Entry<MemorySegment>> iterator(Response response) {
byte[] body = response.getBody();
char separator = '\n';
return new Iterator<>() {
int offset = 0;
int offset;

@Override
public boolean hasNext() {
Expand Down
96 changes: 42 additions & 54 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/MyHttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.net.Socket;
import one.nio.server.AcceptorConfig;
Expand Down Expand Up @@ -116,22 +117,12 @@ public void handleDefault(Request request, HttpSession session) {
}

@Path("/v0/entities")
@RequestMethod(Request.METHOD_GET)
public void handleRangeRequest(final Request request, final HttpSession session,
@Param(value = "start", required = true) String start,
@Param(value = "end") String end, @Param(value = "cluster") String cluster)
throws IOException {
if (request.getMethod() != Request.METHOD_GET) {
session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY));
return;
}

if (start == null || start.isBlank()) {
session.sendError(Response.BAD_REQUEST, "Start parameter required");
return;
}

if (end != null && !end.isBlank() && start.compareTo(end) >= 0) {
session.sendError(Response.BAD_REQUEST, "Start parameter should be less than end parameter");
if (!ServerUtils.validParameters(session, start, end)) {
return;
}

Expand All @@ -144,7 +135,7 @@ public void handleRangeRequest(final Request request, final HttpSession session,
try {
sendLocalRange((CustomHttpSession) session, start, end);
} catch (IOException e) {
throw new RuntimeException(e);
throw new UncheckedIOException(e);
}
});
}
Expand All @@ -159,24 +150,9 @@ private void sendLocalRange(CustomHttpSession session, String start, String end)
}

private void sendClusterRange(CustomHttpSession session, Request request, String start, String end) {
List<CompletableFuture<Response>> responses = new ArrayList<>();

List<CompletableFuture<Response>> responses = getResponses(request, config.clusterUrls(), request.getURI().replace("&cluster=1", ""),
responseInfo -> new CustomSubscriber());
Iterator<Entry<MemorySegment>> localIterator = invokeLocalRange(start, end);
for (String node : config.clusterUrls()) {
if (!node.equals(config.selfUrl())) {
try {
responses.add(invokeRemote(node, request, request.getURI().replace("&cluster=1", ""),
responseInfo -> new CustomSubscriber()));
} catch (IOException e) {
responses.add(CompletableFuture.completedFuture(new Response(Response.INTERNAL_ERROR,
Response.EMPTY)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
responses.add(CompletableFuture.completedFuture(new Response(Response.SERVICE_UNAVAILABLE,
Response.EMPTY)));
}
}
}

executorLocal.execute(() -> {
List<Response> completedResponses = responses
Expand All @@ -194,13 +170,15 @@ private void sendClusterRange(CustomHttpSession session, Request request, String
try {
sendRangeResponse(session, localIterator, completedResponses);
} catch (IOException e) {
throw new RuntimeException(e);
throw new UncheckedIOException(e);
}
});
}

private void sendRangeResponse(CustomHttpSession session, Iterator<Entry<MemorySegment>> firstIterator, List<Response> responses) throws IOException {
Iterator<Entry<MemorySegment>> iterator = MergeRangeResult.range(firstIterator, responses.stream().filter(r -> r.getStatus() == HttpURLConnection.HTTP_OK).toList());
private void sendRangeResponse(CustomHttpSession session, Iterator<Entry<MemorySegment>> firstIterator,
List<Response> responses) throws IOException {
Iterator<Entry<MemorySegment>> iterator = MergeRangeResult.range(firstIterator, responses.stream()
.filter(r -> r.getStatus() == HttpURLConnection.HTTP_OK).toList());
session.stream(iterator);
}

Expand Down Expand Up @@ -242,16 +220,13 @@ public void handleRequest(final Request request, final HttpSession session,
handle(request, id, session, ack, from);
}

private void handle(Request request, String id, HttpSession session, Integer ack, Integer from) {
List<String> executorNodes = ServerUtils.getNodesByEntityId(config.clusterUrls(), id, from);
private List<CompletableFuture<Response>> getResponses(Request request, List<String> executorNodes, String uri,
HttpResponse.BodyHandler<byte[]> responseBodyHandler) {
List<CompletableFuture<Response>> responses = new ArrayList<>();

for (String node : executorNodes) {
if (node.equals(config.selfUrl())) {
responses.add(CompletableFuture.completedFuture(invokeLocal(request, id)));
} else {
if (!node.equals(config.selfUrl())) {
try {
responses.add(invokeRemote(node, request, request.getURI() + "&local=1", HttpResponse.BodyHandlers.ofByteArray()));
responses.add(invokeRemote(node, request, uri, responseBodyHandler));
} catch (IOException e) {
responses.add(CompletableFuture.completedFuture(new Response(Response.INTERNAL_ERROR,
Response.EMPTY)));
Expand All @@ -262,6 +237,16 @@ private void handle(Request request, String id, HttpSession session, Integer ack
}
}
}
return responses;
}

private void handle(Request request, String id, HttpSession session, Integer ack, Integer from) {
List<String> executorNodes = ServerUtils.getNodesByEntityId(config.clusterUrls(), id, from);
List<CompletableFuture<Response>> responses = getResponses(request, executorNodes,
request.getURI() + "&local=1", HttpResponse.BodyHandlers.ofByteArray());
if (executorNodes.contains(config.selfUrl())) {
responses.add(CompletableFuture.completedFuture(invokeLocal(request, id)));
}

executorLocal.execute(() -> {
List<Response> completedResponses = responses
Expand Down Expand Up @@ -300,20 +285,7 @@ private void sendResponse(Request request, HttpSession session, List<Response> r
return;
}

responses = responses.stream().filter(r -> r.getStatus() == HttpURLConnection.HTTP_OK
|| r.getStatus() == HttpURLConnection.HTTP_NOT_FOUND).toList();

Response bestResp = responses.getFirst();
for (int i = 1; i < responses.size(); i++) {
String bestRespTime = bestResp.getHeader(HEADER_TIMESTAMP_HEADER);
if (responses.get(i).getHeader(HEADER_TIMESTAMP) != null) {
if (bestRespTime == null || Long.parseLong(responses.get(i).getHeader(HEADER_TIMESTAMP_HEADER))
> Long.parseLong(bestRespTime)) {
bestResp = responses.get(i);
}
}
}
session.sendResponse(bestResp);
session.sendResponse(getBestResponse(responses));
}
case Request.METHOD_PUT -> {
if (statuses.stream().filter(s -> s == HttpURLConnection.HTTP_CREATED).count() < ack) {
Expand All @@ -333,6 +305,22 @@ private void sendResponse(Request request, HttpSession session, List<Response> r
}
}

private static Response getBestResponse(List<Response> responses) {
responses = responses.stream().filter(r -> r.getStatus() == HttpURLConnection.HTTP_OK
|| r.getStatus() == HttpURLConnection.HTTP_NOT_FOUND).toList();

Response bestResp = responses.getFirst();
for (int i = 1; i < responses.size(); i++) {
String bestRespTime = bestResp.getHeader(HEADER_TIMESTAMP_HEADER);
if (responses.get(i).getHeader(HEADER_TIMESTAMP) != null && (bestRespTime == null ||
Long.parseLong(responses.get(i).getHeader(HEADER_TIMESTAMP_HEADER))
> Long.parseLong(bestRespTime))) {
bestResp = responses.get(i);
}
}
return bestResp;
}

@SuppressWarnings("FutureReturnValueIgnored")
private CompletableFuture<Response> invokeRemote(String executorNode, Request request, String uri,
HttpResponse.BodyHandler<byte[]> responseBodyHandler)
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/ru/vk/itmo/test/solnyshkoksenia/ServerUtils.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package ru.vk.itmo.test.solnyshkoksenia;

import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;
import one.nio.util.Hash;
import ru.vk.itmo.dao.Entry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.URI;
Expand All @@ -19,6 +21,7 @@
public class ServerUtils {
private static final String HEADER_TIMESTAMP = "X-timestamp";
private static final String HEADER_TIMESTAMP_HEADER = HEADER_TIMESTAMP + ": ";

protected static MemorySegment toMS(String input) {
return MemorySegment.ofArray(input.getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -52,6 +55,20 @@ protected static Response makeResponse(HttpResponse<byte[]> httpResponse) {
return response;
}

protected static boolean validParameters(final HttpSession session, String start, String end) throws IOException {
if (start == null || start.isBlank()) {
session.sendError(Response.BAD_REQUEST, "Start parameter required");
return false;
}

if (end != null && !end.isBlank() && start.compareTo(end) >= 0) {
session.sendError(Response.BAD_REQUEST, "Start parameter should be less than end parameter");
return false;
}

return true;
}

protected static List<String> getNodesByEntityId(List<String> urls, String id, Integer from) {
List<Node> executorNodes = new ArrayList<>();

Expand Down Expand Up @@ -84,7 +101,6 @@ private static String toString(MemorySegment memorySegment) {
StandardCharsets.UTF_8);
}


@SuppressWarnings("unused")
private record Node(int id, int hash) implements Comparable<Node> {
@Override
Expand Down

0 comments on commit 4cf9bf9

Please sign in to comment.