Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Супрядкина Дарья ИТМО DWS stage 5 #180

Merged
merged 64 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
2f2080a
HW1: add realization
SuDarina Feb 18, 2024
a18dc95
HW1: fix code style
SuDarina Feb 19, 2024
528366b
HW1: change exception constructor
SuDarina Feb 19, 2024
990d153
HW1: add report
SuDarina Feb 21, 2024
3587983
HW1: add flame graphs description
SuDarina Feb 21, 2024
75db05e
HW1: add flame graphs description
SuDarina Feb 21, 2024
3c9655b
HW1: fix code style
SuDarina Feb 21, 2024
ad686bc
HW1: fixes according to review
SuDarina Feb 27, 2024
1d5c62c
HW2: first version of HW2
SuDarina Feb 28, 2024
825f223
HW2: fix codestyle
SuDarina Feb 29, 2024
8fe320f
Merge branch 'main' into hw2/async-server
incubos Feb 29, 2024
c5a6710
Merge remote-tracking branch 'upstream/main' into hw2/async-server
SuDarina Mar 4, 2024
e6673e6
HW2: fixes according to review
SuDarina Mar 5, 2024
474cc38
HW2: change string template to typical string
SuDarina Mar 5, 2024
c601bb6
HW2: change parameters
SuDarina Mar 6, 2024
fe0ffa5
HW2: fix
SuDarina Mar 6, 2024
f1da741
HW2: just finish
SuDarina Mar 7, 2024
55458dc
Merge branch 'main' into hw2/async-server
incubos Mar 10, 2024
7b834f5
HW2: fixes according to review
SuDarina Mar 12, 2024
022c4d6
HW2: explaining absence of flush
SuDarina Mar 12, 2024
7de6005
HW3: add realization
SuDarina Mar 14, 2024
889bdd7
HW3: fix style
SuDarina Mar 14, 2024
c00be65
HW3: add final
SuDarina Mar 14, 2024
69c9b6d
HW3: fix finals
SuDarina Mar 14, 2024
c852e2f
HW3: change to async thread
SuDarina Mar 14, 2024
6e9031e
HW3: add shutdown threads
SuDarina Mar 14, 2024
7529b7b
HW3: fix style
SuDarina Mar 14, 2024
91ba923
Merge remote-tracking branch 'upstream/main' into hw3/sharding
SuDarina Mar 19, 2024
a9c58df
HW3: add first part of report
SuDarina Mar 20, 2024
141a869
HW3: add full report
SuDarina Mar 20, 2024
0f118ed
HW3: fix style
SuDarina Mar 20, 2024
afd4a3f
HW3: add explanations about hashCode()
SuDarina Mar 20, 2024
e9259b8
Merge remote-tracking branch 'upstream/main' into hw4/replication
SuDarina Mar 26, 2024
b4fae77
HW4: add implementation
SuDarina Mar 28, 2024
9ffd458
HW4: fix style
SuDarina Mar 28, 2024
6d47409
HW4: add report
SuDarina Apr 3, 2024
4f529d4
Merge remote-tracking branch 'upstream/main' into hw4/replication
SuDarina Apr 3, 2024
ab42efe
HW4: fix code style
SuDarina Apr 3, 2024
6625f89
HW4: add note
SuDarina Apr 3, 2024
a13db87
Merge branch 'main' into hw4/replication
pashchenko8 Apr 6, 2024
0ed611c
HW5: add realization
SuDarina Apr 11, 2024
6da7828
HW5: add completed future
SuDarina Apr 11, 2024
80a406b
HW5: fix tolerance
SuDarina Apr 11, 2024
9196595
Merge remote-tracking branch 'upstream/main' into hw5/async-interaction
SuDarina Apr 11, 2024
dc30ab3
HW5: fix codestyle
SuDarina Apr 11, 2024
6fe0449
HW5: fix codestyle
SuDarina Apr 11, 2024
629090b
HW5: close sessions
SuDarina Apr 11, 2024
33f5c3a
HW5: rewrite to method sendResponseAndCloseSession
SuDarina Apr 11, 2024
7523858
HW5: remove redundant parameter
SuDarina Apr 11, 2024
d0bd0e9
HW5: set close sessions
SuDarina Apr 11, 2024
2ee3bd4
HW5: change
SuDarina Apr 11, 2024
1db3665
HW5: change
SuDarina Apr 11, 2024
dca0f2d
Merge branch 'main' into hw5/async-interaction
incubos Apr 13, 2024
9d65379
HW5: fix multithreading problem for better profiling
SuDarina Apr 14, 2024
1d65f01
HW5: fix codestyle
SuDarina Apr 14, 2024
6be3d39
HW5: add report
SuDarina Apr 17, 2024
3a569d0
Merge branch 'main' into hw5/async-interaction
incubos Apr 28, 2024
71b103d
Merge branch 'main' into hw5/async-interaction
incubos May 1, 2024
595df0a
HW5: fixes according to review
SuDarina May 5, 2024
2049803
HW5: fix header
SuDarina May 5, 2024
3788b33
Merge branch 'main' into hw5/async-interaction
incubos May 10, 2024
818a350
HW5: fixes according to review
SuDarina May 10, 2024
e9e93eb
Merge remote-tracking branch 'origin/hw5/async-interaction' into hw5/…
SuDarina May 10, 2024
b05df36
HW5: complete todo
SuDarina May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ public final class HeaderConstraints {

public static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
public static final String TIMESTAMP_MILLIS_HEADER_NORMAL = "X-TIMESTAMP-MILLIS";
public static final String FROM_HEADER = "X-FROM";
public static final String FROM_HEADER_NORMAL = "X-FROM";
public static final String FROM_HEADER = "X-FROM: ";

private HeaderConstraints() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.concurrent.CompletableFuture;

public class SelfRequestHandler {

private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";
private final Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private final Utils utils;
private static final String TIMESTAMP_MILLIS_HEADER = "X-TIMESTAMP-MILLIS: ";

public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao, Utils utils) {
this.dao = dao;
Expand All @@ -23,14 +24,22 @@ public SelfRequestHandler(Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao,

public Response handleRequest(Request request) {
String id = utils.getIdParameter(request);
return switch (request.getMethodName()) {
case "GET" -> get(id);
case "PUT" -> put(id, request);
case "DELETE" -> delete(id);
return switch (request.getMethod()) {
case Request.METHOD_GET -> get(id);
case Request.METHOD_PUT -> put(id, request);
case Request.METHOD_DELETE -> delete(id);
default -> new Response(Response.NOT_FOUND, Response.EMPTY);
};
}

public CompletableFuture<Response> handleAsyncRequest(Request request) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему не просто composeFuture(handleRequest(request))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Исправила

return composeFuture(handleRequest(request));
}

private CompletableFuture<Response> composeFuture(Response response) {
return CompletableFuture.completedFuture(response);
}

public Response get(String id) {
try {
if (id == null || id.isEmpty()) {
Expand Down
192 changes: 101 additions & 91 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.FROM_HEADER_NORMAL;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER;
import static ru.vk.itmo.test.dariasupriadkina.HeaderConstraints.TIMESTAMP_MILLIS_HEADER_NORMAL;

Expand All @@ -45,7 +46,6 @@ public class Server extends HttpServer {
private final String selfUrl;
private final ShardingPolicy shardingPolicy;
private final HttpClient httpClient;
private static final int REQUEST_TIMEOUT_SEC = 2;
private final List<String> clusterUrls;
private final Utils utils;
private final SelfRequestHandler selfHandler;
Expand All @@ -70,7 +70,7 @@ private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConf
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig};
httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
httpServerConfig.closeSessions = true;

return httpServerConfig;
Expand All @@ -83,7 +83,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
response = new Response(Response.BAD_REQUEST, Response.EMPTY);
} else {
response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);

}
session.sendResponse(response);
}
Expand All @@ -93,74 +92,68 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
try {
workerExecutor.execute(() -> {
try {
if (!permittedMethods.contains(request.getMethod())) {
Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");
if (!permittedMethods.contains(request.getMethod())
|| checkBadRequest(ack, from, request.getMethod())) {
handleDefault(request, session);
return;
}
if (request.getHeader(FROM_HEADER) == null) {
if (request.getHeader(FROM_HEADER_NORMAL) == null) {
request.addHeader(FROM_HEADER + selfUrl);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Заголовки обычно в формате name: value, а тут будет X-FROMhttp://....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем мы здесь добавляем заголовок в исходный запрос, если он потом не используется?

Copy link
Contributor Author

@SuDarina SuDarina May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в случае, если у нас запрос пришел от пользователя, мы не имеем заголовка X-FROM, поэтому осуществляем broadcast
(рассылку по нодам, которые были получены в ходе выполнения метода getNodesById), если же этот заголовок уже установлен,
значит, мы понимаем, что запрос пришел от другой ноды, а следовательно, предполагается, что значение по данному ключу
находится у нас, поэтому - отправляем на своего обработчика
Изначально, я планировала не только проверять наличие этого заголовка, но и в случае, когда он установлен, убеждаться в том, что этот URL входит в clusterUrls, чтобы избежать ситуации, когда данный заголовок был установлен кем-то вручную и вызывал неправильное поведение системы. Но, когда реализовывавлась 4я лабораторная работа этот момент для скорости написано сначала был опущен, а потом и вовсе забыт

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я понял изначальную идею, но заголовок FROM_HEADER из данного объекта request дальше никак не используется, потому что мы формируем запросы для Java HTTP client.


Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");

Response response = mergeResponses(broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from), request, ack),
ack, from);

session.sendResponse(response);

collectResponsesCallback(
broadcast(
shardingPolicy.getNodesById(utils.getIdParameter(request), from),
request
), ack, from, session);
} else {
Response resp = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(resp);
session.sendResponse(resp);
}

} catch (Exception e) {
logger.error("Unexpected error", e);
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (IOException ex) {
logger.error("Failed to send error response", e);
session.close();
}
solveUnexpectedError(e, session);
}
});
} catch (RejectedExecutionException e) {
logger.error("Service is unavailable", e);
session.sendResponse(new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY));
}
}

private void solveUnexpectedError(Exception e, HttpSession session) {
logger.error("Unexpected error", e);
SuDarina marked this conversation as resolved.
Show resolved Hide resolved
try {
if (e.getClass() == HttpException.class) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
} else {
session.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
}
} catch (Exception exception) {
logger.error("Failed to send error response", exception);
session.scheduleClose();
}
}

