Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3080: HadoopStreams to support ByteBufferPositionedReadable #3096

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

steveloughran
Copy link
Contributor

Rationale for this change

If a stream declares in its StreamCapabilities that it supports
ByteBufferPositionedReadable, then use that API for
readFully(ByteBuffer)

ByteBufferPositionedReadable.readFully(long position, ByteBuffer buf)

Adding support for Hadoop ByteBufferPositionedReadable streams may improve performance
by pushing retry/recovery logic into the filesystem client library.

This interface is implemented by the HDFS input stream; we are considering adding
it elsewhere.

What changes are included in this PR?

  • New SeekableInputStream implementation: H3ByteBufferInputStream
  • Instantiated in HadoopStreams if the FSDataInputStream is considered suitable.
  • Tests for the new behavior and that no regressions are caused.

Class H3ByteBufferInputStream

The reading is done in a new class, H3ByteBufferInputStream, which subclasses H2ByteBufferInputStream. This reduces the amount of duplicate code, it just makes it a bit unclean.

The purist way to do it would be to create an abstract superclass HadoopInputStream to hold all commonality between the the three input streams.

I'm happy to do this, just didn't want to doing some larger refactoring without (a) showing the core design worked and (b) getting permission to do this. Should I do this?

HadoopStreams changes

Selection of the new input stream is done if and only if the stream declares the capability in:preadbytebuffer.
There is no equivalent of isWrappedStreamByteBufferReadable() which recurses through
a chain of wrapped streams looking for the API.
If a stream doesn't declare its support for the API, it won't get picked up.
This is done knowing that the sole production implemenation which currently exists,
the HDFS input stream, does declare this capability.

Are these changes tested?

There is new test suite, for new behavior and ensuring that the integration with
HadoopStreams still retains the correct behavior for existing streams.
Suite is parameterized on heap and direct buffers.

Are there any user-facing changes?

No

Closes GH-3080

Based of the H2 stream test suite but
* parameterized for on/off heap
* expect no changes in buffer contents on out of range reads.

Still one test failure.
* changing how stream capabilities are set up and queried,
  makes it easy to generate streams with different declared
  behaviours.
* pull out common assertions
* lots of javadoc of what each test case is trying to do.

+ all the tests are happy.
Copy link
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this! I've left some comments.


if (isByteBufferReadable) {
if (stream.hasCapability(READBYTEBUFFER)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can avoid any reflection here because of the Hadoop version bump?

* @param stream stream to probe
* @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
*/
private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBufferReadableLegacy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any behavior change of a wrapped stream after removing unwrapByteBufferReadableLegacy?

@@ -100,6 +100,10 @@ public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allo
VectorIoBridge.instance().readVectoredRanges(stream, ranges, allocator);
}

protected Reader getReader() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why adding this but not used elsewhere?

* @param buf a byte buffer to fill with data from the stream
* @return number of bytes read.
*
* @throws EOFException the buffer length is greater than the file length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add EOFException to the method signature?

* {@code ByteBufferPositionedReadable.readFully()}.
* <p>This is implemented by HDFS and possibly other clients,
*/
class H3ByteBufferInputStream extends H2SeekableInputStream {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the inheritance to reduce code duplication.

@steveloughran
Copy link
Contributor Author

I'm away until 2025; will reply to comments then. Thanks for the review.

@steveloughran
Copy link
Contributor Author

I'm back, don't think I've forgotten this. In fact I've been actually setting up a test-only-loop for hadoop for regression testing parquet support through the cloud connectors.
apache/hadoop#7285

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

HadoopStreams to support ByteBufferPositionedReadable input streams
2 participants