Skip to content

Commit

Permalink
feat(core): change always property to finally
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 9, 2025
1 parent 7097786 commit ab7dfed
Show file tree
Hide file tree
Showing 40 changed files with 311 additions and 242 deletions.
22 changes: 11 additions & 11 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,27 +353,27 @@ public List<ResolvedTask> findTaskDependingFlowState(
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedAlways afters tasks
* @param resolvedFinally afters tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedAlways,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
resolvedAlways = removeDisabled(resolvedAlways);
resolvedFinally = removeDisabled(resolvedFinally);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> alwaysFlow = this.findTaskRunByTasks(resolvedAlways, parentTaskRun);
List<TaskRun> finallyFlow = this.findTaskRunByTasks(resolvedFinally, parentTaskRun);

// always is already started, just continue theses always
if (!alwaysFlow.isEmpty()) {
return resolvedAlways == null ? Collections.emptyList() : resolvedAlways;
// finally is already started, just continue theses finally
if (!finallyFlow.isEmpty()) {
return resolvedFinally == null ? Collections.emptyList() : resolvedFinally;
}

// Check if flow has failed task
Expand All @@ -383,15 +383,15 @@ public List<ResolvedTask> findTaskDependingFlowState(
return Collections.emptyList();
}

if (resolvedAlways != null && resolvedErrors != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
if (resolvedFinally != null && resolvedErrors != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
return resolvedErrors;
} else if (resolvedAlways == null) {
} else if (resolvedFinally == null) {
return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedAlways != null) {
return resolvedAlways;
if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedFinally != null) {
return resolvedFinally;
}

return resolvedTasks;
Expand Down
16 changes: 10 additions & 6 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -32,10 +33,7 @@
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,7 +80,13 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<Task> errors;

@Valid
List<Task> always;
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@Valid
@Deprecated
Expand Down Expand Up @@ -191,7 +195,7 @@ public Stream<Task> allTasks() {
return Stream.of(
this.tasks != null ? this.tasks : new ArrayList<Task>(),
this.errors != null ? this.errors : new ArrayList<Task>(),
this.always != null ? this.always : new ArrayList<Task>(),
this._finally != null ? this._finally : new ArrayList<Task>(),
this.listenersTasks()
)
.flatMap(Collection::stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class FlowForExecution extends AbstractFlow {
List<TaskForExecution> errors;

@Valid
List<TaskForExecution> always;
List<TaskForExecution> _finally;

@Valid
List<AbstractTriggerForExecution> triggers;
Expand All @@ -39,7 +39,7 @@ public static FlowForExecution of(Flow flow) {
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
.always(ListUtils.emptyOnNull(flow.getAlways()).stream().map(TaskForExecution::of).toList())
._finally(ListUtils.emptyOnNull(flow.getFinally()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Flow toFlow() {
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
.always(this.always)
._finally(this._finally)
.listeners(this.listeners)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
Expand Down Expand Up @@ -70,7 +70,7 @@ public static FlowWithSource of(Flow flow, String source) {
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
.always(flow.always)
._finally(flow._finally)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import lombok.Builder;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -26,7 +26,12 @@ public class GraphCluster extends AbstractGraph {
private final GraphClusterRoot root;

@JsonIgnore
private final GraphClusterAlways always;
@Getter(AccessLevel.NONE)
private final GraphClusterFinally _finally;

public GraphClusterFinally getFinally() {
return _finally;
}

@JsonIgnore
private final GraphClusterEnd end;
Expand All @@ -44,15 +49,15 @@ public GraphCluster(String uid) {

this.relationType = null;
this.root = new GraphClusterRoot();
this.always = new GraphClusterAlways();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = null;

this.addNode(this.root);
this.addNode(this.always);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getAlways(), this.getEnd(), new Relation());
this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public GraphCluster(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
Expand All @@ -68,15 +73,15 @@ protected GraphCluster(AbstractGraphTask taskNode, String uid, RelationType rela

this.relationType = relationType;
this.root = new GraphClusterRoot();
this.always = new GraphClusterAlways();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = taskNode;

this.addNode(this.root);
this.addNode(this.always);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getAlways(), this.getEnd(), new Relation());
this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public void addNode(AbstractGraph node) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.models.hierarchies;

import io.kestra.core.utils.IdUtils;
import lombok.Getter;

@Getter
public class GraphClusterFinally extends AbstractGraph {
public GraphClusterFinally() {
super("finally-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public enum RelationType {
SEQUENTIAL,
CHOICE,
ERROR,
ALWAYS,
FINALLY,
PARALLEL,
DYNAMIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface FlowableTask <T extends Output> {
title = "List of tasks to run after any tasks failed or success on this FlowableTask."
)
@PluginProperty
List<Task> getAlways();
List<Task> getFinally();

/**
* Create the topology representation of a flowable task.
Expand Down Expand Up @@ -77,7 +77,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getAlways(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure(),
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/java/io/kestra/core/models/templates/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
Expand Down Expand Up @@ -68,7 +69,13 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
private List<Task> errors;

@Valid
private List<Task> always;
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@NotNull
@Builder.Default
Expand Down Expand Up @@ -141,7 +148,7 @@ public Template toDeleted() {
this.description,
this.tasks,
this.errors,
this.always,
this._finally,
true
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution ex
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
flowableParent.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getAlways(), parentTaskRun)
FlowableUtils.resolveTasks(flowableParent.getFinally(), parentTaskRun)
);

List<TaskRun> taskRunByTasks = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
Expand Down Expand Up @@ -428,7 +428,7 @@ private Executor handleNext(Executor executor) {
executor.getExecution(),
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
ResolvedTask.of(executor.getFlow().getFinally())
);

if (nextTaskRuns.isEmpty()) {
Expand Down Expand Up @@ -689,7 +689,7 @@ private Executor handleEnd(Executor executor) {
List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
ResolvedTask.of(executor.getFlow().getFinally())
);

if (!executor.getExecution().isTerminated(currentTasks)) {
Expand Down
Loading

0 comments on commit ab7dfed

Please sign in to comment.