From fa9f9a5659c94ff75d60bef15b5967e8ddd72640 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 17 Jan 2025 14:46:20 +0000 Subject: [PATCH] Use ELFs --- .../Client/Connection/Connection.swift | 22 +++++-------- .../Client/Connection/ConnectionFactory.swift | 4 +-- .../Client/HTTP2ClientTransport.swift | 8 ++--- .../EventLoopFuture+ChannelInitializer.swift | 32 ++----------------- .../Server/HTTP2ServerTransport.swift | 12 +++---- .../HTTP2ServerTransport+DebugTests.swift | 15 ++++++--- 6 files changed, 33 insertions(+), 60 deletions(-) diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index c50ab70..0c58b2d 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,16 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme, remotePeer, localPeer, onCreateStream) = try self.state.withLock { state in + let connected = try self.state.withLock { state in switch state { case .connected(let connected): - return ( - connected.multiplexer, - connected.scheme, - connected.remotePeer, - connected.localPeer, - connected.onCreateHTTP2Stream - ) + return connected case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -227,11 +221,11 @@ package final class Connection: Sendable { let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes do { - let stream = try await multiplexer.openStream { channel in + let stream = try await connected.multiplexer.openStream { channel in channel.eventLoop.makeCompletedFuture { let streamHandler = GRPCClientStreamHandler( methodDescriptor: descriptor, - scheme: scheme, + scheme: connected.scheme, // The value of authority here is being used for the ":authority" pseudo-header. Derive // one from the address if we don't already have one. authority: self.authority ?? self.address.authority, @@ -251,14 +245,14 @@ package final class Connection: Sendable { ) }.runCallbackIfSet( on: channel, - callback: onCreateStream + callback: connected.onCreateHTTP2Stream ) } let context = ClientContext( descriptor: descriptor, - remotePeer: remotePeer, - localPeer: localPeer + remotePeer: connected.remotePeer, + localPeer: connected.localPeer ) return Stream(wrapping: stream, context: context) @@ -490,7 +484,7 @@ extension Connection { /// Whether the connection is plaintext, `false` implies TLS is being used. var scheme: Scheme /// A user-provided callback to call after creating the stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? init(_ connection: HTTP2Connection) { self.channel = connection.channel diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift index bfc2d7a..cb42d82 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/ConnectionFactory.swift @@ -38,7 +38,7 @@ package struct HTTP2Connection: Sendable { var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// A callback which is invoked when creating an HTTP/2 stream. - var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? /// Whether the connection is insecure (i.e. plaintext). var isPlaintext: Bool @@ -47,7 +47,7 @@ package struct HTTP2Connection: Sendable { channel: NIOAsyncChannel, multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer, isPlaintext: Bool, - onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)? + onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture)? ) { self.channel = channel self.multiplexer = multiplexer diff --git a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift index 0d166c0..76487b5 100644 --- a/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift +++ b/Sources/GRPCNIOTransportCore/Client/HTTP2ClientTransport.swift @@ -175,14 +175,14 @@ extension HTTP2ClientTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked with each new TCP connection. - public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new HTTP/2 stream. - public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onCreateTCPConnection = onCreateTCPConnection self.onCreateHTTP2Stream = onCreateHTTP2Stream diff --git a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift index 7df26b5..7250bbd 100644 --- a/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift +++ b/Sources/GRPCNIOTransportCore/Internal/EventLoopFuture+ChannelInitializer.swift @@ -19,38 +19,12 @@ package import NIOCore extension EventLoopFuture where Value: Sendable { package func runCallbackIfSet( on channel: any Channel, - callback: (@Sendable (any Channel) async throws -> Void)? + callback: (@Sendable (any Channel) -> EventLoopFuture)? ) -> EventLoopFuture { guard let initializer = callback else { return self } - // The code below code is equivalent to the following but avoids allocating an extra future. - // - // return self.flatMap { value in - // self.eventLoop.makeFutureWithTask { - // try await userInitializer(channel) - // }.map { - // value - // } - // } - // - let promise = self.eventLoop.makePromise(of: Value.self) - self.whenComplete { result in - switch result { - case .success(let value): - Task { - do { - try await initializer(channel) - promise.succeed(value) - } catch { - promise.fail(error) - } - } - - case .failure(let error): - promise.fail(error) - } + return self.flatMap { value in + initializer(channel).map { value } } - - return promise.futureResult } } diff --git a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift index 5c8d385..984e7a2 100644 --- a/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift @@ -196,18 +196,18 @@ extension HTTP2ServerTransport.Config { /// unexpected side effects on your gRPC application. public struct ChannelDebuggingCallbacks: Sendable { /// A callback invoked when the server starts listening for new TCP connections. - public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each new accepted TPC connection. - public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? /// A callback invoked with each accepted HTTP/2 stream. - public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? public init( - onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?, - onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)? + onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture)?, + onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture)? ) { self.onBindTCPListener = onBindTCPListener self.onAcceptTCPConnection = onAcceptTCPConnection diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift index d0ab6ec..2a26d3b 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2ServerTransport+DebugTests.swift @@ -28,23 +28,28 @@ struct ChannelDebugCallbackTests { let stats = DebugCallbackStats() let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks( - onBindTCPListener: { _ in + onBindTCPListener: { channel in stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptTCPConnection: { _ in + onAcceptTCPConnection: { channel in stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onAcceptHTTP2Stream: { _ in + onAcceptHTTP2Stream: { channel in stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } ) let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks( - onCreateTCPConnection: { _ in + onCreateTCPConnection: { channel in stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() }, - onCreateHTTP2Stream: { _ in + onCreateHTTP2Stream: { channel in stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent) + return channel.eventLoop.makeSucceededVoidFuture() } )