Skip to content

Commit

Permalink
fix: fix the logic for halting and resuming sources in ActorRdfJoinIn…
Browse files Browse the repository at this point in the history
…nerIncrementalComputationalMultiBind class
  • Loading branch information
maartyman authored and gitbutler-client committed Feb 15, 2024
1 parent a058852 commit 2ee77ef
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,26 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
});
}

public static haltSources(sources: DataSources): void {
public static haltSources(sources: DataSources): { resume: () => void }[] {
const sourcesToResume: { resume: () => void }[] = [];
for (const source of sources) {
if (typeof source !== 'string' && 'resume' in source && 'halt' in source) {
if (
typeof source !== 'string' &&
'resume' in source &&
'halt' in source &&
'isHalted' in source &&
!(<any>source).isHalted()
) {
(<any>source).halt();
sourcesToResume.push(<{ resume: () => void }>source);
}
}
return sourcesToResume;
}

public static resumeSources(sources: DataSources): void {
public static resumeSources(sources: { resume: () => void }[]): void {
for (const source of sources) {
if (typeof source !== 'string' && 'resume' in source && 'halt' in source) {
(<any>source).resume();
}
(<any>source).resume();
}
}

Expand Down Expand Up @@ -123,15 +130,15 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
const activeElement = hashData.elements[hashData.elements.length - 1];
hashData.elements.pop();

ActorRdfJoinInnerIncrementalComputationalMultiBind.haltSources(sources);
const sourcesToResume = ActorRdfJoinInnerIncrementalComputationalMultiBind.haltSources(sources);

let activeIteratorStopped = false;
let newIteratorStopped = false;

activeElement.iterator.on('end', () => {
activeIteratorStopped = true;
if (newIteratorStopped) {
ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sources);
ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sourcesToResume);
}
activeElement.iterator.removeAllListeners();
});
Expand All @@ -144,7 +151,7 @@ export class ActorRdfJoinInnerIncrementalComputationalMultiBind extends ActorRdf
newIterator.on('end', () => {
newIteratorStopped = true;
if (activeIteratorStopped) {
ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sources);
ActorRdfJoinInnerIncrementalComputationalMultiBind.resumeSources(sourcesToResume);
}
newIterator.removeAllListeners();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ describe('ActorRdfJoinIncrementalComputationalMultiBind', () => {
resumeMock = jest.fn();

let mockStreamingStore = {
isHalted: () => false,
halt: haltMock,
resume: resumeMock
}
Expand Down Expand Up @@ -1384,6 +1385,7 @@ describe('ActorRdfJoinIncrementalComputationalMultiBind', () => {
resumeMock = jest.fn();

let mockStreamingStore = {
isHalted: () => false,
halt: haltMock,
resume: resumeMock
}
Expand Down

0 comments on commit 2ee77ef

Please sign in to comment.