Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make transport generic over its bag-of-bytes type #2155

Merged
merged 5 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ extension FunctionDescription {

extension StructDescription {
/// ```
/// struct <Name>: <ClientProtocol> {
/// private let client: GRPCCore.GRPCClient
/// struct <Name><Transport>: <ClientProtocol> where Transport: GRPCCore.ClientTransport {
/// private let client: GRPCCore.GRPCClient<Transport>
///
/// init(wrapping client: GRPCCore.GRPCClient) {
/// init(wrapping client: GRPCCore.GRPCClient<Transport>) {
/// self.client = client
/// }
///
Expand All @@ -665,9 +665,18 @@ extension StructDescription {
StructDescription(
accessModifier: accessLevel,
name: name,
generics: [.member("Transport")],
conformances: [clientProtocol],
whereClause: WhereClause(
requirements: [.conformance("Transport", "GRPCCore.ClientTransport")]
),
members: [
.variable(accessModifier: .private, kind: .let, left: "client", type: .grpcClient),
.variable(
accessModifier: .private,
kind: .let,
left: "client",
type: .grpcClient(genericOver: "Transport")
),
.commentable(
.preFormatted(
"""
Expand All @@ -681,7 +690,13 @@ extension StructDescription {
accessModifier: accessLevel,
kind: .initializer,
parameters: [
ParameterDescription(label: "wrapping", name: "client", type: .grpcClient)
ParameterDescription(
label: "wrapping",
name: "client",
type: .grpcClient(
genericOver: "Transport"
)
)
],
whereClause: nil,
body: [
Expand Down
8 changes: 7 additions & 1 deletion Sources/GRPCCodeGen/Internal/StructuredSwift+Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,20 @@ extension FunctionDescription {
return FunctionDescription(
accessModifier: accessLevel,
kind: .function(name: "registerMethods"),
generics: [.member("Transport")],
parameters: [
ParameterDescription(
label: "with",
name: "router",
type: .rpcRouter,
type: .rpcRouter(genericOver: "Transport"),
`inout`: true
)
],
whereClause: WhereClause(
requirements: [
.conformance("Transport", "GRPCCore.ServerTransport")
]
),
body: methods.map { method in
.functionCall(
.registerWithRouter(
Expand Down
11 changes: 9 additions & 2 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ extension ExistingTypeDescription {
}

package static let serverContext: Self = .grpcCore("ServerContext")
package static let rpcRouter: Self = .grpcCore("RPCRouter")

package static func rpcRouter(genericOver type: String) -> Self {
.generic(wrapper: .grpcCore("RPCRouter"), wrapped: .member(type))
}

package static let serviceDescriptor: Self = .grpcCore("ServiceDescriptor")
package static let methodDescriptor: Self = .grpcCore("MethodDescriptor")

Expand All @@ -80,5 +84,8 @@ extension ExistingTypeDescription {

package static let callOptions: Self = .grpcCore("CallOptions")
package static let metadata: Self = .grpcCore("Metadata")
package static let grpcClient: Self = .grpcCore("GRPCClient")

package static func grpcClient(genericOver transport: String) -> Self {
.generic(wrapper: .grpcCore("GRPCClient"), wrapped: [.member(transport)])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,12 @@ extension ClientRPCExecutor.RetryExecutor {
}

@inlinable
func executeAttempt<R: Sendable>(
func executeAttempt<R: Sendable, Bytes: GRPCContiguousBytes>(
context: ClientContext,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>,
metadata: Metadata,
retryStream: BroadcastAsyncSequence<Input>,
method: MethodDescriptor,
Expand Down
7 changes: 5 additions & 2 deletions Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,18 @@ extension ClientRPCExecutor {
/// - stream: The stream to excecute the RPC on.
/// - Returns: The deserialized response.
@inlinable // would be private
static func _execute<Input: Sendable, Output: Sendable>(
static func _execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
context: ClientContext,
request: StreamingClientRequest<Input>,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
interceptors: [any ClientInterceptor],
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {

if interceptors.isEmpty {
Expand Down
27 changes: 17 additions & 10 deletions Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ internal enum ClientStreamExecutor {
/// - stream: The stream to excecute the RPC on.
/// - Returns: A streamed response.
@inlinable
static func execute<Input: Sendable, Output: Sendable>(
static func execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
request: StreamingClientRequest<Input>,
context: ClientContext,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {
// Let the server know this is a retry.
var metadata = request.metadata
Expand Down Expand Up @@ -83,8 +86,8 @@ internal enum ClientStreamExecutor {
}

@inlinable // would be private
static func _processRequest<Outbound>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart>,
static func _processRequest<Outbound, Bytes: GRPCContiguousBytes>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart<Bytes>>,
request: StreamingClientRequest<Outbound>,
serializer: some MessageSerializer<Outbound>
) async {
Expand All @@ -104,16 +107,19 @@ internal enum ClientStreamExecutor {
}

@usableFromInline
enum OnFirstResponsePart: Sendable {
case metadata(Metadata, UnsafeTransfer<ClientTransport.Inbound.AsyncIterator>)
enum OnFirstResponsePart<Bytes: GRPCContiguousBytes>: Sendable {
case metadata(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>.AsyncIterator>
)
case status(Status, Metadata)
case failed(RPCError)
}

@inlinable // would be private
static func _waitForFirstResponsePart(
on stream: ClientTransport.Inbound
) async -> OnFirstResponsePart {
static func _waitForFirstResponsePart<Bytes: GRPCContiguousBytes>(
on stream: RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async -> OnFirstResponsePart<Bytes> {
var iterator = stream.makeAsyncIterator()
let result = await Result<OnFirstResponsePart, any Error> {
switch try await iterator.next() {
Expand Down Expand Up @@ -165,7 +171,8 @@ internal enum ClientStreamExecutor {

@usableFromInline
struct RawBodyPartToMessageSequence<
Base: AsyncSequence<RPCResponsePart, Failure>,
Base: AsyncSequence<RPCResponsePart<Bytes>, Failure>,
Bytes: GRPCContiguousBytes,
Message: Sendable,
Deserializer: MessageDeserializer<Message>,
Failure: Error
Expand Down
36 changes: 18 additions & 18 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ struct ServerRPCExecutor {
/// interceptors will be called in the order of the array.
/// - handler: A handler which turns the request into a response.
@inlinable
static func execute<Input, Output>(
static func execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>,
RPCWriter<RPCResponsePart<Bytes>>.Closable
>,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
Expand Down Expand Up @@ -66,11 +66,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _execute<Input, Output>(
static func _execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -106,12 +106,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPCWithTimeout<Input, Output>(
static func _processRPCWithTimeout<Input, Output, Bytes: GRPCContiguousBytes>(
timeout: Duration,
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -147,11 +147,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPC<Input, Output>(
static func _processRPC<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -235,12 +235,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _waitForFirstRequestPart(
inbound: RPCAsyncSequence<RPCRequestPart, any Error>
) async -> OnFirstRequestPart {
static func _waitForFirstRequestPart<Bytes: GRPCContiguousBytes>(
inbound: RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>
) async -> OnFirstRequestPart<Bytes> {
var iterator = inbound.makeAsyncIterator()
let part = await Result { try await iterator.next() }
let onFirstRequestPart: OnFirstRequestPart
let onFirstRequestPart: OnFirstRequestPart<Bytes>

switch part {
case .success(.metadata(let metadata)):
Expand Down Expand Up @@ -275,10 +275,10 @@ struct ServerRPCExecutor {
}

@usableFromInline
enum OnFirstRequestPart {
enum OnFirstRequestPart<Bytes: GRPCContiguousBytes> {
case process(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>
)
case reject(RPCError)
}
Expand Down
14 changes: 7 additions & 7 deletions Sources/GRPCCore/Call/Server/RPCRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
/// 1. Remove individual methods by calling ``removeHandler(forMethod:)``, or
/// 2. Implement ``RegistrableRPCService/registerMethods(with:)`` to register only the methods you
/// want to be served.
public struct RPCRouter: Sendable {
public struct RPCRouter<Transport: ServerTransport>: Sendable {
@usableFromInline
struct RPCHandler: Sendable {
@usableFromInline
let _fn:
@Sendable (
_ stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
_ context: ServerContext,
_ interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -73,8 +73,8 @@ public struct RPCRouter: Sendable {
@inlinable
func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext,
interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -171,8 +171,8 @@ public struct RPCRouter: Sendable {
extension RPCRouter {
internal func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext
) async {
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCCore/Call/Server/RegistrableRPCService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public protocol RegistrableRPCService: Sendable {
/// Registers methods to server with the provided ``RPCRouter``.
///
/// - Parameter router: The router to register methods with.
func registerMethods(with router: inout RPCRouter)
func registerMethods<Transport: ServerTransport>(with router: inout RPCRouter<Transport>)
}
4 changes: 2 additions & 2 deletions Sources/GRPCCore/Coding/Coding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public protocol MessageSerializer<Message>: Sendable {
///
/// - Parameter message: The message to serialize.
/// - Returns: The serialized bytes of a message.
func serialize(_ message: Message) throws -> [UInt8]
func serialize<Bytes: GRPCContiguousBytes>(_ message: Message) throws -> Bytes
}

/// Deserializes a sequence of bytes into a message.
Expand All @@ -49,5 +49,5 @@ public protocol MessageDeserializer<Message>: Sendable {
///
/// - Parameter serializedMessageBytes: The bytes to deserialize.
/// - Returns: The deserialized message.
func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message
func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws -> Message
}
Loading
Loading