Skip to content

Commit

Permalink
fix: add tests and fixes to stream source
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Jan 20, 2025
1 parent 3d1fbfa commit 1b01ad6
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{
"@id": "urn:comunica:default:query-source-identify/actors#stream",
"@type": "ActorQuerySourceIdentifyStream",
"mediatorQuerySourceIdentify": { "@id": "urn:comunica:default:query-source-identify/mediators#main" },
"mediatorContextPreprocess": { "@id": "urn:comunica:default:context-preprocess/mediators#main" },
"mediatorRdfMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" }
}
]
Expand Down
12 changes: 4 additions & 8 deletions engines/query-sparql-incremental/test/QuerySparql-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,15 @@ describe('System test: QuerySparql (without polly)', () => {

const sourcesStream = new PassThrough({ objectMode: true });
sourcesStream.push({
value: 'http://localhost:8787',
isAddition: true,
});
sourcesStream.push({
value: streamingStore,
querySource: 'http://localhost:8787',
isAddition: true,
});

bindingStream = await engine.queryBindings(`SELECT * WHERE {
?s ?p ?o.
}`, {
// @ts-expect-error
sources: [ sourcesStream ],
sources: [ streamingStore, sourcesStream ],
pollingPeriod: 1000,
});

Expand All @@ -416,7 +412,7 @@ describe('System test: QuerySparql (without polly)', () => {
]).setContextEntry(KeysBindings.isAddition, true));

sourcesStream.push({
value: 'http://localhost:8787',
querySource: 'http://localhost:8787',
isAddition: false,
});

Expand Down Expand Up @@ -563,7 +559,7 @@ describe('System test: QuerySparql (with polly)', () => {
// @ts-expect-error
sources: [ new ArrayIterator([
{
value: 'https://www.rubensworks.net/',
querySource: 'https://www.rubensworks.net/',
isAddition: true,
},
]) ],
Expand Down
2 changes: 1 addition & 1 deletion packages/actor-query-source-identify-stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ After installing, this package can be added to your engine's configuration as fo
{
"@id": "urn:comunica:default:query-source-identify/actors#stream",
"@type": "ActorQuerySourceIdentifyStream",
"mediatorQuerySourceIdentify": { "@id": "urn:comunica:default:query-source-identify/mediators#main" },
"mediatorContextPreprocess": { "@id": "urn:comunica:default:context-preprocess/mediators#main" },
"mediatorRdfMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" }
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
import type { MediatorContextPreprocess } from '@comunica/bus-context-preprocess';
import type {
IActionQuerySourceIdentify,
IActorQuerySourceIdentifyOutput,
IActorQuerySourceIdentifyArgs,
MediatorQuerySourceIdentify,
} from '@comunica/bus-query-source-identify';
import { ActorQuerySourceIdentify } from '@comunica/bus-query-source-identify';
import type { MediatorRdfMetadataAccumulate } from '@comunica/bus-rdf-metadata-accumulate';
import { KeysInitQuery } from '@comunica/context-entries';
import type { IActorTest, TestResult } from '@comunica/core';
import { failTest, passTestVoid, ActionContext } from '@comunica/core';
import type { ComunicaDataFactory } from '@comunica/types';
import type { QuerySourceStream, QuerySourceUnidentifiedExpanded } from '@incremunica/types';
import { StreamQuerySources } from './StreamQuerySources';

/**
* An incremunica Stream Sources Query Source Identify Actor.
*/
export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify {
public readonly mediatorQuerySourceIdentify: MediatorQuerySourceIdentify;
public readonly mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate;
public readonly mediatorContextPreprocess: MediatorContextPreprocess;

public constructor(args: IActorQuerySourceIdentifyStreamSourcesArgs) {
super(args);
}

public async test(action: IActionQuerySourceIdentify): Promise<TestResult<IActorTest>> {
const source = action.querySourceUnidentified;
const source = <QuerySourceUnidentifiedExpanded>action.querySourceUnidentified;
if (source.type !== 'stream') {
return failTest(`${this.name} requires a single query source with stream type to be present in the context.`);
}
Expand All @@ -36,10 +37,10 @@ export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify {
return {
querySource: {
source: new StreamQuerySources(
<any>action.querySourceUnidentified.value,
<QuerySourceStream><any>action.querySourceUnidentified.value,
dataFactory,
this.mediatorQuerySourceIdentify,
this.mediatorRdfMetadataAccumulate,
this.mediatorContextPreprocess,
action.context,
),
context: action.querySourceUnidentified.context ?? new ActionContext(),
Expand All @@ -50,11 +51,11 @@ export class ActorQuerySourceIdentifyStream extends ActorQuerySourceIdentify {

export interface IActorQuerySourceIdentifyStreamSourcesArgs extends IActorQuerySourceIdentifyArgs {
/**
* A mediator for identifying query sources.
* A mediator for accumulating metadata.
*/
mediatorQuerySourceIdentify: MediatorQuerySourceIdentify;
mediatorRdfMetadataAccumulate: MediatorRdfMetadataAccumulate;
/**
* A mediator for identifying query sources.
* A mediator for preprocessing the context.
*/
mediatorRdfMetadataAccumulate: MediatorQuerySourceIdentify;
mediatorContextPreprocess: MediatorContextPreprocess;
}
Loading

0 comments on commit 1b01ad6

Please sign in to comment.