-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3080: HadoopStreams to support ByteBufferPositionedReadable #3096
base: master
Are you sure you want to change the base?
Conversation
Based of the H2 stream test suite but * parameterized for on/off heap * expect no changes in buffer contents on out of range reads. Still one test failure.
* changing how stream capabilities are set up and queried, makes it easy to generate streams with different declared behaviours. * pull out common assertions * lots of javadoc of what each test case is trying to do. + all the tests are happy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this! I've left some comments.
|
||
if (isByteBufferReadable) { | ||
if (stream.hasCapability(READBYTEBUFFER)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can avoid any reflection here because of the Hadoop version bump?
* @param stream stream to probe | ||
* @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable | ||
*/ | ||
private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBufferReadableLegacy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any behavior change of a wrapped stream after removing unwrapByteBufferReadableLegacy
?
@@ -100,6 +100,10 @@ public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allo | |||
VectorIoBridge.instance().readVectoredRanges(stream, ranges, allocator); | |||
} | |||
|
|||
protected Reader getReader() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why adding this but not used elsewhere?
* @param buf a byte buffer to fill with data from the stream | ||
* @return number of bytes read. | ||
* | ||
* @throws EOFException the buffer length is greater than the file length |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add EOFException
to the method signature?
* {@code ByteBufferPositionedReadable.readFully()}. | ||
* <p>This is implemented by HDFS and possibly other clients, | ||
*/ | ||
class H3ByteBufferInputStream extends H2SeekableInputStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with the inheritance to reduce code duplication.
I'm away until 2025; will reply to comments then. Thanks for the review. |
I'm back, don't think I've forgotten this. In fact I've been actually setting up a test-only-loop for hadoop for regression testing parquet support through the cloud connectors. |
Rationale for this change
If a stream declares in its StreamCapabilities that it supports
ByteBufferPositionedReadable
, then use that API forreadFully(ByteBuffer)
Adding support for Hadoop
ByteBufferPositionedReadable
streams may improve performanceby pushing retry/recovery logic into the filesystem client library.
This interface is implemented by the HDFS input stream; we are considering adding
it elsewhere.
What changes are included in this PR?
H3ByteBufferInputStream
HadoopStreams
if theFSDataInputStream
is considered suitable.Class
H3ByteBufferInputStream
The reading is done in a new class,
H3ByteBufferInputStream
, which subclassesH2ByteBufferInputStream
. This reduces the amount of duplicate code, it just makes it a bit unclean.The purist way to do it would be to create an abstract superclass
HadoopInputStream
to hold all commonality between the the three input streams.I'm happy to do this, just didn't want to doing some larger refactoring without (a) showing the core design worked and (b) getting permission to do this. Should I do this?
HadoopStreams
changesSelection of the new input stream is done if and only if the stream declares the capability
in:preadbytebuffer
.There is no equivalent of
isWrappedStreamByteBufferReadable()
which recurses througha chain of wrapped streams looking for the API.
If a stream doesn't declare its support for the API, it won't get picked up.
This is done knowing that the sole production implemenation which currently exists,
the HDFS input stream, does declare this capability.
Are these changes tested?
There is new test suite, for new behavior and ensuring that the integration with
HadoopStreams still retains the correct behavior for existing streams.
Suite is parameterized on heap and direct buffers.
Are there any user-facing changes?
No
Closes GH-3080