diff --git a/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java b/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java
index ede31f58e5..85ef475b65 100644
--- a/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java
+++ b/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java
@@ -41,6 +41,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sets;
 import com.google.cloud.Timestamp;
 import com.google.cloud.datastore.DatastoreException;
 import com.google.cloud.datastore.DatastoreReader;
@@ -48,6 +49,7 @@ import com.google.cloud.datastore.EntityQuery;
 import com.google.cloud.datastore.Key;
 import com.google.cloud.datastore.KeyFactory;
+import com.google.cloud.datastore.KeyQuery;
 import com.google.cloud.datastore.PathElement;
 import com.google.cloud.datastore.Query;
 import com.google.cloud.datastore.StringValue;
@@ -294,10 +296,35 @@ public void updateNextNaturalTrigger(WorkflowId workflowId, TriggerInstantSpec t
   }
 
   Map<WorkflowId, TriggerInstantSpec> workflowsWithNextNaturalTrigger() throws IOException {
+    final Set<WorkflowId> workflowIds = Sets.newHashSet();
+    final KeyQuery query =
+        Query.newKeyQueryBuilder()
+            .setKind(KIND_WORKFLOW)
+            .build();
+    datastore.query(query, key -> workflowIds.add(WorkflowId.parseKey(key.getName())));
+    return workflowsWithNextNaturalTrigger(workflowIds);
+  }
+
+  Map<WorkflowId, TriggerInstantSpec> workflowsWithNextNaturalTrigger(Set<WorkflowId> workflowIds) {
+    final Iterable<List<WorkflowId>> batches = Iterables.partition(workflowIds,
+        MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_READ);
+    return StreamSupport.stream(batches.spliterator(), false)
+        .map(batch -> asyncIO(() -> this.workflowsWithNextNaturalTrigger(batch)))
+        // `collect` and `stream` are crucial to make the tasks run in parallel; otherwise they
+        // will be processed sequentially. Without `collect`, the pipeline would submit and then
+        // wait for each task while iterating through the stream. This is subtle, so think twice.
+        .collect(toList())
+        .stream()
+        .flatMap(task -> task.join().entrySet().stream())
+        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  Map<WorkflowId, TriggerInstantSpec> workflowsWithNextNaturalTrigger(final List<WorkflowId> batch) throws IOException {
     final Map<WorkflowId, TriggerInstantSpec> map = Maps.newHashMap();
-    final EntityQuery query =
-        Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build();
-    datastore.query(query, entity -> {
+    final List<Key> keys = batch.stream()
+        .map(workflowId -> workflowKey(datastore::newKeyFactory, workflowId))
+        .collect(toList());
+    datastore.get(keys, entity -> {
       final Workflow workflow;
       try {
         workflow = OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
@@ -461,7 +488,7 @@ Map<WorkflowInstance, RunState> readActiveStates(String componentId, String work
     final EntityQuery query =
         Query.newEntityQueryBuilder().setKind(KIND_ACTIVE_WORKFLOW_INSTANCE)
             .setFilter(CompositeFilter.and(PropertyFilter.eq(PROPERTY_COMPONENT, componentId),
-                                           PropertyFilter.eq(PROPERTY_WORKFLOW, workflowId)))
+                PropertyFilter.eq(PROPERTY_WORKFLOW, workflowId)))
             .build();
 
     return queryActiveStates(query);
@@ -509,7 +536,7 @@ static RunState entityToRunState(Entity entity, WorkflowInstance instance)
         .trigger(DatastoreStorage.readOpt(entity, PROPERTY_STATE_TRIGGER_TYPE).map(type ->
             TriggerUtil.trigger(type, entity.getString(PROPERTY_STATE_TRIGGER_ID))))
         .messages(OBJECT_MAPPER.<List<Message>>readValue(entity.getString(PROPERTY_STATE_MESSAGES),
-            new TypeReference<List<Message>>() { }))
+            new TypeReference<List<Message>>() {}))
         .retryDelayMillis(readOpt(entity, PROPERTY_STATE_RETRY_DELAY_MILLIS))
         .lastExit(DatastoreStorage.<Long>readOpt(entity, PROPERTY_STATE_LAST_EXIT).map(Long::intValue))
         .executionId(readOpt(entity, PROPERTY_STATE_EXECUTION_ID))
@@ -517,7 +544,7 @@ static RunState entityToRunState(Entity entity, WorkflowInstance instance)
         .executionDescription(readOptJson(entity, PROPERTY_STATE_EXECUTION_DESCRIPTION,
             ExecutionDescription.class))
         .resourceIds(readOptJson(entity, PROPERTY_STATE_RESOURCE_IDS,
-            new TypeReference<Set<String>>() { }))
+            new TypeReference<Set<String>>() {}))
         .triggerParameters(readOptJson(entity, PROPERTY_STATE_TRIGGER_PARAMETERS, TriggerParameters.class))
         .build();
     return RunState.create(instance, state, data, Instant.ofEpochMilli(timestamp), counter);
@@ -695,8 +722,9 @@ static Workflow parseWorkflowJson(Entity entity, WorkflowId workflowId) throws I
   /**
    * Optionally get an {@link Entity} from a {@link DatastoreReader}.
    *
-   * @param datastore The reader to get from
-   * @param key The key to get
+   * @param datastore The reader to get from
+   * @param key       The key to get
+   *
    * @return an optional containing the entity if it existed, empty otherwise.
    */
   static Optional<Entity> getOpt(CheckedDatastoreReaderWriter datastore, Key key) throws IOException {
@@ -717,8 +745,9 @@ private static Optional<Instant> getOptInstantProperty(Optional<Entity> entity,
   /**
    * Convert an optional {@link Entity} into a builder if it exists, otherwise create a new builder.
    *
-   * @param entityOpt The optional entity
-   * @param key The key for which to create a new builder if the entity is not present
+   * @param entityOpt The optional entity
+   * @param key       The key for which to create a new builder if the entity is not present
+   *
    * @return an entity builder either based of the given entity or a new one using the key.
    */
   private static Entity.Builder asBuilderOrNew(Optional<Entity> entityOpt, Key key) {
@@ -795,7 +824,7 @@ private void deleteShardsForCounter(String counterId) throws IOException {
 
     // this is a safe guard to not to exceed max number of entities in one batch write
     // because in practice number of shards is much smaller
     for (List<Key> batch : Lists.partition(shards, MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_WRITE)) {
-        datastore.delete(batch.toArray(new Key[0]));
+      datastore.delete(batch.toArray(new Key[0]));
     }
   }
@@ -842,7 +871,7 @@ List<Backfill> getBackfills(boolean showAll) throws IOException {
 
   List<Backfill> getBackfillsForComponent(boolean showAll, String component) throws IOException {
     final EntityQuery query = backfillQueryBuilder(showAll,
-                                                   PropertyFilter.eq(PROPERTY_COMPONENT, component))
+        PropertyFilter.eq(PROPERTY_COMPONENT, component))
         .build();
 
     return backfillsForQuery(query);
@@ -850,13 +879,14 @@ List<Backfill> getBackfillsForComponent(boolean showAll, String component) throw
 
   List<Backfill> getBackfillsForWorkflow(boolean showAll, String workflow) throws IOException {
     final EntityQuery query = backfillQueryBuilder(showAll,
-                                                   PropertyFilter.eq(PROPERTY_WORKFLOW, workflow))
+        PropertyFilter.eq(PROPERTY_WORKFLOW, workflow))
         .build();
 
     return backfillsForQuery(query);
   }
 
-  List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId, Optional<Instant> start) throws IOException {
+  List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId, Optional<Instant> start)
+      throws IOException {
     final List<Filter> filters = new ArrayList<>();
 
     filters.add(PropertyFilter.eq(PROPERTY_COMPONENT, workflowId.componentId()));
@@ -874,7 +904,7 @@ List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId,
 
   static Backfill entityToBackfill(Entity entity) throws IOException {
     final WorkflowId workflowId = WorkflowId.create(entity.getString(PROPERTY_COMPONENT),
-                                                    entity.getString(PROPERTY_WORKFLOW));
+        entity.getString(PROPERTY_WORKFLOW));
 
     final BackfillBuilder builder = Backfill.newBuilder()
         .id(entity.getKey().getName())
@@ -919,21 +949,21 @@ private <T> Stream<T> readStream(Entity entity, String property) {
 
   private static <T> Optional<T> readOpt(Entity entity, String property) {
     return entity.contains(property)
-           ? Optional.of(entity.<Value<T>>getValue(property).get())
-           : Optional.empty();
+        ? Optional.of(entity.<Value<T>>getValue(property).get())
+        : Optional.empty();
   }
 
   private static <T> Optional<T> readOptJson(Entity entity, String property, Class<T> cls) throws IOException {
     return entity.contains(property)
-           ? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), cls))
-           : Optional.empty();
+        ? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), cls))
+        : Optional.empty();
   }
 
   private static <T> Optional<T> readOptJson(Entity entity, String property, TypeReference<T> valueTypeRef)
       throws IOException {
     return entity.contains(property)
-           ? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), valueTypeRef))
-           : Optional.empty();
+        ? Optional.of(OBJECT_MAPPER.readValue(entity.getString(property), valueTypeRef))
+        : Optional.empty();
   }
 
   private static <T> T read(Entity entity, String property, T defaultValue) {
@@ -1009,8 +1039,8 @@ private boolean isRetryableTransactionException(DatastoreException e) {
 
   Map<Integer, Long> shardsForCounter(String counterId) throws IOException {
     final List<Key> shardKeys = IntStream.range(0, NUM_SHARDS).mapToObj(
-            index -> datastore.newKeyFactory().setKind(KIND_COUNTER_SHARD).newKey(
-                String.format("%s-%d", counterId, index)))
+        index -> datastore.newKeyFactory().setKind(KIND_COUNTER_SHARD).newKey(
+            String.format("%s-%d", counterId, index)))
         .collect(toList());
 
     final Map<Integer, Long> fetchedShards = new HashMap<>();
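
Note (illustrative sketch, not part of the patch): the comment in the new
workflowsWithNextNaturalTrigger(Set) overload points at a general property of Java
streams -- intermediate operations are lazy, so mapping to futures and joining them in
the same pipeline submits and waits for one task at a time. The self-contained sketch
below demonstrates the difference with plain CompletableFuture; the class and method
names are invented for the example, and the timings assume the common pool has at
least a handful of threads.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectThenJoinDemo {

  // Stand-in for asyncIO(() -> workflowsWithNextNaturalTrigger(batch)): a task that
  // takes roughly 500 ms, e.g. one Datastore batch read.
  private static CompletableFuture<Integer> slowTask(int batch) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return batch;
    });
  }

  public static void main(String[] args) {
    long start = System.nanoTime();

    // Lazy pipeline: each join() blocks before the next future is even created,
    // so four tasks take roughly 4 * 500 ms.
    IntStream.range(0, 4).boxed()
        .map(CollectThenJoinDemo::slowTask)
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    System.out.printf("map + join in one pipeline: %d ms%n",
        (System.nanoTime() - start) / 1_000_000);

    start = System.nanoTime();

    // collect(toList()) first: all futures are submitted up front and run
    // concurrently, then joined -- roughly 500 ms total. This is the
    // `collect and stream` trick the diff relies on.
    List<CompletableFuture<Integer>> futures = IntStream.range(0, 4).boxed()
        .map(CollectThenJoinDemo::slowTask)
        .collect(Collectors.toList());
    futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    System.out.printf("collect, then join: %d ms%n",
        (System.nanoTime() - start) / 1_000_000);
  }
}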