Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 10, 2025
1 parent 719343a commit f922898
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import stroom.query.language.functions.FieldIndex;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ValuesConsumer;
import stroom.util.concurrent.UncheckedInterruptedException;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

Expand All @@ -28,6 +29,7 @@
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -38,9 +40,12 @@ public abstract class AbstractLmdbReader<K, V> implements AutoCloseable {
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(AbstractLmdbReader.class);

private static final byte[] NAME = "db".getBytes(UTF_8);
private static final int CONCURRENT_READERS = 10;

private final Semaphore concurrentReaderSemaphore;

final ByteBufferFactory byteBufferFactory;
final Env<ByteBuffer> env;
private final Env<ByteBuffer> env;
final Dbi<ByteBuffer> dbi;
final Serde<K, V> serde;

Expand All @@ -55,21 +60,37 @@ public AbstractLmdbReader(final Path path,
final Env.Builder<ByteBuffer> builder = Env.create()
.setMapSize(LmdbConfig.DEFAULT_MAX_STORE_SIZE.getBytes())
.setMaxDbs(1)
.setMaxReaders(1);
.setMaxReaders(CONCURRENT_READERS);

env = builder.open(lmdbEnvDir.getEnvDir().toFile(),
EnvFlags.MDB_NOTLS,
EnvFlags.MDB_NOLOCK,
EnvFlags.MDB_RDONLY_ENV);
dbi = env.openDbi(NAME);
concurrentReaderSemaphore = new Semaphore(CONCURRENT_READERS);
}

public synchronized Optional<V> get(final K key) {
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return get(readTxn, key);
<R> R read(final Function<Txn<ByteBuffer>, R> function) {
try {
concurrentReaderSemaphore.acquire();
try {
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return function.apply(readTxn);
}
} finally {
concurrentReaderSemaphore.release();
}
} catch (final InterruptedException e) {
LOGGER.error(e::getMessage, e);
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
}
}

public Optional<V> get(final K key) {
return read(readTxn -> get(readTxn, key));
}

private Optional<V> get(final Txn<ByteBuffer> readTxn, final K key) {
return serde.createKeyByteBuffer(key, keyByteBuffer ->
serde.createPrefixPredicate(key, predicate -> {
Expand All @@ -88,20 +109,20 @@ private Optional<V> get(final Txn<ByteBuffer> readTxn, final K key) {
}));
}

public synchronized void search(final ExpressionCriteria criteria,
final FieldIndex fieldIndex,
final DateTimeSettings dateTimeSettings,
final ExpressionPredicateFactory expressionPredicateFactory,
final ValuesConsumer consumer) {
public void search(final ExpressionCriteria criteria,
final FieldIndex fieldIndex,
final DateTimeSettings dateTimeSettings,
final ExpressionPredicateFactory expressionPredicateFactory,
final ValuesConsumer consumer) {
final ValueFunctionFactories<Val[]> valueFunctionFactories = createValueFunctionFactories(fieldIndex);
final Optional<Predicate<Val[]>> optionalPredicate = expressionPredicateFactory
.create(criteria.getExpression(), valueFunctionFactories, dateTimeSettings);
final Predicate<Val[]> predicate = optionalPredicate.orElse(vals -> true);
final Function<KeyVal<ByteBuffer>, Val>[] valExtractors = serde.getValExtractors(fieldIndex);

// TODO : It would be faster if we limit the iteration to keys based on the criteria.
try (final Txn<ByteBuffer> txn = env.txnRead()) {
try (final CursorIterable<ByteBuffer> cursorIterable = dbi.iterate(txn)) {
read(readTxn -> {
try (final CursorIterable<ByteBuffer> cursorIterable = dbi.iterate(readTxn)) {
for (final KeyVal<ByteBuffer> keyVal : cursorIterable) {
final Val[] vals = new Val[valExtractors.length];
for (int i = 0; i < vals.length; i++) {
Expand All @@ -112,7 +133,8 @@ public synchronized void search(final ExpressionCriteria criteria,
}
}
}
}
return null;
});
}

ValueFunctionFactories<Val[]> createValueFunctionFactories(final FieldIndex fieldIndex) {
Expand All @@ -125,10 +147,8 @@ ValueFunctionFactories<Val[]> createValueFunctionFactories(final FieldIndex fiel
};
}

public synchronized long count() {
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return dbi.stat(readTxn).entries;
}
public long count() {
return read(readTxn -> dbi.stat(readTxn).entries);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.KeyRange;
import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.nio.file.Path;
Expand All @@ -27,7 +26,8 @@ public Optional<RangedState> getState(final RangedStateRequest request) {
start.flip();

final KeyRange<ByteBuffer> keyRange = KeyRange.atLeastBackward(start);
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {

return read(readTxn -> {
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
while (iterator.hasNext()
Expand All @@ -44,8 +44,8 @@ public Optional<RangedState> getState(final RangedStateRequest request) {
}
}
}
}
return Optional.empty();
return Optional.empty();
});
} finally {
byteBufferFactory.release(start);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.KeyRange;
import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -35,26 +34,26 @@ public SessionReader(final Path path,
super(path, byteBufferFactory, new SessionSerde(byteBufferFactory));
}

public synchronized void search(final ExpressionCriteria criteria,
final FieldIndex fieldIndex,
final DateTimeSettings dateTimeSettings,
final ExpressionPredicateFactory expressionPredicateFactory,
final ValuesConsumer consumer) {
public void search(final ExpressionCriteria criteria,
final FieldIndex fieldIndex,
final DateTimeSettings dateTimeSettings,
final ExpressionPredicateFactory expressionPredicateFactory,
final ValuesConsumer consumer) {
final ValueFunctionFactories<Val[]> valueFunctionFactories = createValueFunctionFactories(fieldIndex);
final Optional<Predicate<Val[]>> optionalPredicate = expressionPredicateFactory
.create(criteria.getExpression(), valueFunctionFactories, dateTimeSettings);
final Predicate<Val[]> predicate = optionalPredicate.orElse(vals -> true);
final Function<KeyVal<ByteBuffer>, Val>[] valExtractors = serde.getValExtractors(fieldIndex);

// TODO : It would be faster if we limit the iteration to keys based on the criteria.

Long lastKeyHash = null;

// We keep a map of sessions to cope with key hash clashes.
final Map<String, CurrentSession> currentSessionMap = new HashMap<>();

try (final Txn<ByteBuffer> txn = env.txnRead()) {
try (final CursorIterable<ByteBuffer> cursorIterable = dbi.iterate(txn)) {
read(readTxn -> {
// TODO : It would be faster if we limit the iteration to keys based on the criteria.

Long lastKeyHash = null;

try (final CursorIterable<ByteBuffer> cursorIterable = dbi.iterate(readTxn)) {
for (final KeyVal<ByteBuffer> keyVal : cursorIterable) {
final Val[] vals = new Val[valExtractors.length];
for (int i = 0; i < vals.length; i++) {
Expand Down Expand Up @@ -116,17 +115,19 @@ public synchronized void search(final ExpressionCriteria criteria,
}
}
}
}

if (lastKeyHash != null) {
// Send the final sessions.
currentSessionMap.values().forEach(currentSession -> consumer.accept(extendSession(
currentSession.vals,
fieldIndex,
currentSession.sessionStart,
currentSession.sessionEnd)));
currentSessionMap.clear();
}
if (lastKeyHash != null) {
// Send the final sessions.
currentSessionMap.values().forEach(currentSession -> consumer.accept(extendSession(
currentSession.vals,
fieldIndex,
currentSession.sessionStart,
currentSession.sessionEnd)));
currentSessionMap.clear();
}

return null;
});
}

public Val[] extendSession(final Val[] vals,
Expand All @@ -145,8 +146,6 @@ public Val[] extendSession(final Val[] vals,
}

public Optional<Session> getState(final SessionRequest request) {
Optional<Session> result = Optional.empty();

// Hash the value.
final long nameHash = LongHashFunction.xx3().hashBytes(request.name());
final long time = request.time();
Expand All @@ -161,7 +160,7 @@ public Optional<Session> getState(final SessionRequest request) {
end.flip();

final KeyRange<ByteBuffer> keyRange = KeyRange.closedBackward(start, end);
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return read(readTxn -> {
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
while (iterator.hasNext()
Expand All @@ -185,13 +184,12 @@ public Optional<Session> getState(final SessionRequest request) {
}
}
}
}
return Optional.empty();
});
} finally {
byteBufferFactory.release(start);
byteBufferFactory.release(end);
}

return result;
}

private record CurrentSession(Long sessionStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public StateReader(final Path path,
super(path, byteBufferFactory, new StateSerde(byteBufferFactory));
}

public synchronized Optional<State> getState(final StateRequest request) {
public Optional<State> getState(final StateRequest request) {
final Key key = Key.builder().name(request.key()).build();
final Optional<StateValue> optional = get(key);
return optional.map(value -> new State(key, value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.KeyRange;
import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.nio.file.Path;
Expand All @@ -21,15 +20,14 @@ public TemporalRangedStateReader(final Path path,
}

public Optional<TemporalRangedState> getState(final TemporalRangedStateRequest request) {
Optional<TemporalRangedState> result = Optional.empty();

final ByteBuffer start = byteBufferFactory.acquire(Long.BYTES);
try {
start.putLong(request.key());
start.flip();

final KeyRange<ByteBuffer> keyRange = KeyRange.atLeastBackward(start);
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return read(readTxn -> {
Optional<TemporalRangedState> result = Optional.empty();
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
while (iterator.hasNext()
Expand All @@ -53,11 +51,10 @@ public Optional<TemporalRangedState> getState(final TemporalRangedStateRequest r
}
}
}
}
return result;
});
} finally {
byteBufferFactory.release(start);
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.KeyRange;
import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.nio.file.Path;
Expand All @@ -22,7 +21,7 @@ public TemporalStateReader(final Path path,
super(path, byteBufferFactory, new TemporalStateSerde(byteBufferFactory));
}

public synchronized Optional<TemporalState> getState(final TemporalStateRequest request) {
public Optional<TemporalState> getState(final TemporalStateRequest request) {
final long rowHash = LongHashFunction.xx3().hashBytes(request.key());
final ByteBuffer start = byteBufferFactory.acquire(Long.BYTES + Long.BYTES);
final ByteBuffer stop = byteBufferFactory.acquire(Long.BYTES);
Expand All @@ -35,7 +34,7 @@ public synchronized Optional<TemporalState> getState(final TemporalStateRequest
stop.flip();

final KeyRange<ByteBuffer> keyRange = KeyRange.closedBackward(start, stop);
try (final Txn<ByteBuffer> readTxn = env.txnRead()) {
return read(readTxn -> {
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
while (iterator.hasNext()
Expand All @@ -58,12 +57,11 @@ public synchronized Optional<TemporalState> getState(final TemporalStateRequest
}
}
}
}
return Optional.empty();
});
} finally {
byteBufferFactory.release(start);
byteBufferFactory.release(stop);
}

return Optional.empty();
}
}

0 comments on commit f922898

Please sign in to comment.