From 33a1aeccf6b74fe1e8babc7efc41df841b945106 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 17 Jan 2025 15:41:27 +0000 Subject: [PATCH] Add debug channel callbacks (#52) Motivation: Sometimes it can be useful to modify the channel pipeline or inspect the channel when debugging. v1 has hooks to allow users to do this which proved helpful a number of times. v2 should offer similar behaviour. Modifications: - Add 'DebugChannelCallbacks' to the client and server config and wire them up appropriately. - Add tests. Extract 'TransportKind' from some tests as it was being recreated a number of times. Result: Users can modify the channel pipeline if they need to. --------- Co-authored-by: Gus Cairo --- .../Client/Connection/Connection.swift | 17 +- .../Client/Connection/ConnectionFactory.swift | 7 +- .../Client/HTTP2ClientTransport.swift | 29 ++++ .../EventLoopFuture+ChannelInitializer.swift | 30 ++++ .../Internal/NIOChannelPipeline+GRPC.swift | 3 +- .../Server/HTTP2ServerTransport.swift | 34 ++++ .../HTTP2ClientTransport+Posix.swift | 19 ++- .../HTTP2ServerTransport+Posix.swift | 24 ++- ...TP2ClientTransport+TransportServices.swift | 19 ++- ...TP2ServerTransport+TransportServices.swift | 32 +++- .../Utilities/HTTP2Connectors.swift | 7 +- .../HTTP2ServerTransport+DebugTests.swift | 158 ++++++++++++++++++ .../HTTP2TransportTLSEnabledTests.swift | 15 -- .../HTTP2TransportTests.swift | 53 ++---- .../Test Utilities/JSONCoding.swift | 43 +++++ .../Test Utilities/StatsService.swift | 121 ++++++++++++++ .../Test Utilities/TransportKind.swift | 11 ++ 17 files changed, 540 insertions(+), 82 deletions(-) create mode 100644 Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index 832eaf7..3a3132a 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,10 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in + let connected = try self.state.withLock { state in switch state { case .connected(let connected): - return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer) + return connected case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -221,11 +221,11 @@ package final class Connection: Sendable { let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes do { - let stream = try await multiplexer.openStream { channel in + let stream = try await connected.multiplexer.openStream { channel in channel.eventLoop.makeCompletedFuture { let streamHandler = GRPCClientStreamHandler( methodDescriptor: descriptor, - scheme: scheme, + scheme: connected.scheme, // The value of authority here is being used for the ":authority" pseudo-header. Derive // one from the address if we don't already have one. authority: self.authority ?? self.address.authority, @@ -243,13 +243,13 @@ package final class Connection: Sendable { outboundType: RPCRequestPart.self ) ) - } + }.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel) } let context = ClientContext( descriptor: descriptor, - remotePeer: remotePeer, - localPeer: localPeer + remotePeer: connected.remotePeer, + localPeer: connected.localPeer ) return Stream(wrapping: stream, context: context) @@ -480,6 +480,8 @@ extension Connection { var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// Whether the connection is plaintext, `false` implies TLS is being used. var scheme: Scheme + /// A user-provided callback to call after creating the stream. + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? init(_ connection: HTTP2Connection) { self.channel = connection.channel @@ -487,6 +489,7 @@ extension Connection { self.localPeer = connection.channel.localAddressInfo self.multiplexer = connection.multiplexer self.scheme = connection.isPlaintext ? .http : .https + self.onCreateHTTP2Stream = connection.onCreateHTTP2Stream } } diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift index d03deb0..cb42d82 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift @@ -37,16 +37,21 @@ package struct HTTP2Connection: Sendable { /// An HTTP/2 stream multiplexer. var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + /// A callback which is invoked when creating an HTTP/2 stream. + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? + /// Whether the connection is insecure (i.e. plaintext). var isPlaintext: Bool package init( channel: NIOAsyncChannel, multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, - isPlaintext: Bool + isPlaintext: Bool, + onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? ) { self.channel = channel self.multiplexer = multiplexer self.isPlaintext = isPlaintext + self.onCreateHTTP2Stream = onCreateHTTP2Stream } } diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index 4bfbf4f..76487b5 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -15,6 +15,7 @@ */ public import GRPCCore +public import NIOCore /// A namespace for the HTTP/2 client transport. public enum HTTP2ClientTransport {} @@ -164,4 +165,32 @@ extension HTTP2ClientTransport.Config { Self(maxFrameSize: 1 << 14, targetWindowSize: 8 * 1024 * 1024, authority: nil) } } + + /// A set of callbacks used for debugging purposes. + /// + /// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has + /// run for each `Channel`. These callbacks are intended for debugging purposes. + /// + /// - Important: You should be very careful when implementing these callbacks as they may have + /// unexpected side effects on your gRPC application. + public struct ChannelDebuggingCallbacks: Sendable { + /// A callback invoked with each new TCP connection. + public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + + /// A callback invoked with each new HTTP/2 stream. + public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + + public init( + onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + ) { + self.onCreateTCPConnection = onCreateTCPConnection + self.onCreateHTTP2Stream = onCreateHTTP2Stream + } + + /// Default values; no callbacks are set. + public static var defaults: Self { + Self(onCreateTCPConnection: nil, onCreateHTTP2Stream: nil) + } + } } diff --git a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift new file mode 100644 index 0000000..5ec673b --- /dev/null +++ b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift @@ -0,0 +1,30 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package import NIOCore + +extension EventLoopFuture where Value: Sendable { + package func runInitializerIfSet( + _ initializer: (@Sendable (any Channel) -> EventLoopFuture)?, + on channel: any Channel + ) -> EventLoopFuture { + guard let initializer = initializer else { return self } + + return self.flatMap { value in + initializer(channel).map { value } + } + } +} diff --git a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift index e9cd74c..eded597 100644 --- a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift +++ b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift @@ -37,6 +37,7 @@ extension ChannelPipeline.SynchronousOperations { connectionConfig: HTTP2ServerTransport.Config.Connection, http2Config: HTTP2ServerTransport.Config.HTTP2, rpcConfig: HTTP2ServerTransport.Config.RPC, + debugConfig: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks, requireALPN: Bool, scheme: Scheme ) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) { @@ -109,7 +110,7 @@ extension ChannelPipeline.SynchronousOperations { ) ) return (asyncStreamChannel, methodDescriptorPromise.futureResult) - } + }.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel) } try self.addHandler(serverConnectionHandler) diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 3c07457..984e7a2 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -15,6 +15,7 @@ */ public import GRPCCore +public import NIOCore internal import NIOHTTP2 /// A namespace for the HTTP/2 server transport. @@ -185,4 +186,37 @@ extension HTTP2ServerTransport.Config { Self(maxRequestPayloadSize: 4 * 1024 * 1024) } } + + /// A set of callbacks used for debugging purposes. + /// + /// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has + /// run for each `Channel`. These callbacks are intended for debugging purposes. + /// + /// - Important: You should be very careful when implementing these callbacks as they may have + /// unexpected side effects on your gRPC application. + public struct ChannelDebuggingCallbacks: Sendable { + /// A callback invoked when the server starts listening for new TCP connections. + public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + + /// A callback invoked with each new accepted TPC connection. + public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + + /// A callback invoked with each accepted HTTP/2 stream. + public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + + public init( + onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? + ) { + self.onBindTCPListener = onBindTCPListener + self.onAcceptTCPConnection = onAcceptTCPConnection + self.onAcceptHTTP2Stream = onAcceptHTTP2Stream + } + + /// Default values; no callbacks are set. + public static var defaults: Self { + Self(onBindTCPListener: nil, onAcceptTCPConnection: nil, onAcceptHTTP2Stream: nil) + } + } } diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift index c507554..879c3ec 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift @@ -184,13 +184,17 @@ extension HTTP2ClientTransport.Posix { channel: channel, config: GRPCChannel.Config(posix: self.config) ) - } + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel + ) } return HTTP2Connection( channel: channel, multiplexer: multiplexer, - isPlaintext: self.isPlainText + isPlaintext: self.isPlainText, + onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream ) } } @@ -210,6 +214,9 @@ extension HTTP2ClientTransport.Posix { /// Compression configuration. public var compression: HTTP2ClientTransport.Config.Compression + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + /// Creates a new connection configuration. /// /// - Parameters: @@ -217,18 +224,21 @@ extension HTTP2ClientTransport.Posix { /// - backoff: Backoff configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ClientTransport.Config.HTTP2, backoff: HTTP2ClientTransport.Config.Backoff, connection: HTTP2ClientTransport.Config.Connection, - compression: HTTP2ClientTransport.Config.Compression + compression: HTTP2ClientTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks ) { self.http2 = http2 self.connection = connection self.backoff = backoff self.compression = compression + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -247,7 +257,8 @@ extension HTTP2ClientTransport.Posix { http2: .defaults, backoff: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift index 08da85b..0a2cc4a 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift @@ -87,8 +87,11 @@ extension HTTP2ServerTransport { let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler( channel: channel ) - return try channel.pipeline.syncOperations.addHandler(quiescingHandler) - } + try channel.pipeline.syncOperations.addHandler(quiescingHandler) + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel + ) } .bind(to: address) { channel in channel.eventLoop.makeCompletedFuture { @@ -115,10 +118,14 @@ extension HTTP2ServerTransport { connectionConfig: self.config.connection, http2Config: self.config.http2, rpcConfig: self.config.rpc, + debugConfig: self.config.channelDebuggingCallbacks, requireALPN: requireALPN, scheme: scheme ) - } + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel + ) } return serverChannel @@ -193,6 +200,9 @@ extension HTTP2ServerTransport.Posix { /// RPC configuration. public var rpc: HTTP2ServerTransport.Config.RPC + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + /// Construct a new `Config`. /// /// - Parameters: @@ -200,18 +210,21 @@ extension HTTP2ServerTransport.Posix { /// - rpc: RPC configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ServerTransport.Config.HTTP2, rpc: HTTP2ServerTransport.Config.RPC, connection: HTTP2ServerTransport.Config.Connection, - compression: HTTP2ServerTransport.Config.Compression + compression: HTTP2ServerTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks ) { self.compression = compression self.connection = connection self.http2 = http2 self.rpc = rpc + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -230,7 +243,8 @@ extension HTTP2ServerTransport.Posix { http2: .defaults, rpc: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift index 33b5c9f..4925283 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift @@ -181,13 +181,17 @@ extension HTTP2ClientTransport.TransportServices { channel: channel, config: GRPCChannel.Config(transportServices: self.config) ) - } + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel + ) } return HTTP2Connection( channel: channel, multiplexer: multiplexer, - isPlaintext: isPlainText + isPlaintext: isPlainText, + onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream ) } } @@ -208,6 +212,9 @@ extension HTTP2ClientTransport.TransportServices { /// Compression configuration. public var compression: HTTP2ClientTransport.Config.Compression + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + /// Creates a new connection configuration. /// /// - Parameters: @@ -215,18 +222,21 @@ extension HTTP2ClientTransport.TransportServices { /// - backoff: Backoff configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ClientTransport.Config.HTTP2, backoff: HTTP2ClientTransport.Config.Backoff, connection: HTTP2ClientTransport.Config.Connection, - compression: HTTP2ClientTransport.Config.Compression + compression: HTTP2ClientTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks ) { self.http2 = http2 self.connection = connection self.backoff = backoff self.compression = compression + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -245,7 +255,8 @@ extension HTTP2ClientTransport.TransportServices { http2: .defaults, backoff: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift index 97d90c0..0a70437 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift @@ -61,25 +61,32 @@ extension HTTP2ServerTransport { try await bootstrap .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .serverChannelInitializer { channel in - return channel.eventLoop.makeCompletedFuture { + channel.eventLoop.makeCompletedFuture { let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler( channel: channel ) - return try channel.pipeline.syncOperations.addHandler(quiescingHandler) - } + try channel.pipeline.syncOperations.addHandler(quiescingHandler) + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel + ) } .bind(to: address) { channel in - channel.eventLoop.makeCompletedFuture { - return try channel.pipeline.syncOperations.configureGRPCServerPipeline( + return channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.configureGRPCServerPipeline( channel: channel, compressionConfig: self.config.compression, connectionConfig: self.config.connection, http2Config: self.config.http2, rpcConfig: self.config.rpc, + debugConfig: self.config.channelDebuggingCallbacks, requireALPN: requireALPN, scheme: scheme ) - } + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel + ) } return serverChannel @@ -154,22 +161,30 @@ extension HTTP2ServerTransport.TransportServices { /// RPC configuration. public var rpc: HTTP2ServerTransport.Config.RPC + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + /// Construct a new `Config`. /// - Parameters: /// - compression: Compression configuration. /// - connection: Connection configuration. /// - http2: HTTP2 configuration. /// - rpc: RPC configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. + /// + /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( compression: HTTP2ServerTransport.Config.Compression, connection: HTTP2ServerTransport.Config.Connection, http2: HTTP2ServerTransport.Config.HTTP2, - rpc: HTTP2ServerTransport.Config.RPC + rpc: HTTP2ServerTransport.Config.RPC, + channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks ) { self.compression = compression self.connection = connection self.http2 = http2 self.rpc = rpc + self.channelDebuggingCallbacks = channelDebuggingCallbacks } public static var defaults: Self { @@ -187,7 +202,8 @@ extension HTTP2ServerTransport.TransportServices { compression: .defaults, connection: .defaults, http2: .defaults, - rpc: .defaults + rpc: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift index f23631a..7cbafa9 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift @@ -135,7 +135,12 @@ struct NIOPosixConnector: HTTP2Connector { wrappingChannelSynchronously: channel ) - return HTTP2Connection(channel: asyncChannel, multiplexer: multiplexer, isPlaintext: true) + return HTTP2Connection( + channel: asyncChannel, + multiplexer: multiplexer, + isPlaintext: true, + onCreateHTTP2Stream: nil + ) } } } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift new file mode 100644 index 0000000..2a26d3b --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -0,0 +1,158 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import GRPCNIOTransportHTTP2 +import Testing + +@Suite("ChannelDebugCallbacks") +struct ChannelDebugCallbackTests { + @Test(arguments: TransportKind.allCases, TransportKind.allCases) + func debugCallbacksAreCalled(serverKind: TransportKind, clientKind: TransportKind) async throws { + // Validates the callbacks are called appropriately by setting up callbacks which increment + // counters and then returning those stats from a gRPC service. The client's interactions with + // the service drive the callbacks. + + let stats = DebugCallbackStats() + let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks( + onBindTCPListener: { channel in + stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() + }, + onAcceptTCPConnection: { channel in + stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() + }, + onAcceptHTTP2Stream: { channel in + stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) + + let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks( + onCreateTCPConnection: { channel in + stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() + }, + onCreateHTTP2Stream: { channel in + stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() + } + ) + + // For each server have the client create this many connections. + let connectionsPerServer = 5 + // For each connection have the client create this many streams. + let streamsPerConnection = 3 + + try await withGRPCServer( + transport: self.makeServerTransport( + kind: serverKind, + address: .ipv4(host: "127.0.0.1", port: 0), + debug: serverDebug + ), + services: [StatsService(stats: stats)] + ) { server in + let address = try await server.listeningAddress!.ipv4! + for connectionNumber in 1 ... connectionsPerServer { + try await withGRPCClient( + transport: self.makeClientTransport( + kind: clientKind, + target: .ipv4(host: address.host, port: address.port), + debug: clientDebug + ) + ) { client in + let statsClient = StatsClient(wrapping: client) + + // Create a few streams per connection. + for streamNumber in 1 ... streamsPerConnection { + let streamCount = (connectionNumber - 1) * streamsPerConnection + streamNumber + + let stats = try await statsClient.getStats() + #expect(stats.server.tcpListenersBound == 1) + #expect(stats.server.tcpConnectionsAccepted == connectionNumber) + #expect(stats.server.http2StreamsAccepted == streamCount) + + #expect(stats.client.tcpConnectionsCreated == connectionNumber) + #expect(stats.client.http2StreamsCreated == streamCount) + } + } + } + } + } + + private func makeServerTransport( + kind: TransportKind, + address: SocketAddress, + debug: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + ) -> NIOServerTransport { + switch kind { + case .posix: + return NIOServerTransport( + .http2NIOPosix( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + ) + #if canImport(Network) + case .transportServices: + return NIOServerTransport( + .http2NIOTS( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + ) + #endif + } + } + + private func makeClientTransport( + kind: TransportKind, + target: any ResolvableTarget, + debug: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + ) throws -> NIOClientTransport { + switch kind { + case .posix: + return NIOClientTransport( + try .http2NIOPosix( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + ) + #if canImport(Network) + case .transportServices: + return NIOClientTransport( + try .http2NIOTS( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + ) + #endif + } + } +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift index fac2c42..6c45cc1 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift @@ -233,21 +233,6 @@ struct HTTP2TransportTLSEnabledTests { case clientError(cause: any Error) } - enum TransportKind: Sendable { - case posix - #if canImport(Network) - case transportServices - #endif - - static var supported: [TransportKind] { - #if canImport(Network) - return [.posix, .transportServices] - #else - return [.posix] - #endif - } - } - struct Config { var security: Security var transport: Transport diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift index 98c1cd3..0f31ead 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift @@ -23,22 +23,8 @@ import XCTest final class HTTP2TransportTests: XCTestCase { // A combination of client and server transport kinds. struct Transport: Sendable, CustomStringConvertible { - var server: Kind - var client: Kind - - enum Kind: Sendable, CustomStringConvertible { - case posix - case niots - - var description: String { - switch self { - case .posix: - return "NIOPosix" - case .niots: - return "NIOTS" - } - } - } + var server: TransportKind + var client: TransportKind var description: String { "server=\(self.server) client=\(self.client)" @@ -105,8 +91,8 @@ final class HTTP2TransportTests: XCTestCase { } func forEachClientAndHTTPStatusCodeServer( - _ kind: [Transport.Kind] = [.posix, .niots], - _ execute: (ControlClient, Transport.Kind) async throws -> Void + _ kind: [TransportKind] = TransportKind.supported, + _ execute: (ControlClient, TransportKind) async throws -> Void ) async throws { for clientKind in kind { try await withThrowingTaskGroup(of: Void.self) { group in @@ -141,7 +127,7 @@ final class HTTP2TransportTests: XCTestCase { private func runServer( in group: inout ThrowingTaskGroup, address: SocketAddress, - kind: Transport.Kind, + kind: TransportKind, enableControlService: Bool, compression: CompressionAlgorithmSet ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) { @@ -169,8 +155,8 @@ final class HTTP2TransportTests: XCTestCase { let address = try await server.listeningAddress! return (server, address) - case .niots: - #if canImport(Network) + #if canImport(Network) + case .transportServices: let server = GRPCServer( transport: NIOServerTransport( .http2NIOTS( @@ -190,14 +176,12 @@ final class HTTP2TransportTests: XCTestCase { let address = try await server.listeningAddress! return (server, address) - #else - throw XCTSkip("Transport not supported on this platform") - #endif + #endif } } private func makeClient( - kind: Transport.Kind, + kind: TransportKind, target: any ResolvableTarget, compression: CompressionAlgorithm, enabledCompression: CompressionAlgorithmSet @@ -219,8 +203,8 @@ final class HTTP2TransportTests: XCTestCase { ) transport = NIOClientTransport(posix) - case .niots: - #if canImport(Network) + #if canImport(Network) + case .transportServices: var serviceConfig = ServiceConfig() serviceConfig.loadBalancingConfig = [.roundRobin] let transportServices = try HTTP2ClientTransport.TransportServices( @@ -233,9 +217,7 @@ final class HTTP2TransportTests: XCTestCase { serviceConfig: serviceConfig ) transport = NIOClientTransport(transportServices) - #else - throw XCTSkip("Transport not supported on this platform") - #endif + #endif } return GRPCClient(transport: transport, interceptors: [PeerInfoClientInterceptor()]) @@ -1699,12 +1681,11 @@ final class HTTP2TransportTests: XCTestCase { } extension [HTTP2TransportTests.Transport] { - static let supported = [ - HTTP2TransportTests.Transport(server: .posix, client: .posix), - HTTP2TransportTests.Transport(server: .niots, client: .niots), - HTTP2TransportTests.Transport(server: .niots, client: .posix), - HTTP2TransportTests.Transport(server: .posix, client: .niots), - ] + static let supported: [HTTP2TransportTests.Transport] = TransportKind.allCases.flatMap { server in + TransportKind.allCases.map { client in + HTTP2TransportTests.Transport(server: server, client: client) + } + } } extension ControlInput { diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift new file mode 100644 index 0000000..ed27c57 --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift @@ -0,0 +1,43 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +#if canImport(FoundationEssentials) +import struct FoundationEssentials.Data +import class FoundationEssentials.JSONEncoder +import class FoundationEssentials.JSONDecoder +#else +import struct Foundation.Data +import class Foundation.JSONEncoder +import class Foundation.JSONDecoder +#endif + +struct JSONCoder: MessageSerializer, MessageDeserializer { + func serialize(_ message: Message) throws -> Bytes { + let json = JSONEncoder() + let data = try json.encode(message) + return Bytes(data) + } + + func deserialize(_ serializedMessageBytes: Bytes) throws -> Message { + let json = JSONDecoder() + let data = serializedMessageBytes.withUnsafeBytes { + Data($0) + } + return try json.decode(Message.self, from: data) + } +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift new file mode 100644 index 0000000..2ffdadd --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift @@ -0,0 +1,121 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import Synchronization + +final class DebugCallbackStats: Sendable { + let tcpListenersBound: Atomic + let tcpConnectionsAccepted: Atomic + let tcpConnectionsCreated: Atomic + + let http2StreamsAccepted: Atomic + let http2StreamsCreated: Atomic + + init() { + self.tcpListenersBound = Atomic(0) + self.tcpConnectionsAccepted = Atomic(0) + self.tcpConnectionsCreated = Atomic(0) + + self.http2StreamsAccepted = Atomic(0) + self.http2StreamsCreated = Atomic(0) + } + + var serverStats: GetStatsResponse.Server { + GetStatsResponse.Server( + tcpListenersBound: self.tcpListenersBound.load(ordering: .sequentiallyConsistent), + tcpConnectionsAccepted: self.tcpConnectionsAccepted.load(ordering: .sequentiallyConsistent), + http2StreamsAccepted: self.http2StreamsAccepted.load(ordering: .sequentiallyConsistent) + ) + } + + var clientStats: GetStatsResponse.Client { + GetStatsResponse.Client( + tcpConnectionsCreated: self.tcpConnectionsCreated.load(ordering: .sequentiallyConsistent), + http2StreamsCreated: self.http2StreamsCreated.load(ordering: .sequentiallyConsistent) + ) + } +} + +struct StatsService { + private let stats: DebugCallbackStats + + init(stats: DebugCallbackStats) { + self.stats = stats + } + + func getStats() async throws -> GetStatsResponse { + GetStatsResponse(server: self.stats.serverStats, client: self.stats.clientStats) + } +} + +extension StatsService: RegistrableRPCService { + func registerMethods(with router: inout RPCRouter) { + router.registerHandler( + forMethod: .getStats, + deserializer: JSONCoder(), + serializer: JSONCoder() + ) { request, context in + _ = try await ServerRequest(stream: request) + let response = try await self.getStats() + return StreamingServerResponse { + try await $0.write(response) + return [:] + } + } + } +} + +struct StatsClient { + private let underlying: GRPCClient + + init(wrapping underlying: GRPCClient) { + self.underlying = underlying + } + + func getStats() async throws -> GetStatsResponse { + try await self.underlying.unary( + request: ClientRequest(message: GetStatsRequest()), + descriptor: .getStats, + serializer: JSONCoder(), + deserializer: JSONCoder(), + options: .defaults + ) { + try $0.message + } + } +} + +extension MethodDescriptor { + static let getStats = Self(fullyQualifiedService: "StatsService", method: "GetStats") +} + +struct GetStatsRequest: Codable, Hashable {} +struct GetStatsResponse: Codable, Hashable { + struct Server: Codable, Hashable { + var tcpListenersBound: Int + var tcpConnectionsAccepted: Int + var http2StreamsAccepted: Int + } + + struct Client: Codable, Hashable { + var tcpConnectionsCreated: Int + var http2StreamsCreated: Int + } + + var server: Server + var client: Client +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift index 4f6afac..a2369bb 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift @@ -17,6 +17,17 @@ import GRPCCore import GRPCNIOTransportHTTP2 +enum TransportKind: CaseIterable, Hashable, Sendable { + case posix + #if canImport(Network) + case transportServices + #endif + + static var supported: [Self] { + Self.allCases + } +} + enum NIOClientTransport: ClientTransport { case posix(HTTP2ClientTransport.Posix) #if canImport(Network)