Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6633): MongoClient.close closes active cursors #4372

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

function removeActiveCursor(this: AbstractCursor) {
this.client.s.activeCursors.delete(this);
}

/**
* @public
* @experimental
Expand Down Expand Up @@ -260,6 +264,10 @@ export abstract class AbstractCursor<
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
}
this.cursorClient = client;

this.cursorClient.s.activeCursors.add(this);
this.once('close', removeActiveCursor);

this.cursorNamespace = namespace;
this.cursorId = null;
this.initialized = false;
Expand Down Expand Up @@ -825,6 +833,11 @@ export abstract class AbstractCursor<
this.isKilled = false;
this.initialized = false;

this.cursorClient.s.activeCursors.add(this);
if (!this.listeners('close').includes(removeActiveCursor)) {
this.once('close', removeActiveCursor);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance that this and the constructor should share more code? This change here feels like something that could have been really easy to miss if you didn't happen to think of it :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea "setupTracking" helper of sorts seems worth having, will do, ty


const session = this.cursorSession;
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
Expand Down Expand Up @@ -1008,14 +1021,16 @@ export abstract class AbstractCursor<
} catch (error) {
squashError(error);
} finally {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
try {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
} finally {
this.emitClose();
}

this.emitClose();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata';
import type { CompressorName } from './cmap/wire_protocol/compression';
import { parseOptions, resolveSRVRecord } from './connection_string';
import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
import { Db, type DbOptions } from './db';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError } from './error';
Expand Down Expand Up @@ -323,6 +324,7 @@ export interface MongoClientPrivate {
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
readonly activeCursors: Set<AbstractCursor>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
Expand Down Expand Up @@ -398,6 +400,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),
activeCursors: new Set(),
authProviders: new MongoClientAuthProviders(),

get options() {
Expand Down Expand Up @@ -646,6 +649,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

await Promise.all(activeCursorCloses);

const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Expand Down
43 changes: 43 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,47 @@ describe('class AbstractCursor', function () {
);
});
});

describe('cursor tracking', () => {
let client: MongoClient;
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i })));
});

afterEach(async function () {
await client.close();
});

it('adds itself to a set upon construction', () => {
collection.find({}, { batchSize: 1 });
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('adds itself to a set upon rewind', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await cursor.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('does not add more than one close listener', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(cursor.listeners('close')).to.have.lengthOf(1);
await cursor.close();
expect(cursor.listeners('close')).to.have.lengthOf(0);
cursor.rewind();
cursor.rewind();
cursor.rewind();
expect(cursor.listeners('close')).to.have.lengthOf(1);
});
});
});
68 changes: 65 additions & 3 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as net from 'net';
import * as sinon from 'sinon';

import {
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
Expand Down Expand Up @@ -31,7 +32,6 @@ describe('class MongoClient', function () {
afterEach(async () => {
sinon.restore();
await client?.close();
// @ts-expect-error: Put this variable back to undefined to force tests to make their own client
client = undefined;
});

Expand Down Expand Up @@ -567,7 +567,44 @@ describe('class MongoClient', function () {
});
});

context('#close()', () => {
describe('active cursors', function () {
let client: MongoClient;
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

afterEach(async function () {
await client.close();
});

it('are tracked upon creation and removed upon exhaustion', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.toArray()));
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(0);
});

it('are removed from tracking if exhausted in first batch', async () => {
const cursors = Array.from({ length: 30 }, () => collection.find());
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.next())); // only one document pulled from each.
expect(client.s.activeCursors).to.have.lengthOf(0);
});
});

describe('#close()', () => {
let client: MongoClient;
let db: Db;

Expand Down Expand Up @@ -702,7 +739,7 @@ describe('class MongoClient', function () {
expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true
});

context('when server selection would return no servers', () => {
describe('when server selection would return no servers', () => {
const serverDescription = new ServerDescription('a:1');

it('short circuits and does not end sessions', async () => {
Expand All @@ -722,6 +759,31 @@ describe('class MongoClient', function () {
expect(client.s.sessionPool.sessions).to.have.lengthOf(1);
});
});

describe('active cursors', function () {
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async () => {
collection = client.db('test').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

it('are all closed', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
await Promise.all(cursors.map(c => c.next()));
expect(client.s.activeCursors).to.have.lengthOf(30);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(30);
});
});
});

context('when connecting', function () {
Expand Down
Loading