From 267ae916e9c3235eecbae262ed92a2d5bd7caa7e Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 9 Jan 2025 07:15:25 -0700 Subject: [PATCH] remove ReadableCursorStream --- src/cursor/abstract_cursor.ts | 114 +----------------- test/integration/crud/misc_cursors.test.js | 42 +++---- .../node-specific/abstract_cursor.test.ts | 9 +- 3 files changed, 24 insertions(+), 141 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 8eccdfcf63..6fe1ccd0a8 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,4 +1,4 @@ -import { Readable, Transform } from 'stream'; +import { Readable } from 'stream'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document'; @@ -496,33 +496,10 @@ export abstract class AbstractCursor< } stream(options?: CursorStreamOptions): Readable & AsyncIterable { - if (options?.transform) { - const transform = options.transform; - const readable = new ReadableCursorStream(this); - - const transformedStream = readable.pipe( - new Transform({ - objectMode: true, - highWaterMark: 1, - transform(chunk, _, callback) { - try { - const transformed = transform(chunk); - callback(undefined, transformed); - } catch (err) { - callback(err); - } - } - }) - ); - - // Bubble errors to transformed stream, because otherwise no way - // to handle this error. - readable.on('error', err => transformedStream.emit('error', err)); - - return transformedStream; - } - - return new ReadableCursorStream(this); + const transform = options?.transform ?? (doc => doc); + return Readable.from(this, { autoDestroy: false, highWaterMark: 1, objectMode: true }).map( + transform + ); } async hasNext(): Promise { @@ -1062,87 +1039,6 @@ export abstract class AbstractCursor< } } -class ReadableCursorStream extends Readable { - private _cursor: AbstractCursor; - private _readInProgress = false; - - constructor(cursor: AbstractCursor) { - super({ - objectMode: true, - autoDestroy: false, - highWaterMark: 1 - }); - this._cursor = cursor; - } - - // eslint-disable-next-line @typescript-eslint/no-unused-vars - override _read(size: number): void { - if (!this._readInProgress) { - this._readInProgress = true; - this._readNext(); - } - } - - override _destroy(error: Error | null, callback: (error?: Error | null) => void): void { - this._cursor.close().then( - () => callback(error), - closeError => callback(closeError) - ); - } - - private _readNext() { - if (this._cursor.id === Long.ZERO) { - this.push(null); - return; - } - - this._cursor.next().then( - result => { - if (result == null) { - this.push(null); - } else if (this.destroyed) { - this._cursor.close().then(undefined, squashError); - } else { - if (this.push(result)) { - return this._readNext(); - } - - this._readInProgress = false; - } - }, - err => { - // NOTE: This is questionable, but we have a test backing the behavior. It seems the - // desired behavior is that a stream ends cleanly when a user explicitly closes - // a client during iteration. Alternatively, we could do the "right" thing and - // propagate the error message by removing this special case. - if (err.message.match(/server is closed/)) { - this._cursor.close().then(undefined, squashError); - return this.push(null); - } - - // NOTE: This is also perhaps questionable. The rationale here is that these errors tend - // to be "operation was interrupted", where a cursor has been closed but there is an - // active getMore in-flight. This used to check if the cursor was killed but once - // that changed to happen in cleanup legitimate errors would not destroy the - // stream. There are change streams test specifically test these cases. - if (err.message.match(/operation was interrupted/)) { - return this.push(null); - } - - // NOTE: The two above checks on the message of the error will cause a null to be pushed - // to the stream, thus closing the stream before the destroy call happens. This means - // that either of those error messages on a change stream will not get a proper - // 'error' event to be emitted (the error passed to destroy). Change stream resumability - // relies on that error event to be emitted to create its new cursor and thus was not - // working on 4.4 servers because the error emitted on failover was "interrupted at - // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". - // See NODE-4475. - return this.destroy(err); - } - ); - } -} - configureResourceManagement(AbstractCursor.prototype); /** diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index b8de060b6b..e225e3df93 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1495,7 +1495,7 @@ describe('Cursor', function () { } }); - it('does not auto destroy streams', function (done) { + it('does not auto destroy streams', async function () { const docs = []; for (var i = 0; i < 10; i++) { @@ -1503,32 +1503,26 @@ describe('Cursor', function () { } const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; + await client.connect(); - const db = client.db(configuration.db); - db.createCollection('does_not_autodestroy_streams', (err, collection) => { - expect(err).to.not.exist; + const db = client.db(configuration.db); + const collection = await db.createCollection('does_not_autodestroy_streams'); - collection.insertMany(docs, configuration.writeConcernMax(), err => { - expect(err).to.not.exist; + await collection.insertMany(docs, configuration.writeConcernMax()); - const cursor = collection.find(); - const stream = cursor.stream(); - stream.on('close', () => { - expect.fail('extra close event must not be called'); - }); - stream.on('end', () => { - client.close(); - done(); - }); - stream.on('data', doc => { - expect(doc).to.exist; - }); - stream.resume(); - }); - }); + const cursor = collection.find(); + const stream = cursor.stream(); + + const end$ = once(stream, 'end'); + const close$ = once(stream, 'close').then(() => { + expect.fail('extra close event must not be called'); }); + + stream.resume(); + + await Promise.race([end$, close$]); + + await client.close(); }); it('should be able to stream documents', { @@ -2321,7 +2315,7 @@ describe('Cursor', function () { .find() .withReadPreference('notsecondary'); test.ok(false); - } catch (err) {} // eslint-disable-line + } catch (err) { } // eslint-disable-line db.collection('shouldFailToSetReadPreferenceOnCursor') .find() diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index ac060c9d45..358f3b9216 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,7 +1,6 @@ import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; -import { Transform } from 'stream'; import { inspect } from 'util'; import { @@ -299,14 +298,8 @@ describe('class AbstractCursor', function () { }); it('propagates errors to transform stream', async function () { - const transform = new Transform({ - transform(data, encoding, callback) { - callback(null, data); - } - }); - // MongoServerError: unknown operator: $bar - const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform }); + const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc }); const error: Error | null = await new Promise(resolve => { stream.on('error', error => resolve(error));