Skip to content

Commit

Permalink
Солнышко Ксения, ИТМО ФИТиП, M3332, stage 1 (polis-vk#11)
Browse files Browse the repository at this point in the history
* Add dao

* Add server

* Fix failed tests

* Fix failed codeclimate

* Fix failed codeclimate

* Remove string template

* Resolve some problems

* Add scripts, report and attachments

---------

Co-authored-by: Alexey Shik <[email protected]>
  • Loading branch information
2 people authored and osokindm committed Mar 6, 2024
1 parent a7ff920 commit d77ec98
Show file tree
Hide file tree
Showing 40 changed files with 5,017 additions and 0 deletions.
114 changes: 114 additions & 0 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/MyHttpServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package ru.vk.itmo.test.solnyshkoksenia;

import one.nio.http.HttpServer;
import one.nio.http.HttpServerConfig;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.server.AcceptorConfig;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.solnyshkoksenia.dao.DaoImpl;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;

public class MyHttpServer extends HttpServer {
private final DaoImpl dao;

public MyHttpServer(ServiceConfig config) throws IOException {
super(createHttpServerConfig(config));
this.dao = new DaoImpl(createConfig(config));
}

private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConfig) {
HttpServerConfig serverConfig = new HttpServerConfig();
AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

serverConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
serverConfig.closeSessions = true;
return serverConfig;
}

private static Config createConfig(ServiceConfig config) {
return new Config(config.workingDir(), Math.round(0.33 * 128 * 1024 * 1024)); // 0.33 * 128mb
}

@Override
public void handleDefault(Request request, HttpSession session) throws IOException {
if (request.getMethod() == Request.METHOD_GET) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
}
session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY));
}

@Override
public synchronized void stop() {
try {
dao.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
super.stop();
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_GET)
public void get(final HttpSession session,
@Param(value = "id", required = true) String id) throws IOException {
if (sendResponseIfEmpty(id, session)) {
return;
}

Entry<MemorySegment> entry = dao.get(toMS(id));
if (entry == null) {
session.sendResponse(new Response(Response.NOT_FOUND, Response.EMPTY));
return;
}
session.sendResponse(Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE)));
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_PUT)
public void put(final Request request, final HttpSession session,
@Param(value = "id", required = true) String id) throws IOException {
if (sendResponseIfEmpty(id, session)) {
return;
}
dao.upsert(new BaseEntry<>(toMS(id), MemorySegment.ofArray(request.getBody())));
session.sendResponse(new Response(Response.CREATED, Response.EMPTY));
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_DELETE)
public void delete(final HttpSession session,
@Param(value = "id", required = true) String id) throws IOException {
if (sendResponseIfEmpty(id, session)) {
return;
}
dao.upsert(new BaseEntry<>(toMS(id), null));
session.sendResponse(new Response(Response.ACCEPTED, Response.EMPTY));
}

private boolean sendResponseIfEmpty(String input, final HttpSession session) throws IOException {
if (input.isEmpty()) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
return true;
}
return false;
}

private MemorySegment toMS(String input) {
return MemorySegment.ofArray(input.getBytes(StandardCharsets.UTF_8));
}
}
38 changes: 38 additions & 0 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/ServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ru.vk.itmo.test.solnyshkoksenia;

