Skip to content

Commit

Permalink
fix deferred resource watch
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Jan 3, 2025
1 parent c9c48a0 commit c922346
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EventEmitter } from 'events';
import type { MediatorHttp } from '@comunica/bus-http';
import type { IActorTest, TestResult } from '@comunica/core';
import { failTest, passTest } from '@comunica/core';
import type {
Expand All @@ -11,7 +12,6 @@ import {
ActorResourceWatch,
} from '@incremunica/bus-resource-watch';
import { KeysResourceWatch } from '@incremunica/context-entries';
import type {MediatorHttp} from "@comunica/bus-http";

/**
* An incremunica Deferred Resource Watch Actor.
Expand All @@ -26,6 +26,21 @@ export class ActorResourceWatchDeferred extends ActorResourceWatch {
if (!action.context.has(KeysResourceWatch.deferredEvaluationEventEmitter)) {
return failTest('Context does not have \'deferredEvaluationEventEmitter\'');
}
const responseHead = await this.mediatorHttp.mediate(
{
context: action.context,
input: action.url,
init: {
method: 'HEAD',
},
},
);
if (!responseHead.ok) {
return failTest('Source does not support HEAD requests');
}
if (!responseHead.headers.get('etag')) {
return failTest('Source does not support etag headers');
}
return passTest({ priority: this.priority });
}

Expand All @@ -37,7 +52,6 @@ export class ActorResourceWatchDeferred extends ActorResourceWatch {

let etag = action.metadata.etag;
const checkForChanges = (): void => {
// TODO [2024-12-19]: what if the source doesn't support HEAD requests, if it's a SPARQL endpoint for example?
this.mediatorHttp.mediate(
{
context: action.context,
Expand All @@ -47,7 +61,7 @@ export class ActorResourceWatchDeferred extends ActorResourceWatch {
},
},
).then((responseHead) => {
// TODO [2024-12-01]: have more specific error handling for example 304: Not Modified should not emit 'delete'
// TODO [2025-08-01]: have more specific error handling for example 304: Not Modified should not emit 'delete'
if (!responseHead.ok) {
outputEvents.emit('delete');
}
Expand All @@ -58,7 +72,7 @@ export class ActorResourceWatchDeferred extends ActorResourceWatch {
}).catch(() => {
outputEvents.emit('delete');
});
}
};

let running = false;
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,96 +18,130 @@ describe('ActorGuardDeferred', () => {
let priority: number;
let action: IActionResourceWatch;
let deferredEventEmitter: EventEmitter;
let mediatorHttp: any;

beforeEach(() => {
deferredEventEmitter = new EventEmitter();
const context = new ActionContext().set(KeysResourceWatch.deferredEvaluationEventEmitter, deferredEventEmitter);
priority = 0;
mediatorHttp = <any> { mediate: jest.fn(() => Promise.resolve({ ok: true, headers: { get: () => '0' } })) };

actor = new ActorResourceWatchDeferred({
priority,
name: 'actor',
bus,
mediatorHttp: mediatorHttp,
});

action = {
context,
url: 'www.test.com',
metadata: {
etag: 0,
etag: '0',
'cache-control': undefined,
age: undefined,
},
};
});

it('should test', async() => {
await expect(actor.test(action)).resolves.toPassTest({
priority: 0,
describe('test', () => {
it('should test', async () => {
await expect(actor.test(action)).resolves.toPassTest({
priority: 0,
});
});
});

it('should not test', async() => {
action.context = new ActionContext();
await expect(actor.test(action)).resolves.toFailTest('Context does not have \'deferredEvaluationEventEmitter\'');
});
it('should not test if context doesn\'t have deferredEvaluationEventEmitter', async() => {
action.context = new ActionContext();
await expect(actor.test(action)).resolves
.toFailTest('Context does not have \'deferredEvaluationEventEmitter\'');
});

it('should run', async() => {
const result = await actor.run(action);
expect(result.events).toBeInstanceOf(EventEmitter);
expect(result.start).toBeInstanceOf(Function);
expect(result.stop).toBeInstanceOf(Function);
});
it('should not test if source doesn\'t support HEAD requests', async () => {
mediatorHttp.mediate = jest.fn(() => Promise.resolve({ ok: false }));
await expect(actor.test(action)).resolves.toFailTest('Source does not support HEAD requests');
});

it('should start and stop', async() => {
const result = await actor.run(action);
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
it('should not test if source doesn\'t support etags', async () => {
mediatorHttp.mediate = jest.fn(() => Promise.resolve({ ok: true, headers: { get: () => null } }));
await expect(actor.test(action)).resolves.toFailTest('Source does not support etag headers');
});
});

it('should start and stop multiple times', async() => {
const result = await actor.run(action);
result.start();
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
result.start();
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
});
describe('run', () => {
it('should run', async () => {
const result = await actor.run(action);
expect(result.events).toBeInstanceOf(EventEmitter);
expect(result.start).toBeInstanceOf(Function);
expect(result.stop).toBeInstanceOf(Function);
});

it('should not emit update events if not started', async() => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
deferredEventEmitter.emit('update');
expect(listener).toHaveBeenCalledTimes(0);
});
it('should start and stop', async () => {
const result = await actor.run(action);
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
});

it('should emit update events if started', async() => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
result.start();
deferredEventEmitter.emit('update');
expect(listener).toHaveBeenCalledTimes(1);
});
it('should start and stop multiple times', async () => {
const result = await actor.run(action);
result.start();
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
result.start();
result.start();
expect(deferredEventEmitter.listenerCount('update')).toBe(1);
result.stop();
result.stop();
expect(deferredEventEmitter.listenerCount('update')).toBe(0);
});

it('should not emit update events if stopped', async() => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
result.start();
result.stop();
deferredEventEmitter.emit('update');
expect(listener).toHaveBeenCalledTimes(0);
it('should not emit update events if not started', async () => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
deferredEventEmitter.emit('update');
expect(listener).toHaveBeenCalledTimes(0);
});

it('should not emit update events if started and no changes', async () => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
result.start();
deferredEventEmitter.emit('update');
expect(mediatorHttp.mediate).toHaveBeenCalledTimes(1);
//make sure the promise the mediator promise is resolved
await new Promise(resolve => setImmediate(resolve));
expect(listener).toHaveBeenCalledTimes(0);
});

it('should emit update events if started and changes', async () => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
mediatorHttp.mediate = jest.fn(() => Promise.resolve({ ok: true, headers: { get: () => '1' } }));
result.start();
deferredEventEmitter.emit('update');
expect(mediatorHttp.mediate).toHaveBeenCalledTimes(1);
//make sure the promise the mediator promise is resolved
await new Promise(resolve => setImmediate(resolve));
expect(listener).toHaveBeenCalledTimes(1);
});

it('should not emit update events if stopped', async () => {
const result = await actor.run(action);
const listener = jest.fn();
result.events.on('update', listener);
result.start();
result.stop();
deferredEventEmitter.emit('update');
expect(listener).toHaveBeenCalledTimes(0);
});
});
});
});

0 comments on commit c922346

Please sign in to comment.