Skip to content

Commit

Permalink
update code to use the comunica hash-bindings bus
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Oct 21, 2024
1 parent c2c44f4 commit a4a45fb
Show file tree
Hide file tree
Showing 17 changed files with 63 additions and 353 deletions.
2 changes: 0 additions & 2 deletions engines/query-sparql-incremental/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@
"@incremunica/actor-guard-naive": "^1.3.0",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/actor-query-operation-incremental-distinct-hash": "^1.3.0",
"@incremunica/actor-query-operation-incremental-filter": "^1.3.0",
"@incremunica/actor-query-source-identify-hypermedia-stream-none": "^1.3.0",
"@incremunica/actor-query-source-identify-streaming-rdfjs": "^1.3.0",
"@incremunica/actor-rdf-join-incremental-minus-hash": "^1.3.0",
Expand All @@ -199,7 +198,6 @@
"@incremunica/bus-resource-watch": "^1.3.0",
"@incremunica/config-query-sparql-incremental": "^1.3.0",
"@incremunica/context-entries": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"@incremunica/incremental-inner-join": "^1.3.0",
"@incremunica/incremental-rdf-streaming-store": "^1.3.0",
"@incremunica/incremental-types": "^1.3.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import type {
BindingsStream,
} from '@comunica/types';
import { ActionContextKeyIsAddition } from '@incremunica/actor-merge-bindings-context-is-addition';
import { HashBindings } from '@incremunica/hash-bindings';
import type { AsyncIterator } from 'asynciterator';
import type { Algebra } from 'sparqlalgebrajs';
import { getSafeBindings } from '@comunica/utils-query-operation';
import {MediatorHashBindings} from "@comunica/bus-hash-bindings";

