Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set source type for endpoints #68

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js';
import type {Quad, NamedNode} from '@rdfjs/types';
import getSPARQLQueryString from '../utils/getSPARQLQueryString.js';
import getEndpoint from '../utils/getEndpoint.js';
import type {Endpoint, QueryEngine} from './types.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import EventEmitter from 'node:events';
Expand All @@ -23,10 +23,9 @@ export default class Generator extends EventEmitter<Events> {
private iterationsProcessed = 0;
private iterationsIncoming = 0;
private statements = 0;
private source = '';
private $thisList: NamedNode[] = [];
private readonly endpoint: Endpoint;
// private iteratorEnded: boolean = false;
private source?: QuerySource;
public constructor(
private readonly stage: Stage,
private readonly index: number
Expand Down Expand Up @@ -77,7 +76,6 @@ export default class Generator extends EventEmitter<Events> {
this.source
}: ${(e as Error).message}`
);
if (this.source === '') this.source = getEngineSource(this.endpoint);
const unionQuery = getSPARQLQuery(
getSPARQLQueryString(this.query),
'construct'
Expand All @@ -92,7 +90,7 @@ export default class Generator extends EventEmitter<Events> {

this.engine
.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [this.source],
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (quad: Quad) => {
Expand Down
7 changes: 3 additions & 4 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js';
import {type Bindings} from '@comunica/types';
import getSPARQLQueryString from '../utils/getSPARQLQueryString.js';
import getEndpoint from '../utils/getEndpoint.js';
import type {Endpoint, QueryEngine} from './types.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import parse from 'parse-duration';
Expand All @@ -24,7 +24,7 @@ export default class Iterator extends EventEmitter<Events> {
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private readonly delay: number = 0;
private source = '';
private source?: QuerySource;
private $offset = 0;
public totalResults = 0;

Expand All @@ -50,7 +50,6 @@ export default class Iterator extends EventEmitter<Events> {
public run(): void {
setTimeout(() => {
let resultsPerPage = 0;
if (this.source === '') this.source = getEngineSource(this.endpoint);
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
const error = (e: unknown): Error =>
Expand All @@ -63,7 +62,7 @@ export default class Iterator extends EventEmitter<Events> {
);
this.engine
.queryBindings(queryString, {
sources: [this.source],
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (binding: Bindings) => {
Expand Down
22 changes: 9 additions & 13 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ let spinner: Ora;
class Pipeline {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated = false;
private stageNames: string[] = [];
private startTime = performance.now();
private readonly destination: File | TriplyDB;
Expand Down Expand Up @@ -64,6 +63,7 @@ class Pipeline {
this.destination = isTriplyDBPathString(destinationFile)
? new TriplyDB(destinationFile).validate()
: new File(destinationFile, true).validate();
this.validate();
}

private error(e: Error, stage?: string): void {
Expand All @@ -76,7 +76,6 @@ class Pipeline {
}

public getPreviousStage(stage: Stage): Stage | undefined {
this.validate();
if (!this.stages.has(stage.name)) {
throw new Error(
`This is unexpected: missing stage "${stage.name}" in stages.`
Expand All @@ -88,18 +87,18 @@ class Pipeline {
else return this.stages.get(names[ix - 1]);
}

public validate(): void {
if (this.$isValidated) return;
let i = 0;
private validate(): void {
if (this.$configuration.stages.length === 0) {
throw new Error('Your pipeline contains no stages.');
}

if (this.$configuration.stages[0].iterator.endpoint === undefined) {
throw new Error(
'The first stage of your pipeline must have an endpoint defined for the Iterator.'
);
}

for (const stageConfiguration of this.$configuration.stages) {
if (i === 0 && stageConfiguration.iterator.endpoint === undefined) {
throw new Error(
'The first stage of your pipeline must have an endpoint defined for the Iterator.'
);
}
if (this.stages.has(stageConfiguration.name)) {
throw new Error(
`Detected a duplicate name for stage \`${stageConfiguration.name}\` in your pipeline: each stage must have a unique name.`
Expand All @@ -109,9 +108,7 @@ class Pipeline {
stageConfiguration.name,
new Stage(this, stageConfiguration)
);
i++;
}
this.$isValidated = true;
}

