Skip to content

Commit

Permalink
fix(core): handle runIf inside workingDirectory
Browse files Browse the repository at this point in the history
close #6689
  • Loading branch information
Skraye committed Jan 9, 2025
1 parent 4e543a2 commit 3c5074e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
30 changes: 20 additions & 10 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
Expand All @@ -18,10 +19,7 @@
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.*;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
Expand Down Expand Up @@ -170,7 +168,7 @@ public Worker(

@PostConstruct
void initMetrics() {
String[] tags = this.workerGroup == null ? new String[0] : new String[] { MetricRegistry.TAG_WORKER_GROUP, this.workerGroup };
String[] tags = this.workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, this.workerGroup};
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
Expand Down Expand Up @@ -354,7 +352,20 @@ private void handleTask(final WorkerTask workerTask) {
);

// all tasks will be handled immediately by the worker
WorkerTaskResult workerTaskResult = this.run(currentWorkerTask, false);
WorkerTaskResult workerTaskResult = null;
try {
if (!TruthUtils.isTruthy(runContext.render(currentWorkerTask.getTask().getRunIf()))) {
workerTaskResult = new WorkerTaskResult(currentWorkerTask.getTaskRun().withState(SKIPPED));
this.workerTaskResultQueue.emit(workerTaskResult);
} else {
workerTaskResult = this.run(currentWorkerTask, false);
}
} catch (IllegalVariableEvaluationException e) {
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask.getTaskRun(), currentWorkerTask.getTask());
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}

if (workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
break;
Expand Down Expand Up @@ -507,7 +518,7 @@ private void handleTrigger(WorkerTrigger workerTrigger) {
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

DefaultRunContext runContext = (DefaultRunContext)workerTrigger.getConditionContext().getRunContext();
DefaultRunContext runContext = (DefaultRunContext) workerTrigger.getConditionContext().getRunContext();
runContextInitializer.forWorker(runContext, workerTrigger);
try {

Expand Down Expand Up @@ -654,7 +665,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) {
} catch (QueueException e) {
// If there is a QueueException it can either be caused by the message limit or another queue issue.
// We fail the task and try to resend it.
TaskRun failed = workerTask.fail();
TaskRun failed = workerTask.fail();
if (e instanceof MessageTooBigException) {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Collections.emptyMap());
Expand Down Expand Up @@ -735,7 +746,7 @@ private void logError(WorkerTrigger workerTrigger, Throwable e) {
}

private WorkerTask runAttempt(final WorkerTask workerTask) throws QueueException {
DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);;
DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);

Logger logger = runContext.logger();

Expand Down Expand Up @@ -921,7 +932,6 @@ private boolean waitForTasksCompletion(final Duration timeout) {
}
);


// wait for task completion
Await.until(
() -> {
Expand Down
17 changes: 13 additions & 4 deletions core/src/test/java/io/kestra/core/runners/TaskWithRunIfTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.kestra.core.runners;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

@KestraTest(startRunner = true)
class TaskWithRunIfTest {

Expand All @@ -24,4 +24,13 @@ void runnableTask(Execution execution) {
assertThat(execution.findTaskRunsByTaskId("willfailedtheflow").getFirst().getState().getCurrent(), is(State.Type.FAILED));
}

@Test
@ExecuteFlow("flows/valids/task-runif-workingdirectory.yml")
void runIfWorkingDirectory(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.findTaskRunsByTaskId("log_orders").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent(), is(State.Type.SKIPPED));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
id: task-runif-workingdirectory
namespace: io.kestra.tests

inputs:
- id: scripts_to_run
type: MULTISELECT
required: true
values:
- "orders"
- "carriers"
- "transactions"
defaults: ["orders"]

tasks:
- id: fileSystem
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:

- id: log_orders
type: io.kestra.plugin.core.log.Log
message: "{{ inputs.scripts_to_run contains 'orders' }}"
runIf: "{{ inputs.scripts_to_run contains 'orders' }}"

- id: log_test
type: io.kestra.plugin.core.log.Log
message: "{{ inputs.scripts_to_run contains 'test' }}"
runIf: "{{ inputs.scripts_to_run contains 'test' }}"

0 comments on commit 3c5074e

Please sign in to comment.