Skip to content

Commit

Permalink
loadOne/loadMany now batch across steps also (#1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Nov 8, 2023
2 parents b58f068 + a83b7aa commit fa60922
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/tricky-files-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"grafast": patch
---

Adds tick-based batching (cross-step batching) to loadOne and loadMany.
161 changes: 161 additions & 0 deletions grafast/grafast/__tests__/loadOne-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { expect } from "chai";
import type { ExecutionResult } from "graphql";
import { it } from "mocha";

import type { LoadOneCallback } from "../dist/index.js";
import { grafast, loadOne, makeGrafastSchema } from "../dist/index.js";

interface Thing {
id: number;
name: string;
reallyLongBio: string;
}
const THINGS: Thing[] = [
{
id: 1,
name: "Eyedee Won",
reallyLongBio: "Really long bio. ".repeat(1000),
},
{
id: 2,
name: "Idee Too",
reallyLongBio: "Super long bio. ".repeat(1000),
},
];

function pick<T extends object, K extends keyof T>(
obj: T,
keys: readonly K[],
): Pick<T, K> {
return Object.fromEntries(
Object.entries(obj).filter(([key]) => keys.includes(key as any)),
) as Pick<T, K>;
}

let CALLS: {
ids: readonly number[];
result: object;
attributes: readonly (keyof Thing)[] | null;
params: object;
}[] = [];

const loadThingByIds: LoadOneCallback<number, Thing, Record<string, never>> = (
ids,
{ attributes, params },
) => {
const result = ids
.map((id) => THINGS.find((t) => t.id === id))
.map((t) => (t && attributes ? pick(t, attributes) : t));
CALLS.push({ ids, result, attributes, params });
return result;
};

const makeSchema = (useStreamableStep = false) => {
return makeGrafastSchema({
typeDefs: /* GraphQL */ `
type Thing {
id: Int!
name: String!
reallyLongBio: String!
}
type Query {
thingById(id: Int!): Thing
}
`,
plans: {
Query: {
thingById(_, { $id }) {
return loadOne($id, loadThingByIds);
},
},
},
enableDeferStream: true,
});
};

it("batches across parallel trees with identical selection sets", async () => {
const source = /* GraphQL */ `
{
t1: thingById(id: 1) {
id
name
}
t2: thingById(id: 2) {
id
name
}
t3: thingById(id: 3) {
id
name
}
}
`;
const schema = makeSchema(false);

CALLS = [];
const result = (await grafast(
{
schema,
source,
},
{},
{},
)) as ExecutionResult;
expect(result).to.deep.equal({
data: {
t1: {
id: 1,
name: "Eyedee Won",
},
t2: {
id: 2,
name: "Idee Too",
},
t3: null,
},
});
expect(CALLS).to.have.length(1);
expect(CALLS[0].attributes).to.deep.equal(["id", "name"]);
});
it("batches across parallel trees with non-identical selection sets", async () => {
const source = /* GraphQL */ `
{
t1: thingById(id: 1) {
id
name
}
t2: thingById(id: 2) {
id
}
t3: thingById(id: 3) {
id
reallyLongBio
}
}
`;
const schema = makeSchema(false);

CALLS = [];
const result = (await grafast(
{
schema,
source,
},
{},
{},
)) as ExecutionResult;
expect(result).to.deep.equal({
data: {
t1: {
id: 1,
name: "Eyedee Won",
},
t2: {
id: 2,
},
t3: null,
},
});
expect(CALLS).to.have.length(1);
expect(CALLS[0].attributes).to.deep.equal(["id", "name", "reallyLongBio"]);
});
2 changes: 2 additions & 0 deletions grafast/grafast/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import {
JSONArray,
JSONObject,
JSONValue,
Maybe,
NodeIdCodec,
NodeIdHandler,
OutputPlanForType,
Expand Down Expand Up @@ -344,6 +345,7 @@ export {
LoadStep,
makeDecodeNodeId,
makeGrafastSchema,
Maybe,
ModifierStep,
newGrafastFieldConfigBuilder,
newInputObjectTypeBuilder,
Expand Down
2 changes: 2 additions & 0 deletions grafast/grafast/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1004,4 +1004,6 @@ export interface GrafastArgs extends GraphQLArgs {
requestContext?: Partial<Grafast.RequestContext>;
}

export type Maybe<T> = T | null | undefined;

export * from "./planJSONInterfaces.js";
86 changes: 77 additions & 9 deletions grafast/grafast/src/steps/load.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { __ItemStep } from "../index.js";
import type { __ItemStep, Deferred } from "../index.js";
import { defer } from "../index.js";
import type {
ExecutionExtra,
GrafastResultsList,
GrafastValuesList,
Maybe,
PromiseOrDirect,
} from "../interfaces.js";
import { ExecutableStep, isListLikeStep, isObjectLikeStep } from "../step.js";
Expand All @@ -23,7 +25,7 @@ type LoadCallback<
(
specs: ReadonlyArray<TSpec>,
options: LoadOptions<TItem, TParams>,
): PromiseOrDirect<ReadonlyArray<TData>>;
): PromiseOrDirect<ReadonlyArray<Maybe<TData>>>;
displayName?: string;
};

Expand Down Expand Up @@ -65,8 +67,16 @@ export function loadManyCallback<
return callback;
}

