Skip to content

Commit

Permalink
feat(NODE-6633): MongoClient.close closes active cursors
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 15, 2025
1 parent e2aa15c commit adade92
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 10 deletions.
29 changes: 22 additions & 7 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

function removeActiveCursor(this: AbstractCursor) {
this.client.s.activeCursors.delete(this);
}

/**
* @public
* @experimental
Expand Down Expand Up @@ -260,6 +264,10 @@ export abstract class AbstractCursor<
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
}
this.cursorClient = client;

this.cursorClient.s.activeCursors.add(this);
this.once('close', removeActiveCursor);

this.cursorNamespace = namespace;
this.cursorId = null;
this.initialized = false;
Expand Down Expand Up @@ -825,6 +833,11 @@ export abstract class AbstractCursor<
this.isKilled = false;
this.initialized = false;

this.cursorClient.s.activeCursors.add(this);
if (!this.listeners('close').includes(removeActiveCursor)) {
this.once('close', removeActiveCursor);
}

const session = this.cursorSession;
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
Expand Down Expand Up @@ -1008,14 +1021,16 @@ export abstract class AbstractCursor<
} catch (error) {
squashError(error);
} finally {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
try {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
} finally {
this.emitClose();
}

this.emitClose();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata';
import type { CompressorName } from './cmap/wire_protocol/compression';
import { parseOptions, resolveSRVRecord } from './connection_string';
import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
import { Db, type DbOptions } from './db';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError } from './error';
Expand Down Expand Up @@ -323,6 +324,7 @@ export interface MongoClientPrivate {
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
readonly activeCursors: Set<AbstractCursor>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
Expand Down Expand Up @@ -398,6 +400,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),
activeCursors: new Set(),
authProviders: new MongoClientAuthProviders(),

get options() {
Expand Down Expand Up @@ -646,6 +649,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

await Promise.all(activeCursorCloses);

const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Expand Down
43 changes: 43 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,47 @@ describe('class AbstractCursor', function () {
);
});
});

describe('cursor tracking', () => {
let client: MongoClient;
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i })));
});

afterEach(async function () {
await client.close();
});

it('adds itself to a set upon construction', () => {
collection.find({}, { batchSize: 1 });
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('adds itself to a set upon rewind', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await cursor.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('does not add more than one close listener', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(cursor.listeners('close')).to.have.lengthOf(1);
await cursor.close();
expect(cursor.listeners('close')).to.have.lengthOf(0);
cursor.rewind();
cursor.rewind();
cursor.rewind();
expect(cursor.listeners('close')).to.have.lengthOf(1);
});
});
});
68 changes: 65 additions & 3 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as net from 'net';
import * as sinon from 'sinon';

import {
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
Expand Down Expand Up @@ -31,7 +32,6 @@ describe('class MongoClient', function () {
afterEach(async () => {
sinon.restore();
await client?.close();
// @ts-expect-error: Put this variable back to undefined to force tests to make their own client
client = undefined;
});

Expand Down Expand Up @@ -567,7 +567,44 @@ describe('class MongoClient', function () {
});
});

context('#close()', () => {
describe('active cursors', function () {
let client: MongoClient;
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

afterEach(async function () {
await client.close();
});

it('are tracked upon creation and removed upon exhaustion', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.toArray()));
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(0);
});

it('are removed from tracking if exhausted in first batch', async () => {
const cursors = Array.from({ length: 30 }, () => collection.find());
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.next())); // only one document pulled from each.
expect(client.s.activeCursors).to.have.lengthOf(0);
});
});

describe('#close()', () => {
let client: MongoClient;
let db: Db;

Expand Down Expand Up @@ -702,7 +739,7 @@ describe('class MongoClient', function () {
expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true
});

context('when server selection would return no servers', () => {
describe('when server selection would return no servers', () => {
const serverDescription = new ServerDescription('a:1');

it('short circuits and does not end sessions', async () => {
Expand All @@ -722,6 +759,31 @@ describe('class MongoClient', function () {
expect(client.s.sessionPool.sessions).to.have.lengthOf(1);
});
});

describe('active cursors', function () {
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async () => {
collection = client.db('test').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

it('are all closed', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
await Promise.all(cursors.map(c => c.next()));
expect(client.s.activeCursors).to.have.lengthOf(30);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(30);
});
});
});

context('when connecting', function () {
Expand Down

0 comments on commit adade92

Please sign in to comment.