Skip to content

Commit

Permalink
Merge pull request #243 from smartprocure/feature/elasticsearch-clien…
Browse files Browse the repository at this point in the history
…t-v8

Support elastic search client v8
  • Loading branch information
stellarhoof authored Aug 1, 2024
2 parents a2df8b9 + 1fb99f7 commit 618e01b
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 84 deletions.
5 changes: 5 additions & 0 deletions .changeset/sharp-birds-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'contexture-elasticsearch': minor
---

Support elasticsearch client v8
7 changes: 7 additions & 0 deletions packages/provider-elasticsearch/src/compat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Detect if ElasticSearch client is at least version 8 by detecting whether an
// `extend` method exists on it.
//
// @see https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/8.0/changelog-client.html#_remove_client_extensions_api
export function isAtLeastVersion8(client) {
return !client.extend
}
68 changes: 38 additions & 30 deletions packages/provider-elasticsearch/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ from 'lodash/fp.js'
import { hoistOnTree } from './utils/results.js'
import { getESSchemas } from './schema.js'
import _debug from 'debug'
import { isAtLeastVersion8 } from './compat.js'

let debug = _debug('contexture:elasticsearch')

Expand Down Expand Up @@ -45,6 +46,7 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
}
},
async runSearch({ requestOptions = {} } = {}, node, schema, filters, aggs) {
let client = config.getClient()
let hoistedFromFilters = hoistOnTree(filters)
let hoistedFromAggs = hoistOnTree(aggs)
let {
Expand All @@ -54,36 +56,41 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
clusterDefaultTimeout,
} = config
let { scroll, scrollId } = node
let request = scrollId
? // If we have scrollId then keep scrolling, no query needed
{
scroll: scroll === true ? '60m' : hoistedFromFilters,
body: { scroll_id: scrollId },
}
: // Deterministic ordering of JSON keys for request cache optimization
{
...configOptions,
index: schema.elasticsearch.index,
// Scroll support (used for bulk export)
...(scroll && { scroll: scroll === true ? '2m' : scroll }),
body: {
// Wrap in constant_score when not sorting by score to avoid wasting time on relevance scoring
...(!_.isEmpty(hoistedFromAggs) && _.mergeAll(hoistedFromAggs)),
...(!_.isEmpty(hoistedFromFilters) &&
_.mergeAll(hoistedFromFilters)),
query:
filters && !_.has('sort._score', aggs)
? constantScore(filters)
: filters,
// If there are aggs, skip search results
...(aggs.aggs && { size: 0 }),
// Sorting by _doc is more efficient for scrolling since it won't waste time on any sorting
...(scroll && { sort: ['_doc'] }),
...aggs,
},
}
let request
// If we have scrollId then keep scrolling, no query needed
if (scrollId) {
let body = { scroll_id: scrollId }
request = {
scroll: scroll === true ? '60m' : hoistedFromFilters,
...(isAtLeastVersion8(client) ? body : { body }),
}
}
// Deterministic ordering of JSON keys for request cache optimization
else {
let body = {
// Wrap in constant_score when not sorting by score to avoid wasting time on relevance scoring
...(!_.isEmpty(hoistedFromAggs) && _.mergeAll(hoistedFromAggs)),
...(!_.isEmpty(hoistedFromFilters) && _.mergeAll(hoistedFromFilters)),
query:
filters && !_.has('sort._score', aggs)
? constantScore(filters)
: filters,
// If there are aggs, skip search results
...(aggs.aggs && { size: 0 }),
// Sorting by _doc is more efficient for scrolling since it won't waste time on any sorting
...(scroll && { sort: ['_doc'] }),
...aggs,
}
request = {
...configOptions,
index: schema.elasticsearch.index,
// Scroll support (used for bulk export)
...(scroll && { scroll: scroll === true ? '2m' : scroll }),
...(isAtLeastVersion8(client) ? body : { body }),
}
}

let child = config.getClient().child({
let child = client.child({
headers: requestOptions.headers,
requestTimeout: requestOptions.requestTimeout,
})
Expand All @@ -102,7 +109,8 @@ let ElasticsearchProvider = (config = { request: {} }) => ({
node._meta.requests.push(metaObj)
let count = counter.inc()
debug('(%s) Request: %O\nOptions: %O', count, request, requestOptions)
let { body } = await search(request, requestOptions)
let response = await search(request, requestOptions)
let body = isAtLeastVersion8(client) ? response : response?.body

// If body has timed_out set to true, log that partial results were returned,
// if partial is turned off an error will be thrown instead.
Expand Down
6 changes: 3 additions & 3 deletions packages/provider-elasticsearch/src/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body.query.constant_score.filter).toEqual({
expect(firstSearchCall[0].query.constant_score.filter).toEqual({
query_string,
})
})
Expand All @@ -65,7 +65,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body).toEqual({ query: undefined })
expect(firstSearchCall[0]).toEqual({ query: undefined })
})
it('runSearch should not wrap queries in constant_score if sort._score is present', () => {
const client = {
Expand All @@ -87,7 +87,7 @@ describe('Server Provider', () => {
let firstSearchCall =
client.child.mock.results[0].value.search.mock.calls[0]

expect(firstSearchCall[0].body.query).toEqual({
expect(firstSearchCall[0].query).toEqual({
query_string,
})
})
Expand Down
9 changes: 7 additions & 2 deletions packages/provider-elasticsearch/src/schema.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from 'lodash/fp.js'
import F from 'futil'
import { isAtLeastVersion8 } from './compat.js'

let Tree = F.tree((x) => x.properties)
// flatLeaves should auto detect reject vs omit (or just more general obj vs arr method)
Expand Down Expand Up @@ -83,6 +84,10 @@ export let fromMappingsWithAliases = (mappings, aliases) => {

export let getESSchemas = (client) =>
Promise.all([client.indices.getMapping(), client.indices.getAlias()]).then(
([{ body: mappings }, { body: aliases }]) =>
fromMappingsWithAliases(mappings, aliases)
([mappingResult, aliasResult]) => {
return fromMappingsWithAliases(
isAtLeastVersion8(client) ? mappingResult : mappingResult.body,
isAtLeastVersion8(client) ? aliasResult : aliasResult.body
)
}
)
94 changes: 45 additions & 49 deletions packages/provider-elasticsearch/src/utils/futil.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,51 @@ describe('futil candidates', () => {
it('Should hoist from tree based on demarcation for hoisting from filters', () => {
let input = {
index: 'sp-data-lit',
body: {
query: {
constant_score: {
filter: {
bool: {
should: [
{
bool: {
must: [
{
__hoistProps: {
runtime_mappings: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
type: 'date',
script: {
source:
"if(doc['FederalDoc.relevantContractDates.signedDate'].size()!=0){emit(doc['FederalDoc.relevantContractDates.signedDate'].value.plusMonths(params['monthOffset']).toInstant().toEpochMilli())}",
params: {
monthOffset: 3,
},
},
},
},
},
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
gte: '2015-04-01T00:00:00.000Z',
lte: '2015-06-30T23:59:59Z',
},
},
},
],
},
},
],
minimum_should_match: 1,
},
},
},
},
}
let output = {
result: {
index: 'sp-data-lit',
query: {
constant_score: {
filter: {
Expand All @@ -386,21 +430,6 @@ describe('futil candidates', () => {
bool: {
must: [
{
__hoistProps: {
runtime_mappings: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
type: 'date',
script: {
source:
"if(doc['FederalDoc.relevantContractDates.signedDate'].size()!=0){emit(doc['FederalDoc.relevantContractDates.signedDate'].value.plusMonths(params['monthOffset']).toInstant().toEpochMilli())}",
params: {
monthOffset: 3,
},
},
},
},
},
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
Expand All @@ -419,39 +448,6 @@ describe('futil candidates', () => {
},
},
},
}
let output = {
result: {
index: 'sp-data-lit',
body: {
query: {
constant_score: {
filter: {
bool: {
should: [
{
bool: {
must: [
{
range: {
'FederalDoc.relevantContractDates.signedDate.fiscal':
{
gte: '2015-04-01T00:00:00.000Z',
lte: '2015-06-30T23:59:59Z',
},
},
},
],
},
},
],
minimum_should_match: 1,
},
},
},
},
},
},
removed: [
{
runtime_mappings: {
Expand Down

0 comments on commit 618e01b

Please sign in to comment.