Skip to content

Commit

Permalink
Merge pull request #50 from maartyman/guard-watch-changes
Browse files Browse the repository at this point in the history
Guard watch changes
  • Loading branch information
maartyman authored Nov 20, 2023
2 parents 84b9bed + 4c9572d commit 00fae26
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
"@id": "urn:comunica:default:resource-watch/actors#polling",
"@type": "ActorResourceWatchPolling",
"mediatorHttp": { "@id": "urn:comunica:default:http/mediators#main" },
"defaultPollingFrequency": 10
"defaultPollingFrequency": 10,
"priority": 5
},
{
"@id": "urn:comunica:default:resource-watch/actors#solid-notification-websockets",
"@type": "ActorResourceWatchSolidNotificationWebsockets",
"mediatorHttp": { "@id": "urn:comunica:default:http/mediators#main" }
"mediatorHttp": { "@id": "urn:comunica:default:http/mediators#main" },
"priority": 10
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"@id": "urn:comunica:default:resource-watch/mediators#main",
"@type": "MediatorNumber",
"type": "max",
"field": "filterFactor",
"field": "priority",
"ignoreErrors": true,
"bus": { "@id": "ActorResourceWatch:_default_bus" }
}
38 changes: 16 additions & 22 deletions packages/actor-guard-naive/lib/ActorGuardNaive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { IActionGuard, IActorGuardOutput, IActorGuardArgs } from '@incremun
import { ActorGuard } from '@incremunica/bus-guard';
import type { MediatorResourceWatch } from '@incremunica/bus-resource-watch';
import type { Quad } from '@incremunica/incremental-types';
import { Transform } from 'readable-stream';
import { Store } from 'n3';

