From 2ee77efd0506982c2ee5c368c0254e53718ff1b7 Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Thu, 15 Feb 2024 13:53:16 +0100 Subject: [PATCH] fix: fix the logic for halting and resuming sources in ActorRdfJoinInnerIncrementalComputationalMultiBind class --- ...nInnerIncrementalComputationalMultiBind.ts | 25 ++++++++++++------- ...rementalComputationalMultiBindJoin-test.ts | 2 ++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/packages/actor-rdf-join-inner-incremental-computational-multi-bind/lib/ActorRdfJoinInnerIncrementalComputationalMultiBind.ts b/packages/actor-rdf-join-inner-incremental-computational-multi-bind/lib/ActorRdfJoinInnerIncrementalComputationalMultiBind.ts index 71d0c945..2af73517 100644 --- a/packages/actor-rdf-join-inner-incremental-computational-multi-bind/lib/ActorRdfJoinInnerIncrementalComputationalMultiBind.ts +++ b/packages/actor-rdf-join-inner-incremental-computational-multi-bind/lib/ActorRdfJoinInnerIncrementalComputationalMultiBind.ts @@ -40,19 +40,26 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf }); } - public static haltSources(sources: DataSources): void { + public static haltSources(sources: DataSources): { resume: () => void }[] { + const sourcesToResume: { resume: () => void }[] = []; for (const source of sources) { - if (typeof source !== 'string' && 'resume' in source && 'halt' in source) { + if ( + typeof source !== 'string' && + 'resume' in source && + 'halt' in source && + 'isHalted' in source && + !(source).isHalted() + ) { (source).halt(); + sourcesToResume.push(<{ resume: () => void }>source); } } + return sourcesToResume; } - public static resumeSources(sources: DataSources): void { + public static resumeSources(sources: { resume: () => void }[]): void { for (const source of sources) { - if (typeof source !== 'string' && 'resume' in source && 'halt' in source) { - (source).resume(); - } + (source).resume(); } } @@ -123,7 +130,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf const activeElement = hashData.elements[hashData.elements.length - 1]; hashData.elements.pop(); - ActorRdfJoinInnerIncrementalComputationalMultiBind.haltSources(sources); + const sourcesToResume = ActorRdfJoinInnerIncrementalComputationalMultiBind.haltSources(sources); let activeIteratorStopped = false; let newIteratorStopped = false; @@ -131,7 +138,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf activeElement.iterator.on('end', () => { activeIteratorStopped = true; if (newIteratorStopped) { - ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sources); + ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sourcesToResume); } activeElement.iterator.removeAllListeners(); }); @@ -144,7 +151,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf newIterator.on('end', () => { newIteratorStopped = true; if (activeIteratorStopped) { - ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sources); + ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sourcesToResume); } newIterator.removeAllListeners(); }); diff --git a/packages/actor-rdf-join-inner-incremental-computational-multi-bind/test/ActorRdfJoinInnerIncrementalComputationalMultiBindJoin-test.ts b/packages/actor-rdf-join-inner-incremental-computational-multi-bind/test/ActorRdfJoinInnerIncrementalComputationalMultiBindJoin-test.ts index 7e2b298d..2fd98ddb 100644 --- a/packages/actor-rdf-join-inner-incremental-computational-multi-bind/test/ActorRdfJoinInnerIncrementalComputationalMultiBindJoin-test.ts +++ b/packages/actor-rdf-join-inner-incremental-computational-multi-bind/test/ActorRdfJoinInnerIncrementalComputationalMultiBindJoin-test.ts @@ -1180,6 +1180,7 @@ describe('ActorRdfJoinIncrementalComputationalMultiBind', () => { resumeMock = jest.fn(); let mockStreamingStore = { + isHalted: () => false, halt: haltMock, resume: resumeMock } @@ -1384,6 +1385,7 @@ describe('ActorRdfJoinIncrementalComputationalMultiBind', () => { resumeMock = jest.fn(); let mockStreamingStore = { + isHalted: () => false, halt: haltMock, resume: resumeMock }