Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support elastic search client v8 #243

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sharp-birds-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'contexture-elasticsearch': minor
---

Support elasticsearch client v8
7 changes: 7 additions & 0 deletions packages/provider-elasticsearch/src/compat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Detect if ElasticSearch client is at least version 8 by detecting whether an
// `extend` method exists on it.
//
// @see https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/8.0/changelog-client.html#_remove_client_extensions_api
export function isAtLeastVersion8(client) {
return !client.extend
}
68 changes: 38 additions & 30 deletions packages/provider-elasticsearch/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ from 'lodash/fp.js'
import { hoistOnTree } from './utils/results.js'
import { getESSchemas } from './schema.js'
import _debug from 'debug'
import { isAtLeastVersion8 } from './compat.js'

let debug = _debug('contexture:elasticsearch')

Expand Down Expand Up @@ -45,6 +46,7 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
}
},
async runSearch({ requestOptions = {} } = {}, node, schema, filters, aggs) {
let client = config.getClient()
let hoistedFromFilters = hoistOnTree(filters)
let hoistedFromAggs = hoistOnTree(aggs)
let {
Expand All @@ -54,36 +56,41 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
clusterDefaultTimeout,
} = config
let { scroll, scrollId } = node
let request = scrollId
? // If we have scrollId then keep scrolling, no query needed
{
scroll: scroll === true ? '60m' : hoistedFromFilters,
body: { scroll_id: scrollId },
}
: // Deterministic ordering of JSON keys for request cache optimization
{
...configOptions,
index: schema.elasticsearch.index,
// Scroll support (used for bulk export)
...(scroll && { scroll: scroll === true ? '2m' : scroll }),
body: {
// Wrap in constant_score when not sorting by score to avoid wasting time on relevance scoring
...(!_.isEmpty(hoistedFromAggs) && _.mergeAll(hoistedFromAggs)),
...(!_.isEmpty(hoistedFromFilters) &&
_.mergeAll(hoistedFromFilters)),
query:
filters && !_.has('sort._score', aggs)
? constantScore(filters)
: filters,
// If there are aggs, skip search results
...(aggs.aggs && { size: 0 }),
// Sorting by _doc is more efficient for scrolling since it won't waste time on any sorting
...(scroll && { sort: ['_doc'] }),
...aggs,
},
}
let request
// If we have scrollId then keep scrolling, no query needed
if (scrollId) {
let body = { scroll_id: scrollId }
request = {
scroll: scroll === true ? '60m' : hoistedFromFilters,
...(isAtLeastVersion8(client) ? body : { body }),
}
}
// Deterministic ordering of JSON keys for request cache optimization
else {
let body = {
// Wrap in constant_score when not sorting by score to avoid wasting time on relevance scoring
...(!_.isEmpty(hoistedFromAggs) && _.mergeAll(hoistedFromAggs)),
...(!_.isEmpty(hoistedFromFilters) && _.mergeAll(hoistedFromFilters)),
query:
filters && !_.has('sort._score', aggs)
? constantScore(filters)
: filters,
// If there are aggs, skip search results
...(aggs.aggs && { size: 0 }),
// Sorting by _doc is more efficient for scrolling since it won't waste time on any sorting
...(scroll && { sort: ['_doc'] }),
...aggs,
}
request = {
...configOptions,
index: schema.elasticsearch.index,
// Scroll support (used for bulk export)
...(scroll && { scroll: scroll === true ? '2m' : scroll }),
...(isAtLeastVersion8(client) ? body : { body }),
}
}

let child = config.getClient().child({
let child = client.child({
headers: requestOptions.headers,
requestTimeout: requestOptions.requestTimeout,
})
Expand All @@ -102,7 +109,8 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
node._meta.requests.push(metaObj)
let count = counter.inc()
debug('(%s) Request: %O\nOptions: %O', count, request, requestOptions)
let { body } = await search(request, requestOptions)
let response = await search(request, requestOptions)
let body = isAtLeastVersion8(client) ? response : response?.body

// If body has timed_out set to true, log that partial results were returned,
// if partial is turned off an error will be thrown instead.
Expand Down
6 changes: 3 additions & 3 deletions packages/provider-elasticsearch/src/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body.query.constant_score.filter).toEqual({
expect(firstSearchCall[0].query.constant_score.filter).toEqual({
query_string,
})
})
Expand All @@ -65,7 +65,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body).toEqual({ query: undefined })
expect(firstSearchCall[0]).toEqual({ query: undefined })
})
it('runSearch should not wrap queries in constant_score if sort._score is present', () => {
const client = {
Expand All @@ -87,7 +87,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body.query).toEqual({
expect(firstSearchCall[0].query).toEqual({
query_string,
})
})
Expand Down
9 changes: 7 additions & 2 deletions packages/provider-elasticsearch/src/schema.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from 'lodash/fp.js'
import F from 'futil'
import { isAtLeastVersion8 } from './compat.js'

let Tree = F.tree((x) => x.properties)
// flatLeaves should auto detect reject vs omit (or just more general obj vs arr method)
Expand Down Expand Up @@ -83,6 +84,10 @@ export let fromMappingsWithAliases = (mappings, aliases) => {

export let getESSchemas = (client) =>
Promise.all([client.indices.getMapping(), client.indices.getAlias()]).then(
([{ body: mappings }, { body: aliases }]) =>
fromMappingsWithAliases(mappings, aliases)
([mappingResult, aliasResult]) => {
return fromMappingsWithAliases(
isAtLeastVersion8(client) ? mappingResult : mappingResult.body,
isAtLeastVersion8(client) ? aliasResult : aliasResult.body
)
}
)
94 changes: 45 additions & 49 deletions packages/provider-elasticsearch/src/utils/futil.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,51 @@ describe('futil candidates', () => {
it('Should hoist from tree based on demarcation for hoisting from filters', () => {
let input = {
index: 'sp-data-lit',
body: {
query: {
constant_score: {
filter: {
bool: {
should: [
{
bool: {
must: [
{
__hoistProps: {
runtime_mappings: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
type: 'date',
script: {
source:
"if(doc['FederalDoc.relevantContractDates.signedDate'].size()!=0){emit(doc['FederalDoc.relevantContractDates.signedDate'].value.plusMonths(params['monthOffset']).toInstant().toEpochMilli())}",
params: {
monthOffset: 3,
},
},
},
},
},
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
gte: '2015-04-01T00:00:00.000Z',
lte: '2015-06-30T23:59:59Z',
},
},
},
],
},
},
],
minimum_should_match: 1,
},
},
},
},
}
let output = {
result: {
index: 'sp-data-lit',
query: {
constant_score: {
filter: {
Expand All @@ -386,21 +430,6 @@ describe('futil candidates', () => {
bool: {
must: [
{
__hoistProps: {
runtime_mappings: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
type: 'date',
script: {
source:
"if(doc['FederalDoc.relevantContractDates.signedDate'].size()!=0){emit(doc['FederalDoc.relevantContractDates.signedDate'].value.plusMonths(params['monthOffset']).toInstant().toEpochMilli())}",
params: {
monthOffset: 3,
},
},
},
},
},
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
Expand All @@ -419,39 +448,6 @@ describe('futil candidates', () => {
},
},
},
}
let output = {
result: {
index: 'sp-data-lit',
body: {
query: {
constant_score: {
filter: {
bool: {
should: [
{
bool: {
must: [
{
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
gte: '2015-04-01T00:00:00.000Z',
lte: '2015-06-30T23:59:59Z',
},
},
},
],
},
},
],
minimum_should_match: 1,
},
},
},
},
},
},
removed: [
{
runtime_mappings: {
Expand Down
Loading