Skip to content

Commit

Permalink
Add 'with-' methods for client and server (#2121)
Browse files Browse the repository at this point in the history
Motivation:

In some situations, like examples, testing, and prototyping, it can be
useful to have a client and server with scoped lifetimes. This is all
achievable using task groups but in a number of situations having
helpers is also useful.

Modifications:

- Add 'with-' methods for client and server
- Update docs

Result:

Easier to use API for some scenarios.
  • Loading branch information
glbrntt authored Nov 21, 2024
1 parent 8d0bf6f commit 3b0fe70
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 105 deletions.
139 changes: 72 additions & 67 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,79 +28,25 @@ private import Synchronization
///
/// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
///
/// You can set ``ServiceConfig``s on this client to override whatever configurations have been
/// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
/// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
/// ## Creating a client
///
/// ## Creating and configuring a client
///
/// The following example demonstrates how to create and configure a client.
/// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)``
/// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and
/// run the client providing scoped access to it via the `handleClient` closure. The client will
/// begin gracefully shutting down when the closure returns.
///
/// ```swift
/// // Create a configuration object for the client and override the timeout for the 'Get' method on
/// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport.
/// var configuration = GRPCClient.Configuration()
/// configuration.service.override = ServiceConfig(
/// methodConfig: [
/// MethodConfig(
/// names: [
/// MethodConfig.Name(service: "echo.Echo", method: "Get")
/// ],
/// timeout: .seconds(5)
/// )
/// ]
/// )
///
/// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if
/// // no configuration is provided in the overrides or by the transport.
/// configuration.service.defaults = ServiceConfig(
/// methodConfig: [
/// MethodConfig(
/// names: [
/// MethodConfig.Name(service: "", method: "")
/// ],
/// timeout: .seconds(10)
/// )
/// ]
/// )
///
/// // Finally create a transport and instantiate the client, adding an interceptor.
/// let inProcessTransport = InProcessTransport()
///
/// let client = GRPCClient(
/// transport: inProcessTransport.client,
/// interceptors: [StatsRecordingClientInterceptor()],
/// configuration: configuration
/// )
/// let transport: any ClientTransport = ...
/// try await withGRPCClient(transport: transport) { client in
/// // ...
/// }
/// ```
///
/// ## Starting and stopping the client
/// ## Creating a client manually
///
/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
/// transport to start connecting to the server.
///
/// ```swift
/// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in
/// // a task group.
/// try await withThrowingTaskGroup(of: Void.self) { group in
/// group.addTask {
/// try await client.run()
/// }
///
/// // Execute a request against the "echo.Echo" service.
/// try await client.unary(
/// request: ClientRequest<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
/// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
/// serializer: IdentitySerializer(),
/// deserializer: IdentityDeserializer(),
/// ) { response in
/// print(response.message)
/// }
///
/// // The RPC has completed, close the client.
/// client.beginGracefulShutdown()
/// }
/// ```
/// If the `with`-style methods for creating clients isn't suitable for your application then you
/// can create and run a client manually. This requires you to call the ``run()`` method in a task
/// which instructs the client to start connecting to the server.
///
/// The ``run()`` method won't return until the client has finished handling all requests. You can
/// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
Expand Down Expand Up @@ -425,3 +371,62 @@ public final class GRPCClient: Sendable {
)
}
}

/// Creates and runs a new client with the given transport and interceptors.
///
/// - Parameters:
/// - transport: The transport used to establish a communication channel with a server.
/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
/// code is nonisolated.
/// - handleClient: A closure which is called with the client. When the closure returns, the
/// client is shutdown gracefully.
public func withGRPCClient<Result: Sendable>(
transport: some ClientTransport,
interceptors: [any ClientInterceptor] = [],
isolation: isolated (any Actor)? = #isolation,
handleClient: (GRPCClient) async throws -> Result
) async throws -> Result {
try await withGRPCClient(
transport: transport,
interceptorPipeline: interceptors.map { .apply($0, to: .all) },
isolation: isolation,
handleClient: handleClient
)
}

