Skip to content

Commit

Permalink
Merge pull request #44 from maartyman/master
Browse files Browse the repository at this point in the history
update branch
  • Loading branch information
maartyman authored Nov 15, 2023
2 parents 0324ff8 + 7648ac0 commit 7f5553d
Show file tree
Hide file tree
Showing 32 changed files with 587 additions and 941 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Run depcheck
run: yarn run depcheck
- name: Run tests
run: yarn run test:ci
run: yarn run test-ci
- name: Submit coverage results
uses: coverallsapp/github-action@master
with:
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,24 @@
"test-changed": "lerna run test --since HEAD",
"build-changed": "lerna run build --since HEAD",
"test": "jest",
"test:ci": "jest --ci --maxWorkers=4 --coverage",
"test-ci": "jest --ci --maxWorkers=4 --coverage",
"lint": "eslint . --ext .ts --cache",
"lint-fix": "eslint . --ext .ts --fix",
"build": "npm run build:ts && npm run build:components",
"build:ts": "tsc",
"build:components": "componentsjs-generator engines/* packages/*",
"build": "npm run build-ts && npm run build-components",
"build-ts": "tsc",
"build-components": "componentsjs-generator engines/* packages/*",
"build-watch": "nodemon -e ts --ignore '*.d.ts' --exec yarn run build",
"build-watch:ts": "tsc --watch",
"build-watch:components": "nodemon -e d.ts --exec yarn run build:components",
"publish": "yarn run build",
"publish": "yarn install && yarn pre-commit && yarn publish-release",
"publish-release": "lerna publish",
"publish-bare": "lerna exec -- npm publish --silent",
"publish-canary": "yarn run build && lerna version prerelease --preid alpha.$(.github/get-next-alpha-version.sh) --exact --ignore-scripts --force-publish --no-push --no-git-tag-version --yes && git update-index --assume-unchanged $(git ls-files | tr '\\n' ' ') && lerna publish from-package --no-git-reset --pre-dist-tag next --force-publish --no-push --no-git-tag-version --yes && git update-index --no-assume-unchanged $(git ls-files | tr '\\n' ' ') && git checkout .",
"doc": "typedoc",
"postinstall": "yarn run build && lerna run prepare",
"version": "manual-git-changelog onversion",
"depcheck": "lerna-script depcheckTask",
"depcheck:fix": "lerna-script depfixTask"
"depcheck-fix": "lerna-script depfixTask"
},
"resolutions": {
"@rdfjs/types": "1.1.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import type {
IQueryOperationResultBindings,
MetadataBindings,
} from '@comunica/types';
import { HashBindings } from '@incremunica/hash-bindings';
import type { Bindings } from '@incremunica/incremental-bindings-factory';
import type { AsyncIterator } from 'asynciterator';
import { TransformIterator, UnionIterator } from 'asynciterator';
import { Algebra, Factory } from 'sparqlalgebrajs';
import type { Algebra } from 'sparqlalgebrajs';
import { Factory } from 'sparqlalgebrajs';

