From 3508209d9ac321b45cb9737ffe3a1413ca51c15c Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Mon, 25 Nov 2024 09:40:00 +0100 Subject: [PATCH] fix streamSource event emitter memory leak --- .../lib/StreamQuerySources.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts b/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts index 2360cc2a..aaa67ff3 100644 --- a/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts +++ b/packages/actor-query-source-identify-stream/lib/StreamQuerySources.ts @@ -49,6 +49,7 @@ export class StreamQuerySources implements IQuerySource { context: IActionContext, ) { this.sourcesEventEmitter = new EventEmitter(); + this.sourcesEventEmitter.setMaxListeners(Infinity); stream.on('data', (item: IStreamQuerySource) => { if (item.isAddition) { const chunk: ISourceWrapper = { @@ -144,10 +145,15 @@ export class StreamQuerySources implements IQuerySource { }; iterator.readable = true; - this.sourcesEventEmitter.on('data', (sourceWrapper: ISourceWrapperSafe) => { + const addSourceToBuffer = (sourceWrapper: ISourceWrapperSafe) => { + if (iterator.done) { + this.sourcesEventEmitter.removeListener('data', addSourceToBuffer); + return; + } buffer.push(sourceWrapper); iterator.readable = true; - }); + } + this.sourcesEventEmitter.on('data', addSourceToBuffer); const unionIterator = new UnionIterator(iterator, { autoStart: false });