/**
* An Incremunica Distinct Hash Query Operation Actor.
*/
export class ActorQueryOperationIncrementalDistinctHash extends ActorQueryOperationTypedMediated<Algebra.Distinct> {
public constructor(args: IActorQueryOperationDistinctHashArgs) {
public readonly mediatorHashBindings: MediatorHashBindings;

public constructor(args: ActorQueryOperationIncrementalDistinctHashArgs) {
super(args, 'distinct');
}

Expand All @@ -32,8 +34,9 @@ export class ActorQueryOperationIncrementalDistinctHash extends ActorQueryOperat
const output: IQueryOperationResultBindings = getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: operation.input, context }),
);
const { hashFunction } = await this.mediatorHashBindings.mediate({ context });
const bindingsStream = <BindingsStream><unknown>(<AsyncIterator<Bindings>><unknown>output.bindingsStream).filter(
this.newHashFilter(),
this.newHashFilter(entry => hashFunction(entry, [...entry.keys()])),
);
return {
type: 'bindings',
Expand All @@ -47,12 +50,11 @@ export class ActorQueryOperationIncrementalDistinctHash extends ActorQueryOperat
* This will maintain an internal hash datastructure so that every bindings object only returns true once.
* @return {(bindings: Bindings) => boolean} A distinct filter for bindings.
*/
public newHashFilter(): (bindings: Bindings) => boolean {
const hashBindings = new HashBindings();
public newHashFilter(hashBindings: (bindings: Bindings) => number): (bindings: Bindings) => boolean {
// Base comunica uses an object here but as we hash deletions incremunica uses a Map
const hashes: Map<string, number> = new Map<string, number>();
const hashes: Map<number, number> = new Map<number, number>();
return (bindings: Bindings) => {
const hash: string = hashBindings.hash(bindings);
const hash = hashBindings(bindings);
const hasMapValue = hashes.get(hash);
if (bindings.getContextEntry(new ActionContextKeyIsAddition())) {
if (hasMapValue) {
Expand All @@ -75,4 +77,6 @@ export class ActorQueryOperationIncrementalDistinctHash extends ActorQueryOperat
}
}

export interface IActorQueryOperationDistinctHashArgs extends IActorQueryOperationTypedMediatedArgs {}
export interface ActorQueryOperationIncrementalDistinctHashArgs extends IActorQueryOperationTypedMediatedArgs {
mediatorHashBindings: MediatorHashBindings;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"@comunica/core": "^4.0.1",
"@comunica/types": "^4.0.1",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"sparqlalgebrajs": "^4.3.8"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"@comunica/mediatortype-join-coefficients": "^4.0.1",
"@comunica/types": "^4.0.1",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"@rdfjs/types": "*",
"asynciterator": "^3.9.0"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import type {
} from '@comunica/types';
import { ActionContextKeyIsAddition } from '@incremunica/actor-merge-bindings-context-is-addition';
import { KeysStreamingSource } from '@incremunica/context-entries';
import { HashBindings } from '@incremunica/hash-bindings';
import { TransformIterator, UnionIterator } from 'asynciterator';
import type { AsyncIterator } from 'asynciterator';
import type { Algebra } from 'sparqlalgebrajs';
import { Factory } from 'sparqlalgebrajs';
import {passTestWithSideData, TestResult} from "@comunica/core";
import {getSafeBindings, materializeOperation, getOperationSource} from '@comunica/utils-query-operation';
import {MediatorMergeBindingsContext} from "@comunica/bus-merge-bindings-context";
import type {MediatorHashBindings} from "@comunica/bus-hash-bindings";

/**
* A comunica Multi-way Bind RDF Join Actor.
Expand All @@ -37,6 +37,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
public readonly mediatorJoinEntriesSort: MediatorRdfJoinEntriesSort;
public readonly mediatorQueryOperation: MediatorQueryOperation;
public readonly mediatorMergeBindingsContext: MediatorMergeBindingsContext;
public readonly mediatorHashBindings: MediatorHashBindings;

public constructor(args: IActorRdfJoinInnerIncrementalComputationalMultiBindArgs) {
super(args, {
Expand Down Expand Up @@ -73,6 +74,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
* @param sources The sources of the query.
* @param algebraFactory The algebra factory.
* @param bindingsFactory The bindingsFactory created with bindings context merger.
* @param hashBindings A function that hashes bindings.
* @return {AsyncIterator<Bindings>}
*/
public static async createBindStream(
Expand All @@ -84,27 +86,25 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
sources: any,
algebraFactory: Factory,
bindingsFactory: BindingsFactory,
hashBindings: (bindings: Bindings) => number,
): Promise<AsyncIterator<Bindings>> {
const transformMap = new Map<
string,
{
elements: {
iterator: AsyncIterator<Bindings>;
stopFunction: (() => void);
}[];
subOperations: Algebra.Operation[];
}
>();

const hashBindings = new HashBindings();

number,
{
elements: {
iterator: AsyncIterator<Bindings>;
stopFunction: (() => void);
}[];
subOperations: Algebra.Operation[];
}
>();
// Create bindings function
const binder = async(
bindings: Bindings,
done: () => void,
push: (i: AsyncIterator<Bindings>) => void,
): Promise<void> => {
const hash = hashBindings.hash(bindings);
const hash = hashBindings(bindings);
let hashData = transformMap.get(hash);
if (bindings.getContextEntry(new ActionContextKeyIsAddition())) {
if (hashData === undefined) {
Expand Down Expand Up @@ -269,6 +269,8 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
dataFactory,
);

const { hashFunction } = await this.mediatorHashBindings.mediate({ context: action.context });

for (const [ i, element ] of entries.entries()) {
if (i !== 0) {
element.output.bindingsStream.close();
Expand Down Expand Up @@ -313,7 +315,8 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
false,
sources ?? [],
algebraFactory,
bindingsFactory
bindingsFactory,
entry => hashFunction(entry, [...entry.keys()]),
);

return {
Expand Down Expand Up @@ -356,4 +359,12 @@ export interface IActorRdfJoinInnerIncrementalComputationalMultiBindArgs extends
* The query operation mediator
*/
mediatorQueryOperation: MediatorQueryOperation;
/**
* The merge bindings context mediator
*/
mediatorMergeBindingsContext: MediatorMergeBindingsContext;
/**
* The hash bindings mediator
*/
mediatorHashBindings: MediatorHashBindings;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"@comunica/types": "^4.0.1",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/context-entries": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"asynciterator": "^3.9.0",
"sparqlalgebrajs": "^4.3.8"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Bindings } from '@comunica/utils-bindings-factory';
import { ActionContextKeyIsAddition } from '@incremunica/actor-merge-bindings-context-is-addition';
import { HashBindings } from '@incremunica/hash-bindings';
import { IncrementalInnerJoin } from '@incremunica/incremental-inner-join';
import type { AsyncIterator } from 'asynciterator';
import type { IMapObject } from './DualKeyHashMap';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"@comunica/mediatortype-join-coefficients": "^4.0.1",
"@comunica/types": "^4.0.1",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"@incremunica/incremental-inner-join": "^1.3.0",
"asynciterator": "^3.9.0"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import type {
IJoinEntryWithMetadata, ComunicaDataFactory,
} from '@comunica/types';
import { ActionContextKeyIsAddition } from '@incremunica/actor-merge-bindings-context-is-addition';
import { HashBindings } from '@incremunica/hash-bindings';
import {
ArrayIterator,
EmptyIterator,
Expand All @@ -30,7 +29,7 @@ import { Factory } from 'sparqlalgebrajs';
import {passTestWithSideData, TestResult} from "@comunica/core";
import { getSafeBindings } from '@comunica/utils-query-operation';
import {MediatorMergeBindingsContext} from "@comunica/bus-merge-bindings-context";
import {factory} from "ts-jest/dist/transformers/hoist-jest";
import type {MediatorHashBindings} from "@comunica/bus-hash-bindings";

/**
* A comunica Multi-way Bind RDF Join Actor.
Expand All @@ -40,6 +39,7 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
public readonly mediatorJoinEntriesSort: MediatorRdfJoinEntriesSort;
public readonly mediatorQueryOperation: MediatorQueryOperation;
public readonly mediatorMergeBindingsContext: MediatorMergeBindingsContext;
public readonly mediatorHashBindings: MediatorHashBindings;

public constructor(args: IActorRdfJoinInnerIncrementalMemoryMultiBindArgs) {
super(args, {
Expand All @@ -59,6 +59,7 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
* @param optional If the original bindings should be emitted when the resulting bindings stream is empty.
* @param algebraFactory The algebra factory.
* @param bindingsFactory The bindingsFactory created with bindings context merger.
* @param hashBindings A function that hashes bindings.
* @return {AsyncIterator<Bindings>}
*/
public static async createBindStream(
Expand All @@ -69,34 +70,31 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
optional: boolean,
algebraFactory: Factory,
bindingsFactory: BindingsFactory,
hashBindings: (bindings: Bindings) => number,
): Promise<AsyncIterator<Bindings>> {
const transformMap = new Map<
string,
{
iterator: AsyncIterator<Bindings>;
memory: Map<
string,
number,
{
bindings: Bindings;
iterator: AsyncIterator<Bindings>;
memory: Map<
number,
{
bindings: Bindings;
count: number;
}
>;
count: number;
}
>;
count: number;
}
>();

const hashBindings = new HashBindings();
const hashSubBindings = new HashBindings();

>();
// Create bindings function
const binder = (bindings: Bindings, done: () => void, push: (i: AsyncIterator<Bindings>) => void): void => {
const hash = hashBindings.hash(bindings);
const hash = hashBindings(bindings);
if (bindings) {
const hashData = transformMap.get(hash);
if (hashData === undefined) {
const data = {
iterator: new EmptyIterator<Bindings>(),
memory: new Map<string, { bindings: Bindings; count: number }>(),
memory: new Map<number, { bindings: Bindings; count: number }>(),
count: 1,
};
transformMap.set(hash, data);
Expand All @@ -118,7 +116,7 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
subDone();
return;
}
const bindingsHash = hashSubBindings.hash(newBindings);
const bindingsHash = hashBindings(newBindings);
const bindingsData = data.memory.get(bindingsHash);
if (newBindings.getContextEntry(new ActionContextKeyIsAddition())) {
if (bindingsData === undefined) {
Expand Down Expand Up @@ -291,6 +289,8 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
dataFactory,
);

const { hashFunction } = await this.mediatorHashBindings.mediate({ context: action.context });

for (const [ i, element ] of entries.entries()) {
if (i !== 0) {
element.output.bindingsStream.close();
Expand Down Expand Up @@ -321,7 +321,8 @@ export class ActorRdfJoinInnerIncrementalMemoryMultiBind extends ActorRdfJoin {
},
false,
algebraFactory,
bindingsFactory
bindingsFactory,
entry => hashFunction(entry, [...entry.keys()]),
);

return {
Expand Down Expand Up @@ -368,4 +369,8 @@ export interface IActorRdfJoinInnerIncrementalMemoryMultiBindArgs extends IActor
* The merge bindings context mediator
*/
mediatorMergeBindingsContext: MediatorMergeBindingsContext;
/**
* The hash bindings mediator
*/
mediatorHashBindings: MediatorHashBindings;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
"@comunica/mediatortype-join-coefficients": "^4.0.1",
"@comunica/types": "^4.0.1",
"@incremunica/actor-merge-bindings-context-is-addition": "^1.3.0",
"@incremunica/hash-bindings": "^1.3.0",
"asynciterator": "^3.9.0",
"sparqlalgebrajs": "^4.3.8"
}
Expand Down
Empty file removed packages/hash-bindings/.npmignore
Empty file.
11 changes: 0 additions & 11 deletions packages/hash-bindings/README.md

This file was deleted.

37 changes: 0 additions & 37 deletions packages/hash-bindings/lib/HashBindings.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/hash-bindings/lib/index.ts

This file was deleted.

Loading

0 comments on commit a4a45fb

Please sign in to comment.