Skip to content

Commit

Permalink
Merge branch 'main' into lab6
Browse files Browse the repository at this point in the history
  • Loading branch information
incubos authored Apr 28, 2024
2 parents 90638e5 + 1b23246 commit d5b0ccb
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 169 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,21 @@ HTTP API расширяется query-параметрами `from` и `ack`, с
После прохождения модульных тестов, присылайте pull request с изменениями.
Наполните БД большим объёмом данных и отпрофилируйте cpu, alloc и lock при получении range всей базы одним запросом curl'ом.
Присылайте отчёт с анализом результатов и оптимизаций.

## Этап 7. Бонусный (hard deadline 2024-05-15 23:59:59 MSK)

Фичи, которые позволяют получить дополнительные баллы (при условии **добавления набора тестов**, демонстрирующих корректность, где применимо):
* Развёрнутая конструктивная обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению
* Кластерные range-запросы с учётом шардирования и репликации
* Read repair при обнаружении расхождений между репликами
* Expire: возможность указания [времени жизни записей](https://en.wikipedia.org/wiki/Time_to_live)
* Server-side processing: трансформация данных с помощью скрипта, запускаемого на узлах кластера через API
* Нагрузочное тестирование при помощи [Y!CSB](https://github.com/brianfrankcooper/YCSB)
* Нагрузочное тестирование при помощи [Yandex.Tank](https://overload.yandex.net)
* Регулярный автоматический фоновый compaction (модульные и нагрузочные тесты)
* Hinted handoff [по аналогии с Cassandra](https://cassandra.apache.org/doc/latest/operating/hints.html)
* Устранение неконсистентностей между репликами [по аналогии с Cassandra](https://www.datastax.com/blog/advanced-repair-techniques) [nodetool repair](https://docs.datastax.com/en/archived/cassandra/2.0/cassandra/operations/ops_repair_nodes_c.html), например, на основе [Merkle Tree](https://en.wikipedia.org/wiki/Merkle_tree)
* Блочная компрессия данных на основе LZ4/zSTD/...
* Что-нибудь **своё**?

Перед началом работ продумайте и согласуйте с преподавателем её технический дизайн и получите вспомогательные материалы.
73 changes: 73 additions & 0 deletions src/main/java/ru/vk/itmo/test/reference/ReferenceHttpSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@
import one.nio.http.HttpSession;
import one.nio.http.Response;
import one.nio.net.Socket;
import ru.vk.itmo.test.reference.dao2.ReferenceBaseEntry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ReferenceHttpSession extends HttpSession {
private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] LF = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] EMPTY_CHUNK = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8);
Iterator<ReferenceBaseEntry<MemorySegment>> iterator;

public ReferenceHttpSession(Socket socket, HttpServer server) {
super(socket, server);
}
Expand All @@ -21,6 +31,51 @@ public void sendResponseOrClose(Response response) {
}
}

@Override
protected void processWrite() throws Exception {
super.processWrite();

nextChunk();
}

public void stream(Iterator<ReferenceBaseEntry<MemorySegment>> iterator) throws IOException {
this.iterator = iterator;
Response response = new Response(Response.OK);
response.addHeader("Transfer-Encoding: chunked");
writeResponse(response, false);

nextChunk();
}

private void nextChunk() throws IOException {
while (iterator.hasNext() && queueHead == null) {
ReferenceBaseEntry<MemorySegment> next = iterator.next();
ByteBuffer key = next.key().asByteBuffer();
ByteBuffer value = next.value().asByteBuffer();
int payloadSize = key.remaining() + value.remaining() + LF.length;
String payloadSizeStr = Integer.toHexString(payloadSize);
byte[] payloadSizeStrBytes = payloadSizeStr.getBytes(StandardCharsets.UTF_8);
write(payloadSizeStrBytes, 0, payloadSizeStrBytes.length);
write(CRLF, 0, CRLF.length);
write(new ReferenceQueueItem(key));
write(LF, 0, LF.length);
write(new ReferenceQueueItem(value));
write(CRLF, 0, CRLF.length);
}

if (!iterator.hasNext()) {
write(EMPTY_CHUNK, 0, EMPTY_CHUNK.length);

if ((this.handling = pipeline.pollFirst()) != null) {
if (handling == FIN) {
scheduleClose();
} else {
server.handleRequest(handling, this);
}
}
}
}

public void sendError(Throwable e) {
log.error("Exception during handleRequest", e);
try {
Expand All @@ -30,4 +85,22 @@ public void sendError(Throwable e) {
scheduleClose();
}
}

static class ReferenceQueueItem extends QueueItem {
private final ByteBuffer buffer;

ReferenceQueueItem(ByteBuffer buffer) {
this.buffer = buffer;
}

@Override
public int remaining() {
return buffer.remaining();
}

@Override
public int write(Socket socket) throws IOException {
return socket.write(buffer);
}
}
}
36 changes: 35 additions & 1 deletion src/main/java/ru/vk/itmo/test/reference/ReferenceServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -82,7 +84,21 @@ public void handleRequest(Request request, HttpSession sessionI) throws IOExcept
if (!(sessionI instanceof ReferenceHttpSession session)) {
throw new IllegalArgumentException("this method support only ReferenceHttpSession");
}
// if (config.selfPort() == 8100) {
if ("/v0/entities".equals(request.getPath())) {
executorWork.execute(new Runnable() {
@Override
public void run() {
try {
stream(request, session);
} catch (Exception e) {
session.sendError(e);
}
}
});
return;
}

// if (config.selfPort() == 8100) {
// return;
// }
String id = request.getParameter("id=");
Expand Down Expand Up @@ -158,6 +174,24 @@ public void run() {
});
}

private void stream(Request request, HttpSession session) throws IOException {
String startStr = request.getParameter("start=");
if (startStr == null || startStr.isBlank()) {
session.sendError(Response.BAD_REQUEST, "Invalid arguments");
return;
}

String endStr = request.getParameter("end=");
if (endStr != null && endStr.isBlank()) {
session.sendError(Response.BAD_REQUEST, "Invalid arguments");
return;
}

Iterator<ReferenceBaseEntry<MemorySegment>> iterator = dao.get(MemorySegment.ofArray(startStr.getBytes(StandardCharsets.UTF_8)),
endStr == null ? null : MemorySegment.ofArray(endStr.getBytes(StandardCharsets.UTF_8)));
((ReferenceHttpSession)session).stream(iterator);
}

private int getInt(Request request, String param, int defaultValue) throws IOException {
int ack;
String ackStr = request.getParameter(param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void shutdownAndAwaitTermination(ExecutorService pool) {
}
}

@ServiceFactory(stage = 5)
@ServiceFactory(stage = 6)
public static class Factory implements ServiceFactory.Factory {

@Override
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/ru/vk/itmo/test/reference/wrk_scripts/get3.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
math.randomseed(os.time())

function request()
counter = math.random(100000100, 100000000 + 5000000)
headers = {}
headers["Host"] = "localhost:8080"
return wrk.format("GET", "/v0/entities?start=" .. tostring(counter), headers)
end
Loading

0 comments on commit d5b0ccb

Please sign in to comment.