Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore support #1389

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion warp10/src/main/java/io/warp10/continuum/Configuration.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018-2024 SenX S.A.S.
// Copyright 2018-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -230,6 +230,16 @@ public class Configuration {
public static final String WARPSCRIPT_MAX_PIXELS_HARD = "warpscript.maxpixels.hard";
public static final String WARPSCRIPT_MAX_JSON_HARD = "warpscript.maxjson.hard";

/**
* Default maximum key size for KV stored via KVSTORE
*/
public static final String WARP_KVSTORE_MAXK = "warp.kvstore.maxk";

/**
* Default maximum value size for KV stored via KVSTORE
*/
public static final String WARP_KVSTORE_MAXV = "warp.kvstore.maxv";

/**
* When set to true, allow common comment block style. When false, keep the old strict comment block style within WarpScript
*/
Expand Down Expand Up @@ -1760,6 +1770,11 @@ public class Configuration {
*/
public static final String IN_MEMORY_EPHEMERAL = "in.memory.ephemeral";

/**
* Default maximum value size for all stored KV
*/
public static final String IN_MEMORY_MAXKVSIZE = "in.memory.maxkvsize";

/**
* Number of chunks per GTS to handle in memory (defaults to 3)
*/
Expand Down Expand Up @@ -1814,6 +1829,11 @@ public class Configuration {
*/
public static final String EGRESS_FDB_TENANT_PREFIX = "egress.fdb.tenant.prefix";

/**
* Maximum number of retries for FoundationDB transactions
*/
public static final String EGRESS_FDB_RETRYLIMIT = "egress.fdb.retrylimit";

/**
* Size of pooled FoundationDB databases instances
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018-2023 SenX S.A.S.
// Copyright 2018-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,16 @@ public class SensisionConstants {
// Classes
//

/**
* Number of FoundationDB sets committed in 'Egress'
*/
public static final String SENSISION_CLASS_CONTINUUM_EGRESS_FDB_SETS_COMMITTED = "warp.egress.fdb.sets.committed";

/**
* Number of FoundationDB clears committed in 'Egress'
*/
public static final String SENSISION_CLASS_CONTINUUM_EGRESS_FDB_CLEARS_COMMITTED = "warp.egress.fdb.clears.committed";

/**
* Number of times we have reached the maximum number of version attempts
*/
Expand Down
12 changes: 11 additions & 1 deletion warp10/src/main/java/io/warp10/continuum/store/StoreClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018-2022 SenX S.A.S.
// Copyright 2018-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,14 +17,21 @@
package io.warp10.continuum.store;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;

import io.warp10.continuum.gts.GTSEncoder;
import io.warp10.continuum.store.thrift.data.FetchRequest;
import io.warp10.continuum.store.thrift.data.KVFetchRequest;
import io.warp10.continuum.store.thrift.data.KVStoreRequest;
import io.warp10.continuum.store.thrift.data.Metadata;
import io.warp10.quasar.token.thrift.data.WriteToken;
import io.warp10.standalone.StandalonePlasmaHandlerInterface;

public interface StoreClient {

public interface KVIterator<T> extends Iterator<T>, AutoCloseable {}

public void store(GTSEncoder encoder) throws IOException;
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException;
/**
Expand All @@ -43,4 +50,7 @@ public interface StoreClient {
*/
public GTSDecoderIterator fetch(FetchRequest req) throws IOException;
public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler);

public void kvstore(KVStoreRequest request) throws IOException;
public KVIterator<Entry<byte[],byte[]>> kvfetch(KVFetchRequest request) throws IOException;
}
4 changes: 2 additions & 2 deletions warp10/src/main/java/io/warp10/fdb/FDBClear.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2022 SenX S.A.S.
// Copyright 2022-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,6 @@ public void apply(Transaction txn) {

@Override
public int size() {
return 103 + key.length + (null != this.tenant ? tenant.length : 0);
return 120 + key.length + (null != this.tenant ? tenant.length : 0);
}
}
83 changes: 83 additions & 0 deletions warp10/src/main/java/io/warp10/fdb/FDBGetOp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright 2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package io.warp10.fdb;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.apple.foundationdb.Transaction;

public class FDBGetOp implements FDBMutation {

/**
* Tenant prefix
*/
private final byte[] tenant;
private final byte[] key;

CompletableFuture<byte[]> future = null;

public FDBGetOp(byte[] key) {
this(null, key);
}

public FDBGetOp(byte[] tenant, byte[] key) {
this.tenant = tenant;
this.key = key;
}

@Override
public void apply(Transaction txn) {
synchronized (this.key) {
if (null != this.future) {
throw new IllegalStateException("Operation has already been applied.");
}
if (null != tenant) {
byte[] tkey = Arrays.copyOf(this.tenant, tenant.length + key.length);
System.arraycopy(this.key, 0, tkey, this.tenant.length, key.length);
this.future = txn.get(tkey);
} else {
this.future = txn.get(this.key);
}
}
}

@Override
public int size() {
return 103 + (null != tenant ? tenant.length : 0) + key.length;
}

public byte[] getKey() {
return this.key;
}

public byte[] getValue() {
if (null == future) {
throw new IllegalStateException("Operation has not been applied.");
}
try {
return this.future.get();
} catch (ExecutionException|InterruptedException e) {
throw new RuntimeException("Error retrieving value.", e);
}
}

public byte[] getTenant() {
return this.tenant;
}
}
Loading