interface LoadBatch {
deferred: Deferred<any>;
batchSpecs: readonly any[];
}

interface LoadMeta {
cache?: Map<any, any>;
loadBatchesByLoad?:
| Map<LoadCallback<any, any, any, any>, LoadBatch[]>
| undefined;
}

const idByLoad = new WeakMap<LoadCallback<any, any, any, any>, string>();
Expand Down Expand Up @@ -277,8 +287,7 @@ export class LoadStep<
count: number,
[specs]: [GrafastValuesList<TSpec>],
extra: ExecutionExtra,
): PromiseOrDirect<GrafastResultsList<TData>> {
const loadOptions = this.loadOptions!;
): PromiseOrDirect<GrafastResultsList<Maybe<TData>>> {
const meta = extra.meta as LoadMeta;
let cache = meta.cache;
if (!cache) {
Expand All @@ -287,7 +296,7 @@ export class LoadStep<
}
const batch = new Map<TSpec, number[]>();

const results: Array<PromiseOrDirect<TData> | null> = [];
const results: Array<PromiseOrDirect<Maybe<TData>>> = [];
for (let i = 0; i < count; i++) {
const spec = specs[i];
if (cache.has(spec)) {
Expand All @@ -305,9 +314,30 @@ export class LoadStep<
}
const pendingCount = batch.size;
if (pendingCount > 0) {
const deferred = defer<ReadonlyArray<Maybe<TData>>>();
const batchSpecs = [...batch.keys()];
const loadBatch: LoadBatch = { deferred, batchSpecs };
if (!meta.loadBatchesByLoad) {
meta.loadBatchesByLoad = new Map();
}
let loadBatches = meta.loadBatchesByLoad.get(this.load);
if (loadBatches) {
// Add to existing batch load
loadBatches.push(loadBatch);
} else {
// Create new batch load
loadBatches = [loadBatch];
meta.loadBatchesByLoad.set(this.load, loadBatches);
// Guaranteed by the metaKey to be equivalent for all entries sharing the same `meta`. Note equivalent is not identical; key order may change.
const loadOptions = this.loadOptions!;
setTimeout(() => {
// Don't allow adding anything else to the batch
meta.loadBatchesByLoad!.delete(this.load);
executeBatches(loadBatches!, this.load, loadOptions);
}, 0);
}
return (async () => {
const batchSpecs = [...batch.keys()];
const loadResults = await this.load(batchSpecs, loadOptions);
const loadResults = await deferred;
for (
let pendingIndex = 0;
pendingIndex < pendingCount;
Expand All @@ -321,10 +351,48 @@ export class LoadStep<
results[targetIndex] = loadResult;
}
}
return results as Array<PromiseOrDirect<TData>>;
return results;
})();
}
return results as Array<PromiseOrDirect<TData>>;
return results;
}
}

async function executeBatches(
loadBatches: readonly LoadBatch[],
load: LoadCallback<any, any, any, any>,
loadOptions: LoadOptions<any, any>,
) {
try {
const numberOfBatches = loadBatches.length;
if (numberOfBatches === 1) {
const [loadBatch] = loadBatches;
loadBatch.deferred.resolve(load(loadBatch.batchSpecs, loadOptions));
return;
} else {
// Do some tick-batching!
const indexStarts: number[] = [];
const allBatchSpecs: any[] = [];
for (let i = 0; i < numberOfBatches; i++) {
const loadBatch = loadBatches[i];
indexStarts[i] = allBatchSpecs.length;
for (const batchSpec of loadBatch.batchSpecs) {
allBatchSpecs.push(batchSpec);
}
}
const results = await load(allBatchSpecs, loadOptions);
for (let i = 0; i < numberOfBatches; i++) {
const loadBatch = loadBatches[i];
const start = indexStarts[i];
const stop = indexStarts[i + 1] ?? allBatchSpecs.length;
const entries = results.slice(start, stop);
loadBatch.deferred.resolve(entries);
}
}
} catch (e) {
for (const loadBatch of loadBatches) {
loadBatch.deferred.reject(e);
}
}
}

Expand Down

0 comments on commit fa60922

Please sign in to comment.