Skip to content

Commit

Permalink
Add sniffing for OpenSearch, fix #1142
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Nioche <[email protected]>
  • Loading branch information
jnioche committed Jan 22, 2024
1 parent a8d7419 commit 41dd910
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
15 changes: 13 additions & 2 deletions external/opensearch/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -17,7 +19,8 @@
<packaging>jar</packaging>

<name>storm-crawler-opensearch</name>
<url>https://github.com/DigitalPebble/storm-crawler/tree/master/external/opensearch</url>
<url>
https://github.com/DigitalPebble/storm-crawler/tree/master/external/opensearch</url>
<description>Opensearch resources for StormCrawler</description>

<build>
Expand Down Expand Up @@ -49,6 +52,14 @@
<version>${opensearch.version}</version>
</dependency>

<!-- Added for the sniffing support in OpenSearchConnection, which uses
org.opensearch.client.sniff.Sniffer. Versioned with ${opensearch.version},
like the OpenSearch dependency declared above.
https://mvnrepository.com/artifact/org.opensearch.client/opensearch-rest-client-sniffer -->
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client-sniffer</artifactId>
<version>${opensearch.version}</version>
</dependency>

<dependency>
<groupId>com.digitalpebble.stormcrawler</groupId>
<artifactId>storm-crawler-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.sniff.Sniffer;
import org.opensearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,9 +57,13 @@ public final class OpenSearchConnection {

@NotNull private final BulkProcessor processor;

private OpenSearchConnection(@NotNull RestHighLevelClient c, @NotNull BulkProcessor p) {
@Nullable private final Sniffer sniffer;

/**
 * Bundles the resources that make up one connection. The constructor is private; instances
 * are created by {@code getConnection}.
 *
 * @param c high-level REST client stored as {@code client}
 * @param p bulk processor stored as {@code processor}
 * @param s sniffer stored as {@code sniffer}; {@code null} when sniffing is not enabled
 */
private OpenSearchConnection(
        @NotNull RestHighLevelClient c, @NotNull BulkProcessor p, @Nullable Sniffer s) {
    this.client = c;
    this.processor = p;
    // May legitimately be null: close() checks for null before closing it.
    this.sniffer = s;
}

public RestHighLevelClient getClient() {
Expand Down Expand Up @@ -250,7 +256,14 @@ public static OpenSearchConnection getConnection(
.setConcurrentRequests(concurrentRequests)
.build();

return new OpenSearchConnection(client, bulkProcessor);
boolean sniff =
ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true);
Sniffer sniffer = null;
if (sniff) {
sniffer = Sniffer.builder(client.getLowLevelClient()).build();
}

return new OpenSearchConnection(client, bulkProcessor, sniffer);
}

private boolean isClosed = false;
Expand All @@ -276,6 +289,10 @@ public void close() {
throw new RuntimeException(e);
}

if (sniffer != null) {
sniffer.close();
}

// Now close the actual client
try {
client.close();
Expand Down

0 comments on commit 41dd910

Please sign in to comment.