From 698890b567caec7b51b11191a7b194f6450c6171 Mon Sep 17 00:00:00 2001 From: Sviatoslav Bulbakha Date: Tue, 13 Sep 2022 21:17:33 +0400 Subject: [PATCH 1/2] Add an ability to customize bulk api url --- README.md | 10 +++++++--- lib/elasticsearch/indexing/bulk.ex | 10 ++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index d6e1242..0f42b21 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,11 @@ config :my_app, MyApp.ElasticsearchCluster, # By default bulk indexing uses the "create" action. To allow existing # documents to be replaced, use the "index" action instead. - bulk_action: "create" + bulk_action: "create", + + # Path to bulk api url. Should start with a slash. + # You may need to use `/_doc/_bulk` for older ElasticSearch versions. + bulk_path: "/_bulk" } } ``` @@ -207,9 +211,9 @@ As AWS does not provide credentials' based http authentication, you can use the To use this, just add `sigaws` to your dependencies and add this to your configuration: ```elixir -# Add to deps +# Add to deps def deps do - [ + [ # ... {:sigaws, ">= 0.0.0"} ] diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index f1a4aec..5000c03 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -100,6 +100,7 @@ defmodule Elasticsearch.Index.Bulk do bulk_page_size = index_config[:bulk_page_size] || 5000 bulk_wait_interval = index_config[:bulk_wait_interval] || 0 action = index_config[:bulk_action] || "create" + bulk_path = index_config[:bulk_path] || "/_bulk" errors = store.transaction(fn -> @@ -108,20 +109,21 @@ defmodule Elasticsearch.Index.Bulk do |> Stream.map(&encode!(config, &1, index_name, action)) |> Stream.chunk_every(bulk_page_size) |> Stream.intersperse(bulk_wait_interval) - |> Stream.map(&put_bulk_page(config, index_name, &1)) + |> Stream.map(&put_bulk_page(config, index_name, bulk_path, &1)) |> Enum.reduce(errors, &collect_errors(&1, &2, action)) end) upload(config, index_name, %{index_config | sources: tail}, errors) end - defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do + defp put_bulk_page(_config, _index_name, _bulk_path, wait_interval) + when is_integer(wait_interval) do Logger.debug("Pausing #{wait_interval}ms between bulk pages") :timer.sleep(wait_interval) end - defp put_bulk_page(config, index_name, items) when is_list(items) do - Elasticsearch.put(config, "/#{index_name}/_doc/_bulk", Enum.join(items)) + defp put_bulk_page(config, index_name, bulk_path, items) when is_list(items) do + Elasticsearch.put(config, "/#{index_name}#{bulk_path}", Enum.join(items)) end defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do From 9c3666ede903b30c73b3e2b0fbdab4f1d98f8a84 Mon Sep 17 00:00:00 2001 From: Sviatoslav Bulbakha Date: Fri, 7 Oct 2022 02:32:41 +0400 Subject: [PATCH 2/2] Use old bulk path as default --- README.md | 4 ++-- lib/elasticsearch/indexing/bulk.ex | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 0f42b21..0a7da84 100644 --- a/README.md +++ b/README.md @@ -106,8 +106,8 @@ config :my_app, MyApp.ElasticsearchCluster, bulk_action: "create", # Path to bulk api url. Should start with a slash. - # You may need to use `/_doc/_bulk` for older ElasticSearch versions. - bulk_path: "/_bulk" + # You may need to use `/_bulk` for newer ElasticSearch versions. + bulk_path: "/_doc/_bulk" } } ``` diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 5000c03..998d742 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -100,7 +100,7 @@ defmodule Elasticsearch.Index.Bulk do bulk_page_size = index_config[:bulk_page_size] || 5000 bulk_wait_interval = index_config[:bulk_wait_interval] || 0 action = index_config[:bulk_action] || "create" - bulk_path = index_config[:bulk_path] || "/_bulk" + bulk_path = index_config[:bulk_path] || "/_doc/_bulk" errors = store.transaction(fn ->