Skip to content

Commit

Permalink
feat: use the new FileSerde.readAll() method that improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 13, 2024
1 parent 985ab91 commit c141d74
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -56,7 +57,7 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception {

try (
RestClientTransport transport = this.connection.client(runContext);
BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))
BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)
) {
OpenSearchClient client = new OpenSearchClient(transport);
AtomicLong count = new AtomicLong();
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/kestra/plugin/elasticsearch/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import lombok.experimental.SuperBuilder;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import static io.kestra.core.utils.Rethrow.throwFunction;

Expand Down Expand Up @@ -73,9 +73,8 @@ public class Load extends AbstractLoad implements RunnableTask<Load.Output> {

@SuppressWarnings("unchecked")
@Override
protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException {
return Flux
.create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER)
protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException {
return FileSerde.readAll(inputStream)
.map(throwFunction(o -> {
Map<String, ?> values = (Map<String, ?>) o;

Expand Down

0 comments on commit c141d74

Please sign in to comment.