Skip to content

Commit

Permalink
Add debug channel callbacks (#52)
Browse files Browse the repository at this point in the history
Motivation:

Sometimes it can be useful to modify the channel pipeline or inspect the
channel when debugging. v1 has hooks to allow users to do this which
proved helpful a number of times. v2 should offer similar behaviour.

Modifications:

- Add 'DebugChannelCallbacks' to the client and server config and wire
them up appropriately.
- Add tests. Extract 'TransportKind' from some tests as it was being
recreated a number of times.

Result:

Users can modify the channel pipeline if they need to.

---------

Co-authored-by: Gus Cairo <[email protected]>
  • Loading branch information
glbrntt and gjcairo authored Jan 17, 2025
1 parent 882788e commit 33a1aec
Show file tree
Hide file tree
Showing 17 changed files with 540 additions and 82 deletions.
17 changes: 10 additions & 7 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in
let connected = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer)
return connected
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand All @@ -221,11 +221,11 @@ package final class Connection: Sendable {
let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes

do {
let stream = try await multiplexer.openStream { channel in
let stream = try await connected.multiplexer.openStream { channel in
channel.eventLoop.makeCompletedFuture {
let streamHandler = GRPCClientStreamHandler(
methodDescriptor: descriptor,
scheme: scheme,
scheme: connected.scheme,
// The value of authority here is being used for the ":authority" pseudo-header. Derive
// one from the address if we don't already have one.
authority: self.authority ?? self.address.authority,
Expand All @@ -243,13 +243,13 @@ package final class Connection: Sendable {
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}
}.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel)
}

let context = ClientContext(
descriptor: descriptor,
remotePeer: remotePeer,
localPeer: localPeer
remotePeer: connected.remotePeer,
localPeer: connected.localPeer
)

return Stream(wrapping: stream, context: context)
Expand Down Expand Up @@ -480,13 +480,16 @@ extension Connection {
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme
/// A user-provided callback to call after creating the stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
self.remotePeer = connection.channel.remoteAddressInfo
self.localPeer = connection.channel.localAddressInfo
self.multiplexer = connection.multiplexer
self.scheme = connection.isPlaintext ? .http : .https
self.onCreateHTTP2Stream = connection.onCreateHTTP2Stream
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ package struct HTTP2Connection: Sendable {
/// An HTTP/2 stream multiplexer.
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>

/// A callback which is invoked when creating an HTTP/2 stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

/// Whether the connection is insecure (i.e. plaintext).
var isPlaintext: Bool

package init(
channel: NIOAsyncChannel<ClientConnectionEvent, Void>,
multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>,
isPlaintext: Bool
isPlaintext: Bool,
onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?
) {
self.channel = channel
self.multiplexer = multiplexer
self.isPlaintext = isPlaintext
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}
}
29 changes: 29 additions & 0 deletions Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore

/// A namespace for the HTTP/2 client transport.
public enum HTTP2ClientTransport {}
Expand Down Expand Up @@ -164,4 +165,32 @@ extension HTTP2ClientTransport.Config {
Self(maxFrameSize: 1 << 14, targetWindowSize: 8 * 1024 * 1024, authority: nil)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked with each new TCP connection.
public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new HTTP/2 stream.
public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onCreateTCPConnection = onCreateTCPConnection
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onCreateTCPConnection: nil, onCreateHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package import NIOCore

extension EventLoopFuture where Value: Sendable {
package func runInitializerIfSet(
_ initializer: (@Sendable (any Channel) -> EventLoopFuture<Void>)?,
on channel: any Channel
) -> EventLoopFuture<Value> {
guard let initializer = initializer else { return self }

return self.flatMap { value in
initializer(channel).map { value }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extension ChannelPipeline.SynchronousOperations {
connectionConfig: HTTP2ServerTransport.Config.Connection,
http2Config: HTTP2ServerTransport.Config.HTTP2,
rpcConfig: HTTP2ServerTransport.Config.RPC,
debugConfig: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks,
requireALPN: Bool,
scheme: Scheme
) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) {
Expand Down Expand Up @@ -109,7 +110,7 @@ extension ChannelPipeline.SynchronousOperations {
)
)
return (asyncStreamChannel, methodDescriptorPromise.futureResult)
}
}.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel)
}

try self.addHandler(serverConnectionHandler)
Expand Down
34 changes: 34 additions & 0 deletions Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore
internal import NIOHTTP2

/// A namespace for the HTTP/2 server transport.
Expand Down Expand Up @@ -185,4 +186,37 @@ extension HTTP2ServerTransport.Config {
Self(maxRequestPayloadSize: 4 * 1024 * 1024)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked when the server starts listening for new TCP connections.
public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new accepted TPC connection.
public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each accepted HTTP/2 stream.
public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onBindTCPListener = onBindTCPListener
self.onAcceptTCPConnection = onAcceptTCPConnection
self.onAcceptHTTP2Stream = onAcceptHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onBindTCPListener: nil, onAcceptTCPConnection: nil, onAcceptHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,17 @@ extension HTTP2ClientTransport.Posix {
channel: channel,
config: GRPCChannel.Config(posix: self.config)
)
}
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onCreateTCPConnection,
on: channel
)
}

return HTTP2Connection(
channel: channel,
multiplexer: multiplexer,
isPlaintext: self.isPlainText
isPlaintext: self.isPlainText,
onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream
)
}
}
Expand All @@ -210,25 +214,31 @@ extension HTTP2ClientTransport.Posix {
/// Compression configuration.
public var compression: HTTP2ClientTransport.Config.Compression

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks

/// Creates a new connection configuration.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - backoff: Backoff configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ClientTransport.Config.HTTP2,
backoff: HTTP2ClientTransport.Config.Backoff,
connection: HTTP2ClientTransport.Config.Connection,
compression: HTTP2ClientTransport.Config.Compression
compression: HTTP2ClientTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks
) {
self.http2 = http2
self.connection = connection
self.backoff = backoff
self.compression = compression
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -247,7 +257,8 @@ extension HTTP2ClientTransport.Posix {
http2: .defaults,
backoff: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ extension HTTP2ServerTransport {
let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler(
channel: channel
)
return try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}
try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onBindTCPListener,
on: channel
)
}
.bind(to: address) { channel in
channel.eventLoop.makeCompletedFuture {
Expand All @@ -115,10 +118,14 @@ extension HTTP2ServerTransport {
connectionConfig: self.config.connection,
http2Config: self.config.http2,
rpcConfig: self.config.rpc,
debugConfig: self.config.channelDebuggingCallbacks,
requireALPN: requireALPN,
scheme: scheme
)
}
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onAcceptTCPConnection,
on: channel
)
}

return serverChannel
Expand Down Expand Up @@ -193,25 +200,31 @@ extension HTTP2ServerTransport.Posix {
/// RPC configuration.
public var rpc: HTTP2ServerTransport.Config.RPC

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks

/// Construct a new `Config`.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - rpc: RPC configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ServerTransport.Config.HTTP2,
rpc: HTTP2ServerTransport.Config.RPC,
connection: HTTP2ServerTransport.Config.Connection,
compression: HTTP2ServerTransport.Config.Compression
compression: HTTP2ServerTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks
) {
self.compression = compression
self.connection = connection
self.http2 = http2
self.rpc = rpc
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -230,7 +243,8 @@ extension HTTP2ServerTransport.Posix {
http2: .defaults,
rpc: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Loading

0 comments on commit 33a1aec

Please sign in to comment.