Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt new bag-of-bytes protocol #55

Merged
merged 10 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ package final class Connection: Sendable {
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
isOutboundHalfClosureEnabled: true,
inboundType: RPCResponsePart.self,
outboundType: RPCRequestPart.self
inboundType: RPCResponsePart<GRPCNIOTransportBytes>.self,
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}
Expand Down Expand Up @@ -383,23 +383,32 @@ package final class Connection: Sendable {

extension Connection {
package struct Stream {
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart<GRPCNIOTransportBytes>>

typealias RequestWriter = NIOAsyncChannelOutboundWriter<
RPCRequestPart<GRPCNIOTransportBytes>
>

typealias HTTP2Stream = NIOAsyncChannel<
RPCResponsePart<GRPCNIOTransportBytes>,
RPCRequestPart<GRPCNIOTransportBytes>
>

package struct Outbound: ClosableRPCWriterProtocol {
package typealias Element = RPCRequestPart
package typealias Element = RPCRequestPart<GRPCNIOTransportBytes>

private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let requestWriter: RequestWriter
private let http2Stream: HTTP2Stream

fileprivate init(
requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
requestWriter: RequestWriter,
http2Stream: HTTP2Stream
) {
self.requestWriter = requestWriter
self.http2Stream = http2Stream
}

package func write(_ element: RPCRequestPart) async throws {
package func write(_ element: RPCRequestPart<GRPCNIOTransportBytes>) async throws {
try await self.requestWriter.write(element)
}

Expand All @@ -419,10 +428,10 @@ extension Connection {

let descriptor: MethodDescriptor

private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let http2Stream: HTTP2Stream

init(
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
wrapping stream: HTTP2Stream,
descriptor: MethodDescriptor
) {
self.http2Stream = stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package import GRPCCore
private import Synchronization

package final class GRPCChannel: ClientTransport {
package typealias Bytes = GRPCNIOTransportBytes

private enum Input: Sendable {
/// Close the channel, if possible.
case close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ internal import NIOHTTP2

final class GRPCClientStreamHandler: ChannelDuplexHandler {
typealias InboundIn = HTTP2Frame.FramePayload
typealias InboundOut = RPCResponsePart
typealias InboundOut = RPCResponsePart<GRPCNIOTransportBytes>

typealias OutboundIn = RPCRequestPart
typealias OutboundIn = RPCRequestPart<GRPCNIOTransportBytes>
typealias OutboundOut = HTTP2Frame.FramePayload

private var stateMachine: GRPCStreamStateMachine
Expand Down Expand Up @@ -80,7 +80,8 @@ extension GRPCClientStreamHandler {
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
let wrapped = GRPCNIOTransportBytes(message)
context.fireChannelRead(self.wrapInboundOut(.message(wrapped)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
Expand Down Expand Up @@ -193,7 +194,7 @@ extension GRPCClientStreamHandler {

case .message(let message):
do {
try self.stateMachine.send(message: message, promise: promise)
try self.stateMachine.send(message: message.buffer, promise: promise)
} catch let invalidState {
let error = RPCError(invalidState)
promise?.fail(error)
Expand Down
46 changes: 26 additions & 20 deletions Sources/GRPCNIOTransportCore/Compression/Zlib.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ extension Zlib {
/// - Parameter output: The `ByteBuffer` into which the compressed message should be written.
/// - Returns: The number of bytes written into the `output` buffer.
@discardableResult
func compress(_ input: [UInt8], into output: inout ByteBuffer) throws(ZlibError) -> Int {
func compress(_ input: ByteBuffer, into output: inout ByteBuffer) throws(ZlibError) -> Int {
defer { self.reset() }
let upperBound = self.stream.deflateBound(inputBytes: input.count)
let upperBound = self.stream.deflateBound(inputBytes: input.readableBytes)
return try self.stream.deflate(input, into: &output, upperBound: upperBound)
}

Expand Down Expand Up @@ -110,7 +110,7 @@ extension Zlib {
/// - Parameters:
/// - input: The buffer read compressed bytes from.
/// - limit: The largest size a decompressed payload may be.
func decompress(_ input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
func decompress(_ input: inout ByteBuffer, limit: Int) throws -> ByteBuffer {
defer { self.reset() }
return try self.stream.inflate(input: &input, limit: limit)
}
Expand Down Expand Up @@ -302,24 +302,30 @@ extension UnsafeMutablePointer<z_stream> {
self.pointee.msg.map { String(cString: $0) }
}

func inflate(input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
return try input.readWithUnsafeMutableReadableBytes { inputPointer in
func inflate(input: inout ByteBuffer, limit: Int) throws -> ByteBuffer {
return try input.readWithUnsafeMutableReadableBytes { inputPointer -> (Int, ByteBuffer) in
self.setNextInputBuffer(inputPointer)
defer {
self.setNextInputBuffer(nil)
self.setNextOutputBuffer(nil)
}

// Assume the output will be twice as large as the input.
var output = [UInt8](repeating: 0, count: min(inputPointer.count * 2, limit))
var offset = 0
var output = ByteBuffer()
var outputSize = min(inputPointer.count * 2, limit)
var finished = false
var totalBytesWritten = 0

while true {
let (finished, written) = try output[offset...].withUnsafeMutableBytes { outPointer in
let written = try output.writeWithUnsafeMutableBytes(
minimumWritableBytes: outputSize
) { pointer in
let outPointer = UnsafeMutableRawBufferPointer(
start: pointer.baseAddress,
count: min(pointer.count, outputSize)
)
self.setNextOutputBuffer(outPointer)

let finished: Bool

// Possible return codes:
// - Z_OK: some progress has been made
// - Z_STREAM_END: the end of the compressed data has been reached and all uncompressed
Expand Down Expand Up @@ -347,29 +353,29 @@ extension UnsafeMutablePointer<z_stream> {
)
}

let size = outPointer.count - self.availableOutputBytes
return (finished, size)
return outPointer.count - self.availableOutputBytes
}

if finished {
output.removeLast(output.count - self.totalOutputBytes)
let bytesRead = inputPointer.count - self.availableInputBytes
return (bytesRead, output)
} else {
offset += written
let newSize = min(output.count * 2, limit)
if newSize == output.count {
// There are still more bytes to decompress. Increase the size of the extra space in the
// buffer we're writing into.
totalBytesWritten += written
let newSize = min(outputSize * 2, limit - totalBytesWritten)
if newSize <= 0 {
assert(newSize == 0)
throw RPCError(code: .resourceExhausted, message: "Message is too large to decompress.")
} else {
output.append(contentsOf: repeatElement(0, count: newSize - output.count))
}
outputSize = newSize
}
}
}
}

func deflate(
_ input: [UInt8],
_ input: ByteBuffer,
into output: inout ByteBuffer,
upperBound: Int
) throws(ZlibError) -> Int {
Expand All @@ -380,7 +386,7 @@ extension UnsafeMutablePointer<z_stream> {

do {
var input = input
return try input.withUnsafeMutableBytes { input in
return try input.withUnsafeMutableReadableBytes { input in
self.setNextInputBuffer(input)

return try output.writeWithUnsafeMutableBytes(minimumWritableBytes: upperBound) { output in
Expand Down
8 changes: 4 additions & 4 deletions Sources/GRPCNIOTransportCore/GRPCMessageDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder {
/// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
static let metadataLength = 5

typealias InboundOut = [UInt8]
typealias InboundOut = ByteBuffer

private let decompressor: Zlib.Decompressor?
private let maxPayloadSize: Int
Expand Down Expand Up @@ -92,7 +92,7 @@ struct GRPCMessageDecoder: NIOSingleStepByteToMessageDecoder {
}
return try decompressor.decompress(&message, limit: self.maxPayloadSize)
} else {
return Array(buffer: message)
return message
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ package struct GRPCMessageDeframer {
}
}

package mutating func decodeNext() throws -> [UInt8]? {
package mutating func decodeNext() throws -> ByteBuffer? {
guard (self.buffer?.readableBytes ?? 0) > 0 else { return nil }
// Above checks mean this is both non-nil and non-empty.
let message = try self.decoder.decode(buffer: &self.buffer!)
Expand All @@ -145,7 +145,7 @@ package struct GRPCMessageDeframer {
}

extension GRPCMessageDeframer {
mutating func decode(into queue: inout OneOrManyQueue<[UInt8]>) throws {
mutating func decode(into queue: inout OneOrManyQueue<ByteBuffer>) throws {
while let next = try self.decodeNext() {
queue.append(next)
}
Expand Down
15 changes: 9 additions & 6 deletions Sources/GRPCNIOTransportCore/GRPCMessageFramer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct GRPCMessageFramer {
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
static let maxWriteBufferLength = 65_536

private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise<Void>?)>
private var pendingMessages: OneOrManyQueue<(bytes: ByteBuffer, promise: EventLoopPromise<Void>?)>

private var writeBuffer: ByteBuffer

Expand All @@ -45,7 +45,7 @@ struct GRPCMessageFramer {

/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
mutating func append(_ bytes: [UInt8], promise: EventLoopPromise<Void>?) {
mutating func append(_ bytes: ByteBuffer, promise: EventLoopPromise<Void>?) {
self.pendingMessages.append((bytes, promise))
}

Expand All @@ -72,7 +72,7 @@ struct GRPCMessageFramer {

var requiredCapacity = 0
for message in self.pendingMessages {
requiredCapacity += message.bytes.count + Self.metadataLength
requiredCapacity += message.bytes.readableBytes + Self.metadataLength
}
self.writeBuffer.clear(minimumCapacity: requiredCapacity)

Expand All @@ -90,7 +90,10 @@ struct GRPCMessageFramer {
return (result: .success(self.writeBuffer), promise: pendingWritePromise)
}

private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws(RPCError) {
private mutating func encode(
_ message: ByteBuffer,
compressor: Zlib.Compressor?
) throws(RPCError) {
if let compressor {
self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag

Expand All @@ -108,9 +111,9 @@ struct GRPCMessageFramer {
} else {
self.writeBuffer.writeMultipleIntegers(
UInt8(0), // Clear compression flag
UInt32(message.count) // Set message length
UInt32(message.readableBytes) // Set message length
)
self.writeBuffer.writeBytes(message)
self.writeBuffer.writeImmutableBuffer(message)
}
}
}
65 changes: 65 additions & 0 deletions Sources/GRPCNIOTransportCore/GRPCNIOTransportBytes.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public import GRPCCore
public import NIOCore

/// The contiguous bytes type used by the gRPC's NIO transport.
public struct GRPCNIOTransportBytes: GRPCContiguousBytes, Hashable, Sendable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to wrap the ByteBuffer? Can we not conform it to GRPCContiguousBytes directly? Are we afraid the couple of methods we'd need to declare would clash with some future API added to it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping it gives us flexibility to change it in the future without breaking API.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - I didn't consider this because this package is already so tied to NIO (and I find it hard to believe we'd replace ByteBuffer on NIO anytime soon 😄). But that's fair.

@usableFromInline
internal var buffer: ByteBuffer

@inlinable
internal init(_ buffer: ByteBuffer) {
self.buffer = buffer
}

@inlinable
internal init() {
self.buffer = ByteBuffer()
}

@inlinable
public init(repeating: UInt8, count: Int) {
self.buffer = ByteBuffer(repeating: repeating, count: count)
}

@inlinable
public init(_ sequence: some Sequence<UInt8>) {
self.buffer = ByteBuffer(bytes: sequence)
}

@inlinable
public var count: Int {
self.buffer.readableBytes
}

@inlinable
public func withUnsafeBytes<R>(
_ body: (UnsafeRawBufferPointer) throws -> R
) rethrows -> R {
try self.buffer.withUnsafeReadableBytes(body)
}

@inlinable
public mutating func withUnsafeMutableBytes<R>(
_ body: (UnsafeMutableRawBufferPointer) throws -> R
) rethrows -> R {
// 'GRPCContiguousBytes' has no concept of readable/writable bytes; all bytes stored are
// readable and writable. In 'ByteBuffer' terms, these are just the readable bytes.
try self.buffer.withUnsafeMutableReadableBytes(body)
}
}
Loading
Loading