From eb33ebcaa8c82bd4f57ea8d7336415625886b12f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Tue, 13 Aug 2024 10:58:28 +0200 Subject: [PATCH] feat: use FileSerde.writeAll and buffering for improved performances --- .../kestra/plugin/elasticsearch/Scroll.java | 20 ++++++------- .../kestra/plugin/elasticsearch/Search.java | 28 +++++++++---------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Scroll.java b/src/main/java/io/kestra/plugin/elasticsearch/Scroll.java index 5d72c50..9964aa0 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Scroll.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Scroll.java @@ -18,18 +18,15 @@ import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.transport.rest_client.RestClientTransport; import org.slf4j.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.net.URI; import java.time.Duration; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import static io.kestra.core.utils.Rethrow.throwConsumer; - @SuperBuilder @ToString @EqualsAndHashCode @@ -65,7 +62,7 @@ public Scroll.Output run(RunContext runContext) throws Exception { try ( RestClientTransport transport = this.connection.client(runContext); - OutputStream output = new FileOutputStream(tempFile) + Writer output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE) ) { OpenSearchClient client = new OpenSearchClient(transport); // build request @@ -90,11 +87,10 @@ public Scroll.Output run(RunContext runContext) throws Exception { requestsDuration.addAndGet(searchResponse.took()); requestsCount.incrementAndGet(); - searchResponse.hits().hits() - .forEach(throwConsumer(documentFields -> { - recordsCount.incrementAndGet(); - FileSerde.write(output, documentFields.source()); - })); + Flux hitFlux = Flux.fromIterable(searchResponse.hits().hits()).map(hit -> hit.source()); + Mono longMono = FileSerde.writeAll(output, hitFlux); + + recordsCount.addAndGet(longMono.block()); ScrollRequest searchScrollRequest = new ScrollRequest.Builder() .scrollId(scrollId) diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Search.java b/src/main/java/io/kestra/plugin/elasticsearch/Search.java index d3debde..fb1851a 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Search.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Search.java @@ -18,10 +18,10 @@ import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.transport.rest_client.RestClientTransport; import org.slf4j.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.net.URI; import java.time.Duration; import java.util.ArrayList; @@ -100,10 +100,10 @@ public Search.Output run(RunContext runContext) throws Exception { break; case STORE: - Pair store = this.store(runContext, searchResponse); + Pair store = this.store(runContext, searchResponse); outputBuilder .uri(store.getLeft()) - .size(store.getRight()); + .size(store.getRight().intValue()); break; } @@ -120,18 +120,18 @@ public Search.Output run(RunContext runContext) throws Exception { } - protected Pair store(RunContext runContext, SearchResponse searchResponse) throws IOException { + protected Pair store(RunContext runContext, SearchResponse searchResponse) throws IOException { File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); - try (var output = new FileOutputStream(tempFile)) { - searchResponse.hits().hits() - .forEach(throwConsumer(docs -> FileSerde.write(output, docs.source()))); - } + try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { + Flux hitFlux = Flux.fromIterable(searchResponse.hits().hits()).map(hit -> hit.source()); + Mono longMono = FileSerde.writeAll(output, hitFlux); - return Pair.of( - runContext.storage().putFile(tempFile), - searchResponse.hits().hits().size() - ); + return Pair.of( + runContext.storage().putFile(tempFile), + longMono.block() + ); + } } protected Pair>, Integer> fetch(SearchResponse searchResponse) {