From 7ca6a4be4d14d9dfee1921ff74423045e425679f Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 15 Jan 2025 17:53:02 +0000 Subject: [PATCH] PR changes --- .../Client/Connection/Connection.swift | 32 +++++++++++-------- .../Client/Connection/GRPCChannel.swift | 16 ++++------ .../Internal/Channel+AddressInfo.swift | 14 ++++---- .../Server/CommonHTTP2ServerTransport.swift | 2 +- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index 91aa1b2..1414384 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -202,10 +202,10 @@ package final class Connection: Sendable { descriptor: MethodDescriptor, options: CallOptions ) async throws -> Stream { - let (multiplexer, scheme) = try self.state.withLock { state in + let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in switch state { case .connected(let connected): - return (connected.multiplexer, connected.scheme) + return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer) case .notConnected, .closing, .closed: throw RPCError(code: .unavailable, message: "subchannel isn't ready") } @@ -246,7 +246,13 @@ package final class Connection: Sendable { } } - return Stream(wrapping: stream, descriptor: descriptor) + let context = ClientContext( + descriptor: descriptor, + remotePeer: remotePeer, + localPeer: localPeer + ) + + return Stream(wrapping: stream, context: context) } catch { throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error) } @@ -417,24 +423,16 @@ extension Connection { } } - let descriptor: MethodDescriptor + let context: ClientContext private let http2Stream: NIOAsyncChannel - var peerInfo: String { - self.http2Stream.channel.getRemoteAddressInfo() - } - - var localInfo: String { - self.http2Stream.channel.getLocalAddressInfo() - } - init( wrapping stream: NIOAsyncChannel, - descriptor: MethodDescriptor + context: ClientContext ) { self.http2Stream = stream - self.descriptor = descriptor + self.context = context } package func execute( @@ -465,6 +463,10 @@ extension Connection { struct Connected: Sendable { /// The connection channel. var channel: NIOAsyncChannel + /// The connection's remote peer information. + var remotePeer: String + /// The connection's local peer information. + var localPeer: String /// Multiplexer for creating HTTP/2 streams. var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer /// Whether the connection is plaintext, `false` implies TLS is being used. @@ -472,6 +474,8 @@ extension Connection { init(_ connection: HTTP2Connection) { self.channel = connection.channel + self.remotePeer = connection.channel.remoteAddressInfo + self.localPeer = connection.channel.localAddressInfo self.multiplexer = connection.multiplexer self.scheme = connection.isPlaintext ? .http : .https } diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift index 57bc329..cef1422 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift @@ -202,7 +202,10 @@ package final class GRPCChannel: ClientTransport { package func withStream( descriptor: MethodDescriptor, options: CallOptions, - _ closure: (_ stream: RPCStream, _ context: ClientContext) async throws -> T + _ closure: ( + _ stream: RPCStream, + _ context: ClientContext + ) async throws -> T ) async throws -> T { // Merge options from the call with those from the service config. let methodConfig = self.config(forMethod: descriptor) @@ -214,18 +217,11 @@ package final class GRPCChannel: ClientTransport { case .created(let stream): return try await stream.execute { inbound, outbound in let rpcStream = RPCStream( - descriptor: stream.descriptor, + descriptor: stream.context.descriptor, inbound: RPCAsyncSequence(wrapping: inbound), outbound: RPCWriter.Closable(wrapping: outbound) ) - let context = ClientContext( - descriptor: descriptor, - remotePeer: stream.peerInfo, - localPeer: stream.localInfo, - serverHostname: self.authority ?? "", - networkTransportMethod: "tcp" - ) - return try await closure(rpcStream, context) + return try await closure(rpcStream, stream.context) } case .tryAgain(let error): diff --git a/Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift b/Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift index 63b40f6..f83ef34 100644 --- a/Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift +++ b/Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift @@ -14,11 +14,11 @@ * limitations under the License. */ -import NIOCore +internal import NIOCore -extension Channel { - func getRemoteAddressInfo() -> String { - guard let remote = self.remoteAddress else { +extension NIOAsyncChannel { + var remoteAddressInfo: String { + guard let remote = self.channel.remoteAddress else { return "" } @@ -33,7 +33,7 @@ extension Channel { case .unixDomainSocket: // The pathname will be on the local address. - guard let local = self.localAddress else { + guard let local = self.channel.localAddress else { // UDS but no local address; this shouldn't ever happen but at least note the transport // as being UDS. return "unix:" @@ -51,8 +51,8 @@ extension Channel { } } - func getLocalAddressInfo() -> String { - guard let local = self.localAddress else { + var localAddressInfo: String { + guard let local = self.channel.localAddress else { return "" } diff --git a/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift index 5397b60..46c0eff 100644 --- a/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift @@ -199,7 +199,7 @@ package final class CommonHTTP2ServerTransport< _ context: ServerContext ) async -> Void ) async throws { - let peer = connection.channel.getRemoteAddressInfo() + let peer = connection.remoteAddressInfo try await connection.executeThenClose { inbound, _ in await withDiscardingTaskGroup { group in group.addTask {