forked from polis-vk/2024-highload-dht
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Коротких Виктор / ИТМО DWS / Stage 1 (polis-vk#4)
* state-1: add dao impl * state-1: first implementation of httpserver * state-1: fix flushing after stress failure * state-1: refactoring * state-1: add scripts * state-1: wrk reports * state-1: fix codestyle * state-1: fix codestyle 2 * state-1: add report * state-1: fix getting id parameter * state-1: new report * state-1: fix new report * state-1: memory allocations fix * state-1: fix codeclimate * stage-1: fix review comments * stage-1: codeclimate fix * stage-1: add global exception handler
- Loading branch information
Showing
82 changed files
with
4,069 additions
and
0 deletions.
There are no files selected for viewing
160 changes: 160 additions & 0 deletions
160
src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServerImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package ru.vk.itmo.test.viktorkorotkikh; | ||
|
||
import one.nio.http.HttpServer; | ||
import one.nio.http.HttpServerConfig; | ||
import one.nio.http.HttpSession; | ||
import one.nio.http.Path; | ||
import one.nio.http.Request; | ||
import one.nio.http.RequestMethod; | ||
import one.nio.http.Response; | ||
import one.nio.net.Socket; | ||
import one.nio.server.AcceptorConfig; | ||
import one.nio.server.RejectedSessionException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import ru.vk.itmo.ServiceConfig; | ||
import ru.vk.itmo.dao.BaseEntry; | ||
import ru.vk.itmo.dao.Dao; | ||
import ru.vk.itmo.dao.Entry; | ||
import ru.vk.itmo.test.viktorkorotkikh.dao.exceptions.LSMDaoOutOfMemoryException; | ||
import ru.vk.itmo.test.viktorkorotkikh.dao.exceptions.TooManyFlushesException; | ||
import ru.vk.itmo.test.viktorkorotkikh.http.LSMConstantResponse; | ||
import ru.vk.itmo.test.viktorkorotkikh.http.LSMCustomSession; | ||
import ru.vk.itmo.test.viktorkorotkikh.http.LSMServerResponseWithMemorySegment; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.lang.foreign.MemorySegment; | ||
import java.nio.charset.StandardCharsets; | ||
|
||
import static one.nio.http.Request.METHOD_DELETE; | ||
import static one.nio.http.Request.METHOD_GET; | ||
import static one.nio.http.Request.METHOD_PUT; | ||
|
||
public class LSMServerImpl extends HttpServer { | ||
private static final Logger log = LoggerFactory.getLogger(LSMServerImpl.class); | ||
private final Dao<MemorySegment, Entry<MemorySegment>> dao; | ||
|
||
public LSMServerImpl(ServiceConfig serviceConfig, Dao<MemorySegment, Entry<MemorySegment>> dao) throws IOException { | ||
super(createServerConfig(serviceConfig)); | ||
this.dao = dao; | ||
} | ||
|
||
private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) { | ||
HttpServerConfig serverConfig = new HttpServerConfig(); | ||
serverConfig.acceptors = new AcceptorConfig[]{createAcceptorConfig(serviceConfig.selfPort())}; | ||
serverConfig.closeSessions = true; | ||
return serverConfig; | ||
} | ||
|
||
private static AcceptorConfig createAcceptorConfig(int port) { | ||
AcceptorConfig acceptorConfig = new AcceptorConfig(); | ||
acceptorConfig.port = port; | ||
acceptorConfig.reusePort = true; | ||
return acceptorConfig; | ||
} | ||
|
||
@Override | ||
public void handleRequest(Request request, HttpSession session) throws IOException { | ||
try { | ||
super.handleRequest(request, session); | ||
} catch (Exception e) { | ||
log.error("Unexpected error occurred: ", e); | ||
session.sendResponse(LSMConstantResponse.serviceUnavailable(request)); | ||
} | ||
} | ||
|
||
@Path("/v0/entity") | ||
public void handleEntityRequest(Request request, HttpSession session) throws IOException { | ||
// validate id parameter | ||
String id = request.getParameter("id="); | ||
if (id == null || id.isEmpty()) { | ||
session.sendResponse(LSMConstantResponse.badRequest(request)); | ||
return; | ||
} | ||
|
||
Response response = switch (request.getMethod()) { | ||
case METHOD_GET -> handleGetEntity(request, id); | ||
case METHOD_PUT -> handlePutEntity(request, id); | ||
case METHOD_DELETE -> handleDeleteEntity(request, id); | ||
default -> LSMConstantResponse.methodNotAllowed(request); | ||
}; | ||
session.sendResponse(response); | ||
} | ||
|
||
private Response handleGetEntity(final Request request, final String id) { | ||
final Entry<MemorySegment> entry; | ||
try { | ||
entry = dao.get(fromString(id)); | ||
} catch (UncheckedIOException e) { | ||
// sstable get method throws UncheckedIOException | ||
return LSMConstantResponse.serviceUnavailable(request); | ||
} | ||
if (entry == null || entry.value() == null) { | ||
return LSMConstantResponse.notFound(request); | ||
} | ||
|
||
return new LSMServerResponseWithMemorySegment(Response.OK, entry.value()); | ||
} | ||
|
||
private Response handlePutEntity(final Request request, final String id) { | ||
if (request.getBody() == null) { | ||
return LSMConstantResponse.badRequest(request); | ||
} | ||
|
||
Entry<MemorySegment> newEntry = new BaseEntry<>( | ||
fromString(id), | ||
MemorySegment.ofArray(request.getBody()) | ||
); | ||
try { | ||
dao.upsert(newEntry); | ||
} catch (LSMDaoOutOfMemoryException e) { | ||
// when entry is too big to be putted into memtable | ||
return LSMConstantResponse.entityTooLarge(request); | ||
} catch (TooManyFlushesException e) { | ||
// when one memory table is in the process of being flushed, and the second is already full | ||
return LSMConstantResponse.tooManyRequests(request); | ||
} | ||
|
||
return LSMConstantResponse.created(request); | ||
} | ||
|
||
private Response handleDeleteEntity(final Request request, final String id) { | ||
final Entry<MemorySegment> newEntry = new BaseEntry<>( | ||
fromString(id), | ||
null | ||
); | ||
try { | ||
dao.upsert(newEntry); | ||
} catch (LSMDaoOutOfMemoryException e) { | ||
// when entry is too big to be putted into memtable | ||
return LSMConstantResponse.entityTooLarge(request); | ||
} catch (TooManyFlushesException e) { | ||
// when one memory table is in the process of being flushed, and the second is already full | ||
return LSMConstantResponse.tooManyRequests(request); | ||
} | ||
|
||
return LSMConstantResponse.accepted(request); | ||
} | ||
|
||
@Path("/v0/compact") | ||
@RequestMethod(value = {METHOD_GET}) | ||
public Response handleCompact(Request request) throws IOException { | ||
dao.compact(); | ||
return LSMConstantResponse.ok(request); | ||
} | ||
|
||
private static MemorySegment fromString(String data) { | ||
return data == null ? null : MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8)); | ||
} | ||
|
||
@Override | ||
public void handleDefault(Request request, HttpSession session) throws IOException { | ||
session.sendResponse(LSMConstantResponse.badRequest(request)); | ||
} | ||
|
||
@Override | ||
public HttpSession createSession(Socket socket) throws RejectedSessionException { | ||
return new LSMCustomSession(socket, this); | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
src/main/java/ru/vk/itmo/test/viktorkorotkikh/LSMServiceImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package ru.vk.itmo.test.viktorkorotkikh; | ||
|
||
import ru.vk.itmo.Service; | ||
import ru.vk.itmo.ServiceConfig; | ||
import ru.vk.itmo.dao.Config; | ||
import ru.vk.itmo.dao.Dao; | ||
import ru.vk.itmo.dao.Entry; | ||
import ru.vk.itmo.test.ServiceFactory; | ||
import ru.vk.itmo.test.viktorkorotkikh.dao.LSMDaoImpl; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.lang.foreign.MemorySegment; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
public class LSMServiceImpl implements Service { | ||
private static final long FLUSH_THRESHOLD = 1 << 20; // 1 MB | ||
private final ServiceConfig serviceConfig; | ||
private LSMServerImpl httpServer; | ||
private boolean isRunning; | ||
private Dao<MemorySegment, Entry<MemorySegment>> dao; | ||
|
||
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { | ||
Path tmpDir = Files.createTempDirectory("dao"); | ||
tmpDir.toFile().deleteOnExit(); | ||
|
||
ServiceConfig serviceConfig = new ServiceConfig( | ||
8080, | ||
"http://localhost:8080", | ||
List.of("http://localhost:8080"), | ||
tmpDir | ||
); | ||
LSMServiceImpl lsmService = new LSMServiceImpl(serviceConfig); | ||
|
||
lsmService.start().get(); | ||
} | ||
|
||
public LSMServiceImpl(ServiceConfig serviceConfig) throws IOException { | ||
this.serviceConfig = serviceConfig; | ||
} | ||
|
||
private static LSMServerImpl createServer( | ||
ServiceConfig serviceConfig, | ||
Dao<MemorySegment, Entry<MemorySegment>> dao | ||
) throws IOException { | ||
return new LSMServerImpl(serviceConfig, dao); | ||
} | ||
|
||
private static Dao<MemorySegment, Entry<MemorySegment>> createLSMDao(Path workingDir) { | ||
Config daoConfig = new Config( | ||
workingDir, | ||
FLUSH_THRESHOLD | ||
); | ||
return new LSMDaoImpl(daoConfig); | ||
} | ||
|
||
private static void closeLSMDao(Dao<MemorySegment, Entry<MemorySegment>> dao) { | ||
if (dao == null) return; | ||
try { | ||
dao.close(); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Void> start() throws IOException { | ||
if (isRunning) return CompletableFuture.completedFuture(null); | ||
dao = createLSMDao(serviceConfig.workingDir()); | ||
|
||
httpServer = createServer(serviceConfig, dao); | ||
httpServer.start(); | ||
|
||
isRunning = true; | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Void> stop() throws IOException { | ||
if (!isRunning) return CompletableFuture.completedFuture(null); | ||
httpServer.stop(); | ||
httpServer = null; | ||
|
||
closeLSMDao(dao); | ||
dao = null; | ||
|
||
isRunning = false; | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
|
||
@ServiceFactory(stage = 1) | ||
public static class LSMServiceFactoryImpl implements ServiceFactory.Factory { | ||
@Override | ||
public Service create(ServiceConfig config) { | ||
try { | ||
return new LSMServiceImpl(config); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
} | ||
} | ||
|
||
} |
5 changes: 5 additions & 0 deletions
5
src/main/java/ru/vk/itmo/test/viktorkorotkikh/dao/EntriesMetadata.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package ru.vk.itmo.test.viktorkorotkikh.dao; | ||
|
||
public record EntriesMetadata(int count, long entriesDataSize) { | ||
|
||
} |
Oops, something went wrong.