diff --git a/engines/config-query-sparql-incremental/config/query-source-identify/actors/stream.json b/engines/config-query-sparql-incremental/config/query-source-identify/actors/stream.json index 36236301..1e3f9986 100644 --- a/engines/config-query-sparql-incremental/config/query-source-identify/actors/stream.json +++ b/engines/config-query-sparql-incremental/config/query-source-identify/actors/stream.json @@ -10,7 +10,7 @@ { "@id": "urn:comunica:default:query-source-identify/actors#stream", "@type": "ActorQuerySourceIdentifyStream", - "mediatorQuerySourceIdentify": { "@id": "urn:comunica:default:query-source-identify/mediators#main" }, + "mediatorContextPreprocess": { "@id": "urn:comunica:default:context-preprocess/mediators#main" }, "mediatorRdfMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" } } ] diff --git a/engines/query-sparql-incremental/test/QuerySparql-test.ts b/engines/query-sparql-incremental/test/QuerySparql-test.ts index 7447a4fa..b25d5640 100644 --- a/engines/query-sparql-incremental/test/QuerySparql-test.ts +++ b/engines/query-sparql-incremental/test/QuerySparql-test.ts @@ -383,11 +383,7 @@ describe('System test: QuerySparql (without polly)', () => { const sourcesStream = new PassThrough({ objectMode: true }); sourcesStream.push({ - value: 'http://localhost:8787', - isAddition: true, - }); - sourcesStream.push({ - value: streamingStore, + querySource: 'http://localhost:8787', isAddition: true, }); @@ -395,7 +391,7 @@ describe('System test: QuerySparql (without polly)', () => { ?s ?p ?o. }`, { // @ts-expect-error - sources: [ sourcesStream ], + sources: [ streamingStore, sourcesStream ], pollingPeriod: 1000, }); @@ -416,7 +412,7 @@ describe('System test: QuerySparql (without polly)', () => { ]).setContextEntry(KeysBindings.isAddition, true)); sourcesStream.push({ - value: 'http://localhost:8787', + querySource: 'http://localhost:8787', isAddition: false, }); @@ -563,7 +559,7 @@ describe('System test: QuerySparql (with polly)', () => { // @ts-expect-error sources: [ new ArrayIterator([ { - value: 'https://www.rubensworks.net/', + querySource: 'https://www.rubensworks.net/', isAddition: true, }, ]) ], diff --git a/packages/actor-query-source-identify-stream/README.md b/packages/actor-query-source-identify-stream/README.md index 5a33682f..f52463e7 100644 --- a/packages/actor-query-source-identify-stream/README.md +++ b/packages/actor-query-source-identify-stream/README.md @@ -24,7 +24,7 @@ After installing, this package can be added to your engine's configuration as fo { "@id": "urn:comunica:default:query-source-identify/actors#stream", "@type": "ActorQuerySourceIdentifyStream", - "mediatorQuerySourceIdentify": { "@id": "urn:comunica:default:query-source-identify/mediators#main" }, + "mediatorContextPreprocess": { "@id": "urn:comunica:default:context-preprocess/mediators#main" }, "mediatorRdfMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" } } ] diff --git a/packages/actor-query-source-identify-stream/lib/ActorQuerySourceIdentifyStream.ts b/packages/actor-query-source-identify-stream/lib/ActorQuerySourceIdentifyStream.ts index 957c4418..7968f547 100644 --- a/packages/actor-query-source-identify-stream/lib/ActorQuerySourceIdentifyStream.ts +++ b/packages/actor-query-source-identify-stream/lib/ActorQuerySourceIdentifyStream.ts @@ -1,8 +1,8 @@ +import type { MediatorContextPreprocess } from '@comunica/bus-context-preprocess'; import type { IActionQuerySourceIdentify, IActorQuerySourceIdentifyOutput, IActorQuerySourceIdentifyArgs, - MediatorQuerySourceIdentify, } from '@comunica/bus-query-source-identify'; import { ActorQuerySourceIdentify } from '@comunica/bus-query-source-identify'; import type { MediatorRdfMetadataAccumulate } from '@comunica/bus-rdf-metadata-accumulate'; @@ -10,21 +10,22 @@ import { KeysInitQuery } from '@comunica/context-entries'; import type { IActorTest, TestResult } from '@comunica/core'; import { failTest, passTestVoid, ActionContext } from '@comunica/core'; import type { ComunicaDataFactory } from '@comunica/types'; +import type { QuerySourceStream, QuerySourceUnidentifiedExpanded } from '@incremunica/types'; import { StreamQuerySources } from './StreamQuerySources'; /** * An incremunica Stream Sources Query Source Identify Actor. */ export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify { - public readonly mediatorQuerySourceIdentify: MediatorQuerySourceIdentify; public readonly mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate; + public readonly mediatorContextPreprocess: MediatorContextPreprocess; public constructor(args: IActorQuerySourceIdentifyStreamSourcesArgs) { super(args); } public async test(action: IActionQuerySourceIdentify): Promise> { - const source = action.querySourceUnidentified; + const source = action.querySourceUnidentified; if (source.type !== 'stream') { return failTest(`${this.name} requires a single query source with stream type to be present in the context.`); } @@ -36,10 +37,10 @@ export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify { return { querySource: { source: new StreamQuerySources( - action.querySourceUnidentified.value, + action.querySourceUnidentified.value, dataFactory, - this.mediatorQuerySourceIdentify, this.mediatorRdfMetadataAccumulate, + this.mediatorContextPreprocess, action.context, ), context: action.querySourceUnidentified.context ?? new ActionContext(), @@ -50,11 +51,11 @@ export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify { export interface IActorQuerySourceIdentifyStreamSourcesArgs extends IActorQuerySourceIdentifyArgs { /** - * A mediator for identifying query sources. + * A mediator for accumulating metadata. */ - mediatorQuerySourceIdentify: MediatorQuerySourceIdentify; + mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate; /** - * A mediator for identifying query sources. + * A mediator for preprocessing the context. */ - mediatorRdfMetadataAccumulate: MediatorQuerySourceIdentify; + mediatorContextPreprocess: MediatorContextPreprocess; } diff --git a/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts b/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts index faa60ae6..e13fb1da 100644 --- a/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts +++ b/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts @@ -2,14 +2,12 @@ import { EventEmitter } from 'events'; import { LinkedRdfSourcesAsyncRdfIterator, } from '@comunica/actor-query-source-identify-hypermedia/lib/LinkedRdfSourcesAsyncRdfIterator'; -import type { - IActorQuerySourceIdentifyOutput, - MediatorQuerySourceIdentify, -} from '@comunica/bus-query-source-identify'; +import type { MediatorContextPreprocess } from '@comunica/bus-context-preprocess'; import { getVariables, } from '@comunica/bus-query-source-identify'; import type { MediatorRdfMetadataAccumulate } from '@comunica/bus-rdf-metadata-accumulate'; +import { KeysInitQuery, KeysQueryOperation } from '@comunica/context-entries'; import type { Bindings, BindingsStream, @@ -21,77 +19,119 @@ import type { MetadataBindings, } from '@comunica/types'; import { MetadataValidationState } from '@comunica/utils-metadata'; +import type { IQuerySourceStreamElement, QuerySourceStream } from '@incremunica/types'; import type * as RDF from '@rdfjs/types'; import { AsyncIterator, UnionIterator } from 'asynciterator'; import { Queue } from 'data-structure-typed'; import { type Algebra, Factory } from 'sparqlalgebrajs'; import type { Operation } from 'sparqlalgebrajs/lib/algebra'; -export type IStreamQuerySource = { - isAddition: boolean; - value: string; -}; +enum SourceState { + identify, + done, + deleted, +} type ISourceWrapper = { source: IQuerySource | undefined; deleteCallbacks: (() => void)[]; -}; - -type ISourceWrapperSafe = { - source: IQuerySource; - deleteCallbacks: (() => void)[]; + state: SourceState; + identifiedEvent: EventEmitter; }; export class StreamQuerySources implements IQuerySource { public referenceValue: string; public context: IActionContext; - private readonly sources: Map = new Map(); + private readonly sources: Map = new Map(); private readonly sourcesEventEmitter: EventEmitter; protected readonly selectorShape: FragmentSelectorShape; private readonly dataFactory: ComunicaDataFactory; private readonly mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate; + private error: Error | undefined; public constructor( - stream: AsyncIterator, + stream: QuerySourceStream, dataFactory: ComunicaDataFactory, - mediatorQuerySourceIdentify: MediatorQuerySourceIdentify, mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate, + mediatorContextPreprocess: MediatorContextPreprocess, context: IActionContext, ) { this.mediatorRdfMetadataAccumulate = mediatorRdfMetadataAccumulate; this.sourcesEventEmitter = new EventEmitter(); this.sourcesEventEmitter.setMaxListeners(Number.POSITIVE_INFINITY); - stream.on('data', (item: IStreamQuerySource) => { - // TODO [2025-01-01]: properly canonicalize the url, if it is a URL - // TODO [2025-01-01]: what if the URL occurs twice once here and once in another source identify? - // TODO [2025-01-01]: what if a source occurs twice here + stream.on('data', (item: IQuerySourceStreamElement) => { + let hash: string; + if (typeof item.querySource === 'string') { + hash = item.querySource; + } else { + hash = item.querySource.value; + } if (item.isAddition) { - const chunk: ISourceWrapper = { - deleteCallbacks: [], - source: undefined, - }; - mediatorQuerySourceIdentify - .mediate({ context, querySourceUnidentified: { value: item.value }}) - .then((querySource: IActorQuerySourceIdentifyOutput) => { - chunk.source = querySource.querySource.source; - this.sourcesEventEmitter.emit('data', chunk); - }) - .catch((error: Error) => { - throw error; + const existingSourceInstance = this.sources.get(hash); + if (existingSourceInstance === undefined) { + const sourceWrapper: ISourceWrapper = { + source: undefined, + deleteCallbacks: [], + state: SourceState.identify, + identifiedEvent: new EventEmitter(), + }; + mediatorContextPreprocess + .mediate({ context: context.set(KeysInitQuery.querySourcesUnidentified, [ item.querySource ]) }) + .then((contextPreprocessResult) => { + if (sourceWrapper.state !== SourceState.deleted) { + const sources = contextPreprocessResult.context.get(KeysQueryOperation.querySources); + if (sources === undefined || sources.length !== 1) { + this.error = new Error('Expected a single query source.'); + this.sourcesEventEmitter.emit('error', this.error); + return; + } + sourceWrapper.source = sources[0].source; + // Don't set the state to done if it is deleted + if (sourceWrapper.state === SourceState.identify) { + sourceWrapper.state = SourceState.done; + } + sourceWrapper.identifiedEvent.emit('identified'); + } + }) + .catch((error: Error) => { + this.error = error; + }); + this.sourcesEventEmitter.emit('data', sourceWrapper); + this.sources.set(hash, [ sourceWrapper ]); + } else { + const sourceWrapper: ISourceWrapper = { + source: existingSourceInstance[0].source, + state: existingSourceInstance[0].state, + deleteCallbacks: [], + identifiedEvent: existingSourceInstance[0].identifiedEvent, + }; + existingSourceInstance.push(sourceWrapper); + sourceWrapper.identifiedEvent.on('identified', () => { + if (sourceWrapper.state === SourceState.identify) { + sourceWrapper.source = existingSourceInstance[0].source; + sourceWrapper.state = SourceState.done; + } }); - this.sources.set(item.value, chunk); + this.sourcesEventEmitter.emit('data', sourceWrapper); + } } else { - const source = this.sources.get(item.value); - if (!source) { + const source = this.sources.get(hash); + if (source === undefined || source.length === 0) { + this.error = new Error(`Deleted source: "${hash}" has not been added. List of added sources:\n[\n${[ ...this.sources.keys() ].join(',\n')}\n]`); + this.sourcesEventEmitter.emit('error', this.error); return; } - for (const deleteCallback of source.deleteCallbacks) { + const workingSource = source.pop()!; + workingSource.state = SourceState.deleted; + for (const deleteCallback of workingSource.deleteCallbacks) { deleteCallback(); } - this.sources.delete(item.value); + if (source.length === 0) { + this.sources.delete(hash); + } } }); - this.referenceValue = 'StreamingHypermediaQuerySources'; + this.referenceValue = 'StreamingQuerySources'; this.dataFactory = dataFactory; const AF = new Factory( this.dataFactory); this.selectorShape = { @@ -121,12 +161,22 @@ export class StreamQuerySources implements IQuerySource { context: IActionContext, options: IQueryBindingsOptions | undefined, ): BindingsStream { - const buffer = new Queue(); - for (const sourceWrapper of this.sources.values()) { - if (sourceWrapper.source === undefined) { - continue; + if (this.error) { + throw this.error; + } + + const buffer = new Queue(); + for (const sourceWrappers of this.sources.values()) { + for (const sourceWrapper of sourceWrappers) { + if (sourceWrapper.state === SourceState.identify) { + sourceWrapper.identifiedEvent.once('identified', () => { + buffer.push(sourceWrapper); + iterator.readable = true; + }); + } else { + buffer.push(sourceWrapper); + } } - buffer.push(sourceWrapper); } const variables = getVariables(operation); @@ -141,12 +191,18 @@ export class StreamQuerySources implements IQuerySource { let first = true; const iterator = new AsyncIterator(); iterator.read = (): BindingsStream | null => { + if (this.error) { + return null; + } const sourceWrapper = buffer.shift(); if (sourceWrapper === undefined) { iterator.readable = false; return null; } - const bindingsStream = sourceWrapper.source.queryBindings(operation, context, options); + if (sourceWrapper.state === SourceState.deleted) { + return iterator.read(); + } + const bindingsStream = sourceWrapper.source!.queryBindings(operation, context, options); bindingsStream.getProperty('metadata', (metadata: MetadataBindings) => { if (first) { accumulatedMetadata.state.invalidate(); @@ -170,21 +226,30 @@ export class StreamQuerySources implements IQuerySource { }); let stopStreamFn = bindingsStream.getProperty<() => void>('delete'); if (!stopStreamFn) { + // Either the source of the bindingsStream (as the bindingsStream is probably a mapping iterator from the + // skolemization) is possibly a LinkedRdfSourcesAsyncRdfIterator + let linkedRdfSourcesAsyncRdfIterator: LinkedRdfSourcesAsyncRdfIterator | undefined; if (bindingsStream instanceof LinkedRdfSourcesAsyncRdfIterator) { + linkedRdfSourcesAsyncRdfIterator = bindingsStream; + } + if ((bindingsStream)._source instanceof LinkedRdfSourcesAsyncRdfIterator) { + linkedRdfSourcesAsyncRdfIterator = (bindingsStream)._source; + } + if (linkedRdfSourcesAsyncRdfIterator) { // TODO [2025-01-01]: make sure LinkedRdfSourcesAsyncRdfIterator is also destroyed stopStreamFn = () => { - for (const currentIterator of (bindingsStream).currentIterators) { + for (const currentIterator of (linkedRdfSourcesAsyncRdfIterator).currentIterators) { const fn = (currentIterator).getProperty<() => void>('delete'); if (fn) { fn(); } else { - throw new Error('No delete function found'); + iterator.destroy(new Error('No delete function found')); } } }; } else { stopStreamFn = () => { - throw new Error('No delete function found'); + iterator.destroy(new Error('No delete function found')); }; } } @@ -193,15 +258,25 @@ export class StreamQuerySources implements IQuerySource { }; iterator.readable = true; - const addSourceToBuffer = (sourceWrapper: ISourceWrapperSafe): void => { + const addSourceToBuffer = (sourceWrapper: ISourceWrapper): void => { if (iterator.done) { this.sourcesEventEmitter.removeListener('data', addSourceToBuffer); return; } - buffer.push(sourceWrapper); - iterator.readable = true; + if (sourceWrapper.state === SourceState.identify) { + sourceWrapper.identifiedEvent.once('identified', () => { + buffer.push(sourceWrapper); + iterator.readable = true; + }); + } else { + buffer.push(sourceWrapper); + iterator.readable = true; + } }; this.sourcesEventEmitter.on('data', addSourceToBuffer); + this.sourcesEventEmitter.on('error', (error: Error) => { + unionIterator.destroy(error); + }); const unionIterator = new UnionIterator(iterator, { autoStart: false }); diff --git a/packages/actor-query-source-identify-stream/package.json b/packages/actor-query-source-identify-stream/package.json index 594123fd..d3032d74 100644 --- a/packages/actor-query-source-identify-stream/package.json +++ b/packages/actor-query-source-identify-stream/package.json @@ -1,7 +1,7 @@ { "name": "@incremunica/actor-query-source-identify-stream", "version": "1.3.0", - "description": "A Streaming hypermedia query-source-identify actor", + "description": "A Streaming query-source-identify actor", "lsd:module": true, "license": "MIT", "homepage": "https://maartyman.github.io/incremunica/", @@ -38,12 +38,14 @@ }, "dependencies": { "@comunica/actor-query-source-identify-hypermedia": "^4.0.2", + "@comunica/bus-context-preprocess": "^4.0.2", "@comunica/bus-query-source-identify": "^4.0.1", "@comunica/bus-rdf-metadata-accumulate": "^4.0.2", "@comunica/context-entries": "^4.0.1", "@comunica/core": "^4.0.1", "@comunica/types": "^4.0.1", "@comunica/utils-metadata": "^4.0.1", + "@incremunica/types": "^1.3.0", "@rdfjs/types": "*", "asynciterator": "^3.9.0", "data-structure-typed": "^1.53.2", diff --git a/packages/actor-query-source-identify-stream/test/ActorQuerySourceIdentifyStream-test.ts b/packages/actor-query-source-identify-stream/test/ActorQuerySourceIdentifyStream-test.ts index e69de29b..ee09c398 100644 --- a/packages/actor-query-source-identify-stream/test/ActorQuerySourceIdentifyStream-test.ts +++ b/packages/actor-query-source-identify-stream/test/ActorQuerySourceIdentifyStream-test.ts @@ -0,0 +1,381 @@ +import type { MediatorContextPreprocess } from '@comunica/bus-context-preprocess'; +import { ActorQuerySourceIdentify } from '@comunica/bus-query-source-identify'; +import type { MediatorRdfMetadataAccumulate } from '@comunica/bus-rdf-metadata-accumulate'; +import { KeysInitQuery, KeysQueryOperation } from '@comunica/context-entries'; +import { ActionContext, Bus } from '@comunica/core'; +import type { IActionContext } from '@comunica/types'; +import { KeysBindings } from '@incremunica/context-entries'; +import { createTestContextWithDataFactory, AF, DF, BF, partialArrayifyStream } from '@incremunica/dev-tools'; +import type { IQuerySourceStreamElement } from '@incremunica/types'; +import { ArrayIterator, AsyncIterator } from 'asynciterator'; +import { ActorQuerySourceIdentifyStream } from '../lib'; +import 'jest-rdf'; +import '@comunica/utils-jest'; +import { StreamQuerySources } from '../lib/StreamQuerySources'; + +describe('ActorQuerySourceIdentifyStream', () => { + let bus: any; + + beforeEach(() => { + bus = new Bus({ name: 'bus' }); + }); + + describe('The ActorQuerySourceIdentifyStream module', () => { + it('should be a function', () => { + expect(ActorQuerySourceIdentifyStream).toBeInstanceOf(Function); + }); + + it('should be a ActorQuerySourceIdentifyStream constructor', () => { + expect(new ( ActorQuerySourceIdentifyStream)({ name: 'actor', bus })) + .toBeInstanceOf(ActorQuerySourceIdentifyStream); + expect(new ( ActorQuerySourceIdentifyStream)({ name: 'actor', bus })) + .toBeInstanceOf(ActorQuerySourceIdentify); + }); + + it('should not be able to create new ActorQuerySourceIdentifyStream objects without \'new\'', () => { + expect(() => { + ( ActorQuerySourceIdentifyStream)(); + }).toThrow(`Class constructor ActorQuerySourceIdentifyStream cannot be invoked without 'new'`); + }); + }); + + describe('An ActorQuerySourceIdentifyStream instance', () => { + let actor: ActorQuerySourceIdentifyStream; + let source: AsyncIterator; + let mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate; + let mediatorContextPreprocess: MediatorContextPreprocess; + let context: IActionContext; + let deleteCallback: () => void; + + beforeEach(() => { + deleteCallback = jest.fn(); + mediatorRdfMetadataAccumulate = { + mediate: jest.fn((action) => { + return { source: {}, context: new ActionContext() }; + }), + }; + mediatorContextPreprocess = { + mediate: jest.fn((action) => { + return Promise.resolve({ + context: action.context.set(KeysQueryOperation.querySources, [{ + source: { + queryBindings: () => { + const it = new AsyncIterator(); + it.read = () => { + if (it.readable) { + it.readable = false; + return BF.bindings([ + [ DF.variable('v'), DF.namedNode('a') ], + ]).setContextEntry(KeysBindings.isAddition, true); + } + return null; + }; + it.readable = true; + + it.setProperty<() => void>('delete', () => { + deleteCallback(); + it.read = () => { + if (it.readable) { + it.readable = false; + return BF.bindings([ + [ DF.variable('v'), DF.namedNode('a') ], + ]).setContextEntry(KeysBindings.isAddition, false); + } + it.close(); + return null; + }; + it.readable = true; + }); + + return it; + }, + }, + context: new ActionContext(), + }]), + }); + }), + }; + actor = new ActorQuerySourceIdentifyStream({ + name: 'actor', + bus, + mediatorRdfMetadataAccumulate, + mediatorContextPreprocess, + }); + source = new ArrayIterator(); + context = createTestContextWithDataFactory(); + }); + + describe('test', () => { + it('should test', async() => { + await expect(actor.test({ + querySourceUnidentified: { type: 'stream', value: source }, + context: new ActionContext(), + })).resolves.toBeTruthy(); + }); + + it('should not test with sparql type', async() => { + await expect(actor.test({ + querySourceUnidentified: { type: 'sparql', value: source }, + context: new ActionContext(), + })).resolves.toFailTest(`actor requires a single query source with stream type to be present in the context.`); + }); + }); + + describe('run', () => { + it('should get the source', async() => { + const result = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + }); + expect(result.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(result.querySource.context).not.toBe(context); + }); + + it('should work with different versions of query sources', async() => { + const sources = [ + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: true, + querySource: ' .', + }, + { + isAddition: true, + querySource: { + type: 'serialized', + value: ' .', + }, + }, + { + isAddition: true, + querySource: { + value: 'http://example.org/', + }, + }, + { + isAddition: true, + querySource: { + value: 'http://example.org/', + context: new ActionContext(), + }, + }, + ]; + source = new ArrayIterator(sources); + const result = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + }); + expect(result.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(result.querySource.context).not.toBe(context); + const bindings = result.querySource.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + await expect(partialArrayifyStream(bindings, 5)).resolves.toEqualBindingsArray([ + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + ]); + expect(mediatorContextPreprocess.mediate).toHaveBeenNthCalledWith( + 1, + { context: context.set(KeysInitQuery.querySourcesUnidentified, [ sources[0].querySource ]) }, + ); + expect(mediatorContextPreprocess.mediate).toHaveBeenNthCalledWith( + 2, + { context: context.set(KeysInitQuery.querySourcesUnidentified, [ sources[1].querySource ]) }, + ); + expect(mediatorRdfMetadataAccumulate.mediate).toHaveBeenCalledTimes(0); + }); + + it('should work with immediate deletions 1', async() => { + const sources = [ + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: false, + querySource: 'http://example.org/', + }, + ]; + source = new ArrayIterator(sources); + const result = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + }); + expect(result.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(result.querySource.context).not.toBe(context); + const bindings = result.querySource.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + await new Promise(resolve => setTimeout(resolve, 10)); + expect(deleteCallback).toHaveBeenCalledTimes(0); + expect(mediatorContextPreprocess.mediate).toHaveBeenNthCalledWith( + 1, + { context: context.set(KeysInitQuery.querySourcesUnidentified, [ sources[0].querySource ]) }, + ); + }); + + it('should work with immediate deletions 2', async() => { + const sources = [ + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: false, + querySource: 'http://example.org/', + }, + { + isAddition: false, + querySource: 'http://example.org/', + }, + ]; + source = new ArrayIterator(sources); + const result = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + }); + expect(result.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(result.querySource.context).not.toBe(context); + const bindings = result.querySource.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + await expect(partialArrayifyStream(bindings, 1)).resolves.toEqualBindingsArray([ + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + ]); + expect(deleteCallback).toHaveBeenCalledTimes(0); + expect(mediatorContextPreprocess.mediate).toHaveBeenNthCalledWith( + 1, + { context: context.set(KeysInitQuery.querySourcesUnidentified, [ sources[0].querySource ]) }, + ); + }); + + it('should work with slow deletions', async() => { + const sources = [ + { + isAddition: true, + querySource: 'http://example.org/', + }, + { + isAddition: false, + querySource: 'http://example.org/', + }, + ]; + source = new AsyncIterator(); + source.read = () => { + if (sources.length === 0) { + source.close(); + return null; + } + if (source.readable) { + source.readable = false; + return sources.shift(); + } + source.readable = false; + return null; + }; + source.readable = true; + const result = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + }); + expect(result.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(result.querySource.context).not.toBe(context); + const bindings = result.querySource.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + await expect(partialArrayifyStream(bindings, 1)).resolves.toEqualBindingsArray([ + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, true), + ]); + source.readable = true; + await expect(partialArrayifyStream(bindings, 1)).resolves.toEqualBindingsArray([ + BF.bindings([[ DF.variable('v'), DF.namedNode('a') ]]).setContextEntry(KeysBindings.isAddition, false), + ]); + expect(deleteCallback).toHaveBeenCalledTimes(1); + expect(mediatorContextPreprocess.mediate).toHaveBeenNthCalledWith( + 1, + { context: context.set(KeysInitQuery.querySourcesUnidentified, [ 'http://example.org/' ]) }, + ); + }); + + it('should fail on non existing deletions', async() => { + const sources = [ + { + isAddition: true, + querySource: 'http://example.org/1', + }, + { + isAddition: true, + querySource: 'http://example.org/2', + }, + { + isAddition: false, + querySource: 'http://example.org/a', + }, + ]; + source = new AsyncIterator(); + source.read = () => { + if (sources.length === 0) { + source.close(); + return null; + } + return sources.shift(); + }; + source.readable = false; + const result = (await actor.run({ + querySourceUnidentified: { type: 'stream', value: source }, + context, + })).querySource; + const bindingsStream = result.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + source.readable = true; + await expect(new Promise((resolve, reject) => { + bindingsStream.on('data', () => { + resolve(); + }); + bindingsStream.on('end', () => { + resolve(); + }); + bindingsStream.on('error', (e) => { + reject(e); + }); + })).rejects.toThrow('Deleted source: "http://example.org/a" has not been added. List of added sources:\n[\nhttp://example.org/1,\nhttp://example.org/2\n]'); + expect(() => { + result.source.queryBindings( + AF.createPattern(DF.variable('s'), DF.namedNode('p'), DF.variable('o')), + new ActionContext(), + ); + }).toThrow('Deleted source: "http://example.org/a" has not been added. List of added sources:\n[\nhttp://example.org/1,\nhttp://example.org/2\n]'); + }); + + it('should get the source with context', async() => { + const contextSource = new ActionContext(); + const ret = await actor.run({ + querySourceUnidentified: { type: 'stream', value: source, context: contextSource }, + context, + }); + expect(ret.querySource.source).toBeInstanceOf(StreamQuerySources); + expect(ret.querySource.context).not.toBe(context); + expect(ret.querySource.context).toBe(contextSource); + }); + }); + }); +}); diff --git a/packages/types/lib/QuerySource.ts b/packages/types/lib/QuerySource.ts new file mode 100644 index 00000000..1d7b9688 --- /dev/null +++ b/packages/types/lib/QuerySource.ts @@ -0,0 +1,21 @@ +import type { IActionContext } from '@comunica/types'; +import type { + IQuerySourceSerialized, + IQuerySourceUnidentifiedExpanded, + IQuerySourceUnidentifiedExpandedRawContext, +} from '@comunica/types/lib/IQuerySource'; +import type * as RDF from '@rdfjs/types'; +import type { AsyncIterator } from 'asynciterator'; + +export interface IQuerySourceStreamElement { + isAddition: boolean; + querySource: + string | + { type?: string; value: string; context?: IActionContext } | + { type?: string; value: string; context?: Record }; +} +export type QuerySourceStream = AsyncIterator; + +export type QuerySourceUnidentifiedExpanded = IQuerySourceUnidentifiedExpanded | IQuerySourceSerialized; +export type QuerySourceUnidentified = string | RDF.Source | RDF.Store | QuerySourceUnidentifiedExpanded | + IQuerySourceUnidentifiedExpandedRawContext | QuerySourceStream; diff --git a/packages/types/lib/index.ts b/packages/types/lib/index.ts index 41cd63a0..6dc45483 100644 --- a/packages/types/lib/index.ts +++ b/packages/types/lib/index.ts @@ -1,2 +1,3 @@ export * from './Quad'; export * from './DetermineChangesEvents'; +export * from './QuerySource'; diff --git a/packages/types/package.json b/packages/types/package.json index feb7a500..4ccb1cb6 100644 --- a/packages/types/package.json +++ b/packages/types/package.json @@ -34,6 +34,9 @@ "build:components": "componentsjs-generator" }, "dependencies": { + "@comunica/types": "^4.0.2", + "@rdfjs/types": "*", + "asynciterator": "^3.9.0", "n3": "^1.16.3" } }