Skip to content

Commit

Permalink
feat(core): add log record serialization (#6683)
Browse files Browse the repository at this point in the history
* feat(core): add log record serialization

* feat(core): add serialization to log record

---------

Co-authored-by: nKwiatkowski <[email protected]>
  • Loading branch information
nkwiatkowski and nKwiatkowski authored Jan 8, 2025
1 parent 6f43ab1 commit 132d454
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 31 deletions.
30 changes: 6 additions & 24 deletions core/src/main/java/io/kestra/core/models/executions/LogEntry.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.kestra.core.models.executions;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
Expand All @@ -12,7 +9,6 @@
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import org.slf4j.event.Level;

Expand All @@ -28,46 +24,36 @@
public class LogEntry implements DeletedInterface, TenantInterface {
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
@JsonView(MessageView.class)
String tenantId;

@NotNull
@JsonView(MessageView.class)
String namespace;

@NotNull
@JsonView(MessageView.class)
String flowId;

@Nullable
@JsonView(MessageView.class)
String taskId;

@Nullable
@JsonView(MessageView.class)
String executionId;

@Nullable
@JsonView(MessageView.class)
String taskRunId;

@Nullable
@JsonInclude
@JsonView(MessageView.class)
Integer attemptNumber;

@Nullable
@JsonView(MessageView.class)
String triggerId;

Instant timestamp;

Level level;

@JsonView(MessageView.class)
String thread;

@JsonView(MessageView.class)
String message;

@NotNull
Expand Down Expand Up @@ -142,16 +128,12 @@ public Map<String, String> toMap() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@SneakyThrows
public String toJson(){
JsonMapper mapper = JsonMapper.builder()
.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false)
.serializationInclusion(JsonInclude.Include.ALWAYS)
.build();
return mapper.writerWithView(MessageView.class)
.writeValueAsString(this);
public Map<String, String> toLogMap() {
Map<String, String> map = this.toMap();
map.put("attemptNumber", this.attemptNumber != null ? String.valueOf(this.attemptNumber) : null);
map.put("thread", this.thread);
map.put("message", this.message);
return map;
}

public static class MessageView{}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.kestra.core.models.tasks.logs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.KeyValue;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -26,11 +29,13 @@ public class LogRecord implements LogRecordData {
SpanContext spanContext;
Severity severity;
String severityText;
Value<String> bodyValue;
Attributes attributes;
int totalAttributeCount;
Value<List<KeyValue>> bodyValue;

@JsonIgnore
public Body getBody(){
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import java.util.Map;
import org.junit.jupiter.api.Test;

public class LogEntryTest {

@Test
public void should_format_to_json_full_log_entry(){
public void should_format_to_log_map(){
LogEntry logEntry = LogEntry.builder()
.tenantId("tenantId")
.namespace("namespace")
Expand All @@ -21,15 +22,24 @@ public void should_format_to_json_full_log_entry(){
.thread("thread")
.message("message")
.build();
assertThat(logEntry.toJson(), is("{\"tenantId\":\"tenantId\",\"namespace\":\"namespace\",\"flowId\":\"flowId\",\"taskId\":\"taskId\",\"executionId\":\"executionId\",\"taskRunId\":\"taskRunId\",\"attemptNumber\":1,\"triggerId\":\"triggerId\",\"thread\":\"thread\",\"message\":\"message\"}"));
Map<String, String> logMap = logEntry.toLogMap();
assertThat(logMap.get("tenantId"), is("tenantId"));
assertThat(logMap.get("namespace"), is("namespace"));
assertThat(logMap.get("flowId"), is("flowId"));
assertThat(logMap.get("taskId"), is("taskId"));
assertThat(logMap.get("executionId"), is("executionId"));
assertThat(logMap.get("taskRunId"), is("taskRunId"));
assertThat(logMap.get("attemptNumber"), is("1"));
assertThat(logMap.get("triggerId"), is("triggerId"));
assertThat(logMap.get("thread"), is("thread"));
assertThat(logMap.get("message"), is("message"));
}

@Test
public void should_format_to_json_empty_log_entry(){
public void should_format_to_log_map_empty_log_entry(){
LogEntry logEntry = LogEntry.builder().build();
assertThat(logEntry.toJson(), is("{\"tenantId\":null,\"namespace\":null,\"flowId\":null,\"taskId\":null,\"executionId\":null,\"taskRunId\":null,\"attemptNumber\":null,\"triggerId\":null,\"thread\":null,\"message\":null}"));
Map<String, String> map = logEntry.toMap();
assertThat(map.size(), is(0));
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.kestra.core.models.tasks.logs;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.executions.LogEntry;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.logs.Severity;
import java.time.Instant;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

public class LogRecordTest {

@Test
void should_convert_log_record_to_string() throws JsonProcessingException {
LogRecord logRecord = LogRecord.builder()
.timestampEpochNanos(1322907330123456789L)
.observedTimestampEpochNanos(1322907330123456789L)
.severity(Severity.ERROR)
.severityText("ERROR")
.bodyValue(Value.of(
LogEntry.builder()
.tenantId("tenantId")
.namespace("namespace")
.flowId("flowId")
.taskId("taskId")
.executionId("executionId")
.taskRunId("taskRunId")
.attemptNumber(1)
.triggerId("triggerId")
.timestamp(Instant.parse("2011-12-03T10:15:30.123456789Z"))
.level(Level.INFO)
.thread("thread")
.message("message")
.build()
.toLogMap()
.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> Value.of(entry.getValue())))
))
.build();
String log = new ObjectMapper().writeValueAsString(logRecord);
assertThat(log, is("{\"resource\":null,\"instrumentationScopeInfo\":null,\"timestampEpochNanos\":1322907330123456789,\"observedTimestampEpochNanos\":1322907330123456789,\"spanContext\":null,\"severity\":\"ERROR\",\"severityText\":\"ERROR\",\"attributes\":null,\"totalAttributeCount\":0,\"bodyValue\":{\"value\":[{\"key\":\"taskRunId\",\"value\":{\"value\":\"taskRunId\",\"type\":\"STRING\"}},{\"key\":\"attemptNumber\",\"value\":{\"value\":\"1\",\"type\":\"STRING\"}},{\"key\":\"executionId\",\"value\":{\"value\":\"executionId\",\"type\":\"STRING\"}},{\"key\":\"triggerId\",\"value\":{\"value\":\"triggerId\",\"type\":\"STRING\"}},{\"key\":\"tenantId\",\"value\":{\"value\":\"tenantId\",\"type\":\"STRING\"}},{\"key\":\"namespace\",\"value\":{\"value\":\"namespace\",\"type\":\"STRING\"}},{\"key\":\"thread\",\"value\":{\"value\":\"thread\",\"type\":\"STRING\"}},{\"key\":\"message\",\"value\":{\"value\":\"message\",\"type\":\"STRING\"}},{\"key\":\"flowId\",\"value\":{\"value\":\"flowId\",\"type\":\"STRING\"}},{\"key\":\"taskId\",\"value\":{\"value\":\"taskId\",\"type\":\"STRING\"}}],\"type\":\"KEY_VALUE_LIST\"}}"));
}
}

0 comments on commit 132d454

Please sign in to comment.