#4682 PlanB State Store
stroomdev66 committed Jan 18, 2025
1 parent 053e0d5 commit 2ce46a9
Showing 23 changed files with 634 additions and 291 deletions.
10 changes: 2 additions & 8 deletions stroom-app/src/main/resources/ui/noauth/swagger/stroom.json
@@ -4406,10 +4406,7 @@
},
"responses" : {
"default" : {
"content" : {
"application/octet-stream" : { }
},
"description" : "default response"
"description" : "Returns: 200 if the request was ok and the snapshot returned, 304 if the snapshot has not been modified, 401 if unauthorised, 500 for any other error"
}
},
"summary" : "Fetch Plan B snapshot",
@@ -4457,10 +4454,7 @@
},
"responses" : {
"default" : {
"content" : {
"application/json" : { }
},
"description" : "default response"
"description" : "Returns: 200 if the part was sent ok, 401 if unauthorised, 500 for any other error"
}
},
"summary" : "Send Plan B part",
11 changes: 5 additions & 6 deletions stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml
@@ -3015,9 +3015,9 @@ paths:
$ref: "#/components/schemas/SnapshotRequest"
responses:
default:
content:
application/octet-stream: {}
description: default response
description: "Returns: 200 if the request was ok and the snapshot returned,\
\ 304 if the snapshot has not been modified, 401 if unauthorised, 500\
\ for any other error"
summary: Fetch Plan B snapshot
tags:
- File Transfer
@@ -3050,9 +3050,8 @@
type: object
responses:
default:
content:
application/json: {}
description: default response
description: "Returns: 200 if the part was sent ok, 401 if unauthorised,\
\ 500 for any other error"
summary: Send Plan B part
tags:
- File Transfer
@@ -20,6 +20,7 @@
import stroom.util.shared.RestResource;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.HeaderParam;
@@ -50,7 +51,14 @@ public interface FileTransferResource extends RestResource {
@Consumes(MediaType.APPLICATION_JSON)
@Operation(
summary = "Fetch Plan B snapshot",
operationId = "fetchSnapshot")
operationId = "fetchSnapshot",
responses = {
@ApiResponse(description = "Returns: " +
"200 if the request was ok and the snapshot returned, " +
"304 if the snapshot has not been modified, " +
"401 if unauthorised, " +
"500 for any other error")
})
Response fetchSnapshot(SnapshotRequest request);

@POST
@@ -59,7 +67,13 @@ public interface FileTransferResource extends RestResource {
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Operation(
summary = "Send Plan B part",
operationId = "sendPart")
operationId = "sendPart",
responses = {
@ApiResponse(description = "Returns: " +
"200 if the part was sent ok, " +
"401 if unauthorised, " +
"500 for any other error")
})
Response sendPart(@HeaderParam("createTime") long createTime,
@HeaderParam("metaId") long metaId,
@HeaderParam("fileHash") String fileHash,
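The @ApiResponse descriptions above document the fetchSnapshot status codes (200/304/401/500) without tying them to a media type. Purely as a hypothetical sketch — not the commit's implementation — this is how such codes could map onto a JAX-RS Response; the authorised/snapshotUnchanged flags and the byte[] payload are illustrative assumptions, not part of the commit.

import jakarta.ws.rs.core.Response;

// Hypothetical illustration of the documented fetchSnapshot status codes.
// None of these helper parameters exist in the commit; they only stand in
// for the real authorisation and snapshot-modification checks.
public class FetchSnapshotResponseSketch {

    public Response build(final boolean authorised,
                          final boolean snapshotUnchanged,
                          final byte[] snapshotZip) {
        try {
            if (!authorised) {
                // 401 if unauthorised.
                return Response.status(Response.Status.UNAUTHORIZED).build();
            }
            if (snapshotUnchanged) {
                // 304 if the snapshot has not been modified.
                return Response.notModified().build();
            }
            // 200 if the request was ok and the snapshot returned.
            return Response.ok(snapshotZip).build();
        } catch (final RuntimeException e) {
            // 500 for any other error.
            return Response.serverError().build();
        }
    }
}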
@@ -14,6 +14,8 @@
import stroom.planb.shared.PlanBDoc;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.time.SimpleDuration;
import stroom.util.time.SimpleDurationUtil;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Provider;
@@ -138,6 +140,29 @@ public void merge(final Path sourceDir) {
}
}

@Override
public void condense() {
try {
final PlanBDoc doc = getDoc();
if (doc != null && doc.isCondense()) {
final SimpleDuration duration = SimpleDuration
.builder()
.time(doc.getCondenseAge())
.timeUnit(doc.getCondenseTimeUnit())
.build();
final Instant maxAge = SimpleDurationUtil.minus(Instant.now(), duration);
incrementUseCount();
try {
db.condense(maxAge);
} finally {
decrementUseCount();
}
}
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
}

@Override
public void checkSnapshotStatus(final SnapshotRequest request) {
// If we already have a snapshot for the current write time then don't create a snapshot and just return an
@@ -217,6 +242,7 @@ public void cleanup() {
if (useCount.get() == 0) {
if (open && isIdle()) {
db.close();
db = null;
open = false;
}
}
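The new condense() above derives a cut-off instant (maxAge) from the doc's condense age and time unit via Stroom's SimpleDuration/SimpleDurationUtil, then calls db.condense(maxAge) while holding the use count. Below is a minimal sketch of the same cut-off calculation using only java.time, assuming a fixed-length unit such as days; the real SimpleDurationUtil presumably also handles calendar-based units.

import java.time.Duration;
import java.time.Instant;

// Illustrative only: compute a condense cut-off the way condense() does,
// but with plain java.time instead of SimpleDuration/SimpleDurationUtil.
public class CondenseCutoffSketch {

    // Anything whose state time is older than the returned instant would be
    // eligible for condensing.
    public static Instant cutoff(final long condenseAge, final Duration unit) {
        return Instant.now().minus(unit.multipliedBy(condenseAge));
    }

    public static void main(final String[] args) {
        // e.g. condense everything older than 7 days.
        System.out.println(cutoff(7, Duration.ofDays(1)));
    }
}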
@@ -17,6 +17,8 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.stream.Stream;

@@ -27,11 +29,14 @@ public class MergeProcessor {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MergeProcessor.class);

private static final Duration MIN_CONDENSE_FREQUENCY = Duration.ofMinutes(10);

private final SequentialFileStore fileStore;
private final Path mergingDir;
private final SecurityContext securityContext;
private final TaskContextFactory taskContextFactory;
private final ShardManager shardManager;
private Instant nextCondenseTime;

@Inject
public MergeProcessor(final SequentialFileStore fileStore,
@@ -50,6 +55,8 @@ public MergeProcessor(final SequentialFileStore fileStore,
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(mergingDir));
}
}

nextCondenseTime = Instant.now().plus(MIN_CONDENSE_FREQUENCY);
}

private boolean ensureDirExists(final Path path) {
@@ -90,6 +97,13 @@ public void exec() {
taskContext.info(() -> "Merging data: " + currentStoreId);
merge(sequentialFile);

// Periodically we will condense all shards.
// This is done here so that the same thread is always used for writing.
if (nextCondenseTime.isBefore(Instant.now())) {
shardManager.condenseAll();
nextCondenseTime = Instant.now().plus(MIN_CONDENSE_FREQUENCY);
}

// Increment store id.
storeId++;
}
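MergeProcessor now interleaves condensing with merging: after each merged part it checks whether MIN_CONDENSE_FREQUENCY (10 minutes) has elapsed and, if so, runs shardManager.condenseAll() on the same thread that performs the writes. The throttling boils down to a simple "next run time" check, sketched below with a generic Runnable standing in for the condense call; the class and method names are illustrative, not from the commit.

import java.time.Duration;
import java.time.Instant;

// Minimal sketch of the periodic-maintenance pattern used in exec():
// run a task at most once per interval, always from the calling thread.
public class PeriodicMaintenanceSketch {

    private final Duration minFrequency;
    private final Runnable task;
    private Instant nextRunTime;

    public PeriodicMaintenanceSketch(final Duration minFrequency, final Runnable task) {
        this.minFrequency = minFrequency;
        this.task = task;
        this.nextRunTime = Instant.now().plus(minFrequency);
    }

    public void maybeRun() {
        if (nextRunTime.isBefore(Instant.now())) {
            task.run();
            nextRunTime = Instant.now().plus(minFrequency);
        }
    }
}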
@@ -17,6 +17,11 @@ interface Shard {
*/
void merge(Path sourceDir);

/**
* Condense data in the shard.
*/
void condense();

/**
* Determine if we are allowed to create a snapshot or if the snapshot we have is already the latest.
*
@@ -65,6 +65,10 @@ public void merge(final Path sourceDir) throws IOException {
shard.merge(sourceDir);
}

public void condenseAll() {
shardMap.values().forEach(shard -> shard.condense());
}

public void checkSnapshotStatus(final SnapshotRequest request) {
final Shard shard = getShard(request.getMapName());
shard.checkSnapshotStatus(request);
@@ -118,6 +118,11 @@ public void merge(final Path sourceDir) {
throw new RuntimeException("Merge is not supported on snapshots");
}

@Override
public void condense() {
// Condense is not supported on snapshots
}

@Override
public void checkSnapshotStatus(final SnapshotRequest request) {
throw new RuntimeException("You cannot create a snapshot from a snapshot");
@@ -310,6 +315,7 @@ private void cleanup() {
if (useCount.get() == 0) {
if (open && (destroy || isIdle())) {
db.close();
db = null;
open = false;
}

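Both shard implementations now clear the db reference (db = null) once cleanup() closes it, so a stale handle cannot be reused after close, and operations such as condense() bump a use count for their duration so cleanup() only closes when the count is zero and the shard is idle. A generic, hypothetical sketch of that use-count guard follows; the AutoCloseable stands in for the underlying store and nothing here is the commit's actual code.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative use-count guard around a closable resource, mirroring the
// increment/decrement and cleanup pattern in the shard implementations.
public class UseCountedResourceSketch<T extends AutoCloseable> {

    private final AtomicInteger useCount = new AtomicInteger();
    private T resource;
    private boolean open;

    public synchronized void open(final T resource) {
        this.resource = resource;
        this.open = true;
    }

    public synchronized T acquire() {
        useCount.incrementAndGet();
        return resource;
    }

    public synchronized void release() {
        useCount.decrementAndGet();
    }

    public synchronized void cleanup() throws Exception {
        if (useCount.get() == 0 && open) {
            resource.close();
            // Null the reference so a closed resource cannot be handed out again.
            resource = null;
            open = false;
        }
    }
}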
