Skip to content

Commit

Permalink
remove ReadableCursorStream
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Jan 9, 2025
1 parent 3216d33 commit 267ae91
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 141 deletions.
114 changes: 5 additions & 109 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Readable, Transform } from 'stream';
import { Readable } from 'stream';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document';
Expand Down Expand Up @@ -496,33 +496,10 @@ export abstract class AbstractCursor<
}

stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
if (options?.transform) {
const transform = options.transform;
const readable = new ReadableCursorStream(this);

const transformedStream = readable.pipe(
new Transform({
objectMode: true,
highWaterMark: 1,
transform(chunk, _, callback) {
try {
const transformed = transform(chunk);
callback(undefined, transformed);
} catch (err) {
callback(err);
}
}
})
);

// Bubble errors to transformed stream, because otherwise no way
// to handle this error.
readable.on('error', err => transformedStream.emit('error', err));

return transformedStream;
}

return new ReadableCursorStream(this);
const transform = options?.transform ?? (doc => doc);
return Readable.from(this, { autoDestroy: false, highWaterMark: 1, objectMode: true }).map(
transform
);
}

async hasNext(): Promise<boolean> {
Expand Down Expand Up @@ -1062,87 +1039,6 @@ export abstract class AbstractCursor<
}
}

class ReadableCursorStream extends Readable {
private _cursor: AbstractCursor;
private _readInProgress = false;

constructor(cursor: AbstractCursor) {
super({
objectMode: true,
autoDestroy: false,
highWaterMark: 1
});
this._cursor = cursor;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
override _read(size: number): void {
if (!this._readInProgress) {
this._readInProgress = true;
this._readNext();
}
}

override _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
this._cursor.close().then(
() => callback(error),
closeError => callback(closeError)
);
}

private _readNext() {
if (this._cursor.id === Long.ZERO) {
this.push(null);
return;
}

this._cursor.next().then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}

// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}
);
}
}

configureResourceManagement(AbstractCursor.prototype);

/**
Expand Down
42 changes: 18 additions & 24 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1495,40 +1495,34 @@ describe('Cursor', function () {
}
});

it('does not auto destroy streams', function (done) {
it('does not auto destroy streams', async function () {
const docs = [];

for (var i = 0; i < 10; i++) {
docs.push({ a: i + 1 });
}

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
await client.connect();

const db = client.db(configuration.db);
db.createCollection('does_not_autodestroy_streams', (err, collection) => {
expect(err).to.not.exist;
const db = client.db(configuration.db);
const collection = await db.createCollection('does_not_autodestroy_streams');

collection.insertMany(docs, configuration.writeConcernMax(), err => {
expect(err).to.not.exist;
await collection.insertMany(docs, configuration.writeConcernMax());

const cursor = collection.find();
const stream = cursor.stream();
stream.on('close', () => {
expect.fail('extra close event must not be called');
});
stream.on('end', () => {
client.close();
done();
});
stream.on('data', doc => {
expect(doc).to.exist;
});
stream.resume();
});
});
const cursor = collection.find();
const stream = cursor.stream();

const end$ = once(stream, 'end');
const close$ = once(stream, 'close').then(() => {
expect.fail('extra close event must not be called');
});

stream.resume();

await Promise.race([end$, close$]);

await client.close();
});

it('should be able to stream documents', {
Expand Down Expand Up @@ -2321,7 +2315,7 @@ describe('Cursor', function () {
.find()
.withReadPreference('notsecondary');
test.ok(false);
} catch (err) {} // eslint-disable-line
} catch (err) { } // eslint-disable-line

db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
Expand Down
9 changes: 1 addition & 8 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { Transform } from 'stream';
import { inspect } from 'util';

import {
Expand Down Expand Up @@ -299,14 +298,8 @@ describe('class AbstractCursor', function () {
});

it('propagates errors to transform stream', async function () {
const transform = new Transform({
transform(data, encoding, callback) {
callback(null, data);
}
});

// MongoServerError: unknown operator: $bar
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc });

const error: Error | null = await new Promise(resolve => {
stream.on('error', error => resolve(error));
Expand Down

0 comments on commit 267ae91

Please sign in to comment.