From 56defe62cc1a460a6b7026ab20f38bd89c0a734e Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 17 Jan 2025 13:33:44 +0000 Subject: [PATCH] Adopt new bag-of-bytes protocol (#55) Motivation: The core package added a bag-of-bytes protocol which transports must be generic over. We need to adopt those changes here. Modifications: - Add a `GRPCNIOTransportBytes` type which wraps `ByteBuffer` and implements the `GRPCContiguousBytes` protocol. - Use this as the bytes type for client/server transports. - Update the appropriate state machines to deal in terms of `ByteBuffer` - Update the compressor/decompressor to deal in terms of `ByteBuffer` - Update tests Result: Avoid unconditonal copying of messages to/from `[UInt8]`/`ByteBuffer` --- .../InteroperabilityTestsExecutable.swift | 9 +- .../Sources/BenchmarkClient.swift | 15 +- .../grpc_testing_benchmark_service.grpc.swift | 9 +- .../Generated/grpc_testing_control.grpc.swift | 24 +++ .../grpc_testing_messages.grpc.swift | 26 +++ .../grpc_testing_payloads.grpc.swift | 24 +++ .../Generated/grpc_testing_stats.grpc.swift | 24 +++ .../grpc_testing_worker_service.grpc.swift | 3 +- .../Sources/WorkerService.swift | 10 +- .../Client/Connection/Connection.swift | 31 ++-- .../Client/Connection/GRPCChannel.swift | 2 + .../Client/GRPCClientStreamHandler.swift | 9 +- .../Compression/Zlib.swift | 46 +++--- .../GRPCMessageDecoder.swift | 8 +- .../GRPCMessageFramer.swift | 15 +- .../GRPCNIOTransportBytes.swift | 65 ++++++++ .../GRPCStreamStateMachine.swift | 22 +-- .../Internal/NIOChannelPipeline+GRPC.swift | 16 +- .../Server/CommonHTTP2ServerTransport.swift | 4 +- .../Server/Connection/ServerConnection.swift | 22 ++- .../Server/GRPCServerStreamHandler.swift | 9 +- .../HTTP2ClientTransport+Posix.swift | 4 +- .../HTTP2ServerTransport+Posix.swift | 2 + ...TP2ClientTransport+TransportServices.swift | 2 + ...TP2ServerTransport+TransportServices.swift | 2 + .../Client/Connection/ConnectionTests.swift | 8 +- .../LoadBalancers/SubchannelTests.swift | 4 +- .../Connection/Utilities/ConnectionTest.swift | 6 +- .../Connection/Utilities/TestServer.swift | 12 +- .../Client/GRPCClientStreamHandlerTests.swift | 117 +++++++------ .../GRPCMessageDecoderTests.swift | 16 +- .../GRPCMessageDeframerTests.swift | 10 +- .../GRPCMessageFramerTests.swift | 6 +- .../GRPCStreamStateMachineTests.swift | 120 +++++++------- .../Server/Compression/ZlibTests.swift | 24 +-- .../Server/GRPCServerStreamHandlerTests.swift | 121 ++++++++------ .../ControlClient.swift | 6 +- .../ControlMessages.swift | 10 +- .../ControlService.swift | 6 +- .../HTTP2TransportTLSEnabledTests.swift | 39 +++-- .../HTTP2TransportTests.swift | 79 +++++---- .../Test Utilities/TransportKind.swift | 156 ++++++++++++++++++ 42 files changed, 789 insertions(+), 354 deletions(-) create mode 100644 IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_control.grpc.swift create mode 100644 IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_messages.grpc.swift create mode 100644 IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_payloads.grpc.swift create mode 100644 IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_stats.grpc.swift create mode 100644 Sources/GRPCNIOTransportCore/GRPCNIOTransportBytes.swift create mode 100644 Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift diff --git a/IntegrationTests/grpc-interop-tests/Sources/InteroperabilityTestsExecutable.swift b/IntegrationTests/grpc-interop-tests/Sources/InteroperabilityTestsExecutable.swift index f984c42..f866a7b 100644 --- a/IntegrationTests/grpc-interop-tests/Sources/InteroperabilityTestsExecutable.swift +++ b/IntegrationTests/grpc-interop-tests/Sources/InteroperabilityTestsExecutable.swift @@ -96,7 +96,7 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand { try await withThrowingDiscardingTaskGroup { group in group.addTask { - try await client.run() + try await client.runConnections() } for testName in testNames { @@ -111,7 +111,10 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand { } } - private func buildClient(host: String, port: Int) throws -> GRPCClient { + private func buildClient( + host: String, + port: Int + ) throws -> GRPCClient { let serviceConfig = ServiceConfig(loadBalancingConfig: [.roundRobin]) return GRPCClient( transport: try .http2NIOPosix( @@ -127,7 +130,7 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand { private func runTest( _ testCase: InteroperabilityTestCase, - using client: GRPCClient + using client: GRPCClient ) async { print("Running '\(testCase.name)' ... ", terminator: "") do { diff --git a/IntegrationTests/grpc-performance-tests/Sources/BenchmarkClient.swift b/IntegrationTests/grpc-performance-tests/Sources/BenchmarkClient.swift index 402dde1..e71258e 100644 --- a/IntegrationTests/grpc-performance-tests/Sources/BenchmarkClient.swift +++ b/IntegrationTests/grpc-performance-tests/Sources/BenchmarkClient.swift @@ -16,6 +16,7 @@ import Foundation import GRPCCore +import GRPCNIOTransportHTTP2 import NIOConcurrencyHelpers import Synchronization @@ -29,7 +30,7 @@ final class BenchmarkClient: Sendable { } /// The underlying client. - private let client: GRPCClient + private let client: GRPCClient /// The number of concurrent RPCs to run. private let concurrentRPCs: Int @@ -49,7 +50,7 @@ final class BenchmarkClient: Sendable { private let rpcStats: NIOLockedValueBox init( - client: GRPCClient, + client: GRPCClient, concurrentRPCs: Int, rpcType: RPCType, messagesPerStream: Int, @@ -96,7 +97,7 @@ final class BenchmarkClient: Sendable { return try await withThrowingTaskGroup(of: Void.self) { clientGroup in // Start the client. clientGroup.addTask { - try await self.client.run() + try await self.client.runConnections() } try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in @@ -148,7 +149,9 @@ final class BenchmarkClient: Sendable { return (result, nanoseconds: Double(endTime - startTime)) } - private func unary(benchmark: Grpc_Testing_BenchmarkService.Client) async { + private func unary( + benchmark: Grpc_Testing_BenchmarkService.Client + ) async { let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { do { try await benchmark.unaryCall(request: ClientRequest(message: self.message)) { @@ -165,7 +168,9 @@ final class BenchmarkClient: Sendable { self.record(latencyNanos: nanoseconds, errorCode: errorCode) } - private func streaming(benchmark: Grpc_Testing_BenchmarkService.Client) async { + private func streaming( + benchmark: Grpc_Testing_BenchmarkService.Client + ) async { // Streaming RPCs ping-pong messages back and forth. To achieve this the response message // stream is sent to the request closure, and the request closure indicates the outcome back // to the response handler to keep the RPC alive for the appropriate amount of time. diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_benchmark_service.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_benchmark_service.grpc.swift index 8e54201..ac94821 100644 --- a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_benchmark_service.grpc.swift +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_benchmark_service.grpc.swift @@ -26,7 +26,6 @@ import GRPCCore import GRPCProtobuf -import SwiftProtobuf // MARK: - grpc.testing.BenchmarkService @@ -437,7 +436,7 @@ extension Grpc_Testing_BenchmarkService { // Default implementation of 'registerMethods(with:)'. extension Grpc_Testing_BenchmarkService.StreamingServiceProtocol { - internal func registerMethods(with router: inout GRPCCore.RPCRouter) { + internal func registerMethods(with router: inout GRPCCore.RPCRouter) where Transport: GRPCCore.ServerTransport { router.registerHandler( forMethod: Grpc_Testing_BenchmarkService.Method.UnaryCall.descriptor, deserializer: GRPCProtobuf.ProtobufDeserializer(), @@ -747,14 +746,14 @@ extension Grpc_Testing_BenchmarkService { /// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps /// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived /// means of communication with the remote peer. - internal struct Client: ClientProtocol { - private let client: GRPCCore.GRPCClient + internal struct Client: ClientProtocol where Transport: GRPCCore.ClientTransport { + private let client: GRPCCore.GRPCClient /// Creates a new client wrapping the provided `GRPCCore.GRPCClient`. /// /// - Parameters: /// - client: A `GRPCCore.GRPCClient` providing a communication channel to the service. - internal init(wrapping client: GRPCCore.GRPCClient) { + internal init(wrapping client: GRPCCore.GRPCClient) { self.client = client } diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_control.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_control.grpc.swift new file mode 100644 index 0000000..dac040b --- /dev/null +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_control.grpc.swift @@ -0,0 +1,24 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// DO NOT EDIT. +// swift-format-ignore-file +// +// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. +// Source: grpc/testing/control.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/grpc/grpc-swift + +// This file contained no services. \ No newline at end of file diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_messages.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_messages.grpc.swift new file mode 100644 index 0000000..fc8dfa8 --- /dev/null +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_messages.grpc.swift @@ -0,0 +1,26 @@ +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Message definitions to be used by integration test service definitions. + +// DO NOT EDIT. +// swift-format-ignore-file +// +// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. +// Source: grpc/testing/messages.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/grpc/grpc-swift + +// This file contained no services. \ No newline at end of file diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_payloads.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_payloads.grpc.swift new file mode 100644 index 0000000..ec05e33 --- /dev/null +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_payloads.grpc.swift @@ -0,0 +1,24 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// DO NOT EDIT. +// swift-format-ignore-file +// +// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. +// Source: grpc/testing/payloads.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/grpc/grpc-swift + +// This file contained no services. \ No newline at end of file diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_stats.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_stats.grpc.swift new file mode 100644 index 0000000..1946bf0 --- /dev/null +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_stats.grpc.swift @@ -0,0 +1,24 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// DO NOT EDIT. +// swift-format-ignore-file +// +// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. +// Source: grpc/testing/stats.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/grpc/grpc-swift + +// This file contained no services. \ No newline at end of file diff --git a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_worker_service.grpc.swift b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_worker_service.grpc.swift index df08581..eeec27c 100644 --- a/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_worker_service.grpc.swift +++ b/IntegrationTests/grpc-performance-tests/Sources/Generated/grpc_testing_worker_service.grpc.swift @@ -26,7 +26,6 @@ import GRPCCore import GRPCProtobuf -import SwiftProtobuf // MARK: - grpc.testing.WorkerService @@ -381,7 +380,7 @@ extension Grpc_Testing_WorkerService { // Default implementation of 'registerMethods(with:)'. extension Grpc_Testing_WorkerService.StreamingServiceProtocol { - internal func registerMethods(with router: inout GRPCCore.RPCRouter) { + internal func registerMethods(with router: inout GRPCCore.RPCRouter) where Transport: GRPCCore.ServerTransport { router.registerHandler( forMethod: Grpc_Testing_WorkerService.Method.RunServer.descriptor, deserializer: GRPCProtobuf.ProtobufDeserializer(), diff --git a/IntegrationTests/grpc-performance-tests/Sources/WorkerService.swift b/IntegrationTests/grpc-performance-tests/Sources/WorkerService.swift index ecafef5..88b5221 100644 --- a/IntegrationTests/grpc-performance-tests/Sources/WorkerService.swift +++ b/IntegrationTests/grpc-performance-tests/Sources/WorkerService.swift @@ -37,7 +37,7 @@ final class WorkerService: Sendable { } struct Server { - var server: GRPCServer + var server: GRPCServer var stats: ServerStats var eventLoopGroup: MultiThreadedEventLoopGroup } @@ -96,7 +96,7 @@ final class WorkerService: Sendable { } mutating func startedServer( - _ server: GRPCServer, + _ server: GRPCServer, stats: ServerStats, eventLoopGroup: MultiThreadedEventLoopGroup ) -> OnStartedServer { @@ -167,7 +167,7 @@ final class WorkerService: Sendable { } enum OnStopListening { - case stopListening(GRPCServer) + case stopListening(GRPCServer) case nothing } @@ -200,7 +200,7 @@ final class WorkerService: Sendable { } enum OnQuitWorker { - case shutDownServer(GRPCServer) + case shutDownServer(GRPCServer) case shutDownClients([BenchmarkClient]) case nothing } @@ -377,7 +377,7 @@ extension WorkerService: Grpc_Testing_WorkerService.ServiceProtocol { extension WorkerService { private func startServer( _ serverConfig: Grpc_Testing_ServerConfig - ) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) { + ) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) { // Prepare an ELG, the test might require more than the default of one. let numberOfThreads: Int if serverConfig.asyncServerThreads > 0 { diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift index 1414384..832eaf7 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift @@ -239,8 +239,8 @@ package final class Connection: Sendable { wrappingChannelSynchronously: channel, configuration: NIOAsyncChannel.Configuration( isOutboundHalfClosureEnabled: true, - inboundType: RPCResponsePart.self, - outboundType: RPCRequestPart.self + inboundType: RPCResponsePart.self, + outboundType: RPCRequestPart.self ) ) } @@ -389,23 +389,32 @@ package final class Connection: Sendable { extension Connection { package struct Stream { - package typealias Inbound = NIOAsyncChannelInboundStream + package typealias Inbound = NIOAsyncChannelInboundStream> + + typealias RequestWriter = NIOAsyncChannelOutboundWriter< + RPCRequestPart + > + + typealias HTTP2Stream = NIOAsyncChannel< + RPCResponsePart, + RPCRequestPart + > package struct Outbound: ClosableRPCWriterProtocol { - package typealias Element = RPCRequestPart + package typealias Element = RPCRequestPart - private let requestWriter: NIOAsyncChannelOutboundWriter - private let http2Stream: NIOAsyncChannel + private let requestWriter: RequestWriter + private let http2Stream: HTTP2Stream fileprivate init( - requestWriter: NIOAsyncChannelOutboundWriter, - http2Stream: NIOAsyncChannel + requestWriter: RequestWriter, + http2Stream: HTTP2Stream ) { self.requestWriter = requestWriter self.http2Stream = http2Stream } - package func write(_ element: RPCRequestPart) async throws { + package func write(_ element: RPCRequestPart) async throws { try await self.requestWriter.write(element) } @@ -425,10 +434,10 @@ extension Connection { let context: ClientContext - private let http2Stream: NIOAsyncChannel + private let http2Stream: HTTP2Stream init( - wrapping stream: NIOAsyncChannel, + wrapping stream: HTTP2Stream, context: ClientContext ) { self.http2Stream = stream diff --git a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift index 6cf6727..eb8254e 100644 --- a/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift +++ b/Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift @@ -19,6 +19,8 @@ package import GRPCCore private import Synchronization package final class GRPCChannel: ClientTransport { + package typealias Bytes = GRPCNIOTransportBytes + private enum Input: Sendable { /// Close the channel, if possible. case close diff --git a/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift b/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift index cdc8a60..1565cde 100644 --- a/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCNIOTransportCore/Client/GRPCClientStreamHandler.swift @@ -20,9 +20,9 @@ internal import NIOHTTP2 final class GRPCClientStreamHandler: ChannelDuplexHandler { typealias InboundIn = HTTP2Frame.FramePayload - typealias InboundOut = RPCResponsePart + typealias InboundOut = RPCResponsePart - typealias OutboundIn = RPCRequestPart + typealias OutboundIn = RPCRequestPart typealias OutboundOut = HTTP2Frame.FramePayload private var stateMachine: GRPCStreamStateMachine @@ -80,7 +80,8 @@ extension GRPCClientStreamHandler { loop: while true { switch self.stateMachine.nextInboundMessage() { case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) + let wrapped = GRPCNIOTransportBytes(message) + context.fireChannelRead(self.wrapInboundOut(.message(wrapped))) case .awaitMoreMessages: break loop case .noMoreMessages: @@ -193,7 +194,7 @@ extension GRPCClientStreamHandler { case .message(let message): do { - try self.stateMachine.send(message: message, promise: promise) + try self.stateMachine.send(message: message.buffer, promise: promise) } catch let invalidState { let error = RPCError(invalidState) promise?.fail(error) diff --git a/Sources/GRPCNIOTransportCore/Compression/Zlib.swift b/Sources/GRPCNIOTransportCore/Compression/Zlib.swift index a2a2511..13c41ac 100644 --- a/Sources/GRPCNIOTransportCore/Compression/Zlib.swift +++ b/Sources/GRPCNIOTransportCore/Compression/Zlib.swift @@ -60,9 +60,9 @@ extension Zlib { /// - Parameter output: The `ByteBuffer` into which the compressed message should be written. /// - Returns: The number of bytes written into the `output` buffer. @discardableResult - func compress(_ input: [UInt8], into output: inout ByteBuffer) throws(ZlibError) -> Int { + func compress(_ input: ByteBuffer, into output: inout ByteBuffer) throws(ZlibError) -> Int { defer { self.reset() } - let upperBound = self.stream.deflateBound(inputBytes: input.count) + let upperBound = self.stream.deflateBound(inputBytes: input.readableBytes) return try self.stream.deflate(input, into: &output, upperBound: upperBound) } @@ -110,7 +110,7 @@ extension Zlib { /// - Parameters: /// - input: The buffer read compressed bytes from. /// - limit: The largest size a decompressed payload may be. - func decompress(_ input: inout ByteBuffer, limit: Int) throws -> [UInt8] { + func decompress(_ input: inout ByteBuffer, limit: Int) throws -> ByteBuffer { defer { self.reset() } return try self.stream.inflate(input: &input, limit: limit) } @@ -302,8 +302,8 @@ extension UnsafeMutablePointer { self.pointee.msg.map { String(cString: $0) } } - func inflate(input: inout ByteBuffer, limit: Int) throws -> [UInt8] { - return try input.readWithUnsafeMutableReadableBytes { inputPointer in + func inflate(input: inout ByteBuffer, limit: Int) throws -> ByteBuffer { + return try input.readWithUnsafeMutableReadableBytes { inputPointer -> (Int, ByteBuffer) in self.setNextInputBuffer(inputPointer) defer { self.setNextInputBuffer(nil) @@ -311,15 +311,21 @@ extension UnsafeMutablePointer { } // Assume the output will be twice as large as the input. - var output = [UInt8](repeating: 0, count: min(inputPointer.count * 2, limit)) - var offset = 0 + var output = ByteBuffer() + var outputSize = min(inputPointer.count * 2, limit) + var finished = false + var totalBytesWritten = 0 while true { - let (finished, written) = try output[offset...].withUnsafeMutableBytes { outPointer in + let written = try output.writeWithUnsafeMutableBytes( + minimumWritableBytes: outputSize + ) { pointer in + let outPointer = UnsafeMutableRawBufferPointer( + start: pointer.baseAddress, + count: min(pointer.count, outputSize) + ) self.setNextOutputBuffer(outPointer) - let finished: Bool - // Possible return codes: // - Z_OK: some progress has been made // - Z_STREAM_END: the end of the compressed data has been reached and all uncompressed @@ -347,29 +353,29 @@ extension UnsafeMutablePointer { ) } - let size = outPointer.count - self.availableOutputBytes - return (finished, size) + return outPointer.count - self.availableOutputBytes } if finished { - output.removeLast(output.count - self.totalOutputBytes) let bytesRead = inputPointer.count - self.availableInputBytes return (bytesRead, output) } else { - offset += written - let newSize = min(output.count * 2, limit) - if newSize == output.count { + // There are still more bytes to decompress. Increase the size of the extra space in the + // buffer we're writing into. + totalBytesWritten += written + let newSize = min(outputSize * 2, limit - totalBytesWritten) + if newSize <= 0 { + assert(newSize == 0) throw RPCError(code: .resourceExhausted, message: "Message is too large to decompress.") - } else { - output.append(contentsOf: repeatElement(0, count: newSize - output.count)) } + outputSize = newSize } } } } func deflate( - _ input: [UInt8], + _ input: ByteBuffer, into output: inout ByteBuffer, upperBound: Int ) throws(ZlibError) -> Int { @@ -380,7 +386,7 @@ extension UnsafeMutablePointer { do { var input = input - return try input.withUnsafeMutableBytes { input in + return try input.withUnsafeMutableReadableBytes { input in self.setNextInputBuffer(input) return try output.writeWithUnsafeMutableBytes(minimumWritableBytes: upperBound) { output in diff --git a/Sources/GRPCNIOTransportCore/GRPCMessageDecoder.swift b/Sources/GRPCNIOTransportCore/GRPCMessageDecoder.swift index 9d557a6..b32a846 100644 --- a/Sources/GRPCNIOTransportCore/GRPCMessageDecoder.swift +++ b/Sources/GRPCNIOTransportCore/GRPCMessageDecoder.swift @@ -25,7 +25,7 @@ struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder { /// Length of the gRPC message header (1 compression byte, 4 bytes for the length). static let metadataLength = 5 - typealias InboundOut = [UInt8] + typealias InboundOut = ByteBuffer private let decompressor: Zlib.Decompressor? private let maxPayloadSize: Int @@ -92,7 +92,7 @@ struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder { } return try decompressor.decompress(&message, limit: self.maxPayloadSize) } else { - return Array(buffer: message) + return message } } @@ -136,7 +136,7 @@ package struct GRPCMessageDeframer { } } - package mutating func decodeNext() throws -> [UInt8]? { + package mutating func decodeNext() throws -> ByteBuffer? { guard (self.buffer?.readableBytes ?? 0) > 0 else { return nil } // Above checks mean this is both non-nil and non-empty. let message = try self.decoder.decode(buffer: &self.buffer!) @@ -145,7 +145,7 @@ package struct GRPCMessageDeframer { } extension GRPCMessageDeframer { - mutating func decode(into queue: inout OneOrManyQueue<[UInt8]>) throws { + mutating func decode(into queue: inout OneOrManyQueue) throws { while let next = try self.decodeNext() { queue.append(next) } diff --git a/Sources/GRPCNIOTransportCore/GRPCMessageFramer.swift b/Sources/GRPCNIOTransportCore/GRPCMessageFramer.swift index 509b7ea..ddabad0 100644 --- a/Sources/GRPCNIOTransportCore/GRPCMessageFramer.swift +++ b/Sources/GRPCNIOTransportCore/GRPCMessageFramer.swift @@ -33,7 +33,7 @@ struct GRPCMessageFramer { /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer. static let maxWriteBufferLength = 65_536 - private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise?)> + private var pendingMessages: OneOrManyQueue<(bytes: ByteBuffer, promise: EventLoopPromise?)> private var writeBuffer: ByteBuffer @@ -45,7 +45,7 @@ struct GRPCMessageFramer { /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``. - mutating func append(_ bytes: [UInt8], promise: EventLoopPromise?) { + mutating func append(_ bytes: ByteBuffer, promise: EventLoopPromise?) { self.pendingMessages.append((bytes, promise)) } @@ -72,7 +72,7 @@ struct GRPCMessageFramer { var requiredCapacity = 0 for message in self.pendingMessages { - requiredCapacity += message.bytes.count + Self.metadataLength + requiredCapacity += message.bytes.readableBytes + Self.metadataLength } self.writeBuffer.clear(minimumCapacity: requiredCapacity) @@ -90,7 +90,10 @@ struct GRPCMessageFramer { return (result: .success(self.writeBuffer), promise: pendingWritePromise) } - private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws(RPCError) { + private mutating func encode( + _ message: ByteBuffer, + compressor: Zlib.Compressor? + ) throws(RPCError) { if let compressor { self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag @@ -108,9 +111,9 @@ struct GRPCMessageFramer { } else { self.writeBuffer.writeMultipleIntegers( UInt8(0), // Clear compression flag - UInt32(message.count) // Set message length + UInt32(message.readableBytes) // Set message length ) - self.writeBuffer.writeBytes(message) + self.writeBuffer.writeImmutableBuffer(message) } } } diff --git a/Sources/GRPCNIOTransportCore/GRPCNIOTransportBytes.swift b/Sources/GRPCNIOTransportCore/GRPCNIOTransportBytes.swift new file mode 100644 index 0000000..f960f1d --- /dev/null +++ b/Sources/GRPCNIOTransportCore/GRPCNIOTransportBytes.swift @@ -0,0 +1,65 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore +public import NIOCore + +/// The contiguous bytes type used by the gRPC's NIO transport. +public struct GRPCNIOTransportBytes: GRPCContiguousBytes, Hashable, Sendable { + @usableFromInline + internal var buffer: ByteBuffer + + @inlinable + internal init(_ buffer: ByteBuffer) { + self.buffer = buffer + } + + @inlinable + internal init() { + self.buffer = ByteBuffer() + } + + @inlinable + public init(repeating: UInt8, count: Int) { + self.buffer = ByteBuffer(repeating: repeating, count: count) + } + + @inlinable + public init(_ sequence: some Sequence) { + self.buffer = ByteBuffer(bytes: sequence) + } + + @inlinable + public var count: Int { + self.buffer.readableBytes + } + + @inlinable + public func withUnsafeBytes( + _ body: (UnsafeRawBufferPointer) throws -> R + ) rethrows -> R { + try self.buffer.withUnsafeReadableBytes(body) + } + + @inlinable + public mutating func withUnsafeMutableBytes( + _ body: (UnsafeMutableRawBufferPointer) throws -> R + ) rethrows -> R { + // 'GRPCContiguousBytes' has no concept of readable/writable bytes; all bytes stored are + // readable and writable. In 'ByteBuffer' terms, these are just the readable bytes. + try self.buffer.withUnsafeMutableReadableBytes(body) + } +} diff --git a/Sources/GRPCNIOTransportCore/GRPCStreamStateMachine.swift b/Sources/GRPCNIOTransportCore/GRPCStreamStateMachine.swift index 0355610..8f162e8 100644 --- a/Sources/GRPCNIOTransportCore/GRPCStreamStateMachine.swift +++ b/Sources/GRPCNIOTransportCore/GRPCStreamStateMachine.swift @@ -88,7 +88,7 @@ private enum GRPCStreamStateMachineState { var deframer: GRPCMessageDeframer? var decompressor: Zlib.Decompressor? - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // Store the headers received from the remote peer, its storage can be reused when sending // headers back to the remote peer. @@ -122,7 +122,7 @@ private enum GRPCStreamStateMachineState { var deframer: GRPCMessageDeframer var decompressor: Zlib.Decompressor? - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // Store the headers received from the remote peer, its storage can be reused when sending // headers back to the remote peer. @@ -153,7 +153,7 @@ private enum GRPCStreamStateMachineState { let deframer: GRPCMessageDeframer? var decompressor: Zlib.Decompressor? - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // This transition should only happen on the server-side when, upon receiving // initial client metadata, some of the headers are invalid and we must reject @@ -205,7 +205,7 @@ private enum GRPCStreamStateMachineState { let deframer: GRPCMessageDeframer? var decompressor: Zlib.Decompressor? - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // Store the headers received from the remote peer, its storage can be reused when sending // headers back to the remote peer. @@ -275,7 +275,7 @@ private enum GRPCStreamStateMachineState { var deframer: GRPCMessageDeframer? var decompressor: Zlib.Decompressor? - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // Store the headers received from the remote peer, its storage can be reused when sending // headers back to the remote peer. @@ -340,7 +340,7 @@ private enum GRPCStreamStateMachineState { var outboundCompression: CompressionAlgorithm // These are already deframed, so we don't need the deframer anymore. - var inboundMessageBuffer: OneOrManyQueue<[UInt8]> + var inboundMessageBuffer: OneOrManyQueue // This transition should only happen on the server-side when, upon receiving // initial client metadata, some of the headers are invalid and we must reject @@ -424,7 +424,7 @@ struct GRPCStreamStateMachine { } } - mutating func send(message: [UInt8], promise: EventLoopPromise?) throws(InvalidState) { + mutating func send(message: ByteBuffer, promise: EventLoopPromise?) throws(InvalidState) { switch self.configuration { case .client: try self.clientSend(message: message, promise: promise) @@ -556,7 +556,7 @@ struct GRPCStreamStateMachine { /// There isn't a message ready to be sent, but we could still receive more, so keep trying. case awaitMoreMessages /// A message has been received. - case receiveMessage([UInt8]) + case receiveMessage(ByteBuffer) } mutating func nextInboundMessage() -> OnNextInboundMessage { @@ -716,7 +716,7 @@ extension GRPCStreamStateMachine { } private mutating func clientSend( - message: [UInt8], + message: ByteBuffer, promise: EventLoopPromise? ) throws(InvalidState) { switch self.state { @@ -1146,7 +1146,7 @@ extension GRPCStreamStateMachine { return .readInbound } catch { self.state = .clientClosedServerOpen(state) - let status = Status(code: .internalError, message: "Failed to decode message") + let status = Status(code: .internalError, message: "Failed to decode message \(error)") return .endRPCAndForwardErrorStatus_clientOnly(status) } @@ -1321,7 +1321,7 @@ extension GRPCStreamStateMachine { } private mutating func serverSend( - message: [UInt8], + message: ByteBuffer, promise: EventLoopPromise? ) throws(InvalidState) { switch self.state { diff --git a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift index 89a95ae..e9cd74c 100644 --- a/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift +++ b/Sources/GRPCNIOTransportCore/Internal/NIOChannelPipeline+GRPC.swift @@ -22,7 +22,13 @@ package import NIOHTTP2 extension ChannelPipeline.SynchronousOperations { package typealias HTTP2ConnectionChannel = NIOAsyncChannel package typealias HTTP2StreamMultiplexer = NIOHTTP2Handler.AsyncStreamMultiplexer< - (NIOAsyncChannel, EventLoopFuture) + ( + NIOAsyncChannel< + RPCRequestPart, + RPCResponsePart + >, + EventLoopFuture + ) > package func configureGRPCServerPipeline( @@ -95,8 +101,12 @@ extension ChannelPipeline.SynchronousOperations { ) try streamChannel.pipeline.syncOperations.addHandler(streamHandler) - let asyncStreamChannel = try NIOAsyncChannel( - wrappingChannelSynchronously: streamChannel + let asyncStreamChannel = try NIOAsyncChannel( + wrappingChannelSynchronously: streamChannel, + configuration: NIOAsyncChannel.Configuration( + inboundType: RPCRequestPart.self, + outboundType: RPCResponsePart.self + ) ) return (asyncStreamChannel, methodDescriptorPromise.futureResult) } diff --git a/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift b/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift index b0fd652..ebd6ca2 100644 --- a/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift +++ b/Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift @@ -26,6 +26,8 @@ private import Synchronization package final class CommonHTTP2ServerTransport< ListenerFactory: HTTP2ListenerFactory >: ServerTransport, ListeningServerTransport { + package typealias Bytes = GRPCNIOTransportBytes + private let eventLoopGroup: any EventLoopGroup private let address: SocketAddress private let listeningAddressState: Mutex @@ -232,7 +234,7 @@ package final class CommonHTTP2ServerTransport< } private func handleStream( - _ stream: NIOAsyncChannel, + _ stream: NIOAsyncChannel, RPCResponsePart>, handler streamHandler: @escaping @Sendable ( _ stream: RPCStream, _ context: ServerContext diff --git a/Sources/GRPCNIOTransportCore/Server/Connection/ServerConnection.swift b/Sources/GRPCNIOTransportCore/Server/Connection/ServerConnection.swift index 3a96524..097085c 100644 --- a/Sources/GRPCNIOTransportCore/Server/Connection/ServerConnection.swift +++ b/Sources/GRPCNIOTransportCore/Server/Connection/ServerConnection.swift @@ -20,20 +20,30 @@ package import NIOCore public enum ServerConnection { public enum Stream { package struct Outbound: ClosableRPCWriterProtocol { - package typealias Element = RPCResponsePart + package typealias Element = RPCResponsePart - private let responseWriter: NIOAsyncChannelOutboundWriter - private let http2Stream: NIOAsyncChannel + private let responseWriter: + NIOAsyncChannelOutboundWriter< + RPCResponsePart + > + private let http2Stream: + NIOAsyncChannel< + RPCRequestPart, + RPCResponsePart + > package init( - responseWriter: NIOAsyncChannelOutboundWriter, - http2Stream: NIOAsyncChannel + responseWriter: NIOAsyncChannelOutboundWriter>, + http2Stream: NIOAsyncChannel< + RPCRequestPart, + RPCResponsePart + > ) { self.responseWriter = responseWriter self.http2Stream = http2Stream } - package func write(_ element: RPCResponsePart) async throws { + package func write(_ element: RPCResponsePart) async throws { try await self.responseWriter.write(element) } diff --git a/Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift b/Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift index 4482a8b..e36447f 100644 --- a/Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCNIOTransportCore/Server/GRPCServerStreamHandler.swift @@ -20,9 +20,9 @@ package import NIOHTTP2 package final class GRPCServerStreamHandler: ChannelDuplexHandler, RemovableChannelHandler { package typealias InboundIn = HTTP2Frame.FramePayload - package typealias InboundOut = RPCRequestPart + package typealias InboundOut = RPCRequestPart - package typealias OutboundIn = RPCResponsePart + package typealias OutboundIn = RPCResponsePart package typealias OutboundOut = HTTP2Frame.FramePayload private var stateMachine: GRPCStreamStateMachine @@ -139,7 +139,8 @@ extension GRPCServerStreamHandler { loop: while true { switch self.stateMachine.nextInboundMessage() { case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) + let wrapped = GRPCNIOTransportBytes(message) + context.fireChannelRead(self.wrapInboundOut(.message(wrapped))) case .awaitMoreMessages: break loop @@ -277,7 +278,7 @@ extension GRPCServerStreamHandler { case .message(let message): do { - try self.stateMachine.send(message: message, promise: promise) + try self.stateMachine.send(message: message.buffer, promise: promise) self.connectionManagementHandler.wroteDataFrame() } catch let invalidState { let error = RPCError(invalidState) diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift index 711032f..c507554 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift @@ -56,6 +56,8 @@ extension HTTP2ClientTransport { /// } /// ``` public struct Posix: ClientTransport { + public typealias Bytes = GRPCNIOTransportBytes + private let channel: GRPCChannel /// Creates a new NIOPosix-based HTTP/2 client transport. @@ -105,7 +107,7 @@ extension HTTP2ClientTransport { self.channel.retryThrottle } - public func connect() async { + public func connect() async throws { await self.channel.connect() } diff --git a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift index 75e23f7..08da85b 100644 --- a/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift +++ b/Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift @@ -52,6 +52,8 @@ extension HTTP2ServerTransport { /// } /// ``` public struct Posix: ServerTransport, ListeningServerTransport { + public typealias Bytes = GRPCNIOTransportBytes + private struct ListenerFactory: HTTP2ListenerFactory { let config: Config let transportSecurity: TransportSecurity diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift index 4da29c5..33b5c9f 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift @@ -58,6 +58,8 @@ extension HTTP2ClientTransport { /// } /// ``` public struct TransportServices: ClientTransport { + public typealias Bytes = GRPCNIOTransportBytes + private let channel: GRPCChannel public var retryThrottle: RetryThrottle? { diff --git a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift index 2d219ce..97d90c0 100644 --- a/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift +++ b/Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ServerTransport+TransportServices.swift @@ -29,6 +29,8 @@ private import Synchronization extension HTTP2ServerTransport { /// A NIO Transport Services-backed implementation of a server transport. public struct TransportServices: ServerTransport, ListeningServerTransport { + public typealias Bytes = GRPCNIOTransportBytes + private struct ListenerFactory: HTTP2ListenerFactory { let config: Config let transportSecurity: TransportSecurity diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/ConnectionTests.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/ConnectionTests.swift index 47a1d5f..4106334 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/ConnectionTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/ConnectionTests.swift @@ -133,10 +133,10 @@ final class ConnectionTests: XCTestCase { ) try await stream.execute { inbound, outbound in try await outbound.write(.metadata(["foo": "bar", "bar": "baz"])) - try await outbound.write(.message([0, 1, 2])) + try await outbound.write(.message(GRPCNIOTransportBytes([0, 1, 2]))) outbound.finish() - var parts = [RPCResponsePart]() + var parts = [RPCResponsePart]() for try await part in inbound { switch part { case .metadata(let metadata): @@ -147,9 +147,9 @@ final class ConnectionTests: XCTestCase { } } - let expected: [RPCResponsePart] = [ + let expected: [RPCResponsePart] = [ .metadata(["foo": "bar", "bar": "baz"]), - .message([0, 1, 2]), + .message(GRPCNIOTransportBytes([0, 1, 2])), .status(Status(code: .ok, message: ""), [:]), ] XCTAssertEqual(parts, expected) diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift index 545aed6..4a5f0d0 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/SubchannelTests.swift @@ -85,7 +85,7 @@ final class SubchannelTests: XCTestCase { let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults) try await stream.execute { inbound, outbound in try await outbound.write(.metadata([:])) - try await outbound.write(.message([0, 1, 2])) + try await outbound.write(.message(GRPCNIOTransportBytes([0, 1, 2]))) outbound.finish() for try await part in inbound { @@ -93,7 +93,7 @@ final class SubchannelTests: XCTestCase { case .metadata: () // Don't validate, contains http/2 specific metadata too. case .message(let message): - XCTAssertEqual(message, [0, 1, 2]) + XCTAssertEqual(message, GRPCNIOTransportBytes([0, 1, 2])) case .status(let status, _): XCTAssertEqual(status.code, .ok) XCTAssertEqual(status.message, "") diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift index 9bf2537..f079b25 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/ConnectionTest.swift @@ -180,10 +180,10 @@ extension ConnectionTest { } final class EchoHandler: ChannelInboundHandler { - typealias InboundIn = RPCRequestPart - typealias OutboundOut = RPCResponsePart + typealias InboundIn = RPCRequestPart + typealias OutboundOut = RPCResponsePart - private var received: Deque = [] + private var received: Deque> = [] private var receivedEnd = false func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { diff --git a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift index 452320b..d836bc9 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/Connection/Utilities/TestServer.swift @@ -25,13 +25,15 @@ import XCTest final class TestServer: Sendable { private let eventLoopGroup: any EventLoopGroup - private typealias Stream = NIOAsyncChannel + private typealias Stream = NIOAsyncChannel< + RPCRequestPart, RPCResponsePart + > private typealias Multiplexer = NIOHTTP2AsyncSequence private let connected: Mutex<[any Channel]> - typealias Inbound = NIOAsyncChannelInboundStream - typealias Outbound = NIOAsyncChannelOutboundWriter + typealias Inbound = NIOAsyncChannelInboundStream> + typealias Outbound = NIOAsyncChannelOutboundWriter> private let server: Mutex?> @@ -94,8 +96,8 @@ final class TestServer: Sendable { return try NIOAsyncChannel( wrappingChannelSynchronously: stream, configuration: .init( - inboundType: RPCRequestPart.self, - outboundType: RPCResponsePart.self + inboundType: RPCRequestPart.self, + outboundType: RPCResponsePart.self ) ) } diff --git a/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift b/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift index 7f62f2e..2fb9e01 100644 --- a/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Client/GRPCClientStreamHandlerTests.swift @@ -71,7 +71,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Receive server's initial metadata without :status @@ -86,7 +86,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init(code: .unknown, message: "HTTP Status Code is missing."), Metadata(headers: serverInitialMetadata) @@ -108,7 +108,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Receive server's initial metadata with 1xx status @@ -123,7 +123,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) } func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws { @@ -140,7 +140,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Receive server's initial metadata with non-200 and non-1xx :status @@ -156,7 +156,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."), Metadata(headers: serverInitialMetadata) @@ -178,7 +178,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Receive server's initial metadata without content-type @@ -193,7 +193,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init(code: .internalError, message: "Missing content-type header"), Metadata(headers: serverInitialMetadata) @@ -215,7 +215,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Send client's initial metadata XCTAssertNoThrow( - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) // Make sure we have sent right metadata. @@ -248,7 +248,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init( code: .internalError, @@ -275,7 +275,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Send client's initial metadata XCTAssertNoThrow( - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) // Make sure we have sent right metadata. @@ -303,7 +303,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .metadata(Metadata(headers: serverInitialMetadata)) ) @@ -319,7 +319,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Invalid payload should result in error status and stream being closed try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)) - let part = try channel.readInbound(as: RPCResponsePart.self) + let part = try channel.readInbound(as: RPCResponsePart.self) XCTAssertEqual( part, .status(Status(code: .internalError, message: "Failed to decode message"), [:]) @@ -343,7 +343,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Send client's initial metadata XCTAssertNoThrow( - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) // Make sure we have sent right metadata. @@ -371,7 +371,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .metadata(Metadata(headers: serverInitialMetadata)) ) @@ -385,7 +385,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Make sure we got status + trailers with the right error. XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( Status( code: .internalError, @@ -411,7 +411,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Write client's initial metadata - XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))) + XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:]))) let clientInitialMetadata: HPACKHeaders = [ GRPCHTTP2Keys.path.rawValue: "/test/test", GRPCHTTP2Keys.scheme.rawValue: "http", @@ -439,7 +439,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init(code: .ok, message: ""), [ @@ -478,7 +478,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Make sure we have sent the corresponding frame, and that nothing has been written back. @@ -494,7 +494,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ] ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) // Receive server's initial metadata let serverInitialMetadata: HPACKHeaders = [ @@ -509,13 +509,15 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata)) ) // Send a message XCTAssertNoThrow( - try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42))) + try channel.writeOutbound( + RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42)) + ) ) // Assert we wrote it successfully into the channel @@ -542,7 +544,9 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Make sure we cannot write anymore because client's closed. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42))) + try channel.writeOutbound( + RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42)) + ) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -562,8 +566,8 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // Make sure we read the message properly XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), - RPCResponsePart.message([UInt8](repeating: 0, count: 42)) + try channel.readInbound(as: RPCResponsePart.self), + RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42)) ) // Server sends status to end RPC @@ -580,7 +584,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"]) ) } @@ -599,7 +603,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Make sure we have sent the corresponding frame, and that nothing has been written back. @@ -615,7 +619,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ] ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) // Receive server's initial metadata let serverInitialMetadata: HPACKHeaders = [ @@ -629,13 +633,15 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata)) ) // Send a message XCTAssertNoThrow( - try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42))) + try channel.writeOutbound( + RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42)) + ) ) // Assert we wrote it successfully into the channel @@ -652,28 +658,28 @@ final class GRPCClientStreamHandlerTests: XCTestCase { XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) buffer.clear() buffer.writeInteger(UInt32(30)) // message length XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) buffer.clear() buffer.writeRepeatingByte(0, count: 10) // first part of the message XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) buffer.clear() buffer.writeRepeatingByte(1, count: 10) // second part of the message XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) buffer.clear() buffer.writeRepeatingByte(2, count: 10) // third part of the message @@ -681,13 +687,14 @@ final class GRPCClientStreamHandlerTests: XCTestCase { try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) + var expected = ByteBuffer() + expected.writeRepeatingByte(0, count: 10) + expected.writeRepeatingByte(1, count: 10) + expected.writeRepeatingByte(2, count: 10) // Make sure we read the message properly XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), - RPCResponsePart.message( - [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10) - + [UInt8](repeating: 2, count: 10) - ) + try channel.readInbound(as: RPCResponsePart.self), + RPCResponsePart.message(GRPCNIOTransportBytes(expected)) ) } @@ -705,7 +712,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Send client's initial metadata - let request = RPCRequestPart.metadata([:]) + let request = RPCRequestPart.metadata([:]) XCTAssertNoThrow(try channel.writeOutbound(request)) // Make sure we have sent the corresponding frame, and that nothing has been written back. @@ -721,7 +728,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ] ) - XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) + XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self)) // Receive server's initial metadata let serverInitialMetadata: HPACKHeaders = [ @@ -735,7 +742,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { ) ) XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata)) ) @@ -744,11 +751,15 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // until we flush. Once we flush, both messages should be sent in the same ByteBuffer. // Write back first message and make sure nothing's written in the channel. - XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 1, count: 4)))) + XCTAssertNoThrow( + channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4))) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Write back second message and make sure nothing's written in the channel. - XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 2, count: 4)))) + XCTAssertNoThrow( + channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4))) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Now flush and check we *do* write the data. @@ -790,7 +801,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Write client's initial metadata - XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))) + XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:]))) let clientInitialMetadata: HPACKHeaders = [ GRPCHTTP2Keys.path.rawValue: "/test/test", GRPCHTTP2Keys.scheme.rawValue: "http", @@ -807,7 +818,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // The client receives a status explaining the stream was closed because of the thrown error. XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init( code: .unavailable, @@ -820,7 +831,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -841,7 +852,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Write client's initial metadata - XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))) + XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:]))) let clientInitialMetadata: HPACKHeaders = [ GRPCHTTP2Keys.path.rawValue: "/test/test", GRPCHTTP2Keys.scheme.rawValue: "http", @@ -857,7 +868,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // The client receives a status explaining the stream was closed. XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init(code: .unavailable, message: "Stream unexpectedly closed."), [:] @@ -867,7 +878,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -888,7 +899,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { let channel = EmbeddedChannel(handler: handler) // Write client's initial metadata - XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))) + XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:]))) let clientInitialMetadata: HPACKHeaders = [ GRPCHTTP2Keys.path.rawValue: "/test/test", GRPCHTTP2Keys.scheme.rawValue: "http", @@ -908,7 +919,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // The client receives a status explaining RST_STREAM was sent. XCTAssertEqual( - try channel.readInbound(as: RPCResponsePart.self), + try channel.readInbound(as: RPCResponsePart.self), .status( .init( code: .unavailable, @@ -921,7 +932,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + try channel.writeOutbound(RPCRequestPart.metadata([:])) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") diff --git a/Tests/GRPCNIOTransportCoreTests/GRPCMessageDecoderTests.swift b/Tests/GRPCNIOTransportCoreTests/GRPCMessageDecoderTests.swift index 216907d..764dcdc 100644 --- a/Tests/GRPCNIOTransportCoreTests/GRPCMessageDecoderTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/GRPCMessageDecoderTests.swift @@ -41,8 +41,8 @@ final class GRPCMessageDecoderTests: XCTestCase { try ByteToMessageDecoderVerifier.verifyDecoder( inputOutputPairs: [ - (firstMessage, [Array(repeating: UInt8(42), count: 16)]), - (secondMessage, [Array(repeating: UInt8(43), count: 8)]), + (firstMessage, [ByteBuffer(repeating: UInt8(42), count: 16)]), + (secondMessage, [ByteBuffer(repeating: UInt8(43), count: 8)]), ]) { GRPCMessageDecoder(maxPayloadSize: .max) } @@ -129,19 +129,19 @@ final class GRPCMessageDecoderTests: XCTestCase { } let firstMessage = try { - framer.append(Array(repeating: 42, count: 100), promise: nil) + framer.append(ByteBuffer(repeating: 42, count: 100), promise: nil) return try framer.next(compressor: compressor)! }() let secondMessage = try { - framer.append(Array(repeating: 43, count: 110), promise: nil) + framer.append(ByteBuffer(repeating: 43, count: 110), promise: nil) return try framer.next(compressor: compressor)! }() try ByteToMessageDecoderVerifier.verifyDecoder( inputOutputPairs: [ - (firstMessage.bytes, [Array(repeating: 42, count: 100)]), - (secondMessage.bytes, [Array(repeating: 43, count: 110)]), + (firstMessage.bytes, [ByteBuffer(repeating: 42, count: 100)]), + (secondMessage.bytes, [ByteBuffer(repeating: 43, count: 110)]), ]) { GRPCMessageDecoder(maxPayloadSize: 1000, decompressor: decompressor) } @@ -164,7 +164,7 @@ final class GRPCMessageDecoderTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 100), promise: nil) + framer.append(ByteBuffer(repeating: 42, count: 100), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( @@ -195,7 +195,7 @@ final class GRPCMessageDecoderTests: XCTestCase { compressor.end() } - framer.append(Array(repeating: 42, count: 101), promise: nil) + framer.append(ByteBuffer(repeating: 42, count: 101), promise: nil) let framedMessage = try framer.next(compressor: compressor)! XCTAssertThrowsError( diff --git a/Tests/GRPCNIOTransportCoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCNIOTransportCoreTests/GRPCMessageDeframerTests.swift index 6696b28..1ec5cbc 100644 --- a/Tests/GRPCNIOTransportCoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/GRPCMessageDeframerTests.swift @@ -43,7 +43,7 @@ final class GRPCMessageDeframerTests: XCTestCase { 0x0, 0x0, 0x0, 0x0, // Length (0) ] deframer.append(ByteBuffer(bytes: bytes)) - XCTAssertEqual(try deframer.decodeNext(), []) + XCTAssertEqual(try deframer.decodeNext(), ByteBuffer()) } func testDecodeMessage() { @@ -54,7 +54,7 @@ final class GRPCMessageDeframerTests: XCTestCase { 0xf, // Payload ] deframer.append(ByteBuffer(bytes: bytes)) - XCTAssertEqual(try deframer.decodeNext(), [0xf]) + XCTAssertEqual(try deframer.decodeNext(), ByteBuffer(bytes: [0xf])) } func testDripFeedAndDecode() { @@ -71,7 +71,7 @@ final class GRPCMessageDeframerTests: XCTestCase { // Drip feed the last byte. deframer.append(ByteBuffer(bytes: [0xf])) - XCTAssertEqual(try deframer.decodeNext(), [0xf]) + XCTAssertEqual(try deframer.decodeNext(), ByteBuffer(bytes: [0xf])) } func testReadBytesAreDiscarded() throws { @@ -90,7 +90,7 @@ final class GRPCMessageDeframerTests: XCTestCase { XCTAssertEqual(deframer._readerIndex, 0) let message1 = try deframer.decodeNext() - XCTAssertEqual(message1, Array(repeating: 42, count: 1024)) + XCTAssertEqual(message1, ByteBuffer(repeating: 42, count: 1024)) XCTAssertNotEqual(deframer._readerIndex, 0) // Append the final byte. This should discard any read bytes and set the reader index back @@ -100,7 +100,7 @@ final class GRPCMessageDeframerTests: XCTestCase { // Read the message let message2 = try deframer.decodeNext() - XCTAssertEqual(message2, Array(repeating: 43, count: 1024)) + XCTAssertEqual(message2, ByteBuffer(repeating: 43, count: 1024)) XCTAssertNotEqual(deframer._readerIndex, 0) } } diff --git a/Tests/GRPCNIOTransportCoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCNIOTransportCoreTests/GRPCMessageFramerTests.swift index c89c55d..dfc9f37 100644 --- a/Tests/GRPCNIOTransportCoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/GRPCMessageFramerTests.swift @@ -23,7 +23,7 @@ import XCTest final class GRPCMessageFramerTests: XCTestCase { func testSingleWrite() throws { var framer = GRPCMessageFramer() - framer.append(Array(repeating: 42, count: 128), promise: nil) + framer.append(ByteBuffer(repeating: 42, count: 128), promise: nil) var buffer = try XCTUnwrap(framer.next()).bytes let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) @@ -43,7 +43,7 @@ final class GRPCMessageFramerTests: XCTestCase { } var framer = GRPCMessageFramer() - let message = [UInt8](repeating: 42, count: 128) + let message = ByteBuffer(repeating: 42, count: 128) framer.append(message, promise: nil) var buffer = ByteBuffer() @@ -84,7 +84,7 @@ final class GRPCMessageFramerTests: XCTestCase { for _ in 0 ..< messagesCount { let promise = eventLoop.makePromise(of: Void.self) promises.append(promise) - framer.append(Array(repeating: 42, count: 128), promise: promise) + framer.append(ByteBuffer(repeating: 42, count: 128), promise: promise) } let nextFrame = try XCTUnwrap(framer.next()) diff --git a/Tests/GRPCNIOTransportCoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCNIOTransportCoreTests/GRPCStreamStateMachineTests.swift index a48a347..ea09d17 100644 --- a/Tests/GRPCNIOTransportCoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/GRPCStreamStateMachineTests.swift @@ -250,7 +250,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try to send a message without opening (i.e. without sending initial metadata) XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Client not yet open.") } @@ -263,7 +263,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: targetState) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(), promise: nil)) } } @@ -277,7 +277,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Client is closed, cannot send a message.") } @@ -375,7 +375,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) // Receiving uncompressed message should still work. let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none) @@ -735,7 +735,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) let expectedBytes: [UInt8] = [ 0, // compression flag: unset @@ -760,7 +760,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let request = try stateMachine.nextOutboundFrame() @@ -776,7 +776,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let request = try stateMachine.nextOutboundFrame() @@ -792,7 +792,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Queue a message, but assert the action is .noMoreMessages nevertheless, // because the server is closed. - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages) } @@ -800,7 +800,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message @@ -821,7 +821,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message @@ -841,7 +841,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -878,7 +878,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { .readInbound ) - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) } @@ -888,7 +888,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { compressionEnabled: true ) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate) XCTAssertEqual( try stateMachine.receive(buffer: receivedBytes, endStream: false), @@ -915,7 +915,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -937,7 +937,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Even though the client is closed, because it received a message while open, // we must get the message now. - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) } @@ -962,7 +962,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Even though the client is closed, because it received a message while open, // we must get the message now. - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -1088,7 +1088,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Client sends messages XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let message = [UInt8]([1, 2, 3, 4]) + let message = ByteBuffer(bytes: [1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compression: .none) try stateMachine.send(message: message, promise: nil) XCTAssertEqual( @@ -1100,9 +1100,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponseBytes = ByteBuffer(bytes: [5, 6, 7]) let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none) - let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponseBytes = ByteBuffer(bytes: [8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none) XCTAssertEqual( try stateMachine.receive(buffer: firstResponse, endStream: false), @@ -1169,7 +1169,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Client sends messages and ends XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let message = [UInt8]([1, 2, 3, 4]) + let message = ByteBuffer(bytes: [1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compression: .none) XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil)) XCTAssertNoThrow(try stateMachine.closeOutbound()) @@ -1198,9 +1198,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponseBytes = ByteBuffer(bytes: [5, 6, 7]) let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none) - let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponseBytes = ByteBuffer(bytes: [8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none) XCTAssertEqual( try stateMachine.receive(buffer: firstResponse, endStream: false), @@ -1259,7 +1259,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Client sends messages XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let message = [UInt8]([1, 2, 3, 4]) + let message = ByteBuffer(bytes: [1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compression: .none) try stateMachine.send(message: message, promise: nil) XCTAssertEqual( @@ -1290,9 +1290,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - let firstResponseBytes = [UInt8]([5, 6, 7]) + let firstResponseBytes = ByteBuffer(bytes: [5, 6, 7]) let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none) - let secondResponseBytes = [UInt8]([8, 9, 10]) + let secondResponseBytes = ByteBuffer(bytes: [8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none) XCTAssertEqual( try stateMachine.receive(buffer: firstResponse, endStream: false), @@ -1516,7 +1516,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual( error.message, @@ -1531,7 +1531,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Now send a message XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual( error.message, @@ -1544,7 +1544,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(), promise: nil)) } func testSendMessageWhenClientOpenAndServerClosed() { @@ -1553,7 +1553,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -1564,7 +1564,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual( error.message, @@ -1578,7 +1578,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending a message: even though client is closed, we should send it // because it may be expecting a response. - XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(), promise: nil)) } func testSendMessageWhenClientClosedAndServerClosed() { @@ -1587,7 +1587,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -1631,7 +1631,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -1652,7 +1652,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -1694,7 +1694,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -1715,7 +1715,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: GRPCStreamStateMachine.InvalidState.self, - try stateMachine.send(message: [], promise: nil) + try stateMachine.send(message: ByteBuffer(), promise: nil) ) { error in XCTAssertEqual(error.message, "Server can't send a message if it's closed.") } @@ -2078,7 +2078,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { deflateCompressionEnabled: true ) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) // Receiving uncompressed message should still work. let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none) @@ -2208,7 +2208,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) let response = try stateMachine.nextOutboundFrame() let expectedBytes: [UInt8] = [ @@ -2230,7 +2230,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil)) let response = try stateMachine.nextOutboundFrame() @@ -2242,7 +2242,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -2277,13 +2277,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Send another message - XCTAssertNoThrow(try stateMachine.send(message: [43, 43], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [43, 43]), promise: nil)) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. @@ -2307,7 +2307,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) // Send a message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil)) + XCTAssertNoThrow(try stateMachine.send(message: ByteBuffer(bytes: [42, 42]), promise: nil)) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -2354,7 +2354,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { .readInbound ) - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) } @@ -2364,7 +2364,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { deflateCompressionEnabled: true ) - let originalMessage = [UInt8]([42, 42, 43, 43]) + let originalMessage = ByteBuffer(bytes: [42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate) XCTAssertEqual( @@ -2407,7 +2407,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { endStream: true ) XCTAssertEqual(action, .readInbound) - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer())) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2429,7 +2429,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Even though the client is closed, because the server received a message // while it was still open, we must get the message now. - XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) + XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(ByteBuffer(bytes: [42, 42]))) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) } @@ -2572,7 +2572,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends messages - let deframedMessage = [UInt8]([1, 2, 3, 4]) + let deframedMessage = ByteBuffer(bytes: [1, 2, 3, 4]) let completeMessage = try self.frameMessage(deframedMessage, compression: .none) // Split message into two parts to make sure the stitching together of the frames works well let firstMessage = completeMessage.getSlice(at: 0, length: 4)! @@ -2594,8 +2594,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstPromise = eventLoop.makePromise(of: Void.self) let secondPromise = eventLoop.makePromise(of: Void.self) - let firstResponse = [UInt8]([5, 6, 7]) - let secondResponse = [UInt8]([8, 9, 10]) + let firstResponse = ByteBuffer(bytes: [5, 6, 7]) + let secondResponse = ByteBuffer(bytes: [8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: firstPromise) @@ -2657,7 +2657,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends messages - let deframedMessage = [UInt8]([1, 2, 3, 4]) + let deframedMessage = ByteBuffer(bytes: [1, 2, 3, 4]) let completeMessage = try self.frameMessage(deframedMessage, compression: .none) // Split message into two parts to make sure the stitching together of the frames works well let firstMessage = completeMessage.getSlice(at: 0, length: 4)! @@ -2692,8 +2692,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Server sends response - let firstResponse = [UInt8]([5, 6, 7]) - let secondResponse = [UInt8]([8, 9, 10]) + let firstResponse = ByteBuffer(bytes: [5, 6, 7]) + let secondResponse = ByteBuffer(bytes: [8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: nil) try stateMachine.send(message: secondResponse, promise: nil) @@ -2736,7 +2736,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends messages - let deframedMessage = [UInt8]([1, 2, 3, 4]) + let deframedMessage = ByteBuffer(bytes: [1, 2, 3, 4]) let completeMessage = try self.frameMessage(deframedMessage, compression: .none) // Split message into two parts to make sure the stitching together of the frames works well let firstMessage = completeMessage.getSlice(at: 0, length: 4)! @@ -2771,8 +2771,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Server sends response - let firstResponse = [UInt8]([5, 6, 7]) - let secondResponse = [UInt8]([8, 9, 10]) + let firstResponse = ByteBuffer(bytes: [5, 6, 7]) + let secondResponse = ByteBuffer(bytes: [8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages) try stateMachine.send(message: firstResponse, promise: nil) try stateMachine.send(message: secondResponse, promise: nil) @@ -2811,12 +2811,14 @@ extension XCTestCase { try expression(trailers) } - func frameMessage(_ message: [UInt8], compression: CompressionAlgorithm) throws -> ByteBuffer { + func frameMessage(_ message: ByteBuffer, compression: CompressionAlgorithm) throws -> ByteBuffer { try frameMessages([message], compression: compression) } - func frameMessages(_ messages: [[UInt8]], compression: CompressionAlgorithm) throws -> ByteBuffer - { + func frameMessages( + _ messages: [ByteBuffer], + compression: CompressionAlgorithm + ) throws -> ByteBuffer { var framer = GRPCMessageFramer() let compressor: Zlib.Compressor? = { switch compression { diff --git a/Tests/GRPCNIOTransportCoreTests/Server/Compression/ZlibTests.swift b/Tests/GRPCNIOTransportCoreTests/Server/Compression/ZlibTests.swift index ff0adb5..86fc57b 100644 --- a/Tests/GRPCNIOTransportCoreTests/Server/Compression/ZlibTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Server/Compression/ZlibTests.swift @@ -30,7 +30,7 @@ final class ZlibTests: XCTestCase { the people who are crazy enough to think they can change the world, are the ones who do. """ - private func compress(_ input: [UInt8], method: Zlib.Method) throws -> ByteBuffer { + private func compress(_ input: ByteBuffer, method: Zlib.Method) throws -> ByteBuffer { let compressor = Zlib.Compressor(method: method) defer { compressor.end() } @@ -43,7 +43,7 @@ final class ZlibTests: XCTestCase { _ input: ByteBuffer, method: Zlib.Method, limit: Int = .max - ) throws -> [UInt8] { + ) throws -> ByteBuffer { let decompressor = Zlib.Decompressor(method: method) defer { decompressor.end() } @@ -52,21 +52,21 @@ final class ZlibTests: XCTestCase { } func testRoundTripUsingDeflate() throws { - let original = Array(self.text.utf8) + let original = ByteBuffer(bytes: self.text.utf8) let compressed = try self.compress(original, method: .deflate) let decompressed = try self.decompress(compressed, method: .deflate) XCTAssertEqual(original, decompressed) } func testRoundTripUsingGzip() throws { - let original = Array(self.text.utf8) + let original = ByteBuffer(bytes: self.text.utf8) let compressed = try self.compress(original, method: .gzip) let decompressed = try self.decompress(compressed, method: .gzip) XCTAssertEqual(original, decompressed) } func testRepeatedCompresses() throws { - let original = Array(self.text.utf8) + let original = ByteBuffer(bytes: self.text.utf8) let compressor = Zlib.Compressor(method: .deflate) defer { compressor.end() } @@ -82,7 +82,7 @@ final class ZlibTests: XCTestCase { } func testRepeatedDecompresses() throws { - let original = Array(self.text.utf8) + let original = ByteBuffer(bytes: self.text.utf8) let decompressor = Zlib.Decompressor(method: .deflate) defer { decompressor.end() } @@ -101,14 +101,14 @@ final class ZlibTests: XCTestCase { // This compresses down to 17 bytes with deflate. The decompressor sets the output buffer to // be double the size of the input buffer and will grow it if necessary. This test exercises // that path. - let original = [UInt8](repeating: 0, count: 1024) + let original = ByteBuffer(repeating: 0, count: 1024) let compressed = try self.compress(original, method: .deflate) let decompressed = try self.decompress(compressed, method: .deflate) XCTAssertEqual(decompressed, original) } func testDecompressRespectsLimit() throws { - let compressed = try self.compress(Array(self.text.utf8), method: .deflate) + let compressed = try self.compress(ByteBuffer(bytes: self.text.utf8), method: .deflate) let limit = compressed.readableBytes - 1 XCTAssertThrowsError( ofType: RPCError.self, @@ -123,13 +123,13 @@ final class ZlibTests: XCTestCase { defer { compressor.end() } var buffer = ByteBuffer() - try compressor.compress(Array(repeating: 0, count: 1024), into: &buffer) + try compressor.compress(ByteBuffer(repeating: 0, count: 1024), into: &buffer) // Should be some readable bytes. let byteCount1 = buffer.readableBytes XCTAssertGreaterThan(byteCount1, 0) - try compressor.compress(Array(repeating: 1, count: 1024), into: &buffer) + try compressor.compress(ByteBuffer(repeating: 1, count: 1024), into: &buffer) // Should be some readable bytes. let byteCount2 = buffer.readableBytes @@ -137,9 +137,9 @@ final class ZlibTests: XCTestCase { let slice1 = buffer.readSlice(length: byteCount1)! let decompressed1 = try self.decompress(slice1, method: .deflate) - XCTAssertEqual(decompressed1, Array(repeating: 0, count: 1024)) + XCTAssertEqual(decompressed1, ByteBuffer(repeating: 0, count: 1024)) let decompressed2 = try self.decompress(buffer, method: .deflate) - XCTAssertEqual(decompressed2, Array(repeating: 1, count: 1024)) + XCTAssertEqual(decompressed2, ByteBuffer(repeating: 1, count: 1024)) } } diff --git a/Tests/GRPCNIOTransportCoreTests/Server/GRPCServerStreamHandlerTests.swift b/Tests/GRPCNIOTransportCoreTests/Server/GRPCServerStreamHandlerTests.swift index b3acc34..e05498b 100644 --- a/Tests/GRPCNIOTransportCoreTests/Server/GRPCServerStreamHandlerTests.swift +++ b/Tests/GRPCNIOTransportCoreTests/Server/GRPCServerStreamHandlerTests.swift @@ -266,15 +266,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata let headers: HPACKHeaders = [ "some-custom-header": "some-custom-value" ] - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers)) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: headers) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Make sure we wrote back the initial metadata @@ -305,7 +307,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent a response back and that we didn't read the received message XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) } func testClientEndsStream() throws { @@ -330,15 +332,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata let headers: HPACKHeaders = [ "some-custom-header": "some-custom-value" ] - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers)) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: headers) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Make sure we wrote back the initial metadata @@ -390,15 +394,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata let headers: HPACKHeaders = [ "some-custom-header": "some-custom-value" ] - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers)) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: headers) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Make sure we wrote back the initial metadata @@ -424,12 +430,12 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the message properly XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.message([UInt8](repeating: 0, count: 42)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 0, count: 42)) ) // Write back response - let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42)) + let serverDataPayload = RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 42)) XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload)) // Make sure we wrote back the right message @@ -442,7 +448,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase { XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer)) // Send back status to end RPC - let trailers = RPCResponsePart.status( + let trailers = RPCResponsePart.status( .init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"] ) @@ -494,15 +500,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata let headers: HPACKHeaders = [ "some-custom-header": "some-custom-value" ] - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers)) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: headers) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Make sure we wrote back the initial metadata @@ -523,28 +531,28 @@ final class GRPCServerStreamHandlerTests: XCTestCase { XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) buffer.clear() buffer.writeInteger(UInt32(30)) // message length XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) buffer.clear() buffer.writeRepeatingByte(0, count: 10) // first part of the message XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) buffer.clear() buffer.writeRepeatingByte(1, count: 10) // second part of the message XCTAssertNoThrow( try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) buffer.clear() buffer.writeRepeatingByte(2, count: 10) // third part of the message @@ -552,14 +560,15 @@ final class GRPCServerStreamHandlerTests: XCTestCase { try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))) ) + var expected = ByteBuffer() + expected.writeRepeatingByte(0, count: 10) + expected.writeRepeatingByte(1, count: 10) + expected.writeRepeatingByte(2, count: 10) // Make sure we haven't sent back an error response, and that we read the message properly XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.message( - [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10) - + [UInt8](repeating: 2, count: 10) - ) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.message(GRPCNIOTransportBytes(expected)) ) } @@ -620,15 +629,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata let headers: HPACKHeaders = [ "some-custom-header": "some-custom-value" ] - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers)) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: headers) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Read out the metadata @@ -639,11 +650,15 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // until we flush. Once we flush, both messages should be sent in the same ByteBuffer. // Write back first message and make sure nothing's written in the channel. - XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4)))) + XCTAssertNoThrow( + channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4))) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Write back second message and make sure nothing's written in the channel. - XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4)))) + XCTAssertNoThrow( + channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 2, count: 4))) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Now flush and check we *do* write the data. @@ -693,12 +708,14 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Write back server's initial metadata - let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:])) + let serverInitialMetadata = RPCResponsePart.metadata( + Metadata(headers: [:]) + ) XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata)) // Read out the metadata @@ -711,11 +728,17 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // and trailers won't be written before the messages. // Write back message and make sure nothing's written in the channel. - XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4)))) + XCTAssertNoThrow( + channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4))) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Write status + metadata and make sure nothing's written. - XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:]))) + XCTAssertNoThrow( + channel.write( + RPCResponsePart.status(.init(code: .ok, message: ""), [:]) + ) + ) XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) // Now flush and check we *do* write the data in the right order: message first, @@ -770,8 +793,8 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) XCTAssertEqual( @@ -852,8 +875,8 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // An error is fired down the pipeline @@ -871,7 +894,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCResponsePart.metadata(Metadata())) + try channel.writeOutbound(RPCResponsePart.metadata(Metadata())) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -905,8 +928,8 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // Channel becomes inactive @@ -924,7 +947,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCResponsePart.metadata(Metadata())) + try channel.writeOutbound(RPCResponsePart.metadata(Metadata())) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -958,8 +981,8 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // Make sure we haven't sent back an error response, and that we read the initial metadata XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self)) XCTAssertEqual( - try channel.readInbound(as: RPCRequestPart.self), - RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) + try channel.readInbound(as: RPCRequestPart.self), + RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata)) ) // We receive RST_STREAM frame @@ -977,7 +1000,7 @@ final class GRPCServerStreamHandlerTests: XCTestCase { // We should now be closed: check we can't write anymore. XCTAssertThrowsError( ofType: RPCError.self, - try channel.writeOutbound(RPCResponsePart.metadata(Metadata())) + try channel.writeOutbound(RPCResponsePart.metadata([:])) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Invalid state") @@ -1109,7 +1132,7 @@ struct ServerStreamHandlerTests { ) // Now we write back server's initial metadata... - let serverInitialMetadata = RPCResponsePart.metadata([:]) + let serverInitialMetadata = RPCResponsePart.metadata([:]) try channel.writeOutbound(serverInitialMetadata) // And this should have updated the FrameStats @@ -1118,7 +1141,7 @@ struct ServerStreamHandlerTests { // Manually reset the FrameStats to make sure that writing data also updates it correctly. handlers.connectionHandler.frameStats.reset() #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData) - try channel.writeOutbound(RPCResponsePart.message([42])) + try channel.writeOutbound(RPCResponsePart.message(GRPCNIOTransportBytes([42]))) #expect(handlers.connectionHandler.frameStats.didWriteHeadersOrData) // Clean up. diff --git a/Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift b/Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift index 67d7c91..22bd42c 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift @@ -16,10 +16,10 @@ import GRPCCore -internal struct ControlClient { - internal let client: GRPCCore.GRPCClient +internal struct ControlClient where Transport: ClientTransport { + internal let client: GRPCCore.GRPCClient - internal init(wrapping client: GRPCCore.GRPCClient) { + internal init(wrapping client: GRPCCore.GRPCClient) { self.client = client } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/ControlMessages.swift b/Tests/GRPCNIOTransportHTTP2Tests/ControlMessages.swift index dfce195..eddca49 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/ControlMessages.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/ControlMessages.swift @@ -137,16 +137,18 @@ struct Empty: Codable { struct JSONSerializer: MessageSerializer { private let encoder = JSONEncoder() - func serialize(_ message: Message) throws -> [UInt8] { + func serialize(_ message: Message) throws -> Bytes { let data = try self.encoder.encode(message) - return Array(data) + return Bytes(data) } } struct JSONDeserializer: MessageDeserializer { private let decoder = JSONDecoder() - func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message { - try self.decoder.decode(Message.self, from: Data(serializedMessageBytes)) + func deserialize(_ serializedMessageBytes: Bytes) throws -> Message { + try serializedMessageBytes.withUnsafeBytes { + try self.decoder.decode(Message.self, from: Data($0)) + } } } diff --git a/Tests/GRPCNIOTransportHTTP2Tests/ControlService.swift b/Tests/GRPCNIOTransportHTTP2Tests/ControlService.swift index c7c123c..a18ae8f 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/ControlService.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/ControlService.swift @@ -18,7 +18,7 @@ import Foundation import GRPCCore struct ControlService: RegistrableRPCService { - func registerMethods(with router: inout RPCRouter) { + func registerMethods(with router: inout RPCRouter) { router.registerHandler( forMethod: MethodDescriptor(fullyQualifiedService: "Control", method: "Unary"), deserializer: JSONDeserializer(), @@ -121,11 +121,11 @@ extension ControlService { } private func clientRemotePeerInfo(request: StreamingServerRequest) -> String { - request.metadata[stringValues: "remotePeer"].first(where: { _ in true })! + request.metadata[stringValues: "remotePeer"].first(where: { _ in true }) ?? "" } private func clientLocalPeerInfo(request: StreamingServerRequest) -> String { - request.metadata[stringValues: "localPeer"].first(where: { _ in true })! + request.metadata[stringValues: "localPeer"].first(where: { _ in true }) ?? "" } private func handle( diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift index f41ec08..fac2c42 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTLSEnabledTests.swift @@ -490,7 +490,7 @@ struct HTTP2TransportTLSEnabledTests { func withClientAndServer( clientConfig: ClientConfig, serverConfig: ServerConfig, - _ test: (ControlClient) async throws -> Void + _ test: (ControlClient) async throws -> Void ) async throws { try await withThrowingDiscardingTaskGroup { group in let server = self.makeServer(config: serverConfig) @@ -512,7 +512,7 @@ struct HTTP2TransportTLSEnabledTests { group.addTask { do { - try await client.run() + try await client.runConnections() } catch { throw TLSEnabledTestsError.clientError(cause: error) } @@ -526,16 +526,18 @@ struct HTTP2TransportTLSEnabledTests { } } - private func makeServer(config: ServerConfig) -> GRPCServer { + private func makeServer(config: ServerConfig) -> GRPCServer { let services = [ControlService()] switch config { case .posix(let config): return GRPCServer( - transport: .http2NIOPosix( - address: .ipv4(host: "127.0.0.1", port: 0), - transportSecurity: config.security, - config: config.transport + transport: NIOServerTransport( + .http2NIOPosix( + address: .ipv4(host: "127.0.0.1", port: 0), + transportSecurity: config.security, + config: config.transport + ) ), services: services ) @@ -543,10 +545,12 @@ struct HTTP2TransportTLSEnabledTests { #if canImport(Network) case .transportServices(let config): return GRPCServer( - transport: .http2NIOTS( - address: .ipv4(host: "127.0.0.1", port: 0), - transportSecurity: config.security, - config: config.transport + transport: NIOServerTransport( + .http2NIOTS( + address: .ipv4(host: "127.0.0.1", port: 0), + transportSecurity: config.security, + config: config.transport + ) ), services: services ) @@ -557,33 +561,32 @@ struct HTTP2TransportTLSEnabledTests { private func makeClient( config: ClientConfig, target: any ResolvableTarget - ) throws -> GRPCClient { - let transport: any ClientTransport - + ) throws -> GRPCClient { switch config { case .posix(let config): - transport = try HTTP2ClientTransport.Posix( + let transport = try HTTP2ClientTransport.Posix( target: target, transportSecurity: config.security, config: config.transport, serviceConfig: ServiceConfig() ) + return GRPCClient(transport: NIOClientTransport(transport)) #if canImport(Network) case .transportServices(let config): - transport = try HTTP2ClientTransport.TransportServices( + let transport = try HTTP2ClientTransport.TransportServices( target: target, transportSecurity: config.security, config: config.transport, serviceConfig: ServiceConfig() ) + return GRPCClient(transport: NIOClientTransport(transport)) #endif } - return GRPCClient(transport: transport) } - private func executeUnaryRPC(control: ControlClient) async throws { + private func executeUnaryRPC(control: ControlClient) async throws { let input = ControlInput.with { $0.numberOfMessages = 1 } let request = ClientRequest(message: input) try await control.unary(request: request) { response in diff --git a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift index 0d45776..98c1cd3 100644 --- a/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift +++ b/Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift @@ -52,7 +52,11 @@ final class HTTP2TransportTests: XCTestCase { clientCompression: CompressionAlgorithm = .none, clientEnabledCompression: CompressionAlgorithmSet = .none, serverCompression: CompressionAlgorithmSet = .none, - _ execute: (ControlClient, GRPCServer, Transport) async throws -> Void + _ execute: ( + ControlClient, + GRPCServer, + Transport + ) async throws -> Void ) async throws { for pair in transport { try await withThrowingTaskGroup(of: Void.self) { group in @@ -84,7 +88,7 @@ final class HTTP2TransportTests: XCTestCase { ) group.addTask { - try await client.run() + try await client.runConnections() } do { @@ -102,7 +106,7 @@ final class HTTP2TransportTests: XCTestCase { func forEachClientAndHTTPStatusCodeServer( _ kind: [Transport.Kind] = [.posix, .niots], - _ execute: (ControlClient, Transport.Kind) async throws -> Void + _ execute: (ControlClient, Transport.Kind) async throws -> Void ) async throws { for clientKind in kind { try await withThrowingTaskGroup(of: Void.self) { group in @@ -119,7 +123,7 @@ final class HTTP2TransportTests: XCTestCase { enabledCompression: .none ) group.addTask { - try await client.run() + try await client.runConnections() } do { @@ -140,18 +144,20 @@ final class HTTP2TransportTests: XCTestCase { kind: Transport.Kind, enableControlService: Bool, compression: CompressionAlgorithmSet - ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) { + ) async throws -> (GRPCServer, GRPCNIOTransportCore.SocketAddress) { let services = enableControlService ? [ControlService()] : [] switch kind { case .posix: let server = GRPCServer( - transport: .http2NIOPosix( - address: address, - transportSecurity: .plaintext, - config: .defaults { - $0.compression.enabledAlgorithms = compression - } + transport: NIOServerTransport( + .http2NIOPosix( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.compression.enabledAlgorithms = compression + } + ) ), services: services ) @@ -166,12 +172,14 @@ final class HTTP2TransportTests: XCTestCase { case .niots: #if canImport(Network) let server = GRPCServer( - transport: .http2NIOTS( - address: address, - transportSecurity: .plaintext, - config: .defaults { - $0.compression.enabledAlgorithms = compression - } + transport: NIOServerTransport( + .http2NIOTS( + address: address, + transportSecurity: .plaintext, + config: .defaults { + $0.compression.enabledAlgorithms = compression + } + ) ), services: services ) @@ -193,14 +201,14 @@ final class HTTP2TransportTests: XCTestCase { target: any ResolvableTarget, compression: CompressionAlgorithm, enabledCompression: CompressionAlgorithmSet - ) throws -> GRPCClient { - let transport: any ClientTransport + ) throws -> GRPCClient { + let transport: NIOClientTransport switch kind { case .posix: var serviceConfig = ServiceConfig() serviceConfig.loadBalancingConfig = [.roundRobin] - transport = try HTTP2ClientTransport.Posix( + let posix = try HTTP2ClientTransport.Posix( target: target, transportSecurity: .plaintext, config: .defaults { @@ -209,12 +217,13 @@ final class HTTP2TransportTests: XCTestCase { }, serviceConfig: serviceConfig ) + transport = NIOClientTransport(posix) case .niots: #if canImport(Network) var serviceConfig = ServiceConfig() serviceConfig.loadBalancingConfig = [.roundRobin] - transport = try HTTP2ClientTransport.TransportServices( + let transportServices = try HTTP2ClientTransport.TransportServices( target: target, transportSecurity: .plaintext, config: .defaults { @@ -223,6 +232,7 @@ final class HTTP2TransportTests: XCTestCase { }, serviceConfig: serviceConfig ) + transport = NIOClientTransport(transportServices) #else throw XCTSkip("Transport not supported on this platform") #endif @@ -793,7 +803,7 @@ final class HTTP2TransportTests: XCTestCase { private func testUnaryCompression( client: CompressionAlgorithm, server: CompressionAlgorithm, - control: ControlClient, + control: ControlClient, pair: Transport ) async throws { let message = ControlInput.with { @@ -843,7 +853,7 @@ final class HTTP2TransportTests: XCTestCase { private func testClientStreamingCompression( client: CompressionAlgorithm, server: CompressionAlgorithm, - control: ControlClient, + control: ControlClient, pair: Transport ) async throws { let request = StreamingClientRequest(of: ControlInput.self) { writer in @@ -888,7 +898,7 @@ final class HTTP2TransportTests: XCTestCase { private func testServerStreamingCompression( client: CompressionAlgorithm, server: CompressionAlgorithm, - control: ControlClient, + control: ControlClient, pair: Transport ) async throws { let message = ControlInput.with { @@ -939,7 +949,7 @@ final class HTTP2TransportTests: XCTestCase { private func testBidiStreamingCompression( client: CompressionAlgorithm, server: CompressionAlgorithm, - control: ControlClient, + control: ControlClient, pair: Transport ) async throws { let request = StreamingClientRequest(of: ControlInput.self) { writer in @@ -1489,7 +1499,10 @@ final class HTTP2TransportTests: XCTestCase { } } - private func checkAuthority(client: GRPCClient, expected: String) async throws { + private func checkAuthority( + client: GRPCClient, + expected: String + ) async throws { let control = ControlClient(wrapping: client) let input = ControlInput.with { $0.echoMetadataInHeaders = true @@ -1527,12 +1540,14 @@ final class HTTP2TransportTests: XCTestCase { let target = clientTarget(listeningAddress) try await withGRPCClient( - transport: .http2NIOPosix( - target: target, - transportSecurity: .plaintext, - config: .defaults { - $0.http2.authority = override - } + transport: NIOClientTransport( + .http2NIOPosix( + target: target, + transportSecurity: .plaintext, + config: .defaults { + $0.http2.authority = override + } + ) ) ) { client in try await self.checkAuthority(client: client, expected: expectedAuthority(listeningAddress)) diff --git a/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift new file mode 100644 index 0000000..4f6afac --- /dev/null +++ b/Tests/GRPCNIOTransportHTTP2Tests/Test Utilities/TransportKind.swift @@ -0,0 +1,156 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import GRPCNIOTransportHTTP2 + +enum NIOClientTransport: ClientTransport { + case posix(HTTP2ClientTransport.Posix) + #if canImport(Network) + case transportServices(HTTP2ClientTransport.TransportServices) + #endif + + init(_ transport: HTTP2ClientTransport.Posix) { + self = .posix(transport) + } + + #if canImport(Network) + init(_ transport: HTTP2ClientTransport.TransportServices) { + self = .transportServices(transport) + } + #endif + + typealias Bytes = GRPCNIOTransportBytes + + var retryThrottle: GRPCCore.RetryThrottle? { + switch self { + case .posix(let transport): + return transport.retryThrottle + #if canImport(Network) + case .transportServices(let transport): + return transport.retryThrottle + #endif + } + } + + func connect() async throws { + switch self { + case .posix(let transport): + try await transport.connect() + #if canImport(Network) + case .transportServices(let transport): + try await transport.connect() + #endif + } + } + + func beginGracefulShutdown() { + switch self { + case .posix(let transport): + transport.beginGracefulShutdown() + #if canImport(Network) + case .transportServices(let transport): + transport.beginGracefulShutdown() + #endif + } + } + + func withStream( + descriptor: MethodDescriptor, + options: CallOptions, + _ closure: (_ stream: RPCStream, _ context: ClientContext) async throws -> T + ) async throws -> T { + switch self { + case .posix(let transport): + return try await transport.withStream(descriptor: descriptor, options: options, closure) + #if canImport(Network) + case .transportServices(let transport): + return try await transport.withStream(descriptor: descriptor, options: options, closure) + #endif + } + } + + func config(forMethod descriptor: GRPCCore.MethodDescriptor) -> GRPCCore.MethodConfig? { + switch self { + case .posix(let transport): + return transport.config(forMethod: descriptor) + #if canImport(Network) + case .transportServices(let transport): + return transport.config(forMethod: descriptor) + #endif + } + } + +} + +enum NIOServerTransport: ServerTransport, ListeningServerTransport { + case posix(HTTP2ServerTransport.Posix) + #if canImport(Network) + case transportServices(HTTP2ServerTransport.TransportServices) + #endif + + init(_ transport: HTTP2ServerTransport.Posix) { + self = .posix(transport) + } + + #if canImport(Network) + init(_ transport: HTTP2ServerTransport.TransportServices) { + self = .transportServices(transport) + } + #endif + + typealias Bytes = GRPCNIOTransportBytes + + var listeningAddress: GRPCNIOTransportCore.SocketAddress { + get async throws { + switch self { + case .posix(let transport): + try await transport.listeningAddress + #if canImport(Network) + case .transportServices(let transport): + try await transport.listeningAddress + #endif + } + } + } + + func listen( + streamHandler: @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void + ) async throws { + switch self { + case .posix(let transport): + try await transport.listen(streamHandler: streamHandler) + #if canImport(Network) + case .transportServices(let transport): + try await transport.listen(streamHandler: streamHandler) + #endif + } + } + + func beginGracefulShutdown() { + switch self { + case .posix(let transport): + transport.beginGracefulShutdown() + #if canImport(Network) + case .transportServices(let transport): + transport.beginGracefulShutdown() + #endif + } + } +}