Skip to content

Commit

Permalink
chore: switch from the high level driver to the java driver
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 7, 2024
1 parent 745d6cc commit 6c296f1
Show file tree
Hide file tree
Showing 17 changed files with 307 additions and 200 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ dependencies {
compileOnly group: "io.kestra", name: "core", version: kestraVersion

// Libs
api (group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client') {
api (group: 'org.opensearch.client', name: 'opensearch-java') {
exclude group: 'org.apache.logging.log4j'
}
api (group: 'org.opensearch.client', name: 'opensearch-rest-client') {
exclude group: 'org.apache.logging.log4j'
}

Expand Down
44 changes: 30 additions & 14 deletions src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;

import java.io.BufferedReader;
Expand Down Expand Up @@ -47,17 +47,18 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask<
@Builder.Default
private Integer chunk = 1000;

abstract protected Flux<DocWriteRequest<?>> source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException;
abstract protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException;

@Override
public AbstractLoad.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
URI from = new URI(runContext.render(this.from));

try (
RestHighLevelClient client = this.connection.client(runContext);
RestClientTransport transport = this.connection.client(runContext);
BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))
) {
OpenSearchClient client = new OpenSearchClient(transport);
AtomicLong count = new AtomicLong();
AtomicLong duration = new AtomicLong();

Expand All @@ -67,16 +68,16 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception {
})
.buffer(this.chunk, this.chunk)
.map(throwFunction(indexRequests -> {
BulkRequest bulkRequest = new BulkRequest();
indexRequests.forEach(bulkRequest::add);
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
var bulkRequest = new BulkRequest.Builder();
bulkRequest.operations(indexRequests);

return client.bulk(bulkRequest.build());
}))
.doOnNext(bulkItemResponse -> {
duration.addAndGet(bulkItemResponse.getTook().nanos());
duration.addAndGet(bulkItemResponse.took());

if (bulkItemResponse.hasFailures()) {
throw new RuntimeException("Indexer failed bulk '" + bulkItemResponse.buildFailureMessage() + "'");
if (bulkItemResponse.errors()) {
throw new RuntimeException("Indexer failed bulk:\n " + logError(bulkItemResponse));
}
});

Expand All @@ -99,6 +100,21 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception {
}
}

/**
 * Builds a human-readable summary of the failed items of a bulk response.
 *
 * <p>Items without an error are skipped. Each failed item contributes one line of the form
 * {@code <index>: <status> - <reason>} terminated by a newline.
 *
 * @param bulkResponse the bulk response whose items are inspected
 * @return the concatenated failure lines, or an empty string when no item carries an error
 */