public get configuration(): LDWorkbenchConfiguration {
Expand All @@ -128,7 +125,6 @@ class Pipeline {
if (!(this.opts?.silent === true)) spinner.start();
let startFromStage = 0;
try {
this.validate();
if (this.opts?.startFromStageName !== undefined) {
if (/^\d+$/.test(this.opts.startFromStageName)) {
const ix = parseInt(this.opts.startFromStageName);
Expand Down
4 changes: 0 additions & 4 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ describe('Generator Class', () => {
expect(generator).to.have.property('query');
expect(generator).to.have.property('engine');
expect(generator).to.have.property('endpoint');
expect(generator).to.have.property('source');
});
});
describe('run', () => {
Expand Down Expand Up @@ -108,8 +107,6 @@ describe('Generator Class', () => {
};
// read file after pipeline has finished
const pipelineParallelGenerators = new Pipeline(config, {silent: true});
pipelineParallelGenerators.validate();

await pipelineParallelGenerators.run();
const file = fs.readFileSync(filePath, {encoding: 'utf-8'});
const fileLines = file.split('\n').sort();
Expand Down Expand Up @@ -148,7 +145,6 @@ describe('Generator Class', () => {
],
};
const pipelineBatch = new Pipeline(batchConfiguration, {silent: true});
pipelineBatch.validate();
pipelineBatch
.run()
.then(() => {
Expand Down
1 change: 0 additions & 1 deletion src/lib/tests/Iterator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ describe('Iterator Class', () => {
expect(iterator).to.have.property('query');
expect(iterator).to.have.property('endpoint');
expect(iterator).to.have.property('engine');
expect(iterator).to.have.property('source');
expect(iterator).to.have.property('$offset', 0);
expect(iterator).to.have.property('totalResults', 0);
});
Expand Down
14 changes: 4 additions & 10 deletions src/lib/tests/Pipeline.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ describe('Pipeline Class', () => {
expect(pipeline).to.be.an.instanceOf(Pipeline);
expect(pipeline).to.have.property('stages').that.is.a('Map');
expect(pipeline).to.have.property('dataDir').that.is.a('string');
expect(pipeline).to.have.property('$isValidated', false);
expect(pipeline).to.have.property('stageNames').that.is.an('array');
expect(pipeline).to.have.property('startTime').that.is.an('number');
expect(pipeline)
Expand Down Expand Up @@ -99,7 +98,6 @@ describe('Pipeline Class', () => {
],
};
const pipeline = new Pipeline(configuration, {silent: true});
pipeline.validate();

const stage1 = pipeline.stages.get('Stage 1')!;
const stage2 = pipeline.stages.get('Stage 2')!;
Expand Down Expand Up @@ -155,10 +153,9 @@ describe('Pipeline Class', () => {
destination: 'file://pipelines/data/example-pipeline.nt',
stages: [],
} as unknown as LDWorkbenchConfiguration;
const pipeline = new Pipeline(invalidConfiguration, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(invalidConfiguration, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (error.message === 'Your pipeline contains no stages.') {
Expand Down Expand Up @@ -204,10 +201,9 @@ describe('Pipeline Class', () => {
},
],
} as unknown as LDWorkbenchConfiguration;
const pipeline = new Pipeline(invalidConfiguration, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(invalidConfiguration, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (
Expand Down Expand Up @@ -259,10 +255,9 @@ describe('Pipeline Class', () => {
},
],
};
const pipeline = new Pipeline(configDuplicateStageName, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(configDuplicateStageName, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (
Expand Down Expand Up @@ -315,10 +310,9 @@ describe('Pipeline Class', () => {
},
],
};
const pipeline = new Pipeline(configDuplicateStageName, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(configDuplicateStageName, {silent: true});
} catch (error) {
failed = true;
if (error instanceof Error) {
Expand Down
4 changes: 0 additions & 4 deletions src/lib/tests/PreviousStage.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down Expand Up @@ -88,7 +87,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[0]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down Expand Up @@ -132,7 +130,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stageTwo: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stageTwo.pipeline.stages.keys());
const previousStage = new PreviousStage(stageTwo, stagesSoFar.pop()!); // should be stage one
Expand Down Expand Up @@ -178,7 +175,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down
1 change: 1 addition & 0 deletions src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ import {type QueryEngine as QueryEngineFile} from '@comunica/query-sparql-file';

export type Endpoint = File | URL | PreviousStage;
export type QueryEngine = QueryEngineSparql | QueryEngineFile;
export type QuerySource = {type?: string; value: string};
9 changes: 4 additions & 5 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,11 @@ async function main(): Promise<void> {
);
}

const pipeline = new Pipeline(configuration, {
startFromStageName: cliArgs.stage,
silent: cliArgs.silent,
});

try {
const pipeline = new Pipeline(configuration, {
startFromStageName: cliArgs.stage,
silent: cliArgs.silent,
});
await pipeline.run();
} catch (e) {
error(
Expand Down
3 changes: 1 addition & 2 deletions src/utils/getEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ export default function getEndpoint(
return new File(endpoint);
} else if (endpoint !== undefined) {
try {
// fix for GraphDB, see https://github.com/comunica/comunica/issues/962
return new URL((endpoint as string).replace(/^sparql@/, ''));
return new URL(endpoint);
} catch (e) {
throw new Error(`"${endpoint as string}" is not a valid URL`);
}
Expand Down
22 changes: 15 additions & 7 deletions src/utils/getEngineSource.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
import {isPreviousStage} from './guards.js';
import {existsSync} from 'fs';
import path from 'path';
import type {Endpoint} from '../lib/types.js';
import type {Endpoint, QuerySource} from '../lib/types.js';

export default function getEngineSource(endpoint: Endpoint): string {
let source: string;
export default function getEngineSource(endpoint: Endpoint): QuerySource {
if (isPreviousStage(endpoint)) {
const previousStage = endpoint.load();
if (!existsSync(previousStage.destinationPath)) {
throw new Error(
`The result from stage "${previousStage.name}" (${previousStage.destinationPath}) is not available, make sure to run that stage first`
);
}
source = path.resolve(previousStage.destinationPath);
} else {
source = endpoint.toString();
return {
type: 'file',
value: path.resolve(previousStage.destinationPath),
};
} else if (endpoint instanceof URL) {
return {
type: 'sparql',
value: endpoint.toString(),
};
}
return source;

return {
value: endpoint.toString(),
};
}
Loading