/**
* A comunica Naive Guard Actor.
Expand Down Expand Up @@ -38,39 +38,33 @@ export class ActorGuardNaive extends ActorGuard {
});

resourceWatch.events.on('update', async() => {
const store = action.streamingSource.store.copyOfStore();
const matchStream = new Transform({
transform(quad: Quad, _encoding, callback: (error?: (Error | null), data?: any) => void) {
if (store.has(quad)) {
store.delete(quad);
callback(null, null);
return;
}
quad.diff = true;
callback(null, quad);
},
objectMode: true,
});

const deletionStore = action.streamingSource.store.copyOfStore();
const additionStore = new Store();
const responseGet = await this.mediatorDereferenceRdf.mediate({
context: action.context,
url: action.url,
});

responseGet.data.on('end', () => {
for (const quad of store) {
(<Quad>quad).diff = false;
matchStream.push(quad);
responseGet.data.on('data', (quad: Quad) => {
if (deletionStore.has(quad)) {
deletionStore.delete(quad);
return;
}
matchStream.end();
additionStore.add(quad);
});

action.streamingSource.store.import(responseGet.data.pipe(matchStream, { end: false }));
responseGet.data.on('end', () => {
for (const quad of deletionStore) {
action.streamingSource.store.removeQuad(<Quad>quad);
}
for (const quad of additionStore) {
action.streamingSource.store.addQuad(<Quad>quad);
}
});
});

resourceWatch.events.on('delete', () => {
for (const quad of action.streamingSource.store.getStore()) {
(<Quad>quad).diff = false;
action.streamingSource.store.removeQuad(<Quad>quad);
}
});
Expand Down
4 changes: 2 additions & 2 deletions packages/actor-guard-naive/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
"@comunica/bus-dereference-rdf": "^2.10.0",
"@comunica/core": "^2.10.0",
"@incremunica/bus-guard": "^1.1.0",
"readable-stream": "^4.4.2",
"@incremunica/bus-resource-watch": "^1.0.0"
"@incremunica/bus-resource-watch": "^1.0.0",
"n3": "^1.17.2"
},
"scripts": {
"build": "npm run build:ts && npm run build:components",
Expand Down
71 changes: 45 additions & 26 deletions packages/actor-guard-naive/test/ActorGuardNaive-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ import {Bus} from '@comunica/core';
import {IActionDereferenceRdf, MediatorDereferenceRdf} from "@comunica/bus-dereference-rdf";
import {IActionGuard} from "@incremunica/bus-guard";
import {Transform} from "readable-stream";
import arrayifyStream from "arrayify-stream";
import 'jest-rdf';
import {Store} from "n3";
import { Store, DataFactory} from "n3";
import EventEmitter = require("events");
import {ActorGuardNaive} from "../lib";
import {IActionResourceWatch, IActorResourceWatchOutput, MediatorResourceWatch} from "@incremunica/bus-resource-watch";
Expand All @@ -29,6 +28,7 @@ describe('ActorGuardNaive', () => {
let streamingStoreEventEmitter: EventEmitter;
let changeNotificationEventEmitter: EventEmitter;
let removeQuadFn = jest.fn();
let addQuadFn = jest.fn();
let stopFn = jest.fn();
let onFn: () => void;
let hasEnded: {value: boolean};
Expand All @@ -39,6 +39,7 @@ describe('ActorGuardNaive', () => {
changeNotificationEventEmitter = new EventEmitter();
quadArrayStore = [];
removeQuadFn = jest.fn();
addQuadFn = jest.fn();
stopFn = jest.fn();
hasEnded = {value: false};

Expand Down Expand Up @@ -89,6 +90,7 @@ describe('ActorGuardNaive', () => {
return new Store(quadArrayStore);
},
removeQuad: (quad: any) => removeQuadFn(quad),
addQuad: (quad: any) => addQuadFn(quad),
}
}
}
Expand Down Expand Up @@ -123,17 +125,6 @@ describe('ActorGuardNaive', () => {

await actor.run(action);

let promise = new Promise<void>((resolve) => {
streamingStoreEventEmitter.once("data", async (stream) => {
let tempArray = await arrayifyStream(stream);
expect(tempArray).toBeRdfIsomorphic([
quad('s3', 'p3', 'o3')
]);
expect(tempArray[0].diff).toBeTruthy();
resolve();
});
});

quadArray = [
quad('s1', 'p1', 'o1'),
quad('s2', 'p2', 'o2'),
Expand All @@ -142,7 +133,16 @@ describe('ActorGuardNaive', () => {

changeNotificationEventEmitter.emit("update");

await promise;
await new Promise<void>((resolve) => setImmediate(resolve));

expect(addQuadFn).toHaveBeenCalledTimes(1);
expect(addQuadFn).toHaveBeenCalledWith(
DataFactory.quad(
DataFactory.namedNode('s3'),
DataFactory.namedNode('p3'),
DataFactory.namedNode('o3')
)
);
});

it('should attach a negative changes stream', async () => {
Expand All @@ -154,25 +154,23 @@ describe('ActorGuardNaive', () => {

await actor.run(action);

let promise = new Promise<void>((resolve) => {
streamingStoreEventEmitter.once("data", async (stream) => {
let tempArray = await arrayifyStream(stream);
expect(tempArray).toBeRdfIsomorphic([
quad('s3', 'p3', 'o3')
]);
expect(tempArray[0].diff).toBeFalsy();
resolve();
});
});

quadArray = [
quad('s1', 'p1', 'o1'),
quad('s2', 'p2', 'o2')
];

changeNotificationEventEmitter.emit("update");

await promise;
await new Promise<void>((resolve) => setImmediate(resolve));

expect(removeQuadFn).toHaveBeenCalledTimes(1);
expect(removeQuadFn).toHaveBeenCalledWith(
DataFactory.quad(
DataFactory.namedNode('s3'),
DataFactory.namedNode('p3'),
DataFactory.namedNode('o3')
)
);
});

it('should handle delete events', async () => {
Expand All @@ -189,6 +187,27 @@ describe('ActorGuardNaive', () => {
await new Promise<void>((resolve) => setImmediate(resolve));

expect(removeQuadFn).toHaveBeenCalledTimes(3);
expect(removeQuadFn).toHaveBeenNthCalledWith(1,
DataFactory.quad(
DataFactory.namedNode('s1'),
DataFactory.namedNode('p1'),
DataFactory.namedNode('o1')
)
);
expect(removeQuadFn).toHaveBeenNthCalledWith(2,
DataFactory.quad(
DataFactory.namedNode('s2'),
DataFactory.namedNode('p2'),
DataFactory.namedNode('o2')
)
);
expect(removeQuadFn).toHaveBeenNthCalledWith(3,
DataFactory.quad(
DataFactory.namedNode('s3'),
DataFactory.namedNode('p3'),
DataFactory.namedNode('o3')
)
);
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class ActorResourceWatchPolling extends ActorResourceWatch {
}

public async test(action: IActionResourceWatch): Promise<IActorTest> {
return true;
return { priority: this.priority };
}

public async run(action: IActionResourceWatch): Promise<IActorResourceWatchOutput> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe('ActorGuardPolling', () => {
let mediatorHttp: Mediator<
Actor<IActionHttp, IActorTest, IActorHttpOutput>,
IActionHttp, IActorTest, IActorHttpOutput>;
let priority: number;

let action: IActionResourceWatch;
let headersObject: {
Expand All @@ -25,6 +26,7 @@ describe('ActorGuardPolling', () => {
};

beforeEach(() => {
priority = 0;
headersObject = {
age: undefined,
'cache-control': undefined,
Expand Down Expand Up @@ -53,7 +55,9 @@ describe('ActorGuardPolling', () => {
beforeActors: [],
mediatorHttp: mediatorHttp,
defaultPollingFrequency: 1,
name: 'actor', bus
priority: priority,
name: 'actor',
bus
});

action = {
Expand All @@ -68,7 +72,7 @@ describe('ActorGuardPolling', () => {
});

it('should test', () => {
return expect(actor.test(<any>{})).resolves.toBeTruthy();
return expect(actor.test(action)).resolves.toEqual({ priority: priority });
});

it('should get an update if the etag changes', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class ActorResourceWatchSolidNotificationWebsockets extends ActorResource
throw new Error('Resource does not support Solid Notifications with Websockets');
}

return true;
return { priority: this.priority };
}

public async run(action: IActionResourceWatch): Promise<IActorResourceWatchOutput> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ describe('ActorResourceWatchSolidNotificationWebsockets', () => {
Actor<IActionHttp, IActorTest, IActorHttpOutput>,
IActionHttp, IActorTest, IActorHttpOutput>;
let action: IActionResourceWatch;
let priority: number;
let createResourceRequestFn: (url: string) => any;
let createDescriptionResourceRequestFn: (url: string) => any;
let createChannelDescriptionRequestFn: (url: string) => any;

beforeEach(() => {
bus = new Bus({name: 'bus'});
priority = 0;

createResourceRequestFn = (url: string): any => {
throw Error("createResourceRequestFn not set");
Expand Down Expand Up @@ -97,7 +99,9 @@ describe('ActorResourceWatchSolidNotificationWebsockets', () => {
actor = new ActorResourceWatchSolidNotificationWebsockets({
beforeActors: [],
mediatorHttp: mediatorHttp,
name: 'actor', bus
name: 'actor',
bus,
priority: priority,
});

action = {
Expand All @@ -119,7 +123,7 @@ describe('ActorResourceWatchSolidNotificationWebsockets', () => {

let result = await actor.test(action);

expect(result).toBeTruthy();
expect(result).toEqual({priority: priority});
});

it('should not test if first get fails', async () => {
Expand Down
10 changes: 8 additions & 2 deletions packages/bus-resource-watch/lib/ActorResourceWatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Actor } from '@comunica/core';
* @see IActorResourceWatchOutput
*/
export abstract class ActorResourceWatch extends Actor<IActionResourceWatch, IActorTest, IActorResourceWatchOutput> {
public readonly priority: number;
/**
* @param args - @defaultNested {<default_bus> a <cc:components/Bus.jsonld#Bus>} bus
*/
Expand Down Expand Up @@ -49,8 +50,13 @@ export interface IActorResourceWatchOutput extends IActorOutput {
stopFunction: () => void;
}

export type IActorResourceWatchArgs = IActorArgs<
IActionResourceWatch, IActorTest, IActorResourceWatchOutput>;
export interface IActorResourceWatchArgs extends IActorArgs<
IActionResourceWatch, IActorTest, IActorResourceWatchOutput> {
/**
* The priority of the actor.
*/
priority: number;
}

export type MediatorResourceWatch = Mediate<
IActionResourceWatch, IActorResourceWatchOutput>;
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -11201,7 +11201,7 @@ n3@^1.11.1, n3@^1.16.3, n3@^1.16.4, n3@^1.6.3:
queue-microtask "^1.1.2"
readable-stream "^4.0.0"

n3@^1.17.0:
n3@^1.17.0, n3@^1.17.2:
version "1.17.2"
resolved "https://registry.yarnpkg.com/n3/-/n3-1.17.2.tgz#3370b2d07da98a5b2865fa43c2d4e5c563cc65df"
integrity sha512-BxSM52wYFqXrbQQT5WUEzKUn6qpYV+2L4XZLfn3Gblz2kwZ09S+QxC33WNdVEQy2djenFL8SNkrjejEKlvI6+Q==
Expand Down

0 comments on commit 00fae26

Please sign in to comment.