Skip to content

Commit

Permalink
add delete hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Nov 14, 2024
1 parent a9bf49a commit 3e4ff55
Show file tree
Hide file tree
Showing 6 changed files with 1,148 additions and 0 deletions.
31 changes: 31 additions & 0 deletions packages/actor-rdf-join-inner-incremental-delete-hash/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Incremunica Inner Incremental Delete Hash RDF Join Actor

[![npm version](https://badge.fury.io/js/@incremunica%2Factor-rdf-join-inner-incremental-delete-hash.svg)](https://badge.fury.io/js/@incremunica%2Factor-rdf-join-inner-incremental-delete-hash)

A comunica Inner Incremental Delete Hash RDF Join Actor.

## Install

```bash
$ yarn add @comunica/actor-rdf-join-inner-incremental-delete-hash
```

## Configure

After installing, this package can be added to your engine's configuration as follows:
```text
{
"@context": [
...
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/actor-rdf-join-inner-incremental-delete-hash/^1.0.0/components/context.jsonld"
],
"actors": [
...
{
"@id": "urn:comunica:default:rdf-join/actors#inner-incremental-delete-hash",
"@type": "ActorRdfJoinInnerIncrementalDeleteHash",
"mediatorJoinSelectivity": { "@id": "urn:comunica:default:rdf-join-selectivity/mediators#main" }
}
]
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import type { MediatorHashBindings } from '@comunica/bus-hash-bindings';
import type {
IActionRdfJoin,
IActorRdfJoinArgs,
IActorRdfJoinOutputInner,
IActorRdfJoinTestSideData,
} from '@comunica/bus-rdf-join';
import {
ActorRdfJoin,
} from '@comunica/bus-rdf-join';
import type { TestResult } from '@comunica/core';
import { passTestWithSideData } from '@comunica/core';
import type { IMediatorTypeJoinCoefficients } from '@comunica/mediatortype-join-coefficients';
import type { BindingsStream } from '@comunica/types';
import type { Bindings } from '@comunica/utils-bindings-factory';
import type { AsyncIterator } from 'asynciterator';
import { IncrementalDeleteHashJoin } from './IncrementalDeleteHashJoin';

/**
* A comunica Inner Incremental Delete Hash RDF Join Actor.
*/
export class ActorRdfJoinInnerIncrementalDeleteHash extends ActorRdfJoin {
public readonly mediatorHashBindings: MediatorHashBindings;

public constructor(args: IActorRdfJoinInnerIncrementalDeleteHashArgs) {
super(args, {
logicalType: 'inner',
physicalName: 'delete-hash',
limitEntries: 2,
canHandleUndefs: false,
});
}

protected async getOutput(action: IActionRdfJoin): Promise<IActorRdfJoinOutputInner> {
const metadatas = await ActorRdfJoin.getMetadatas(action.entries);
const commonVariables = ActorRdfJoin.overlappingVariables(metadatas).map(variable => variable.variable.value);
const { hashFunction } = await this.mediatorHashBindings.mediate({ context: action.context });
const bindingsStream = <BindingsStream><unknown> new IncrementalDeleteHashJoin(
<AsyncIterator<Bindings>><unknown>action.entries[0].output.bindingsStream,
<AsyncIterator<Bindings>><unknown>action.entries[1].output.bindingsStream,
<(...bindings: Bindings[]) => Bindings | null>ActorRdfJoin.joinBindings,
commonVariables,
entry => hashFunction(entry, metadatas[0].variables.map(v => v.variable)),
entry => hashFunction(entry, metadatas[1].variables.map(v => v.variable)),
);
return {
result: {
type: 'bindings',
bindingsStream,
metadata: async() => await this.constructResultMetadata(
action.entries,
await ActorRdfJoin.getMetadatas(action.entries),
action.context,
),
},
};
}

public async getJoinCoefficients(
_action: IActionRdfJoin,
sideData: IActorRdfJoinTestSideData,
): Promise<TestResult<IMediatorTypeJoinCoefficients, IActorRdfJoinTestSideData>> {
return passTestWithSideData({
iterations: 0,
persistedItems: 0,
blockingItems: 0,
requestTime: 0,
}, sideData);
}
}

export interface IActorRdfJoinInnerIncrementalDeleteHashArgs extends IActorRdfJoinArgs {
mediatorHashBindings: MediatorHashBindings;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import type { Bindings } from '@comunica/utils-bindings-factory';
import { KeysBindings } from '@incremunica/context-entries';
import { IncrementalInnerJoin } from '@incremunica/incremental-inner-join';
import type { AsyncIterator } from 'asynciterator';

interface IBindingsWithCount {
bindings: Bindings;
count: number;
}

export class IncrementalDeleteHashJoin extends IncrementalInnerJoin {
private readonly rightMemory: Map<number, IBindingsWithCount> = new Map<number, IBindingsWithCount>();
private readonly leftMemory: Map<number, IBindingsWithCount> = new Map<number, IBindingsWithCount>();
private activeElement: Bindings | null = null;
private otherArray: IterableIterator<IBindingsWithCount> = [][Symbol.iterator]();
private count = 0;
private otherArrayElement: IteratorResult<IBindingsWithCount, IBindingsWithCount | null> =
{ done: true, value: null };

private readonly overlappingVariables: string[];
private readonly hashLeft: (entry: Bindings) => number;
private readonly hashRight: (entry: Bindings) => number;

public constructor(
left: AsyncIterator<Bindings>,
right: AsyncIterator<Bindings>,
funJoin: (...bindings: Bindings[]) => Bindings | null,
overlappingVariables: string[],
hashLeft: (bindings: Bindings) => number,
hashRight: (bindings: Bindings) => number,
) {
super(left, right, funJoin);
this.overlappingVariables = overlappingVariables;
this.hashLeft = hashLeft;
this.hashRight = hashRight;
}

protected _cleanup(): void {
this.leftMemory.clear();
this.rightMemory.clear();
this.activeElement = null;
}

protected hasResults(): boolean {
return !this.leftIterator.done ||
!this.rightIterator.done ||
this.activeElement !== null;
}

private addOrDeleteFromMemory(
item: Bindings,
memory: Map<number, IBindingsWithCount>,
hashFunc: (bindings: Bindings) => number,
): boolean {
const hash = hashFunc(item);
const el = memory.get(hash);
if (item.getContextEntry(KeysBindings.isAddition)) {
if (el === undefined) {
memory.set(hash, { bindings: item, count: 1 });
} else {
el.count++;
}
return true;
}
if (el !== undefined) {
if (el.count <= 1) {
memory.delete(hash);
return true;
}
el.count--;
}
return true;
}

public read(): Bindings | null {
while (true) {
if (this.ended) {
return null;
}

// There is an active element
if (this.activeElement !== null) {
if (this.otherArrayElement.value === null || this.count === this.otherArrayElement.value.count) {
this.otherArrayElement = this.otherArray.next();
this.count = 0;
// eslint-disable-next-line ts/prefer-nullish-coalescing
if (this.otherArrayElement.done || !this.otherArrayElement.value) {
this.activeElement = null;
this.otherArrayElement = { done: true, value: null };
continue;
}
}
let areCompatible = true;
for (const variable of this.overlappingVariables) {
const activeElementTerm = this.activeElement.get(variable);
const otherArrayTerm = this.otherArrayElement.value.bindings.get(variable);
if (activeElementTerm && otherArrayTerm && !activeElementTerm.equals(otherArrayTerm)) {
areCompatible = false;
}
}
let resultingBindings = null;
if (areCompatible) {
resultingBindings = this.funJoin(this.activeElement, this.otherArrayElement.value.bindings);
}

this.count++;

if (resultingBindings !== null) {
return resultingBindings;
}
continue;
}

if (!this.hasResults()) {
this._end();
}

let item = null;
if (this.leftIterator.readable) {
item = this.leftIterator.read();
}
if (item !== null) {
if (this.addOrDeleteFromMemory(item, this.leftMemory, this.hashLeft)) {
this.activeElement = item;
this.otherArray = this.rightMemory.values();
}
continue;
}

if (this.rightIterator.readable) {
item = this.rightIterator.read();
}
if (item !== null) {
if (this.addOrDeleteFromMemory(item, this.rightMemory, this.hashRight)) {
this.activeElement = item;
this.otherArray = this.leftMemory.values();
}
continue;
}

this.readable = false;
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './ActorRdfJoinInnerIncrementalDeleteHash';
49 changes: 49 additions & 0 deletions packages/actor-rdf-join-inner-incremental-delete-hash/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"name": "@incremunica/actor-rdf-join-inner-incremental-delete-hash",
"version": "1.3.0",
"description": "A inner-incremental-delete-hash rdf-join actor",
"lsd:module": true,
"license": "MIT",
"homepage": "https://maartyman.github.io/incremunica/",
"repository": {
"type": "git",
"url": "https://github.com/maartyman/incremunica.git",
"directory": "packages/actor-rdf-join-inner-incremental-delete-hash"
},
"bugs": {
"url": "https://github.com/maartyman/incremunica/issues"
},
"keywords": [
"comunica",
"actor",
"rdf-join",
"inner-incremental-delete-hash"
],
"sideEffects": false,
"main": "lib/index.js",
"typings": "lib/index",
"publishConfig": {
"access": "public"
},
"files": [
"components",
"lib/**/*.d.ts",
"lib/**/*.js",
"lib/**/*.js.map"
],
"scripts": {
"build": "npm run build:ts && npm run build:components",
"build:ts": "node \"../../node_modules/typescript/bin/tsc\"",
"build:components": "componentsjs-generator"
},
"dependencies": {
"@comunica/bus-hash-bindings": "^4.0.2",
"@comunica/bus-rdf-join": "^4.0.1",
"@comunica/core": "^4.0.2",
"@comunica/mediatortype-join-coefficients": "^4.0.1",
"@comunica/utils-bindings-factory": "^4.0.1",
"@incremunica/context-entries": "^1.3.0",
"@incremunica/incremental-inner-join": "^1.3.0",
"asynciterator": "^3.9.0"
}
}
Loading

0 comments on commit 3e4ff55

Please sign in to comment.