import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.test.ServiceFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class ServiceImpl implements Service {
private final ServiceConfig config;
private MyHttpServer server;

public ServiceImpl(ServiceConfig config) {
this.config = config;
}

@Override
public CompletableFuture<Void> start() throws IOException {
server = new MyHttpServer(config);
server.start();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stop() throws IOException {
server.stop();
return CompletableFuture.completedFuture(null);
}

@ServiceFactory(stage = 1)
public static class Factory implements ServiceFactory.Factory {
@Override
public Service create(ServiceConfig config) {
return new ServiceImpl(config);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.vk.itmo.test.solnyshkoksenia.dao;

public class DaoException extends RuntimeException {
public DaoException(String message) {
super(message);
}

public DaoException(String message, Throwable cause) {
super(message, cause);
}
}
187 changes: 187 additions & 0 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/dao/DaoImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package ru.vk.itmo.test.solnyshkoksenia.dao;

import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.solnyshkoksenia.dao.storage.DiskStorage;

import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {
private static final Comparator<MemorySegment> comparator = new MemorySegmentComparator();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Arena arena;
private final Path path;
private volatile State curState;

public DaoImpl(Config config) throws IOException {
path = config.basePath().resolve("data");
Files.createDirectories(path);

arena = Arena.ofShared();

this.curState = new State(config, new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
State state = this.curState.checkAndGet();
List<Iterator<EntryExtended<MemorySegment>>> iterators = List.of(
state.getInMemory(state.flushingStorage, from, to),
state.getInMemory(state.storage, from, to)
);

Iterator<EntryExtended<MemorySegment>> iterator = new MergeIterator<>(iterators,
(e1, e2) -> comparator.compare(e1.key(), e2.key()));
Iterator<EntryExtended<MemorySegment>> innerIterator = state.diskStorage.range(iterator, from, to);

return new Iterator<>() {
@Override
public boolean hasNext() {
return innerIterator.hasNext();
}

@Override
public Entry<MemorySegment> next() {
return innerIterator.next();
}
};
}

public void upsert(Entry<MemorySegment> entry, Long ttl) {
State state = this.curState.checkAndGet();

lock.readLock().lock();
try {
state.putInMemory(entry, ttl);
} finally {
lock.readLock().unlock();
}

if (state.isOverflowed()) {
try {
autoFlush();
} catch (IOException e) {
throw new DaoException("Memory storage overflowed. Cannot flush", e);
}
}
}

@Override
public void upsert(Entry<MemorySegment> entry) {
upsert(entry, null);
}

@Override
public Entry<MemorySegment> get(MemorySegment key) {
State state = this.curState.checkAndGet();
return state.get(key, comparator);
}

@Override
public void flush() throws IOException {
State state = this.curState.checkAndGet();
if (state.storage.isEmpty() || state.isFlushing()) {
return;
}
autoFlush();
}

private void autoFlush() throws IOException {
State state = this.curState.checkAndGet();
lock.writeLock().lock();
try {
if (state.isFlushing()) {
return;
}
this.curState = state.moveStorage();
} finally {
lock.writeLock().unlock();
}

executor.execute(this::tryFlush);
}

private void tryFlush() {
State state = this.curState.checkAndGet();
try {
state.flush();
} catch (IOException e) {
throw new DaoException("Flush failed", e);
}

lock.writeLock().lock();
try {
this.curState = new State(state.config, state.storage, new ConcurrentSkipListMap<>(comparator),
new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
} catch (IOException e) {
throw new DaoException("Cannot recover storage on disk", e);
} finally {
lock.writeLock().unlock();
}
}

@Override
public void compact() {
try {
executor.submit(this::tryCompact).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DaoException("Compaction failed. Thread interrupted", e);
} catch (ExecutionException e) {
throw new DaoException("Compaction failed", e);
}
}

private Object tryCompact() {
State state = this.curState.checkAndGet();
try {
state.diskStorage.compact();
} catch (IOException e) {
throw new DaoException("Cannot compact", e);
}

lock.writeLock().lock();
try {
this.curState = new State(state.config, state.storage, state.flushingStorage,
new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
} catch (IOException e) {
throw new DaoException("Cannot recover storage on disk after compaction", e);
} finally {
lock.writeLock().unlock();
}

return null;
}

@Override
public synchronized void close() throws IOException {
State state = this.curState;
if (state.isClosed() || !arena.scope().isAlive()) {
return;
}

if (!state.storage.isEmpty()) {
state.save();
}

executor.close();
arena.close();

this.curState = state.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ru.vk.itmo.test.solnyshkoksenia.dao;

import ru.vk.itmo.dao.Entry;

public record EntryExtended<Data>(Data key, Data value, Data expiration) implements Entry<Data> {
@Override
public String toString() {
return "{" + key + ":" + value + ":" + expiration + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ru.vk.itmo.test.solnyshkoksenia.dao;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Comparator;

public class MemorySegmentComparator implements Comparator<MemorySegment> {
@Override
public int compare(MemorySegment o1, MemorySegment o2) {
long mismatch = o1.mismatch(o2);
if (mismatch == -1) {
return 0;
}

if (mismatch == o1.byteSize()) {
return -1;
}

if (mismatch == o2.byteSize()) {
return 1;
}

byte b1 = o1.get(ValueLayout.JAVA_BYTE, mismatch);
byte b2 = o2.get(ValueLayout.JAVA_BYTE, mismatch);
return Byte.compare(b1, b2);
}
}
Loading

0 comments on commit d77ec98

Please sign in to comment.