Skip to content

Commit

Permalink
HW5: complete todo
Browse files Browse the repository at this point in the history
  • Loading branch information
SuDarina committed May 10, 2024
1 parent e9e93eb commit b05df36
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 122 deletions.

This file was deleted.

27 changes: 11 additions & 16 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java
Original file line number Diff line number Diff line change
@@ -1,53 +1,48 @@
package ru.vk.itmo.test.dariasupriadkina;

import one.nio.async.CustomThreadFactory;
import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.test.dariasupriadkina.dao.ExtendedEntry;
import ru.vk.itmo.test.dariasupriadkina.dao.ReferenceDao;
import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy;
import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig;
import ru.vk.itmo.test.dariasupriadkina.workers.WorkerThreadPoolExecutor;
import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig;
import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadPoolExecutor;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

public class ServiceIml implements Service {

private final Config daoConfig;
private final ServiceConfig serviceConfig;
private final WorkerConfig workerConfig;
private final CustomThreadConfig workerConfig;
private final CustomThreadConfig nodeConfig;
private final ShardingPolicy shardingPolicy;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private Server server;
private Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private WorkerThreadPoolExecutor workerThreadPoolExecutor;
private NodeThreadPoolExecutor nodeThreadPoolExecutor;
private CustomThreadPoolExecutor workerThreadPoolExecutor;
private CustomThreadPoolExecutor nodeThreadPoolExecutor;

public ServiceIml(ServiceConfig serviceConfig, Config daoConfig,
WorkerConfig workerConfig, ShardingPolicy shardingPolicy) {
CustomThreadConfig workerConfig, ShardingPolicy shardingPolicy,
CustomThreadConfig nodeConfig) {
this.daoConfig = daoConfig;
this.serviceConfig = serviceConfig;
this.workerConfig = workerConfig;
this.shardingPolicy = shardingPolicy;
this.nodeConfig = nodeConfig;
}

@Override
public synchronized CompletableFuture<Void> start() throws IOException {
dao = new ReferenceDao(daoConfig);
workerThreadPoolExecutor = new WorkerThreadPoolExecutor(workerConfig);
// TODO вынести параметры в отдельную конфигурацию для большей гибкости
nodeThreadPoolExecutor = new NodeThreadPoolExecutor(16,
16,
new ArrayBlockingQueue<>(1024),
new CustomThreadFactory("node-executor", true),
new ThreadPoolExecutor.AbortPolicy(), 60);
workerThreadPoolExecutor = new CustomThreadPoolExecutor(workerConfig);
nodeThreadPoolExecutor = new CustomThreadPoolExecutor(nodeConfig);
nodeThreadPoolExecutor.prestartAllCoreThreads();
workerThreadPoolExecutor.prestartAllCoreThreads();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,23 @@
import ru.vk.itmo.test.ServiceFactory;
import ru.vk.itmo.test.dariasupriadkina.sharding.RendezvousHashing;
import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy;
import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig;
import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig;

import java.nio.file.Path;

@ServiceFactory(stage = 5)
public class ServiceImlFactory implements ServiceFactory.Factory {

private static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024;
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 1024;
private static final int SHUTDOWN_TIMEOUT_SEC = 30;
public static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024;

@Override
public Service create(ServiceConfig serviceConfig) {
ShardingPolicy shardingPolicy = new RendezvousHashing(
serviceConfig.clusterUrls()
);
Config referenceDaoConfig = new Config(Path.of(serviceConfig.workingDir().toUri()), FLUSH_THRESHOLD_BYTES);
WorkerConfig workerConfig = new WorkerConfig(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
QUEUE_SIZE, SHUTDOWN_TIMEOUT_SEC);
return new ServiceIml(serviceConfig, referenceDaoConfig, workerConfig, shardingPolicy);
CustomThreadConfig workerConfig = CustomThreadConfig.baseConfig("worker-thread");
CustomThreadConfig nodeConfig = CustomThreadConfig.baseConfig("node-thread");
return new ServiceIml(serviceConfig, referenceDaoConfig, workerConfig, shardingPolicy, nodeConfig);
}
}
12 changes: 6 additions & 6 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.dariasupriadkina.sharding.RendezvousHashing;
import ru.vk.itmo.test.dariasupriadkina.sharding.ShardingPolicy;
import ru.vk.itmo.test.dariasupriadkina.workers.WorkerConfig;
import ru.vk.itmo.test.dariasupriadkina.workers.CustomThreadConfig;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -18,10 +18,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static ru.vk.itmo.test.dariasupriadkina.ServiceImlFactory.FLUSH_THRESHOLD_BYTES;

public final class TestServer {

private static final int THREADS = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 1024;
private static final String LOCALHOST_PREFIX = "http://localhost:";
private static final int NODE_AMOUNT = 3;

Expand Down Expand Up @@ -56,9 +56,9 @@ public static void main(String[] args) throws IOException, ExecutionException,

for (ServiceConfig serviceConfig : clusterConfs) {
ServiceIml serviceIml = new ServiceIml(serviceConfig, new Config(serviceConfig.workingDir(),
1024 * 1024),
new WorkerConfig(THREADS * 2, THREADS * 2,
QUEUE_SIZE, 60), shardingPolicy);
FLUSH_THRESHOLD_BYTES),
CustomThreadConfig.baseConfig("worker-thread"), shardingPolicy,
CustomThreadConfig.baseConfig("node-thread"));
serviceIml.start().get(2, TimeUnit.SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ru.vk.itmo.test.dariasupriadkina.workers;

import one.nio.async.CustomThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadConfig {

public static final long KEEP_ALIVE_TIME = 1000L;
public static final TimeUnit KEEP_ALIVE_TIME_SECONDS = TimeUnit.SECONDS;
public static final int QUEUE_SIZE = 1024;
public static final int THREADS = Runtime.getRuntime().availableProcessors();
public static final int SHUTDOWN_TIMEOUT_SEC = 60;

private final int corePoolSize;
private final int maximumPoolSize;
private final int shutdownTimeoutSec;
private final ArrayBlockingQueue<Runnable> workQueue;
private final CustomThreadFactory threadFactory;
private final RejectedExecutionHandler handler;

public CustomThreadConfig(int corePoolSize, int maximumPoolSize, int queueSize, int shutdownTimeoutSec,
String threadName, RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = new ArrayBlockingQueue<>(queueSize);
this.shutdownTimeoutSec = shutdownTimeoutSec;
this.threadFactory = new CustomThreadFactory(threadName, true);
this.handler = handler;
}

public static CustomThreadConfig baseConfig(String threadName) {
return new CustomThreadConfig(THREADS * 2, THREADS * 2,
QUEUE_SIZE, SHUTDOWN_TIMEOUT_SEC, threadName, new ThreadPoolExecutor.AbortPolicy());
}

public int getCorePoolSize() {
return corePoolSize;
}

public int getMaximumPoolSize() {
return maximumPoolSize;
}

public BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}

public int getShutdownTimeoutSec() {
return shutdownTimeoutSec;
}

public CustomThreadFactory getThreadFactory() {
return threadFactory;
}

public RejectedExecutionHandler getHandler() {
return handler;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package ru.vk.itmo.test.dariasupriadkina.workers;

import one.nio.async.CustomThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

private static final Logger logger = LoggerFactory.getLogger(WorkerThreadPoolExecutor.class.getName());
private static final Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class.getName());
private final int shutdownTimeoutSec;

public WorkerThreadPoolExecutor(WorkerConfig workerConfig) {
public CustomThreadPoolExecutor(CustomThreadConfig workerConfig) {
super(workerConfig.getCorePoolSize(), workerConfig.getMaximumPoolSize(),
WorkerConfig.KEEP_ALIVE_TIME, WorkerConfig.KEEP_ALIVE_TIME_SECONDS,
workerConfig.getWorkQueue(), new CustomThreadFactory("worker-thread", true),
new AbortPolicy());
CustomThreadConfig.KEEP_ALIVE_TIME, CustomThreadConfig.KEEP_ALIVE_TIME_SECONDS,
workerConfig.getWorkQueue(), workerConfig.getThreadFactory(),
workerConfig.getHandler());
this.shutdownTimeoutSec = workerConfig.getShutdownTimeoutSec();
}

Expand Down

This file was deleted.

0 comments on commit b05df36

Please sign in to comment.