Skip to content

Commit

Permalink
feat: add LIMIT and OFFSET support
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Jan 22, 2025
1 parent 01f2691 commit 197d206
Show file tree
Hide file tree
Showing 9 changed files with 980 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"ccqs:config/query-operation/actors/query/union.json",
"ccqs:config/query-operation/actors/query/minus.json",
"ccqs:config/query-operation/actors/query/nop.json",
"icqsi:config/query-operation/actors/query/slice.json",
"ccqs:config/query-operation/actors/query/leftjoin.json",
"ccqs:config/query-operation/actors/query/values.json",
"ccqs:config/query-operation/actors/query/bgp.json",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"@context": [
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/runner/^4.0.0/components/context.jsonld",

"https://linkedsoftwaredependencies.org/bundles/npm/@incremunica/actor-query-operation-slice/^1.0.0/components/context.jsonld"
],
"@id": "urn:comunica:default:Runner",
"@type": "Runner",
"actors": [
{
"@id": "urn:comunica:default:query-operation/actors#slice",
"@type": "ActorQueryOperationSlice",
"mediatorQueryOperation": { "@id": "urn:comunica:default:query-operation/mediators#main" }
}
]
}
1 change: 1 addition & 0 deletions engines/query-sparql-incremental/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/actor-query-operation-distinct-hash": "^1.3.0",
"@incremunica/actor-query-operation-group": "^1.3.0",
"@incremunica/actor-query-operation-slice": "^1.3.0",
"@incremunica/actor-query-source-identify-hypermedia-none": "^1.3.0",
"@incremunica/actor-query-source-identify-stream": "^1.3.0",
"@incremunica/actor-query-source-identify-streaming-rdfjs": "^1.3.0",
Expand Down
35 changes: 35 additions & 0 deletions packages/actor-query-operation-slice/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Incremunica Slice Query Operation Actor

