Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Смирнов Андрей ИТМО КТ Stage 5 #181

Merged
merged 53 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
33dec72
added first realization
sandrew-uj Feb 20, 2024
4ed27a3
added wrk2
sandrew-uj Feb 21, 2024
215a47c
remove long line
sandrew-uj Feb 21, 2024
b4630db
style fix
sandrew-uj Feb 21, 2024
62ee00a
style fix1
sandrew-uj Feb 21, 2024
474f3f0
added profiling
sandrew-uj Feb 21, 2024
2cd4f41
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
ea708bc
Merge branch 'main' into main
daniil-ushkov Feb 25, 2024
6197f2f
Merge branch 'polis-vk:main' into main
sandrew-uj Feb 28, 2024
11f2f3f
added stage 2
sandrew-uj Feb 29, 2024
8ba7877
Merge branch 'main' into main
incubos Mar 1, 2024
508c025
codeclimate fix + stage2.md
sandrew-uj Mar 1, 2024
a10c6db
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 1, 2024
a0aa716
codeclimate fix1
sandrew-uj Mar 1, 2024
dd51faa
Merge branch 'main' into main
sandrew-uj Mar 7, 2024
a3d2c05
fixes in code provided
sandrew-uj Mar 7, 2024
b4e208a
codeclimate fix + added comparing
sandrew-uj Mar 7, 2024
cf7c014
images in md added
sandrew-uj Mar 7, 2024
d816a3a
Merge branch 'main' into main
sandrew-uj Mar 12, 2024
dcdc8d2
executor fix
sandrew-uj Mar 13, 2024
fabb285
style fix
sandrew-uj Mar 13, 2024
9b5f96d
added first realization
sandrew-uj Mar 14, 2024
3673f33
added better stage readme
sandrew-uj Mar 19, 2024
506da2d
Merge branch 'main' into main
sandrew-uj Mar 19, 2024
7da5b8e
fixes with asyncSend
sandrew-uj Mar 19, 2024
1deba38
Merge branch 'main' into main
sandrew-uj Mar 26, 2024
f916e7e
removed unused implements
sandrew-uj Mar 26, 2024
0023be5
Merge remote-tracking branch 'origin/main'
sandrew-uj Mar 26, 2024
fce0383
removed unused import
sandrew-uj Mar 26, 2024
2d05f98
Merge branch 'main' into stage4
sandrew-uj Mar 26, 2024
3895f6e
added realization
sandrew-uj Mar 28, 2024
f4b18fd
fixes
sandrew-uj Mar 28, 2024
31eef05
tests passed
sandrew-uj Mar 29, 2024
7a7bdf5
style fix
sandrew-uj Mar 29, 2024
8b125f3
Merge branch 'main' into stage4
sandrew-uj Mar 29, 2024
0fa4876
style fix1
sandrew-uj Mar 29, 2024
906f9e3
Merge remote-tracking branch 'origin/stage4' into stage4
sandrew-uj Mar 29, 2024
8fd039f
style fix2
sandrew-uj Mar 29, 2024
3ab4bca
flopping
sandrew-uj Mar 29, 2024
a5f3699
flopping1
sandrew-uj Mar 29, 2024
fe005fd
Merge branch 'polis-vk:main' into main
sandrew-uj Mar 29, 2024
359ca1b
Merge branch 'main' into stage4
sandrew-uj Mar 29, 2024
745c99f
added first realization
sandrew-uj Apr 11, 2024
b103ff5
codeclimate fix
sandrew-uj Apr 11, 2024
7a2d010
Merge branch 'main' into s5
sandrew-uj Apr 11, 2024
33303de
codeclimate fix1
sandrew-uj Apr 11, 2024
7b503fa
codeclimate fix2
sandrew-uj Apr 11, 2024
9877d83
Merge branch 'main' into s5
incubos Apr 13, 2024
9007e40
changes provided and readme added
May 1, 2024
fe3810b
codeclimate fix
May 1, 2024
60e8b9e
Merge branch 'main' into s5
sandrew-uj May 1, 2024
63f606d
Merge branch 'main' into s5
sandrew-uj May 13, 2024
4466032
build rollback
May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 176 additions & 164 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ru.vk.itmo.test.smirnovandrew;

import one.nio.http.Request;
import one.nio.http.Response;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.abramovilya.dao.DaoFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

