From 50d387afbea570b486bc4162cb574f8adc697a0d Mon Sep 17 00:00:00 2001 From: Francois Prunayre Date: Thu, 26 Oct 2023 17:26:20 +0200 Subject: [PATCH] Update to Elasticsearch 8 / WFS indexing draft. --- pom.xml | 2 +- .../worker/EsWFSFeatureIndexer.java | 79 ++++++++++++------- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index bba31e71977..e54bc26622a 100644 --- a/pom.xml +++ b/pom.xml @@ -1525,7 +1525,7 @@ 8.10.4 linux-x86_64 tar.gz - https + http 9200 localhost ${es.protocol}://${es.host}:${es.port} diff --git a/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java b/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java index d870278a445..4f70a8520fd 100644 --- a/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java +++ b/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java @@ -23,11 +23,17 @@ package org.fao.geonet.harvester.wfsfeatures.worker; +import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester; +import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener; import co.elastic.clients.elasticsearch._types.Result; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.IndexRequest; import co.elastic.clients.elasticsearch.core.IndexResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import co.elastic.clients.json.JsonData; +import co.elastic.clients.json.JsonpMapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,6 +41,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import jakarta.json.spi.JsonProvider; import org.apache.camel.Exchange; import org.apache.camel.spring.SpringCamelContext; import org.apache.jcs.access.exception.InvalidArgumentException; @@ -65,6 +72,7 @@ import org.springframework.beans.factory.annotation.Value; import java.io.IOException; +import java.io.StringReader; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -273,13 +281,12 @@ public CompletableFuture indexFeatures(Exchange exchange) throws Exception LOGGER.error(msg); throw new RuntimeException(msg); } + final Phaser phaser = new Phaser(); + + BulkResutHandler brh = new AsyncBulkResutHandler(phaser, typeName, url, nbOfFeatures, report, state.getParameters().getMetadataUuid()); try { nbOfFeatures = 0; - - final Phaser phaser = new Phaser(); - BulkResutHandler brh = new AsyncBulkResutHandler(phaser, typeName, url, nbOfFeatures, report, state.getParameters().getMetadataUuid()); - long begin = System.currentTimeMillis(); String epsg = "urn:ogc:def:crs:OGC:1.3:CRS84"; @@ -468,6 +475,7 @@ public CompletableFuture indexFeatures(Exchange exchange) throws Exception throw e; } finally { report.saveHarvesterReport(); + brh.close(); future.complete(null); } @@ -590,11 +598,10 @@ abstract class BulkResutHandler { private Report report; private String metadataUuid; protected long begin; - protected BulkRequest bulk; + protected BulkIngester bulk; protected int bulkSize; protected int failuresCount; - // TODO: ES 8 - //ActionListener listener; + BulkListener listener; public BulkResutHandler(Phaser phaser, String typeName, String url, int firstFeatureIndex, Report report, String metadataUuid) { this.phaser = phaser; @@ -604,16 +611,19 @@ public BulkResutHandler(Phaser phaser, String typeName, String url, int firstFea this.report = report; this.metadataUuid = metadataUuid; - this.bulk = new BulkRequest.Builder().index(index).build(); + this.bulkSize = 0; this.failuresCount = 0; LOGGER.debug(" {} - Indexing bulk (size {}) starting at {} ...", typeName, featureCommitInterval, firstFeatureIndex); - // TODO: ES 8 - /*listener = new ActionListener() { + listener = new BulkListener() { @Override - public void onResponse(BulkResponse bulkResponse) { + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse bulkResponse) { AtomicInteger bulkFailures = new AtomicInteger(); if (bulkResponse.errors()) { bulkResponse.items().forEach(e -> { @@ -627,29 +637,32 @@ public void onResponse(BulkResponse bulkResponse) { } }); } - LOGGER.debug(" {} - Features [{}-{}] indexed in {} ms{}.", new Object[]{ - typeName, firstFeatureIndex, firstFeatureIndex + bulkSize, + LOGGER.debug(" {} - Features [{}-{}] indexed in {} ms{}.", typeName, firstFeatureIndex, firstFeatureIndex + bulkSize, System.currentTimeMillis() - begin, bulkResponse.errors() ? - " but with " + bulkFailures + " errors" : "" - }); + " but with " + bulkFailures + " errors" : ""); failuresCount = bulkFailures.get(); phaser.arriveAndDeregister(); } @Override - public void onFailure(Exception e) { + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { String msg = String.format( - "Features [%s-%s] indexed in %s ms but with errors. Exception: %s", - typeName, firstFeatureIndex, bulkSize, + " %s - Features [%s-%s] indexed in %s ms but with errors. Exception: %s", + typeName, firstFeatureIndex, firstFeatureIndex + bulkSize, System.currentTimeMillis() - begin, - e.getMessage() + failure.getMessage() ); report.put("error_ss", msg); LOGGER.error(msg); phaser.arriveAndDeregister(); } - };*/ + }; + + this.bulk = BulkIngester.of(b -> b.client(client.getAsynchClient()) + .listener(listener) + .maxOperations(featureCommitInterval)); + } public int getBulkSize() { @@ -668,11 +681,17 @@ public void addAction(ObjectNode rootNode, SimpleFeature feature) throws JsonPro } String id = String.format("%s#%s#%s", url, typeName, featureId); - // TODO: ES 8 - //bulk.operations().add(new IndexRequest.Builder().index(index).id(id) - // .document(jacksonMapper.writeValueAsString(rootNode)).build()); - -// .routing(ROUTING_KEY)); + StringReader reader = new StringReader(jacksonMapper.writeValueAsString(rootNode)); + // https://discuss.elastic.co/t/java-8-1-bulk-request/302423 + JsonpMapper jsonpMapper = client.getClient()._transport().jsonpMapper(); + JsonProvider jsonProvider = jsonpMapper.jsonProvider(); + JsonData jd = JsonData.from(jsonProvider.createParser(reader), jsonpMapper); + + bulk.add(b -> + b.index(io -> io + .index(index) + .id(id) + .document(jd)), id); bulkSize++; } @@ -680,11 +699,16 @@ protected void prepareLaunch() { phaser.register(); this.begin = System.currentTimeMillis(); } + public void close() { + if (this.bulk != null) { + this.bulk.close(); + } + } abstract public void launchBulk(EsRestClient client) throws Exception; } - // depending on situation, one can expect going up to 1.5 faster using an async result handler (e.g. hudge collection of points) + // depending on situation, one can expect going up to 1.5 faster using an async result handler (e.g. huge collection of points) class AsyncBulkResutHandler extends BulkResutHandler { public AsyncBulkResutHandler(Phaser phaser, String typeName, String url, int firstFeatureIndex, Report report, String metadataUuid) { super(phaser, typeName, url, firstFeatureIndex, report, metadataUuid); @@ -692,9 +716,6 @@ public AsyncBulkResutHandler(Phaser phaser, String typeName, String url, int fir public void launchBulk(EsRestClient client) throws Exception { prepareLaunch(); - - // TODO: ES 8 - //client.getAsynchClient().bulk(this.bulk, this.listener); } }