/// Creates and runs a new client with the given transport and interceptors.
///
/// - Parameters:
/// - transport: The transport used to establish a communication channel with a server.
/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting
/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
/// The order in which interceptors are added reflects the order in which they are called.
/// The first interceptor added will be the first interceptor to intercept each request.
/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
/// code is nonisolated.
/// - handleClient: A closure which is called with the client. When the closure returns, the
/// client is shutdown gracefully.
/// - Returns: The result of the `handleClient` closure.
public func withGRPCClient<Result: Sendable>(
transport: some ClientTransport,
interceptorPipeline: [ClientInterceptorPipelineOperation],
isolation: isolated (any Actor)? = #isolation,
handleClient: (GRPCClient) async throws -> Result
) async throws -> Result {
try await withThrowingDiscardingTaskGroup { group in
let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline)
group.addTask {
try await client.run()
}

let result = try await handleClient(client)
client.beginGracefulShutdown()
return result
}
}
99 changes: 87 additions & 12 deletions Sources/GRPCCore/GRPCServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ private import Synchronization
/// include request filtering, authentication, and logging. Once requests have been intercepted
/// they are passed to a handler which in turn returns a response to send back to the client.
///
/// ## Creating and configuring a server
/// ## Configuring and starting a server
///
/// The following example demonstrates how to create and configure a server.
/// The following example demonstrates how to create and run a server.
///
/// ```swift
/// // Create and an in-process transport.
/// let inProcessTransport = InProcessTransport()
/// // Create an transport
/// let transport: any ServerTransport = ...
///
/// // Create the 'Greeter' and 'Echo' services.
/// let greeter = GreeterService()
Expand All @@ -44,19 +44,24 @@ private import Synchronization
/// // Create an interceptor.
/// let statsRecorder = StatsRecordingServerInterceptors()
///
/// // Finally create the server.
/// let server = GRPCServer(
/// transport: inProcessTransport.server,
/// // Run the server.
/// try await withGRPCServer(
/// transport: transport,
/// services: [greeter, echo],
/// interceptors: [statsRecorder]
/// )
/// ) { server in
/// // ...
/// // The server begins shutting down when this closure returns
/// // ...
/// }
/// ```
///
/// ## Starting and stopping the server
/// ## Creating a client manually
///
/// Once you have configured the server call ``serve()`` to start it. Calling ``serve()`` starts the server's
/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other
/// runtime error.
/// If the `with`-style methods for creating a server isn't suitable for your application then you
/// can create and run it manually. This requires you to call the ``serve()`` method in a task
/// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is
/// thrown if the transport can't be started or encounters some other runtime error.
///
/// ```swift
/// // Start running the server.
Expand Down Expand Up @@ -235,3 +240,73 @@ public final class GRPCServer: Sendable {
}
}
}

/// Creates and runs a gRPC server.
///
/// - Parameters:
/// - transport: The transport the server should listen on.
/// - services: Services offered by the server.
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
/// code is nonisolated.
/// - handleServer: A closure which is called with the server. When the closure returns, the
/// server is shutdown gracefully.
/// - Returns: The result of the `handleServer` closure.
public func withGRPCServer<Result: Sendable>(
transport: any ServerTransport,
services: [any RegistrableRPCService],
interceptors: [any ServerInterceptor] = [],
isolation: isolated (any Actor)? = #isolation,
handleServer: (GRPCServer) async throws -> Result
) async throws -> Result {
try await withGRPCServer(
transport: transport,
services: services,
interceptorPipeline: interceptors.map { .apply($0, to: .all) },
isolation: isolation,
handleServer: handleServer
)
}

/// Creates and runs a gRPC server.
///
/// - Parameters:
/// - transport: The transport the server should listen on.
/// - services: Services offered by the server.
/// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
/// code is nonisolated.
/// - handleServer: A closure which is called with the server. When the closure returns, the
/// server is shutdown gracefully.
/// - Returns: The result of the `handleServer` closure.
public func withGRPCServer<Result: Sendable>(
transport: any ServerTransport,
services: [any RegistrableRPCService],
interceptorPipeline: [ServerInterceptorPipelineOperation],
isolation: isolated (any Actor)? = #isolation,
handleServer: (GRPCServer) async throws -> Result
) async throws -> Result {
return try await withThrowingDiscardingTaskGroup { group in
let server = GRPCServer(
transport: transport,
services: services,
interceptorPipeline: interceptorPipeline
)

group.addTask {
try await server.serve()
}

let result = try await handleServer(server)
server.beginGracefulShutdown()
return result
}
}
23 changes: 10 additions & 13 deletions Tests/GRPCCoreTests/GRPCClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,17 @@ final class GRPCClientTests: XCTestCase {
let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
let server = GRPCServer(transport: inProcess.server, services: services)

try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await server.serve()
}

group.addTask {
try await client.run()
try await withGRPCServer(
transport: inProcess.server,
services: services
) { server in
try await withGRPCClient(
transport: inProcess.client,
interceptorPipeline: interceptorPipeline
) { client in
try await Task.sleep(for: .milliseconds(100))
try await body(client, server)
}

// Make sure both server and client are running
try await Task.sleep(for: .milliseconds(100))
try await body(client, server)
client.beginGracefulShutdown()
server.beginGracefulShutdown()
}
}

Expand Down
22 changes: 9 additions & 13 deletions Tests/GRPCCoreTests/GRPCServerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@ final class GRPCServerTests: XCTestCase {
_ body: (InProcessTransport.Client, GRPCServer) async throws -> Void
) async throws {
let inProcess = InProcessTransport()
let server = GRPCServer(

try await withGRPCServer(
transport: inProcess.server,
services: services,
interceptorPipeline: interceptorPipeline
)

try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await server.serve()
}
) { server in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await inProcess.client.connect()
}

group.addTask {
try await inProcess.client.connect()
try await body(inProcess.client, server)
inProcess.client.beginGracefulShutdown()
}

try await body(inProcess.client, server)
inProcess.client.beginGracefulShutdown()
server.beginGracefulShutdown()
}
}

Expand Down
Loading

0 comments on commit 3b0fe70

Please sign in to comment.