Skip to content

Commit

Permalink
Merge branch 'main' into 6_stage
Browse files Browse the repository at this point in the history
  • Loading branch information
lamtev authored May 9, 2024
2 parents fbd1aa9 + 92bec61 commit 9957f41
Show file tree
Hide file tree
Showing 139 changed files with 230,965 additions and 352 deletions.
86 changes: 41 additions & 45 deletions src/main/java/ru/vk/itmo/test/chebotinalexandr/Server.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
package ru.vk.itmo.test.chebotinalexandr;

import one.nio.util.Hash;
import one.nio.async.CustomThreadFactory;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.chebotinalexandr.dao.Dao;
import ru.vk.itmo.test.chebotinalexandr.dao.NotOnlyInMemoryDao;
import ru.vk.itmo.test.chebotinalexandr.dao.entry.Entry;
import ru.vk.itmo.test.chebotinalexandr.dao.entry.TimestampEntry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.net.http.HttpClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class Server {
private static final Random RANDOM = new Random();
private static final int ENTRIES_IN_DB = 500_000;
private static final long FLUSH_THRESHOLD_BYTES = 4_194_304L;
private static final long FLUSH_THRESHOLD_BYTES = 4_194_3040L;
private static final int BASE_PORT = 8080;
private static final int NODES = 3;
private static final int POOL_SIZE = 20;
private static final int QUEUE_CAPACITY = 256;

private Server() {

Expand Down Expand Up @@ -55,18 +60,30 @@ public static void main(String[] args) throws IOException {
new NotOnlyInMemoryDao(new Config(config.workingDir(), FLUSH_THRESHOLD_BYTES));
daoCluster[i] = dao;
ExecutorService executor = new ThreadPoolExecutor(
20,
20,
POOL_SIZE,
POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(256)
new ArrayBlockingQueue<>(QUEUE_CAPACITY)
);
StorageServer server = new StorageServer(config, dao, executor);

HttpClient httpClient = HttpClient.newBuilder()
.executor(
Executors.newFixedThreadPool(
POOL_SIZE,
new CustomThreadFactory("httpClient")
)
)
.connectTimeout(Duration.ofMillis(500))
.version(HttpClient.Version.HTTP_1_1)
.build();

StorageServer server = new StorageServer(config, dao, executor, httpClient);
server.start();

}

fillClusterNodesWithMultipleFlush(daoCluster, clusterUrls);
fillClusterNodesWithMultipleFlush(daoCluster);
}

private static int[] getRandomArray() {
Expand All @@ -87,49 +104,28 @@ private static int[] getRandomArray() {
return entries;
}

/**
* Fills all nodes in cluster with multiple sstables.
*/
private static void fillClusterNodesWithMultipleFlush(
Dao[] daoCluster,
List<String> clusterUrls
) throws IOException {
private static void fillClusterNodesWithMultipleFlush(Dao... daoCluster) throws IOException {
final int sstables = 100; //how many sstables dao must create
final int flushEntries = ENTRIES_IN_DB / sstables; //how many entries in one sstable
final int[] entriesCountInEachNode = new int[NODES];
final int[] entries = getRandomArray();

//only for GET tests with from = 3
int count = 0;
for (int entry : entries) {
//select node
int partition = selectNode(("k" + entry), clusterUrls);

//upsert entry in selected node and increment entry counter
daoCluster[partition].upsert(entry(keyAt(entry), valueAt(entry)));
entriesCountInEachNode[partition]++;

//check entry counters for ability to flush
for (int j = 0; j < entriesCountInEachNode.length; j++) {
if (entriesCountInEachNode[j] % flushEntries == 0) {
daoCluster[j].flush();
}
//upsert entry
for (int node = 0; node < NODES; node++) {
daoCluster[node].upsert(entry(keyAt(entry), valueAt(entry)));
}
}
}

private static int selectNode(String id, List<String> clusterUrls) {
Long maxHash = Long.MIN_VALUE;
int partition = -1;
count++;

for (int i = 0; i < clusterUrls.size(); i++) {
String url = clusterUrls.get(i);
long nodeHash = Hash.murmur3(url + id);
if (nodeHash > maxHash) {
maxHash = nodeHash;
partition = i;
//flush nodes
if (count == flushEntries) {
for (Dao dao : daoCluster) {
dao.flush();
}
count = 0;
}
}

return partition;
}

private static MemorySegment keyAt(int index) {
Expand All @@ -141,7 +137,7 @@ private static MemorySegment valueAt(int index) {
}

private static Entry<MemorySegment> entry(MemorySegment key, MemorySegment value) {
return new BaseEntry<>(key, value);
return new TimestampEntry<>(key, value, 0L);
}

}
Loading

0 comments on commit 9957f41

Please sign in to comment.