Skip to content

Commit

Permalink
Adopt new bag-of-bytes protocol (#55)
Browse files Browse the repository at this point in the history
Motivation:

The core package added a bag-of-bytes protocol which transports must be
generic over. We need to adopt those changes here.

Modifications:

- Add a `GRPCNIOTransportBytes` type which wraps `ByteBuffer` and
implements the `GRPCContiguousBytes` protocol.
- Use this as the bytes type for client/server transports.
- Update the appropriate state machines to deal in terms of `ByteBuffer`
- Update the compressor/decompressor to deal in terms of `ByteBuffer`
- Update tests

Result:

Avoid unconditonal copying of messages to/from `[UInt8]`/`ByteBuffer`
  • Loading branch information
glbrntt authored Jan 17, 2025
1 parent 64874f2 commit 56defe6
Show file tree
Hide file tree
Showing 42 changed files with 789 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand {

try await withThrowingDiscardingTaskGroup { group in
group.addTask {
try await client.run()
try await client.runConnections()
}

for testName in testNames {
Expand All @@ -111,7 +111,10 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand {
}
}

private func buildClient(host: String, port: Int) throws -> GRPCClient {
private func buildClient(
host: String,
port: Int
) throws -> GRPCClient<HTTP2ClientTransport.Posix> {
let serviceConfig = ServiceConfig(loadBalancingConfig: [.roundRobin])
return GRPCClient(
transport: try .http2NIOPosix(
Expand All @@ -127,7 +130,7 @@ struct InteroperabilityTestsExecutable: AsyncParsableCommand {

private func runTest(
_ testCase: InteroperabilityTestCase,
using client: GRPCClient
using client: GRPCClient<HTTP2ClientTransport.Posix>
) async {
print("Running '\(testCase.name)' ... ", terminator: "")
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import NIOConcurrencyHelpers
import Synchronization

Expand All @@ -29,7 +30,7 @@ final class BenchmarkClient: Sendable {
}

/// The underlying client.
private let client: GRPCClient
private let client: GRPCClient<HTTP2ClientTransport.Posix>

/// The number of concurrent RPCs to run.
private let concurrentRPCs: Int
Expand All @@ -49,7 +50,7 @@ final class BenchmarkClient: Sendable {
private let rpcStats: NIOLockedValueBox<RPCStats>

init(
client: GRPCClient,
client: GRPCClient<HTTP2ClientTransport.Posix>,
concurrentRPCs: Int,
rpcType: RPCType,
messagesPerStream: Int,
Expand Down Expand Up @@ -96,7 +97,7 @@ final class BenchmarkClient: Sendable {
return try await withThrowingTaskGroup(of: Void.self) { clientGroup in
// Start the client.
clientGroup.addTask {
try await self.client.run()
try await self.client.runConnections()
}

try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in
Expand Down Expand Up @@ -148,7 +149,9 @@ final class BenchmarkClient: Sendable {
return (result, nanoseconds: Double(endTime - startTime))
}

private func unary(benchmark: Grpc_Testing_BenchmarkService.Client) async {
private func unary(
benchmark: Grpc_Testing_BenchmarkService.Client<HTTP2ClientTransport.Posix>
) async {
let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt {
do {
try await benchmark.unaryCall(request: ClientRequest(message: self.message)) {
Expand All @@ -165,7 +168,9 @@ final class BenchmarkClient: Sendable {
self.record(latencyNanos: nanoseconds, errorCode: errorCode)
}

private func streaming(benchmark: Grpc_Testing_BenchmarkService.Client) async {
private func streaming(
benchmark: Grpc_Testing_BenchmarkService.Client<HTTP2ClientTransport.Posix>
) async {
// Streaming RPCs ping-pong messages back and forth. To achieve this the response message
// stream is sent to the request closure, and the request closure indicates the outcome back
// to the response handler to keep the RPC alive for the appropriate amount of time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import GRPCCore
import GRPCProtobuf
import SwiftProtobuf

// MARK: - grpc.testing.BenchmarkService

Expand Down Expand Up @@ -437,7 +436,7 @@ extension Grpc_Testing_BenchmarkService {

// Default implementation of 'registerMethods(with:)'.
extension Grpc_Testing_BenchmarkService.StreamingServiceProtocol {
internal func registerMethods(with router: inout GRPCCore.RPCRouter) {
internal func registerMethods<Transport>(with router: inout GRPCCore.RPCRouter<Transport>) where Transport: GRPCCore.ServerTransport {
router.registerHandler(
forMethod: Grpc_Testing_BenchmarkService.Method.UnaryCall.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Grpc_Testing_SimpleRequest>(),
Expand Down Expand Up @@ -747,14 +746,14 @@ extension Grpc_Testing_BenchmarkService {
/// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps
/// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived
/// means of communication with the remote peer.
internal struct Client: ClientProtocol {
private let client: GRPCCore.GRPCClient
internal struct Client<Transport>: ClientProtocol where Transport: GRPCCore.ClientTransport {
private let client: GRPCCore.GRPCClient<Transport>

/// Creates a new client wrapping the provided `GRPCCore.GRPCClient`.
///
/// - Parameters:
/// - client: A `GRPCCore.GRPCClient` providing a communication channel to the service.
internal init(wrapping client: GRPCCore.GRPCClient) {
internal init(wrapping client: GRPCCore.GRPCClient<Transport>) {
self.client = client
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/control.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Message definitions to be used by integration test service definitions.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/messages.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/payloads.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/stats.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import GRPCCore
import GRPCProtobuf
import SwiftProtobuf

// MARK: - grpc.testing.WorkerService

Expand Down Expand Up @@ -381,7 +380,7 @@ extension Grpc_Testing_WorkerService {

// Default implementation of 'registerMethods(with:)'.
extension Grpc_Testing_WorkerService.StreamingServiceProtocol {
internal func registerMethods(with router: inout GRPCCore.RPCRouter) {
internal func registerMethods<Transport>(with router: inout GRPCCore.RPCRouter<Transport>) where Transport: GRPCCore.ServerTransport {
router.registerHandler(
forMethod: Grpc_Testing_WorkerService.Method.RunServer.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Grpc_Testing_ServerArgs>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class WorkerService: Sendable {
}

struct Server {
var server: GRPCServer
var server: GRPCServer<HTTP2ServerTransport.Posix>
var stats: ServerStats
var eventLoopGroup: MultiThreadedEventLoopGroup
}
Expand Down Expand Up @@ -96,7 +96,7 @@ final class WorkerService: Sendable {
}

mutating func startedServer(
_ server: GRPCServer,
_ server: GRPCServer<HTTP2ServerTransport.Posix>,
stats: ServerStats,
eventLoopGroup: MultiThreadedEventLoopGroup
) -> OnStartedServer {
Expand Down Expand Up @@ -167,7 +167,7 @@ final class WorkerService: Sendable {
}

enum OnStopListening {
case stopListening(GRPCServer)
case stopListening(GRPCServer<HTTP2ServerTransport.Posix>)
case nothing
}

Expand Down Expand Up @@ -200,7 +200,7 @@ final class WorkerService: Sendable {
}

enum OnQuitWorker {
case shutDownServer(GRPCServer)
case shutDownServer(GRPCServer<HTTP2ServerTransport.Posix>)
case shutDownClients([BenchmarkClient])
case nothing
}
Expand Down Expand Up @@ -377,7 +377,7 @@ extension WorkerService: Grpc_Testing_WorkerService.ServiceProtocol {
extension WorkerService {
private func startServer(
_ serverConfig: Grpc_Testing_ServerConfig
) async throws -> (GRPCServer, HTTP2ServerTransport.Posix) {
) async throws -> (GRPCServer<HTTP2ServerTransport.Posix>, HTTP2ServerTransport.Posix) {
// Prepare an ELG, the test might require more than the default of one.
let numberOfThreads: Int
if serverConfig.asyncServerThreads > 0 {
Expand Down
31 changes: 20 additions & 11 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ package final class Connection: Sendable {
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
isOutboundHalfClosureEnabled: true,
inboundType: RPCResponsePart.self,
outboundType: RPCRequestPart.self
inboundType: RPCResponsePart<GRPCNIOTransportBytes>.self,
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}
Expand Down Expand Up @@ -389,23 +389,32 @@ package final class Connection: Sendable {

extension Connection {
package struct Stream {
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart<GRPCNIOTransportBytes>>

typealias RequestWriter = NIOAsyncChannelOutboundWriter<
RPCRequestPart<GRPCNIOTransportBytes>
>

typealias HTTP2Stream = NIOAsyncChannel<
RPCResponsePart<GRPCNIOTransportBytes>,
RPCRequestPart<GRPCNIOTransportBytes>
>

package struct Outbound: ClosableRPCWriterProtocol {
package typealias Element = RPCRequestPart
package typealias Element = RPCRequestPart<GRPCNIOTransportBytes>

private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let requestWriter: RequestWriter
private let http2Stream: HTTP2Stream

fileprivate init(
requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
requestWriter: RequestWriter,
http2Stream: HTTP2Stream
) {
self.requestWriter = requestWriter
self.http2Stream = http2Stream
}

package func write(_ element: RPCRequestPart) async throws {
package func write(_ element: RPCRequestPart<GRPCNIOTransportBytes>) async throws {
try await self.requestWriter.write(element)
}

Expand All @@ -425,10 +434,10 @@ extension Connection {

let context: ClientContext

private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let http2Stream: HTTP2Stream

init(
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
wrapping stream: HTTP2Stream,
context: ClientContext
) {
self.http2Stream = stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package import GRPCCore
private import Synchronization

package final class GRPCChannel: ClientTransport {
package typealias Bytes = GRPCNIOTransportBytes

private enum Input: Sendable {
/// Close the channel, if possible.
case close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ internal import NIOHTTP2

final class GRPCClientStreamHandler: ChannelDuplexHandler {
typealias InboundIn = HTTP2Frame.FramePayload
typealias InboundOut = RPCResponsePart
typealias InboundOut = RPCResponsePart<GRPCNIOTransportBytes>

typealias OutboundIn = RPCRequestPart
typealias OutboundIn = RPCRequestPart<GRPCNIOTransportBytes>
typealias OutboundOut = HTTP2Frame.FramePayload

private var stateMachine: GRPCStreamStateMachine
Expand Down Expand Up @@ -80,7 +80,8 @@ extension GRPCClientStreamHandler {
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
let wrapped = GRPCNIOTransportBytes(message)
context.fireChannelRead(self.wrapInboundOut(.message(wrapped)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
Expand Down Expand Up @@ -193,7 +194,7 @@ extension GRPCClientStreamHandler {

case .message(let message):
do {
try self.stateMachine.send(message: message, promise: promise)
try self.stateMachine.send(message: message.buffer, promise: promise)
} catch let invalidState {
let error = RPCError(invalidState)
promise?.fail(error)
Expand Down
Loading

0 comments on commit 56defe6

Please sign in to comment.