Skip to content

Commit

Permalink
feat(core): add an Exit task
Browse files Browse the repository at this point in the history
Fixes #5599
  • Loading branch information
loicmathieu committed Dec 24, 2024
1 parent e9929fd commit ff83e25
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package io.kestra.core.models.tasks;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;

import java.util.Optional;

/**
* Interface for tasks that modify the execution at runtime.
*/
public interface ExecutionUpdatableTask {
Execution update(Execution execution, RunContext runContext) throws Exception;

/**
* Resolve the state of a flowable task.
*/
default Optional<State.Type> resolveState(RunContext runContext, Execution execution) throws IllegalVariableEvaluationException {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -916,12 +916,13 @@ private Executor handleExecutionUpdatingTask(final Executor executor) {
"handleExecutionUpdatingTask.updateExecution"
);

var taskState = executionUpdatingTask.resolveState(workerTask.getRunContext(), executor.getExecution()).orElse(State.Type.SUCCESS);
workerTaskResults.add(
WorkerTaskResult.builder()
.taskRun(workerTask.getTaskRun().withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.SUCCESS)).build())
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskState)).build())
)
.withState(State.Type.SUCCESS)
.withState(taskState)
)
.build()
);
Expand Down
136 changes: 136 additions & 0 deletions core/src/main/java/io/kestra/plugin/core/execution/Exit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package io.kestra.plugin.core.execution;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.util.Optional;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Exit the execution: terminate it in the state defined by the property `state`.",
description = "Note that if this execution has running tasks, for ex in a parallel branch, they will not be terminated except if `state` is set to `KILLED`."
)
@Plugin(
examples = {
@Example(
full = true,
code = """
id: exit
namespace: company.team
inputs:
- id: state
type: SELECT
values:
- CONTINUE
- END
defaults: CONTINUE
tasks:
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{inputs.state == 'CONTINUE'}}"
then:
- id: hello
type: io.kestra.plugin.core.log.Log
message: I'm continuing
else:
- id: exit
type: io.kestra.plugin.core.execution.Exit
state: KILLED
- id: end
type: io.kestra.plugin.core.log.Log
message: I'm ending
"""
)
}
)
public class Exit extends Task implements ExecutionUpdatableTask {
@NotNull
@Schema(
title = "The execution exit state",
description = "Using `KILLED` will ends existing running tasks, other state will not."
)
@Builder.Default
private Property<ExitState> state = Property.of(ExitState.SUCCESS);

@Override
public Execution update(Execution execution, RunContext runContext) throws Exception {
State.Type exitState = executionState(runContext);

// if the state is killed, we send a kill event and end here
if (exitState == State.Type.KILLED) {
QueueInterface<ExecutionKilled> killQueue = ((DefaultRunContext) runContext).getApplicationContext().getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
killQueue.emit(ExecutionKilledExecution
.builder()
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution.getId())
.isOnKillCascade(false)
.tenantId(execution.getTenantId())
.build()
);
return execution.withState(exitState);
}

return execution.findLastNotTerminated()
.map(taskRun -> {
try {
TaskRun newTaskRun = taskRun.withState(exitState);
Execution newExecution = execution.withTaskRun(newTaskRun);
// ends all parents
while (newTaskRun.getParentTaskRunId() != null) {
newTaskRun = newExecution.findTaskRunByTaskRunId(newTaskRun.getParentTaskRunId()).withState(exitState);
newExecution = execution.withTaskRun(newTaskRun);
}
return newExecution;
} catch (InternalException e) {
// in case we cannot update the last not terminated task run, we ignore it
return execution;
}
})
.orElse(execution)
.withState(exitState);
}

@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution) throws IllegalVariableEvaluationException {
return Optional.of(executionState(runContext));
}

private State.Type executionState(RunContext runContext) throws IllegalVariableEvaluationException {
return switch (runContext.render(this.state).as(ExitState.class).orElseThrow()) {
case ExitState.SUCCESS -> State.Type.SUCCESS;
case WARNING -> State.Type.WARNING;
case KILLED -> State.Type.KILLED;
case FAILED -> State.Type.FAILED;
case CANCELED -> State.Type.CANCELLED;
};
}

public enum ExitState {
SUCCESS, WARNING, KILLED, FAILED, CANCELED
}
}
69 changes: 69 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/execution/ExitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.kestra.plugin.core.execution;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;

@KestraTest(startRunner = true)
class ExitTest {
@Inject
private RunnerUtils runnerUtils;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

@Test
@ExecuteFlow("flows/valids/exit.yaml")
void shouldExitTheExecution(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
assertThat(execution.getTaskRunList().size(), is(2));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.WARNING));
}

@Test
@LoadFlows("flows/valids/exit-killed.yaml")
void shouldExitAndKillTheExecution() throws TimeoutException, QueueException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);// Only the second reception of the execution will move all tasks to KILLED
AtomicReference<Execution> killedExecution = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("exit-killed") && execution.getState().getCurrent().isTerminated()) {
killedExecution.set(execution);
countDownLatch.countDown();
}
});

runnerUtils.runOneUntilRunning(null, "io.kestra.tests", "exit-killed", null, null, Duration.ofSeconds(30));

assertTrue(countDownLatch.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(killedExecution.get(), notNullValue());
assertThat(killedExecution.get().getState().getCurrent(), is(State.Type.KILLED));
assertThat(killedExecution.get().getTaskRunList().size(), is(2));
assertThat(killedExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.KILLED));
assertThat(killedExecution.get().getTaskRunList().get(1).getState().getCurrent(), is(State.Type.KILLED));
}
}
26 changes: 26 additions & 0 deletions core/src/test/resources/flows/valids/exit-killed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
id: exit-killed
namespace: io.kestra.tests

inputs:
- id: state
type: SELECT
values:
- CONTINUE
- END
defaults: END

tasks:
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{inputs.state == 'CONTINUE'}}"
then:
- id: hello
type: io.kestra.plugin.core.log.Log
message: I'm continuing
else:
- id: exit
type: io.kestra.plugin.core.execution.Exit
state: KILLED
- id: end
type: io.kestra.plugin.core.log.Log
message: I'm ending
26 changes: 26 additions & 0 deletions core/src/test/resources/flows/valids/exit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
id: exit
namespace: io.kestra.tests

inputs:
- id: state
type: SELECT
values:
- CONTINUE
- END
defaults: END

tasks:
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{inputs.state == 'CONTINUE'}}"
then:
- id: hello
type: io.kestra.plugin.core.log.Log
message: I'm continuing
else:
- id: exit
type: io.kestra.plugin.core.execution.Exit
state: WARNING
- id: end
type: io.kestra.plugin.core.log.Log
message: I'm ending

0 comments on commit ff83e25

Please sign in to comment.