Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug channel callbacks #52

Merged
merged 10 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in
let connected = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer)
return connected
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand All @@ -221,11 +221,11 @@ package final class Connection: Sendable {
let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes

do {
let stream = try await multiplexer.openStream { channel in
let stream = try await connected.multiplexer.openStream { channel in
channel.eventLoop.makeCompletedFuture {
let streamHandler = GRPCClientStreamHandler(
methodDescriptor: descriptor,
scheme: scheme,
scheme: connected.scheme,
// The value of authority here is being used for the ":authority" pseudo-header. Derive
// one from the address if we don't already have one.
authority: self.authority ?? self.address.authority,
Expand All @@ -243,13 +243,13 @@ package final class Connection: Sendable {
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}
}.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel)
}

let context = ClientContext(
descriptor: descriptor,
remotePeer: remotePeer,
localPeer: localPeer
remotePeer: connected.remotePeer,
localPeer: connected.localPeer
)

return Stream(wrapping: stream, context: context)
Expand Down Expand Up @@ -480,13 +480,16 @@ extension Connection {
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme
/// A user-provided callback to call after creating the stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
self.remotePeer = connection.channel.remoteAddressInfo
self.localPeer = connection.channel.localAddressInfo
self.multiplexer = connection.multiplexer
self.scheme = connection.isPlaintext ? .http : .https
self.onCreateHTTP2Stream = connection.onCreateHTTP2Stream
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ package struct HTTP2Connection: Sendable {
/// An HTTP/2 stream multiplexer.
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>

/// A callback which is invoked when creating an HTTP/2 stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

/// Whether the connection is insecure (i.e. plaintext).
var isPlaintext: Bool

package init(
channel: NIOAsyncChannel<ClientConnectionEvent, Void>,
multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>,
isPlaintext: Bool
isPlaintext: Bool,
onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?
) {
self.channel = channel
self.multiplexer = multiplexer
self.isPlaintext = isPlaintext
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}
}
29 changes: 29 additions & 0 deletions Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore

/// A namespace for the HTTP/2 client transport.
public enum HTTP2ClientTransport {}
Expand Down Expand Up @@ -164,4 +165,32 @@ extension HTTP2ClientTransport.Config {
Self(maxFrameSize: 1 << 14, targetWindowSize: 8 * 1024 * 1024, authority: nil)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked with each new TCP connection.
public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new HTTP/2 stream.
public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onCreateTCPConnection = onCreateTCPConnection
self.onCreateHTTP2Stream = onCreateHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onCreateTCPConnection: nil, onCreateHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package import NIOCore

extension EventLoopFuture where Value: Sendable {
package func runInitializerIfSet(
_ initializer: (@Sendable (any Channel) -> EventLoopFuture<Void>)?,
on channel: any Channel
) -> EventLoopFuture<Value> {
guard let initializer = initializer else { return self }

return self.flatMap { value in
initializer(channel).map { value }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extension ChannelPipeline.SynchronousOperations {
connectionConfig: HTTP2ServerTransport.Config.Connection,
http2Config: HTTP2ServerTransport.Config.HTTP2,
rpcConfig: HTTP2ServerTransport.Config.RPC,
debugConfig: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks,
requireALPN: Bool,
scheme: Scheme
) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) {
Expand Down Expand Up @@ -109,7 +110,7 @@ extension ChannelPipeline.SynchronousOperations {
)
)
return (asyncStreamChannel, methodDescriptorPromise.futureResult)
}
}.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel)
}

try self.addHandler(serverConnectionHandler)
Expand Down
34 changes: 34 additions & 0 deletions Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

public import GRPCCore
public import NIOCore
internal import NIOHTTP2

/// A namespace for the HTTP/2 server transport.
Expand Down Expand Up @@ -185,4 +186,37 @@ extension HTTP2ServerTransport.Config {
Self(maxRequestPayloadSize: 4 * 1024 * 1024)
}
}

/// A set of callbacks used for debugging purposes.
///
/// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has
/// run for each `Channel`. These callbacks are intended for debugging purposes.
///
/// - Important: You should be very careful when implementing these callbacks as they may have
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked when the server starts listening for new TCP connections.
public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new accepted TPC connection.
public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each accepted HTTP/2 stream.
public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onBindTCPListener = onBindTCPListener
self.onAcceptTCPConnection = onAcceptTCPConnection
self.onAcceptHTTP2Stream = onAcceptHTTP2Stream
}

/// Default values; no callbacks are set.
public static var defaults: Self {
Self(onBindTCPListener: nil, onAcceptTCPConnection: nil, onAcceptHTTP2Stream: nil)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,17 @@ extension HTTP2ClientTransport.Posix {
channel: channel,
config: GRPCChannel.Config(posix: self.config)
)
}
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onCreateTCPConnection,
on: channel
)
}

