Skip to content

Commit

Permalink
Use ELFs
Browse files Browse the repository at this point in the history
  • Loading branch information
glbrntt committed Jan 17, 2025
1 parent e2bdb70 commit 3184e9c
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 88 deletions.
25 changes: 8 additions & 17 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme, remotePeer, localPeer, onCreateStream) = try self.state.withLock { state in
let connected = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (
connected.multiplexer,
connected.scheme,
connected.remotePeer,
connected.localPeer,
connected.onCreateHTTP2Stream
)
return connected
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand All @@ -227,11 +221,11 @@ package final class Connection: Sendable {
let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes

do {
let stream = try await multiplexer.openStream { channel in
let stream = try await connected.multiplexer.openStream { channel in
channel.eventLoop.makeCompletedFuture {
let streamHandler = GRPCClientStreamHandler(
methodDescriptor: descriptor,
scheme: scheme,
scheme: connected.scheme,
// The value of authority here is being used for the ":authority" pseudo-header. Derive
// one from the address if we don't already have one.
authority: self.authority ?? self.address.authority,
Expand All @@ -249,16 +243,13 @@ package final class Connection: Sendable {
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}.runCallbackIfSet(
on: channel,
callback: onCreateStream
)
}.runInitializerIfSet(connected.onCreateHTTP2Stream, on: channel)
}

let context = ClientContext(
descriptor: descriptor,
remotePeer: remotePeer,
localPeer: localPeer
remotePeer: connected.remotePeer,
localPeer: connected.localPeer
)

return Stream(wrapping: stream, context: context)
Expand Down Expand Up @@ -490,7 +481,7 @@ extension Connection {
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme
/// A user-provided callback to call after creating the stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ package struct HTTP2Connection: Sendable {
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>

/// A callback which is invoked when creating an HTTP/2 stream.
var onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?
var onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?

/// Whether the connection is insecure (i.e. plaintext).
var isPlaintext: Bool
Expand All @@ -47,7 +47,7 @@ package struct HTTP2Connection: Sendable {
channel: NIOAsyncChannel<ClientConnectionEvent, Void>,
multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>,
isPlaintext: Bool,
onCreateHTTP2Stream: (@Sendable (any Channel) async throws -> Void)?
onCreateHTTP2Stream: (@Sendable (any Channel) -> EventLoopFuture<Void>)?
) {
self.channel = channel
self.multiplexer = multiplexer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ extension HTTP2ClientTransport.Config {
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked with each new TCP connection.
public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?
public var onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new HTTP/2 stream.
public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?
public var onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onCreateTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?,
onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?
onCreateTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onCreateHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onCreateTCPConnection = onCreateTCPConnection
self.onCreateHTTP2Stream = onCreateHTTP2Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,14 @@
package import NIOCore

extension EventLoopFuture where Value: Sendable {
package func runCallbackIfSet(
on channel: any Channel,
callback: (@Sendable (any Channel) async throws -> Void)?
package func runInitializerIfSet(
_ initializer: (@Sendable (any Channel) -> EventLoopFuture<Void>)?,
on channel: any Channel
) -> EventLoopFuture<Value> {
guard let initializer = callback else { return self }
guard let initializer = initializer else { return self }

// The code below code is equivalent to the following but avoids allocating an extra future.
//
// return self.flatMap { value in
// self.eventLoop.makeFutureWithTask {
// try await userInitializer(channel)
// }.map {
// value
// }
// }
//
let promise = self.eventLoop.makePromise(of: Value.self)
self.whenComplete { result in
switch result {
case .success(let value):
Task {
do {
try await initializer(channel)
promise.succeed(value)
} catch {
promise.fail(error)
}
}

case .failure(let error):
promise.fail(error)
}
return self.flatMap { value in
initializer(channel).map { value }
}

return promise.futureResult
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ extension ChannelPipeline.SynchronousOperations {
)
)
return (asyncStreamChannel, methodDescriptorPromise.futureResult)
}.runCallbackIfSet(
on: streamChannel,
callback: debugConfig.onAcceptHTTP2Stream
)
}.runInitializerIfSet(debugConfig.onAcceptHTTP2Stream, on: streamChannel)
}

try self.addHandler(serverConnectionHandler)
Expand Down
12 changes: 6 additions & 6 deletions Sources/GRPCNIOTransportCore/Server/HTTP2ServerTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,18 @@ extension HTTP2ServerTransport.Config {
/// unexpected side effects on your gRPC application.
public struct ChannelDebuggingCallbacks: Sendable {
/// A callback invoked when the server starts listening for new TCP connections.
public var onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?
public var onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each new accepted TPC connection.
public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?
public var onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

/// A callback invoked with each accepted HTTP/2 stream.
public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?
public var onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?

public init(
onBindTCPListener: (@Sendable (_ channel: any Channel) async throws -> Void)?,
onAcceptTCPConnection: (@Sendable (_ channel: any Channel) async throws -> Void)?,
onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) async throws -> Void)?
onBindTCPListener: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptTCPConnection: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?,
onAcceptHTTP2Stream: (@Sendable (_ channel: any Channel) -> EventLoopFuture<Void>)?
) {
self.onBindTCPListener = onBindTCPListener
self.onAcceptTCPConnection = onAcceptTCPConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ extension HTTP2ClientTransport.Posix {
channel: channel,
config: GRPCChannel.Config(posix: self.config)
)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onCreateTCPConnection,
on: channel
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ extension HTTP2ServerTransport {
channel: channel
)
try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onBindTCPListener
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onBindTCPListener,
on: channel
)
}
.bind(to: address) { channel in
Expand Down Expand Up @@ -122,9 +122,9 @@ extension HTTP2ServerTransport {
requireALPN: requireALPN,
scheme: scheme
)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onAcceptTCPConnection,
on: channel
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ extension HTTP2ClientTransport.TransportServices {
channel: channel,
config: GRPCChannel.Config(transportServices: self.config)
)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onCreateTCPConnection
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onCreateTCPConnection,
on: channel
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ extension HTTP2ServerTransport {
channel: channel
)
try channel.pipeline.syncOperations.addHandler(quiescingHandler)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onBindTCPListener
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onBindTCPListener,
on: channel
)
}
.bind(to: address) { channel in
Expand All @@ -83,9 +83,9 @@ extension HTTP2ServerTransport {
requireALPN: requireALPN,
scheme: scheme
)
}.runCallbackIfSet(
on: channel,
callback: self.config.channelDebuggingCallbacks.onAcceptTCPConnection
}.runInitializerIfSet(
self.config.channelDebuggingCallbacks.onAcceptTCPConnection,
on: channel
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,28 @@ struct ChannelDebugCallbackTests {

let stats = DebugCallbackStats()
let serverDebug = HTTP2ServerTransport.Config.ChannelDebuggingCallbacks(
onBindTCPListener: { _ in
onBindTCPListener: { channel in
stats.tcpListenersBound.add(1, ordering: .sequentiallyConsistent)
return channel.eventLoop.makeSucceededVoidFuture()
},
onAcceptTCPConnection: { _ in
onAcceptTCPConnection: { channel in
stats.tcpConnectionsAccepted.add(1, ordering: .sequentiallyConsistent)
return channel.eventLoop.makeSucceededVoidFuture()
},
onAcceptHTTP2Stream: { _ in
onAcceptHTTP2Stream: { channel in
stats.http2StreamsAccepted.add(1, ordering: .sequentiallyConsistent)
return channel.eventLoop.makeSucceededVoidFuture()
}
)

let clientDebug = HTTP2ClientTransport.Config.ChannelDebuggingCallbacks(
onCreateTCPConnection: { _ in
onCreateTCPConnection: { channel in
stats.tcpConnectionsCreated.add(1, ordering: .sequentiallyConsistent)
return channel.eventLoop.makeSucceededVoidFuture()
},
onCreateHTTP2Stream: { _ in
onCreateHTTP2Stream: { channel in
stats.http2StreamsCreated.add(1, ordering: .sequentiallyConsistent)
return channel.eventLoop.makeSucceededVoidFuture()
}
)

Expand Down

0 comments on commit 3184e9c

Please sign in to comment.