diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index f15724b2233..c16ea1662f9 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.Label; import io.kestra.core.models.executions.*; @@ -18,10 +19,7 @@ import io.kestra.core.services.LabelService; import io.kestra.core.services.LogService; import io.kestra.core.services.WorkerGroupService; -import io.kestra.core.utils.Await; -import io.kestra.core.utils.Either; -import io.kestra.core.utils.ExecutorsUtils; -import io.kestra.core.utils.Hashing; +import io.kestra.core.utils.*; import io.kestra.plugin.core.flow.WorkingDirectory; import io.micronaut.context.annotation.Parameter; import io.micronaut.context.event.ApplicationEventPublisher; @@ -170,7 +168,7 @@ public Worker( @PostConstruct void initMetrics() { - String[] tags = this.workerGroup == null ? new String[0] : new String[] { MetricRegistry.TAG_WORKER_GROUP, this.workerGroup }; + String[] tags = this.workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, this.workerGroup}; // create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags); this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags); @@ -354,7 +352,20 @@ private void handleTask(final WorkerTask workerTask) { ); // all tasks will be handled immediately by the worker - WorkerTaskResult workerTaskResult = this.run(currentWorkerTask, false); + WorkerTaskResult workerTaskResult = null; + try { + if (!TruthUtils.isTruthy(runContext.render(currentWorkerTask.getTask().getRunIf()))) { + workerTaskResult = new WorkerTaskResult(currentWorkerTask.getTaskRun().withState(SKIPPED)); + this.workerTaskResultQueue.emit(workerTaskResult); + } else { + workerTaskResult = this.run(currentWorkerTask, false); + } + } catch (IllegalVariableEvaluationException e) { + RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask.getTaskRun(), currentWorkerTask.getTask()); + contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e); + } catch (QueueException e) { + log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e); + } if (workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) { break; @@ -507,7 +518,7 @@ private void handleTrigger(WorkerTrigger workerTrigger) { .gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup))); this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1); - DefaultRunContext runContext = (DefaultRunContext)workerTrigger.getConditionContext().getRunContext(); + DefaultRunContext runContext = (DefaultRunContext) workerTrigger.getConditionContext().getRunContext(); runContextInitializer.forWorker(runContext, workerTrigger); try { @@ -654,7 +665,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) { } catch (QueueException e) { // If there is a QueueException it can either be caused by the message limit or another queue issue. // We fail the task and try to resend it. - TaskRun failed = workerTask.fail(); + TaskRun failed = workerTask.fail(); if (e instanceof MessageTooBigException) { // If it's a message too big, we remove the outputs failed = failed.withOutputs(Collections.emptyMap()); @@ -735,7 +746,7 @@ private void logError(WorkerTrigger workerTrigger, Throwable e) { } private WorkerTask runAttempt(final WorkerTask workerTask) throws QueueException { - DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);; + DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask); Logger logger = runContext.logger(); @@ -921,7 +932,6 @@ private boolean waitForTasksCompletion(final Duration timeout) { } ); - // wait for task completion Await.until( () -> { diff --git a/core/src/test/java/io/kestra/core/runners/TaskWithRunIfTest.java b/core/src/test/java/io/kestra/core/runners/TaskWithRunIfTest.java index 26150c1d520..7de4d988865 100644 --- a/core/src/test/java/io/kestra/core/runners/TaskWithRunIfTest.java +++ b/core/src/test/java/io/kestra/core/runners/TaskWithRunIfTest.java @@ -1,15 +1,15 @@ package io.kestra.core.runners; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; - import io.kestra.core.junit.annotations.ExecuteFlow; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.State; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + @KestraTest(startRunner = true) class TaskWithRunIfTest { @@ -24,4 +24,13 @@ void runnableTask(Execution execution) { assertThat(execution.findTaskRunsByTaskId("willfailedtheflow").getFirst().getState().getCurrent(), is(State.Type.FAILED)); } + @Test + @ExecuteFlow("flows/valids/task-runif-workingdirectory.yml") + void runIfWorkingDirectory(Execution execution) { + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getTaskRunList(), hasSize(3)); + assertThat(execution.findTaskRunsByTaskId("log_orders").getFirst().getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent(), is(State.Type.SKIPPED)); + } + } diff --git a/core/src/test/resources/flows/valids/task-runif-workingdirectory.yml b/core/src/test/resources/flows/valids/task-runif-workingdirectory.yml new file mode 100644 index 00000000000..4502afa9b0c --- /dev/null +++ b/core/src/test/resources/flows/valids/task-runif-workingdirectory.yml @@ -0,0 +1,27 @@ +id: task-runif-workingdirectory +namespace: io.kestra.tests + +inputs: + - id: scripts_to_run + type: MULTISELECT + required: true + values: + - "orders" + - "carriers" + - "transactions" + defaults: ["orders"] + +tasks: + - id: fileSystem + type: io.kestra.plugin.core.flow.WorkingDirectory + tasks: + + - id: log_orders + type: io.kestra.plugin.core.log.Log + message: "{{ inputs.scripts_to_run contains 'orders' }}" + runIf: "{{ inputs.scripts_to_run contains 'orders' }}" + + - id: log_test + type: io.kestra.plugin.core.log.Log + message: "{{ inputs.scripts_to_run contains 'test' }}" + runIf: "{{ inputs.scripts_to_run contains 'test' }}" \ No newline at end of file