public class MyServerDao {
private final Dao<MemorySegment, Entry<MemorySegment>> dao;

public MyServerDao(Dao<MemorySegment, Entry<MemorySegment>> dao) {
this.dao = dao;
}

Response getEntryFromDao(String id) {
Entry<MemorySegment> entry = dao.get(DaoFactory.fromString(id));
if (entry == null) {
return new Response(Response.NOT_FOUND, Response.EMPTY);
}
try {
ValWithTime valueWithTimestamp = byteArrayToObject(entry.value().toArray(ValueLayout.JAVA_BYTE));
if (valueWithTimestamp.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
}
Response response = new Response(Response.OK, valueWithTimestamp.value());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
} catch (IOException | ClassNotFoundException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

Response putEntryIntoDao(String id, Request request) {
ValWithTime valueWithTimestamp = new ValWithTime(request.getBody(), System.currentTimeMillis());
try {
dao.upsert(new BaseEntry<>(DaoFactory.fromString(id),
MemorySegment.ofArray(objToByteArray(valueWithTimestamp))));
return new Response(Response.CREATED, Response.EMPTY);
} catch (IOException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

Response deleteValueFromDao(String id) {
ValWithTime valueWithTimestamp = new ValWithTime(null, System.currentTimeMillis());
try {
dao.upsert(new BaseEntry<>(DaoFactory.fromString(id),
MemorySegment.ofArray(objToByteArray(valueWithTimestamp))));
} catch (IOException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
return new Response(Response.ACCEPTED, Response.EMPTY);
}

private static ValWithTime byteArrayToObject(byte[] bytes) throws IOException, ClassNotFoundException {
var byteArrayInputStream = new ByteArrayInputStream(bytes);
try (var objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (ValWithTime) objectInputStream.readObject();
}
}

private static byte[] objToByteArray(ValWithTime object) throws IOException {
var byteArrayOutputStream = new ByteArrayOutputStream();
try (var objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
}
127 changes: 127 additions & 0 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ru.vk.itmo.test.smirnovandrew;

import one.nio.http.HttpServerConfig;
import one.nio.http.HttpSession;
import one.nio.http.Response;
import one.nio.server.AcceptorConfig;
import ru.vk.itmo.ServiceConfig;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.logging.Logger;

public final class MyServerUtil {
public static final String ROOT = "/v0/entity";
public static final String X_SENDER_NODE = "X-SenderNode";
public static final String X_TIMESTAMP = "X-TimeStamp";
public static final Map<Integer, String> HTTP_CODES = Map.of(
HttpURLConnection.HTTP_OK, Response.OK,
HttpURLConnection.HTTP_ACCEPTED, Response.ACCEPTED,
HttpURLConnection.HTTP_CREATED, Response.CREATED,
HttpURLConnection.HTTP_NOT_FOUND, Response.NOT_FOUND,
HttpURLConnection.HTTP_BAD_REQUEST, Response.BAD_REQUEST,
HttpURLConnection.HTTP_INTERNAL_ERROR, Response.INTERNAL_ERROR
);
public static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas";
public static final long DURATION = 1000L;

private MyServerUtil() {
}

public static HttpServerConfig generateServerConfig(ServiceConfig config) {
var serverConfig = new HttpServerConfig();
var acceptorsConfig = new AcceptorConfig();

acceptorsConfig.port = config.selfPort();
acceptorsConfig.reusePort = true;

serverConfig.acceptors = new AcceptorConfig[]{acceptorsConfig};
serverConfig.closeSessions = true;
return serverConfig;
}

public static void sendEmpty(HttpSession session, Logger logger, String message) {
try {
session.sendResponse(new Response(message, Response.EMPTY));
} catch (IOException e) {
logger.info(e.getMessage());
}
}

public static long headerTimestampToLong(Response r) {
String header = r.getHeader("X-TimeStamp: ");
if (header == null) {
return Long.MIN_VALUE;
}
return Long.parseLong(header);
}

public static Response processingResponse(HttpResponse<byte[]> response) {
String statusCode = HTTP_CODES.getOrDefault(response.statusCode(), null);
if (statusCode == null) {
return new Response(Response.INTERNAL_ERROR, response.body());
} else {
Response newResponse = new Response(statusCode, response.body());
long timestamp = response.headers().firstValueAsLong(X_TIMESTAMP).orElse(0);
newResponse.addHeader(X_TIMESTAMP + ": " + timestamp);
return newResponse;
}
}

public static Response getResults(
sandrew-uj marked this conversation as resolved.
Show resolved Hide resolved
int from,
int ack,
List<CompletableFuture<Response>> completableResults,
Logger logger
) throws ExecutionException, InterruptedException {
var okResponses = new ConcurrentLinkedDeque<Response>();
var okResponsesCount = new AtomicInteger();
var failedResponsesCount = new AtomicInteger();
var answer = new CompletableFuture<Response>();

BiConsumer<Response, Throwable> whenComplete = (r, throwable) -> {
if (throwable == null || r.getStatus() < 500) {
okResponsesCount.incrementAndGet();
okResponses.add(r);
sandrew-uj marked this conversation as resolved.
Show resolved Hide resolved
} else {
failedResponsesCount.incrementAndGet();
}

if (okResponsesCount.get() >= ack) {
answer.complete(okResponses.stream()
.max(Comparator.comparingLong(MyServerUtil::headerTimestampToLong))
.get());
}

if (failedResponsesCount.get() >= from - ack + 1) {
answer.complete(new Response(MyServerUtil.NOT_ENOUGH_REPLICAS, Response.EMPTY));
}
};

completableResults.forEach(completableFuture -> {
var responseFuture = completableFuture.whenComplete(whenComplete);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На каком пуле потоков выполняется агрегация ответов от реплик?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На том же, что задается вот в этой строчке
this.executor = new MyExecutor(corePoolSize, availableProcessors);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так как в явном виде не меняется пул потоков (не передается другой executor), то будет выполняться на том же пуле потоков, что и completableResults, то есть на том же пуле, что и sendAsync, на потоках из httpClient.

if (responseFuture == null) {
logger.info("Error completable future is null!");
}
});

try {
return answer.get(MyServerUtil.DURATION, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
logger.info("Too long waiting for response: " + e.getMessage());
return new Response(MyServerUtil.NOT_ENOUGH_REPLICAS, Response.EMPTY);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.test.ServiceFactory;

@ServiceFactory(stage = 3)
@ServiceFactory(stage = 5)
public class MyServiceFactory implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import one.nio.util.Hash;
import ru.vk.itmo.ServiceConfig;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class RendezvousClusterManager {

Expand All @@ -30,4 +33,13 @@ public String getCluster(String key) {

return availableClusters.get(resIdx);
}

public static List<Integer> getSortedNodes(String key, int amount, ServiceConfig config) {
sandrew-uj marked this conversation as resolved.
Show resolved Hide resolved
return IntStream.range(0, config.clusterUrls().size())
.mapToObj(i -> Map.entry(i, Hash.murmur3(key + i)))
.sorted(Comparator.<Map.Entry<Integer, Integer>>comparingInt(Map.Entry::getValue).reversed())
.map(Map.Entry::getKey)
.limit(amount)
.toList();
}
}
6 changes: 6 additions & 0 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/ValWithTime.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package ru.vk.itmo.test.smirnovandrew;

import java.io.Serializable;

public record ValWithTime(byte[] value, long timestamp) implements Serializable {
}
81 changes: 81 additions & 0 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/readme/stage5.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Отчет о тестировании
## Использованные методы
Все методы реализованы так же как и методы в stage1,
как именно они реализованы можно посмотреть либо в отчете
stage1.md или в папке lua

После прогрева выяснилось, что точка разладки происходит где-то
в районе 3000-3500, поэтому будем тестировать на 3000 rps

### PUT запросы
```
./wrk -d 120 -t 1 -c 64 -R 3000 -L -s /home/andrew/my-dht/2024-highload-dht/src/main/java/ru/vk/itmo/test/smirnovandrew/lua/put.lua http://localhost:8080
```
```dtd
./profiler.sh --fdtransfer -d 120 -e cpu -f put_cpu_stage3.html jps
```

![put3000.png](stage5%2Fput3000.png)
Здесь интересно, что latency начинает расти где-то около 99.9%

Анализ CPU:
![put_cpu.png](stage5%2Fput_cpu.png)
Наибольшая нагрузка приходится на ожидание свободных потоков

Также на асинхронное отправление запроса на другой шард


Анализ ALLOC:
![put_alloc.png](stage5%2Fput_alloc.png)
Большее количество памяти так же, как и в предыдущих частях
тратится на конвертацию MemorySegment в byte[] и обратно

Также тратится на расшифрофку headerов и на посылку Completable future


Анализ LOCK:
![put_lock.png](stage5%2Fput_lock.png)
Наибольная нагрузка тут приходится на ожидание таски из очереди

Также тратится на ожидание асинхронного запроса

### GET запросы
```
./wrk -d 60 -t 1 -c 64 -R 3000 -L -s /home/andrew/my-dht/2024-highload-dht/src/main/java/ru/vk/itmo/test/smirnovandrew/lua/get.lua http://localhost:8080
```
```dtd
./profiler.sh --fdtransfer -d 120 -e cpu -f get_cpu_stage3.html jps
```

![get3000.png](stage5%2Fget3000.png)
Здесь также latency серьезно растет около 99.9%

Анализ CPU:
![get_cpu.png](stage5%2Fget_cpu.png)
Наибольшая нагрузка приходится на ожидание свободных потоков

Также на асинхронное отправление запроса на другой шард

Анализ ALLOC:
![get_alloc.png](stage5%2Fget_alloc.png)
Большее количество памяти так же, как и в предыдущих частях
тратится на конвертацию MemorySegment в byte[] и обратно

Также тратится на расшифрофку headerов и на посылку Completable future



Анализ LOCK:
![get_lock.png](stage5%2Fget_lock.png)
Наибольная нагрузка тут приходится на ожидание таски из очереди

Также тратится на ожидание асинхронного запроса


### Сравнение с предыдущей реализацией
![compare.png](stage5%2Fcompare.png)
К сожалению я не делал графики на stage4, поэтому могу сравнить
показатели только со stage3. Здесь по графикам заметно, что относительно
stage3 есть значительные улучшения по latency


Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading