Skip to content

Commit

Permalink
fix(core): Continue WaitFor loop if tasks are not failed (#6572)
Browse files Browse the repository at this point in the history
close #6031
  • Loading branch information
Skraye authored Jan 9, 2025
1 parent 9b59c75 commit 1b615c9
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 2 deletions.
9 changes: 9 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public boolean isTerminated() {
return this.current.isTerminated();
}

@JsonIgnore
public boolean isTerminatedNoFail() {
return this.current.isTerminatedNoFail();
}

@JsonIgnore
public boolean isRunning() {
return this.current.isRunning();
Expand Down Expand Up @@ -216,6 +221,10 @@ public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
}

public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
}

public boolean isCreated() {
return this == Type.CREATED || this == Type.RESTARTED;
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/plugin/core/flow/WaitFor.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public boolean childTaskRunExecuted(Execution execution, TaskRun parentTaskRun)
.stream()
.filter(t -> t.getParentTaskRunId() != null
&& t.getParentTaskRunId().equals(parentTaskRun.getId())
&& t.getState().isSuccess()
&& t.getState().isTerminatedNoFail()
).count() == tasks.size();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public abstract class AbstractRunnerTest {
protected ForEachItemCaseTest forEachItemCaseTest;

@Inject
private WaitForCaseTest waitForTestCaseTest;
protected WaitForCaseTest waitForTestCaseTest;

@Inject
private FlowConcurrencyCaseTest flowConcurrencyCaseTest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ public void waitforMultipleTasksFailed() throws TimeoutException, QueueException
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().getLast().attemptNumber(), is(1));
}

public void waitForChildTaskWarning() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "waitfor-child-task-warning");

assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat((Integer) execution.getTaskRunList().getFirst().getOutputs().get("iterationCount"), greaterThan(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
id: waitfor-child-task-warning
namespace: io.kestra.tests

tasks:
- id: loop
type: io.kestra.plugin.core.flow.WaitFor
condition: "{{ outputs.check_migration_task.values['test'] == 'FINISHED' }}"
failOnMaxReached: true
checkFrequency:
interval: PT5S
maxDuration: PT10S
tasks:

## forcing a Warning
- id: allow_failure
allowFailure: true
type: io.kestra.plugin.core.execution.Fail

- id: check_migration_task
type: io.kestra.plugin.core.output.OutputValues
values:
test: "ok"
6 changes: 6 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
@Inject
private JdbcTestUtils jdbcTestUtils;

@Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
void waitForChildTaskWarning() throws Exception {
waitForTestCaseTest.waitForChildTaskWarning();
}

@Test
@LoadFlows({"flows/valids/inputs-large.yaml"})
void flowTooLarge() throws Exception {
Expand Down

0 comments on commit 1b615c9

Please sign in to comment.