From 55ee063dd4b9a1c80b0869397c04e4c1c9b0c287 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 9 Jan 2025 08:39:08 +0000 Subject: [PATCH 1/7] Add debug channel callbacks Motivation: Sometimes it can be useful to modify the channel pipeline or inspect the channel when debugging. v1 has hooks to allow users to do this which proved helpful a number of times. v2 should offer similar behaviour. Modifications: - Add 'DebugChannelCallbacks' to the client and server config and wire them up appropriately. - Add tests. Extract 'TransportKind' from some tests as it was being recreated a number of times. Result: Users can modify the channel pipeline if they need to. --- .../Client/Connection/Connection.swift | 12 +- .../Client/Connection/ConnectionFactory.swift | 7 +- .../Client/HTTP2ClientTransport.swift | 29 ++++ .../EventLoopFuture+ChannelInitializer.swift | 56 +++++++ .../Internal/NIOChannelPipeline+GRPC.swift | 6 +- .../Server/HTTP2ServerTransport.swift | 34 ++++ .../HTTP2ClientTransport+Posix.swift | 19 ++- .../HTTP2ServerTransport+Posix.swift | 24 ++- ...TP2ClientTransport+TransportServices.swift | 19 ++- ...TP2ServerTransport+TransportServices.swift | 32 +++- .../Utilities/HTTP2Connectors.swift | 7 +- .../HTTP2ServerTransport+DebugTests.swift | 145 ++++++++++++++++++ .../HTTP2TransportTLSEnabledTests.swift | 15 -- .../HTTP2TransportTests.swift | 48 ++---- .../Test Utilities/JSONCoding.swift | 41 +++++ .../Test Utilities/StatsService.swift | 121 +++++++++++++++ .../Test Utilities/TransportKind.swift | 26 ++++ 17 files changed, 566 insertions(+), 75 deletions(-) create mode 100644 Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index c366194..c89fbb2 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,10 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme) = try self.state.withLock { state in + let (multiplexer, scheme, onCreateStream) = try self.state.withLock { state in switch state { case .connected(let connected): - return (connected.multiplexer, connected.scheme) + return (connected.multiplexer, connected.scheme, connected.onCreateHTTP2Stream) case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -243,7 +243,10 @@ package final class Connection: Sendable { outboundType: RPCRequestPart.self ) ) - } + }.runCallbackIfSet( + on: channel, + callback: onCreateStream + ) } return Stream(wrapping: stream, descriptor: descriptor) @@ -461,11 +464,14 @@ extension Connection { var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// Whether the connection is plaintext, `false` implies TLS is being used. var scheme: Scheme + /// A user-provided callback to call after creating the stream. + var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? init(_ connection: HTTP2Connection) { self.channel = connection.channel self.multiplexer = connection.multiplexer self.scheme = connection.isPlaintext ? .http : .https + self.onCreateHTTP2Stream = connection.onCreateHTTP2Stream } } diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift index d03deb0..bfc2d7a 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift @@ -37,16 +37,21 @@ package struct HTTP2Connection: Sendable { /// An HTTP/2 stream multiplexer. var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + /// A callback which is invoked when creating an HTTP/2 stream. + var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + /// Whether the connection is insecure (i.e. plaintext). var isPlaintext: Bool package init( channel: NIOAsyncChannel, multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, - isPlaintext: Bool + isPlaintext: Bool, + onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? ) { self.channel = channel self.multiplexer = multiplexer self.isPlaintext = isPlaintext + self.onCreateHTTP2Stream = onCreateHTTP2Stream } } diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index 4bfbf4f..d8ae5ca 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -15,6 +15,7 @@ */ public import GRPCCore +public import NIOCore /// A namespace for the HTTP/2 client transport. public enum HTTP2ClientTransport {} @@ -164,4 +165,32 @@ extension HTTP2ClientTransport.Config { Self(maxFrameSize: 1 << 14, targetWindowSize: 8 * 1024 * 1024, authority: nil) } } + + /// A set of callbacks used for debugging purposes. + /// + /// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has + /// run for each `Channel`. These callbacks are intended for debugging purposes. + /// + /// - Important: You should be very careful when implementing these callbacks as they may have + /// unexpected side effects on your gRPC application. + public struct ChannelDebuggingCallbacks: Sendable { + /// A callback invoked with each new TCP connection. + public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + + /// A callback invoked with each new HTTP/2 stream. + public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + + public init( + onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)? + ) { + self.onCreateTCPConnection = onCreateTCPConnection + self.onCreateHTTP2Stream = onCreateHTTP2Stream + } + + /// Default values; no callbacks are set. + public static var defaults: Self { + Self(onCreateTCPConnection: nil, onCreateHTTP2Stream: nil) + } + } } diff --git a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift new file mode 100644 index 0000000..7df26b5 --- /dev/null +++ b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift @@ -0,0 +1,56 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package import NIOCore + +extension EventLoopFuture where Value: Sendable { + package func runCallbackIfSet( + on channel: any Channel, + callback: (@Sendable (any Channel) async throws -> Void)? + ) -> EventLoopFuture { + guard let initializer = callback else { return self } + + // The code below code is equivalent to the following but avoids allocating an extra future. + // + // return self.flatMap { value in + // self.eventLoop.makeFutureWithTask { + // try await userInitializer(channel) + // }.map { + // value + // } + // } + // + let promise = self.eventLoop.makePromise(of: Value.self) + self.whenComplete { result in + switch result { + case .success(let value): + Task { + do { + try await initializer(channel) + promise.succeed(value) + } catch { + promise.fail(error) + } + } + + case .failure(let error): + promise.fail(error) + } + } + + return promise.futureResult + } +} diff --git a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift index 89a95ae..1fccd54 100644 --- a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift +++ b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift @@ -31,6 +31,7 @@ extension ChannelPipeline.SynchronousOperations { connectionConfig: HTTP2ServerTransport.Config.Connection, http2Config: HTTP2ServerTransport.Config.HTTP2, rpcConfig: HTTP2ServerTransport.Config.RPC, + debugConfig: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks, requireALPN: Bool, scheme: Scheme ) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) { @@ -99,7 +100,10 @@ extension ChannelPipeline.SynchronousOperations { wrappingChannelSynchronously: streamChannel ) return (asyncStreamChannel, methodDescriptorPromise.futureResult) - } + }.runCallbackIfSet( + on: streamChannel, + callback: debugConfig.onAcceptHTTP2Stream + ) } try self.addHandler(serverConnectionHandler) diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 3c07457..80f145b 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -15,6 +15,7 @@ */ public import GRPCCore +public import NIOCore internal import NIOHTTP2 /// A namespace for the HTTP/2 server transport. @@ -185,4 +186,37 @@ extension HTTP2ServerTransport.Config { Self(maxRequestPayloadSize: 4 * 1024 * 1024) } } + + /// A set of callbacks used for debugging purposes. + /// + /// The callbacks give you access to the underlying NIO `Channel` after gRPC's initializer has + /// run for each `Channel`. These callbacks are intended for debugging purposes. + /// + /// - Important: You should be very careful when implementing these callbacks as they may have + /// unexpected side effects on your gRPC application. + public struct ChannelDebuggingCallbacks: Sendable { + /// A callback invoked when the server starts listening for new TCP connections. + public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)? + + /// A callback invoked with each new accepted TPC connection. + public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + + /// A callback invoked with each accepted HTTP/2 stream. + public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + + public init( + onBindTCPListener: (@Sendable (_ channel: any Channel) -> Void)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)? + ) { + self.onBindTCPListener = onBindTCPListener + self.onAcceptTCPConnection = onAcceptTCPConnection + self.onAcceptHTTP2Stream = onAcceptHTTP2Stream + } + + /// Default values; no callbacks are set. + public static var defaults: Self { + Self(onBindTCPListener: nil, onAcceptTCPConnection: nil, onAcceptHTTP2Stream: nil) + } + } } diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift index 0326f56..a68ecf7 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift @@ -182,13 +182,17 @@ extension HTTP2ClientTransport.Posix { channel: channel, config: GRPCChannel.Config(posix: self.config) ) - } + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + ) } return HTTP2Connection( channel: channel, multiplexer: multiplexer, - isPlaintext: self.isPlainText + isPlaintext: self.isPlainText, + onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream ) } } @@ -208,6 +212,9 @@ extension HTTP2ClientTransport.Posix { /// Compression configuration. public var compression: HTTP2ClientTransport.Config.Compression + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + /// Creates a new connection configuration. /// /// - Parameters: @@ -215,18 +222,21 @@ extension HTTP2ClientTransport.Posix { /// - backoff: Backoff configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ClientTransport.Config.HTTP2, backoff: HTTP2ClientTransport.Config.Backoff, connection: HTTP2ClientTransport.Config.Connection, - compression: HTTP2ClientTransport.Config.Compression + compression: HTTP2ClientTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks ) { self.http2 = http2 self.connection = connection self.backoff = backoff self.compression = compression + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -245,7 +255,8 @@ extension HTTP2ClientTransport.Posix { http2: .defaults, backoff: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift index 75e23f7..2841dbc 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift @@ -85,8 +85,11 @@ extension HTTP2ServerTransport { let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler( channel: channel ) - return try channel.pipeline.syncOperations.addHandler(quiescingHandler) - } + try channel.pipeline.syncOperations.addHandler(quiescingHandler) + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onBindTCPListener + ) } .bind(to: address) { channel in channel.eventLoop.makeCompletedFuture { @@ -113,10 +116,14 @@ extension HTTP2ServerTransport { connectionConfig: self.config.connection, http2Config: self.config.http2, rpcConfig: self.config.rpc, + debugConfig: self.config.channelDebuggingCallbacks, requireALPN: requireALPN, scheme: scheme ) - } + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + ) } return serverChannel @@ -191,6 +198,9 @@ extension HTTP2ServerTransport.Posix { /// RPC configuration. public var rpc: HTTP2ServerTransport.Config.RPC + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + /// Construct a new `Config`. /// /// - Parameters: @@ -198,18 +208,21 @@ extension HTTP2ServerTransport.Posix { /// - rpc: RPC configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ServerTransport.Config.HTTP2, rpc: HTTP2ServerTransport.Config.RPC, connection: HTTP2ServerTransport.Config.Connection, - compression: HTTP2ServerTransport.Config.Compression + compression: HTTP2ServerTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks ) { self.compression = compression self.connection = connection self.http2 = http2 self.rpc = rpc + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -228,7 +241,8 @@ extension HTTP2ServerTransport.Posix { http2: .defaults, rpc: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift index e8baced..03f5c56 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift @@ -179,13 +179,17 @@ extension HTTP2ClientTransport.TransportServices { channel: channel, config: GRPCChannel.Config(transportServices: self.config) ) - } + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + ) } return HTTP2Connection( channel: channel, multiplexer: multiplexer, - isPlaintext: isPlainText + isPlaintext: isPlainText, + onCreateHTTP2Stream: self.config.channelDebuggingCallbacks.onCreateHTTP2Stream ) } } @@ -206,6 +210,9 @@ extension HTTP2ClientTransport.TransportServices { /// Compression configuration. public var compression: HTTP2ClientTransport.Config.Compression + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + /// Creates a new connection configuration. /// /// - Parameters: @@ -213,18 +220,21 @@ extension HTTP2ClientTransport.TransportServices { /// - backoff: Backoff configuration. /// - connection: Connection configuration. /// - compression: Compression configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. /// /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( http2: HTTP2ClientTransport.Config.HTTP2, backoff: HTTP2ClientTransport.Config.Backoff, connection: HTTP2ClientTransport.Config.Connection, - compression: HTTP2ClientTransport.Config.Compression + compression: HTTP2ClientTransport.Config.Compression, + channelDebuggingCallbacks: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks ) { self.http2 = http2 self.connection = connection self.backoff = backoff self.compression = compression + self.channelDebuggingCallbacks = channelDebuggingCallbacks } /// Default configuration. @@ -243,7 +253,8 @@ extension HTTP2ClientTransport.TransportServices { http2: .defaults, backoff: .defaults, connection: .defaults, - compression: .defaults + compression: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift index 2d219ce..31c7d0c 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift @@ -59,25 +59,32 @@ extension HTTP2ServerTransport { try await bootstrap .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .serverChannelInitializer { channel in - return channel.eventLoop.makeCompletedFuture { + channel.eventLoop.makeCompletedFuture { let quiescingHandler = serverQuiescingHelper.makeServerChannelHandler( channel: channel ) - return try channel.pipeline.syncOperations.addHandler(quiescingHandler) - } + try channel.pipeline.syncOperations.addHandler(quiescingHandler) + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onBindTCPListener + ) } .bind(to: address) { channel in - channel.eventLoop.makeCompletedFuture { - return try channel.pipeline.syncOperations.configureGRPCServerPipeline( + return channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.configureGRPCServerPipeline( channel: channel, compressionConfig: self.config.compression, connectionConfig: self.config.connection, http2Config: self.config.http2, rpcConfig: self.config.rpc, + debugConfig: self.config.channelDebuggingCallbacks, requireALPN: requireALPN, scheme: scheme ) - } + }.runCallbackIfSet( + on: channel, + callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + ) } return serverChannel @@ -152,22 +159,30 @@ extension HTTP2ServerTransport.TransportServices { /// RPC configuration. public var rpc: HTTP2ServerTransport.Config.RPC + /// Channel callbacks for debugging. + public var channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + /// Construct a new `Config`. /// - Parameters: /// - compression: Compression configuration. /// - connection: Connection configuration. /// - http2: HTTP2 configuration. /// - rpc: RPC configuration. + /// - channelDebuggingCallbacks: Channel callbacks for debugging. + /// + /// - SeeAlso: ``defaults(configure:)`` and ``defaults``. public init( compression: HTTP2ServerTransport.Config.Compression, connection: HTTP2ServerTransport.Config.Connection, http2: HTTP2ServerTransport.Config.HTTP2, - rpc: HTTP2ServerTransport.Config.RPC + rpc: HTTP2ServerTransport.Config.RPC, + channelDebuggingCallbacks: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks ) { self.compression = compression self.connection = connection self.http2 = http2 self.rpc = rpc + self.channelDebuggingCallbacks = channelDebuggingCallbacks } public static var defaults: Self { @@ -185,7 +200,8 @@ extension HTTP2ServerTransport.TransportServices { compression: .defaults, connection: .defaults, http2: .defaults, - rpc: .defaults + rpc: .defaults, + channelDebuggingCallbacks: .defaults ) configure(&config) return config diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift index f23631a..7cbafa9 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/HTTP2Connectors.swift @@ -135,7 +135,12 @@ struct NIOPosixConnector: HTTP2Connector { wrappingChannelSynchronously: channel ) - return HTTP2Connection(channel: asyncChannel, multiplexer: multiplexer, isPlaintext: true) + return HTTP2Connection( + channel: asyncChannel, + multiplexer: multiplexer, + isPlaintext: true, + onCreateHTTP2Stream: nil + ) } } } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift new file mode 100644 index 0000000..b8760c1 --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -0,0 +1,145 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import GRPCNIOTransportHTTP2 +import Testing + +@Suite("ChannelDebugCallbacks") +struct ChannelDebugCallbackTests { + @Test(arguments: TransportKind.allCases, TransportKind.allCases) + func debugCallbacksAreCalled(serverKind: TransportKind, clientKind: TransportKind) async throws { + // Validates the callbacks are called appropriately by setting up callbacks which increment + // counters and then returning those stats from a gRPC service. The clients interactions with + // the service drive the callbacks. + + let stats = DebugCallbackStats() + let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks( + onBindTCPListener: { _ in + stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent) + }, + onAcceptTCPConnection: { _ in + stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent) + }, + onAcceptHTTP2Stream: { _ in + stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent) + } + ) + + let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks( + onCreateTCPConnection: { _ in + stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent) + }, + onCreateHTTP2Stream: { _ in + stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent) + } + ) + + // For each server have the client create this many connections. + let connectionsPerServer = 5 + // For each connection have the client create this many streams. + let streamsPerConnection = 3 + + try await withGRPCServer( + transport: self.makeServerTransport( + kind: serverKind, + address: .ipv4(host: "127.0.0.1", port: 0), + debug: serverDebug + ), + services: [StatsService(stats: stats)] + ) { server in + let address = try await server.listeningAddress!.ipv4! + for connectionNumber in 1 ... connectionsPerServer { + try await withGRPCClient( + transport: self.makeClientTransport( + kind: clientKind, + target: .ipv4(host: address.host, port: address.port), + debug: clientDebug + ) + ) { client in + let statsClient = StatsClient(wrapping: client) + + // Create a few streams per connection. + for streamNumber in 1 ... streamsPerConnection { + let streamCount = (connectionNumber - 1) * streamsPerConnection + streamNumber + + let stats = try await statsClient.getStats() + #expect(stats.server.tcpListenersBound == 1) + #expect(stats.server.tcpConnectionsAccepted == connectionNumber) + #expect(stats.server.http2StreamsAccepted == streamCount) + + #expect(stats.client.tcpConnectionsCreated == connectionNumber) + #expect(stats.client.http2StreamsCreated == streamCount) + } + } + } + } + } + + private func makeServerTransport( + kind: TransportKind, + address: SocketAddress, + debug: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks + ) -> any ServerTransport { + switch kind { + case .posix: + return .http2NIOPosix( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + #if canImport(Network) + case .transportServices: + return .http2NIOTS( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + #endif + } + } + + private func makeClientTransport( + kind: TransportKind, + target: any ResolvableTarget, + debug: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks + ) throws -> any ClientTransport { + switch kind { + case .posix: + return try .http2NIOPosix( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + #if canImport(Network) + case .transportServices: + return try .http2NIOTS( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) + #endif + } + } +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift index f41ec08..09aaa27 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift @@ -233,21 +233,6 @@ struct HTTP2TransportTLSEnabledTests { case clientError(cause: any Error) } - enum TransportKind: Sendable { - case posix - #if canImport(Network) - case transportServices - #endif - - static var supported: [TransportKind] { - #if canImport(Network) - return [.posix, .transportServices] - #else - return [.posix] - #endif - } - } - struct Config { var security: Security var transport: Transport diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift index d3b901a..8e62858 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift @@ -23,22 +23,8 @@ import XCTest final class HTTP2TransportTests: XCTestCase { // A combination of client and server transport kinds. struct Transport: Sendable, CustomStringConvertible { - var server: Kind - var client: Kind - - enum Kind: Sendable, CustomStringConvertible { - case posix - case niots - - var description: String { - switch self { - case .posix: - return "NIOPosix" - case .niots: - return "NIOTS" - } - } - } + var server: TransportKind + var client: TransportKind var description: String { "server=\(self.server) client=\(self.client)" @@ -101,8 +87,8 @@ final class HTTP2TransportTests: XCTestCase { } func forEachClientAndHTTPStatusCodeServer( - _ kind: [Transport.Kind] = [.posix, .niots], - _ execute: (ControlClient, Transport.Kind) async throws -> Void + _ kind: [TransportKind] = [.posix, .transportServices], + _ execute: (ControlClient, TransportKind) async throws -> Void ) async throws { for clientKind in kind { try await withThrowingTaskGroup(of: Void.self) { group in @@ -137,7 +123,7 @@ final class HTTP2TransportTests: XCTestCase { private func runServer( in group: inout ThrowingTaskGroup, address: SocketAddress, - kind: Transport.Kind, + kind: TransportKind, enableControlService: Bool, compression: CompressionAlgorithmSet ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) { @@ -163,8 +149,8 @@ final class HTTP2TransportTests: XCTestCase { let address = try await server.listeningAddress! return (server, address) - case .niots: - #if canImport(Network) + #if canImport(Network) + case .transportServices: let server = GRPCServer( transport: .http2NIOTS( address: address, @@ -182,14 +168,12 @@ final class HTTP2TransportTests: XCTestCase { let address = try await server.listeningAddress! return (server, address) - #else - throw XCTSkip("Transport not supported on this platform") - #endif + #endif } } private func makeClient( - kind: Transport.Kind, + kind: TransportKind, target: any ResolvableTarget, compression: CompressionAlgorithm, enabledCompression: CompressionAlgorithmSet @@ -210,8 +194,8 @@ final class HTTP2TransportTests: XCTestCase { serviceConfig: serviceConfig ) - case .niots: - #if canImport(Network) + #if canImport(Network) + case .transportServices: var serviceConfig = ServiceConfig() serviceConfig.loadBalancingConfig = [.roundRobin] transport = try HTTP2ClientTransport.TransportServices( @@ -223,9 +207,7 @@ final class HTTP2TransportTests: XCTestCase { }, serviceConfig: serviceConfig ) - #else - throw XCTSkip("Transport not supported on this platform") - #endif + #endif } return GRPCClient(transport: transport) @@ -1661,9 +1643,9 @@ final class HTTP2TransportTests: XCTestCase { extension [HTTP2TransportTests.Transport] { static let supported = [ HTTP2TransportTests.Transport(server: .posix, client: .posix), - HTTP2TransportTests.Transport(server: .niots, client: .niots), - HTTP2TransportTests.Transport(server: .niots, client: .posix), - HTTP2TransportTests.Transport(server: .posix, client: .niots), + HTTP2TransportTests.Transport(server: .transportServices, client: .transportServices), + HTTP2TransportTests.Transport(server: .transportServices, client: .posix), + HTTP2TransportTests.Transport(server: .posix, client: .transportServices), ] } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift new file mode 100644 index 0000000..d8a01bc --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +#if canImport(FoundationEssentials) +import struct FoundationEssentials.Data +import class FoundationEssentials.JSONEncoder +import class FoundationEssentials.JSONDecoder +#else +import struct Foundation.Data +import class Foundation.JSONEncoder +import class Foundation.JSONDecoder +#endif + +struct JSONCoder: MessageSerializer, MessageDeserializer { + func serialize(_ message: Message) throws -> [UInt8] { + let json = JSONEncoder() + let bytes = try json.encode(message) + return Array(bytes) + } + + func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message { + let json = JSONDecoder() + let data = Data(serializedMessageBytes) + return try json.decode(Message.self, from: data) + } +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift new file mode 100644 index 0000000..e911109 --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift @@ -0,0 +1,121 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import Synchronization + +final class DebugCallbackStats: Sendable { + let tcpListenersBound: Atomic + let tcpConnectionsAccepted: Atomic + let tcpConnectionsCreated: Atomic + + let http2StreamsAccepted: Atomic + let http2StreamsCreated: Atomic + + init() { + self.tcpListenersBound = Atomic(0) + self.tcpConnectionsAccepted = Atomic(0) + self.tcpConnectionsCreated = Atomic(0) + + self.http2StreamsAccepted = Atomic(0) + self.http2StreamsCreated = Atomic(0) + } + + var serverStats: GetStatsResponse.Server { + GetStatsResponse.Server( + tcpListenersBound: self.tcpListenersBound.load(ordering: .sequentiallyConsistent), + tcpConnectionsAccepted: self.tcpConnectionsAccepted.load(ordering: .sequentiallyConsistent), + http2StreamsAccepted: self.http2StreamsAccepted.load(ordering: .sequentiallyConsistent) + ) + } + + var clientStats: GetStatsResponse.Client { + GetStatsResponse.Client( + tcpConnectionsCreated: self.tcpConnectionsCreated.load(ordering: .sequentiallyConsistent), + http2StreamsCreated: self.http2StreamsCreated.load(ordering: .sequentiallyConsistent) + ) + } +} + +struct StatsService { + private let stats: DebugCallbackStats + + init(stats: DebugCallbackStats) { + self.stats = stats + } + + func getStats() async throws -> GetStatsResponse { + GetStatsResponse(server: self.stats.serverStats, client: self.stats.clientStats) + } +} + +extension StatsService: RegistrableRPCService { + func registerMethods(with router: inout RPCRouter) { + router.registerHandler( + forMethod: .getStats, + deserializer: JSONCoder(), + serializer: JSONCoder() + ) { request, context in + _ = try await ServerRequest(stream: request) + let response = try await self.getStats() + return StreamingServerResponse { + try await $0.write(response) + return [:] + } + } + } +} + +struct StatsClient { + private let underlying: GRPCClient + + init(wrapping underlying: GRPCClient) { + self.underlying = underlying + } + + func getStats() async throws -> GetStatsResponse { + try await self.underlying.unary( + request: ClientRequest(message: GetStatsRequest()), + descriptor: .getStats, + serializer: JSONCoder(), + deserializer: JSONCoder(), + options: .defaults + ) { + try $0.message + } + } +} + +extension MethodDescriptor { + static let getStats = Self(fullyQualifiedService: "StatsService", method: "GetStats") +} + +struct GetStatsRequest: Codable, Hashable {} +struct GetStatsResponse: Codable, Hashable { + struct Server: Codable, Hashable { + var tcpListenersBound: Int + var tcpConnectionsAccepted: Int + var http2StreamsAccepted: Int + } + + struct Client: Codable, Hashable { + var tcpConnectionsCreated: Int + var http2StreamsCreated: Int + } + + var server: Server + var client: Client +} diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift new file mode 100644 index 0000000..9b52e48 --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift @@ -0,0 +1,26 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +enum TransportKind: CaseIterable, Hashable, Sendable { + case posix + #if canImport(Network) + case transportServices + #endif + + static var supported: [Self] { + Self.allCases + } +} From fd179752142ec060119535743e0b1804490498f6 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 9 Jan 2025 11:08:11 +0000 Subject: [PATCH 2/7] fix supported --- .../HTTP2TransportTests.swift | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift index 8e62858..7ce862d 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift @@ -1641,12 +1641,11 @@ final class HTTP2TransportTests: XCTestCase { } extension [HTTP2TransportTests.Transport] { - static let supported = [ - HTTP2TransportTests.Transport(server: .posix, client: .posix), - HTTP2TransportTests.Transport(server: .transportServices, client: .transportServices), - HTTP2TransportTests.Transport(server: .transportServices, client: .posix), - HTTP2TransportTests.Transport(server: .posix, client: .transportServices), - ] + static let supported: [HTTP2TransportTests.Transport] = TransportKind.allCases.flatMap { server in + TransportKind.allCases.map { client in + HTTP2TransportTests.Transport(server: server, client: client) + } + } } extension ControlInput { From 12978a168de0d4e8391ea2dc046c249297c52ad4 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 9 Jan 2025 11:12:46 +0000 Subject: [PATCH 3/7] fix another platform issue --- Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift index 7ce862d..fbbd32e 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift @@ -87,7 +87,7 @@ final class HTTP2TransportTests: XCTestCase { } func forEachClientAndHTTPStatusCodeServer( - _ kind: [TransportKind] = [.posix, .transportServices], + _ kind: [TransportKind] = TransportKind.supported, _ execute: (ControlClient, TransportKind) async throws -> Void ) async throws { for clientKind in kind { From 68c352a54678d44cab8c57a11536f01bab75f2f0 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 13 Jan 2025 17:37:12 +0000 Subject: [PATCH 4/7] Update Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift Co-authored-by: Gus Cairo --- .../HTTP2ServerTransport+DebugTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift index b8760c1..e8fdd22 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -23,7 +23,7 @@ struct ChannelDebugCallbackTests { @Test(arguments: TransportKind.allCases, TransportKind.allCases) func debugCallbacksAreCalled(serverKind: TransportKind, clientKind: TransportKind) async throws { // Validates the callbacks are called appropriately by setting up callbacks which increment - // counters and then returning those stats from a gRPC service. The clients interactions with + // counters and then returning those stats from a gRPC service. The client's interactions with // the service drive the callbacks. let stats = DebugCallbackStats() From b1380dd256e0b5b5b5be03f0751e146181fd9af9 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 13 Jan 2025 17:42:46 +0000 Subject: [PATCH 5/7] add missing async throws --- .../GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift | 4 ++-- .../GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index d8ae5ca..0d166c0 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -181,8 +181,8 @@ extension HTTP2ClientTransport.Config { public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? public init( - onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?, - onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)? + onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? ) { self.onCreateTCPConnection = onCreateTCPConnection self.onCreateHTTP2Stream = onCreateHTTP2Stream diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 80f145b..5c8d385 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -205,9 +205,9 @@ extension HTTP2ServerTransport.Config { public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? public init( - onBindTCPListener: (@Sendable (_ channel: any Channel) -> Void)?, - onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> Void)?, - onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> Void)? + onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? ) { self.onBindTCPListener = onBindTCPListener self.onAcceptTCPConnection = onAcceptTCPConnection From e2bdb702caa56e9459dbc0692d1ae809f0101cc2 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 17 Jan 2025 14:37:01 +0000 Subject: [PATCH 6/7] fixup --- .../HTTP2ServerTransport+DebugTests.swift | 60 +++++++++++-------- .../Test Utilities/JSONCoding.swift | 12 ++-- .../Test Utilities/StatsService.swift | 8 +-- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift index e8fdd22..d0ab6ec 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -93,24 +93,28 @@ struct ChannelDebugCallbackTests { kind: TransportKind, address: SocketAddress, debug: HTTP2ServerTransport.Config.ChannelDebuggingCallbacks - ) -> any ServerTransport { + ) -> NIOServerTransport { switch kind { case .posix: - return .http2NIOPosix( - address: address, - transportSecurity: .plaintext, - config: .defaults { - $0.channelDebuggingCallbacks = debug - } + return NIOServerTransport( + .http2NIOPosix( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) ) #if canImport(Network) case .transportServices: - return .http2NIOTS( - address: address, - transportSecurity: .plaintext, - config: .defaults { - $0.channelDebuggingCallbacks = debug - } + return NIOServerTransport( + .http2NIOTS( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) ) #endif } @@ -120,24 +124,28 @@ struct ChannelDebugCallbackTests { kind: TransportKind, target: any ResolvableTarget, debug: HTTP2ClientTransport.Config.ChannelDebuggingCallbacks - ) throws -> any ClientTransport { + ) throws -> NIOClientTransport { switch kind { case .posix: - return try .http2NIOPosix( - target: target, - transportSecurity: .plaintext, - config: .defaults { - $0.channelDebuggingCallbacks = debug - } + return NIOClientTransport( + try .http2NIOPosix( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) ) #if canImport(Network) case .transportServices: - return try .http2NIOTS( - target: target, - transportSecurity: .plaintext, - config: .defaults { - $0.channelDebuggingCallbacks = debug - } + return NIOClientTransport( + try .http2NIOTS( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.channelDebuggingCallbacks = debug + } + ) ) #endif } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift index d8a01bc..ed27c57 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/JSONCoding.swift @@ -27,15 +27,17 @@ import class Foundation.JSONDecoder #endif struct JSONCoder: MessageSerializer, MessageDeserializer { - func serialize(_ message: Message) throws -> [UInt8] { + func serialize(_ message: Message) throws -> Bytes { let json = JSONEncoder() - let bytes = try json.encode(message) - return Array(bytes) + let data = try json.encode(message) + return Bytes(data) } - func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message { + func deserialize(_ serializedMessageBytes: Bytes) throws -> Message { let json = JSONDecoder() - let data = Data(serializedMessageBytes) + let data = serializedMessageBytes.withUnsafeBytes { + Data($0) + } return try json.decode(Message.self, from: data) } } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift index e911109..2ffdadd 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/StatsService.swift @@ -63,7 +63,7 @@ struct StatsService { } extension StatsService: RegistrableRPCService { - func registerMethods(with router: inout RPCRouter) { + func registerMethods(with router: inout RPCRouter) { router.registerHandler( forMethod: .getStats, deserializer: JSONCoder(), @@ -79,10 +79,10 @@ extension StatsService: RegistrableRPCService { } } -struct StatsClient { - private let underlying: GRPCClient +struct StatsClient { + private let underlying: GRPCClient - init(wrapping underlying: GRPCClient) { + init(wrapping underlying: GRPCClient) { self.underlying = underlying } From 3184e9c82a1b023808c820a62a06db5c0794b8cc Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 17 Jan 2025 14:46:20 +0000 Subject: [PATCH 7/7] Use ELFs --- .../Client/Connection/Connection.swift | 25 ++++-------- .../Client/Connection/ConnectionFactory.swift | 4 +- .../Client/HTTP2ClientTransport.swift | 8 ++-- .../EventLoopFuture+ChannelInitializer.swift | 38 +++---------------- .../Internal/NIOChannelPipeline+GRPC.swift | 5 +-- .../Server/HTTP2ServerTransport.swift | 12 +++--- .../HTTP2ClientTransport+Posix.swift | 6 +-- .../HTTP2ServerTransport+Posix.swift | 12 +++--- ...TP2ClientTransport+TransportServices.swift | 6 +-- ...TP2ServerTransport+TransportServices.swift | 12 +++--- .../HTTP2ServerTransport+DebugTests.swift | 15 +++++--- 11 files changed, 55 insertions(+), 88 deletions(-) diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index c50ab70..3a3132a 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,16 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme, remotePeer, localPeer, onCreateStream) = try self.state.withLock { state in + let connected = try self.state.withLock { state in switch state { case .connected(let connected): - return ( - connected.multiplexer, - connected.scheme, - connected.remotePeer, - connected.localPeer, - connected.onCreateHTTP2Stream - ) + return connected case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -227,11 +221,11 @@ package final class Connection: Sendable { let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes do { - let stream = try await multiplexer.openStream { channel in + let stream = try await connected.multiplexer.openStream { channel in channel.eventLoop.makeCompletedFuture { let streamHandler = GRPCClientStreamHandler( methodDescriptor: descriptor, - scheme: scheme, + scheme: connected.scheme, // The value of authority here is being used for the ":authority" pseudo-header. Derive // one from the address if we don't already have one. authority: self.authority ?? self.address.authority, @@ -249,16 +243,13 @@ package final class Connection: Sendable { outboundType: RPCRequestPart.self ) ) - }.runCallbackIfSet( - on: channel, - callback: onCreateStream - ) + }.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel) } let context = ClientContext( descriptor: descriptor, - remotePeer: remotePeer, - localPeer: localPeer + remotePeer: connected.remotePeer, + localPeer: connected.localPeer ) return Stream(wrapping: stream, context: context) @@ -490,7 +481,7 @@ extension Connection { /// Whether the connection is plaintext, `false` implies TLS is being used. var scheme: Scheme /// A user-provided callback to call after creating the stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? init(_ connection: HTTP2Connection) { self.channel = connection.channel diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift index bfc2d7a..cb42d82 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift @@ -38,7 +38,7 @@ package struct HTTP2Connection: Sendable { var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// A callback which is invoked when creating an HTTP/2 stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? /// Whether the connection is insecure (i.e. plaintext). var isPlaintext: Bool @@ -47,7 +47,7 @@ package struct HTTP2Connection: Sendable { channel: NIOAsyncChannel, multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, isPlaintext: Bool, - onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? ) { self.channel = channel self.multiplexer = multiplexer diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index 0d166c0..76487b5 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -175,14 +175,14 @@ extension HTTP2ClientTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked with each new TCP connection. - public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new HTTP/2 stream. - public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onCreateTCPConnection = onCreateTCPConnection self.onCreateHTTP2Stream = onCreateHTTP2Stream diff --git a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift index 7df26b5..5ec673b 100644 --- a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift +++ b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift @@ -17,40 +17,14 @@ package import NIOCore extension EventLoopFuture where Value: Sendable { - package func runCallbackIfSet( - on channel: any Channel, - callback: (@Sendable (any Channel) async throws -> Void)? + package func runInitializerIfSet( + _ initializer: (@Sendable (any Channel) -> EventLoopFuture)?, + on channel: any Channel ) -> EventLoopFuture { - guard let initializer = callback else { return self } + guard let initializer = initializer else { return self } - // The code below code is equivalent to the following but avoids allocating an extra future. - // - // return self.flatMap { value in - // self.eventLoop.makeFutureWithTask { - // try await userInitializer(channel) - // }.map { - // value - // } - // } - // - let promise = self.eventLoop.makePromise(of: Value.self) - self.whenComplete { result in - switch result { - case .success(let value): - Task { - do { - try await initializer(channel) - promise.succeed(value) - } catch { - promise.fail(error) - } - } - - case .failure(let error): - promise.fail(error) - } + return self.flatMap { value in + initializer(channel).map { value } } - - return promise.futureResult } } diff --git a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift index 653b765..eded597 100644 --- a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift +++ b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift @@ -110,10 +110,7 @@ extension ChannelPipeline.SynchronousOperations { ) ) return (asyncStreamChannel, methodDescriptorPromise.futureResult) - }.runCallbackIfSet( - on: streamChannel, - callback: debugConfig.onAcceptHTTP2Stream - ) + }.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel) } try self.addHandler(serverConnectionHandler) diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 5c8d385..984e7a2 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -196,18 +196,18 @@ extension HTTP2ServerTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked when the server starts listening for new TCP connections. - public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new accepted TPC connection. - public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each accepted HTTP/2 stream. - public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onBindTCPListener = onBindTCPListener self.onAcceptTCPConnection = onAcceptTCPConnection diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift index 5f2641e..879c3ec 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift @@ -184,9 +184,9 @@ extension HTTP2ClientTransport.Posix { channel: channel, config: GRPCChannel.Config(posix: self.config) ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift index cbd35ec..0a2cc4a 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift @@ -88,9 +88,9 @@ extension HTTP2ServerTransport { channel: channel ) try channel.pipeline.syncOperations.addHandler(quiescingHandler) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onBindTCPListener + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel ) } .bind(to: address) { channel in @@ -122,9 +122,9 @@ extension HTTP2ServerTransport { requireALPN: requireALPN, scheme: scheme ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift index 71cf27f..4925283 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift @@ -181,9 +181,9 @@ extension HTTP2ClientTransport.TransportServices { channel: channel, config: GRPCChannel.Config(transportServices: self.config) ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift index 0b3b93a..0a70437 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift @@ -66,9 +66,9 @@ extension HTTP2ServerTransport { channel: channel ) try channel.pipeline.syncOperations.addHandler(quiescingHandler) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onBindTCPListener + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel ) } .bind(to: address) { channel in @@ -83,9 +83,9 @@ extension HTTP2ServerTransport { requireALPN: requireALPN, scheme: scheme ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel ) } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift index d0ab6ec..2a26d3b 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -28,23 +28,28 @@ struct ChannelDebugCallbackTests { let stats = DebugCallbackStats() let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks( - onBindTCPListener: { _ in + onBindTCPListener: { channel in stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptTCPConnection: { _ in + onAcceptTCPConnection: { channel in stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptHTTP2Stream: { _ in + onAcceptHTTP2Stream: { channel in stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } ) let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks( - onCreateTCPConnection: { _ in + onCreateTCPConnection: { channel in stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onCreateHTTP2Stream: { _ in + onCreateHTTP2Stream: { channel in stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } )