Skip to content

Commit

Permalink
fix(core): FileSerde use reader and writer to be able to configure the charset
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 12, 2024
1 parent 76b59eb commit 9911e25
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 90 deletions.
100 changes: 60 additions & 40 deletions core/src/main/java/io/kestra/core/serializers/FileSerde.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.kestra.core.serializers;

import static io.kestra.core.utils.Rethrow.throwBiFunction;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -13,24 +11,20 @@
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.io.*;
import java.util.function.Consumer;

public final class FileSerde {
/**
* Advised buffer size for better performance. <br>
* It is advised to wrap all readers and writers with buffered variants before calling any of the methods here.
* We advise a buffer of BUFFER_SIZE which is 32k.
*/
public static final int BUFFER_SIZE = 32 * 1024;

private static final ObjectMapper DEFAULT_OBJECT_MAPPER = JacksonMapper.ofIon();
private static final TypeReference<Object> DEFAULT_TYPE_REFERENCE = new TypeReference<>(){};

/** The size of the buffer used for reading and writing data from streams. */
private static final int BUFFER_SIZE = 32 * 1024;

private FileSerde() {}

public static void write(OutputStream output, Object row) throws IOException {
Expand All @@ -40,6 +34,9 @@ public static void write(OutputStream output, Object row) throws IOException {
}
}

/**
* For performance, it is advised to use the {@link #readAll(Reader)} method instead.
*/
public static Consumer<FluxSink<Object>> reader(BufferedReader input) {
return s -> {
String row;
Expand Down Expand Up @@ -100,47 +97,70 @@ private static <T> T convert(String row, Class<T> cls) throws JsonProcessingExce
return DEFAULT_OBJECT_MAPPER.readValue(row, cls);
}

public static Flux<Object> readAll(InputStream in) throws IOException {
return readAll(DEFAULT_OBJECT_MAPPER, in, DEFAULT_TYPE_REFERENCE);
/**
 * Reads all values from the given reader into a {@link Flux}, using the default ION object mapper
 * and an untyped ({@code Object}) target type.
 * For performance, it is advised to wrap the reader inside a BufferedReader, see {@link #BUFFER_SIZE}.
 *
 * @param reader the source to read serialized values from
 * @return a Flux emitting one element per deserialized value
 * @throws IOException if the underlying parser cannot be created from the reader
 */
public static Flux<Object> readAll(Reader reader) throws IOException {
    return readAll(DEFAULT_OBJECT_MAPPER, reader, DEFAULT_TYPE_REFERENCE);
}

public static <T> Flux<T> readAll(InputStream in, TypeReference<T> type) throws IOException {
return readAll(DEFAULT_OBJECT_MAPPER, in, type);
/**
 * Reads all values of the given type from the reader into a {@link Flux}, using the default ION object mapper.
 * For performance, it is advised to wrap the reader inside a BufferedReader, see {@link #BUFFER_SIZE}.
 *
 * @param reader the source to read serialized values from
 * @param type   the target type each value is deserialized to
 * @return a Flux emitting one element per deserialized value
 * @throws IOException if the underlying parser cannot be created from the reader
 */
public static <T> Flux<T> readAll(Reader reader, TypeReference<T> type) throws IOException {
    return readAll(DEFAULT_OBJECT_MAPPER, reader, type);
}

public static Flux<Object> readAll(ObjectMapper objectMapper, InputStream in) throws IOException {
/**
 * Reads all values from the reader into a {@link Flux} with the given mapper and an untyped
 * ({@code Object}) target type.
 * For performance, it is advised to wrap the reader inside a BufferedReader, see {@link #BUFFER_SIZE}.
 *
 * @param objectMapper the mapper used to deserialize each value
 * @param reader       the source to read serialized values from
 * @return a Flux emitting one element per deserialized value
 * @throws IOException if the underlying parser cannot be created from the reader
 */
public static Flux<Object> readAll(ObjectMapper objectMapper, Reader reader) throws IOException {
    return readAll(objectMapper, reader, DEFAULT_TYPE_REFERENCE);
}

public static <T> Flux<T> readAll(ObjectMapper objectMapper, InputStream in, TypeReference<T> type) throws IOException {
return Flux.generate(
() -> createMappingIterator(objectMapper, in, type),
throwBiFunction((iterator, sink) -> {
final T value = iterator.hasNextValue() ? iterator.nextValue() : null;
Optional.ofNullable(value).ifPresentOrElse(sink::next, sink::complete);
return iterator;
}),
throwConsumer(MappingIterator::close)
);
/**
 * Reads all values of the given type from the reader into a {@link Flux} using the supplied mapper.
 * For performance, it is advised to wrap the reader inside a BufferedReader, see {@link #BUFFER_SIZE}.
 *
 * @param objectMapper the mapper used to deserialize each value
 * @param reader       the source to read serialized values from
 * @param type         the target type each value is deserialized to
 * @return a Flux emitting one element per deserialized value
 * @throws IOException if the underlying parser cannot be created from the reader
 */
public static <T> Flux<T> readAll(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
    // Created eagerly so that parser-creation failures surface here as an IOException
    // rather than as an onError signal after subscription.
    MappingIterator<T> mappingIterator = createMappingIterator(objectMapper, reader, type);
    return Flux.<T>create(sink -> {
        // NOTE(review): the iterator is drained in full inside create(), and the BUFFER overflow
        // strategy buffers every element regardless of downstream demand — confirm this is
        // acceptable for very large inputs.
        mappingIterator.forEachRemaining(t -> sink.next(t));
        sink.complete();
    }, FluxSink.OverflowStrategy.BUFFER)
        // Close the iterator whether the flux completes, errors, or is cancelled.
        .doFinally(throwConsumer(ignored -> mappingIterator.close()));
}

public static <T> Mono<Long> writeAll(OutputStream out, Flux<T> values) throws IOException {
return writeAll(DEFAULT_OBJECT_MAPPER, out, values);
/**
 * Writes all values emitted by the given Flux, using the default ION object mapper.
 * For performance, it is advised to wrap the writer inside a BufferedWriter, see {@link #BUFFER_SIZE}.
 *
 * @param writer the destination the serialized values are written to
 * @param values the values to serialize
 * @return a Mono completing with the number of values written
 * @throws IOException if the underlying sequence writer cannot be created
 */
public static <T> Mono<Long> writeAll(Writer writer, Flux<T> values) throws IOException {
    return writeAll(DEFAULT_OBJECT_MAPPER, writer, values);
}

public static <T> Mono<Long> writeAll(ObjectMapper objectMapper, OutputStream out, Flux<T> values) throws IOException {
return Mono.using(
() -> createSequenceWriter(objectMapper, out),
throwFunction((writer) -> values.doOnNext(throwConsumer(writer::write)).count()),
throwConsumer(SequenceWriter::close)
);
/**
 * Writes all values emitted by the given Flux, using the supplied mapper.
 * For performance, it is advised to wrap the writer inside a BufferedWriter, see {@link #BUFFER_SIZE}.
 * The writer is flushed — but not closed — when the returned Mono terminates; closing it stays
 * with the caller.
 *
 * @param objectMapper the mapper used to serialize each value
 * @param writer       the destination the serialized values are written to
 * @param values       the values to serialize; null elements are skipped
 * @return a Mono completing with the number of non-null values written
 * @throws IOException if the underlying sequence writer cannot be created
 */
public static <T> Mono<Long> writeAll(ObjectMapper objectMapper, Writer writer, Flux<T> values) throws IOException {
    // NOTE(review): due to type erasure this TypeReference<T> does not capture T at runtime —
    // confirm the mapper serializes each element by its concrete class as intended.
    SequenceWriter seqWriter = createSequenceWriter(objectMapper, writer, new TypeReference<T>() {});
    return values
        .filter(value -> value != null) // skip nulls: they would otherwise fail serialization as records
        .doOnNext(throwConsumer(value -> seqWriter.write(value)))
        .doFinally(throwConsumer(ignored -> seqWriter.flush())) // we should have called close() but it generates an exception, so we flush
        .count();
}

private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, InputStream in, TypeReference<T> type) throws IOException {
return objectMapper.readerFor(type).readValues(new BufferedInputStream(in, BUFFER_SIZE));
/**
 * Creates a {@link MappingIterator} positioned on the first token of the stream.
 *
 * See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
 * There is a limitation with the MappingIterator that cannot differentiate between an array of things
 * (of whatever shape) and a sequence/stream of things (of Array shape).
 * To work around that, we need to create a JsonParser and advance to the first token.
 */
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
    // Do NOT close the parser here (e.g. via try-with-resources): the returned MappingIterator
    // reads from it lazily, and closing the parser — which by default also closes its underlying
    // source — before iteration would make every subsequent read fail or yield nothing.
    // The caller is responsible for closing the MappingIterator (see readAll's doFinally).
    var parser = objectMapper.createParser(reader);
    parser.nextToken();
    return objectMapper.readerFor(type).readValues(parser);
}

private static SequenceWriter createSequenceWriter(ObjectMapper objectMapper, OutputStream out) throws IOException {
return objectMapper.writer().writeValues(new BufferedOutputStream(out, BUFFER_SIZE));
/**
 * Creates a {@link SequenceWriter} bound to the given target type, writing to the supplied writer.
 */
private static <T> SequenceWriter createSequenceWriter(ObjectMapper objectMapper, Writer writer, TypeReference<T> type) throws IOException {
    final var typedObjectWriter = objectMapper.writerFor(type);
    return typedObjectWriter.writeValues(writer);
}
}
40 changes: 4 additions & 36 deletions core/src/main/java/io/kestra/core/serializers/JacksonMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@
package io.kestra.core.serializers;

import com.amazon.ion.IonSystem;
import com.amazon.ion.impl._Private_IonBinaryWriterBuilder;
import com.amazon.ion.impl._Private_Utils;
import com.amazon.ion.system.IonBinaryWriterBuilder;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.system.IonTextWriterBuilder;
import com.amazon.ion.system.SimpleCatalog;
import com.amazon.ion.system.*;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -39,8 +34,6 @@
import java.util.Map;
import java.util.TimeZone;

import static com.amazon.ion.impl.lite._Private_LiteDomTrampoline.newLiteSystem;

public final class JacksonMapper {
public static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static final TypeReference<List<Object>> LIST_TYPE_REFERENCE = new TypeReference<>() {};
Expand Down Expand Up @@ -145,34 +138,9 @@ private static ObjectMapper createIonObjectMapper() {
}

private static IonSystem createIonSystem() {
// This code is inspired by the IonSystemBuilder#build() method and the usage of "withWriteTopLevelValuesOnNewLines(true)".
//
// After the integration of the relevant pull request (https://github.com/amazon-ion/ion-java/pull/781),
// it is expected that this code should be replaced with a more simplified version.
//
// The simplified code would look like below:
//
// return IonSystemBuilder.standard()
// .withIonTextWriterBuilder(IonTextWriterBuilder.standard().withWriteTopLevelValuesOnNewLines(true))
// .build();
//
// TODO: Simplify this code once the pull request is integrated.

final var catalog = new SimpleCatalog();

final var textWriterBuilder = IonTextWriterBuilder.standard()
.withCatalog(catalog)
.withCharsetAscii()
.withWriteTopLevelValuesOnNewLines(true); // write line separators on new lines instead of spaces

final var binaryWriterBuilder = IonBinaryWriterBuilder.standard()
.withCatalog(catalog)
.withInitialSymbolTable(_Private_Utils.systemSymtab(1));

final var readerBuilder = IonReaderBuilder.standard()
.withCatalog(catalog);

return newLiteSystem(textWriterBuilder, (_Private_IonBinaryWriterBuilder) binaryWriterBuilder, readerBuilder);
return IonSystemBuilder.standard()
.withIonTextWriterBuilder(IonTextWriterBuilder.standard().withWriteTopLevelValuesOnNewLines(true))
.build();
}

public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object previous, Object current) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -355,7 +351,7 @@ public Outputs outputs(final RunContext runContext) throws IOException {
return null;
}

try(InputStream is = runContext.storage().getFile(uri)) {
try(Reader is = new BufferedReader(new InputStreamReader(runContext.storage().getFile(uri)))) {
Map<String, URI> outputs = FileSerde
.readAll(is, new TypeReference<Map<String, URI>>() {})
.blockFirst();
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/java/io/kestra/core/serializers/FileSerdeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void readMax() throws IOException {
void readAll_fromEmptySource() throws IOException {
final Path inputTempFilePath = createTempFile();

final List<Object> outputValues = FileSerde.readAll(Files.newInputStream(inputTempFilePath)).collectList().block();
final List<Object> outputValues = FileSerde.readAll(Files.newBufferedReader(inputTempFilePath)).collectList().block();
assertThat(outputValues, empty());
}

Expand All @@ -99,7 +99,7 @@ void readAll_fromSingleValuedSource() throws IOException {
final List<String> inputLines = List.of("{id:1,value:\"value1\"}");
Files.write(inputTempFilePath, inputLines);

final List<SimpleEntry> outputValues = FileSerde.readAll(Files.newInputStream(inputTempFilePath), new TypeReference<SimpleEntry>() {}).collectList().block();
final List<SimpleEntry> outputValues = FileSerde.readAll(Files.newBufferedReader(inputTempFilePath), new TypeReference<SimpleEntry>() {}).collectList().block();
assertThat(outputValues, hasSize(1));
assertThat(outputValues.getFirst(), equalTo(new SimpleEntry(1, "value1")));
}
Expand All @@ -111,7 +111,7 @@ void readAll_fromMultiValuedSource() throws IOException {
final List<String> inputLines = List.of("{id:1,value:\"value1\"}", "{id:2,value:\"value2\"}", "{id:3,value:\"value3\"}");
Files.write(inputTempFilePath, inputLines);

final List<SimpleEntry> outputValues = FileSerde.readAll(Files.newInputStream(inputTempFilePath), new TypeReference<SimpleEntry>() {}).collectList().block();
final List<SimpleEntry> outputValues = FileSerde.readAll(Files.newBufferedReader(inputTempFilePath), new TypeReference<SimpleEntry>() {}).collectList().block();
assertThat(outputValues, hasSize(3));
assertThat(outputValues.getFirst(), equalTo(new SimpleEntry(1, "value1")));
assertThat(outputValues.get(1), equalTo(new SimpleEntry(2, "value2")));
Expand All @@ -122,7 +122,7 @@ void readAll_fromMultiValuedSource() throws IOException {
void writeAll_fromEmptySource() throws IOException {
final Path outputTempFilePath = createTempFile();

final Long outputCount = FileSerde.writeAll(Files.newOutputStream(outputTempFilePath), Flux.empty()).block();
final Long outputCount = FileSerde.writeAll(Files.newBufferedWriter(outputTempFilePath), Flux.empty()).block();
assertThat(outputCount, is(0L));
}

Expand All @@ -131,7 +131,7 @@ void writeAll_fromSingleValuedSource() throws IOException {
final Path outputTempFilePath = createTempFile();

final List<SimpleEntry> inputValues = List.of(new SimpleEntry(1, "value1"));
final Long outputCount = FileSerde.writeAll(Files.newOutputStream(outputTempFilePath), Flux.fromIterable(inputValues)).block();
final Long outputCount = FileSerde.writeAll(Files.newBufferedWriter(outputTempFilePath), Flux.fromIterable(inputValues)).block();
assertThat(outputCount, is(1L));

final List<String> outputLines = Files.readAllLines(outputTempFilePath);
Expand All @@ -144,7 +144,7 @@ void writeAll_fromMultiValuedSource() throws IOException {
final Path outputTempFilePath = createTempFile();

final List<SimpleEntry> inputValues = List.of(new SimpleEntry(1, "value1"), new SimpleEntry(2, "value2"), new SimpleEntry(3, "value3"));
final Long outputCount = FileSerde.writeAll(Files.newOutputStream(outputTempFilePath), Flux.fromIterable(inputValues)).block();
final Long outputCount = FileSerde.writeAll(Files.newBufferedWriter(outputTempFilePath), Flux.fromIterable(inputValues)).block();
assertThat(outputCount, is(3L));

final List<String> outputLines = Files.readAllLines(outputTempFilePath);
Expand All @@ -162,8 +162,8 @@ void writeAll_fromReadAll() throws IOException {
final List<String> inputLines = List.of("{id:1,value:\"value1\"}", "{id:2,value:\"value2\"}", "{id:3,value:\"value3\"}");
Files.write(inputTempFilePath, inputLines);

final Flux<Object> inputFlux = FileSerde.readAll(Files.newInputStream(inputTempFilePath));
final Long outputCount = FileSerde.writeAll(Files.newOutputStream(outputTempFilePath), inputFlux).block();
final Flux<Object> inputFlux = FileSerde.readAll(Files.newBufferedReader(inputTempFilePath));
final Long outputCount = FileSerde.writeAll(Files.newBufferedWriter(outputTempFilePath), inputFlux).block();
assertThat(outputCount, is(3L));

final List<String> outputLines = Files.readAllLines(outputTempFilePath);
Expand Down

0 comments on commit 9911e25

Please sign in to comment.