diff --git a/external/opensearch/pom.xml b/external/opensearch/pom.xml index 9dacb3513..2404a7cc4 100644 --- a/external/opensearch/pom.xml +++ b/external/opensearch/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -17,7 +19,8 @@ jar storm-crawler-opensearch - https://github.com/DigitalPebble/storm-crawler/tree/master/external/opensearch + + https://github.com/DigitalPebble/storm-crawler/tree/master/external/opensearch Opensearch resources for StormCrawler @@ -49,6 +52,14 @@ ${opensearch.version} + + + org.opensearch.client + opensearch-rest-client-sniffer + ${opensearch.version} + + com.digitalpebble.stormcrawler storm-crawler-core diff --git a/external/opensearch/src/main/java/com/digitalpebble/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/com/digitalpebble/stormcrawler/opensearch/OpenSearchConnection.java index 8a2685e6a..ffff955ef 100644 --- a/external/opensearch/src/main/java/com/digitalpebble/stormcrawler/opensearch/OpenSearchConnection.java +++ b/external/opensearch/src/main/java/com/digitalpebble/stormcrawler/opensearch/OpenSearchConnection.java @@ -31,6 +31,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkProcessor; import org.opensearch.action.bulk.BulkRequest; @@ -40,6 +41,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.sniff.Sniffer; import org.opensearch.common.unit.TimeValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +57,13 @@ public final class OpenSearchConnection { @NotNull private final BulkProcessor processor; - private OpenSearchConnection(@NotNull RestHighLevelClient c, @NotNull BulkProcessor p) { + @Nullable private final Sniffer sniffer; + + private OpenSearchConnection( + @NotNull RestHighLevelClient c, @NotNull BulkProcessor p, @Nullable Sniffer s) { processor = p; client = c; + sniffer = s; } public RestHighLevelClient getClient() { @@ -250,7 +256,14 @@ public static OpenSearchConnection getConnection( .setConcurrentRequests(concurrentRequests) .build(); - return new OpenSearchConnection(client, bulkProcessor); + boolean sniff = + ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true); + Sniffer sniffer = null; + if (sniff) { + sniffer = Sniffer.builder(client.getLowLevelClient()).build(); + } + + return new OpenSearchConnection(client, bulkProcessor, sniffer); } private boolean isClosed = false; @@ -276,6 +289,10 @@ public void close() { throw new RuntimeException(e); } + if (sniffer != null) { + sniffer.close(); + } + // Now close the actual client try { client.close();