Skip to content

Commit

Permalink
Add debug channel callbacks
Browse files Browse the repository at this point in the history
Motivation:

Sometimes it can be useful to modify the channel pipeline or inspect the
channel when debugging. v1 has hooks to allow users to do this which
proved helpful a number of times. v2 should offer similar behaviour.

Modifications:

- Add 'DebugChannelCallbacks' to the client and server config and wire
  them up appropriately.
- Add tests. Extract 'TransportKind' from some tests as it was being
  recreated a number of times.

Result:

Users can modify the channel pipeline if they need to.
  • Loading branch information
glbrntt committed Jan 9, 2025
1 parent e97f97e commit 674b6c2
Show file tree
Hide file tree
Showing 17 changed files with 566 additions and 75 deletions.
12 changes: 9 additions & 3 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme) = try self.state.withLock { state in
let (multiplexer, scheme, onCreateStream) = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme)
return (connected.multiplexer, connected.scheme, connected.onCreateHTTP2Stream)
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand Down Expand Up @@ -243,7 +243,10 @@ package final class Connection: Sendable {
outboundType: RPCRequestPart.self
)
)
}
}.runCallbackIfSet(
on: channel,
callback: onCreateStream
)
}

return Stream(wrapping: stream, descriptor: descriptor)
Expand Down Expand Up @@ -461,11 +464,14 @@ extension Connection {
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme
/// A user-provided callback to call after creating the stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
self.multiplexer = connection.multiplexer
self.scheme = connection.isPlaintext ? .http : .https
self.onCreateHTTP2Stream = connection.onCreateHTTP2Stream
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ package struct HTTP2Connection: Sendable {
/// An HTTP/2 stream multiplexer.
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>

/// A callback which is invoked when creating an HTTP/2 stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?

/// Whether the connection is insecure (i.e. plaintext).
var isPlaintext: Bool

package init(
channel: NIOAsyncChannel<ClientConnectionEvent, Void>,
multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>,
isPlaintext: Bool
isPlaintext: Bool,
onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?
) {
self.channel = channel
self.multiplexer = multiplexer
self.isPlaintext = isPlaintext
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}
}
29 changes: 29 additions & 0 deletions Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore

/// A namespace for the HTTP/2 client transport.
public enum HTTP2ClientTransport {}
Expand Down Expand Up @@ -164,4 +165,32 @@ extension HTTP2ClientTransport.Config {
Self(maxFrameSize: 1 << 14, targetWindowSize: 8 * 1024 * 1024, authority: nil)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked with each new TCP connection.
public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?

/// A callback invoked with each new HTTP/2 stream.
public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?

public init(
onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?,
onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)?
) {
self.onCreateTCPConnection = onCreateTCPConnection
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onCreateTCPConnection: nil, onCreateHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package import NIOCore

extension EventLoopFuture where Value: Sendable {
package func runCallbackIfSet(
on channel: any Channel,
callback: (@Sendable (any Channel) async throws -> Void)?
) -> EventLoopFuture<Value> {
guard let initializer = callback else { return self }

// The code below code is equivalent to the following but avoids allocating an extra future.
//
// return self.flatMap { value in
// self.eventLoop.makeFutureWithTask {
// try await userInitializer(channel)
// }.map {
// value
// }
// }
//
let promise = self.eventLoop.makePromise(of: Value.self)
self.whenComplete { result in
switch result {
case .success(let value):
Task {
do {
try await initializer(channel)
promise.succeed(value)
} catch {
promise.fail(error)
}
}

case .failure(let error):
promise.fail(error)
}
}

return promise.futureResult
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extension ChannelPipeline.SynchronousOperations {
connectionConfig: HTTP2ServerTransport.Config.Connection,
http2Config: HTTP2ServerTransport.Config.HTTP2,
rpcConfig: HTTP2ServerTransport.Config.RPC,
debugConfig: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks,
requireALPN: Bool,
scheme: Scheme
) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) {
Expand Down Expand Up @@ -99,7 +100,10 @@ extension ChannelPipeline.SynchronousOperations {
wrappingChannelSynchronously: streamChannel
)
return (asyncStreamChannel, methodDescriptorPromise.futureResult)
}
}.runCallbackIfSet(
on: streamChannel,
callback: debugConfig.onAcceptHTTP2Stream
)
}

try self.addHandler(serverConnectionHandler)
Expand Down
34 changes: 34 additions & 0 deletions Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore
internal import NIOHTTP2

/// A namespace for the HTTP/2 server transport.
Expand Down Expand Up @@ -185,4 +186,37 @@ extension HTTP2ServerTransport.Config {
Self(maxRequestPayloadSize: 4 * 1024 * 1024)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked when the server starts listening for new TCP connections.
public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?

/// A callback invoked with each new accepted TPC connection.
public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?

/// A callback invoked with each accepted HTTP/2 stream.
public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?

public init(
onBindTCPListener: (@Sendable (_ channel: any Channel) -> Void)?,
onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?,
onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)?
) {
self.onBindTCPListener = onBindTCPListener
self.onAcceptTCPConnection = onAcceptTCPConnection
self.onAcceptHTTP2Stream = onAcceptHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onBindTCPListener: nil, onAcceptTCPConnection: nil, onAcceptHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,17 @@ extension HTTP2ClientTransport.Posix {
channel: channel,
config: GRPCChannel.Config(posix: self.config)
)
}
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection
)
}

return HTTP2Connection(
channel: channel,
multiplexer: multiplexer,
isPlaintext: self.isPlainText
isPlaintext: self.isPlainText,
onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream
)
}
}
Expand All @@ -208,25 +212,31 @@ extension HTTP2ClientTransport.Posix {
/// Compression configuration.
public var compression: HTTP2ClientTransport.Config.Compression

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks

/// Creates a new connection configuration.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - backoff: Backoff configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ClientTransport.Config.HTTP2,
backoff: HTTP2ClientTransport.Config.Backoff,
connection: HTTP2ClientTransport.Config.Connection,
compression: HTTP2ClientTransport.Config.Compression
compression: HTTP2ClientTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks
) {
self.http2 = http2
self.connection = connection
self.backoff = backoff
self.compression = compression
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -245,7 +255,8 @@ extension HTTP2ClientTransport.Posix {
http2: .defaults,
backoff: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ extension HTTP2ServerTransport {
let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler(
channel: channel
)
return try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}
try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onBindTCPListener
)
}
.bind(to: address) { channel in
channel.eventLoop.makeCompletedFuture {
Expand All @@ -113,10 +116,14 @@ extension HTTP2ServerTransport {
connectionConfig: self.config.connection,
http2Config: self.config.http2,
rpcConfig: self.config.rpc,
debugConfig: self.config.channelDebuggingCallbacks,
requireALPN: requireALPN,
scheme: scheme
)
}
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection
)
}

return serverChannel
Expand Down Expand Up @@ -191,25 +198,31 @@ extension HTTP2ServerTransport.Posix {
/// RPC configuration.
public var rpc: HTTP2ServerTransport.Config.RPC

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks

/// Construct a new `Config`.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - rpc: RPC configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ServerTransport.Config.HTTP2,
rpc: HTTP2ServerTransport.Config.RPC,
connection: HTTP2ServerTransport.Config.Connection,
compression: HTTP2ServerTransport.Config.Compression
compression: HTTP2ServerTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks
) {
self.compression = compression
self.connection = connection
self.http2 = http2
self.rpc = rpc
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -228,7 +241,8 @@ extension HTTP2ServerTransport.Posix {
http2: .defaults,
rpc: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Loading

0 comments on commit 674b6c2

Please sign in to comment.