From 3b0fe70fb1bd3f9dd2877220c9caaedbaadb4d73 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 21 Nov 2024 10:23:44 +0000 Subject: [PATCH] Add 'with-' methods for client and server (#2121) Motivation: In some situations, like examples, testing, and prototyping, it can be useful to have a client and server with scoped lifetimes. This is all achievable using task groups but in a number of situations having helpers is also useful. Modifications: - Add 'with-' methods for client and server - Update docs Result: Easier to use API for some scenarios. --- Sources/GRPCCore/GRPCClient.swift | 139 +++++++++--------- Sources/GRPCCore/GRPCServer.swift | 99 +++++++++++-- Tests/GRPCCoreTests/GRPCClientTests.swift | 23 ++- Tests/GRPCCoreTests/GRPCServerTests.swift | 22 ++- .../ClientServerWithMethods.swift | 56 +++++++ 5 files changed, 234 insertions(+), 105 deletions(-) create mode 100644 Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 79e3deb4a..0ac39c8e0 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -28,79 +28,25 @@ private import Synchronization /// /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub. /// -/// You can set ``ServiceConfig``s on this client to override whatever configurations have been -/// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting -/// logic which apply to all RPCs. Example uses of interceptors include authentication and logging. +/// ## Creating a client /// -/// ## Creating and configuring a client -/// -/// The following example demonstrates how to create and configure a client. +/// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)`` +/// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and +/// run the client providing scoped access to it via the `handleClient` closure. The client will +/// begin gracefully shutting down when the closure returns. /// /// ```swift -/// // Create a configuration object for the client and override the timeout for the 'Get' method on -/// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport. -/// var configuration = GRPCClient.Configuration() -/// configuration.service.override = ServiceConfig( -/// methodConfig: [ -/// MethodConfig( -/// names: [ -/// MethodConfig.Name(service: "echo.Echo", method: "Get") -/// ], -/// timeout: .seconds(5) -/// ) -/// ] -/// ) -/// -/// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if -/// // no configuration is provided in the overrides or by the transport. -/// configuration.service.defaults = ServiceConfig( -/// methodConfig: [ -/// MethodConfig( -/// names: [ -/// MethodConfig.Name(service: "", method: "") -/// ], -/// timeout: .seconds(10) -/// ) -/// ] -/// ) -/// -/// // Finally create a transport and instantiate the client, adding an interceptor. -/// let inProcessTransport = InProcessTransport() -/// -/// let client = GRPCClient( -/// transport: inProcessTransport.client, -/// interceptors: [StatsRecordingClientInterceptor()], -/// configuration: configuration -/// ) +/// let transport: any ClientTransport = ... +/// try await withGRPCClient(transport: transport) { client in +/// // ... +/// } /// ``` /// -/// ## Starting and stopping the client +/// ## Creating a client manually /// -/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the -/// transport to start connecting to the server. -/// -/// ```swift -/// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in -/// // a task group. -/// try await withThrowingTaskGroup(of: Void.self) { group in -/// group.addTask { -/// try await client.run() -/// } -/// -/// // Execute a request against the "echo.Echo" service. -/// try await client.unary( -/// request: ClientRequest<[UInt8]>(message: [72, 101, 108, 108, 111, 33]), -/// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"), -/// serializer: IdentitySerializer(), -/// deserializer: IdentityDeserializer(), -/// ) { response in -/// print(response.message) -/// } -/// -/// // The RPC has completed, close the client. -/// client.beginGracefulShutdown() -/// } -/// ``` +/// If the `with`-style methods for creating clients isn't suitable for your application then you +/// can create and run a client manually. This requires you to call the ``run()`` method in a task +/// which instructs the client to start connecting to the server. /// /// The ``run()`` method won't return until the client has finished handling all requests. You can /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``. @@ -425,3 +371,62 @@ public final class GRPCClient: Sendable { ) } } + +/// Creates and runs a new client with the given transport and interceptors. +/// +/// - Parameters: +/// - transport: The transport used to establish a communication channel with a server. +/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. +/// - handleClient: A closure which is called with the client. When the closure returns, the +/// client is shutdown gracefully. +public func withGRPCClient( + transport: some ClientTransport, + interceptors: [any ClientInterceptor] = [], + isolation: isolated (any Actor)? = #isolation, + handleClient: (GRPCClient) async throws -> Result +) async throws -> Result { + try await withGRPCClient( + transport: transport, + interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + isolation: isolation, + handleClient: handleClient + ) +} + +/// Creates and runs a new client with the given transport and interceptors. +/// +/// - Parameters: +/// - transport: The transport used to establish a communication channel with a server. +/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting +/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC. +/// The order in which interceptors are added reflects the order in which they are called. +/// The first interceptor added will be the first interceptor to intercept each request. +/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. +/// - handleClient: A closure which is called with the client. When the closure returns, the +/// client is shutdown gracefully. +/// - Returns: The result of the `handleClient` closure. +public func withGRPCClient( + transport: some ClientTransport, + interceptorPipeline: [ClientInterceptorPipelineOperation], + isolation: isolated (any Actor)? = #isolation, + handleClient: (GRPCClient) async throws -> Result +) async throws -> Result { + try await withThrowingDiscardingTaskGroup { group in + let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline) + group.addTask { + try await client.run() + } + + let result = try await handleClient(client) + client.beginGracefulShutdown() + return result + } +} diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 6ff82b9dd..f8f576e65 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -29,13 +29,13 @@ private import Synchronization /// include request filtering, authentication, and logging. Once requests have been intercepted /// they are passed to a handler which in turn returns a response to send back to the client. /// -/// ## Creating and configuring a server +/// ## Configuring and starting a server /// -/// The following example demonstrates how to create and configure a server. +/// The following example demonstrates how to create and run a server. /// /// ```swift -/// // Create and an in-process transport. -/// let inProcessTransport = InProcessTransport() +/// // Create an transport +/// let transport: any ServerTransport = ... /// /// // Create the 'Greeter' and 'Echo' services. /// let greeter = GreeterService() @@ -44,19 +44,24 @@ private import Synchronization /// // Create an interceptor. /// let statsRecorder = StatsRecordingServerInterceptors() /// -/// // Finally create the server. -/// let server = GRPCServer( -/// transport: inProcessTransport.server, +/// // Run the server. +/// try await withGRPCServer( +/// transport: transport, /// services: [greeter, echo], /// interceptors: [statsRecorder] -/// ) +/// ) { server in +/// // ... +/// // The server begins shutting down when this closure returns +/// // ... +/// } /// ``` /// -/// ## Starting and stopping the server +/// ## Creating a client manually /// -/// Once you have configured the server call ``serve()`` to start it. Calling ``serve()`` starts the server's -/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other -/// runtime error. +/// If the `with`-style methods for creating a server isn't suitable for your application then you +/// can create and run it manually. This requires you to call the ``serve()`` method in a task +/// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is +/// thrown if the transport can't be started or encounters some other runtime error. /// /// ```swift /// // Start running the server. @@ -235,3 +240,73 @@ public final class GRPCServer: Sendable { } } } + +/// Creates and runs a gRPC server. +/// +/// - Parameters: +/// - transport: The transport the server should listen on. +/// - services: Services offered by the server. +/// - interceptors: A collection of interceptors providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. +/// - handleServer: A closure which is called with the server. When the closure returns, the +/// server is shutdown gracefully. +/// - Returns: The result of the `handleServer` closure. +public func withGRPCServer( + transport: any ServerTransport, + services: [any RegistrableRPCService], + interceptors: [any ServerInterceptor] = [], + isolation: isolated (any Actor)? = #isolation, + handleServer: (GRPCServer) async throws -> Result +) async throws -> Result { + try await withGRPCServer( + transport: transport, + services: services, + interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + isolation: isolation, + handleServer: handleServer + ) +} + +/// Creates and runs a gRPC server. +/// +/// - Parameters: +/// - transport: The transport the server should listen on. +/// - services: Services offered by the server. +/// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. +/// - handleServer: A closure which is called with the server. When the closure returns, the +/// server is shutdown gracefully. +/// - Returns: The result of the `handleServer` closure. +public func withGRPCServer( + transport: any ServerTransport, + services: [any RegistrableRPCService], + interceptorPipeline: [ServerInterceptorPipelineOperation], + isolation: isolated (any Actor)? = #isolation, + handleServer: (GRPCServer) async throws -> Result +) async throws -> Result { + return try await withThrowingDiscardingTaskGroup { group in + let server = GRPCServer( + transport: transport, + services: services, + interceptorPipeline: interceptorPipeline + ) + + group.addTask { + try await server.serve() + } + + let result = try await handleServer(server) + server.beginGracefulShutdown() + return result + } +} diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift index ed5396da1..ca8331b61 100644 --- a/Tests/GRPCCoreTests/GRPCClientTests.swift +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -29,20 +29,17 @@ final class GRPCClientTests: XCTestCase { let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline) let server = GRPCServer(transport: inProcess.server, services: services) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await server.serve() - } - - group.addTask { - try await client.run() + try await withGRPCServer( + transport: inProcess.server, + services: services + ) { server in + try await withGRPCClient( + transport: inProcess.client, + interceptorPipeline: interceptorPipeline + ) { client in + try await Task.sleep(for: .milliseconds(100)) + try await body(client, server) } - - // Make sure both server and client are running - try await Task.sleep(for: .milliseconds(100)) - try await body(client, server) - client.beginGracefulShutdown() - server.beginGracefulShutdown() } } diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index 9b20785d5..b61fb2022 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -26,24 +26,20 @@ final class GRPCServerTests: XCTestCase { _ body: (InProcessTransport.Client, GRPCServer) async throws -> Void ) async throws { let inProcess = InProcessTransport() - let server = GRPCServer( + + try await withGRPCServer( transport: inProcess.server, services: services, interceptorPipeline: interceptorPipeline - ) - - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await server.serve() - } + ) { server in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await inProcess.client.connect() + } - group.addTask { - try await inProcess.client.connect() + try await body(inProcess.client, server) + inProcess.client.beginGracefulShutdown() } - - try await body(inProcess.client, server) - inProcess.client.beginGracefulShutdown() - server.beginGracefulShutdown() } } diff --git a/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift new file mode 100644 index 000000000..930b18183 --- /dev/null +++ b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift @@ -0,0 +1,56 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import GRPCInProcessTransport +import Testing + +@Suite("withGRPCServer / withGRPCClient") +struct WithMethods { + @Test("Actor isolation") + func actorIsolation() async throws { + let testActor = TestActor() + #expect(await !testActor.hasRun) + try await testActor.run() + #expect(await testActor.hasRun) + } +} + +fileprivate actor TestActor { + private(set) var hasRun = false + + func run() async throws { + let inProcess = InProcessTransport() + + try await withGRPCServer(transport: inProcess.server, services: []) { server in + do { + try await withGRPCClient(transport: inProcess.client) { client in + self.hasRun = true + } + } catch { + // Starting the client can race with the closure returning which begins graceful shutdown. + // If that happens the client run method will throw an error as the client is being run + // when it's already been shutdown. That's okay and expected so rather than slowing down + // the closure tolerate that specific error. + if let error = error as? RuntimeError { + #expect(error.code == .clientIsStopped) + } else { + Issue.record(error) + } + } + } + } +}