Skip to content

Commit

Permalink
feat(core, ui, webserver): add replay system labels
Browse files Browse the repository at this point in the history
- Add system.replay to executions that are a replay
- Add system.replayed to executions that have been replayed

Fixes #6682
  • Loading branch information
loicmathieu committed Jan 8, 2025
1 parent 35ffe7e commit c0e14ca
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 11 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String APP = SYSTEM_PREFIX + "app";
public static final String READ_ONLY = SYSTEM_PREFIX + "readOnly";
public static final String RESTARTED = SYSTEM_PREFIX + "restarted";
public static final String REPLAY = SYSTEM_PREFIX + "replay";
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";

/**
* Static helper method for converting a map to a list of labels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ public Execution replay(final Execution execution, @Nullable String taskRunId, @
taskRunId == null ? new State() : execution.withState(State.Type.RESTARTED).getState()
);

newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt());
List<Label> newLabels = new ArrayList<>(execution.getLabels());
if (!newLabels.contains(new Label(Label.REPLAY, "true"))) {
newLabels.add(new Label(Label.REPLAY, "true"));
}
newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt()).withLabels(newLabels);

return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
Expand Down Expand Up @@ -62,9 +63,9 @@ void restartSimple() throws Exception {
assertThat(restart.getTaskRunList(), hasSize(3));
assertThat(restart.getTaskRunList().get(2).getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getTaskRunList().get(2).getState().getHistories(), hasSize(4));

assertThat(restart.getId(), is(execution.getId()));
assertThat(restart.getTaskRunList().get(2).getId(), is(execution.getTaskRunList().get(2).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.RESTARTED, "true")));
}

@Test
Expand Down Expand Up @@ -97,9 +98,9 @@ void restartSimpleRevision() throws Exception {
assertThat(restart.getTaskRunList(), hasSize(3));
assertThat(restart.getTaskRunList().get(2).getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getTaskRunList().get(2).getState().getHistories(), hasSize(4));

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(2).getId(), not(execution.getTaskRunList().get(2).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.RESTARTED, "true")));
}

@RetryingTest(5)
Expand All @@ -115,6 +116,7 @@ void restartFlowable() throws Exception {
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RESTARTED).count(), greaterThan(1L));
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RUNNING).count(), greaterThan(1L));
assertThat(restart.getTaskRunList().getFirst().getId(), is(restart.getTaskRunList().getFirst().getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.RESTARTED, "true")));
}

@RetryingTest(5)
Expand All @@ -130,6 +132,7 @@ void restartFlowable2() throws Exception {
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RESTARTED).count(), greaterThan(1L));
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RUNNING).count(), greaterThan(1L));
assertThat(restart.getTaskRunList().getFirst().getId(), is(restart.getTaskRunList().getFirst().getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.RESTARTED, "true")));
}

@Test
Expand All @@ -145,6 +148,7 @@ void restartDynamic() throws Exception {

assertThat(restart.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getTaskRunList().getFirst().getState().getHistories(), hasSize(4));
assertThat(restart.getLabels(), hasItem(new Label(Label.RESTARTED, "true")));
}

@Test
Expand All @@ -164,8 +168,8 @@ void replayFromBeginning() throws Exception {
assertThat(restart.getState().getHistories(), hasSize(1));
assertThat(restart.getState().getHistories().getFirst().getDate(), not(is(execution.getState().getStartDate())));
assertThat(restart.getTaskRunList(), hasSize(0));

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -182,9 +186,9 @@ void replaySimple() throws Exception {
assertThat(restart.getTaskRunList(), hasSize(2));
assertThat(restart.getTaskRunList().get(1).getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getTaskRunList().get(1).getState().getHistories(), hasSize(4));

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -200,9 +204,9 @@ void replayFlowable() throws Exception {
assertThat(restart.getState().getHistories(), hasSize(4));
assertThat(restart.getTaskRunList(), hasSize(20));
assertThat(restart.getTaskRunList().get(19).getState().getCurrent(), is(State.Type.RESTARTED));

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -222,6 +226,7 @@ void replayParallel() throws Exception {

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -241,6 +246,7 @@ void replayEachSeq() throws Exception {

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -260,6 +266,7 @@ void replayEachSeq2() throws Exception {

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -279,6 +286,7 @@ void replayWithADynamicTask() throws Exception {

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand All @@ -298,6 +306,7 @@ void replayEachPara() throws Exception {

assertThat(restart.getId(), not(execution.getId()));
assertThat(restart.getTaskRunList().get(1).getId(), not(execution.getTaskRunList().get(1).getId()));
assertThat(restart.getLabels(), hasItem(new Label(Label.REPLAY, "true")));
}

@Test
Expand Down
26 changes: 26 additions & 0 deletions ui/src/components/executions/Overview.vue
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@
</el-alert>
</div>

<div v-if="isReplayed()">
<el-alert type="info" :closable="false" class="mb-4 main-info">
<template #title>
<div>
{{ $t('execution replayed') }}
</div>
</template>
</el-alert>
</div>

<div v-if="isReplay()">
<el-alert type="info" :closable="false" class="mb-4 main-info">
<template #title>
<div>
<span v-html="$t('execution replay', {originalId: execution?.originalId})" />
</div>
</template>
</el-alert>
</div>

<el-row class="mb-3">
<el-col :span="12" class="crud-align">
<crud type="CREATE" permission="EXECUTION" :detail="{executionId: execution.id}" />
Expand Down Expand Up @@ -214,6 +234,12 @@
isRestarted() {
return this.execution.labels?.find( it => it.key === "system.restarted" && (it.value === "true" || it.value === true)) !== undefined;
},
isReplayed() {
return this.execution.labels?.find( it => it.key === "system.replayed" && (it.value === "true" || it.value === true)) !== undefined;
},
isReplay() {
return this.execution.labels?.find( it => it.key === "system.replay" && (it.value === "true" || it.value === true)) !== undefined;
},
load() {
this.$store
.dispatch(
Expand Down
2 changes: 2 additions & 0 deletions ui/src/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,8 @@
"execution failed header": "Execution failed!",
"execution failed message": "With last error: <code>{message}</code>",
"execution restarted": "This execution has been restarted {nbRestart} time(s).",
"execution replay": "This execution is a replay of <code>{originalId}</code>.",
"execution replayed": "This execution has been replayed.",
"task run id": "TaskRun ID",
"active": "Active",
"flows imported": "Flows imported",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,9 +879,22 @@ public Execution replay(

this.controlRevision(execution.get(), revision);

Execution replay = executionService.replay(execution.get(), taskRunId, revision);
return innerReplay(execution.get(), taskRunId, revision);
}

private Execution innerReplay(Execution execution, @Nullable String taskRunId, @Nullable Integer revision) throws Exception {
Execution replay = executionService.replay(execution, taskRunId, revision);
executionQueue.emit(replay);
eventPublisher.publishEvent(new CrudEvent<>(replay, execution.get(), CrudEventType.CREATE));
eventPublisher.publishEvent(new CrudEvent<>(replay, execution, CrudEventType.CREATE));

// update parent exec with replayed label
List<Label> newLabels = new ArrayList<>(execution.getLabels());
if (!newLabels.contains(new Label(Label.REPLAYED, "true"))) {
newLabels.add(new Label(Label.REPLAYED, "true"));
}
Execution newExecution = execution.withLabels(newLabels);
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
executionRepository.save(newExecution);

return replay;
}
Expand Down Expand Up @@ -1440,9 +1453,7 @@ public MutableHttpResponse<?> replayByIds(
}

for (Execution execution : executions) {
Execution replay = executionService.replay(execution, null, null);
executionQueue.emit(replay);
eventPublisher.publishEvent(new CrudEvent<>(replay, execution, CrudEventType.CREATE));
innerReplay(execution, null, null);
}

return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,32 @@ void changeStatusByQuery() throws TimeoutException, QueueException {
assertThat(executions.getResults().get(1).getState().getCurrent(), is(State.Type.WARNING));;
}

@Test
void replay() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "minimal");

assertThat(execution.getState().isTerminated(), is(true));

// replay execution
Execution replay = client.toBlocking().retrieve(
HttpRequest.POST(
"/api/v1/executions/" + execution.getId() + "/replay",
null
),
Execution.class
);
assertThat(replay.getState().getCurrent(), is(State.Type.CREATED));
assertThat(replay.getOriginalId(), is(execution.getId()));
assertThat(replay.getLabels(), hasItem(new Label(Label.REPLAY, "true")));

// load the original execution and check that it has the system.replayed label
Execution original = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions/" + execution.getId()),
Execution.class
);
assertThat(original.getLabels(), hasItem(new Label(Label.REPLAYED, "true")));
}

@Test
void replayByIds() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOne(null, "io.kestra.tests", "minimal");
Expand Down

0 comments on commit c0e14ca

Please sign in to comment.