/**
* A comunica Multi-way Bind RDF Join Actor.
Expand All @@ -30,14 +32,6 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf

public static readonly FACTORY = new Factory();

public static bindingHash(bindings: Bindings): string {
let hash = '';
for (const binding of bindings) {
hash += `${binding[0].value}:${binding[1].value}#`;
}
return hash;
}

public constructor(args: IActorRdfJoinInnerIncrementalComputationalMultiBindArgs) {
super(args, {
logicalType: 'inner',
Expand Down Expand Up @@ -91,9 +85,11 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
subOperations: Algebra.Operation[];
}>();

const hashBindings = new HashBindings();

// Create bindings function
const binder = async(bindings: Bindings, done: () => void, push: (i: BindingsStream) => void): Promise<void> => {
const hash = ActorRdfJoinInnerIncrementalComputationalMultiBind.bindingHash(bindings);
const hash = hashBindings.hash(bindings);
let hashData = transformMap.get(hash);
if (bindings.diff) {
if (hashData === undefined) {
Expand Down Expand Up @@ -308,54 +304,11 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
action: IActionRdfJoin,
metadatas: MetadataBindings[],
): Promise<IMediatorTypeJoinCoefficients> {
// Order the entries so we can pick the first one (usually the one with the lowest cardinality)
const entries = await this.sortJoinEntries(action.entries
.map((entry, i) => ({ ...entry, metadata: metadatas[i] })), action.context);
metadatas = entries.map(entry => entry.metadata);

const requestInitialTimes = ActorRdfJoin.getRequestInitialTimes(metadatas);
const requestItemTimes = ActorRdfJoin.getRequestItemTimes(metadatas);

// Determine first stream and remaining ones
const remainingEntries = [ ...entries ];
const remainingRequestInitialTimes = [ ...requestInitialTimes ];
const remainingRequestItemTimes = [ ...requestItemTimes ];
remainingEntries.splice(0, 1);
remainingRequestInitialTimes.splice(0, 1);
remainingRequestItemTimes.splice(0, 1);

// Reject binding on some operation types
if (remainingEntries
.some(entry => entry.operation.type === Algebra.types.EXTEND || entry.operation.type === Algebra.types.GROUP)) {
throw new Error(`Actor ${this.name} can not bind on Extend and Group operations`);
}

// Determine selectivities of smallest entry with all other entries
const selectivities = await Promise.all(remainingEntries
.map(async entry => (await this.mediatorJoinSelectivity.mediate({
entries: [ entries[0], entry ],
context: action.context,
})).selectivity * this.selectivityModifier));

// Determine coefficients for remaining entries
const cardinalityRemaining = remainingEntries
.map((entry, i) => entry.metadata.cardinality.value * selectivities[i])
.reduce((sum, element) => sum + element, 0);
const receiveInitialCostRemaining = remainingRequestInitialTimes
.reduce((sum, element, i) => sum + (element * selectivities[i]), 0);
const receiveItemCostRemaining = remainingRequestItemTimes
.reduce((sum, element, i) => sum + (element * selectivities[i]), 0);

return {
iterations: metadatas[0].cardinality.value * cardinalityRemaining,
iterations: 0,
persistedItems: 0,
blockingItems: 0,
requestTime: requestInitialTimes[0] +
metadatas[0].cardinality.value * (
requestItemTimes[0] +
receiveInitialCostRemaining +
cardinalityRemaining * receiveItemCostRemaining
),
requestTime: 0,
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@comunica/mediatortype-join-coefficients": "^2.6.8",
"@comunica/types": "^2.6.8",
"@incremunica/incremental-bindings-factory": "^1.0.0",
"@incremunica/hash-bindings": "^1.0.0",
"asynciterator": "^3.8.0",
"sparqlalgebrajs": "^4.0.5"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ async function partialArrayifyStream(stream: EventEmitter, num: number): Promise
}


describe('ActorRdfJoinMultiBind', () => {
describe('ActorRdfJoinIncrementalComputationalMultiBind', () => {
let bus: any;

beforeEach(() => {
bus = new Bus({ name: 'bus' });
});

describe('An ActorRdfJoinMultiBind instance', () => {
describe('An ActorRdfJoinIncrementalComputationalMultiBind instance', () => {
let mediatorJoinSelectivity: Mediator<
Actor<IActionRdfJoinSelectivity, IActorTest, IActorRdfJoinSelectivityOutput>,
IActionRdfJoinSelectivity, IActorTest, IActorRdfJoinSelectivityOutput>;
Expand All @@ -55,7 +55,6 @@ describe('ActorRdfJoinMultiBind', () => {
let mediatorQueryOperation: Mediator<Actor<IActionQueryOperation, IActorTest, IQueryOperationResultBindings>,
IActionQueryOperation, IActorTest, IQueryOperationResultBindings>;
let actor: ActorRdfJoinInnerIncrementalComputationalMultiBind;
let logSpy: Mock;

beforeEach(() => {
mediatorJoinSelectivity = <any> {
Expand Down Expand Up @@ -100,7 +99,6 @@ describe('ActorRdfJoinMultiBind', () => {
mediatorJoinSelectivity,
mediatorJoinEntriesSort,
});
logSpy = (<any> actor).logDebug = jest.fn();
});

describe('getJoinCoefficients', () => {
Expand Down Expand Up @@ -148,13 +146,14 @@ describe('ActorRdfJoinMultiBind', () => {
},
],
)).toEqual({
iterations: 1.280_000_000_000_000_2,
iterations: 0,
persistedItems: 0,
blockingItems: 0,
requestTime: 0.440_96,
requestTime: 0,
});
});

/*
it('should handle three entries with a lower variable overlap', async() => {
expect(await actor.getJoinCoefficients(
{
Expand Down Expand Up @@ -324,6 +323,7 @@ describe('ActorRdfJoinMultiBind', () => {
requestTime: 0.403_840_000_000_000_03,
});
});
*/
});

