Skip to content

Commit

Permalink
Merge branch 'develop' into feat/async-routes
Browse files Browse the repository at this point in the history
  • Loading branch information
elevatebart committed Jan 9, 2025
2 parents 23a10ed + a994120 commit 1a11eed
Show file tree
Hide file tree
Showing 102 changed files with 2,431 additions and 697 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ dependencies {

testImplementation "org.testcontainers:testcontainers:1.20.3"
testImplementation "org.testcontainers:junit-jupiter:1.20.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on:1.78.1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public List<Document> generate(RegisteredPlugin registeredPlugin) throws Excepti
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTasks(), Task.class, "tasks"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTriggers(), AbstractTrigger.class, "triggers"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getConditions(), Condition.class, "conditions"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTaskRunners(), TaskRunner.class, "task-runners"));
//noinspection unchecked
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTaskRunners(), (Class) TaskRunner.class, "task-runners"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getLogShippers(), LogShipper.class, "log-shipper"));

result.addAll(guides(registeredPlugin));
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/docs/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class Plugin {
private List<String> aliases;
private List<String> apps;
private List<String> appBlocks;
private List<String> charts;
private List<String> dataFilters;
private List<PluginSubGroup.PluginCategory> categories;
private String subGroup;

Expand Down Expand Up @@ -85,6 +87,8 @@ public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subg
plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
plugin.apps = filterAndGetClassName(registeredPlugin.getApps()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
plugin.charts = filterAndGetClassName(registeredPlugin.getCharts()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();

return plugin;
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/http/client/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;

@Slf4j
public class HttpClient implements Closeable {
Expand Down Expand Up @@ -252,6 +253,10 @@ private <T> HttpResponse<T> request(
} catch (SocketException e) {
throw new HttpClientRequestException(e.getMessage(), request, e);
} catch (IOException e) {
if (e instanceof SSLHandshakeException) {
throw new HttpClientRequestException(e.getMessage(), request, e);
}

if (e.getCause() instanceof HttpClientException httpClientException) {
throw httpClientException;
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String APP = SYSTEM_PREFIX + "app";
public static final String READ_ONLY = SYSTEM_PREFIX + "readOnly";
public static final String RESTARTED = SYSTEM_PREFIX + "restarted";
public static final String REPLAY = SYSTEM_PREFIX + "replay";
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";

/**
* Static helper method for converting a map to a list of labels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected static Map<String, Long> taskRunnerTypeCount(List<Flow> allFlows) {
})
.map(t -> {
try {
TaskRunner taskRunner = (TaskRunner) t.getClass().getMethod("getTaskRunner").invoke(t);
TaskRunner<?> taskRunner = (TaskRunner<?>) t.getClass().getMethod("getTaskRunner").invoke(t);
return taskRunner != null ? taskRunner.getType() : null;
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.SuperBuilder;

import java.util.Collections;
Expand All @@ -29,6 +30,7 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F

private Map<String, C> columns;

@Setter
private List<AbstractFilter<F>> where;

private List<OrderBy> orderBy;
Expand All @@ -38,4 +40,7 @@ public Set<F> aggregationForbiddenFields() {
}

public abstract Class<? extends QueryBuilderInterface<F>> repositoryClass();

public abstract void setGlobalFilter(GlobalFilter globalFilter);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.kestra.core.models.dashboards;

import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.SuperBuilder;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;

@Getter
@Setter
@NoArgsConstructor
@SuperBuilder
public class GlobalFilter {
private ZonedDateTime startDate;
private ZonedDateTime endDate;
private Integer pageSize;
private Integer pageNumber;
private String namespace;
private Map<String, String> labels;
}
30 changes: 6 additions & 24 deletions core/src/main/java/io/kestra/core/models/executions/LogEntry.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.kestra.core.models.executions;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
Expand All @@ -12,7 +9,6 @@
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import org.slf4j.event.Level;

Expand All @@ -28,46 +24,36 @@
public class LogEntry implements DeletedInterface, TenantInterface {
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
@JsonView(MessageView.class)
String tenantId;

@NotNull
@JsonView(MessageView.class)
String namespace;

@NotNull
@JsonView(MessageView.class)
String flowId;

@Nullable
@JsonView(MessageView.class)
String taskId;

@Nullable
@JsonView(MessageView.class)
String executionId;

@Nullable
@JsonView(MessageView.class)
String taskRunId;

@Nullable
@JsonInclude
@JsonView(MessageView.class)
Integer attemptNumber;

@Nullable
@JsonView(MessageView.class)
String triggerId;

Instant timestamp;

Level level;

@JsonView(MessageView.class)
String thread;

@JsonView(MessageView.class)
String message;

@NotNull
Expand Down Expand Up @@ -142,16 +128,12 @@ public Map<String, String> toMap() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@SneakyThrows
public String toJson(){
JsonMapper mapper = JsonMapper.builder()
.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false)
.serializationInclusion(JsonInclude.Include.ALWAYS)
.build();
return mapper.writerWithView(MessageView.class)
.writeValueAsString(this);
public Map<String, String> toLogMap() {
Map<String, String> map = this.toMap();
map.put("attemptNumber", this.attemptNumber != null ? String.valueOf(this.attemptNumber) : null);
map.put("thread", this.thread);
map.put("message", this.message);
return map;
}

public static class MessageView{}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class FlowForExecution extends AbstractFlow {
@NotEmpty
List<TaskForExecution> tasks;

@Valid
List<TaskForExecution> errors;

@Valid
List<AbstractTriggerForExecution> triggers;

Expand All @@ -32,6 +35,7 @@ public static FlowForExecution of(Flow flow) {
.revision(flow.getRevision())
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/io/kestra/core/models/property/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.*;

import java.io.IOException;
import java.io.Serial;
Expand Down Expand Up @@ -350,6 +347,11 @@ String getExpression() {
return this.expression;
}

// used only by the value extractor
T getValue() {
return value;
}

static class PropertyDeserializer extends StdDeserializer<Property<?>> {
@Serial
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kestra.core.models.property;

import io.micronaut.context.annotation.Context;
import jakarta.validation.valueextraction.ExtractedValue;
import jakarta.validation.valueextraction.ValueExtractor;

/**
* Jakarta Bean Validation value extractor for a Property.<br>
*
* This is used by the @{@link io.kestra.core.validations.factory.CustomValidatorFactoryProvider}.
*/
@Context
public class PropertyValueExtractor implements ValueExtractor<Property<@ExtractedValue ?>> {

@Override
public void extractValues(Property<?> originalValue, ValueReceiver receiver) {
// this will disable validation at save time but enable it at runtime when the value would be populated
receiver.value( null, originalValue.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.kestra.core.models.tasks.logs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.KeyValue;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -26,11 +29,13 @@ public class LogRecord implements LogRecordData {
SpanContext spanContext;
Severity severity;
String severityText;
Value<String> bodyValue;
Attributes attributes;
int totalAttributeCount;
Value<List<KeyValue>> bodyValue;

@JsonIgnore
public Body getBody(){
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
package io.kestra.core.models.tasks.logs;

import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.runners.TaskRunnerDetailResult;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;

@Plugin
public abstract class LogShipper implements io.kestra.core.models.Plugin {
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
public abstract class LogShipper<T extends Output> implements io.kestra.core.models.Plugin {
@NotNull
@NotBlank
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")
protected String id;

@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;

public abstract void sendLogs(RunContext runContext, Flux<LogRecord> logRecord);
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecord) throws Exception;

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static List<String> replaceInternalStorage(
List<String> commands,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return commands
return ListUtils.emptyOnNull(commands)
.stream()
.map(throwFunction(c -> runContext.render(c, additionalVars)))
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Getter
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class TaskRunner implements Plugin, WorkerJobLifecycle {
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, WorkerJobLifecycle {
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
Expand Down Expand Up @@ -62,7 +62,7 @@ public abstract class TaskRunner implements Plugin, WorkerJobLifecycle {
* For remote task runner (like Kubernetes or in a cloud provider), <code>filesToUpload</code> must be used to upload input and namespace files to the runner,
* and <code>filesToDownload</code> must be used to download output files from the runner.
*/
public abstract RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception;
public abstract TaskRunnerResult<T> run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception;

public Map<String, Object> additionalVars(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
if (this.additionalVars == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.kestra.core.models.tasks.runners;

import io.kestra.core.models.tasks.Output;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
public class TaskRunnerDetailResult implements Output {
}
Loading

0 comments on commit 1a11eed

Please sign in to comment.