From 5de0623784c98af65e3471fc325d2265aea992a8 Mon Sep 17 00:00:00 2001 From: Russell Pecka Date: Sat, 6 Jul 2024 01:37:59 -0700 Subject: [PATCH 1/2] Use chunked file read even when the read size is less than the single shot read limit since readChunk is not guaranteed to read all the requested bytes --- .../NIOFileSystem/FileHandleProtocol.swift | 46 ++++--- .../FileHandleTests.swift | 119 ++++++++++++++++++ 2 files changed, 141 insertions(+), 24 deletions(-) diff --git a/Sources/NIOFileSystem/FileHandleProtocol.swift b/Sources/NIOFileSystem/FileHandleProtocol.swift index e7607f7829..14287e84fd 100644 --- a/Sources/NIOFileSystem/FileHandleProtocol.swift +++ b/Sources/NIOFileSystem/FileHandleProtocol.swift @@ -376,33 +376,31 @@ extension ReadableFileHandleProtocol { forceChunkedRead = true } - if !forceChunkedRead, readSize <= singleShotReadLimit { - return try await self.readChunk( - fromAbsoluteOffset: offset, - length: .bytes(Int64(readSize)) - ) + let chunkLength: ByteCount = if !forceChunkedRead, readSize <= singleShotReadLimit { + .bytes(Int64(readSize)) } else { - var accumulator = ByteBuffer() - accumulator.reserveCapacity(readSize) - - for try await chunk in self.readChunks(in: offset..., chunkLength: .mebibytes(8)) { - accumulator.writeImmutableBuffer(chunk) - if accumulator.readableBytes > maximumSizeAllowed.bytes { - throw FileSystemError( - code: .resourceExhausted, - message: """ - There are more bytes to read than the maximum size allowed \ - (\(maximumSizeAllowed)). Read the file in chunks or increase the maximum size \ - allowed. - """, - cause: nil, - location: .here() - ) - } - } + .mebibytes(8) + } + var accumulator = ByteBuffer() + accumulator.reserveCapacity(readSize) - return accumulator + for try await chunk in self.readChunks(in: offset..., chunkLength: chunkLength) { + accumulator.writeImmutableBuffer(chunk) + if accumulator.readableBytes > maximumSizeAllowed.bytes { + throw FileSystemError( + code: .resourceExhausted, + message: """ + There are more bytes to read than the maximum size allowed \ + (\(maximumSizeAllowed)). Read the file in chunks or increase the maximum size \ + allowed. + """, + cause: nil, + location: .here() + ) + } } + + return accumulator } else { guard offset == 0 else { throw FileSystemError( diff --git a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift index cbfd56887a..23df6abaff 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift @@ -21,6 +21,111 @@ import XCTest @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) final class FileHandleTests: XCTestCase { + private enum MockFileHandleError: Error { + case unimplemented(function: String) + } + + private final class MockFileHandle: ReadableFileHandleProtocol { + struct ChunkedByteBufferSequence: AsyncSequence { + struct Interator: AsyncIteratorProtocol { + let buffer: ByteBuffer + private(set) var range: Range + let chunkLength: Int + mutating func next() async throws -> ByteBuffer? { + guard let slice = self.buffer.getSlice(at: Int(range.lowerBound), length: self.chunkLength) else { + return nil + } + self.range = self.range.lowerBound+Int64(slice.readableBytes).. + let chunkLength: ByteCount + func makeAsyncIterator() -> Interator { + .init(buffer: self.buffer, range: self.range, chunkLength: Int(self.chunkLength.bytes)) + } + } + let bytes: ByteBuffer + let size: Int + let chunkSize: ByteCount + + init(bytes: ByteBuffer, chunkSize: ByteCount = .bytes(Int64.max)) { + self.bytes = bytes + self.size = bytes.readableBytes // capture initial size since we might be moving the read/write index later + self.chunkSize = chunkSize + } + + func readChunk(fromAbsoluteOffset offset: Int64, length: ByteCount) async throws -> ByteBuffer { + self.bytes.getSlice(at: Int(offset), length: Int(min(length.bytes, self.chunkSize.bytes))) ?? .init() + } + + func readChunks(in range: Range, chunkLength: ByteCount) -> FileChunks { + .init(wrapping: ChunkedByteBufferSequence(buffer: self.bytes, range: range, chunkLength: chunkLength)) + } + + func info() async throws -> FileInfo { + .init( + type: .regular, + permissions: [.ownerReadWrite, .groupRead, .otherRead], + size: Int64(self.size), + userID: .init(rawValue: 501), + groupID: .init(rawValue: 20), + lastAccessTime: .omit, + lastDataModificationTime: .omit, + lastStatusChangeTime: .omit + ) + } + + func replacePermissions(_ permissions: FilePermissions) async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + + func addPermissions(_ permissions: FilePermissions) async throws -> FilePermissions { + throw MockFileHandleError.unimplemented(function: #function) + } + + func removePermissions(_ permissions: FilePermissions) async throws -> FilePermissions { + throw MockFileHandleError.unimplemented(function: #function) + } + + func attributeNames() async throws -> [String] { + throw MockFileHandleError.unimplemented(function: #function) + } + + func valueForAttribute(_ name: String) async throws -> [UInt8] { + throw MockFileHandleError.unimplemented(function: #function) + } + + func updateValueForAttribute(_ bytes: some RandomAccessCollection & Sendable, attribute name: String) async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + + func removeValueForAttribute(_ name: String) async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + + func synchronize() async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + + func withUnsafeDescriptor(_ execute: @escaping @Sendable (FileDescriptor) throws -> R) async throws -> R where R : Sendable { + throw MockFileHandleError.unimplemented(function: #function) + } + + func detachUnsafeFileDescriptor() throws -> FileDescriptor { + throw MockFileHandleError.unimplemented(function: #function) + } + + func close() async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + + func setTimes(lastAccess: FileInfo.Timespec?, lastDataModification: FileInfo.Timespec?) async throws { + throw MockFileHandleError.unimplemented(function: #function) + } + } static let thisFile = FilePath(#filePath) static let testData = FilePath(#filePath) .removingLastComponent() // FileHandleTests.swift @@ -303,6 +408,20 @@ final class FileHandleTests: XCTestCase { } } + func testReadWholeFilePartialChunk() async throws { + let fileContents = ByteBuffer(string: "the quick brown fox jumped over the lazy dog") + let mockHandle = MockFileHandle( + bytes: fileContents, + chunkSize: .bytes(Int64(fileContents.readableBytes / 2)) // simulate reading a chunk of less than the requested size + ) + let contents = try await mockHandle.readToEnd(maximumSizeAllowed: .bytes(Int64.max)) + XCTAssertEqual( + contents, + fileContents, + "Contents of mock file differ to what was read by readToEnd" + ) + } + func testWriteAndReadUnseekableFile() async throws { let privateTempDirPath = try await FileSystem.shared.createTemporaryDirectory(template: "test-XXX") self.addTeardownBlock { From ecbb478d63c49c1e589946dab47c5c0da7a0a5df Mon Sep 17 00:00:00 2001 From: Russell Pecka Date: Sat, 6 Jul 2024 03:08:11 -0700 Subject: [PATCH 2/2] Rewrite ChunkRange to use either a specified offset or the current offset --- Sources/NIOFileSystem/FileChunks.swift | 19 ++++++++++--------- Sources/NIOFileSystem/FileHandle.swift | 4 ++-- .../NIOFileSystem/FileHandleProtocol.swift | 10 ++++------ .../Internal/SystemFileHandle.swift | 2 +- .../FileHandleTests.swift | 12 ++++++++---- 5 files changed, 25 insertions(+), 22 deletions(-) diff --git a/Sources/NIOFileSystem/FileChunks.swift b/Sources/NIOFileSystem/FileChunks.swift index 4b87221410..242ef9dfff 100644 --- a/Sources/NIOFileSystem/FileChunks.swift +++ b/Sources/NIOFileSystem/FileChunks.swift @@ -22,8 +22,10 @@ import NIOPosix @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public struct FileChunks: AsyncSequence { enum ChunkRange { - case entireFile - case partial(Range) + /// Read from the current file access offset. Useful for reading from unseekable files. + case current + /// Read from a specific offset. + case specified(Range) } public typealias Element = ByteBuffer @@ -39,13 +41,12 @@ public struct FileChunks: AsyncSequence { internal init( handle: SystemFileHandle, chunkLength: ByteCount, - range: Range + range: Range? ) { - let chunkRange: ChunkRange - if range.lowerBound == 0, range.upperBound == .max { - chunkRange = .entireFile + let chunkRange: ChunkRange = if let range { + .specified(range) } else { - chunkRange = .partial(range) + .current } // TODO: choose reasonable watermarks; this should likely be at least somewhat dependent @@ -96,9 +97,9 @@ extension BufferedStream where Element == ByteBuffer { ) -> BufferedStream { let state: ProducerState switch range { - case .entireFile: + case .current: state = ProducerState(handle: handle, range: nil) - case .partial(let partialRange): + case .specified(let partialRange): state = ProducerState(handle: handle, range: partialRange) } let protectedState = NIOLockedValueBox(state) diff --git a/Sources/NIOFileSystem/FileHandle.swift b/Sources/NIOFileSystem/FileHandle.swift index 2f6f0ed47f..30cd08bede 100644 --- a/Sources/NIOFileSystem/FileHandle.swift +++ b/Sources/NIOFileSystem/FileHandle.swift @@ -190,7 +190,7 @@ public struct ReadFileHandle: ReadableFileHandleProtocol, _HasFileHandle { ) } - public func readChunks(in range: Range, chunkLength: ByteCount) -> FileChunks { + public func readChunks(in range: Range?, chunkLength: ByteCount) -> FileChunks { self.fileHandle.systemFileHandle.readChunks(in: range, chunkLength: chunkLength) } @@ -265,7 +265,7 @@ public struct ReadWriteFileHandle: ReadableAndWritableFileHandleProtocol, _HasFi ) } - public func readChunks(in offset: Range, chunkLength: ByteCount) -> FileChunks { + public func readChunks(in offset: Range?, chunkLength: ByteCount) -> FileChunks { self.fileHandle.systemFileHandle.readChunks(in: offset, chunkLength: chunkLength) } diff --git a/Sources/NIOFileSystem/FileHandleProtocol.swift b/Sources/NIOFileSystem/FileHandleProtocol.swift index 14287e84fd..430340d0d9 100644 --- a/Sources/NIOFileSystem/FileHandleProtocol.swift +++ b/Sources/NIOFileSystem/FileHandleProtocol.swift @@ -201,7 +201,7 @@ public protocol ReadableFileHandleProtocol: FileHandleProtocol { /// - range: The absolute offsets into the file to read. /// - chunkLength: The maximum length of the chunk to read as a ``ByteCount``. /// - Returns: A sequence of chunks read from the file. - func readChunks(in range: Range, chunkLength: ByteCount) -> FileChunks + func readChunks(in range: Range?, chunkLength: ByteCount) -> FileChunks } // MARK: - Read chunks with default chunk length @@ -227,15 +227,13 @@ extension ReadableFileHandleProtocol { /// /// - Parameters: /// - range: A range of offsets in the file to read. - /// - chunkLength: The length of chunks to read, defaults to 128 KiB. /// - as: Type of chunk to read. /// - SeeAlso: ``ReadableFileHandleProtocol/readChunks(in:chunkLength:)-2dz6`` /// - Returns: An `AsyncSequence` of chunks read from the file. public func readChunks( - in range: Range, - chunkLength: ByteCount = .kibibytes(128) + in range: Range? ) -> FileChunks { - return self.readChunks(in: range, chunkLength: chunkLength) + return self.readChunks(in: range, chunkLength: .kibibytes(128)) } /// Returns an asynchronous sequence of chunks read from the file. @@ -413,7 +411,7 @@ extension ReadableFileHandleProtocol { var accumulator = ByteBuffer() accumulator.reserveCapacity(readSize) - for try await chunk in self.readChunks(in: ..., chunkLength: .mebibytes(8)) { + for try await chunk in self.readChunks(in: nil, chunkLength: .mebibytes(8)) { accumulator.writeImmutableBuffer(chunk) if accumulator.readableBytes > maximumSizeAllowed.bytes { throw FileSystemError( diff --git a/Sources/NIOFileSystem/Internal/SystemFileHandle.swift b/Sources/NIOFileSystem/Internal/SystemFileHandle.swift index 34c8f62353..6b458934bf 100644 --- a/Sources/NIOFileSystem/Internal/SystemFileHandle.swift +++ b/Sources/NIOFileSystem/Internal/SystemFileHandle.swift @@ -1069,7 +1069,7 @@ extension SystemFileHandle: ReadableFileHandleProtocol { } public func readChunks( - in range: Range, + in range: Range?, chunkLength size: ByteCount ) -> FileChunks { return FileChunks(handle: self, chunkLength: size, range: range) diff --git a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift index 23df6abaff..49b3768d43 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift @@ -61,8 +61,11 @@ final class FileHandleTests: XCTestCase { self.bytes.getSlice(at: Int(offset), length: Int(min(length.bytes, self.chunkSize.bytes))) ?? .init() } - func readChunks(in range: Range, chunkLength: ByteCount) -> FileChunks { - .init(wrapping: ChunkedByteBufferSequence(buffer: self.bytes, range: range, chunkLength: chunkLength)) + func readChunks(in range: Range?, chunkLength: ByteCount) -> FileChunks { + guard let range else { + preconditionFailure("Reading from the current offset is not implemented for MockFileHandle") + } + return .init(wrapping: ChunkedByteBufferSequence(buffer: self.bytes, range: range, chunkLength: chunkLength)) } func info() async throws -> FileInfo { @@ -436,10 +439,11 @@ final class FileHandleTests: XCTestCase { try await self.withHandle(forFileAtPath: privateTempDirPath.appending("fifo"), accessMode: .readWrite) { handle in let someBytes = ByteBuffer(repeating: 42, count: 1546) + try await handle.write(contentsOf: someBytes.readableBytesView, toAbsoluteOffset: 0) - let readSomeBytes = try await handle.readToEnd(maximumSizeAllowed: .bytes(1546)) - XCTAssertEqual(readSomeBytes, someBytes) + let contents = try await handle.readToEnd(maximumSizeAllowed: .bytes(1546)) + XCTAssertEqual(contents, someBytes, "Data read back from the fifo should match what was written") } }