From 00a02e0b295ba8186e073ec748215a5f421ec7bc Mon Sep 17 00:00:00 2001 From: yulalenk Date: Sun, 18 Feb 2024 21:14:38 +0300 Subject: [PATCH] base implementation of server --- .../itmo/test/alenkovayulya/ServerImpl.java | 99 ++++++ .../test/alenkovayulya/ServerInitializer.java | 26 ++ .../itmo/test/alenkovayulya/ServiceImpl.java | 44 +++ .../alenkovayulya/dao/ByteArraySegment.java | 48 +++ .../dao/LiveFilteringIterator.java | 52 ++++ .../itmo/test/alenkovayulya/dao/MemTable.java | 49 +++ .../dao/MemorySegmentComparator.java | 89 ++++++ .../dao/MergingEntryIterator.java | 68 ++++ .../test/alenkovayulya/dao/ReferenceDao.java | 292 ++++++++++++++++++ .../itmo/test/alenkovayulya/dao/SSTable.java | 204 ++++++++++++ .../test/alenkovayulya/dao/SSTableWriter.java | 166 ++++++++++ .../itmo/test/alenkovayulya/dao/SSTables.java | 162 ++++++++++ .../itmo/test/alenkovayulya/dao/TableSet.java | 201 ++++++++++++ .../dao/WeightedPeekingEntryIterator.java | 67 ++++ 14 files changed, 1567 insertions(+) create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/ServerInitializer.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ByteArraySegment.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/LiveFilteringIterator.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemTable.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemorySegmentComparator.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MergingEntryIterator.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ReferenceDao.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTable.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTableWriter.java create mode 100644 
src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTables.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/TableSet.java create mode 100644 src/main/java/ru/vk/itmo/test/alenkovayulya/dao/WeightedPeekingEntryIterator.java diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java new file mode 100644 index 000000000..f6bc6bde4 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java @@ -0,0 +1,99 @@ +package ru.vk.itmo.test.alenkovayulya; + +import one.nio.http.*; +import one.nio.server.AcceptorConfig; +import ru.vk.itmo.ServiceConfig; +import ru.vk.itmo.dao.BaseEntry; +import ru.vk.itmo.dao.Entry; +import ru.vk.itmo.test.alenkovayulya.dao.ReferenceDao; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.charset.StandardCharsets; + +public class ServerImpl extends HttpServer { + + private final ReferenceDao referenceDao; + + public ServerImpl(ServiceConfig serviceConfig, ReferenceDao referenceDao) throws IOException { + super(createServerConfig(serviceConfig)); + this.referenceDao = referenceDao; + } + + private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) { + HttpServerConfig serverConfig = new HttpServerConfig(); + AcceptorConfig acceptorConfig = new AcceptorConfig(); + acceptorConfig.port = serviceConfig.selfPort(); + acceptorConfig.reusePort = true; + + serverConfig.acceptors = new AcceptorConfig[]{acceptorConfig}; + serverConfig.closeSessions = true; + return serverConfig; + } + + + @Path("/v0/entity") + @RequestMethod(Request.METHOD_GET) + public Response getEntity(@Param(value = "id", required = true) String id) { + if (isEmptyId(id)) { + return new Response(Response.BAD_REQUEST, Response.EMPTY); + } + Entry value = referenceDao.get(convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8))); + 
+ return value == null ? + new Response(Response.NOT_FOUND, Response.EMPTY) : + Response.ok(value.value().toArray(ValueLayout.JAVA_BYTE)); + } + + @Path("/v0/entity") + @RequestMethod(Request.METHOD_PUT) + public Response putEntity(@Param(value = "id", required = true) String id, Request request) { + if (isEmptyId(id)) { + return new Response(Response.BAD_REQUEST, Response.EMPTY); + } + referenceDao.upsert(new BaseEntry<>( + convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)), + convertBytesToMemorySegment(request.getBody()))); + return new Response(Response.CREATED, Response.EMPTY); + } + + @Path("/v0/entity") + @RequestMethod(Request.METHOD_DELETE) + public Response deleteEntity(@Param(value = "id", required = true) String id) { + if (isEmptyId(id)) { + return new Response(Response.BAD_REQUEST, Response.EMPTY); + } + referenceDao.upsert(new BaseEntry<>( + convertBytesToMemorySegment(id.getBytes(StandardCharsets.UTF_8)), null)); + return new Response(Response.ACCEPTED, Response.EMPTY); + } + + + @Override + public void handleDefault(Request request, HttpSession session) throws IOException { + switch (request.getMethodName()) { + case "GET", "PUT", "DELETE" -> session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY)); + default -> session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY)); + } + } + + @Override + public synchronized void stop() { + super.stop(); + try { + referenceDao.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private boolean isEmptyId(String id) { + return id.isEmpty() && id.isBlank(); + } + + private MemorySegment convertBytesToMemorySegment(byte[] byteArray) { + return MemorySegment.ofArray(byteArray); + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerInitializer.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerInitializer.java new file mode 100644 index 000000000..8d6a504e5 --- /dev/null +++ 
b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServerInitializer.java @@ -0,0 +1,26 @@ +package ru.vk.itmo.test.alenkovayulya; + +import ru.vk.itmo.ServiceConfig; +import ru.vk.itmo.dao.Config; +import ru.vk.itmo.test.alenkovayulya.dao.ReferenceDao; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +public class ServerInitializer { + public static final int PORT = 8080; + public static final String URL = "http://localhost"; + + + public static void main(String[] args) throws IOException { + ServiceConfig config = new ServiceConfig(PORT, URL, List.of(URL), + Files.createTempDirectory("reports") + ); + + ReferenceDao dao = new ReferenceDao(new Config(config.workingDir(), 1024)); + ServerImpl server = new ServerImpl(config, dao); + server.start(); + } + +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java new file mode 100644 index 000000000..667551864 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java @@ -0,0 +1,44 @@ +package ru.vk.itmo.test.alenkovayulya; + +import ru.vk.itmo.Service; +import ru.vk.itmo.ServiceConfig; +import ru.vk.itmo.dao.Config; +import ru.vk.itmo.test.ServiceFactory; +import ru.vk.itmo.test.alenkovayulya.dao.ReferenceDao; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public class ServiceImpl implements Service { + + private ReferenceDao referenceDao; + private ServerImpl server; + private final ServiceConfig config; + + public ServiceImpl(ServiceConfig config) { + this.config = config; + + } + @Override + public CompletableFuture start() throws IOException { + referenceDao = new ReferenceDao(new Config(config.workingDir(), 1024)); + server = new ServerImpl(config, referenceDao); + server.start(); + return CompletableFuture.completedFuture(null); + } + @Override + public CompletableFuture stop() throws IOException { + server.stop(); + referenceDao.close(); + 
return CompletableFuture.completedFuture(null); + } + + @ServiceFactory(stage = 1) + public static class Factory implements ServiceFactory.Factory { + + @Override + public Service create(ServiceConfig config) { + return new ServiceImpl(config); + } + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ByteArraySegment.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ByteArraySegment.java new file mode 100644 index 000000000..ee2e80566 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ByteArraySegment.java @@ -0,0 +1,48 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +/** + * Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface. + * + * @author incubos + */ +final class ByteArraySegment { + private byte[] array; + private MemorySegment segment; + + ByteArraySegment(final int capacity) { + this.array = new byte[capacity]; + this.segment = MemorySegment.ofArray(array); + } + + void withArray(final ArrayConsumer consumer) throws IOException { + consumer.process(array); + } + + MemorySegment segment() { + return segment; + } + + void ensureCapacity(final long size) { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too big!"); + } + + final int capacity = (int) size; + if (array.length >= capacity) { + return; + } + + // Grow to the nearest bigger power of 2 + final int newSize = Integer.highestOneBit(capacity) << 1; + array = new byte[newSize]; + segment = MemorySegment.ofArray(array); + } + + interface ArrayConsumer { + void process(byte[] array) throws IOException; + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/LiveFilteringIterator.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/LiveFilteringIterator.java new file mode 100644 index 000000000..1a3faa78c --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/LiveFilteringIterator.java @@ -0,0 
+1,52 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Filters non tombstone {@link Entry}s. + * + * @author incubos + */ +final class LiveFilteringIterator implements Iterator> { + private final Iterator> delegate; + private Entry next; + + LiveFilteringIterator(final Iterator> delegate) { + this.delegate = delegate; + skipTombstones(); + } + + private void skipTombstones() { + while (delegate.hasNext()) { + final Entry entry = delegate.next(); + if (entry.value() != null) { + this.next = entry; + break; + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // Consume + final Entry result = next; + next = null; + + skipTombstones(); + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemTable.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemTable.java new file mode 100644 index 000000000..3ff8f1f40 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemTable.java @@ -0,0 +1,49 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Memory table. 
+ * + * @author incubos + */ +final class MemTable { + private final NavigableMap> map = + new ConcurrentSkipListMap<>( + MemorySegmentComparator.INSTANCE); + + boolean isEmpty() { + return map.isEmpty(); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + if (from == null && to == null) { + // All + return map.values().iterator(); + } else if (from == null) { + // Head + return map.headMap(to).values().iterator(); + } else if (to == null) { + // Tail + return map.tailMap(from).values().iterator(); + } else { + // Slice + return map.subMap(from, to).values().iterator(); + } + } + + Entry get(final MemorySegment key) { + return map.get(key); + } + + Entry upsert(final Entry entry) { + return map.put(entry.key(), entry); + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemorySegmentComparator.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemorySegmentComparator.java new file mode 100644 index 000000000..cacb6dce9 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MemorySegmentComparator.java @@ -0,0 +1,89 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Comparator; + +/** + * Compares {@link MemorySegment}s. 
+ * + * @author incubos + */ +final class MemorySegmentComparator implements Comparator { + static final Comparator INSTANCE = + new MemorySegmentComparator(); + + private MemorySegmentComparator() { + // Singleton + } + + @Override + public int compare( + final MemorySegment left, + final MemorySegment right) { + final long mismatch = left.mismatch(right); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == left.byteSize()) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == right.byteSize()) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + left.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch), + right.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch)); + } + + static int compare( + final MemorySegment srcSegment, + final long srcFromOffset, + final long srcLength, + final MemorySegment dstSegment, + final long dstFromOffset, + final long dstLength) { + final long mismatch = + MemorySegment.mismatch( + srcSegment, + srcFromOffset, + srcFromOffset + srcLength, + dstSegment, + dstFromOffset, + dstFromOffset + dstLength); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == srcLength) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == dstLength) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + srcSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + srcFromOffset + mismatch), + dstSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + dstFromOffset + mismatch)); + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MergingEntryIterator.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MergingEntryIterator.java new file mode 100644 index 000000000..8f9df22fc --- /dev/null +++ 
b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/MergingEntryIterator.java @@ -0,0 +1,68 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; + +/** + * Merges entry {@link Iterator}s. + * + * @author incubos + */ +final class MergingEntryIterator implements Iterator> { + private final Queue iterators; + + MergingEntryIterator(final List iterators) { + assert iterators.stream().allMatch(WeightedPeekingEntryIterator::hasNext); + + this.iterators = new PriorityQueue<>(iterators); + } + + @Override + public boolean hasNext() { + return !iterators.isEmpty(); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final WeightedPeekingEntryIterator top = iterators.remove(); + final Entry result = top.next(); + + if (top.hasNext()) { + // Not exhausted + iterators.add(top); + } + + // Remove older versions of the key + while (true) { + final WeightedPeekingEntryIterator iterator = iterators.peek(); + if (iterator == null) { + // Nothing left + break; + } + + // Skip entries with the same key + final Entry entry = iterator.peek(); + if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) { + // Reached another key + break; + } + + // Drop + iterators.remove(); + // Skip + iterator.next(); + if (iterator.hasNext()) { + // Not exhausted + iterators.add(iterator); + } + } + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ReferenceDao.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ReferenceDao.java new file mode 100644 index 000000000..306816ba5 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/ReferenceDao.java @@ -0,0 +1,292 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Config; +import ru.vk.itmo.dao.Dao; +import ru.vk.itmo.dao.Entry; + +import java.io.IOException; +import java.lang.foreign.Arena; +import 
java.lang.foreign.MemorySegment; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Reference implementation of {@link Dao}. + * + * @author incubos + */ +public class ReferenceDao implements Dao> { + private final Config config; + private final Arena arena; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + // Guarded by lock + private volatile TableSet tableSet; + + private final ExecutorService flusher = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("flusher"); + return result; + }); + private final ExecutorService compactor = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("compactor"); + return result; + }); + + private final AtomicBoolean closed = new AtomicBoolean(); + + public ReferenceDao(final Config config) throws IOException { + this.config = config; + this.arena = Arena.ofShared(); + + // First complete promotion of compacted SSTables + SSTables.promote( + config.basePath(), + 0, + 1); + + this.tableSet = + TableSet.from( + SSTables.discover( + arena, + config.basePath())); + } + + @Override + public Iterator> get( + final MemorySegment from, + final MemorySegment to) { + return new LiveFilteringIterator( + tableSet.get( + from, + to)); + } + + @Override + public Entry get(final MemorySegment key) { + // Without lock, just snapshot of table set + return tableSet.get(key); + } + + @Override + public void upsert(final Entry entry) { + final boolean autoFlush; + lock.readLock().lock(); + try { + if (tableSet.memTableSize.get() > config.flushThresholdBytes() + && tableSet.flushingTable != null) { + throw new 
IllegalStateException("Can't keep up with flushing!"); + } + + // Upsert + final Entry previous = tableSet.upsert(entry); + + // Update size estimate + final long size = tableSet.memTableSize.addAndGet(sizeOf(entry) - sizeOf(previous)); + autoFlush = size > config.flushThresholdBytes(); + } finally { + lock.readLock().unlock(); + } + + if (autoFlush) { + initiateFlush(true); + } + } + + private static long sizeOf(final Entry entry) { + if (entry == null) { + return 0L; + } + + if (entry.value() == null) { + return entry.key().byteSize(); + } + + return entry.key().byteSize() + entry.value().byteSize(); + } + + private void initiateFlush(final boolean auto) { + flusher.submit(() -> { + final TableSet currentTableSet; + lock.writeLock().lock(); + try { + if (this.tableSet.memTable.isEmpty()) { + // Nothing to flush + return; + } + + if (auto && this.tableSet.memTableSize.get() < config.flushThresholdBytes()) { + // Not enough data to flush + return; + } + + // Switch memTable to flushing + currentTableSet = this.tableSet.flushing(); + this.tableSet = currentTableSet; + } finally { + lock.writeLock().unlock(); + } + + // Write + final int sequence = currentTableSet.nextSequence(); + try { + new SSTableWriter() + .write( + config.basePath(), + sequence, + currentTableSet.flushingTable.get(null, null)); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-1); + return; + } + + // Open + final SSTable flushed; + try { + flushed = SSTables.open( + arena, + config.basePath(), + sequence); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-2); + return; + } + + // Switch + lock.writeLock().lock(); + try { + this.tableSet = this.tableSet.flushed(flushed); + } finally { + lock.writeLock().unlock(); + } + }).state(); + } + + @Override + public void flush() throws IOException { + initiateFlush(false); + } + + @Override + public void compact() throws IOException { + compactor.submit(() -> { + final TableSet currentTableSet; 
+ lock.writeLock().lock(); + try { + currentTableSet = this.tableSet; + if (currentTableSet.ssTables.size() < 2) { + // Nothing to compact + return; + } + } finally { + lock.writeLock().unlock(); + } + + // Compact to 0 + try { + new SSTableWriter() + .write( + config.basePath(), + 0, + new LiveFilteringIterator( + currentTableSet.allSSTableEntries())); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-3); + } + + // Open 0 + final SSTable compacted; + try { + compacted = + SSTables.open( + arena, + config.basePath(), + 0); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-4); + return; + } + + // Replace old SSTables with compacted one to + // keep serving requests + final Set replaced = new HashSet<>(currentTableSet.ssTables); + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + replaced, + compacted); + } finally { + lock.writeLock().unlock(); + } + + // Remove compacted SSTables starting from the oldest ones. + // If we crash, 0 contains all the data, and + // it will be promoted on reopen. 
+ for (final SSTable ssTable : currentTableSet.ssTables.reversed()) { + try { + SSTables.remove( + config.basePath(), + ssTable.sequence); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-5); + } + } + + // Promote zero to one (possibly replacing) + try { + SSTables.promote( + config.basePath(), + 0, + 1); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-6); + } + + // Replace promoted SSTable + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + Collections.singleton(compacted), + compacted.withSequence(1)); + } finally { + lock.writeLock().unlock(); + } + }).state(); + } + + @Override + public void close() throws IOException { + if (closed.getAndSet(true)) { + // Already closed + return; + } + + // Maybe flush + flush(); + + // Stop all the threads + flusher.close(); + compactor.close(); + + // Close arena + arena.close(); + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTable.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTable.java new file mode 100644 index 000000000..91233ce1a --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTable.java @@ -0,0 +1,204 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.BaseEntry; +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Persistent SSTable in data file and index file. 
+ * + * @author incubos + * @see SSTables + */ +final class SSTable { + final int sequence; + + private final MemorySegment index; + private final MemorySegment data; + private final long size; + + SSTable( + final int sequence, + final MemorySegment index, + final MemorySegment data) { + this.sequence = sequence; + this.index = index; + this.data = data; + this.size = index.byteSize() / Long.BYTES; + } + + SSTable withSequence(final int sequence) { + return new SSTable( + sequence, + index, + data); + } + + /** + * Returns index of the entry if found; otherwise, (-(insertion point) - 1). + * The insertion point is defined as the point at which the key would be inserted: + * the index of the first element greater than the key, + * or size if all keys are less than the specified key. + * Note that this guarantees that the return value will be >= 0 + * if and only if the key is found. + */ + private long entryBinarySearch(final MemorySegment key) { + long low = 0L; + long high = size - 1; + + while (low <= high) { + final long mid = (low + high) >>> 1; + final long midEntryOffset = entryOffset(mid); + final long midKeyLength = getLength(midEntryOffset); + final int compare = + MemorySegmentComparator.compare( + data, + midEntryOffset + Long.BYTES, // Position at key + midKeyLength, + key, + 0L, + key.byteSize()); + + if (compare < 0) { + low = mid + 1; + } else if (compare > 0) { + high = mid - 1; + } else { + return mid; + } + } + + return -(low + 1); + } + + private long entryOffset(final long entry) { + return index.get( + ValueLayout.OfLong.JAVA_LONG, + entry * Long.BYTES); + } + + private long getLength(final long offset) { + return data.get( + ValueLayout.OfLong.JAVA_LONG_UNALIGNED, + offset); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0; + + // Slice of SSTable in absolute offsets + final long fromOffset; + final long toOffset; + + 
// Left offset bound + if (from == null) { + // Start from the beginning + fromOffset = 0L; + } else { + final long fromEntry = entryBinarySearch(from); + if (fromEntry >= 0L) { + fromOffset = entryOffset(fromEntry); + } else if (-fromEntry - 1 == size) { + // No relevant data + return Collections.emptyIterator(); + } else { + // Greater but existing key found + fromOffset = entryOffset(-fromEntry - 1); + } + } + + // Right offset bound + if (to == null) { + // Up to the end + toOffset = data.byteSize(); + } else { + final long toEntry = entryBinarySearch(to); + if (toEntry >= 0L) { + toOffset = entryOffset(toEntry); + } else if (-toEntry - 1 == size) { + // Up to the end + toOffset = data.byteSize(); + } else { + // Greater but existing key found + toOffset = entryOffset(-toEntry - 1); + } + } + + return new SliceIterator(fromOffset, toOffset); + } + + Entry get(final MemorySegment key) { + final long entry = entryBinarySearch(key); + if (entry < 0) { + return null; + } + + // Skip key (will reuse the argument) + long offset = entryOffset(entry); + offset += Long.BYTES + key.byteSize(); + // Extract value length + final long valueLength = getLength(offset); + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + // Tombstone encountered + return new BaseEntry<>(key, null); + } else { + // Get value + offset += Long.BYTES; + final MemorySegment value = data.asSlice(offset, valueLength); + return new BaseEntry<>(key, value); + } + } + + private final class SliceIterator implements Iterator> { + private long offset; + private final long toOffset; + + private SliceIterator( + final long offset, + final long toOffset) { + this.offset = offset; + this.toOffset = toOffset; + } + + @Override + public boolean hasNext() { + return offset < toOffset; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // Read key length + final long keyLength = getLength(offset); + offset += Long.BYTES; + + // Read key + final 
MemorySegment key = data.asSlice(offset, keyLength); + offset += keyLength; + + // Read value length + final long valueLength = getLength(offset); + offset += Long.BYTES; + + // Read value + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + // Tombstone encountered + return new BaseEntry<>(key, null); + } else { + final MemorySegment value = data.asSlice(offset, valueLength); + offset += valueLength; + return new BaseEntry<>(key, value); + } + } + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTableWriter.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTableWriter.java new file mode 100644 index 000000000..e2d828bd2 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTableWriter.java @@ -0,0 +1,166 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Iterator; + +/** + * Writes {@link Entry} {@link Iterator} to SSTable on disk. + * + *

Index file {@code <sequence>.index} contains {@code long} offsets to entries in data file: + * {@code [offset0, offset1, ...]} + * + *

Data file {@code <sequence>.data} contains serialized entries: + * {@code <long keyLength><key><long valueLength><value>} + * + *

Tombstones are encoded as {@code valueLength} {@code -1} and no subsequent value. + * + * @author incubos + */ +final class SSTableWriter { + private static final int BUFFER_SIZE = 64 * 1024; + + // Reusable buffers to eliminate allocations. + // But excessive memory copying is still there :( + // Long cell + private final ByteArraySegment longBuffer = new ByteArraySegment(Long.BYTES); + // Growable blob cell + private final ByteArraySegment blobBuffer = new ByteArraySegment(512); + + void write( + final Path baseDir, + final int sequence, + final Iterator> entries) throws IOException { + // Write to temporary files + final Path tempIndexName = SSTables.tempIndexName(baseDir, sequence); + final Path tempDataName = SSTables.tempDataName(baseDir, sequence); + + // Delete temporary files to eliminate tails + Files.deleteIfExists(tempIndexName); + Files.deleteIfExists(tempDataName); + + // Iterate in a single pass! + // Will write through FileChannel despite extra memory copying and + // no buffering (which may be implemented later). + // Looking forward to MemorySegment facilities in FileChannel! 
+ try (OutputStream index = + new BufferedOutputStream( + new FileOutputStream( + tempIndexName.toFile()), + BUFFER_SIZE); + OutputStream data = + new BufferedOutputStream( + new FileOutputStream( + tempDataName.toFile()), + BUFFER_SIZE)) { + long entryOffset = 0L; + + // Iterate and serialize + while (entries.hasNext()) { + // First write offset to the entry + writeLong(entryOffset, index); + + // Then write the entry + final Entry entry = entries.next(); + entryOffset += writeEntry(entry, data); + } + } + + // Publish files atomically + // FIRST index, LAST data + final Path indexName = + SSTables.indexName( + baseDir, + sequence); + Files.move( + tempIndexName, + indexName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + final Path dataName = + SSTables.dataName( + baseDir, + sequence); + Files.move( + tempDataName, + dataName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + + private void writeLong( + final long value, + final OutputStream os) throws IOException { + longBuffer.segment().set( + ValueLayout.OfLong.JAVA_LONG_UNALIGNED, + 0, + value); + longBuffer.withArray(os::write); + } + + private void writeSegment( + final MemorySegment value, + final OutputStream os) throws IOException { + final long size = value.byteSize(); + blobBuffer.ensureCapacity(size); + MemorySegment.copy( + value, + 0L, + blobBuffer.segment(), + 0L, + size); + blobBuffer.withArray(array -> + os.write( + array, + 0, + (int) size)); + } + + /** + * Writes {@link Entry} to {@link FileChannel}. 
+ * + * @return written bytes + */ + private long writeEntry( + final Entry entry, + final OutputStream os) throws IOException { + final MemorySegment key = entry.key(); + final MemorySegment value = entry.value(); + long result = 0L; + + // Key size + writeLong(key.byteSize(), os); + result += Long.BYTES; + + // Key + writeSegment(key, os); + result += key.byteSize(); + + // Value size and possibly value + if (value == null) { + // Tombstone + writeLong(SSTables.TOMBSTONE_VALUE_LENGTH, os); + result += Long.BYTES; + } else { + // Value length + writeLong(value.byteSize(), os); + result += Long.BYTES; + + // Value + writeSegment(value, os); + result += value.byteSize(); + } + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTables.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTables.java new file mode 100644 index 000000000..8b9e1ccde --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/SSTables.java @@ -0,0 +1,162 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * Provides {@link SSTable} management facilities: dumping and discovery. + * + * @author incubos + */ +final class SSTables { + public static final String INDEX_SUFFIX = ".index"; + public static final String DATA_SUFFIX = ".data"; + public static final long TOMBSTONE_VALUE_LENGTH = -1L; + + private static final String TEMP_SUFFIX = ".tmp"; + + /** + * Can't instantiate. 
+ */ + private SSTables() { + // Only static methods + } + + static Path indexName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + INDEX_SUFFIX); + } + + static Path dataName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + DATA_SUFFIX); + } + + static Path tempIndexName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + INDEX_SUFFIX + TEMP_SUFFIX); + } + + static Path tempDataName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + DATA_SUFFIX + TEMP_SUFFIX); + } + + /** + * Returns {@link List} of {@link SSTable}s from freshest to oldest. + */ + static List discover( + final Arena arena, + final Path baseDir) throws IOException { + if (!Files.exists(baseDir)) { + return Collections.emptyList(); + } + + final List result = new ArrayList<>(); + try (Stream files = Files.list(baseDir)) { + files.forEach(file -> { + final String fileName = file.getFileName().toString(); + if (!fileName.endsWith(DATA_SUFFIX)) { + // Skip non data + return; + } + + final int sequence = + // .data -> N + Integer.parseInt( + fileName.substring( + 0, + fileName.length() - DATA_SUFFIX.length())); + + try { + result.add(open(arena, baseDir, sequence)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + // Sort from freshest to oldest + result.sort((o1, o2) -> Integer.compare(o2.sequence, o1.sequence)); + + return Collections.unmodifiableList(result); + } + + static SSTable open( + final Arena arena, + final Path baseDir, + final int sequence) throws IOException { + final MemorySegment index = + mapReadOnly( + arena, + indexName(baseDir, sequence)); + final MemorySegment data = + mapReadOnly( + arena, + dataName(baseDir, sequence)); + + return new SSTable( + sequence, + index, + data); + } + + private static MemorySegment mapReadOnly( + final Arena arena, + final Path file) throws IOException { + try (FileChannel channel = + 
FileChannel.open( + file, + StandardOpenOption.READ)) { + return channel.map( + FileChannel.MapMode.READ_ONLY, + 0L, + Files.size(file), + arena); + } + } + + static void remove( + final Path baseDir, + final int sequence) throws IOException { + // First delete data file to make SSTable invisible + Files.delete(dataName(baseDir, sequence)); + Files.delete(indexName(baseDir, sequence)); + } + + static void promote( + final Path baseDir, + final int from, + final int to) throws IOException { + // Build to progress to the same outcome + if (Files.exists(indexName(baseDir, from))) { + Files.move( + indexName(baseDir, from), + indexName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + if (Files.exists(dataName(baseDir, from))) { + Files.move( + dataName(baseDir, from), + dataName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/TableSet.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/TableSet.java new file mode 100644 index 000000000..b58aa2829 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/TableSet.java @@ -0,0 +1,201 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Data set in various tables. 
+ * + * @author incubos + */ +final class TableSet { + final MemTable memTable; + final AtomicLong memTableSize; + // null or read-only + final MemTable flushingTable; + // From freshest to oldest + final List ssTables; + + private TableSet( + final MemTable memTable, + final AtomicLong memTableSize, + final MemTable flushingTable, + final List ssTables) { + this.memTable = memTable; + this.memTableSize = memTableSize; + this.flushingTable = flushingTable; + this.ssTables = ssTables; + } + + static TableSet from(final List ssTables) { + return new TableSet( + new MemTable(), + new AtomicLong(), + null, + ssTables); + } + + int nextSequence() { + return ssTables.stream() + .mapToInt(t -> t.sequence) + .max() + .orElse(0) + 1; + } + + TableSet flushing() { + if (memTable.isEmpty()) { + throw new IllegalStateException("Nothing to flush"); + } + + if (flushingTable != null) { + throw new IllegalStateException("Already flushing"); + } + + return new TableSet( + new MemTable(), + new AtomicLong(), + memTable, + ssTables); + } + + TableSet flushed(final SSTable flushed) { + final List newSSTables = new ArrayList<>(ssTables.size() + 1); + newSSTables.add(flushed); + newSSTables.addAll(ssTables); + return new TableSet( + memTable, + memTableSize, + null, + newSSTables); + } + + TableSet compacted( + final Set replaced, + final SSTable with) { + final List newSsTables = new ArrayList<>(this.ssTables.size() + 1); + + // Keep not replaced SSTables + for (final SSTable ssTable : this.ssTables) { + if (!replaced.contains(ssTable)) { + newSsTables.add(ssTable); + } + } + + // Logically the oldest one + newSsTables.add(with); + + return new TableSet( + memTable, + memTableSize, + flushingTable, + newSsTables); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + final List iterators = + new ArrayList<>(2 + ssTables.size()); + + // MemTable goes first + final Iterator> memTableIterator = + memTable.get(from, to); + if (memTableIterator.hasNext()) { + 
iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE, + memTableIterator)); + } + + // Then goes flushing + if (flushingTable != null) { + final Iterator> flushingIterator = + flushingTable.get(from, to); + if (flushingIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE + 1, + flushingIterator)); + } + } + + // Then go all the SSTables + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(from, to); + if (ssTableIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + } + + return switch (iterators.size()) { + case 0 -> Collections.emptyIterator(); + case 1 -> iterators.get(0); + default -> new MergingEntryIterator(iterators); + }; + } + + Entry get(final MemorySegment key) { + // Slightly optimized version not to pollute the heap + + // First check MemTable + Entry result = memTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + + // Then check flushing + if (flushingTable != null) { + result = flushingTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // At last check SSTables from freshest to oldest + for (final SSTable ssTable : ssTables) { + result = ssTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // Nothing found + return null; + } + + private static Entry swallowTombstone(final Entry entry) { + return entry.value() == null ? 
null : entry; + } + + Entry upsert(final Entry entry) { + return memTable.upsert(entry); + } + + Iterator> allSSTableEntries() { + final List iterators = + new ArrayList<>(ssTables.size()); + + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(null, null); + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + + return new MergingEntryIterator(iterators); + } +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/WeightedPeekingEntryIterator.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/WeightedPeekingEntryIterator.java new file mode 100644 index 000000000..05b8d74ba --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/dao/WeightedPeekingEntryIterator.java @@ -0,0 +1,67 @@ +package ru.vk.itmo.test.alenkovayulya.dao; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Peeking {@link Iterator} wrapper. + * + * @author incubos + */ +final class WeightedPeekingEntryIterator + implements Iterator>, + Comparable { + private final int weight; + private final Iterator> delegate; + private Entry next; + + WeightedPeekingEntryIterator( + final int weight, + final Iterator> delegate) { + this.weight = weight; + this.delegate = delegate; + this.next = delegate.hasNext() ? delegate.next() : null; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final Entry result = next; + next = delegate.hasNext() ? 
delegate.next() : null; + return result; + } + + Entry peek() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + return next; + } + + @Override + public int compareTo(final WeightedPeekingEntryIterator other) { + // First compare keys + int result = + MemorySegmentComparator.INSTANCE.compare( + peek().key(), + other.peek().key()); + if (result != 0) { + return result; + } + + // Then compare weights if keys are equal + return Integer.compare(weight, other.weight); + } +}