From 3087ea93156aec845ee42acdeae00f86d7d93a17 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 21 Feb 2024 06:31:07 -0600 Subject: [PATCH] Initial commit --- .../parquet/base/ColumnChunkReader.java | 5 +- .../parquet/base/ColumnChunkReaderImpl.java | 120 ++++++++---------- .../parquet/base/ColumnPageReader.java | 9 +- .../parquet/base/ColumnPageReaderImpl.java | 85 +++---------- .../parquet/base/ParquetFileReader.java | 2 +- .../parquet/table/ParquetTableWriter.java | 9 +- .../OffsetIndexBasedColumnChunkPageStore.java | 2 +- .../topage/ToPageWithDictionary.java | 2 +- .../deephaven/extensions/s3/BufferPool.java | 2 - 9 files changed, 83 insertions(+), 153 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index a3967dc24ea..2f13ec64802 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -61,8 +61,11 @@ interface ColumnPageReaderIterator { interface ColumnPageDirectAccessor { /** * Directly access a page reader for a given page number. + * + * @param pageNum The page number to access. + * @param channelContext The channel context to use for constructing the reader */ - ColumnPageReader getPageReader(int pageNum); + ColumnPageReader getPageReader(int pageNum, SeekableChannelContext channelContext); } /** diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 8bf4141aff6..5237673b8d9 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -27,12 +27,10 @@ import java.io.UncheckedIOException; import java.net.URI; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; import java.util.List; import java.util.NoSuchElementException; import java.util.function.Function; -import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY; import static org.apache.parquet.format.Encoding.RLE_DICTIONARY; @@ -93,18 +91,14 @@ public int getMaxRl() { return path.getMaxRepetitionLevel(); } - public final OffsetIndex getOffsetIndex() { + public OffsetIndex getOffsetIndex() { return offsetIndex; } @Override public ColumnPageReaderIterator getPageIterator() { - final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); - if (offsetIndex == null) { - return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); - } else { - return new ColumnPageReaderIteratorIndexImpl(); - } + return new ColumnPageReaderIteratorImpl(columnChunk.meta_data.getData_page_offset(), + columnChunk.getMeta_data().getNum_values()); } @Override @@ -217,9 +211,9 @@ private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIter private long nextHeaderOffset; private long remainingValues; - ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) { + ColumnPageReaderIteratorImpl(final long dataPageStartOffset, final long numValues) { this.remainingValues = numValues; - this.nextHeaderOffset = startOffset; + this.nextHeaderOffset = dataPageStartOffset; } @Override @@ -238,37 +232,34 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) { ch.position(headerOffset); - final PageHeader pageHeader; - try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) { - pageHeader = Util.readPageHeader(in); - } + final PageHeader pageHeader = readPageHeader(ch); // relying on exact position of ch final long dataOffset = ch.position(); nextHeaderOffset = dataOffset + pageHeader.getCompressed_page_size(); - if (pageHeader.isSetDictionary_page_header()) { - // Dictionary page; skip it - return next(holder.get()); - } - if (!pageHeader.isSetData_page_header() && !pageHeader.isSetData_page_header_v2()) { - throw new IllegalStateException( - "Expected data page, but neither v1 nor v2 data page header is set in file " - + ch + " at offset " + headerOffset); + final PageType pageType = pageHeader.type; + if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) { + throw new IllegalStateException("Expected data page, but got " + pageType + " in file " + ch + + " at offset " + headerOffset); } - remainingValues -= getNumValues(pageHeader); - final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader); + final int numValuesInPage = getNumValues(pageHeader); + remainingValues -= numValuesInPage; final Function pageDictionarySupplier = - (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) - ? dictionarySupplier - : (SeekableChannelContext context) -> NULL_DICTIONARY; - return new ColumnPageReaderImpl(channelsProvider, decompressor, - pageDictionarySupplier, nullMaterializerFactory, path, getURI(), fieldTypes, - dataOffset, pageHeader, ColumnPageReaderImpl.NULL_NUM_VALUES); + getPageDictionarySupplier(pageHeader); + return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier, + nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage); } catch (IOException e) { throw new UncheckedDeephavenException("Error reading page header", e); } } } + private Function getPageDictionarySupplier(final PageHeader pageHeader) { + final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader); + return (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) + ? dictionarySupplier + : (SeekableChannelContext context) -> NULL_DICTIONARY; + } + private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) { switch (pageHeader.type) { case DATA_PAGE: @@ -281,58 +272,51 @@ private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHea } } + private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException { + try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) { + return Util.readPageHeader(in); + } + } + private static int getNumValues(PageHeader pageHeader) { return pageHeader.isSetData_page_header() ? pageHeader.getData_page_header().getNum_values() : pageHeader.getData_page_header_v2().getNum_values(); } - private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { - private int pos; - - ColumnPageReaderIteratorIndexImpl() { - pos = 0; - } - - @Override - public boolean hasNext() { - return offsetIndex.getPageCount() > pos; - } - - @Override - public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) { - if (!hasNext()) { - throw new NoSuchElementException("No next element"); - } - // Following logic assumes that offsetIndex will store the number of values for a page instead of number - // of rows (which can be different for array and vector columns). This behavior is because of a bug on - // parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading - // parquet files written before deephaven-core/pull/4844. - final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) - - offsetIndex.getFirstRowIndex(pos) + 1); - final ColumnPageReader columnPageReader = - new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, - nullMaterializerFactory, path, getURI(), fieldTypes, offsetIndex.getOffset(pos), null, - numValues); - pos++; - return columnPageReader; - } - } - private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { ColumnPageDirectAccessorImpl() {} @Override - public ColumnPageReader getPageReader(final int pageNum) { + public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) { if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) { throw new IndexOutOfBoundsException( "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } - // Page header and number of values will be populated later when we read the page header from the file - return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, - path, getURI(), fieldTypes, offsetIndex.getOffset(pageNum), null, - ColumnPageReaderImpl.NULL_NUM_VALUES); + + // Read the page header to determine whether we need to use dictionary for this page + final long headerOffset = offsetIndex.getOffset(pageNum); + try ( + final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); + final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) { + ch.position(headerOffset); + final PageHeader pageHeader = readPageHeader(ch); + final long dataOffset = ch.position(); + final PageType pageType = pageHeader.type; + if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) { + throw new IllegalStateException("Expected data page, but got " + pageType + " for page number " + + pageNum + " for column " + getURI()); + } + final Function pageDictionarySupplier = + getPageDictionarySupplier(pageHeader); + return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier, + nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, + getNumValues(pageHeader)); + } catch (final IOException e) { + throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum + " for " + + "column " + getURI(), e); + } } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java index 58b66e6abdd..b3192fda9bc 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java @@ -18,10 +18,10 @@ public interface ColumnPageReader extends AutoCloseable { /** * @param channelContext The channel context to use for reading the parquet file - * @return The number of rows in this ColumnChunk, or -1 if it's unknown. + * @return The number of rows in this page, or -1 if it's unknown. */ default long numRows(final SeekableChannelContext channelContext) throws IOException { - return numValues(channelContext); + return numValues(); } /** @@ -47,10 +47,9 @@ IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder, SeekableChannelContext channelContext) throws IOException; /** - * @param channelContext The channel context to use for reading the parquet file - * @return The value stored under number DataPageHeader.num_values + * @return The number of values in this page */ - int numValues(SeekableChannelContext channelContext) throws IOException; + int numValues(); /** * @param channelContext The channel context to use for reading the parquet file diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 743926ea86c..39789ddfb38 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -22,7 +22,6 @@ import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; -import org.apache.parquet.format.Util; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; @@ -41,8 +40,7 @@ import static org.apache.parquet.column.ValuesType.VALUES; final class ColumnPageReaderImpl implements ColumnPageReader { - public static final int NULL_OFFSET = -1; - static final int NULL_NUM_VALUES = -1; + private static final int NULL_OFFSET = -1; private final SeekableChannelsProvider channelsProvider; private final CompressorAdapter compressorAdapter; @@ -53,10 +51,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader { private final List fieldTypes; /** - * Stores the offset from where the next byte should be read. Can be the offset of page header if - * {@link #pageHeader} is {@code null}, else will be the offset of data. + * The offset for data following the page header in the file. */ - private long offset; + private final long dataOffset; private PageHeader pageHeader; private int numValues; private int rowCount = -1; @@ -72,11 +69,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader { * @param path The path of the column. * @param uri The uri of the parquet file. * @param fieldTypes The types of the fields in the column. - * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data - * following the header in the page. - * @param pageHeader The page header if it is already read from the file. Else, {@code null}. - * @param numValues The number of values in the page if it is already read from the file. Else, - * {@value #NULL_NUM_VALUES} + * @param dataOffset The offset for data following the page header in the file. + * @param pageHeader The page header, should not be {@code null}. + * @param numValues The number of values in the page. */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, @@ -85,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { ColumnDescriptor path, URI uri, List fieldTypes, - long offset, + long dataOffset, PageHeader pageHeader, int numValues) { this.channelsProvider = channelsProvider; @@ -95,8 +90,10 @@ final class ColumnPageReaderImpl implements ColumnPageReader { this.path = path; this.uri = uri; this.fieldTypes = fieldTypes; - this.offset = offset; + this.dataOffset = dataOffset; + Assert.neqNull(pageHeader, "pageHeader"); this.pageHeader = pageHeader; + Assert.geqZero(numValues, "numValues"); this.numValues = numValues; } @@ -106,7 +103,7 @@ public Object materialize(@NotNull final Object nullValue, try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readDataPage(nullValue, ch, holder.get()); } } @@ -115,7 +112,7 @@ public int readRowCount(@NotNull final SeekableChannelContext channelContext) th try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readRowCountFromDataPage(ch); } } @@ -126,51 +123,11 @@ public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder, try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readKeysFromDataPage(keyDest, nullPlaceholder, ch, holder.get()); } } - /** - * If {@link #pageHeader} is {@code null}, read it from the channel, and increment the {@link #offset} by the length - * of page header. Channel position would be set to the end of page header or beginning of data before returning. - */ - private void ensurePageHeader(SeekableChannelsProvider provider, SeekableByteChannel ch) throws IOException { - // Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be - // the offset of page header, else it would be the offset of data. - ch.position(offset); - synchronized (this) { - if (pageHeader == null) { - try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(provider, ch)) { - pageHeader = Util.readPageHeader(in); - } - offset = ch.position(); - if (numValues >= 0) { - final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader); - if (numValues != numValuesFromHeader) { - throw new IllegalStateException( - "numValues = " + numValues + " different from number of values " + - "read from the page header = " + numValuesFromHeader + " for column " + path); - } - } - } - if (numValues == NULL_NUM_VALUES) { - numValues = readNumValuesFromPageHeader(pageHeader); - } - } - } - - private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { - switch (header.type) { - case DATA_PAGE: - return header.getData_page_header().getNum_values(); - case DATA_PAGE_V2: - return header.getData_page_header_v2().getNum_values(); - default: - throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); - } - } - /** * Callers must ensure resulting data page does not outlive the input stream. */ @@ -588,18 +545,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val } @Override - public int numValues(@NotNull final SeekableChannelContext channelContext) throws IOException { - if (numValues >= 0) { - return numValues; - } - try ( - final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); - final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); - // Above will block till it populates numValues - Assert.geqZero(numValues, "numValues"); - return numValues; - } + public int numValues() { + return numValues; } @NotNull @@ -617,7 +564,7 @@ public void close() throws Exception { public long numRows(@NotNull final SeekableChannelContext channelContext) throws IOException { if (rowCount == -1) { if (path.getMaxRepetitionLevel() == 0) { - rowCount = numValues(channelContext); + rowCount = numValues(); } else { rowCount = readRowCount(channelContext); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index cafe83aa12b..d8ffbb4307f 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -123,7 +123,7 @@ public Set getColumnsWithDictionaryUsedOnEveryDataPage() { * True only if we are certain every data page in this column chunk uses dictionary encoding; note false also covers * the "we can't tell" case. */ - private boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) { + private static boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) { final ColumnMetaData columnMeta = columnChunk.getMeta_data(); if (columnMeta.encoding_stats == null) { return false; // this is false as "don't know". diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index 0e27c297922..5c556ec1f74 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -65,21 +65,20 @@ public static class GroupingColumnWritingInfo { /** * Parquet name of this grouping column */ - public final String parquetColumnName; + final String parquetColumnName; /** * File path to be added in the grouping metadata of main parquet file */ - public final File metadataFilePath; + final File metadataFilePath; /** * Destination path for writing the grouping file. The two filenames can differ because we write grouping files * to shadow file paths first and then place them at the final path once the write is complete. But the metadata * should always hold the accurate path. */ - public final File destFile; + final File destFile; - public GroupingColumnWritingInfo(final String parquetColumnName, final File metadataFilePath, - final File destFile) { + GroupingColumnWritingInfo(final String parquetColumnName, final File metadataFilePath, final File destFile) { this.parquetColumnName = parquetColumnName; this.metadataFilePath = metadataFilePath; this.destFile = destFile; diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index 2634c65b0b9..807837d13aa 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -127,9 +127,9 @@ private ChunkPage getPage(@Nullable final FillContext fillContext, final i } private ChunkPage getPageImpl(@Nullable FillContext fillContext, int pageNum) { - final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum); // Use the latest context while reading the page, or create (and close) new one try (final ContextHolder holder = ensureContext(fillContext)) { + final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum, holder.get()); return toPage(offsetIndex.getFirstRowIndex(pageNum), reader, holder.get()); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java index d733e8a198f..c947b281c82 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java @@ -58,7 +58,7 @@ public final Object getResult(@NotNull final ColumnPageReader columnPageReader, return ToPage.super.getResult(columnPageReader, channelContext); } - final int[] keys = new int[columnPageReader.numValues(channelContext)]; + final int[] keys = new int[columnPageReader.numValues()]; final IntBuffer offsets = columnPageReader.readKeyValues(IntBuffer.wrap(keys), NULL_INT, channelContext); return offsets == null ? keys : new DataWithOffsets(offsets, keys); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java index b528fbf8ef5..38d4ba03d08 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java @@ -1,9 +1,7 @@ package io.deephaven.extensions.s3; import io.deephaven.base.reference.PooledObjectReference; -import io.deephaven.base.reference.SimpleReference; import io.deephaven.util.datastructures.SegmentedSoftPool; -import io.deephaven.util.referencecounting.ReferenceCounted; import org.jetbrains.annotations.NotNull; import java.nio.ByteBuffer;