Skip to content

Commit

Permalink
feat(debugging): implement x-goog-spanner-request-id propagation per …
Browse files Browse the repository at this point in the history
…request

Implements propagation of the x-goog-spanner-request-id that'll be
propagated for every call. Once an error has been encountered, that
error will have `.requestId` set.

Fixes googleapis#2200
  • Loading branch information
odeke-em committed Dec 21, 2024
1 parent 92248c1 commit b7fe414
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 14 deletions.
29 changes: 22 additions & 7 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class BatchTransaction extends Snapshot {
'BatchTransaction.createQueryPartitions',
traceConfig,
span => {
const database = this.session.parent as Database;
const nthRequest = database._nextNthRequest();
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
Expand All @@ -157,7 +159,11 @@ class BatchTransaction extends Snapshot {
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
nthRequest,
1,
headers
),
},
(err, partitions, resp) => {
if (err) {
Expand Down Expand Up @@ -201,11 +207,16 @@ class BatchTransaction extends Snapshot {
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database)
.formattedName_,
const database = this.session.parent as Database;
const headers = {
[CLOUD_RESOURCE_HEADER]: database.formattedName_,
};
delete query.partitionOptions;
(config.headers = this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
)),
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
setSpanError(span, err);
Expand Down Expand Up @@ -286,14 +297,18 @@ class BatchTransaction extends Snapshot {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

const database = this.session.parent as Database;
this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(err, partitions, resp) => {
if (err) {
Expand Down
5 changes: 5 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,11 @@ class Database extends common.GrpcServiceObject {
});
}

public _nextNthRequest(): number {
const spanner = this.parent as Spanner;
return spanner._nextNthRequest();
}

/**
* Create a readable object stream to receive resulting rows from a SQL
* statement.
Expand Down
13 changes: 13 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ import {
ObservabilityOptions,
ensureInitialContextManagerSet,
} from './instrument';
import {
AtomicCounter,
newAtomicCounter,
nextSpannerClientId,
} from './request_id_header';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const gcpApiConfig = require('./spanner_grpc_config.json');
Expand Down Expand Up @@ -246,6 +251,8 @@ class Spanner extends GrpcService {
routeToLeaderEnabled = true;
directedReadOptions: google.spanner.v1.IDirectedReadOptions | null;
_observabilityOptions: ObservabilityOptions | undefined;
private _nthClientId: number;
private _nthRequest: AtomicCounter;

/**
* Placeholder used to auto populate a column with the commit timestamp.
Expand Down Expand Up @@ -374,6 +381,12 @@ class Spanner extends GrpcService {
this.directedReadOptions = directedReadOptions;
this._observabilityOptions = options.observabilityOptions;
ensureInitialContextManagerSet();
this._nthClientId = nextSpannerClientId();
this._nthRequest = newAtomicCounter(0);
}

_nextNthRequest(): number {
return this._nthRequest.increment();
}

/**
Expand Down
80 changes: 80 additions & 0 deletions src/request_id_header.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {randomBytes} from 'crypto';
const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString();
const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id';

class AtomicCounter {
private backingBuffer: BigInt64Array;

constructor(initialValue?: number) {
this.backingBuffer = new BigInt64Array(
new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT)
);
if (initialValue) {
this.increment(initialValue);
}
}

public increment(n?: number): number {
if (!n) {
n = 1;
}
Atomics.store(this.backingBuffer, 0, BigInt(n));
return this.value();
}

public value(): number {
return Number(Atomics.load(this.backingBuffer, 0));
}

public toString(): string {
return `${this.value()}`;
}
}

function craftRequestId(
nthClientId: number,
channelId: number,
nthRequest: number,
attempt: number
) {
return `1.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`;
}

const nthClientId = new AtomicCounter();

/*
* nextSpannerClientId increments the internal
* counter for created SpannerClients, for use
* with x-goog-spanner-request-id.
*/
function nextSpannerClientId(): number {
return nthClientId.increment(1);
}

function newAtomicCounter(n?: number): AtomicCounter {
return new AtomicCounter(n);
}

export {
AtomicCounter,
X_GOOG_SPANNER_REQUEST_ID_HEADER,
craftRequestId,
nextSpannerClientId,
newAtomicCounter,
};
53 changes: 49 additions & 4 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ import {
import {grpc, CallOptions} from 'google-gax';
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Spanner} from '.';

import {
X_GOOG_SPANNER_REQUEST_ID_HEADER,
craftRequestId,
} from './request_id_header';
export type GetSessionResponse = [Session, r.Response];

/**
Expand Down Expand Up @@ -316,13 +319,18 @@ export class Session extends common.GrpcServiceObject {
const reqOpts = {
name: this.formattedName_,
};
const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'deleteSession',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
headers: this._metadataWithRequestId(
database._nextNthRequest(),
1,
this.resourceHeader_
),
},
callback!
);
Expand Down Expand Up @@ -388,13 +396,18 @@ export class Session extends common.GrpcServiceObject {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'getSession',
reqOpts,
gaxOpts,
headers: headers,
headers: this._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(err, resp) => {
if (resp) {
Expand Down Expand Up @@ -440,13 +453,19 @@ export class Session extends common.GrpcServiceObject {
session: this.formattedName_,
sql: 'SELECT 1',
};

const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'executeSql',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
headers: this._metadataWithRequestId(
database._nextNthRequest(),
1,
this.resourceHeader_
),
},
callback!
);
Expand Down Expand Up @@ -533,6 +552,32 @@ export class Session extends common.GrpcServiceObject {
private _getSpanner(): Spanner {
return this.parent.parent.parent as Spanner;
}

private channelId(): number {
// TODO: Infer channelId from the actual gRPC channel.
return 1;
}

public _metadataWithRequestId(
nthRequest: number,
attempt: number,
priorMetadata?: {[k: string]: string}
): {[k: string]: string} {
const database = this.parent as Database;
if (!priorMetadata) {
priorMetadata = {};
}
const withReqId = {
...priorMetadata,
};
withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId(
database._nthClientId,
this.channelId(),
nthRequest,
attempt
);
return withReqId;
}
}

/*! Developer Documentation
Expand Down
25 changes: 22 additions & 3 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ export class Snapshot extends EventEmitter {
opts: this._observabilityOptions,
dbName: this._dbName!,
};
const database = this.session.parent as Database;
return startTrace('Snapshot.begin', traceConfig, span => {
span.addEvent('Begin Transaction');

Expand All @@ -454,7 +455,11 @@ export class Snapshot extends EventEmitter {
method: 'beginTransaction',
reqOpts,
gaxOpts,
headers: headers,
headers: this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(
err: null | grpc.ServiceError,
Expand Down Expand Up @@ -711,8 +716,12 @@ export class Snapshot extends EventEmitter {
opts: this._observabilityOptions,
dbName: this._dbName!,
};

return startTrace('Snapshot.createReadStream', traceConfig, span => {
let attempt = 0;
const database = this.session.parent as Database;
const nthRequest = database._nextNthRequest();

const makeRequest = (resumeToken?: ResumeToken): Readable => {
if (this.id && transaction.begin) {
delete transaction.begin;
Expand All @@ -739,7 +748,11 @@ export class Snapshot extends EventEmitter {
method: 'streamingRead',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
nthRequest,
attempt,
headers
),
});
};

Expand Down Expand Up @@ -1297,6 +1310,8 @@ export class Snapshot extends EventEmitter {
};
return startTrace('Snapshot.runStream', traceConfig, span => {
let attempt = 0;
const database = this.session.parent as Database;
const nthRequest = database._nextNthRequest();
const makeRequest = (resumeToken?: ResumeToken): Readable => {
attempt++;

Expand Down Expand Up @@ -1330,7 +1345,11 @@ export class Snapshot extends EventEmitter {
method: 'executeStreamingSql',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
nthRequest,
attempt,
headers
),
});
};

Expand Down

0 comments on commit b7fe414

Please sign in to comment.