diff --git a/lib/searchkick/query.rb b/lib/searchkick/query.rb index 3f3ac55ac..5bc9eb50a 100644 --- a/lib/searchkick/query.rb +++ b/lib/searchkick/query.rb @@ -12,12 +12,12 @@ class Query :took, :error, :model_name, :entry_name, :total_count, :total_entries, :current_page, :per_page, :limit_value, :padding, :total_pages, :num_pages, :offset_value, :offset, :previous_page, :prev_page, :next_page, :first_page?, :last_page?, - :out_of_range?, :hits, :response, :to_a, :first + :out_of_range?, :hits, :response, :to_a, :first, :scroll def initialize(klass, term = "*", **options) unknown_keywords = options.keys - [:aggs, :block, :body, :body_options, :boost, :boost_by, :boost_by_distance, :boost_by_recency, :boost_where, :conversions, :conversions_term, :debug, :emoji, :exclude, :execute, :explain, - :fields, :highlight, :includes, :index_name, :indices_boost, :limit, :load, + :fields, :highlight, :includes, :index_name, :indices_boost, :limit, :load, :scroll, :match, :misspellings, :model_includes, :offset, :operator, :order, :padding, :page, :per_page, :profile, :request_params, :routing, :scope_results, :select, :similar, :smart_aggs, :suggest, :total_entries, :track, :type, :where] raise ArgumentError, "unknown keywords: #{unknown_keywords.join(", ")}" if unknown_keywords.any? 
@@ -71,6 +71,7 @@ def params } params[:type] = @type if @type params[:routing] = @routing if @routing + params[:scroll] = @scroll if @scroll params.merge!(options[:request_params]) if options[:request_params] params end @@ -116,7 +117,8 @@ def handle_response(response) term: term, scope_results: options[:scope_results], index_name: options[:index_name], - total_entries: options[:total_entries] + total_entries: options[:total_entries], + scroll: options[:scroll] } if options[:debug] @@ -217,6 +219,7 @@ def prepare per_page = (options[:limit] || options[:per_page] || 10_000).to_i padding = [options[:padding].to_i, 0].max offset = options[:offset] || (page - 1) * per_page + padding + scroll = options[:scroll] # model and eager loading load = options[:load].nil? ? true : options[:load] @@ -485,11 +488,18 @@ def prepare # run block options[:block].call(payload) if options[:block] + # scroll optimization when iterating over all docs + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html + if options[:scroll] && payload[:query] == {match_all: {}} + payload[:sort] ||= ["_doc"] + end + @body = payload @page = page @per_page = per_page @padding = padding @load = load + @scroll = scroll end def set_fields diff --git a/lib/searchkick/results.rb b/lib/searchkick/results.rb index c9e2b4cf4..e249c82bc 100644 --- a/lib/searchkick/results.rb +++ b/lib/searchkick/results.rb @@ -204,6 +204,51 @@ def misspellings? @options[:misspellings] end + def scroll_id + @response["_scroll_id"] + end + + def scroll + raise Searchkick::Error, "Pass `scroll` option to the search method for scrolling" unless scroll_id + + if block_given? + records = self + while records.any? 
+ yield records + records = records.scroll + end + + records.clear_scroll + else + params = { + scroll: options[:scroll], + scroll_id: scroll_id + } + + begin + # TODO Active Support notifications for this scroll call + Searchkick::Results.new(@klass, index_client.scroll(params), @options) + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + if e.class.to_s =~ /NotFound/ && e.message =~ /search_context_missing_exception/i + raise Searchkick::Error, "Scroll id has expired" + else + raise e + end + end + end + end + + def clear_scroll + begin + # try to clear scroll + # not required as scroll will expire + # but there is a cost to open scrolls + Searchkick.client.clear_scroll(scroll_id: scroll_id) + rescue Elasticsearch::Transport::Transport::Error + # do nothing + end + end + private def results_query(records, hits) @@ -253,5 +298,9 @@ def hit_highlights(hit, multiple: false) {} end end + + def index_client + klass.search_index.client + end end end