diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index c50ab70..3a3132a 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,16 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme, remotePeer, localPeer, onCreateStream) = try self.state.withLock { state in + let connected = try self.state.withLock { state in switch state { case .connected(let connected): - return ( - connected.multiplexer, - connected.scheme, - connected.remotePeer, - connected.localPeer, - connected.onCreateHTTP2Stream - ) + return connected case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -227,11 +221,11 @@ package final class Connection: Sendable { let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes do { - let stream = try await multiplexer.openStream { channel in + let stream = try await connected.multiplexer.openStream { channel in channel.eventLoop.makeCompletedFuture { let streamHandler = GRPCClientStreamHandler( methodDescriptor: descriptor, - scheme: scheme, + scheme: connected.scheme, // The value of authority here is being used for the ":authority" pseudo-header. Derive // one from the address if we don't already have one. authority: self.authority ?? self.address.authority, @@ -249,16 +243,13 @@ package final class Connection: Sendable { outboundType: RPCRequestPart.self ) ) - }.runCallbackIfSet( - on: channel, - callback: onCreateStream - ) + }.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel) } let context = ClientContext( descriptor: descriptor, - remotePeer: remotePeer, - localPeer: localPeer + remotePeer: connected.remotePeer, + localPeer: connected.localPeer ) return Stream(wrapping: stream, context: context) @@ -490,7 +481,7 @@ extension Connection { /// Whether the connection is plaintext, `false` implies TLS is being used. var scheme: Scheme /// A user-provided callback to call after creating the stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? init(_ connection: HTTP2Connection) { self.channel = connection.channel diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift index bfc2d7a..cb42d82 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift @@ -38,7 +38,7 @@ package struct HTTP2Connection: Sendable { var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// A callback which is invoked when creating an HTTP/2 stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? /// Whether the connection is insecure (i.e. plaintext). var isPlaintext: Bool @@ -47,7 +47,7 @@ package struct HTTP2Connection: Sendable { channel: NIOAsyncChannel, multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, isPlaintext: Bool, - onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? ) { self.channel = channel self.multiplexer = multiplexer diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index 0d166c0..76487b5 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -175,14 +175,14 @@ extension HTTP2ClientTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked with each new TCP connection. - public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new HTTP/2 stream. - public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onCreateTCPConnection = onCreateTCPConnection self.onCreateHTTP2Stream = onCreateHTTP2Stream diff --git a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift index 7df26b5..5ec673b 100644 --- a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift +++ b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift @@ -17,40 +17,14 @@ package import NIOCore extension EventLoopFuture where Value: Sendable { - package func runCallbackIfSet( - on channel: any Channel, - callback: (@Sendable (any Channel) async throws -> Void)? + package func runInitializerIfSet( + _ initializer: (@Sendable (any Channel) -> EventLoopFuture)?, + on channel: any Channel ) -> EventLoopFuture { - guard let initializer = callback else { return self } + guard let initializer = initializer else { return self } - // The code below code is equivalent to the following but avoids allocating an extra future. - // - // return self.flatMap { value in - // self.eventLoop.makeFutureWithTask { - // try await userInitializer(channel) - // }.map { - // value - // } - // } - // - let promise = self.eventLoop.makePromise(of: Value.self) - self.whenComplete { result in - switch result { - case .success(let value): - Task { - do { - try await initializer(channel) - promise.succeed(value) - } catch { - promise.fail(error) - } - } - - case .failure(let error): - promise.fail(error) - } + return self.flatMap { value in + initializer(channel).map { value } } - - return promise.futureResult } } diff --git a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift index 653b765..eded597 100644 --- a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift +++ b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift @@ -110,10 +110,7 @@ extension ChannelPipeline.SynchronousOperations { ) ) return (asyncStreamChannel, methodDescriptorPromise.futureResult) - }.runCallbackIfSet( - on: streamChannel, - callback: debugConfig.onAcceptHTTP2Stream - ) + }.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel) } try self.addHandler(serverConnectionHandler) diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 5c8d385..984e7a2 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -196,18 +196,18 @@ extension HTTP2ServerTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked when the server starts listening for new TCP connections. - public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new accepted TPC connection. - public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each accepted HTTP/2 stream. - public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onBindTCPListener = onBindTCPListener self.onAcceptTCPConnection = onAcceptTCPConnection diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift index 5f2641e..879c3ec 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift @@ -184,9 +184,9 @@ extension HTTP2ClientTransport.Posix { channel: channel, config: GRPCChannel.Config(posix: self.config) ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift index cbd35ec..0a2cc4a 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift @@ -88,9 +88,9 @@ extension HTTP2ServerTransport { channel: channel ) try channel.pipeline.syncOperations.addHandler(quiescingHandler) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onBindTCPListener + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel ) } .bind(to: address) { channel in @@ -122,9 +122,9 @@ extension HTTP2ServerTransport { requireALPN: requireALPN, scheme: scheme ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift index 71cf27f..4925283 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift @@ -181,9 +181,9 @@ extension HTTP2ClientTransport.TransportServices { channel: channel, config: GRPCChannel.Config(transportServices: self.config) ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onCreateTCPConnection, + on: channel ) } diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift index 0b3b93a..0a70437 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift @@ -66,9 +66,9 @@ extension HTTP2ServerTransport { channel: channel ) try channel.pipeline.syncOperations.addHandler(quiescingHandler) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onBindTCPListener + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onBindTCPListener, + on: channel ) } .bind(to: address) { channel in @@ -83,9 +83,9 @@ extension HTTP2ServerTransport { requireALPN: requireALPN, scheme: scheme ) - }.runCallbackIfSet( - on: channel, - callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection + }.runInitializerIfSet( + self.config.channelDebuggingCallbacks.onAcceptTCPConnection, + on: channel ) } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift index d0ab6ec..2a26d3b 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -28,23 +28,28 @@ struct ChannelDebugCallbackTests { let stats = DebugCallbackStats() let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks( - onBindTCPListener: { _ in + onBindTCPListener: { channel in stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptTCPConnection: { _ in + onAcceptTCPConnection: { channel in stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptHTTP2Stream: { _ in + onAcceptHTTP2Stream: { channel in stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } ) let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks( - onCreateTCPConnection: { _ in + onCreateTCPConnection: { channel in stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onCreateHTTP2Stream: { _ in + onCreateHTTP2Stream: { channel in stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } )