Skip to content

Commit

Permalink
feat(core): introduce an always block on flow & flowable
Browse files Browse the repository at this point in the history
close #6649
  • Loading branch information
tchiotludo committed Jan 8, 2025
1 parent 47d2b09 commit abc1bc1
Show file tree
Hide file tree
Showing 33 changed files with 906 additions and 45 deletions.
41 changes: 32 additions & 9 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,15 @@ public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors afters tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, null);
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors,
List<ResolvedTask> resolvedAfters
) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedAfters, null);
}

/**
Expand All @@ -349,25 +353,45 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedAlways afters tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedAlways,
TaskRun parentTaskRun
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
resolvedAlways = removeDisabled(resolvedAlways);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> alwaysFlow = this.findTaskRunByTasks(resolvedAlways, parentTaskRun);

// always is already started, just continue theses always
if (!alwaysFlow.isEmpty()) {
return resolvedAlways == null ? Collections.emptyList() : resolvedAlways;
}

// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
if (resolvedErrors != null && (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun))) {
// Check if among the failed task, they will be retried
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun)) {
return Collections.emptyList();
}

return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
if (resolvedAlways != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
return resolvedErrors;
} else if (resolvedAlways == null) {
return resolvedErrors;
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedAlways != null) {
return resolvedAlways;
}

return resolvedTasks;
Expand All @@ -390,8 +414,7 @@ private List<ResolvedTask> removeDisabled(List<ResolvedTask> tasks) {
.toList();
}

public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks,
TaskRun parentTaskRun) {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return Collections.emptyList();
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<Task> errors;

@Valid
List<Task> always;

@Valid
@Deprecated
List<Listener> listeners;
Expand Down Expand Up @@ -188,6 +191,7 @@ public Stream<Task> allTasks() {
return Stream.of(
this.tasks != null ? this.tasks : new ArrayList<Task>(),
this.errors != null ? this.errors : new ArrayList<Task>(),
this.always != null ? this.always : new ArrayList<Task>(),
this.listenersTasks()
)
.flatMap(Collection::stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class FlowForExecution extends AbstractFlow {
@Valid
List<TaskForExecution> errors;

@Valid
List<TaskForExecution> always;

@Valid
List<AbstractTriggerForExecution> triggers;

Expand All @@ -36,6 +39,7 @@ public static FlowForExecution of(Flow flow) {
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
.always(ListUtils.emptyOnNull(flow.getAlways()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public Flow toFlow() {
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
.always(this.always)
.listeners(this.listeners)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
Expand Down Expand Up @@ -69,6 +70,7 @@ public static FlowWithSource of(Flow flow, String source) {
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
.always(flow.always)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public interface FlowableTask <T extends Output> {
@PluginProperty
List<Task> getErrors();

@Schema(
title = "List of tasks to run after any tasks failed or success on this FlowableTask."
)
@PluginProperty
List<Task> getAlways();

/**
* Create the topology representation of a flowable task.
* <p>
Expand Down Expand Up @@ -71,6 +77,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getAlways(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
private List<Task> errors;

@Valid
private List<Task> always;

@NotNull
@Builder.Default
private final boolean deleted = false;
Expand Down Expand Up @@ -138,6 +141,7 @@ public Template toDeleted() {
this.description,
this.tasks,
this.errors,
this.always,
true
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution ex
// Then wait for completion (KILLED or whatever) on child tasks to KILLED the parent one.
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
flowableParent.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun)
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getAlways(), parentTaskRun)
);

List<TaskRun> taskRunByTasks = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
Expand Down Expand Up @@ -426,7 +427,8 @@ private Executor handleNext(Executor executor) {
.resolveSequentialNexts(
executor.getExecution(),
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
);

if (nextTaskRuns.isEmpty()) {
Expand Down Expand Up @@ -686,7 +688,8 @@ private Executor handleEnd(Executor executor) {

List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
);

if (!executor.getExecution().isTerminated(currentTasks)) {
Expand Down
31 changes: 23 additions & 8 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Stream;

public class FlowableUtils {
@Deprecated(forRemoval = true)
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks
Expand All @@ -31,18 +32,20 @@ public static List<NextTaskRun> resolveSequentialNexts(
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors
List<ResolvedTask> errors,
List<ResolvedTask> always
) {
return resolveSequentialNexts(execution, tasks, errors, null);
return resolveSequentialNexts(execution, tasks, errors, always, null);
}

public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
Expand Down Expand Up @@ -92,9 +95,10 @@ public static List<NextTaskRun> resolveWaitForNext(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

// nothing
if (currentTasks == null || currentTasks.isEmpty() || execution.getState().getCurrent() == State.Type.KILLING) {
Expand Down Expand Up @@ -140,12 +144,13 @@ public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

if (currentTasks == null) {
runContext.logger().warn(
Expand Down Expand Up @@ -197,12 +202,15 @@ public static List<NextTaskRun> resolveParallelNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency
) {
return resolveParallelNexts(
execution,
tasks, errors,
tasks,
errors,
always,
parentTaskRun,
concurrency,
(nextTaskRunStream, taskRuns) -> nextTaskRunStream
Expand All @@ -217,6 +225,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency
) {
Expand All @@ -227,6 +236,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
List<ResolvedTask> allTasks = execution.findTaskDependingFlowState(
tasks,
errors,
always,
parentTaskRun
);

Expand All @@ -249,7 +259,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
if (taskRuns.isEmpty()) {
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(resolvedTask -> resolvedTask.getValue(), () -> new LinkedHashMap<>(), Collectors.toList()));
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));
return collect.values().stream()
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
Expand All @@ -260,7 +270,8 @@ public static List<NextTaskRun> resolveConcurrentNexts(
// start as many tasks as we have concurrency slots
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(resolvedTask -> resolvedTask.getValue(), () -> new LinkedHashMap<>(), Collectors.toList()));
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));

return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
Expand All @@ -281,6 +292,7 @@ public static List<NextTaskRun> resolveDagNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency,
List<Dag.DagTask> taskDependencies
Expand All @@ -289,6 +301,7 @@ public static List<NextTaskRun> resolveDagNexts(
execution,
tasks,
errors,
always,
parentTaskRun,
concurrency,
(nextTaskRunStream, taskRuns) -> nextTaskRunStream
Expand Down Expand Up @@ -321,6 +334,7 @@ public static List<NextTaskRun> resolveParallelNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency,
BiFunction<Stream<NextTaskRun>, List<TaskRun>, Stream<NextTaskRun>> nextTaskRunFunction
Expand All @@ -332,6 +346,7 @@ public static List<NextTaskRun> resolveParallelNexts(
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
tasks,
errors,
always,
parentTaskRun
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ public class AllowFailure extends Sequential implements FlowableTask<VoidOutput>
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> resolvedTasks = this.childTasks(runContext, parentTaskRun);
List<ResolvedTask> resolvedErrors = FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun);
List<ResolvedTask> resolvedAlways = FlowableUtils.resolveTasks(this.getAlways(), parentTaskRun);

Optional<State.Type> type = FlowableUtils.resolveState(
execution,
resolvedTasks,
resolvedErrors,
resolvedAlways,
parentTaskRun,
runContext,
this.isAllowFailure(),
Expand All @@ -77,6 +79,7 @@ public Optional<State.Type> resolveState(RunContext runContext, Execution execut
execution,
resolvedTasks,
null,
resolvedAlways,
parentTaskRun,
runContext,
this.isAllowFailure(),
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/plugin/core/flow/Dag.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
@PluginProperty
protected List<Task> errors;

@Valid
protected List<Task> always;

@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
Expand Down Expand Up @@ -138,7 +141,10 @@ public List<Task> allChildTasks() {
return Stream
.concat(
this.tasks != null ? this.tasks.stream().map(DagTask::getTask) : Stream.empty(),
this.errors != null ? this.errors.stream() : Stream.empty()
Stream.concat(
this.errors != null ? this.errors.stream() : Stream.empty(),
this.always != null ? this.always.stream() : Stream.empty()
)
)
.toList();
}
Expand All @@ -156,6 +162,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this.always, parentTaskRun),
parentTaskRun,
this.concurrent,
this.tasks
Expand Down
Loading

0 comments on commit abc1bc1

Please sign in to comment.