return HTTP2Connection(
channel: channel,
multiplexer: multiplexer,
isPlaintext: self.isPlainText
isPlaintext: self.isPlainText,
onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream
)
}
}
Expand All @@ -210,25 +214,31 @@ extension HTTP2ClientTransport.Posix {
/// Compression configuration.
public var compression: HTTP2ClientTransport.Config.Compression

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks

/// Creates a new connection configuration.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - backoff: Backoff configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ClientTransport.Config.HTTP2,
backoff: HTTP2ClientTransport.Config.Backoff,
connection: HTTP2ClientTransport.Config.Connection,
compression: HTTP2ClientTransport.Config.Compression
compression: HTTP2ClientTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks
) {
self.http2 = http2
self.connection = connection
self.backoff = backoff
self.compression = compression
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -247,7 +257,8 @@ extension HTTP2ClientTransport.Posix {
http2: .defaults,
backoff: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ extension HTTP2ServerTransport {
let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler(
channel: channel
)
return try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}
try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onBindTCPListener,
on: channel
)
}
.bind(to: address) { channel in
channel.eventLoop.makeCompletedFuture {
Expand All @@ -115,10 +118,14 @@ extension HTTP2ServerTransport {
connectionConfig: self.config.connection,
http2Config: self.config.http2,
rpcConfig: self.config.rpc,
debugConfig: self.config.channelDebuggingCallbacks,
requireALPN: requireALPN,
scheme: scheme
)
}
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onAcceptTCPConnection,
on: channel
)
}

return serverChannel
Expand Down Expand Up @@ -193,25 +200,31 @@ extension HTTP2ServerTransport.Posix {
/// RPC configuration.
public var rpc: HTTP2ServerTransport.Config.RPC

/// Channel callbacks for debugging.
public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks

/// Construct a new `Config`.
///
/// - Parameters:
/// - http2: HTTP2 configuration.
/// - rpc: RPC configuration.
/// - connection: Connection configuration.
/// - compression: Compression configuration.
/// - channelDebuggingCallbacks: Channel callbacks for debugging.
///
/// - SeeAlso: ``defaults(configure:)`` and ``defaults``.
public init(
http2: HTTP2ServerTransport.Config.HTTP2,
rpc: HTTP2ServerTransport.Config.RPC,
connection: HTTP2ServerTransport.Config.Connection,
compression: HTTP2ServerTransport.Config.Compression
compression: HTTP2ServerTransport.Config.Compression,
channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks
) {
self.compression = compression
self.connection = connection
self.http2 = http2
self.rpc = rpc
self.channelDebuggingCallbacks = channelDebuggingCallbacks
}

/// Default configuration.
Expand All @@ -230,7 +243,8 @@ extension HTTP2ServerTransport.Posix {
http2: .defaults,
rpc: .defaults,
connection: .defaults,
compression: .defaults
compression: .defaults,
channelDebuggingCallbacks: .defaults
)
configure(&config)
return config
Expand Down
Loading
Loading