private List<Response> broadcast(List<String> nodes, Request request, int ack) {
int internalAck = ack;
List<Response> responses = new ArrayList<>(ack);
Response response;
private boolean checkBadRequest(int ack, int from, int method) {
return !permittedMethods.contains(method) || ack > from || ack <= 0 || from > clusterUrls.size();
}

private List<CompletableFuture<Response>> broadcast(List<String> nodes, Request request) {
List<CompletableFuture<Response>> futureResponses = new ArrayList<>(nodes.size());
CompletableFuture<Response> response;
if (nodes.contains(selfUrl)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему это не внутри цикла ниже? Зачем отдельный проход по списку и удаление себя, если обнаружили?

Copy link
Contributor Author

@SuDarina SuDarina May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Когда мы делаем рассылку по всем нодам, мы должны обработать запрос и на ноде, которая делает этот запрос, она отправляет этот запрос на свой локальный обработчик и работает со своей бд. Чтобы отделить две разные логики обработки, мы делаем отдельную прогонку. В рамках одной прогонки необходимо постоянно сверять текущий url c selfUrl, даже если мы уже обработали selfUrl, постоянное сравнение продолжается, что мне показалось не очень логично.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodes.contains(selfUrl) это пробегание по всему списку и сравнение.
nodes.remove(selfUrl) это копирование куска списка, потому что под капотом ArrayList.
Затем снова итерируемся по списку.
Выглядит менее оптимально, чем один проход с постоянным сравнением.

response = selfHandler.handleRequest(request);
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
response = selfHandler.handleAsyncRequest(request);
futureResponses.add(response);
nodes.remove(selfUrl);
if (--internalAck == 0) {
return responses;
}
}

for (String node : nodes) {
response = handleProxy(utils.getEntryUrl(node, utils.getIdParameter(request)), request);
if (response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
if (--internalAck == 0) {
return responses;
}
}
futureResponses.add(response);
}
return responses;
return futureResponses;
}

private void checkTimestampHeaderExistenceAndSet(Response response) {
Expand All @@ -171,58 +164,75 @@ private void checkTimestampHeaderExistenceAndSet(Response response) {
}
}

private Response mergeResponses(List<Response> responses, int ack, int from) {
if (responses.stream().filter(response -> response.getStatus() == 400).count() == from
|| ack > from
|| from > clusterUrls.size()
|| ack <= 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
private void collectResponsesCallback(List<CompletableFuture<Response>> futureResponses,
SuDarina marked this conversation as resolved.
Show resolved Hide resolved
int ack, int from, HttpSession session) {
List<Response> responses = new CopyOnWriteArrayList<>();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger exceptionCount = new AtomicInteger(0);
for (CompletableFuture<Response> futureResponse : futureResponses) {
futureResponse.whenCompleteAsync((response, exception) -> {

if (exception == null && response.getStatus() < 500) {
checkTimestampHeaderExistenceAndSet(response);
responses.add(response);
successCount.incrementAndGet();
if (successCount.get() == ack) {
sendAsyncResponse(chooseResponse(responses), session);
}
} else {
exceptionCount.incrementAndGet();
if (exceptionCount.get() > from - ack) {
sendAsyncResponse(new Response("504 Not enough replicas", Response.EMPTY), session);
}
}

}, workerExecutor).exceptionally(exception -> {
logger.error("Error happened while collecting responses from nodes", exception);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
incubos marked this conversation as resolved.
Show resolved Hide resolved
});
}
if (responses.stream().filter(response -> response.getStatus() == 200
|| response.getStatus() == 404
|| response.getStatus() == 202
|| response.getStatus() == 201).count() < ack) {
return new Response("504 Not Enough Replicas", Response.EMPTY);
}

private void sendAsyncResponse(Response resp, HttpSession session) {
try {
session.sendResponse(resp);
} catch (IOException e) {
logger.error("Failed to send error response", e);
session.scheduleClose();
}
}

private Response chooseResponse(List<Response> responses) {
return responses.stream().max((o1, o2) -> {
Long header1 = Long.parseLong(o1.getHeader(TIMESTAMP_MILLIS_HEADER));
Long header2 = Long.parseLong(o2.getHeader(TIMESTAMP_MILLIS_HEADER));
return header1.compareTo(header2);
}).get();
}

public Response handleProxy(String redirectedUrl, Request request) {
try {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? new byte[]{} : request.getBody())
).build();
HttpResponse<byte[]> response = httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.get(REQUEST_TIMEOUT_SEC, TimeUnit.SECONDS);
Response response1 = new Response(String.valueOf(response.statusCode()), response.body());
if (response.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
} else {
response1.addHeader(TIMESTAMP_MILLIS_HEADER
+ response.headers().map().get(
public CompletableFuture<Response> handleProxy(String redirectedUrl, Request request) {
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create(redirectedUrl))
.header(FROM_HEADER_NORMAL, selfUrl)
.method(request.getMethodName(), HttpRequest.BodyPublishers.ofByteArray(
request.getBody() == null ? new byte[]{} : request.getBody())
incubos marked this conversation as resolved.
Show resolved Hide resolved
).build();
return httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(httpResponse -> {
Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body());
if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
} else {
response1.addHeader(TIMESTAMP_MILLIS_HEADER
+ httpResponse.headers().map().get(
incubos marked this conversation as resolved.
Show resolved Hide resolved
TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst()
);
}

return response1;
} catch (ExecutionException e) {
logger.error("Unexpected error", e);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (TimeoutException e) {
logger.error("Timeout reached", e);
return new Response(Response.REQUEST_TIMEOUT, Response.EMPTY);
} catch (InterruptedException e) {
logger.error("Service is unavailable", e);
Thread.currentThread().interrupt();
return new Response(Response.SERVICE_UNAVAILABLE, Response.EMPTY);
}
);
}
return response1;
}, workerExecutor).exceptionally(exception -> {
logger.error("Error happened while sending async requests", exception);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
});
}

private Map<String, Integer> getFromAndAck(Request request) {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

public class ServiceIml implements Service {

private Server server;
private Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private final Config daoConfig;
private final ServiceConfig serviceConfig;
private final WorkerConfig workerConfig;
private WorkerThreadPoolExecutor workerThreadPoolExecutor;
private NodeThreadPoolExecutor nodeThreadPoolExecutor;
private final ShardingPolicy shardingPolicy;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private Server server;
private Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;
private WorkerThreadPoolExecutor workerThreadPoolExecutor;
private NodeThreadPoolExecutor nodeThreadPoolExecutor;

public ServiceIml(ServiceConfig serviceConfig, Config daoConfig,
WorkerConfig workerConfig, ShardingPolicy shardingPolicy) {
Expand All @@ -43,11 +43,11 @@ public synchronized CompletableFuture<Void> start() throws IOException {
dao = new ReferenceDao(daoConfig);
workerThreadPoolExecutor = new WorkerThreadPoolExecutor(workerConfig);
// TODO вынести параметры в отдельную конфигурацию для большей гибкости
incubos marked this conversation as resolved.
Show resolved Hide resolved
nodeThreadPoolExecutor = new NodeThreadPoolExecutor(8,
8,
nodeThreadPoolExecutor = new NodeThreadPoolExecutor(16,
16,
incubos marked this conversation as resolved.
Show resolved Hide resolved
new ArrayBlockingQueue<>(1024),
new CustomThreadFactory("node-executor", true),
new ThreadPoolExecutor.AbortPolicy(), 30);
new ThreadPoolExecutor.AbortPolicy(), 60);
SuDarina marked this conversation as resolved.
Show resolved Hide resolved
nodeThreadPoolExecutor.prestartAllCoreThreads();
workerThreadPoolExecutor.prestartAllCoreThreads();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import java.nio.file.Path;

@ServiceFactory(stage = 4)
@ServiceFactory(stage = 5)
public class ServiceImlFactory implements ServiceFactory.Factory {

private static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public static void main(String[] args) throws IOException, ExecutionException,
for (ServiceConfig serviceConfig : clusterConfs) {
ServiceIml serviceIml = new ServiceIml(serviceConfig, new Config(serviceConfig.workingDir(),
1024 * 1024),
new WorkerConfig(THREADS, THREADS, QUEUE_SIZE, 30),
shardingPolicy);
new WorkerConfig(THREADS * 2, THREADS * 2,
QUEUE_SIZE, 60), shardingPolicy);
serviceIml.start().get(2, TimeUnit.SECONDS);
}
}
Expand Down
Loading
Loading