diff --git a/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java b/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java index d7f4c68..1d8fd1b 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java @@ -6,6 +6,7 @@ import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; @@ -56,7 +57,7 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception { try ( RestClientTransport transport = this.connection.client(runContext); - BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from))) + BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE) ) { OpenSearchClient client = new OpenSearchClient(transport); AtomicLong count = new AtomicLong(); diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Load.java b/src/main/java/io/kestra/plugin/elasticsearch/Load.java index c0f2712..869ddb8 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Load.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Load.java @@ -13,12 +13,12 @@ import lombok.experimental.SuperBuilder; import java.io.BufferedReader; +import java.io.IOException; import java.util.Map; import jakarta.validation.constraints.NotNull; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -73,9 +73,8 @@ public class Load extends AbstractLoad implements RunnableTask { @SuppressWarnings("unchecked") @Override - protected Flux source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException { - return Flux - .create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER) + protected Flux source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException { + return FileSerde.readAll(inputStream) .map(throwFunction(o -> { Map values = (Map) o;