[![npm version](https://badge.fury.io/js/%40incremunica%2Factor-query-operation-slice.svg)](https://www.npmjs.com/package/@incremunica/actor-query-operation-slice)

A [Query Operation](https://github.com/comunica/comunica/tree/master/packages/bus-query-operation) actor that handles SPARQL [`OFFSET`](https://www.w3.org/TR/sparql11-query/#modOffset) and [`LIMIT`](https://www.w3.org/TR/sparql11-query/#modResultLimit) operations.

## Install

```bash
$ yarn add @incremunica/actor-query-operation-slice
```

## Configure

After installing, this package can be added to your engine's configuration as follows:
```text
{
"@context": [
...
"https://linkedsoftwaredependencies.org/bundles/npm/@incremunica/actor-query-operation-slice/^1.0.0/components/context.jsonld"
],
"actors": [
...
{
"@id": "urn:comunica:default:query-operation/actors#slice",
"@type": "ActorQueryOperationSlice",
"mediatorQueryOperation": { "@id": "urn:comunica:default:query-operation/mediators#main" }
}
]
}
```

### Config Parameters

* `mediatorQueryOperation`: A mediator over the [Query Operation bus](https://github.com/comunica/comunica/tree/master/packages/bus-query-operation).
210 changes: 210 additions & 0 deletions packages/actor-query-operation-slice/lib/ActorQueryOperationSlice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import type { IActorQueryOperationTypedMediatedArgs } from '@comunica/bus-query-operation';
import {
ActorQueryOperationTypedMediated,
} from '@comunica/bus-query-operation';
import { KeysInitQuery, KeysQueryOperation } from '@comunica/context-entries';
import type { IActorTest, TestResult } from '@comunica/core';
import { passTestVoid } from '@comunica/core';
import type {
IQueryOperationResult,
IQueryOperationResultBindings,
IQueryOperationResultQuads,
IQueryOperationResultStream,
IMetadata,
IActionContext,
BindingsStream,
} from '@comunica/types';
import type { Bindings } from '@comunica/utils-bindings-factory';
import { bindingsToString } from '@comunica/utils-bindings-factory';
import { KeysBindings } from '@incremunica/context-entries';
import type { Quad } from '@incremunica/types';
import type { AsyncIterator } from 'asynciterator';
import { termToString } from 'rdf-string';
import type { Algebra } from 'sparqlalgebrajs';

/**
 * An Incremunica Slice Query Operation Actor.
 *
 * Applies the `start` and `length` of a slice operation to bindings and quad streams
 * whose items are flagged as additions or deletions. Whenever a deletion changes which
 * items are emitted, compensating deletions/additions are pushed downstream.
 */
export class ActorQueryOperationSlice extends ActorQueryOperationTypedMediated<Algebra.Slice> {
  public constructor(args: IActorQueryOperationTypedMediatedArgs) {
    super(args, 'slice');
  }

  /**
   * This actor accepts every slice operation unconditionally.
   */
  public async testOperation(_operation: Algebra.Slice, _context: IActionContext): Promise<TestResult<IActorTest>> {
    return passTestVoid();
  }

  /**
   * Evaluate the input of the slice operation through the mediator and slice its output.
   * Bindings and quad results get a sliced stream and sliced metadata;
   * any other result type is passed through untouched.
   * @param operation The slice operation (`input`, `start` and optional `length`).
   * @param context The action context; a truthy `length` is exposed on it as the limit indicator.
   */
  public async runOperation(operation: Algebra.Slice, context: IActionContext): Promise<IQueryOperationResult> {
    // Expose the limit to downstream actors, which can use it for query planning
    // eslint-disable-next-line unicorn/explicit-length-check
    if (operation.length) {
      context = context.set(KeysQueryOperation.limitIndicator, operation.length);
    }
    const dataFactory = context.get(KeysInitQuery.dataFactory)!;

    // Evaluate the sub-operation with the (possibly extended) context
    const output: IQueryOperationResult = await this.mediatorQueryOperation
      .mediate({ operation: operation.input, context });

    switch (output.type) {
      case 'bindings': {
        const slicedBindings = this.sliceStream<Bindings>(
          <AsyncIterator<Bindings>><any>output.bindingsStream,
          operation,
          bindings => bindingsToString(bindings),
          // Bindings without an explicit flag are treated as additions
          bindings => bindings.getContextEntry(KeysBindings.isAddition) ?? true,
          bindings => bindings.setContextEntry(KeysBindings.isAddition, false),
        );
        return <IQueryOperationResultBindings> {
          type: 'bindings',
          bindingsStream: <BindingsStream><any> slicedBindings,
          metadata: this.sliceMetadata(output, operation),
        };
      }
      case 'quads': {
        // Build a fresh quad carrying the deletion flag instead of mutating the given one
        const toDeletedQuad = (quad: Quad): Quad => {
          const copy = <Quad>dataFactory.quad(quad.subject, quad.predicate, quad.object, quad.graph);
          copy.isAddition = false;
          return copy;
        };
        const slicedQuads = this.sliceStream<Quad>(
          <AsyncIterator<Quad>><any>output.quadStream,
          operation,
          quad => termToString(quad),
          // Quads without an explicit flag are treated as additions
          quad => quad.isAddition ?? true,
          toDeletedQuad,
        );
        return <IQueryOperationResultQuads> {
          type: 'quads',
          quadStream: <any> slicedQuads,
          metadata: this.sliceMetadata(output, operation),
        };
      }
      default:
        // Every other result type is returned as-is.
        return output;
    }
  }

  /**
   * Slice a stream of additions and deletions according to the given pattern.
   *
   * Two counted multisets are maintained: the items that were emitted downstream, and the
   * withheld items (those skipped for `start` plus those arriving while `length` items are
   * already emitted). Deletions are matched against the withheld items first, then against
   * the emitted items. A deletion matching neither destroys the source stream with an error.
   * @param stream The source stream.
   * @param pattern The slice operation providing `start` and the optional `length`.
   * @param hashFunction Produces the key under which an item is counted.
   * @param isAdditionFunction Tells whether an item is an addition (otherwise a deletion).
   * @param makeDeletion Produces the deletion counterpart of an item.
   */
  private sliceStream<T extends Bindings | Quad>(
    stream: AsyncIterator<T>,
    pattern: Algebra.Slice,
    hashFunction: (item: T) => string,
    isAdditionFunction: (item: T) => boolean,
    makeDeletion: (item: T) => T,
  ): AsyncIterator<T> {
    type CountedEntry = { element: T; count: number };
    type Bag = Map<string, CountedEntry>;

    // A length of 0 is a valid limit, so it must count as "has length"
    // eslint-disable-next-line unicorn/explicit-length-check
    const hasLength: boolean = pattern.length === 0 || Boolean(pattern.length);
    const length = hasLength ? pattern.length! : Number.POSITIVE_INFINITY;
    const { start } = pattern;

    // Drop one occurrence of an entry, removing the key once its last occurrence is gone
    const releaseOne = (bag: Bag, key: string, entry: CountedEntry): void => {
      if (entry.count === 1) {
        bag.delete(key);
      } else {
        entry.count--;
      }
    };

    // Register one more occurrence under the key; the first stored element is kept
    const addElement = (bag: Bag, key: string, element: T): void => {
      const entry = bag.get(key);
      if (entry === undefined) {
        bag.set(key, { element, count: 1 });
      } else {
        entry.count++;
      }
    };

    // Remove one occurrence of the key; returns false when the key is unknown
    const deleteElement = (bag: Bag, key: string): boolean => {
      const entry = bag.get(key);
      if (entry === undefined) {
        return false;
      }
      releaseOne(bag, key, entry);
      return true;
    };

    // Remove one occurrence of the first entry in iteration order and hand it back
    const deleteRandomElement = (bag: Bag): [string, T] => {
      const [ key, entry ]: [string, CountedEntry] = bag.entries().next().value;
      releaseOne(bag, key, entry);
      return [ key, entry.element ];
    };

    const emitted: Bag = new Map<string, CountedEntry>();
    const withheld: Bag = new Map<string, CountedEntry>();
    let emittedCount = 0;
    let withheldCount = 0;

    const transform = (incomingElement: T, done: () => void, push: (result: T) => void): void => {
      const hash = hashFunction(incomingElement);

      if (isAdditionFunction(incomingElement)) {
        // Withhold while the offset is not filled yet, or once the limit is reached
        const mustWithhold = (withheldCount < start) || (emittedCount >= length);
        if (mustWithhold) {
          addElement(withheld, hash, incomingElement);
          withheldCount++;
        } else {
          addElement(emitted, hash, incomingElement);
          emittedCount++;
          push(incomingElement);
        }
        done();
        return;
      }

      // Deletions are matched against the withheld items first
      if (deleteElement(withheld, hash)) {
        if (withheldCount <= start && emittedCount > 0) {
          // Retract an emitted item and withhold it instead; the withheld count is unchanged
          const [ movedKey, movedElement ] = deleteRandomElement(emitted);
          push(makeDeletion(movedElement));
          emittedCount--;
          addElement(withheld, movedKey, movedElement);
        } else {
          withheldCount--;
        }
        done();
        return;
      }

      // Then against the emitted items
      if (deleteElement(emitted, hash)) {
        push(makeDeletion(incomingElement));
        if (withheldCount > start) {
          // Emit a withheld item in place of the deleted one; the emitted count is unchanged
          const [ movedKey, movedElement ] = deleteRandomElement(withheld);
          push(movedElement);
          withheldCount--;
          addElement(emitted, movedKey, movedElement);
        } else {
          emittedCount--;
        }
        done();
        return;
      }

      // A deletion of something that was never added
      done();
      stream.destroy(new Error(`Deletion ${hash}, has not been added.`));
    };

    return stream.transform({ transform });
  }

  /**
   * Wrap the metadata callback of the given output so that a finite cardinality value is
   * reduced by `start` (never below zero) and, when a length is set, capped at `length`.
   * Non-finite cardinality values are left unchanged.
   * @param output The result whose metadata should be sliced.
   * @param pattern The slice operation providing `start` and the optional `length`.
   */
  private sliceMetadata(
    output: IQueryOperationResultStream<any, any>,
    pattern: Algebra.Slice,
  ): () => Promise<IMetadata<any>> {
    // A length of 0 is a valid limit, so it must count as "has length"
    // eslint-disable-next-line unicorn/explicit-length-check
    const hasLength: boolean = pattern.length === 0 || Boolean(pattern.length);
    return () => (<() => Promise<IMetadata<any>>>output.metadata)()
      .then((subMetadata) => {
        const cardinality = { ...subMetadata.cardinality };
        if (Number.isFinite(cardinality.value)) {
          const afterOffset = Math.max(0, cardinality.value - pattern.start);
          cardinality.value = hasLength ? Math.min(afterOffset, pattern.length!) : afterOffset;
        }
        return { ...subMetadata, cardinality };
      });
  }
}
1 change: 1 addition & 0 deletions packages/actor-query-operation-slice/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './ActorQueryOperationSlice';
49 changes: 49 additions & 0 deletions packages/actor-query-operation-slice/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"name": "@incremunica/actor-query-operation-slice",
"version": "1.3.0",
"description": "A slice query-operation actor",
"lsd:module": true,
"license": "MIT",
"homepage": "https://maartyman.github.io/incremunica/",
"repository": {
"type": "git",
"url": "https://github.com/comunica/comunica.git",
"directory": "packages/actor-query-operation-slice"
},
"bugs": {
"url": "https://github.com/comunica/comunica/issues"
},
"keywords": [
"incremunica",
"actor",
"query-operation",
"slice"
],
"sideEffects": false,
"main": "lib/index.js",
"typings": "lib/index",
"publishConfig": {
"access": "public"
},
"files": [
"components",
"lib/**/*.d.ts",
"lib/**/*.js",
"lib/**/*.js.map"
],
"scripts": {
"build": "yarn run build:ts && yarn run build:components",
"build:ts": "node \"../../node_modules/typescript/bin/tsc\"",
"build:components": "componentsjs-generator"
},
"dependencies": {
"@comunica/bus-query-operation": "^4.0.2",
"@comunica/context-entries": "^4.0.2",
"@comunica/core": "^4.0.2",
"@comunica/types": "^4.0.2",
"@comunica/utils-bindings-factory": "^4.0.2",
"@incremunica/context-entries": "^1.3.0",
"rdf-string": "^2.0.0",
"sparqlalgebrajs": "^4.3.8"
}
}
Loading

0 comments on commit 197d206

Please sign in to comment.