describe('sortJoinEntries', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ export class ActorRdfJoinInnerIncrementalFullHash extends ActorRdfJoin {
action: IActionRdfJoin,
metadatas: MetadataBindings[],
): Promise<IMediatorTypeJoinCoefficients> {
const requestInitialTimes = ActorRdfJoin.getRequestInitialTimes(metadatas);
const requestItemTimes = ActorRdfJoin.getRequestItemTimes(metadatas);
return {
iterations: metadatas[0].cardinality.value + metadatas[1].cardinality.value,
persistedItems: metadatas[0].cardinality.value + metadatas[1].cardinality.value,
iterations: 0,
persistedItems: 0,
blockingItems: 0,
requestTime: requestInitialTimes[0] + metadatas[0].cardinality.value * requestItemTimes[0] +
requestInitialTimes[1] + metadatas[1].cardinality.value * requestItemTimes[1],
requestTime: 0,
};
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { bindingsToString } from '@incremunica/incremental-bindings-factory';
import { HashBindings } from '@incremunica/hash-bindings';
import { IncrementalInnerJoin } from '@incremunica/incremental-inner-join';
import type { Bindings, BindingsStream } from '@incremunica/incremental-types';
import type { IMapObject } from './DualKeyHashMap';
Expand All @@ -12,6 +12,7 @@ export class IncrementalFullHashJoin extends IncrementalInnerJoin {
private otherElement: IMapObject<Bindings> | null = null;
private count = 0;
private readonly funHash: (entry: Bindings) => string;
private readonly hashBindings = new HashBindings();

public constructor(
left: BindingsStream,
Expand All @@ -37,10 +38,10 @@ export class IncrementalFullHashJoin extends IncrementalInnerJoin {

private addOrDeleteFromMemory(item: Bindings, joinHash: string, memory: DualKeyHashMap<Bindings>): boolean {
if (item.diff) {
memory.set(bindingsToString(<any>item), joinHash, item);
memory.set(this.hashBindings.hash(item), joinHash, item);
return true;
}
return memory.delete(bindingsToString(<any>item), joinHash);
return memory.delete(this.hashBindings.hash(item), joinHash);
}

public read(): Bindings | null {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
"@comunica/bus-rdf-join": "^2.6.8",
"@comunica/mediatortype-join-coefficients": "^2.6.8",
"@comunica/types": "^2.6.8",
"@incremunica/incremental-bindings-factory": "^1.0.0",
"@incremunica/incremental-inner-join": "^1.0.0",
"@incremunica/incremental-types": "^1.0.0"
"@incremunica/incremental-types": "^1.0.0",
"@incremunica/hash-bindings": "^1.0.0"
},
"scripts": {
"build": "npm run build:ts && npm run build:components",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ describe('ActorRdfJoinFullHash', () => {
});

it('should generate correct test metadata', async() => {
await expect(actor.test(action)).resolves.toHaveProperty('iterations',
(await (<any> action.entries[0].output).metadata()).cardinality.value +
(await (<any> action.entries[1].output).metadata()).cardinality.value);
await expect(actor.test(action)).resolves.toHaveProperty('iterations',0);
});
});

Expand Down
Loading

0 comments on commit 7f5553d

Please sign in to comment.