Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcairo committed Jan 15, 2025
1 parent e7068d2 commit 7ca6a4b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
32 changes: 18 additions & 14 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme) = try self.state.withLock { state in
let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme)
return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer)
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand Down Expand Up @@ -246,7 +246,13 @@ package final class Connection: Sendable {
}
}

return Stream(wrapping: stream, descriptor: descriptor)
let context = ClientContext(
descriptor: descriptor,
remotePeer: remotePeer,
localPeer: localPeer
)

return Stream(wrapping: stream, context: context)
} catch {
throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error)
}
Expand Down Expand Up @@ -417,24 +423,16 @@ extension Connection {
}
}

let descriptor: MethodDescriptor
let context: ClientContext

private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>

var peerInfo: String {
self.http2Stream.channel.getRemoteAddressInfo()
}

var localInfo: String {
self.http2Stream.channel.getLocalAddressInfo()
}

init(
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
descriptor: MethodDescriptor
context: ClientContext
) {
self.http2Stream = stream
self.descriptor = descriptor
self.context = context
}

package func execute<T>(
Expand Down Expand Up @@ -465,13 +463,19 @@ extension Connection {
struct Connected: Sendable {
/// The connection channel.
var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
/// The connection's remote peer information.
var remotePeer: String
/// The connection's local peer information.
var localPeer: String
/// Multiplexer for creating HTTP/2 streams.
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
self.remotePeer = connection.channel.remoteAddressInfo
self.localPeer = connection.channel.localAddressInfo
self.multiplexer = connection.multiplexer
self.scheme = connection.isPlaintext ? .http : .https
}
Expand Down
16 changes: 6 additions & 10 deletions Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ package final class GRPCChannel: ClientTransport {
package func withStream<T: Sendable>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
_ closure: (
_ stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
_ context: ClientContext
) async throws -> T
) async throws -> T {
// Merge options from the call with those from the service config.
let methodConfig = self.config(forMethod: descriptor)
Expand All @@ -214,18 +217,11 @@ package final class GRPCChannel: ClientTransport {
case .created(let stream):
return try await stream.execute { inbound, outbound in
let rpcStream = RPCStream(
descriptor: stream.descriptor,
descriptor: stream.context.descriptor,
inbound: RPCAsyncSequence<RPCResponsePart, any Error>(wrapping: inbound),
outbound: RPCWriter.Closable(wrapping: outbound)
)
let context = ClientContext(
descriptor: descriptor,
remotePeer: stream.peerInfo,
localPeer: stream.localInfo,
serverHostname: self.authority ?? "<unknown>",
networkTransportMethod: "tcp"
)
return try await closure(rpcStream, context)
return try await closure(rpcStream, stream.context)
}

case .tryAgain(let error):
Expand Down
14 changes: 7 additions & 7 deletions Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

import NIOCore
internal import NIOCore

extension Channel {
func getRemoteAddressInfo() -> String {
guard let remote = self.remoteAddress else {
extension NIOAsyncChannel {
var remoteAddressInfo: String {
guard let remote = self.channel.remoteAddress else {
return "<unknown>"
}

Expand All @@ -33,7 +33,7 @@ extension Channel {

case .unixDomainSocket:
// The pathname will be on the local address.
guard let local = self.localAddress else {
guard let local = self.channel.localAddress else {
// UDS but no local address; this shouldn't ever happen but at least note the transport
// as being UDS.
return "unix:<unknown>"
Expand All @@ -51,8 +51,8 @@ extension Channel {
}
}

func getLocalAddressInfo() -> String {
guard let local = self.localAddress else {
var localAddressInfo: String {
guard let local = self.channel.localAddress else {
return "<unknown>"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ package final class CommonHTTP2ServerTransport<
_ context: ServerContext
) async -> Void
) async throws {
let peer = connection.channel.getRemoteAddressInfo()
let peer = connection.remoteAddressInfo
try await connection.executeThenClose { inbound, _ in
await withDiscardingTaskGroup { group in
group.addTask {
Expand Down

0 comments on commit 7ca6a4b

Please sign in to comment.