diff --git a/warp10/src/main/java/io/warp10/continuum/Configuration.java b/warp10/src/main/java/io/warp10/continuum/Configuration.java index 36153b277..74203063e 100644 --- a/warp10/src/main/java/io/warp10/continuum/Configuration.java +++ b/warp10/src/main/java/io/warp10/continuum/Configuration.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2024 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -230,6 +230,16 @@ public class Configuration { public static final String WARPSCRIPT_MAX_PIXELS_HARD = "warpscript.maxpixels.hard"; public static final String WARPSCRIPT_MAX_JSON_HARD = "warpscript.maxjson.hard"; + /** + * Default maximum key size for KV stored via KVSTORE + */ + public static final String WARP_KVSTORE_MAXK = "warp.kvstore.maxk"; + + /** + * Default maximum value size for KV stored via KVSTORE + */ + public static final String WARP_KVSTORE_MAXV = "warp.kvstore.maxv"; + /** * When set to true, allow common comment block style. When false, keep the old strict comment block style within WarpScript */ @@ -1760,6 +1770,11 @@ public class Configuration { */ public static final String IN_MEMORY_EPHEMERAL = "in.memory.ephemeral"; + /** + * Default maximum value size for all stored KV + */ + public static final String IN_MEMORY_MAXKVSIZE = "in.memory.maxkvsize"; + /** * Number of chunks per GTS to handle in memory (defaults to 3) */ @@ -1814,6 +1829,11 @@ public class Configuration { */ public static final String EGRESS_FDB_TENANT_PREFIX = "egress.fdb.tenant.prefix"; + /** + * Maximum number of retries for FoundationDB transactions + */ + public static final String EGRESS_FDB_RETRYLIMIT = "egress.fdb.retrylimit"; + /** * Size of pooled FoundationDB databases instances */ diff --git a/warp10/src/main/java/io/warp10/continuum/sensision/SensisionConstants.java b/warp10/src/main/java/io/warp10/continuum/sensision/SensisionConstants.java index 02b8202b0..ae421c4a8 100644 --- a/warp10/src/main/java/io/warp10/continuum/sensision/SensisionConstants.java +++ b/warp10/src/main/java/io/warp10/continuum/sensision/SensisionConstants.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2023 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -30,6 +30,16 @@ public class SensisionConstants { // Classes // + /** + * Number of FoundationDB sets committed in 'Egress' + */ + public static final String SENSISION_CLASS_CONTINUUM_EGRESS_FDB_SETS_COMMITTED = "warp.egress.fdb.sets.committed"; + + /** + * Number of FoundationDB clears committed in 'Egress' + */ + public static final String SENSISION_CLASS_CONTINUUM_EGRESS_FDB_CLEARS_COMMITTED = "warp.egress.fdb.clears.committed"; + /** * Number of times we have reached the maximum number of version attempts */ diff --git a/warp10/src/main/java/io/warp10/continuum/store/StoreClient.java b/warp10/src/main/java/io/warp10/continuum/store/StoreClient.java index 7ce88fee8..a2296e8a1 100644 --- a/warp10/src/main/java/io/warp10/continuum/store/StoreClient.java +++ b/warp10/src/main/java/io/warp10/continuum/store/StoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2022 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,14 +17,21 @@ package io.warp10.continuum.store; import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; import io.warp10.standalone.StandalonePlasmaHandlerInterface; public interface StoreClient { + + public interface KVIterator extends Iterator, AutoCloseable {} + public void store(GTSEncoder encoder) throws IOException; public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException; /** @@ -43,4 +50,7 @@ public interface StoreClient { */ public GTSDecoderIterator fetch(FetchRequest req) throws IOException; public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler); + + public void kvstore(KVStoreRequest request) throws IOException; + public KVIterator> kvfetch(KVFetchRequest request) throws IOException; } diff --git a/warp10/src/main/java/io/warp10/fdb/FDBClear.java b/warp10/src/main/java/io/warp10/fdb/FDBClear.java index eeca6ed72..3622afad1 100644 --- a/warp10/src/main/java/io/warp10/fdb/FDBClear.java +++ b/warp10/src/main/java/io/warp10/fdb/FDBClear.java @@ -1,5 +1,5 @@ // -// Copyright 2022 SenX S.A.S. +// Copyright 2022-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,6 +48,6 @@ public void apply(Transaction txn) { @Override public int size() { - return 103 + key.length + (null != this.tenant ? tenant.length : 0); + return 120 + key.length + (null != this.tenant ? tenant.length : 0); } } diff --git a/warp10/src/main/java/io/warp10/fdb/FDBGetOp.java b/warp10/src/main/java/io/warp10/fdb/FDBGetOp.java new file mode 100644 index 000000000..7d1d5326e --- /dev/null +++ b/warp10/src/main/java/io/warp10/fdb/FDBGetOp.java @@ -0,0 +1,83 @@ +// +// Copyright 2025 SenX S.A.S. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package io.warp10.fdb; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import com.apple.foundationdb.Transaction; + +public class FDBGetOp implements FDBMutation { + + /** + * Tenant prefix + */ + private final byte[] tenant; + private final byte[] key; + + CompletableFuture future = null; + + public FDBGetOp(byte[] key) { + this(null, key); + } + + public FDBGetOp(byte[] tenant, byte[] key) { + this.tenant = tenant; + this.key = key; + } + + @Override + public void apply(Transaction txn) { + synchronized (this.key) { + if (null != this.future) { + throw new IllegalStateException("Operation has already been applied."); + } + if (null != tenant) { + byte[] tkey = Arrays.copyOf(this.tenant, tenant.length + key.length); + System.arraycopy(this.key, 0, tkey, this.tenant.length, key.length); + this.future = txn.get(tkey); + } else { + this.future = txn.get(this.key); + } + } + } + + @Override + public int size() { + return 103 + (null != tenant ? tenant.length : 0) + key.length; + } + + public byte[] getKey() { + return this.key; + } + + public byte[] getValue() { + if (null == future) { + throw new IllegalStateException("Operation has not been applied."); + } + try { + return this.future.get(); + } catch (ExecutionException|InterruptedException e) { + throw new RuntimeException("Error retrieving value.", e); + } + } + + public byte[] getTenant() { + return this.tenant; + } +} diff --git a/warp10/src/main/java/io/warp10/fdb/FDBStoreClient.java b/warp10/src/main/java/io/warp10/fdb/FDBStoreClient.java index 12ce22530..a98b524fe 100644 --- a/warp10/src/main/java/io/warp10/fdb/FDBStoreClient.java +++ b/warp10/src/main/java/io/warp10/fdb/FDBStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2022-2023 SenX S.A.S. +// Copyright 2022-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,24 +17,45 @@ package io.warp10.fdb; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import org.bouncycastle.util.Arrays; import org.bouncycastle.util.encoders.Hex; +import com.apple.foundationdb.Database; +import com.apple.foundationdb.FDBException; +import com.apple.foundationdb.StreamingMode; +import com.apple.foundationdb.Transaction; + import io.warp10.continuum.Configuration; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.gts.MetadataIdComparator; +import io.warp10.continuum.sensision.SensisionConstants; +import io.warp10.continuum.store.Constants; import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.MultiScanGTSDecoderIterator; import io.warp10.continuum.store.ParallelGTSDecoderIteratorWrapper; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.crypto.KeyStore; import io.warp10.crypto.OrderPreservingBase64; +import io.warp10.quasar.token.thrift.data.ReadToken; import io.warp10.quasar.token.thrift.data.WriteToken; +import io.warp10.script.functions.KVSTORE; +import io.warp10.sensision.Sensision; import io.warp10.standalone.StandalonePlasmaHandlerInterface; public class FDBStoreClient implements StoreClient { @@ -43,12 +64,16 @@ public class FDBStoreClient implements StoreClient { public final KeyStore keystore; public final boolean FDBUseTenantPrefix; + private final long fdbRetryLimit; private static final String EGRESS_FDB_POOLSIZE_DEFAULT = Integer.toString(1); + public static final String DEFAULT_FDB_RETRYLIMIT = Long.toString(4); + public FDBStoreClient(KeyStore keystore, Properties properties) throws IOException { this.keystore = keystore; this.FDBUseTenantPrefix = "true".equals(properties.getProperty(Configuration.FDB_USE_TENANT_PREFIX)); + this.fdbRetryLimit = Long.parseLong(properties.getProperty(Configuration.EGRESS_FDB_RETRYLIMIT, DEFAULT_FDB_RETRYLIMIT)); if (FDBUseTenantPrefix && (null != properties.getProperty(Configuration.EGRESS_FDB_TENANT) || null != properties.getProperty(Configuration.EGRESS_FDB_TENANT_PREFIX))) { throw new RuntimeException("Cannot set '" + Configuration.EGRESS_FDB_TENANT + "' or '" + Configuration.EGRESS_FDB_TENANT_PREFIX + "' when '" + Configuration.FDB_USE_TENANT_PREFIX + "' is true."); @@ -176,4 +201,457 @@ public long delete(WriteToken token, Metadata metadata, long start, long end) th public FDBPool getPool() { return this.pool; } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + + if (null == request) { + return; + } + + // Convert to FDBMutations, then batch those in tx of less than 10M + + WriteToken token = request.getToken(); + + byte[] tenantPrefix = this.pool.getContext().getTenantPrefix(); + + if (this.FDBUseTenantPrefix) { + if (token.getAttributesSize() > 0 && token.getAttributes().containsKey(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX)) { + tenantPrefix = OrderPreservingBase64.decode(token.getAttributes().get(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX)); + if (8 != tenantPrefix.length) { + throw new IOException("Invalid tenant prefix, length should be 8 bytes."); + } + } else { + throw new IOException("Configuration mandates a tenant and none was found in the provided token"); + } + } else { + if (token.getAttributesSize() > 0 && token.getAttributes().containsKey(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX)) { + throw new IOException("Configuration has no tenant support but provided token had a defined tenant."); + } + } + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + List> batches = new ArrayList>(); + List batch = new ArrayList(); + batches.add(batch); + long batchsize = 0; + + for (int i = 0; i < request.getKeysSize(); i++) { + ByteBuffer bb = request.getKeys().get(i); + bb.mark(); + byte[] key = Arrays.copyOf(kvprefix, kvprefix.length + bb.remaining()); + + if (key.length + (null != tenantPrefix ? tenantPrefix.length : 0) >= FDBUtils.MAX_KEY_SIZE) { + throw new IOException("Key size (" + key.length + ") exceed maximum size."); + } + + bb.get(key, kvprefix.length, bb.remaining()); + bb.reset(); + + FDBMutation mutation = null; + if (i > request.getValuesSize() || 0 == request.getValues().get(i).remaining()) { + mutation = new FDBClear(tenantPrefix, key); + } else { + bb = request.getValues().get(i); + bb.mark(); + if (bb.remaining() >= FDBUtils.MAX_VALUE_SIZE) { + throw new IOException("Value size (" + bb.remaining() + ") exceed maximum size."); + } + + byte[] value = new byte[bb.remaining()]; + bb.get(value); + bb.reset(); + mutation = new FDBSet(tenantPrefix, key, value); + } + + if (batchsize + mutation.size() > FDBUtils.MAX_TXN_SIZE * 0.95) { + batch = new ArrayList(); + batches.add(batch); + batchsize = 0; + } + + batch.add(mutation); + batchsize += mutation.size(); + } + + // + // Persist the batches via one tx per batch + // + + + for (List btch: batches) { + Database db = this.pool.getDatabase(); + Transaction txn = null; + boolean retry = false; + long retries = fdbRetryLimit; + + do { + try { + retry = false; + txn = db.createTransaction(); + // Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant + if (this.pool.getContext().hasTenant() || this.FDBUseTenantPrefix) { + txn.options().setRawAccess(); + } + + int sets = 0; + int clears = 0; + + for (FDBMutation mutation: btch) { + if (mutation instanceof FDBSet) { + sets++; + } else if (mutation instanceof FDBClearRange) { + clears++; + } + mutation.apply(txn); + } + txn.commit().get(); + Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_EGRESS_FDB_SETS_COMMITTED, Sensision.EMPTY_LABELS, sets); + Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_EGRESS_FDB_CLEARS_COMMITTED, Sensision.EMPTY_LABELS, clears); + } catch (Throwable t) { + FDBUtils.errorMetrics("egress", t.getCause()); + if (t.getCause() instanceof FDBException && ((FDBException) t.getCause()).isRetryable() && retries-- > 0) { + retry = true; + } else { + throw new IOException("Error while commiting to FoundationDB.", t); + } + } finally { + if (null != txn) { + try { txn.close(); } catch (Throwable t) {} + txn = null; + } + } + } while(retry); + } + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + + ReadToken token = request.getToken(); + + if (FDBUseTenantPrefix && (0 == token.getAttributesSize() || !token.getAttributes().containsKey(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX))) { + throw new IOException("Invalid token, missing tenant prefix."); + } else if (!FDBUseTenantPrefix && (0 != token.getAttributesSize() && token.getAttributes().containsKey(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX))) { + throw new IOException("Invalid token, no support for tenant prefix."); + } + + byte[] tenantPrefix = null; + + if (FDBUseTenantPrefix) { + tenantPrefix = OrderPreservingBase64.decode(token.getAttributes().get(Constants.TOKEN_ATTR_FDB_TENANT_PREFIX)); + if (8 != tenantPrefix.length) { + throw new IOException("Invalid tenant prefix, length should be 8 bytes."); + } + } else { + tenantPrefix = pool.getContext().getTenantPrefix(); + } + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + if (request.isSetStart() && request.isSetStop()) { + FDBScan scan = new FDBScan(); + scan.setReverse(false); + scan.setTenantPrefix(tenantPrefix); + + byte[] start = Arrays.copyOf(kvprefix, kvprefix.length + request.getStart().length); + System.arraycopy(request.getStart(), 0, start, kvprefix.length, request.getStart().length); + + byte[] stop = Arrays.copyOf(kvprefix, kvprefix.length + request.getStop().length); + System.arraycopy(request.getStop(), 0, stop, kvprefix.length, request.getStop().length); + + scan.setStartKey(start); + scan.setEndKey(stop); + + AtomicReference tx = new AtomicReference(); + + FDBKVScanner scanner = scan.getScanner(pool.getContext(), pool.getDatabase(), StreamingMode.ITERATOR, null); + Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_FDB_CLIENT_SCANNERS, Sensision.EMPTY_LABELS, 1); + + final byte[] fkvprefix = kvprefix; + + return new KVIterator>() { + Entry element = null; + boolean done = false; + @Override + + public boolean hasNext() { + if (done) { + return false; + } + if (null != element) { + return true; + } + + if (!scanner.hasNext()) { + done = true; + return false; + } + + FDBKeyValue kv = scanner.next(); + + byte[] key = kv.getKey(); + + // Strip kvprefix from key + if (fkvprefix.length > 0) { + byte[] k = Arrays.copyOfRange(key, fkvprefix.length, key.length); + element = new AbstractMap.SimpleEntry(k, kv.getValue()); + } else { + element = new AbstractMap.SimpleEntry(key, kv.getValue()); + } + + return null != element; + } + + @Override + public Entry next() { + if (done) { + throw new IllegalStateException(); + } + if (null == element && !hasNext()) { + throw new NoSuchElementException(); + } + Entry elt = element; + element = null; + return elt; + } + + @Override + public void close() throws Exception { + scanner.close(); + } + }; + } else { + // Keys + + long txsize = 0L; + + List batch = new ArrayList(); + + List> kvs = new ArrayList>(); + + for (int i = 0; i < request.getKeysSize(); i++) { + ByteBuffer kbb = request.getKeys().get(i); + byte[] key = Arrays.copyOf(kvprefix, kvprefix.length + kbb.remaining()); + kbb.get(key, kvprefix.length, kbb.remaining()); + + FDBGetOp get = new FDBGetOp(tenantPrefix, key); + + // We've reached the size limit for a transaction, issue it to FDB + if (txsize + get.size() >= FDBUtils.MAX_TXN_SIZE * 0.95) { + kvs.addAll(flushGetOps(batch)); + batch.clear(); + txsize = 0; + } + batch.add(get); + txsize += get.size(); + } + + if (!batch.isEmpty()) { + kvs.addAll(flushGetOps(batch)); + } + + // + // Return an iterator which will strip kvprefix from keys as elements are returned + // + + Iterator> iter = kvs.iterator(); + + final byte[] fkvprefix = kvprefix; + + return new KVIterator>() { + boolean done = false; + Entry element = null; + @Override + public void close() throws Exception { + done = true; + } + @Override + public boolean hasNext() { + if (done) { + return false; + } + if (null != element) { + return true; + } + if (!iter.hasNext()) { + done = true; + return false; + } + Entry entry = iter.next(); + + // Strip kvprefix from key + if (fkvprefix.length > 0) { + byte[] k = Arrays.copyOfRange(entry.getKey(), fkvprefix.length, entry.getKey().length); + element = new AbstractMap.SimpleEntry(k, entry.getValue()); + } else { + element = entry; + } + return true; + } + + @Override + public Entry next() { + if (done) { + throw new IllegalStateException(); + } + + if (null == element) { + if (!hasNext()) { + throw new IllegalStateException(); + } + } + + Entry elt = element; + element = null; + return elt; + } + }; + } + } + + private List> flushGetOps(List batch) { + List> kvs = new ArrayList>(batch.size()); + + if (batch.isEmpty()) { + return kvs; + } + + Transaction txn = null; + + /** + * Number of attempts when cluster is lagging, 0 means no lag mitigation + */ + int versionAttempts = 32; + + /** + * Read version offset to apply when we need to go back in time (post error 1037). In microseconds. + * The offset will be modified along the attempts. + */ + long offset = 1000000L; + + Long forceReadVersion = null; + + long count = 0L; + + long attempts = this.fdbRetryLimit; + + while(attempts-- > 0) { + try { + txn = this.pool.getDatabase().createTransaction(); + // Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant + if (this.pool.getContext().hasTenant() || null != batch.get(0).getTenant()) { + txn.options().setRawAccess(); + } + + txn.options().setCausalReadRisky(); + txn.options().setReadYourWritesDisable(); + txn.options().setSnapshotRywDisable(); + + for (FDBGetOp op: batch) { + op.apply(txn); + } + + txn.commit(); + + for (FDBGetOp op: batch) { + kvs.add(new AbstractMap.SimpleEntry(op.getKey(), op.getValue())); + } + + return kvs; + } catch (Throwable t) { + FDBUtils.errorMetrics("kvfetch", t.getCause()); + if (t.getCause() instanceof FDBException) { + FDBException fdbe = (FDBException) t.getCause(); + int code = fdbe.getCode(); + + // We support retrying after a 'transaction too old' error + if (1007 == code) { // Transaction too old + // If we were forcing a read version and read nothing then we probably set the read version + // too far in the past, adjust it 0.5s in the future + if (null != forceReadVersion && 0L == count) { + forceReadVersion += 500000; + } + // Update last seen read version + if (count > 0L) { + try { this.pool.getContext().setLastSeenReadVersion(txn.getReadVersion().get()); } catch (Throwable tt) {} + count = 0L; + } + try { txn.close(); } catch (Throwable tt) {} + txn = null; + continue; //return hasNext(); + } else if (1009 == code // 1009: request for future version, we were too aggressive when forcing the version. + || 1037 == code) { // 1037: process_behind, the cluster is lagging, attempt to force a known valid read version + long readVersion = 0L; + try { + // Call getReadVersion so version gets a chance to be initialized and we don't risk getting a transaction_too_old (1007) + // @see https://forums.foundationdb.org/t/use-of-setreadversion/3696/2 + readVersion = txn.getReadVersion().get(); + } catch (Throwable tt) {} + // We have reached the maximum number of forced version attempts + if (0 >= versionAttempts) { + Sensision.update(SensisionConstants.CLASS_WARP_FDB_MAXFORCEDVERSION, Sensision.EMPTY_LABELS, 1); + throw new RuntimeException("Maximum number of forced read version attempts reached", t); + } + + if (null == forceReadVersion) { + forceReadVersion = this.pool.getContext().getLastSeenReadVersion(); + + if (forceReadVersion < readVersion) { + forceReadVersion = readVersion - offset; + } + } else { + // forced read version was already set but reading failed anyway, so attempt with a read version further in the past + forceReadVersion -= offset; + } + // Close the transaction since we cannot force the read version on a transaction which alread + try { txn.close(); } catch (Throwable tt) {} + txn = null; + continue; //return hasNext(); + } else if (1039 == code) { // cluster_version_changed, may happen when using the multi-version client upon first connection + // Close the transaction + try { txn.close(); } catch (Throwable tt) {} + txn = null; + continue; + } + } + throw t; + } finally { + if (null != txn) { + try { txn.close(); } catch (Throwable tt) {} + } + } + } + + throw new RuntimeException("Unable to retrieve Key/Value pairs from FoundationDB"); + } } diff --git a/warp10/src/main/java/io/warp10/fdb/FDBUtils.java b/warp10/src/main/java/io/warp10/fdb/FDBUtils.java index 914e03017..5b435b74a 100644 --- a/warp10/src/main/java/io/warp10/fdb/FDBUtils.java +++ b/warp10/src/main/java/io/warp10/fdb/FDBUtils.java @@ -1,5 +1,5 @@ // -// Copyright 2022-2023 SenX S.A.S. +// Copyright 2022-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ public class FDBUtils { public static final String KEY_ID = "id"; public static final String KEY_PREFIX = "prefix"; + public static final int MAX_KEY_SIZE = 10000; public static final int MAX_VALUE_SIZE = 100000; public static final long MAX_TXN_SIZE = 10000000; diff --git a/warp10/src/main/java/io/warp10/leveldb/WarpDB.java b/warp10/src/main/java/io/warp10/leveldb/WarpDB.java index fde424269..7f8691db1 100644 --- a/warp10/src/main/java/io/warp10/leveldb/WarpDB.java +++ b/warp10/src/main/java/io/warp10/leveldb/WarpDB.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2023 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -355,8 +355,7 @@ public Snapshot delete(byte[] key, WriteOptions options) throws DBException { @Override public byte[] get(byte[] key) throws DBException { - throw new RuntimeException("Unsupported operation get."); - //return this.db.get(key); + return this.db.get(key); } @Override diff --git a/warp10/src/main/java/io/warp10/script/WarpScriptLib.java b/warp10/src/main/java/io/warp10/script/WarpScriptLib.java index 9ad4897a2..7c32649d3 100644 --- a/warp10/src/main/java/io/warp10/script/WarpScriptLib.java +++ b/warp10/src/main/java/io/warp10/script/WarpScriptLib.java @@ -1,5 +1,5 @@ // -// Copyright 2019-2024 SenX S.A.S. +// Copyright 2019-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -360,6 +360,8 @@ import io.warp10.script.functions.JSONTO; import io.warp10.script.functions.KEYLIST; import io.warp10.script.functions.KURTOSIS; +import io.warp10.script.functions.KVLOAD; +import io.warp10.script.functions.KVSTORE; import io.warp10.script.functions.LABELS; import io.warp10.script.functions.LASTACTIVITY; import io.warp10.script.functions.LASTBUCKET; @@ -1106,8 +1108,8 @@ public class WarpScriptLib { public static final String BDTESTBIT = "BDTESTBIT"; public static final String BDXOR = "BDXOR"; - - + public static final String KVLOAD = "KVLOAD"; + public static final String KVSTORE = "KVSTORE"; public static final String RSAPUBLIC = "RSAPUBLIC"; public static final String RSAPRIVATE = "RSAPRIVATE"; @@ -2955,6 +2957,9 @@ public class WarpScriptLib { addNamedWarpScriptFunction(new BDPROBABLEPRIME(BDPROBABLEPRIME, false)); addNamedWarpScriptFunction(new BDPROBABLEPRIME(SBDPROBABLEPRIME, true)); + addNamedWarpScriptFunction(new KVSTORE(KVSTORE)); + addNamedWarpScriptFunction(new KVLOAD(KVLOAD)); + // // Linear Algebra // diff --git a/warp10/src/main/java/io/warp10/script/functions/KVLOAD.java b/warp10/src/main/java/io/warp10/script/functions/KVLOAD.java new file mode 100644 index 000000000..6852c2d50 --- /dev/null +++ b/warp10/src/main/java/io/warp10/script/functions/KVLOAD.java @@ -0,0 +1,170 @@ +// +// Copyright 2025 SenX S.A.S. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package io.warp10.script.functions; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import io.warp10.BytesUtils; +import io.warp10.continuum.Tokens; +import io.warp10.continuum.store.StoreClient; +import io.warp10.continuum.store.StoreClient.KVIterator; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.quasar.token.thrift.data.ReadToken; +import io.warp10.script.NamedWarpScriptFunction; +import io.warp10.script.WarpScriptException; +import io.warp10.script.WarpScriptStack; +import io.warp10.script.WarpScriptStack.Macro; +import io.warp10.script.WarpScriptStackFunction; + +public class KVLOAD extends NamedWarpScriptFunction implements WarpScriptStackFunction { + + public static final String KEY_START = "start"; + public static final String KEY_END = "end"; + public static final String KEY_KEYS = "keys"; + public static final String KEY_TOKEN = "token"; + public static final String KEY_MACRO = "macro"; + + public KVLOAD(String name) { + super(name); + } + + @Override + public Object apply(WarpScriptStack stack) throws WarpScriptException { + Object top = stack.pop(); + + if (!(top instanceof Map)) { + throw new WarpScriptException(getName() + " operates on a map."); + } + + Map params = (Map) top; + + if (!(params.get(KEY_TOKEN) instanceof String)) { + throw new WarpScriptException(getName() + " expects a token under '" + KEY_TOKEN + "'."); + } + + // + // If the token has an attribute .kvprefix then the call can succeed. This attribute + // acts as a capability. + // + + ReadToken rtoken = Tokens.extractReadToken((String) params.get(KEY_TOKEN)); + + KVFetchRequest kvfr = new KVFetchRequest(); + kvfr.setToken(rtoken); + + Macro macro = null; + + if (params.get(KEY_MACRO) instanceof Macro) { + macro = (Macro) params.get(KEY_MACRO); + } + + if (params.get(KEY_KEYS) instanceof List) { + List keys = (List) params.get(KEY_KEYS); + List bkeys = new ArrayList(keys.size()); + + for (Object key: keys) { + if (key instanceof byte[]) { + bkeys.add((byte[]) key); + } else if (key instanceof String) { + byte[] k = ((String) key).getBytes(StandardCharsets.UTF_8); + if (k.length != ((String) key).length()) { + throw new WarpScriptException(getName() + " STRING keys cannot contain non ISO-8859-1 characters."); + } + bkeys.add(k); + } else { + throw new WarpScriptException(getName() + " expects '" + KEY_KEYS + "' to contain a list of BYTES or STRING keys."); + } + } + + // Sort the list in lexicographical order to speed up retrieval + + bkeys.sort(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return BytesUtils.compareTo(o1, o2); + } + }); + + for (byte[] key: bkeys) { + kvfr.addToKeys(ByteBuffer.wrap(key)); + } + } else { + byte[] start = null; + byte[] end = null; + + if (params.get(KEY_START) instanceof byte[]) { + start = Arrays.copyOf((byte[]) params.get(KEY_START), ((byte[]) params.get(KEY_START)).length); + } else if (params.get(KEY_START) instanceof String) { + start = ((String) params.get(KEY_START)).getBytes(StandardCharsets.UTF_8); + if (start.length != ((String) params.get(KEY_START)).length()) { + throw new WarpScriptException(getName() + " STRING keys cannot contain non ISO-8859-1 characters."); + } + } else { + throw new WarpScriptException(getName() + " expects keys to be STRING or BYTES."); + } + + if (params.get(KEY_END) instanceof byte[]) { + end = Arrays.copyOf((byte[]) params.get(KEY_END), ((byte[]) params.get(KEY_END)).length); + } else if (params.get(KEY_END) instanceof String) { + end = ((String) params.get(KEY_END)).getBytes(StandardCharsets.UTF_8); + if (end.length != ((String) params.get(KEY_END)).length()) { + throw new WarpScriptException(getName() + " STRING keys cannot contain non ISO-8859-1 characters."); + } + } else { + throw new WarpScriptException(getName() + " expects keys to be STRING or BYTES."); + } + + kvfr.setStart(start); + kvfr.setStop(end); + } + + StoreClient store = stack.getStoreClient(); + + Map kv = new LinkedHashMap(); + + try (KVIterator> iter = store.kvfetch(kvfr)) { + while(iter.hasNext()) { + Entry entry = iter.next(); + + if (null != macro) { + stack.push(entry.getKey()); + stack.push(entry.getValue()); + stack.exec(macro); + if (Boolean.TRUE.equals(stack.pop())) { + kv.put(entry.getKey(), entry.getValue()); + } + } else { + kv.put(entry.getKey(), entry.getValue()); + } + } + } catch (Exception e) { + throw new WarpScriptException(getName() + " encountered an error while reading Key/Value pairs.", e); + } + + stack.push(kv); + + return stack; + } +} diff --git a/warp10/src/main/java/io/warp10/script/functions/KVSTORE.java b/warp10/src/main/java/io/warp10/script/functions/KVSTORE.java new file mode 100644 index 000000000..af6845504 --- /dev/null +++ b/warp10/src/main/java/io/warp10/script/functions/KVSTORE.java @@ -0,0 +1,161 @@ +// +// Copyright 2025 SenX S.A.S. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package io.warp10.script.functions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Map.Entry; + +import io.warp10.WarpConfig; +import io.warp10.continuum.Configuration; +import io.warp10.continuum.Tokens; +import io.warp10.continuum.store.StoreClient; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; +import io.warp10.quasar.token.thrift.data.WriteToken; +import io.warp10.script.NamedWarpScriptFunction; +import io.warp10.script.WarpScriptException; +import io.warp10.script.WarpScriptStack; +import io.warp10.script.WarpScriptStackFunction; + +public class KVSTORE extends NamedWarpScriptFunction implements WarpScriptStackFunction { + + /** + * KV are stored with keys starting with 'X' (as Xtra data) + */ + public static final byte[] KVPREFIX = "X".getBytes(StandardCharsets.UTF_8); + + /** + * Token attribute for storing OPB64 encoded key prefix + */ + public static final String ATTR_KVPREFIX = ".kvprefix"; + + /** + * Maximum key size + */ + public static final String ATTR_KVMAXK = ".kvmaxk"; + + /** + * Maximum value size + */ + public static final String ATTR_KVMAXV = ".kvmaxv"; + + private static final int DEFAULT_MAXK; + private static final int DEFAULT_MAXV; + + static { + DEFAULT_MAXK = Integer.parseInt(WarpConfig.getProperty(Configuration.WARP_KVSTORE_MAXK, "128")); + DEFAULT_MAXV = Integer.parseInt(WarpConfig.getProperty(Configuration.WARP_KVSTORE_MAXV, "1024")); + } + + public KVSTORE(String name) { + super(name); + } + + @Override + public Object apply(WarpScriptStack stack) throws WarpScriptException { + Object top = stack.pop(); + + if (!(top instanceof String)) { + throw new WarpScriptException(getName() + " missing token."); + } + + // + // If the token has an attribute .kvprefix then the call can succeed. This attribute + // acts as a capability. + // + + WriteToken wtoken = Tokens.extractWriteToken((String) top); + + top = stack.pop(); + + if (!(top instanceof Map)) { + throw new WarpScriptException(getName() + " operates on a map."); + } + + Map kv = (Map) top; + + KVStoreRequest kvsr = new KVStoreRequest(); + kvsr.setToken(wtoken); + + int kvmaxv = DEFAULT_MAXV; + int kvmaxk = DEFAULT_MAXK; + + if (wtoken.getAttributesSize() > 0) { + if (null != wtoken.getAttributes().get(ATTR_KVMAXK)) { + kvmaxk = Integer.parseInt(wtoken.getAttributes().get(ATTR_KVMAXK)); + } + if (null != wtoken.getAttributes().get(ATTR_KVMAXV)) { + kvmaxv = Integer.parseInt(wtoken.getAttributes().get(ATTR_KVMAXV)); + } + } + + for (Entry entry: kv.entrySet()) { + byte[] key = null; + byte[] value = null; + + if (entry.getKey() instanceof byte[]) { + key = (byte[]) entry.getKey(); + } else if (entry.getKey() instanceof String) { + key = ((String) entry.getKey()).getBytes(StandardCharsets.UTF_8); + // Check that STRING keys do not contain Unicode characters + if (key.length != ((String) entry.getKey()).length()) { + throw new WarpScriptException(getName() + " STRING keys cannot contain non ISO-8859-1 characters."); + } + } else { + throw new WarpScriptException(getName() + " keys are expected to be BYTES or STRING."); + } + + if (entry.getValue() instanceof byte[]) { + value = (byte[]) entry.getValue(); + } else if (entry.getValue() instanceof String) { + value = ((String) entry.getValue()).getBytes(StandardCharsets.UTF_8); + } else if (null != entry.getValue()) { + throw new WarpScriptException(getName() + " values are expected to be BYTES or STRING."); + } + + if (key.length > kvmaxk) { + throw new WarpScriptException(getName() + " maximum allowed key size is " + kvmaxk + " bytes."); + } + + if (null != value && value.length > kvmaxv) { + throw new WarpScriptException(getName() + " maximum allowed value size is " + kvmaxv + " bytes."); + } + + kvsr.addToKeys(ByteBuffer.wrap(key)); + + if (null == value) { + kvsr.addToValues(ByteBuffer.allocate(0)); + } else { + kvsr.addToValues(ByteBuffer.wrap(value)); + } + } + + StoreClient store = stack.getStoreClient(); + + try { + store.kvstore(kvsr); + // We force writing a null to trigger a possible sync of a DatalogManager + store.kvstore(null); + } catch (IOException ioe) { + throw new WarpScriptException(getName() + " encountered an error while storing Key/Value pairs.", ioe); + } + + return stack; + } +} diff --git a/warp10/src/main/java/io/warp10/standalone/NullStoreClient.java b/warp10/src/main/java/io/warp10/standalone/NullStoreClient.java index eb935dc59..cf2236752 100644 --- a/warp10/src/main/java/io/warp10/standalone/NullStoreClient.java +++ b/warp10/src/main/java/io/warp10/standalone/NullStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2024 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,9 @@ package io.warp10.standalone; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map.Entry; import java.util.NoSuchElementException; import io.warp10.continuum.gts.GTSDecoder; @@ -24,6 +27,8 @@ import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; @@ -49,4 +54,24 @@ public void store(GTSEncoder encoder) throws IOException {} @Override public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException { return 0L; } + + @Override + public void kvstore(KVStoreRequest request) throws IOException {} + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + return new KVIterator>() { + @Override + public boolean hasNext() { + return false; + } + @Override + public Entry next() { + return null; + } + @Override + public void close() throws Exception { + } + }; + } } diff --git a/warp10/src/main/java/io/warp10/standalone/PlasmaStoreClient.java b/warp10/src/main/java/io/warp10/standalone/PlasmaStoreClient.java index 11d2304a7..61e7a5bd6 100644 --- a/warp10/src/main/java/io/warp10/standalone/PlasmaStoreClient.java +++ b/warp10/src/main/java/io/warp10/standalone/PlasmaStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2020 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,12 +18,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; @@ -44,7 +48,17 @@ public void store(GTSEncoder encoder) throws java.io.IOException { } } } - + @Override public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException { return 0L; } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/warp10/src/main/java/io/warp10/standalone/StandaloneAcceleratedStoreClient.java b/warp10/src/main/java/io/warp10/standalone/StandaloneAcceleratedStoreClient.java index ad1b7f674..6f3ce7b41 100644 --- a/warp10/src/main/java/io/warp10/standalone/StandaloneAcceleratedStoreClient.java +++ b/warp10/src/main/java/io/warp10/standalone/StandaloneAcceleratedStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2021 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,8 +19,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; @@ -29,10 +31,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import io.warp10.CustomThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.warp10.CustomThreadFactory; import io.warp10.WarpConfig; import io.warp10.continuum.Configuration; import io.warp10.continuum.TimeSource; @@ -45,6 +47,8 @@ import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.DirectoryRequest; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; @@ -337,4 +341,14 @@ public void store(GTSEncoder encoder) throws IOException { cache.store(encoder); } } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + persistent.kvstore(request); + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + return persistent.kvfetch(request); + } } diff --git a/warp10/src/main/java/io/warp10/standalone/StandaloneChunkedMemoryStore.java b/warp10/src/main/java/io/warp10/standalone/StandaloneChunkedMemoryStore.java index f89e98e10..340814097 100644 --- a/warp10/src/main/java/io/warp10/standalone/StandaloneChunkedMemoryStore.java +++ b/warp10/src/main/java/io/warp10/standalone/StandaloneChunkedMemoryStore.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2023 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,11 +20,18 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; @@ -38,12 +45,17 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.thrift.TDeserializer; import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TCompactProtocol; +import org.bouncycastle.util.Arrays; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.ReadOptions; +import org.iq80.leveldb.WriteBatch; +import org.iq80.leveldb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.MapMaker; +import io.warp10.BytesUtils; import io.warp10.CapacityExtractorOutputStream; import io.warp10.ThriftUtils; import io.warp10.continuum.TimeSource; @@ -54,12 +66,18 @@ import io.warp10.continuum.store.Constants; import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; +import io.warp10.continuum.store.StoreClient.KVIterator; import io.warp10.continuum.store.thrift.data.FetchRequest; import io.warp10.continuum.store.thrift.data.GTSWrapper; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.crypto.KeyStore; +import io.warp10.crypto.OrderPreservingBase64; import io.warp10.crypto.SipHashInline; +import io.warp10.quasar.token.thrift.data.ReadToken; import io.warp10.quasar.token.thrift.data.WriteToken; +import io.warp10.script.functions.KVSTORE; import io.warp10.sensision.Sensision; /** @@ -75,6 +93,11 @@ public class StandaloneChunkedMemoryStore extends Thread implements StoreClient private final Map series; + private final SortedMap kv; + private final long MAXKVSIZE; + + private final AtomicLong kvsize = new AtomicLong(0L); + private final List plasmaHandlers = new ArrayList(); private StandaloneDirectoryClient directoryClient = null; @@ -100,7 +123,9 @@ public StandaloneChunkedMemoryStore(Properties properties, KeyStore keystore) { this.properties = properties; this.series = new MapMaker().concurrencyLevel(64).makeMap(); + this.kv = Collections.synchronizedSortedMap(new TreeMap()); + this.MAXKVSIZE = Long.parseLong(properties.getProperty(io.warp10.continuum.Configuration.IN_MEMORY_MAXKVSIZE, "10000000")); if ("true".equals(properties.getProperty(io.warp10.continuum.Configuration.IN_MEMORY_EPHEMERAL))) { this.chunkcount = 1; this.chunkspan = Long.MAX_VALUE; @@ -655,4 +680,232 @@ public int getChunkCount() { public int getGTSCount() { return this.series.size(); } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + + if (null == request) { + return; + } + + WriteToken token = request.getToken(); + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + // Iterate over the keys, missing or empty values will remove the entries + TreeMap kvs = new TreeMap(); + + for (int i = 0; i < request.getKeysSize(); i++) { + ByteBuffer kbb = request.getKeys().get(i); + kbb.mark(); + byte[] key = Arrays.copyOf(kvprefix, kvprefix.length + kbb.remaining()); + kbb.get(key, kvprefix.length, kbb.remaining()); + kbb.reset(); + if (i < request.getValuesSize()) { + ByteBuffer vbb = request.getValues().get(i); + vbb.mark(); + // If the associated value is empty, delete the entry under 'key' + if (0 == vbb.remaining()) { + byte[] old = this.kv.remove(OrderPreservingBase64.encodeToString(key)); + if (null != old) { + kvsize.addAndGet(-old.length-key.length); + } + } else { + byte[] value = new byte[vbb.remaining()]; + vbb.get(value); + kvs.put(OrderPreservingBase64.encodeToString(key), value); + } + vbb.reset(); + } else { + byte[] old = this.kv.remove(OrderPreservingBase64.encodeToString(key)); + if (null != old) { + kvsize.addAndGet(-old.length-key.length); + } + } + } + if (!kvs.isEmpty()) { + for (Entry entry: kvs.entrySet()) { + byte[] old = kv.put(entry.getKey(), entry.getValue()); + if (null != old) { + kvsize.addAndGet(entry.getValue().length-old.length); + } else { + kvsize.addAndGet(OrderPreservingBase64.decode(entry.getKey()).length + entry.getValue().length); + } + if (kvsize.get() > MAXKVSIZE) { + throw new IOException("Some K/V pairs could not be added as it would have exceeded the configured total size limit. All deletions were performed though."); + } + } + } + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + + ReadToken token = request.getToken(); + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + final byte[] fkvprefix = kvprefix; + + ReadOptions options = new ReadOptions().fillCache(false); + + if (request.isSetStart() && request.isSetStop()) { + byte[] start = Arrays.copyOf(kvprefix, kvprefix.length + request.getStart().length); + System.arraycopy(request.getStart(), 0, start, kvprefix.length, request.getStart().length); + byte[] stop = Arrays.copyOf(kvprefix, kvprefix.length + request.getStop().length); + System.arraycopy(request.getStop(), 0, stop, kvprefix.length, request.getStop().length); + + SortedMap range = this.kv.subMap(OrderPreservingBase64.encodeToString(start), OrderPreservingBase64.encodeToString(stop)); + TreeSet keys = null; + + synchronized(this.kv) { + keys = new TreeSet(range.keySet()); + } + + final Iterator iterator = keys.iterator(); + + return new KVIterator>() { + + Entry element = null; + boolean done = false; + + @Override + public boolean hasNext() { + if (null != element) { + return true; + } + if (done) { + return false; + } + while(null == element && iterator.hasNext()) { + String key = iterator.next(); + element = new AbstractMap.SimpleEntry(OrderPreservingBase64.decode(key), kv.get(key)); + + if (null == element.getValue()) { + element = null; + } else { + // Strip prefixes from key + element = new AbstractMap.SimpleEntry(Arrays.copyOfRange(element.getKey(), fkvprefix.length, element.getKey().length), element.getValue()); + } + } + + if (null == element) { + done = true; + } + return null != element; + } + + @Override + public Entry next() { + if (done || null == element) { + throw new IllegalStateException(); + } + Entry elt = element; + element = null; + return elt; + } + + @Override + public void close() throws Exception { + done = true; + } + }; + + } else if (request.isSetKeys()) { + // Fetch explicit keys + return new KVIterator>() { + + Entry element = null; + boolean done = false; + int idx = 0; + + @Override + public boolean hasNext() { + if (done) { + return false; + } + if (null != element) { + return true; + } + if (idx >= request.getKeysSize()) { + done = true; + return false; + } + + while (idx < request.getKeysSize()) { + ByteBuffer bb = request.getKeys().get(idx++); + byte[] k = Arrays.copyOf(fkvprefix, fkvprefix.length + bb.remaining()); + bb.get(k, fkvprefix.length, bb.remaining()); + byte[] value = kv.get(OrderPreservingBase64.encodeToString(k)); + if (null != value) { + // Strip prefix + element = new AbstractMap.SimpleEntry(Arrays.copyOfRange(k, fkvprefix.length, k.length),value); + break; + } + } + + if (null == element) { + done = true; + } + return null != element; + } + + @Override + public void close() throws Exception { + done = true; + } + + @Override + public Entry next() { + if (done || null == element) { + throw new IllegalStateException(); + } + Entry elt = element; + element = null; + return elt; + } + }; + } else { + return new KVIterator>() { + @Override + public void close() throws Exception {} + @Override + public boolean hasNext() { + return false; + } + @Override + public Entry next() { + throw new IllegalStateException(); + } + }; + } + } } diff --git a/warp10/src/main/java/io/warp10/standalone/StandaloneParallelStoreClientWrapper.java b/warp10/src/main/java/io/warp10/standalone/StandaloneParallelStoreClientWrapper.java index 1e90ab5f2..0320a8496 100644 --- a/warp10/src/main/java/io/warp10/standalone/StandaloneParallelStoreClientWrapper.java +++ b/warp10/src/main/java/io/warp10/standalone/StandaloneParallelStoreClientWrapper.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2020 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,32 +16,36 @@ package io.warp10.standalone; import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.ParallelGTSDecoderIteratorWrapper; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; public class StandaloneParallelStoreClientWrapper implements StoreClient { private final StoreClient parent; - + public StandaloneParallelStoreClientWrapper(StoreClient parent) { this.parent = parent; } - + @Override public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler) { parent.addPlasmaHandler(handler); } - + @Override public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException { return parent.delete(token, metadata, start, end); } - + @Override public GTSDecoderIterator fetch(FetchRequest req) throws IOException { if (req.isParallelScanners() && ParallelGTSDecoderIteratorWrapper.useParallelScanners()) { @@ -50,9 +54,19 @@ public GTSDecoderIterator fetch(FetchRequest req) throws IOException { return parent.fetch(req); } } - + @Override public void store(GTSEncoder encoder) throws IOException { parent.store(encoder); } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + parent.kvstore(request); + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + return parent.kvfetch(request); + } } diff --git a/warp10/src/main/java/io/warp10/standalone/StandaloneShardedStoreClientWrapper.java b/warp10/src/main/java/io/warp10/standalone/StandaloneShardedStoreClientWrapper.java index c4135961f..32dd1cd3a 100644 --- a/warp10/src/main/java/io/warp10/standalone/StandaloneShardedStoreClientWrapper.java +++ b/warp10/src/main/java/io/warp10/standalone/StandaloneShardedStoreClientWrapper.java @@ -1,5 +1,5 @@ // -// Copyright 2019-2023 SenX S.A.S. +// Copyright 2019-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ package io.warp10.standalone; import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import io.warp10.WarpConfig; import io.warp10.continuum.Configuration; @@ -25,6 +27,8 @@ import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.crypto.KeyStore; import io.warp10.crypto.SipHashInline; @@ -131,4 +135,14 @@ public void store(GTSEncoder encoder) throws IOException { this.client.store(encoder); } } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/warp10/src/main/java/io/warp10/standalone/StandaloneStoreClient.java b/warp10/src/main/java/io/warp10/standalone/StandaloneStoreClient.java index 82cedc69c..9d3f742e7 100644 --- a/warp10/src/main/java/io/warp10/standalone/StandaloneStoreClient.java +++ b/warp10/src/main/java/io/warp10/standalone/StandaloneStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2023 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,13 +30,14 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicLong; +import org.bouncycastle.util.Arrays; import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.ReadOptions; import org.iq80.leveldb.WriteBatch; import org.iq80.leveldb.WriteOptions; -import io.warp10.WarpConfig; import io.warp10.BytesUtils; +import io.warp10.WarpConfig; import io.warp10.continuum.Configuration; import io.warp10.continuum.Tokens; import io.warp10.continuum.gts.GTSDecoder; @@ -47,11 +49,15 @@ import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.crypto.KeyStore; +import io.warp10.crypto.OrderPreservingBase64; import io.warp10.leveldb.WarpDB; import io.warp10.quasar.token.thrift.data.ReadToken; import io.warp10.quasar.token.thrift.data.WriteToken; +import io.warp10.script.functions.KVSTORE; import io.warp10.sensision.Sensision; public class StandaloneStoreClient implements StoreClient { @@ -81,7 +87,7 @@ public class StandaloneStoreClient implements StoreClient { private final boolean syncwrites; private final double syncrate; private final int blockcacheThreshold; - + protected StandaloneStoreClient() { MAX_ENCODER_SIZE = 0; MAX_DELETE_BATCHSIZE = 0; @@ -665,7 +671,7 @@ private void store(List kvs) throws IOException { } } } - + /** * Persists a GTSEncoder instance. * CAUTION, this method assumes that classId and labelsId HAVE BEEN computed for @@ -821,4 +827,223 @@ public long delete(WriteToken token, Metadata metadata, long start, long end) th public void addPlasmaHandler(StandalonePlasmaHandlerInterface plasmaHandler) { this.plasmaHandlers.add(plasmaHandler); } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + + if (null == request) { + return; + } + + WriteToken token = request.getToken(); + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + WriteBatch batch = db.createWriteBatch(); + + boolean written = false; + + try { + // Iterate over the keys, missing or empty values will remove the entries + for (int i = 0; i < request.getKeysSize(); i++) { + ByteBuffer kbb = request.getKeys().get(i); + kbb.mark(); + byte[] key = Arrays.copyOf(kvprefix, kvprefix.length + kbb.remaining()); + kbb.get(key, kvprefix.length, kbb.remaining()); + kbb.reset(); + if (i < request.getValuesSize()) { + ByteBuffer vbb = request.getValues().get(i); + vbb.mark(); + // If the associated value is empty, delete the entry under 'key' + if (0 == vbb.remaining()) { + batch.delete(key); + } else { + byte[] value = new byte[vbb.remaining()]; + vbb.get(value); + batch.put(key, value); + } + vbb.reset(); + } else { + batch.delete(key); + } + } + + WriteOptions options = new WriteOptions().sync(1.0 == syncrate); + + if (syncwrites && !options.sync()) { + options = new WriteOptions().sync(Math.random() < syncrate); + } + + this.db.write(batch, options); + written = true; + } finally { + if (written) { + batch.close(); + } + } + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + + ReadToken token = request.getToken(); + + // + // Extract kvprefix from token attributes + // + + byte[] kvprefix = KVSTORE.KVPREFIX; + + if (token.getAttributesSize() > 0 && null != token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)) { + if (!token.getAttributes().get(KVSTORE.ATTR_KVPREFIX).isEmpty()) { + byte[] tkvprefix = OrderPreservingBase64.decode(token.getAttributes().get(KVSTORE.ATTR_KVPREFIX)); + kvprefix = Arrays.copyOf(KVSTORE.KVPREFIX, KVSTORE.KVPREFIX.length + tkvprefix.length); + System.arraycopy(tkvprefix, 0, kvprefix, KVSTORE.KVPREFIX.length, tkvprefix.length); + } + } else { + throw new IOException("Token is missing attribute '" + KVSTORE.ATTR_KVPREFIX + "'."); + } + + final byte[] fkvprefix = kvprefix; + + ReadOptions options = new ReadOptions().fillCache(false); + + if (request.isSetStart() && request.isSetStop()) { + byte[] start = Arrays.copyOf(kvprefix, kvprefix.length + request.getStart().length); + System.arraycopy(request.getStart(), 0, start, kvprefix.length, request.getStart().length); + byte[] stop = Arrays.copyOf(kvprefix, kvprefix.length + request.getStop().length); + System.arraycopy(request.getStop(), 0, stop, kvprefix.length, request.getStop().length); + + final DBIterator iterator = db.iterator(options); + + iterator.seek(start); + final byte[] end = stop; + + return new KVIterator>() { + + Entry element = null; + boolean done = false; + + @Override + public boolean hasNext() { + if (null != element) { + return true; + } + if (done) { + return false; + } + if (iterator.hasNext()) { + element = iterator.next(); + + if (BytesUtils.compareTo(element.getKey(),end) >= 0) { + element = null; + done = true; + } else { + // Strip prefixes from key + element = new AbstractMap.SimpleEntry(Arrays.copyOfRange(element.getKey(), fkvprefix.length, element.getKey().length), element.getValue()); + } + } + return null != element; + } + + @Override + public Entry next() { + if (done || null == element) { + throw new IllegalStateException(); + } + Entry elt = element; + element = null; + return elt; + } + + @Override + public void close() throws Exception { + done = true; + iterator.close(); + } + }; + + } else if (request.isSetKeys()) { + // Fetch explicit keys + return new KVIterator>() { + + Entry element = null; + boolean done = false; + int idx = 0; + + @Override + public boolean hasNext() { + if (done) { + return false; + } + if (null != element) { + return true; + } + if (idx >= request.getKeysSize()) { + done = true; + return false; + } + + while (idx < request.getKeysSize()) { + ByteBuffer bb = request.getKeys().get(idx++); + byte[] k = Arrays.copyOf(fkvprefix, fkvprefix.length + bb.remaining()); + bb.get(k, fkvprefix.length, bb.remaining()); + byte[] value = db.get(k); + if (null != value) { + // Strip prefix + element = new AbstractMap.SimpleEntry(Arrays.copyOfRange(k, fkvprefix.length, k.length),value); + break; + } + } + + if (null == element) { + done = true; + } + return null != element; + } + + @Override + public void close() throws Exception { + done = true; + } + + @Override + public Entry next() { + if (done || null == element) { + throw new IllegalStateException(); + } + Entry elt = element; + element = null; + return elt; + } + }; + } else { + return new KVIterator>() { + @Override + public void close() throws Exception {} + @Override + public boolean hasNext() { + return false; + } + @Override + public Entry next() { + throw new IllegalStateException(); + } + }; + } + } } diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogHelper.java b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogHelper.java index 0c9d10f2c..b7cee2171 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogHelper.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogHelper.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,13 +24,13 @@ import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TCompactProtocol; import io.warp10.ThriftUtils; import io.warp10.WarpConfig; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.thrift.data.DatalogRecord; import io.warp10.continuum.store.thrift.data.DatalogRecordType; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; @@ -90,6 +90,16 @@ public static DatalogRecord getUnregisterRecord(String id, Metadata metadata) th return record; } + public static DatalogRecord getKVStoreRecord(String id, KVStoreRequest request) throws IOException { + DatalogRecord record = new DatalogRecord(); + record.setType(DatalogRecordType.KVSTORE); + record.setId(id); + record.setTimestamp(System.currentTimeMillis()); + record.setKvstorerequest(request); + + return record; + } + public static byte[] serialize(DatalogRecord record) throws IOException { // If 'forward' is set, copy its content to metadata/baseTimestamp/encoder and // clear the 'forward' field diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogManager.java b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogManager.java index 29cb230b5..9ac144aa0 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogManager.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogManager.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2022 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.DatalogRecord; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; import io.warp10.standalone.StandaloneDirectoryClient; @@ -39,6 +40,7 @@ public abstract class DatalogManager { protected abstract void unregister(Metadata metadata) throws IOException; protected abstract void store(GTSEncoder encoder) throws IOException; protected abstract void delete(WriteToken token, Metadata metadata, long start, long end) throws IOException; + protected abstract void kvstore(KVStoreRequest request) throws IOException; /** * Method to process a record retrieved from a feeder. This is different from locally generated diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogStoreClient.java b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogStoreClient.java index a42c09aa4..e575fcfdc 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogStoreClient.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogStoreClient.java @@ -1,5 +1,5 @@ // -// Copyright 2020 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,11 +17,15 @@ package io.warp10.standalone.datalog; import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.GTSDecoderIterator; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.FetchRequest; +import io.warp10.continuum.store.thrift.data.KVFetchRequest; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; import io.warp10.standalone.StandalonePlasmaHandlerInterface; @@ -30,12 +34,12 @@ public class DatalogStoreClient implements StoreClient { private final DatalogManager manager; private final StoreClient store; - + public DatalogStoreClient(DatalogManager manager, StoreClient store) { this.manager = manager; this.store = store; } - + @Override public GTSDecoderIterator fetch(FetchRequest req) throws IOException { return store.fetch(req); @@ -63,8 +67,19 @@ public long delete(WriteToken token, Metadata metadata, long start, long end) th public void addPlasmaHandler(StandalonePlasmaHandlerInterface plasmaHandler) { store.addPlasmaHandler(plasmaHandler); } - + public DatalogManager getDatalogManager() { return this.manager; } + + @Override + public void kvstore(KVStoreRequest request) throws IOException { + store.kvstore(request); + manager.kvstore(request); + } + + @Override + public KVIterator> kvfetch(KVFetchRequest request) throws IOException { + return store.kvfetch(request); + } } diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorker.java b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorker.java index 3e669655a..c8a2b0da8 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorker.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorker.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,11 +25,9 @@ import io.warp10.continuum.Configuration; import io.warp10.continuum.gts.GTSDecoder; import io.warp10.continuum.gts.GTSEncoder; -import io.warp10.continuum.gts.GTSWrapperHelper; import io.warp10.continuum.sensision.SensisionConstants; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.DatalogRecord; -import io.warp10.continuum.store.thrift.data.DatalogRecordType; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; import io.warp10.sensision.Sensision; @@ -95,6 +93,7 @@ public void run() { long lastDeleteRecord = 0L; long lastRegisterRecord = 0L; long lastUnregisterRecord = 0L; + long lastKvstoreRecord = 0L; while(true) { @@ -115,12 +114,14 @@ public void run() { if (now - lastRegisterRecord > FLUSH_INTERVAL || now - lastUnregisterRecord > FLUSH_INTERVAL || now - lastUpdateRecord > FLUSH_INTERVAL - || now - lastDeleteRecord > FLUSH_INTERVAL) { + || now - lastDeleteRecord > FLUSH_INTERVAL + || now - lastKvstoreRecord > FLUSH_INTERVAL) { manager.process(null); lastRegisterRecord = now; lastUnregisterRecord = now; lastUpdateRecord = now; lastDeleteRecord = now; + lastKvstoreRecord = now; } } else { long now = System.currentTimeMillis(); @@ -132,6 +133,10 @@ public void run() { storeClient.delete(null, null, Long.MAX_VALUE, Long.MAX_VALUE); lastDeleteRecord = now; } + if (now - lastKvstoreRecord > FLUSH_INTERVAL) { + storeClient.kvstore(null); + lastKvstoreRecord = now; + } if (now - lastRegisterRecord > FLUSH_INTERVAL) { directoryClient.register(null); lastRegisterRecord = now; @@ -191,6 +196,10 @@ record = job.record; storeClient.delete(record.getToken(), metadata, record.getStart(), record.getStop()); job.consumer.success(job.ref); break; + case KVSTORE: + storeClient.kvstore(record.getKvstorerequest()); + job.consumer.success(job.ref); + break; } } switch(job.record.getType()) { @@ -206,6 +215,9 @@ record = job.record; case UNREGISTER: lastUnregisterRecord = System.currentTimeMillis(); break; + case KVSTORE: + lastKvstoreRecord = System.currentTimeMillis(); + break; } } catch (Throwable t) { err = t; @@ -231,6 +243,7 @@ public void flush() throws Exception { } else { storeClient.store(null); storeClient.delete(null, null, Long.MAX_VALUE, Long.MAX_VALUE); + storeClient.kvstore(null); directoryClient.register(null); directoryClient.unregister(null); } diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorkers.java b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorkers.java index 1b65a62fa..3ad4a132b 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorkers.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/DatalogWorkers.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import io.warp10.continuum.store.DirectoryClient; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.DatalogRecord; +import io.warp10.continuum.store.thrift.data.DatalogRecordType; public class DatalogWorkers { @@ -91,11 +92,20 @@ public static void offer(DatalogConsumer consumer, String ref, DatalogRecord rec // Note: this has nothing to do with the shard id // - long classid = record.getMetadata().getClassId(); - long labelsid = record.getMetadata().getLabelsId(); + // + // For KVStore messages we use the first worker so all mutations are handled by a single worker. + // We cannot do it on a per Key basis since a single KVStoreRequest may have multiple keys + // + + long partition = 0L; - long partkey = (((classid << 16) & 0xFFFF0000L) | ((labelsid >>> 48) & 0xFFFFL)); - long partition = partkey % NUM_WORKERS; + if (!DatalogRecordType.KVSTORE.equals(record.getType())) { + long classid = record.getMetadata().getClassId(); + long labelsid = record.getMetadata().getLabelsId(); + + long partkey = (((classid << 16) & 0xFFFF0000L) | ((labelsid >>> 48) & 0xFFFFL)); + partition = partkey % NUM_WORKERS; + } DatalogJob job = new DatalogJob(consumer, ref, record); diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/FileBasedDatalogManager.java b/warp10/src/main/java/io/warp10/standalone/datalog/FileBasedDatalogManager.java index 6a66df3b0..7a111479f 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/FileBasedDatalogManager.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/FileBasedDatalogManager.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ import io.warp10.continuum.gts.GTSEncoder; import io.warp10.continuum.store.StoreClient; import io.warp10.continuum.store.thrift.data.DatalogRecord; +import io.warp10.continuum.store.thrift.data.KVStoreRequest; import io.warp10.continuum.store.thrift.data.Metadata; import io.warp10.quasar.token.thrift.data.WriteToken; import io.warp10.standalone.StandaloneDirectoryClient; @@ -77,6 +78,7 @@ public class FileBasedDatalogManager extends DatalogManager implements Runnable public static final String CONFIG_DATALOG_MANAGER_FORWARD = "datalog.manager.forward"; public static final String CONFIG_DATALOG_MANAGER_LOGUPDATES = "datalog.manager.logupdates"; public static final String CONFIG_DATALOG_MANAGER_LOGDELETES = "datalog.manager.logdeletes"; + public static final String CONFIG_DATALOG_MANAGER_LOGKVSTORES = "datalog.manager.logkvstores"; public static final String CONFIG_DATALOG_FEEDER_ID = "datalog.feeder.id"; public static final String CONFIG_DATALOG_FEEDER_HOST = "datalog.feeder.host"; @@ -166,6 +168,7 @@ public class FileBasedDatalogManager extends DatalogManager implements Runnable */ private final boolean logupdates; private final boolean logdeletes; + private final boolean logkvstores; private static final boolean REGISTER_UPDATES; @@ -222,9 +225,10 @@ public FileBasedDatalogManager() { throw new RuntimeException("Missing Datalog id '" + CONFIG_DATALOG_MANAGER_ID + "'."); } - // Should we log UPDATE / DELETE records? + // Should we log UPDATE / DELETE / KVSTORE records? this.logupdates = "true".equals(WarpConfig.getProperty(CONFIG_DATALOG_MANAGER_LOGUPDATES, "true")); this.logdeletes = "true".equals(WarpConfig.getProperty(CONFIG_DATALOG_MANAGER_LOGDELETES, "true")); + this.logkvstores = "true".equals(WarpConfig.getProperty(CONFIG_DATALOG_MANAGER_LOGKVSTORES, "true")); if (null != WarpConfig.getProperty(CONFIG_DATALOG_MANAGER_FORWARD)) { String[] ids = WarpConfig.getProperty(CONFIG_DATALOG_MANAGER_FORWARD).split(","); @@ -349,10 +353,23 @@ protected void delete(WriteToken token, Metadata metadata, long start, long end) if (null == metadata) { return; } - System.out.println("delete: " + token + " " + metadata + " " + start + " " + end); + //System.out.println("delete: " + token + " " + metadata + " " + start + " " + end); append(DatalogHelper.getDeleteRecord(id, token, metadata, start, end)); } + @Override + protected void kvstore(KVStoreRequest request) throws IOException { + if (!logkvstores) { + return; + } + + if (null == request) { + append(null); + } else { + append(DatalogHelper.getKVStoreRecord(id, request)); + } + } + @Override protected void process(DatalogRecord record) throws IOException { @@ -360,6 +377,7 @@ protected void process(DatalogRecord record) throws IOException { if (null == record) { this.storeClient.store(null); this.storeClient.delete(null, null, Long.MAX_VALUE, Long.MAX_VALUE); + this.storeClient.kvstore(null); this.directoryClient.unregister(null); this.directoryClient.register(null); return; @@ -409,6 +427,10 @@ protected void process(DatalogRecord record) throws IOException { case DELETE: this.storeClient.delete(record.getToken(), record.getMetadata(), record.getStart(), record.getStop()); break; + + case KVSTORE: + this.storeClient.kvstore(record.getKvstorerequest()); + break; } // If the record id is in 'noforward', don't append the record to the log file @@ -527,7 +549,7 @@ public int compare(FileStatus o1, FileStatus o2) { long count = 0L; while(reader.next(key, val)) { - System.out.println("VAL=" + DatalogHelper.getRecord(val.getBytes(), 0, val.getLength())); + //System.out.println("VAL=" + DatalogHelper.getRecord(val.getBytes(), 0, val.getLength())); count++; } @@ -575,7 +597,7 @@ private void append(DatalogRecord record) throws IOException { key[offset++] = (byte) ((l >>> 8) & 0xFFL); key[offset++] = (byte) (l & 0xFFL); - l = record.getMetadata().getClassId(); + l = null == record.getMetadata() ? 0L : record.getMetadata().getClassId(); key[offset++] = (byte) ((l >>> 56) & 0xFFL); key[offset++] = (byte) ((l >>> 48) & 0xFFL); key[offset++] = (byte) ((l >>> 40) & 0xFFL); @@ -585,7 +607,7 @@ private void append(DatalogRecord record) throws IOException { key[offset++] = (byte) ((l >>> 8) & 0xFFL); key[offset++] = (byte) (l & 0xFFL); - l = record.getMetadata().getLabelsId(); + l = null == record.getMetadata() ? 0L : record.getMetadata().getLabelsId(); key[offset++] = (byte) ((l >>> 56) & 0xFFL); key[offset++] = (byte) ((l >>> 48) & 0xFFL); key[offset++] = (byte) ((l >>> 40) & 0xFFL); @@ -625,6 +647,7 @@ public void run() { while(true) { // Should we close the current file and reopen a new one? if (done.get() || null == datalog || size.get() > MAXSIZE || System.currentTimeMillis() - start.get() > MAXTIME) { + try { lock.lockInterruptibly(); diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogConsumer.java b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogConsumer.java index 301ae59fe..cd995f9b2 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogConsumer.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogConsumer.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -540,7 +540,7 @@ record = new DatalogRecord(); DatalogHelper.deserialize(msg.getRecord(), record); // - // If our id or an exluded one created this message, ignore it, it would otherwise create a loop + // If our id or an excluded one created this message, ignore it, it would otherwise create a loop // if (id.equals(record.getId()) || this.excluded.contains(record.getId())) { @@ -550,219 +550,221 @@ record = new DatalogRecord(); continue; } - // - // Recompute the class and labels id with our own keys - // + if (!DatalogRecordType.KVSTORE.equals(record.getType())) { + // + // Recompute the class and labels id with our own keys + // - long classid = GTSHelper.classId(CLASS_KEYS, record.getMetadata().getName()); - long labelsid = GTSHelper.labelsId(LABELS_KEYS, record.getMetadata().getLabels()); + long classid = GTSHelper.classId(CLASS_KEYS, record.getMetadata().getName()); + long labelsid = GTSHelper.labelsId(LABELS_KEYS, record.getMetadata().getLabels()); - record.getMetadata().setClassId(classid); - record.getMetadata().setLabelsId(labelsid); + record.getMetadata().setClassId(classid); + record.getMetadata().setLabelsId(labelsid); - // - // Check shards if needed - // + // + // Check shards if needed + // - if (hasShards) { - // compute shard id - long sid = DatalogHelper.getShardId(classid, labelsid, shardShift); + if (hasShards) { + // compute shard id + long sid = DatalogHelper.getShardId(classid, labelsid, shardShift); - boolean matched = false; + boolean matched = false; - for (int i = 0; i < modulus.length; i++) { - if (0 != modulus[i] && remainder[i] == sid % modulus[i]) { - matched = true; - break; + for (int i = 0; i < modulus.length; i++) { + if (0 != modulus[i] && remainder[i] == sid % modulus[i]) { + matched = true; + break; + } } - } - - if (!matched) { - continue; - } - } - - // - // Execute the filtering macro if set. - // If 'datalog.consumer.macro.data' is false, the macro is - // expected to return a boolean which, if true, will - // accept the message. - // The macro is fed with a GTS Encoder with the metadata - // of the GTS subject of the message and a STRING with - // the type of message (From DatalogRecordType, UPDATE, DELETE, REGISTER, UNREGISTER). - // If 'datalog.consumer.macro.data' is true, the macro - // is fed with an encoder as above but with the actual data included for UPDATE messages. - // The macro will then have the ability to alter the encoder to update metadata - // and/or data. It is then expected to return a boolean or an encoder. - // - // The macro is called in the consumer thread, not in the worker threads so metadata alterations - // are reflected in the choice of the worker to respect the sequence of messages per GTS - // - if (null != macro) { - stack.show(); - stack.clear(); - Map tokenMap = null; - - if (macroToken && null != record.getToken()) { - tokenMap = TOKENDUMP.mapFromToken(record.getToken()); + if (!matched) { + continue; + } } - if (!macroData) { - GTSEncoder encoder = new GTSEncoder(0L); - encoder.setMetadata(record.getMetadata()); - - stack.push(tokenMap); - stack.push(encoder); - stack.push(record.getType().name()); - stack.push(macro); - RUN.apply(stack); - if (0 == stack.depth() || !Boolean.TRUE.equals(stack.peek())) { - stack.show(); - stack.clear(); - Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_SKIPPED, labels, 1); - continue; + // + // Execute the filtering macro if set. + // If 'datalog.consumer.macro.data' is false, the macro is + // expected to return a boolean which, if true, will + // accept the message. + // The macro is fed with a GTS Encoder with the metadata + // of the GTS subject of the message and a STRING with + // the type of message (From DatalogRecordType, UPDATE, DELETE, REGISTER, UNREGISTER). + // If 'datalog.consumer.macro.data' is true, the macro + // is fed with an encoder as above but with the actual data included for UPDATE messages. + // The macro will then have the ability to alter the encoder to update metadata + // and/or data. It is then expected to return a boolean or an encoder. + // + // The macro is called in the consumer thread, not in the worker threads so metadata alterations + // are reflected in the choice of the worker to respect the sequence of messages per GTS + // + + if (null != macro) { + stack.show(); + stack.clear(); + Map tokenMap = null; + + if (macroToken && null != record.getToken()) { + tokenMap = TOKENDUMP.mapFromToken(record.getToken()); } - } else { - GTSEncoder encoder; - if (DatalogRecordType.UPDATE.equals(record.getType())) { - GTSDecoder decoder = new GTSDecoder(record.getBaseTimestamp(), record.bufferForEncoder()); - decoder.next(); - encoder = decoder.getEncoder(); + if (!macroData) { + GTSEncoder encoder = new GTSEncoder(0L); + encoder.setMetadata(record.getMetadata()); + + stack.push(tokenMap); + stack.push(encoder); + stack.push(record.getType().name()); + stack.push(macro); + RUN.apply(stack); + if (0 == stack.depth() || !Boolean.TRUE.equals(stack.peek())) { + stack.show(); + stack.clear(); + Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_SKIPPED, labels, 1); + continue; + } } else { - encoder = new GTSEncoder(0L); - } - encoder.setMetadata(record.getMetadata()); - - // Call the macro with the token, the encoder and the record type - stack.push(tokenMap); - stack.push(encoder); - stack.push(record.getType().name()); - stack.push(macro); - RUN.apply(stack); - - // Skip if the macro did not return a boolean TRUE - boolean skip = !(stack.depth() >= 1 && Boolean.TRUE.equals(stack.peek())); - - boolean updateIds = false; - - if (!skip) { - // Discard the boolean - stack.pop(); - - // The macro returned a GTS or an ENCODER, update the record with the - // metadata and the content (for UPDATE records) - GTSEncoder forwardEncoder = null; - if (stack.depth() > 0) { - if (stack.peek() instanceof GeoTimeSerie) { - GeoTimeSerie gts = (GeoTimeSerie) stack.pop(); - encoder = new GTSEncoder(0L); - encoder.setMetadata(gts.getMetadata()); - encoder.encode(gts); - updateIds = true; - } else if (stack.peek() instanceof GTSEncoder) { - encoder = (GTSEncoder) stack.pop(); - updateIds = true; - } else if (stack.peek() instanceof List) { - // The macro returned a list of two elements, either GTS or ENCODER. The first one is the - // content to use for the action (UPDATE/REGISTER/UNREGISTER/DELETE), the second one is - // the content to forward. - List l = (List) stack.pop(); - if (2 == l.size()) { - Object update = l.get(0); - Object forward = l.get(1); - - if (update instanceof GeoTimeSerie) { - GeoTimeSerie gts = (GeoTimeSerie) update; - encoder = new GTSEncoder(0L); - encoder.setMetadata(gts.getMetadata()); - encoder.encode(gts); - updateIds = true; - } else if (update instanceof GTSEncoder) { - encoder = (GTSEncoder) update; - updateIds = true; + GTSEncoder encoder; + + if (DatalogRecordType.UPDATE.equals(record.getType())) { + GTSDecoder decoder = new GTSDecoder(record.getBaseTimestamp(), record.bufferForEncoder()); + decoder.next(); + encoder = decoder.getEncoder(); + } else { + encoder = new GTSEncoder(0L); + } + encoder.setMetadata(record.getMetadata()); + + // Call the macro with the token, the encoder and the record type + stack.push(tokenMap); + stack.push(encoder); + stack.push(record.getType().name()); + stack.push(macro); + RUN.apply(stack); + + // Skip if the macro did not return a boolean TRUE + boolean skip = !(stack.depth() >= 1 && Boolean.TRUE.equals(stack.peek())); + + boolean updateIds = false; + + if (!skip) { + // Discard the boolean + stack.pop(); + + // The macro returned a GTS or an ENCODER, update the record with the + // metadata and the content (for UPDATE records) + GTSEncoder forwardEncoder = null; + if (stack.depth() > 0) { + if (stack.peek() instanceof GeoTimeSerie) { + GeoTimeSerie gts = (GeoTimeSerie) stack.pop(); + encoder = new GTSEncoder(0L); + encoder.setMetadata(gts.getMetadata()); + encoder.encode(gts); + updateIds = true; + } else if (stack.peek() instanceof GTSEncoder) { + encoder = (GTSEncoder) stack.pop(); + updateIds = true; + } else if (stack.peek() instanceof List) { + // The macro returned a list of two elements, either GTS or ENCODER. The first one is the + // content to use for the action (UPDATE/REGISTER/UNREGISTER/DELETE), the second one is + // the content to forward. + List l = (List) stack.pop(); + if (2 == l.size()) { + Object update = l.get(0); + Object forward = l.get(1); + + if (update instanceof GeoTimeSerie) { + GeoTimeSerie gts = (GeoTimeSerie) update; + encoder = new GTSEncoder(0L); + encoder.setMetadata(gts.getMetadata()); + encoder.encode(gts); + updateIds = true; + } else if (update instanceof GTSEncoder) { + encoder = (GTSEncoder) update; + updateIds = true; + } else { + skip = true; + } + + // Create 'forward' wrapper + if (forward instanceof GeoTimeSerie) { + GeoTimeSerie gts = (GeoTimeSerie) forward; + forwardEncoder = new GTSEncoder(0L); + forwardEncoder.setMetadata(gts.getMetadata()); + forwardEncoder.encode(gts); + updateIds = true; + } else if (forward instanceof GTSEncoder) { + forwardEncoder = (GTSEncoder) forward; + updateIds = true; + } else { + skip = true; + } } else { skip = true; } - // Create 'forward' wrapper - if (forward instanceof GeoTimeSerie) { - GeoTimeSerie gts = (GeoTimeSerie) forward; - forwardEncoder = new GTSEncoder(0L); - forwardEncoder.setMetadata(gts.getMetadata()); - forwardEncoder.encode(gts); - updateIds = true; - } else if (forward instanceof GTSEncoder) { - forwardEncoder = (GTSEncoder) forward; - updateIds = true; - } else { - skip = true; + // Copy the forwardEncoder as a wrapper in the record. This will trigger the storing of this + // encoder instead of the one in 'encoder' but will forward the latter if forwarding should happen. + if (null != forwardEncoder) { + GTSWrapper wrapper = GTSWrapperHelper.fromGTSEncoderToGTSWrapper(forwardEncoder, false); + record.setForward(wrapper); } } else { skip = true; } - // Copy the forwardEncoder as a wrapper in the record. This will trigger the storing of this - // encoder instead of the one in 'encoder' but will forward the latter if forwarding should happen. - if (null != forwardEncoder) { - GTSWrapper wrapper = GTSWrapperHelper.fromGTSEncoderToGTSWrapper(forwardEncoder, false); - record.setForward(wrapper); - } - } else { - skip = true; - } - - if (!skip) { - // Copy the encoder - record.setMetadata(encoder.getMetadata()); - if (DatalogRecordType.UPDATE.equals(record.getType())) { - record.setBaseTimestamp(encoder.getBaseTimestamp()); - record.setEncoder(encoder.getBytes()); - } + if (!skip) { + // Copy the encoder + record.setMetadata(encoder.getMetadata()); + if (DatalogRecordType.UPDATE.equals(record.getType())) { + record.setBaseTimestamp(encoder.getBaseTimestamp()); + record.setEncoder(encoder.getBytes()); + } - // If there is a map left on the stack, update the token - if (stack.depth() > 0 && stack.peek() instanceof Map) { - tokenMap = (Map) stack.pop(); - // force token type - tokenMap.put(TOKENGEN.KEY_TYPE, TokenType.WRITE.toString()); - record.setToken((WriteToken) TOKENGEN.tokenFromMap(tokenMap, "Datalog", Long.MAX_VALUE)); + // If there is a map left on the stack, update the token + if (stack.depth() > 0 && stack.peek() instanceof Map) { + tokenMap = (Map) stack.pop(); + // force token type + tokenMap.put(TOKENGEN.KEY_TYPE, TokenType.WRITE.toString()); + record.setToken((WriteToken) TOKENGEN.tokenFromMap(tokenMap, "Datalog", Long.MAX_VALUE)); + } + updateIds = true; } - updateIds = true; } } - } - if (skip) { - stack.show(); - stack.clear(); - Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_SKIPPED, labels, 1); - continue; - } + if (skip) { + stack.show(); + stack.clear(); + Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_SKIPPED, labels, 1); + continue; + } - // - // Update the class and label ids if metadata were possibly changed. - // This is so the feeders later have a valid set of ids to use for sharding. - // + // + // Update the class and label ids if metadata were possibly changed. + // This is so the feeders later have a valid set of ids to use for sharding. + // - if (updateIds) { - long cid = GTSHelper.classId(CLASS_KEYS, record.getMetadata().getName()); - long lid = GTSHelper.labelsId(LABELS_KEYS, record.getMetadata().getLabels()); + if (updateIds) { + long cid = GTSHelper.classId(CLASS_KEYS, record.getMetadata().getName()); + long lid = GTSHelper.labelsId(LABELS_KEYS, record.getMetadata().getLabels()); - record.getMetadata().setClassId(cid); - record.getMetadata().setLabelsId(lid); + record.getMetadata().setClassId(cid); + record.getMetadata().setLabelsId(lid); - if (record.isSetForward()) { - cid = GTSHelper.classId(CLASS_KEYS, record.getForward().getMetadata().getName()); - lid = GTSHelper.labelsId(LABELS_KEYS, record.getForward().getMetadata().getLabels()); + if (record.isSetForward()) { + cid = GTSHelper.classId(CLASS_KEYS, record.getForward().getMetadata().getName()); + lid = GTSHelper.labelsId(LABELS_KEYS, record.getForward().getMetadata().getLabels()); - record.getForward().getMetadata().setClassId(cid); - record.getForward().getMetadata().setLabelsId(lid); + record.getForward().getMetadata().setClassId(cid); + record.getForward().getMetadata().setLabelsId(lid); + } } } + stack.show(); + stack.clear(); } - stack.show(); - stack.clear(); } // diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeeder.java b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeeder.java index 4d2e77593..3cd689c24 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeeder.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeeder.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2022 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ public class TCPDatalogFeeder extends Thread { private static final Logger LOG = LoggerFactory.getLogger(TCPDatalogFeeder.class); public static final String DEFAULT_HOST = "127.0.0.1"; - public static final String DEFAULT_PORT = "3564"; // speeds 'DLNG', DataLog Next Generation + public static final String DEFAULT_PORT = "3564"; // spells 'DLNG', DataLog Next Generation private static final String DEFAULT_MAXCLIENTS = "2"; private static final String DEFAULT_BACKLOG = "2"; diff --git a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeederWorker.java b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeederWorker.java index 44de125d3..5e1fd34d5 100644 --- a/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeederWorker.java +++ b/warp10/src/main/java/io/warp10/standalone/datalog/TCPDatalogFeederWorker.java @@ -1,5 +1,5 @@ // -// Copyright 2020-2023 SenX S.A.S. +// Copyright 2020-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -63,7 +63,6 @@ import io.warp10.script.binary.ADD; import io.warp10.script.functions.ECDH; import io.warp10.script.functions.ECPRIVATE; -import io.warp10.script.functions.ECPUBLIC; import io.warp10.script.functions.ECSIGN; import io.warp10.script.functions.HASH; import io.warp10.script.functions.REVERSE; @@ -141,7 +140,6 @@ public void run() { // ECPrivateKey eccPrivate = null; - ECPublicKey eccPublic = null; String eccpri = WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_FEEDER_ECC_PRIVATE); @@ -701,7 +699,7 @@ public void run() { if (!currentFile.equals(next)) { len = fs.getFileStatus(path).getLen(); - System.out.println("FILE=" + currentFile + " POS=" + position + " LEN=" + len + " NEXT=" + next); + //System.out.println("FILE=" + currentFile + " POS=" + position + " LEN=" + len + " NEXT=" + next); if (len == position + SEQFILE_SYNC_MARKER_LEN) { previousFile = currentFile; currentFile = next; diff --git a/warp10/src/main/thrift/io_warp10_continuum_store_thrift_data.thrift b/warp10/src/main/thrift/io_warp10_continuum_store_thrift_data.thrift index ba55f92ca..7e8b27bc4 100644 --- a/warp10/src/main/thrift/io_warp10_continuum_store_thrift_data.thrift +++ b/warp10/src/main/thrift/io_warp10_continuum_store_thrift_data.thrift @@ -1,5 +1,5 @@ // -// Copyright 2018-2024 SenX S.A.S. +// Copyright 2018-2025 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -448,6 +448,7 @@ enum DatalogRecordType { DELETE = 2, REGISTER = 3, UNREGISTER = 4, + KVSTORE = 5, } struct DatalogRecord { @@ -506,9 +507,14 @@ struct DatalogRecord { 10: optional GTSWrapper forward, /** - * Write token associated with the request. This is needed when using FDB with tenant prefixes. + * Write token associated with the request. This is needed when using FDB with tenant prefixes of KVPrefixes */ 11: optional io_warp10_quasar_token_thrift_data.WriteToken token, + + /** + * KVStoreRequest instance when message is containing a KVStore mutation + */ + 12: optional KVStoreRequest kvstorerequest, } enum DatalogMessageType { @@ -564,3 +570,37 @@ struct DatalogMessage { */ 52: optional string commitref, } + +// +// KVStore related structures +// + +struct KVStoreRequest { + 1: list keys, + 2: list values, + /** + * Write token associated with the request. This is needed to add/strip prefixes such as the FDB tenant prefix. + */ + 3: io_warp10_quasar_token_thrift_data.WriteToken token, +} + +struct KVFetchRequest { + /** + * Start key (inclusive) + */ + 1: optional binary start, + /** + * End key (exclusive) + */ + 2: optional binary stop, + + /** + * Explicit list of keys + */ + 3: optional list keys, + + /** + * Read token associated with the request. This is needed to add/strip prefixes. + */ + 4: io_warp10_quasar_token_thrift_data.ReadToken token, +}