Skip to content

Commit

Permalink
feat(debugging): implement x-goog-spanner-request-id propagation per …
Browse files Browse the repository at this point in the history
…request

Implements propagation of the x-goog-spanner-request-id that'll be
propagated for every call. Once an error has been encountered, that
error will have `.requestId` set.

Fixes googleapis#2200
  • Loading branch information
odeke-em committed Jan 6, 2025
1 parent 9f77f9f commit d4ced71
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 17 deletions.
29 changes: 22 additions & 7 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class BatchTransaction extends Snapshot {
'BatchTransaction.createQueryPartitions',
traceConfig,
span => {
const database = this.session.parent as Database;
const nthRequest = database._nextNthRequest();
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
Expand All @@ -157,7 +159,11 @@ class BatchTransaction extends Snapshot {
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
nthRequest,
1,
headers
),
},
(err, partitions, resp) => {
if (err) {
Expand Down Expand Up @@ -201,11 +207,16 @@ class BatchTransaction extends Snapshot {
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database)
.formattedName_,
const database = this.session.parent as Database;
const headers = {
[CLOUD_RESOURCE_HEADER]: database.formattedName_,
};
delete query.partitionOptions;
(config.headers = this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
)),
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
setSpanError(span, err);
Expand Down Expand Up @@ -286,14 +297,18 @@ class BatchTransaction extends Snapshot {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

const database = this.session.parent as Database;
this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
headers: this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(err, partitions, resp) => {
if (err) {
Expand Down
47 changes: 46 additions & 1 deletion src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';
import {
AtomicCounter,
X_GOOG_SPANNER_REQUEST_ID_HEADER,
craftRequestId,
newAtomicCounter,
} from './request_id_header';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse
Expand Down Expand Up @@ -350,6 +357,8 @@ class Database extends common.GrpcServiceObject {
> | null;
_observabilityOptions?: ObservabilityOptions; // TODO: exmaine if we can remove it
private _traceConfig: traceConfig;
private _nthRequest: AtomicCounter;
public _clientId: number;
constructor(
instance: Instance,
name: string,
Expand Down Expand Up @@ -483,7 +492,14 @@ class Database extends common.GrpcServiceObject {
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
);
this._nthRequest = newAtomicCounter(0);
this._clientId = 0;
}

_nextNthRequest(): number {
return this._nthRequest.increment();
}

/**
* @typedef {array} SetDatabaseMetadataResponse
* @property {object} 0 The {@link Database} metadata.
Expand Down Expand Up @@ -699,7 +715,11 @@ class Database extends common.GrpcServiceObject {
method: 'batchCreateSessions',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
headers: this._metadataWithRequestId(
this._nextNthRequest(),
1,
headers
),
},
(err, resp) => {
if (err) {
Expand All @@ -723,6 +743,31 @@ class Database extends common.GrpcServiceObject {
});
}

private channelId(): number {
// TODO: Infer channelId from the actual gRPC channel.
return 1;
}

public _metadataWithRequestId(
nthRequest: number,
attempt: number,
priorMetadata?: {[k: string]: string}
): {[k: string]: string} {
if (!priorMetadata) {
priorMetadata = {};
}
const withReqId = {
...priorMetadata,
};
withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId(
this._clientId,
this.channelId(),
nthRequest,
attempt
);
return withReqId;
}

/**
* Get a reference to a {@link BatchTransaction} object.
*
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import {
ObservabilityOptions,
ensureInitialContextManagerSet,
} from './instrument';
import {AtomicCounter, nextSpannerClientId} from './request_id_header';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const gcpApiConfig = require('./spanner_grpc_config.json');
Expand Down Expand Up @@ -248,6 +249,7 @@ class Spanner extends GrpcService {
routeToLeaderEnabled = true;
directedReadOptions: google.spanner.v1.IDirectedReadOptions | null;
_observabilityOptions: ObservabilityOptions | undefined;
_nthClientId: number;

/**
* Placeholder used to auto populate a column with the commit timestamp.
Expand Down Expand Up @@ -377,6 +379,7 @@ class Spanner extends GrpcService {
this._observabilityOptions?.enableEndToEndTracing
);
ensureInitialContextManagerSet();
this._nthClientId = nextSpannerClientId();
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ class Instance extends common.GrpcServiceObject {
if (!this.databases_.has(key!)) {
const db = new Database(this, name, poolOptions, queryOptions);
db._observabilityOptions = this._observabilityOptions;
db._clientId = (this.parent as Spanner)._nthClientId;
this.databases_.set(key!, db);
}
return this.databases_.get(key!)!;
Expand Down
81 changes: 81 additions & 0 deletions src/request_id_header.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {randomBytes} from 'crypto';
const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString();
const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id';

class AtomicCounter {
private backingBuffer: Uint32Array;

constructor(initialValue?: number) {
this.backingBuffer = new Uint32Array(
new SharedArrayBuffer(Uint32Array.BYTES_PER_ELEMENT)
);
if (initialValue) {
this.increment(initialValue);
}
}

public increment(n?: number): number {
if (!n) {
n = 1;
}
Atomics.add(this.backingBuffer, 0, n);
return this.value();
}

public value(): number {
return Atomics.load(this.backingBuffer, 0);
}

public toString(): string {
return `${this.value()}`;
}
}

function craftRequestId(
nthClientId: number,
channelId: number,
nthRequest: number,
attempt: number
) {
return `1.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`;
}

const nthClientId = new AtomicCounter();

/*
* nextSpannerClientId increments the internal
* counter for created SpannerClients, for use
* with x-goog-spanner-request-id.
*/
function nextSpannerClientId(): number {
nthClientId.increment(1);
return nthClientId.value();
}

function newAtomicCounter(n?: number): AtomicCounter {
return new AtomicCounter(n);
}

export {
AtomicCounter,
X_GOOG_SPANNER_REQUEST_ID_HEADER,
craftRequestId,
nextSpannerClientId,
newAtomicCounter,
};
38 changes: 34 additions & 4 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import {
import {grpc, CallOptions} from 'google-gax';
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Spanner} from '.';

export type GetSessionResponse = [Session, r.Response];

/**
Expand Down Expand Up @@ -317,13 +316,18 @@ export class Session extends common.GrpcServiceObject {
const reqOpts = {
name: this.formattedName_,
};
const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'deleteSession',
reqOpts,
gaxOpts,
headers: this.commonHeaders_,
headers: database._metadataWithRequestId(
database._nextNthRequest(),
1,
this.commonHeaders_
),
},
callback!
);
Expand Down Expand Up @@ -389,13 +393,18 @@ export class Session extends common.GrpcServiceObject {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'getSession',
reqOpts,
gaxOpts,
headers: headers,
headers: database._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(err, resp) => {
if (resp) {
Expand Down Expand Up @@ -441,17 +450,33 @@ export class Session extends common.GrpcServiceObject {
session: this.formattedName_,
sql: 'SELECT 1',
};

const database = this.parent as Database;
return this.request(
{
client: 'SpannerClient',
method: 'executeSql',
reqOpts,
gaxOpts,
headers: this.commonHeaders_,
headers: database._metadataWithRequestId(
database._nextNthRequest(),
1,
this.commonHeaders_
),
},
callback!
);
}

public _metadataWithRequestId(
nthRequest: number,
attempt: number,
priorMetadata?: {[k: string]: string}
): {[k: string]: string} {
const database = this.parent as Database;
return database._metadataWithRequestId(nthRequest, attempt, priorMetadata);
}

/**
* Create a PartitionedDml transaction.
*
Expand Down Expand Up @@ -534,6 +559,11 @@ export class Session extends common.GrpcServiceObject {
private _getSpanner(): Spanner {
return this.parent.parent.parent as Spanner;
}

private channelId(): number {
// TODO: Infer channelId from the actual gRPC channel.
return 1;
}
}

/*! Developer Documentation
Expand Down
Loading

0 comments on commit d4ced71

Please sign in to comment.