Skip to content

Commit

Permalink
Update to Elasticsearch 8 / WFS indexing draft.
Browse files Browse the repository at this point in the history
  • Loading branch information
fxprunayre committed Oct 26, 2023
1 parent f47c3b6 commit 50d387a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@
<es.version>8.10.4</es.version>
<es.platform>linux-x86_64</es.platform>
<es.installer.extension>tar.gz</es.installer.extension>
<es.protocol>https</es.protocol>
<es.protocol>http</es.protocol>
<es.port>9200</es.port>
<es.host>localhost</es.host>
<es.url>${es.protocol}://${es.host}:${es.port}</es.url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@

package org.fao.geonet.harvester.wfsfeatures.worker;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import jakarta.json.spi.JsonProvider;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.jcs.access.exception.InvalidArgumentException;
Expand Down Expand Up @@ -65,6 +72,7 @@
import org.springframework.beans.factory.annotation.Value;

import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -273,13 +281,12 @@ public CompletableFuture<Void> indexFeatures(Exchange exchange) throws Exception
LOGGER.error(msg);
throw new RuntimeException(msg);
}
final Phaser phaser = new Phaser();

BulkResutHandler brh = new AsyncBulkResutHandler(phaser, typeName, url, nbOfFeatures, report, state.getParameters().getMetadataUuid());

try {
nbOfFeatures = 0;

final Phaser phaser = new Phaser();
BulkResutHandler brh = new AsyncBulkResutHandler(phaser, typeName, url, nbOfFeatures, report, state.getParameters().getMetadataUuid());

long begin = System.currentTimeMillis();

String epsg = "urn:ogc:def:crs:OGC:1.3:CRS84";
Expand Down Expand Up @@ -468,6 +475,7 @@ public CompletableFuture<Void> indexFeatures(Exchange exchange) throws Exception
throw e;
} finally {
report.saveHarvesterReport();
brh.close();
future.complete(null);
}

Expand Down Expand Up @@ -590,11 +598,10 @@ abstract class BulkResutHandler {
private Report report;
private String metadataUuid;
protected long begin;
protected BulkRequest bulk;
protected BulkIngester<String> bulk;
protected int bulkSize;
protected int failuresCount;
// TODO: ES 8
//ActionListener<BulkResponse> listener;
BulkListener<String> listener;

public BulkResutHandler(Phaser phaser, String typeName, String url, int firstFeatureIndex, Report report, String metadataUuid) {
this.phaser = phaser;
Expand All @@ -604,16 +611,19 @@ public BulkResutHandler(Phaser phaser, String typeName, String url, int firstFea
this.report = report;

this.metadataUuid = metadataUuid;
this.bulk = new BulkRequest.Builder().index(index).build();

this.bulkSize = 0;
this.failuresCount = 0;
LOGGER.debug(" {} - Indexing bulk (size {}) starting at {} ...",
typeName, featureCommitInterval, firstFeatureIndex);

// TODO: ES 8
/*listener = new ActionListener<BulkResponse>() {
listener = new BulkListener<String>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse bulkResponse) {
AtomicInteger bulkFailures = new AtomicInteger();
if (bulkResponse.errors()) {
bulkResponse.items().forEach(e -> {
Expand All @@ -627,29 +637,32 @@ public void onResponse(BulkResponse bulkResponse) {
}
});
}
LOGGER.debug(" {} - Features [{}-{}] indexed in {} ms{}.", new Object[]{
typeName, firstFeatureIndex, firstFeatureIndex + bulkSize,
LOGGER.debug(" {} - Features [{}-{}] indexed in {} ms{}.", typeName, firstFeatureIndex, firstFeatureIndex + bulkSize,
System.currentTimeMillis() - begin,
bulkResponse.errors() ?
" but with " + bulkFailures + " errors" : ""
});
" but with " + bulkFailures + " errors" : "");
failuresCount = bulkFailures.get();
phaser.arriveAndDeregister();
}

@Override
public void onFailure(Exception e) {
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
String msg = String.format(
"Features [%s-%s] indexed in %s ms but with errors. Exception: %s",
typeName, firstFeatureIndex, bulkSize,
" %s - Features [%s-%s] indexed in %s ms but with errors. Exception: %s",
typeName, firstFeatureIndex, firstFeatureIndex + bulkSize,
System.currentTimeMillis() - begin,
e.getMessage()
failure.getMessage()
);
report.put("error_ss", msg);
LOGGER.error(msg);
phaser.arriveAndDeregister();
}
};*/
};

this.bulk = BulkIngester.of(b -> b.client(client.getAsynchClient())
.listener(listener)
.maxOperations(featureCommitInterval));

}

public int getBulkSize() {
Expand All @@ -668,33 +681,41 @@ public void addAction(ObjectNode rootNode, SimpleFeature feature) throws JsonPro
}

String id = String.format("%s#%s#%s", url, typeName, featureId);
// TODO: ES 8
//bulk.operations().add(new IndexRequest.Builder().index(index).id(id)
// .document(jacksonMapper.writeValueAsString(rootNode)).build());

// .routing(ROUTING_KEY));
StringReader reader = new StringReader(jacksonMapper.writeValueAsString(rootNode));
// https://discuss.elastic.co/t/java-8-1-bulk-request/302423
JsonpMapper jsonpMapper = client.getClient()._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();
JsonData jd = JsonData.from(jsonProvider.createParser(reader), jsonpMapper);

bulk.add(b ->
b.index(io -> io
.index(index)
.id(id)
.document(jd)), id);
bulkSize++;
}

/**
 * Prepares this handler for a bulk launch: registers one additional party
 * on the phaser, then stores the current wall-clock time (milliseconds) in
 * {@code begin} so later log messages can report the elapsed time.
 */
protected void prepareLaunch() {
    phaser.register();
    begin = System.currentTimeMillis();
}
/**
 * Closes the bulk ingester held by this handler.
 * Does nothing when no ingester has been assigned.
 */
public void close() {
    if (this.bulk == null) {
        return;
    }
    this.bulk.close();
}

abstract public void launchBulk(EsRestClient client) throws Exception;
}

// depending on situation, one can expect going up to 1.5 faster using an async result handler (e.g. hudge collection of points)
// depending on situation, one can expect going up to 1.5 faster using an async result handler (e.g. huge collection of points)
/**
 * Concrete {@code BulkResutHandler} used for asynchronous bulk indexing.
 * It adds no state of its own: construction is delegated to the parent class.
 */
class AsyncBulkResutHandler extends BulkResutHandler {
/**
 * Forwards every argument unchanged to the {@code BulkResutHandler} constructor.
 */
public AsyncBulkResutHandler(Phaser phaser, String typeName, String url, int firstFeatureIndex, Report report, String metadataUuid) {
super(phaser, typeName, url, firstFeatureIndex, report, metadataUuid);
}

/**
 * Marks the start of a bulk by calling {@code prepareLaunch()}.
 * The explicit asynchronous bulk call below is commented out, so the
 * {@code client} argument is not used by the visible body.
 * NOTE(review): presumably the bulk ingester configured in the parent class
 * now sends the queued operations itself; confirm before removing the TODO.
 */
public void launchBulk(EsRestClient client) throws Exception {
prepareLaunch();

// TODO: ES 8
//client.getAsynchClient().bulk(this.bulk, this.listener);
}
}

Expand Down

0 comments on commit 50d387a

Please sign in to comment.