private String logError(BulkResponse bulkResponse) {
    StringBuilder failures = new StringBuilder();

    for (var item : bulkResponse.items()) {
        var error = item.error();
        if (error == null) {
            // Successful item: nothing to report.
            continue;
        }

        failures
            .append(item.index()).append(": ")
            .append(item.status()).append(" - ")
            .append(error.reason()).append('\n');
    }

    return failures.toString();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
Expand Down
46 changes: 33 additions & 13 deletions src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package io.kestra.plugin.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.elasticsearch.model.XContentType;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.json.stream.JsonParser;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.transport.OpenSearchTransport;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.List;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractSearch extends AbstractTask {
private static ObjectMapper MAPPER = JacksonMapper.ofJson();

@Schema(
title = "The ElasticSearch indices.",
description = "Default to all indices."
Expand All @@ -39,20 +48,22 @@ public abstract class AbstractSearch extends AbstractTask {
)
@PluginProperty
@Builder.Default
private XContentType contentType = XContentType.JSON;
private XContentType contentType = XContentType.JSON; //FIXME

protected SearchRequest request(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
SearchRequest request = new SearchRequest();
protected SearchRequest.Builder request(RunContext runContext, OpenSearchTransport transport) throws IllegalVariableEvaluationException, IOException {
SearchRequest.Builder request;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
try (XContentParser xContentParser = ElasticsearchService.toXContentParser(runContext, this.request, this.contentType)) {
searchSourceBuilder.parseXContent(xContentParser);
if (this.request instanceof String requestStr) {
request = parseQuery(transport, requestStr).toBuilder();
} else if (this.request instanceof Map requestMap) {
String requestStr = MAPPER.writeValueAsString(requestMap);
request = parseQuery(transport, requestStr).toBuilder();
} else {
throw new IllegalArgumentException("The `request` property must be a String or an Object");
}

request.source(searchSourceBuilder);

if (this.indexes != null) {
request.indices(runContext.render(this.indexes).toArray(String[]::new));
request.index(runContext.render(this.indexes));
}

if (this.routing != null) {
Expand All @@ -61,4 +72,13 @@ protected SearchRequest request(RunContext runContext) throws IllegalVariableEva

return request;
}

/**
 * Deserializes a raw JSON search request into a {@link SearchRequest}.
 *
 * <p>Uses the trick found here:
 * https://forum.opensearch.org/t/how-to-create-index-using-json-file/11137
 * The JSON is fed to the transport's JSON-P mapper and decoded with the request's own deserializer.
 *
 * @param transport the transport whose {@link JsonpMapper} is used for parsing
 * @param query the search request body as a JSON string
 * @return the deserialized search request
 * @throws IOException if closing the underlying reader fails
 */
private SearchRequest parseQuery(OpenSearchTransport transport, String query) throws IOException {
    JsonpMapper mapper = transport.jsonpMapper();

    // JsonParser is Closeable: manage it alongside the reader so that any resources it holds
    // are released even when deserialization throws.
    try (
        Reader reader = new StringReader(query);
        JsonParser parser = mapper.jsonProvider().createParser(reader)
    ) {
        return SearchRequest._DESERIALIZER.deserialize(parser, mapper);
    }
}
}
56 changes: 33 additions & 23 deletions src/main/java/io/kestra/plugin/elasticsearch/Bulk.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.plugin.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
Expand All @@ -11,11 +13,10 @@
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand Down Expand Up @@ -47,58 +48,67 @@
}
)
public class Bulk extends AbstractLoad implements RunnableTask<Bulk.Output> {
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();

@Override
protected Flux<DocWriteRequest<?>> source(RunContext runContext, BufferedReader inputStream) throws IOException {
protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IOException {
return Flux
.create(this.esNdJSonReader(inputStream), FluxSink.OverflowStrategy.BUFFER);
}

@SuppressWarnings("unchecked")
public Consumer<FluxSink<DocWriteRequest<?>>> esNdJSonReader(BufferedReader input) throws IOException {
public Consumer<FluxSink<BulkOperation>> esNdJSonReader(BufferedReader input) throws IOException {
return throwConsumer(s -> {
String row;

while ((row = input.readLine()) != null) {
Map.Entry<String, Object> operation = JacksonMapper.toMap(row).entrySet().iterator().next();
Map<String, Object> value = (Map<String, Object>) operation.getValue();

DocWriteRequest<?> docWriteRequest;
var bulkOperation = new BulkOperation.Builder();

switch (operation.getKey()) {
case "index":
docWriteRequest = new IndexRequest()
var indexOperation = new IndexOperation.Builder<>()
.id((String) value.get("_id"))
.source(input.readLine(), XContentType.JSON);
.index((String) value.get("_index"))
.document(parseline(input.readLine()));
bulkOperation.index(indexOperation.build());
break;
case "create":
docWriteRequest = new IndexRequest()
var createOperation = new IndexOperation.Builder<>()
.id((String) value.get("_id"))
.opType(DocWriteRequest.OpType.CREATE)
.source(input.readLine(), XContentType.JSON);
.index((String) value.get("_index"))
.ifPrimaryTerm(0L) //FIXME opType
.document(parseline(input.readLine()));
bulkOperation.index(createOperation.build());
break;
case "update":
docWriteRequest = new UpdateRequest()
var updateOperation = new UpdateOperation.Builder<>()
.id((String) value.get("_id"))
.index((String) value.get("_index"))
.docAsUpsert(true)
.doc(input.readLine(), XContentType.JSON);
.document(parseline(input.readLine()));
bulkOperation.update(updateOperation.build());
break;
case "delete":
docWriteRequest = new DeleteRequest()
.id((String) value.get("_id"));
var deleteOperation = new DeleteOperation.Builder()
.id((String) value.get("_id"))
.index((String) value.get("_index"));
bulkOperation.delete(deleteOperation.build());
break;
default:
throw new IllegalArgumentException("Invalid bulk request type on '" + row + "'");
}


if (value.containsKey("_index")) {
docWriteRequest.index((String) value.get("_index"));
}

s.next(docWriteRequest);
s.next(bulkOperation.build());
}

s.complete();
});
}

/**
 * Parses a single NDJSON document line into a map.
 *
 * <p>Callers pass the result of {@code BufferedReader.readLine()} directly, which is
 * {@code null} when the input ends right after an action line. That case is rejected here
 * with an explicit message instead of handing {@code null} to the JSON mapper.
 *
 * @param line the JSON document line that follows a bulk action line
 * @return the parsed document
 * @throws IllegalArgumentException if {@code line} is {@code null} (truncated bulk input)
 * @throws JsonProcessingException if the line is not valid JSON
 */
private Map<?,?> parseline(String line) throws JsonProcessingException {
    if (line == null) {
        throw new IllegalArgumentException("Invalid bulk request: missing document line after the action line (unexpected end of input)");
    }

    return OBJECT_MAPPER.readValue(line, JacksonMapper.MAP_TYPE_REFERENCE);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.kestra.plugin.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -21,18 +23,21 @@
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;

import java.net.URI;
import java.util.List;
import javax.net.ssl.SSLContext;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.transport.rest_client.RestClientTransport;

@SuperBuilder
@NoArgsConstructor
@Getter
public class ElasticsearchConnection {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);

@Schema(
title = "List of HTTP ElasticSearch servers.",
description = "Must be an URI like `https://elasticsearch.com:9200` with scheme and port."
Expand Down Expand Up @@ -69,14 +74,14 @@ public class ElasticsearchConnection {
@Schema(
title = "Whether the REST client should return any response containing at leas one warning header as a failure."
)
@PluginProperty(dynamic = false)
@PluginProperty
private Boolean strictDeprecationMode;

@Schema(
title = "Trust all SSL CA certificates.",
description = "Use this if the server is using a self signed SSL certificate."
)
@PluginProperty(dynamic = false)
@PluginProperty
private Boolean trustAllSsl;

@SuperBuilder
Expand All @@ -96,7 +101,7 @@ public static class BasicAuth {
private String password;
}

RestHighLevelClient client(RunContext runContext) throws IllegalVariableEvaluationException {
RestClientTransport client(RunContext runContext) throws IllegalVariableEvaluationException {
RestClientBuilder builder = RestClient
.builder(this.httpHosts(runContext))
.setHttpClientConfigCallback(httpClientBuilder -> {
Expand All @@ -116,7 +121,7 @@ RestHighLevelClient client(RunContext runContext) throws IllegalVariableEvaluati
builder.setStrictDeprecationMode(this.getStrictDeprecationMode());
}

return new RestHighLevelClient(builder);
return new RestClientTransport(builder.build(), new JacksonJsonpMapper(MAPPER));
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.*;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchModule;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public abstract class ElasticsearchService {
Expand All @@ -24,14 +18,4 @@ public static String toBody(RunContext runContext, Object value) throws IllegalV
throw new IllegalVariableEvaluationException("Invalid value type '" + value.getClass() + "'");
}
}

/**
 * Creates an {@link XContentParser} over the body obtained from {@code value}.
 *
 * <p>The body string comes from {@link #toBody(RunContext, Object)}. The parser is created with
 * a registry built from the named x-contents of a {@link SearchModule} that has empty settings
 * and no plugins. The returned parser is not closed here; that is left to the caller.
 *
 * @param runContext the run context handed to {@code toBody}
 * @param value the request value handed to {@code toBody}
 * @param contentType the content type whose {@code xContent()} creates the parser
 * @return a parser positioned over the body
 * @throws IllegalVariableEvaluationException if {@code toBody} rejects the value
 * @throws IOException if the parser cannot be created
 */
public static XContentParser toXContentParser(RunContext runContext, Object value, XContentType contentType) throws IllegalVariableEvaluationException, IOException {
    final String body = toBody(runContext, value);

    final NamedXContentRegistry registry = new NamedXContentRegistry(
        new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()
    );

    return contentType
        .xContent()
        .createParser(registry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, body);
}
}
Loading

0 comments on commit 6c296f1

Please sign in to comment.