diff --git a/src/main/java/ru/vk/itmo/kislovdanil/InMemoryDao.java b/src/main/java/ru/vk/itmo/kislovdanil/InMemoryDao.java deleted file mode 100644 index 6d1c3d3c9..000000000 --- a/src/main/java/ru/vk/itmo/kislovdanil/InMemoryDao.java +++ /dev/null @@ -1,47 +0,0 @@ -package ru.vk.itmo.kislovdanil; - -import ru.vk.itmo.Dao; -import ru.vk.itmo.Entry; - -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.util.Comparator; -import java.util.Iterator; -import java.util.concurrent.ConcurrentSkipListMap; - -public class InMemoryDao implements Dao> { - ConcurrentSkipListMap> storage = - new ConcurrentSkipListMap<>(new MemSegComparator()); - - private static class MemSegComparator implements Comparator { - @Override - public int compare(MemorySegment o1, MemorySegment o2) { - long mismatch = o1.mismatch(o2); - if (mismatch == -1) { - return 0; - } - if (mismatch == Math.min(o1.byteSize(), o2.byteSize())) { - return Long.compare(o1.byteSize(), o2.byteSize()); - } - return Byte.compare(o1.get(ValueLayout.JAVA_BYTE, mismatch), o2.get(ValueLayout.JAVA_BYTE, mismatch)); - } - } - - @Override - public Iterator> get(MemorySegment from, MemorySegment to) { - if (from == null && to == null) return storage.values().iterator(); - if (from != null && to == null) return storage.tailMap(from).values().iterator(); - if (from == null) return storage.headMap(to).values().iterator(); - return storage.subMap(from, to).values().iterator(); - } - - @Override - public Entry get(MemorySegment key) { - return storage.get(key); - } - - @Override - public void upsert(Entry entry) { - storage.put(entry.key(), entry); - } -} diff --git a/src/main/java/ru/vk/itmo/kislovdanil/MemTable.java b/src/main/java/ru/vk/itmo/kislovdanil/MemTable.java new file mode 100644 index 000000000..e94c4cf7c --- /dev/null +++ b/src/main/java/ru/vk/itmo/kislovdanil/MemTable.java @@ -0,0 +1,40 @@ +package ru.vk.itmo.kislovdanil; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Comparator; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +/* Basically, ConcurrentSkipList with counter of threads that putting value in it. + Necessary for preventing data loss while flushing. + */ +public class MemTable { + private final ConcurrentSkipListMap> storage; + private final long threshold; + private final AtomicLong size = new AtomicLong(0); + + private static long getEntrySize(Entry entry) { + long valueSize = entry.value() == null ? 0 : entry.value().byteSize(); + return valueSize + entry.key().byteSize(); + } + + public MemTable(Comparator comparator, long threshold) { + this.storage = new ConcurrentSkipListMap<>(comparator); + this.threshold = threshold; + } + + public boolean put(Entry entry) { + long entrySize = getEntrySize(entry); + if (size.addAndGet(entrySize) - entrySize > threshold) { + return false; + } + storage.put(entry.key(), entry); + return true; + } + + public ConcurrentSkipListMap> getStorage() { + return storage; + } +} diff --git a/src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java b/src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java index 890b2dfc0..86a52bc1d 100644 --- a/src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java +++ b/src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java @@ -4,51 +4,86 @@ import ru.vk.itmo.Dao; import ru.vk.itmo.Entry; import ru.vk.itmo.kislovdanil.exceptions.DBException; +import ru.vk.itmo.kislovdanil.exceptions.OverloadException; import ru.vk.itmo.kislovdanil.iterators.DatabaseIterator; +import ru.vk.itmo.kislovdanil.iterators.MemTableIterator; import ru.vk.itmo.kislovdanil.iterators.MergeIterator; +import ru.vk.itmo.kislovdanil.sstable.SSTable; import java.io.File; import java.io.IOException; +import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class PersistentDao implements Dao>, Iterable> { public static final MemorySegment DELETED_VALUE = null; private final Config config; - private final List tables = new ArrayList<>(); + private volatile List tables = new ArrayList<>(); private final Comparator comparator = new MemSegComparator(); - private final ConcurrentNavigableMap> storage = - new ConcurrentSkipListMap<>(comparator); + private volatile MemTable memTable; + // Temporary storage in case of main storage flushing (Read only) + private volatile MemTable additionalStorage; + // In case of additional table overload while main table is flushing + private final AtomicLong nextId = new AtomicLong(); + private final ExecutorService commonExecutorService = Executors.newFixedThreadPool(2); + // To prevent parallel flushing + private volatile Future compcatFuture; + // To make sure that flushing in close() will be started + private volatile Future flushFuture; + // Have to take before any tables modification + private final Lock compactionLock = new ReentrantLock(); + // Have to take read while upsert and write while flushing (to prevent data loss) + private final ReadWriteLock upsertLock = new ReentrantReadWriteLock(); + private final Arena filesArena = Arena.ofShared(); - private long lastTimestamp = System.currentTimeMillis(); + private long getMaxTablesId(Iterable tableIterable) { + long curMaxId = -1; + for (SSTable table : tableIterable) { + curMaxId = Math.max(curMaxId, table.getTableId()); + } + return curMaxId; + } public PersistentDao(Config config) throws IOException { this.config = config; + this.memTable = new MemTable(comparator, config.flushThresholdBytes()); File basePathDirectory = new File(config.basePath().toString()); String[] ssTablesIds = basePathDirectory.list(); if (ssTablesIds == null) return; for (String tableID : ssTablesIds) { - // SSTable constructor with rewrite=false reads table data from disk if it exists - tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID), - storage.values(), false)); + // SSTable constructor without entries iterator reads table data from disk if it exists + tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID), filesArena)); } + nextId.set(getMaxTablesId(tables) + 1); tables.sort(SSTable::compareTo); } @Override public Iterator> get(MemorySegment from, MemorySegment to) { - List iterators = new ArrayList<>(tables.size() + 1); + List iterators = new ArrayList<>(tables.size() + 2); for (SSTable table : tables) { iterators.add(table.getRange(from, to)); } - iterators.add(new MemTableIterator(from, to)); + iterators.add(new MemTableIterator(from, to, memTable, Long.MAX_VALUE)); + if (additionalStorage != null) { + iterators.add(new MemTableIterator(from, to, additionalStorage, Long.MAX_VALUE - 1)); + } return new MergeIterator(iterators, comparator); } @@ -57,19 +92,23 @@ private static Entry wrapEntryIfDeleted(Entry entr return entry; } - private void updateId() { - lastTimestamp = Math.max(lastTimestamp + 1, System.currentTimeMillis()); + private long getNextId() { + return nextId.getAndIncrement(); } @Override public Entry get(MemorySegment key) { - Entry ans = storage.get(key); + Entry ans = memTable.getStorage().get(key); if (ans != null) return wrapEntryIfDeleted(ans); + if (additionalStorage != null) { + ans = additionalStorage.getStorage().get(key); + if (ans != null) return wrapEntryIfDeleted(ans); + } try { - for (SSTable table : tables) { - Entry data = table.find(key); - if (data != null) { - return wrapEntryIfDeleted(data); + for (SSTable table : tables.reversed()) { + ans = table.find(key); + if (ans != null) { + return wrapEntryIfDeleted(ans); } } } catch (IOException e) { @@ -80,39 +119,122 @@ public Entry get(MemorySegment key) { @Override public void upsert(Entry entry) { - storage.put(entry.key(), entry); + upsertLock.readLock().lock(); + try { + if (memTable.put(entry)) { + return; + } + } finally { + upsertLock.readLock().unlock(); + } + flush(); + upsertLock.readLock().lock(); + try { + if (!memTable.put(entry)) { + throw new OverloadException(entry); + } + } finally { + upsertLock.readLock().unlock(); + } } - @Override - public void flush() throws IOException { - if (!storage.isEmpty()) { - updateId(); - // SSTable constructor with rewrite=true writes MemTable data on disk deleting old data if it exists + private void makeFlush() throws IOException { + compactionLock.lock(); + try { + if (additionalStorage == null) return; + // SSTable constructor with entries iterator writes MemTable data on disk deleting old data if it exists tables.add(new SSTable(config.basePath(), comparator, - lastTimestamp, storage.values(), true)); - storage.clear(); + getNextId(), additionalStorage.getStorage().values().iterator(), filesArena)); + additionalStorage = null; + } finally { + compactionLock.unlock(); + } + } + + @Override + public void flush() { + upsertLock.writeLock().lock(); + try { + if (additionalStorage != null || memTable.getStorage().isEmpty()) { + return; + } + additionalStorage = memTable; + memTable = new MemTable(comparator, config.flushThresholdBytes()); + flushFuture = commonExecutorService.submit( + () -> { + try { + makeFlush(); + } catch (IOException e) { + throw new DBException(e); + } + }); + } finally { + upsertLock.writeLock().unlock(); + } + } + + private void closeExecutorService(ExecutorService executorService) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @Override public void close() throws IOException { + if (!filesArena.scope().isAlive()) { + return; + } + if (flushFuture != null) { + try { + flushFuture.get(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + } + } flush(); + closeExecutorService(commonExecutorService); + filesArena.close(); } - @Override - public void compact() throws IOException { - if (!tables.isEmpty()) { - updateId(); - SSTable compactedTable = new SSTable(config.basePath(), comparator, lastTimestamp, - this, true); - storage.clear(); - for (SSTable table : tables) { + private void makeCompaction() throws IOException { + compactionLock.lock(); + try { + if (tables.size() <= 1) return; + long compactedTableId = getNextId(); + SSTable compactedTable = new SSTable(config.basePath(), comparator, compactedTableId, + new MergeIterator(tables, comparator), filesArena); + List oldTables = tables; + List newTables = new ArrayList<>(); + newTables.add(compactedTable); + tables = newTables; + for (SSTable table : oldTables) { table.deleteFromDisk(); } - tables.add(compactedTable); + } finally { + compactionLock.unlock(); } } + @Override + public void compact() { + if (compcatFuture != null && !compcatFuture.isDone()) { + compcatFuture.cancel(false); + } + compcatFuture = commonExecutorService.submit( + () -> { + try { + makeCompaction(); + } catch (IOException e) { + throw new DBException(e); + } + }); + } + @Override public Iterator> iterator() { return get(null, null); @@ -131,35 +253,4 @@ public int compare(MemorySegment o1, MemorySegment o2) { return Byte.compare(o1.get(ValueLayout.JAVA_BYTE, mismatch), o2.get(ValueLayout.JAVA_BYTE, mismatch)); } } - - private class MemTableIterator implements DatabaseIterator { - Iterator> innerIter; - - public MemTableIterator(MemorySegment from, MemorySegment to) { - if (from == null && to == null) { - innerIter = storage.values().iterator(); - } else if (from != null && to == null) { - innerIter = storage.tailMap(from).values().iterator(); - } else if (from == null) { - innerIter = storage.headMap(to).values().iterator(); - } else { - innerIter = storage.subMap(from, to).values().iterator(); - } - } - - @Override - public long getPriority() { - return Long.MAX_VALUE; - } - - @Override - public boolean hasNext() { - return innerIter.hasNext(); - } - - @Override - public Entry next() { - return innerIter.next(); - } - } } diff --git a/src/main/java/ru/vk/itmo/kislovdanil/exceptions/DBException.java b/src/main/java/ru/vk/itmo/kislovdanil/exceptions/DBException.java index 77af4155f..49b0b6732 100644 --- a/src/main/java/ru/vk/itmo/kislovdanil/exceptions/DBException.java +++ b/src/main/java/ru/vk/itmo/kislovdanil/exceptions/DBException.java @@ -4,4 +4,8 @@ public class DBException extends RuntimeException { public DBException(Exception e) { super(e); } + + public DBException() { + super(); + } } diff --git a/src/main/java/ru/vk/itmo/kislovdanil/exceptions/OverloadException.java b/src/main/java/ru/vk/itmo/kislovdanil/exceptions/OverloadException.java new file mode 100644 index 000000000..45b7552ac --- /dev/null +++ b/src/main/java/ru/vk/itmo/kislovdanil/exceptions/OverloadException.java @@ -0,0 +1,14 @@ +package ru.vk.itmo.kislovdanil.exceptions; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; + +public class OverloadException extends DBException { + public final Entry entry; + + public OverloadException(Entry entry) { + super(); + this.entry = entry; + } +} diff --git a/src/main/java/ru/vk/itmo/kislovdanil/iterators/MemTableIterator.java b/src/main/java/ru/vk/itmo/kislovdanil/iterators/MemTableIterator.java new file mode 100644 index 000000000..72a6d63be --- /dev/null +++ b/src/main/java/ru/vk/itmo/kislovdanil/iterators/MemTableIterator.java @@ -0,0 +1,42 @@ +package ru.vk.itmo.kislovdanil.iterators; + +import ru.vk.itmo.Entry; +import ru.vk.itmo.kislovdanil.MemTable; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; + +public class MemTableIterator implements DatabaseIterator { + private final Iterator> innerIter; + private final long priority; + + public MemTableIterator(MemorySegment from, MemorySegment to, + MemTable memTable, + long priority) { + this.priority = priority; + if (from == null && to == null) { + innerIter = memTable.getStorage().values().iterator(); + } else if (from != null && to == null) { + innerIter = memTable.getStorage().tailMap(from).values().iterator(); + } else if (from == null) { + innerIter = memTable.getStorage().headMap(to).values().iterator(); + } else { + innerIter = memTable.getStorage().subMap(from, to).values().iterator(); + } + } + + @Override + public long getPriority() { + return priority; + } + + @Override + public boolean hasNext() { + return innerIter.hasNext(); + } + + @Override + public Entry next() { + return innerIter.next(); + } +} diff --git a/src/main/java/ru/vk/itmo/kislovdanil/iterators/MergeIterator.java b/src/main/java/ru/vk/itmo/kislovdanil/iterators/MergeIterator.java index 445139ad4..c5eb029cd 100644 --- a/src/main/java/ru/vk/itmo/kislovdanil/iterators/MergeIterator.java +++ b/src/main/java/ru/vk/itmo/kislovdanil/iterators/MergeIterator.java @@ -2,8 +2,11 @@ import ru.vk.itmo.BaseEntry; import ru.vk.itmo.Entry; +import ru.vk.itmo.kislovdanil.sstable.SSTable; import java.lang.foreign.MemorySegment; +import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -15,7 +18,15 @@ public class MergeIterator implements Iterator> { private final NavigableMap itemsPool; private Entry currentEntry; - public MergeIterator(List iterators, Comparator comp) { + private static Iterable getDBIterators(Collection tables) { + List it = new ArrayList<>(tables.size()); + for (SSTable table: tables) { + it.add(table.getRange()); + } + return it; + } + + public MergeIterator(Iterable iterators, Comparator comp) { this.itemsPool = new TreeMap<>(comp); for (DatabaseIterator iter : iterators) { moveIterator(iter); @@ -23,6 +34,10 @@ public MergeIterator(List iterators, Comparator updateCurrentEntry(); } + public MergeIterator(Collection tables, Comparator comp) { + this(getDBIterators(tables), comp); + } + // Get next entry (skip all entries with null value) private void updateCurrentEntry() { MemorySegment value = null; diff --git a/src/main/java/ru/vk/itmo/kislovdanil/sstable/Metadata.java b/src/main/java/ru/vk/itmo/kislovdanil/sstable/Metadata.java new file mode 100644 index 000000000..b6b487d80 --- /dev/null +++ b/src/main/java/ru/vk/itmo/kislovdanil/sstable/Metadata.java @@ -0,0 +1,45 @@ +package ru.vk.itmo.kislovdanil.sstable; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; + +class Metadata { + private final SSTable table; + private final SSTable.Range keyRange; + private final SSTable.Range valueRange; + private final Boolean isDeletion; + public static final long SIZE = Long.BYTES * 4 + 1; + + public Metadata(long index, SSTable table) { + this.table = table; + long base = index * Metadata.SIZE; + keyRange = table.readRange(table.summaryFile, base); + valueRange = table.readRange(table.summaryFile, base + 2 * Long.BYTES); + isDeletion = table.summaryFile.get(ValueLayout.JAVA_BOOLEAN, base + 4 * Long.BYTES); + } + + public MemorySegment readKey() { + return table.indexFile.asSlice(keyRange.offset, keyRange.length); + } + + public MemorySegment readValue() { + return isDeletion ? null : table.dataFile.asSlice(valueRange.offset, valueRange.length); + } + + public static void writeEntryMetadata(Entry entry, MemorySegment summaryFile, + long sumOffset, long indexOffset, long dataOffset) { + summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, + sumOffset, indexOffset); + summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, + sumOffset + Long.BYTES, entry.key().byteSize()); + summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, + sumOffset + 2 * Long.BYTES, dataOffset); + summaryFile.set(ValueLayout.JAVA_BOOLEAN, + sumOffset + 4 * Long.BYTES, entry.value() == null); + summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, + sumOffset + 3 * Long.BYTES, entry.value() == null ? 0 : entry.value().byteSize()); + } + +} diff --git a/src/main/java/ru/vk/itmo/kislovdanil/SSTable.java b/src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTable.java similarity index 64% rename from src/main/java/ru/vk/itmo/kislovdanil/SSTable.java rename to src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTable.java index 98a8e95e3..981181c1c 100644 --- a/src/main/java/ru/vk/itmo/kislovdanil/SSTable.java +++ b/src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTable.java @@ -1,4 +1,4 @@ -package ru.vk.itmo.kislovdanil; +package ru.vk.itmo.kislovdanil.sstable; import ru.vk.itmo.BaseEntry; import ru.vk.itmo.Entry; @@ -12,29 +12,51 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Comparator; +import java.util.Iterator; +import java.util.List; -public class SSTable implements Comparable { +public class SSTable implements Comparable, Iterable> { // Contains offset and size for every key and every value in index file - private MemorySegment summaryFile; + MemorySegment summaryFile; private static final String SUMMARY_FILENAME = "summary"; // Contains keys - private MemorySegment indexFile; + MemorySegment indexFile; private static final String INDEX_FILENAME = "index"; // Contains values - private MemorySegment dataFile; + MemorySegment dataFile; private static final String DATA_FILENAME = "data"; - private final Comparator memSegComp; - private final Arena filesArena = Arena.ofAuto(); + final Comparator memSegComp; + private final Arena filesArena; private final long tableId; private final Path ssTablePath; - private final long size; + final long size; + + /* In case deletion while compaction of this table field would link to table with compacted data. + Necessary for iterators created before compaction. */ + // Gives a guarantee that SSTable files wouldn't be deleted while reading + + public long getTableId() { + return tableId; + } + + public SSTable(Path basePath, Comparator memSegComp, long tableId, Arena filesArena) + throws IOException { + this(basePath, memSegComp, tableId, null, false, filesArena); + } public SSTable(Path basePath, Comparator memSegComp, long tableId, - Iterable> entriesContainer, - boolean rewrite) throws IOException { + Iterator> entriesContainer, Arena filesArena) throws IOException { + this(basePath, memSegComp, tableId, entriesContainer, true, filesArena); + } + + private SSTable(Path basePath, Comparator memSegComp, long tableId, + Iterator> entriesContainer, + boolean rewrite, Arena filesArena) throws IOException { this.tableId = tableId; + this.filesArena = filesArena; this.ssTablePath = basePath.resolve(Long.toString(tableId)); this.memSegComp = memSegComp; Path summaryFilePath = this.ssTablePath.resolve(SUMMARY_FILENAME); @@ -106,13 +128,18 @@ private long[] getFilesSize(Iterable> entriesContainer) { } // Sequentially writes every entity data in SStable keeping files data consistent - private void write(Iterable> entriesContainer, + private void write(Iterator> entryIterator, Path summaryFilePath, Path indexFilePath, Path dataFilePath) throws IOException { prepareForWriting(summaryFilePath); prepareForWriting(indexFilePath); prepareForWriting(dataFilePath); - long[] filesSize = getFilesSize(entriesContainer); + List> entries = new ArrayList<>(); + while (entryIterator.hasNext()) { + entries.add(entryIterator.next()); + } + + long[] filesSize = getFilesSize(entries); summaryFile = mapFile(filesSize[0], summaryFilePath); indexFile = mapFile(filesSize[1], indexFilePath); @@ -121,7 +148,7 @@ private void write(Iterable> entriesContainer, long currentSummaryOffset = 0; long currentIndexOffset = 0; long currentDataOffset = 0; - for (Entry entry : entriesContainer) { + for (Entry entry : entries) { MemorySegment value = entry.value(); value = value == null ? filesArena.allocate(0) : value; MemorySegment key = entry.key(); @@ -140,19 +167,19 @@ public void deleteFromDisk() throws IOException { Files.delete(ssTablePath); } - private Range readRange(MemorySegment segment, long offset) { + Range readRange(MemorySegment segment, long offset) { return new Range(segment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset), segment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset + Long.BYTES)); } /* Binary search in summary and index files. Returns index of least record greater than key or equal. Returns -1 if no such key */ - private long findByKey(MemorySegment key) { + long findByKey(MemorySegment key) { long left = -1; // Always less than key long right = size; // Always greater or equal than key while (right - left > 1) { long middle = (right + left) / 2; - Metadata currentEntryMetadata = new Metadata(middle); + Metadata currentEntryMetadata = new Metadata(middle, this); MemorySegment curKey = currentEntryMetadata.readKey(); int compRes = memSegComp.compare(key, curKey); if (compRes <= 0) { @@ -170,8 +197,8 @@ private long findByKeyExact(MemorySegment key) { return goe; } - private Entry readEntry(long index) { - Metadata metadata = new Metadata(index); + Entry readEntry(long index) { + Metadata metadata = new Metadata(index, this); MemorySegment key = metadata.readKey(); MemorySegment value = metadata.readValue(); return new BaseEntry<>(key, value); @@ -184,58 +211,26 @@ public Entry find(MemorySegment key) throws IOException { } public DatabaseIterator getRange(MemorySegment from, MemorySegment to) { - return new SSTableIterator(from, to); + return new SSTableIterator(from, to, this); } - private class SSTableIterator implements DatabaseIterator { - private long curItemIndex; - private final MemorySegment maxKey; - - private Entry curEntry; - - public SSTableIterator(MemorySegment minKey, MemorySegment maxKey) { - this.maxKey = maxKey; - if (minKey == null) { - this.curItemIndex = 0; - } else { - this.curItemIndex = findByKey(minKey); - } - if (curItemIndex == -1) { - curItemIndex = Long.MAX_VALUE; - } else { - this.curEntry = readEntry(curItemIndex); - } - } - - @Override - public boolean hasNext() { - if (curItemIndex >= size) return false; - return maxKey == null || memSegComp.compare(curEntry.key(), maxKey) < 0; - } - - @Override - public Entry next() { - Entry result = curEntry; - curItemIndex++; - if (curItemIndex < size) { - curEntry = readEntry(curItemIndex); - } - return result; - } + public DatabaseIterator getRange() { + return getRange(null, null); + } - @Override - public long getPriority() { - return tableId; - } + @Override + public Iterator> iterator() { + return getRange(); } + // The less the ID, the less the table @Override public int compareTo(SSTable o) { - return Long.compare(o.tableId, this.tableId); + return Long.compare(this.tableId, o.tableId); } // Describes offset and size of any data segment - private static class Range { + static class Range { public long offset; public long length; @@ -244,42 +239,4 @@ public Range(long offset, long length) { this.length = length; } } - - private class Metadata { - private final Range keyRange; - private final Range valueRange; - private final Boolean isDeletion; - public static final long SIZE = Long.BYTES * 4 + 1; - - public Metadata(long index) { - long base = index * Metadata.SIZE; - keyRange = readRange(summaryFile, base); - valueRange = readRange(summaryFile, base + 2 * Long.BYTES); - isDeletion = summaryFile.get(ValueLayout.JAVA_BOOLEAN, base + 4 * Long.BYTES); - } - - public MemorySegment readKey() { - return indexFile.asSlice(keyRange.offset, keyRange.length); - } - - public MemorySegment readValue() { - return isDeletion ? null : dataFile.asSlice(valueRange.offset, valueRange.length); - } - - public static void writeEntryMetadata(Entry entry, MemorySegment summaryFile, - long sumOffset, long indexOffset, long dataOffset) { - summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, - sumOffset, indexOffset); - summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, - sumOffset + Long.BYTES, entry.key().byteSize()); - summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, - sumOffset + 2 * Long.BYTES, dataOffset); - summaryFile.set(ValueLayout.JAVA_BOOLEAN, - sumOffset + 4 * Long.BYTES, entry.value() == null); - summaryFile.set(ValueLayout.JAVA_LONG_UNALIGNED, - sumOffset + 3 * Long.BYTES, entry.value() == null ? 0 : entry.value().byteSize()); - } - - } - } diff --git a/src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTableIterator.java b/src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTableIterator.java new file mode 100644 index 000000000..50940b38c --- /dev/null +++ b/src/main/java/ru/vk/itmo/kislovdanil/sstable/SSTableIterator.java @@ -0,0 +1,51 @@ +package ru.vk.itmo.kislovdanil.sstable; + +import ru.vk.itmo.Entry; +import ru.vk.itmo.kislovdanil.iterators.DatabaseIterator; + +import java.lang.foreign.MemorySegment; + +class SSTableIterator implements DatabaseIterator { + private long curItemIndex; + private final MemorySegment maxKey; + + private Entry curEntry; + private final SSTable table; + + public SSTableIterator(MemorySegment minKey, MemorySegment maxKey, SSTable table) { + this.table = table; + this.maxKey = maxKey; + if (table.size == 0) return; + if (minKey == null) { + this.curItemIndex = 0; + } else { + this.curItemIndex = this.table.findByKey(minKey); + } + if (curItemIndex == -1) { + curItemIndex = Long.MAX_VALUE; + } else { + this.curEntry = this.table.readEntry(curItemIndex); + } + } + + @Override + public boolean hasNext() { + if (curItemIndex >= this.table.size) return false; + return maxKey == null || table.memSegComp.compare(curEntry.key(), maxKey) < 0; + } + + @Override + public Entry next() { + Entry result = curEntry; + curItemIndex++; + if (curItemIndex < table.size) { + curEntry = table.readEntry(curItemIndex); + } + return result; + } + + @Override + public long getPriority() { + return table.getTableId(); + } +} diff --git a/src/main/java/ru/vk/itmo/test/kislovdanil/InMemoryDaoFactory.java b/src/main/java/ru/vk/itmo/test/kislovdanil/InMemoryDaoFactory.java index 999932829..730be2a2f 100644 --- a/src/main/java/ru/vk/itmo/test/kislovdanil/InMemoryDaoFactory.java +++ b/src/main/java/ru/vk/itmo/test/kislovdanil/InMemoryDaoFactory.java @@ -11,7 +11,7 @@ import java.lang.foreign.ValueLayout; import java.nio.charset.StandardCharsets; -@DaoFactory(stage = 4) +@DaoFactory(stage = 5) public class InMemoryDaoFactory implements DaoFactory.Factory> { @Override