
Commit

Get workflow with natural trigger consistently (#1019)
honnix authored Jan 13, 2023
1 parent b53dcf0 commit 4296e54
Showing 1 changed file with 53 additions and 23 deletions.
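The change replaces a single entity query over the workflow kind with a keys-only query followed by batched lookups by key, fetched in parallel. Presumably this is the "consistently" in the title, since gets by key are strongly consistent while a global entity query may not be. A rough sketch of that pattern against the raw com.google.cloud.datastore client follows; the actual change goes through Styx's CheckedDatastore wrapper and its asyncIO helper, and the kind name and batch size below are illustrative, not taken from the commit.

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyQuery;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;

class BatchedWorkflowRead {

  static List<Entity> readAllWorkflows(Datastore datastore) {
    // 1. Keys-only query: enumerate the workflow keys without loading the entities.
    KeyQuery keyQuery = Query.newKeyQueryBuilder().setKind("Workflow").build();
    QueryResults<Key> results = datastore.run(keyQuery);
    List<Key> keys = new ArrayList<>();
    results.forEachRemaining(keys::add);

    // 2. Batched lookups by key, keeping each read under the Datastore batch limit
    //    (1000 here is illustrative).
    List<Entity> entities = new ArrayList<>();
    for (List<Key> batch : Lists.partition(keys, 1000)) {
      for (Entity entity : datastore.fetch(batch.toArray(new Key[0]))) {
        if (entity != null) {  // fetch() returns null for keys that no longer exist
          entities.add(entity);
        }
      }
    }
    return entities;
  }
}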
@@ -41,13 +41,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sets;
import com.google.cloud.Timestamp;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.DatastoreReader;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.EntityQuery;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
import com.google.cloud.datastore.KeyQuery;
import com.google.cloud.datastore.PathElement;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.StringValue;
@@ -294,10 +296,35 @@ public void updateNextNaturalTrigger(WorkflowId workflowId, TriggerInstantSpec t
}

Map<Workflow, TriggerInstantSpec> workflowsWithNextNaturalTrigger() throws IOException {
final Set<WorkflowId> workflowIds = Sets.newHashSet();
final KeyQuery query =
Query.newKeyQueryBuilder()
.setKind(KIND_WORKFLOW)
.build();
datastore.query(query, key -> workflowIds.add(WorkflowId.parseKey(key.getName())));
return workflowsWithNextNaturalTrigger(workflowIds);
}

Map<Workflow, TriggerInstantSpec> workflowsWithNextNaturalTrigger(Set<WorkflowId> workflowIds) {
final Iterable<List<WorkflowId>> batches = Iterables.partition(workflowIds,
MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_READ);
return StreamSupport.stream(batches.spliterator(), false)
.map(batch -> asyncIO(() -> this.workflowsWithNextNaturalTrigger(batch)))
// `collect` and re-`stream` are crucial to make the tasks run in parallel; otherwise they would
// be processed sequentially. Without `collect`, each task would be submitted and then awaited
// while iterating through the stream. This is somewhat subtle, so think twice.
.collect(toList())
.stream()
.flatMap(task -> task.join().entrySet().stream())
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
}
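The collect(toList()) followed by a fresh stream() above is what makes the batches run in parallel; joining each future inside the same lazy pipeline would submit and await one batch at a time. A minimal standalone sketch of the difference using plain CompletableFuture; the class and method names here are illustrative and not part of this change.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectThenJoin {

  static String fetchBatch(int batch) {  // stand-in for one batched Datastore read
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "batch-" + batch;
  }

  public static void main(String[] args) {
    // Submit every task first...
    List<CompletableFuture<String>> tasks = IntStream.range(0, 10)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> fetchBatch(i)))
        .collect(Collectors.toList());

    // ...then join: total latency is roughly one batch, given enough pool threads.
    // Calling join() in the original stream pipeline would instead submit and wait
    // for each batch sequentially.
    List<String> results = tasks.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

    System.out.println(results);
  }
}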

Map<Workflow, TriggerInstantSpec> workflowsWithNextNaturalTrigger(final List<WorkflowId> batch) throws IOException {
final Map<Workflow, TriggerInstantSpec> map = Maps.newHashMap();
final EntityQuery query =
Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build();
datastore.query(query, entity -> {
final List<Key> keys = batch.stream()
.map(workflowId -> workflowKey(datastore::newKeyFactory, workflowId))
.collect(toList());
datastore.get(keys, entity -> {
final Workflow workflow;
try {
workflow = OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
@@ -461,7 +488,7 @@ Map<WorkflowInstance, RunState> readActiveStates(String componentId, String work
final EntityQuery query =
Query.newEntityQueryBuilder().setKind(KIND_ACTIVE_WORKFLOW_INSTANCE)
.setFilter(CompositeFilter.and(PropertyFilter.eq(PROPERTY_COMPONENT, componentId),
PropertyFilter.eq(PROPERTY_WORKFLOW, workflowId)))
PropertyFilter.eq(PROPERTY_WORKFLOW, workflowId)))
.build();

return queryActiveStates(query);
@@ -509,15 +536,15 @@ static RunState entityToRunState(Entity entity, WorkflowInstance instance)
.trigger(DatastoreStorage.<String>readOpt(entity, PROPERTY_STATE_TRIGGER_TYPE).map(type ->
TriggerUtil.trigger(type, entity.getString(PROPERTY_STATE_TRIGGER_ID))))
.messages(OBJECT_MAPPER.<List<Message>>readValue(entity.getString(PROPERTY_STATE_MESSAGES),
new TypeReference<List<Message>>() { }))
new TypeReference<List<Message>>() {}))
.retryDelayMillis(readOpt(entity, PROPERTY_STATE_RETRY_DELAY_MILLIS))
.lastExit(DatastoreStorage.<Long>readOpt(entity, PROPERTY_STATE_LAST_EXIT).map(Long::intValue))
.executionId(readOpt(entity, PROPERTY_STATE_EXECUTION_ID))
.runnerId(readOpt(entity, PROPERTY_STATE_RUNNER_ID))
.executionDescription(readOptJson(entity, PROPERTY_STATE_EXECUTION_DESCRIPTION,
ExecutionDescription.class))
.resourceIds(readOptJson(entity, PROPERTY_STATE_RESOURCE_IDS,
new TypeReference<Set<String>>() { }))
new TypeReference<Set<String>>() {}))
.triggerParameters(readOptJson(entity, PROPERTY_STATE_TRIGGER_PARAMETERS, TriggerParameters.class))
.build();
return RunState.create(instance, state, data, Instant.ofEpochMilli(timestamp), counter);
@@ -695,8 +722,9 @@ static Workflow parseWorkflowJson(Entity entity, WorkflowId workflowId) throws I
/**
* Optionally get an {@link Entity} from a {@link DatastoreReader}.
*
* @param datastore The reader to get from
* @param key The key to get
* @param datastore The reader to get from
* @param key The key to get
*
* @return an optional containing the entity if it existed, empty otherwise.
*/
static Optional<Entity> getOpt(CheckedDatastoreReaderWriter datastore, Key key) throws IOException {
@@ -717,8 +745,9 @@ private static Optional<Instant> getOptInstantProperty(Optional<Entity> entity,
/**
* Convert an optional {@link Entity} into a builder if it exists, otherwise create a new builder.
*
* @param entityOpt The optional entity
* @param key The key for which to create a new builder if the entity is not present
* @param entityOpt The optional entity
* @param key The key for which to create a new builder if the entity is not present
*
* @return an entity builder either based on the given entity or a new one using the key.
*/
private static Entity.Builder asBuilderOrNew(Optional<Entity> entityOpt, Key key) {
@@ -795,7 +824,7 @@ private void deleteShardsForCounter(String counterId) throws IOException {
// this is a safeguard not to exceed the max number of entities in one batch write,
// because in practice the number of shards is much smaller
for (List<Key> batch : Lists.partition(shards, MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_WRITE)) {
datastore.delete(batch.toArray(new Key[0]));
datastore.delete(batch.toArray(new Key[0]));
}
}

@@ -842,21 +871,22 @@ List<Backfill> getBackfills(boolean showAll) throws IOException {

List<Backfill> getBackfillsForComponent(boolean showAll, String component) throws IOException {
final EntityQuery query = backfillQueryBuilder(showAll,
PropertyFilter.eq(PROPERTY_COMPONENT, component))
PropertyFilter.eq(PROPERTY_COMPONENT, component))
.build();

return backfillsForQuery(query);
}

List<Backfill> getBackfillsForWorkflow(boolean showAll, String workflow) throws IOException {
final EntityQuery query = backfillQueryBuilder(showAll,
PropertyFilter.eq(PROPERTY_WORKFLOW, workflow))
PropertyFilter.eq(PROPERTY_WORKFLOW, workflow))
.build();

return backfillsForQuery(query);
}

List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId, Optional<Instant> start) throws IOException {
List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId, Optional<Instant> start)
throws IOException {

final List<Filter> filters = new ArrayList<>();
filters.add(PropertyFilter.eq(PROPERTY_COMPONENT, workflowId.componentId()));
@@ -874,7 +904,7 @@ List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId,

static Backfill entityToBackfill(Entity entity) throws IOException {
final WorkflowId workflowId = WorkflowId.create(entity.getString(PROPERTY_COMPONENT),
entity.getString(PROPERTY_WORKFLOW));
entity.getString(PROPERTY_WORKFLOW));

final BackfillBuilder builder = Backfill.newBuilder()
.id(entity.getKey().getName())
@@ -919,21 +949,21 @@ private <T> Stream<T> readStream(Entity entity, String property) {

private static <T> Optional<T> readOpt(Entity entity, String property) {
return entity.contains(property)
? Optional.of(entity.<Value<T>>getValue(property).get())
: Optional.empty();
? Optional.of(entity.<Value<T>>getValue(property).get())
: Optional.empty();
}

private static <T> Optional<T> readOptJson(Entity entity, String property, Class<T> cls) throws IOException {
return entity.contains(property)
? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), cls))
: Optional.empty();
? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), cls))
: Optional.empty();
}

private static <T> Optional<T> readOptJson(Entity entity, String property, TypeReference<T> valueTypeRef)
throws IOException {
return entity.contains(property)
? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), valueTypeRef))
: Optional.empty();
? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), valueTypeRef))
: Optional.empty();
}

private static <T> T read(Entity entity, String property, T defaultValue) {
@@ -1009,8 +1039,8 @@ private boolean isRetryableTransactionException(DatastoreException e) {

Map<Integer, Long> shardsForCounter(String counterId) throws IOException {
final List<Key> shardKeys = IntStream.range(0, NUM_SHARDS).mapToObj(
index -> datastore.newKeyFactory().setKind(KIND_COUNTER_SHARD).newKey(
String.format("%s-%d", counterId, index)))
index -> datastore.newKeyFactory().setKind(KIND_COUNTER_SHARD).newKey(
String.format("%s-%d", counterId, index)))
.collect(toList());

final Map<Integer, Long> fetchedShards = new HashMap<>();
