Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Feb 21, 2024
1 parent d4da175 commit 3087ea9
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ interface ColumnPageReaderIterator {
interface ColumnPageDirectAccessor {
/**
* Directly access a page reader for a given page number.
*
* @param pageNum The page number to access.
* @param channelContext The channel context to use for constructing the reader
*/
ColumnPageReader getPageReader(int pageNum);
ColumnPageReader getPageReader(int pageNum, SeekableChannelContext channelContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;

Expand Down Expand Up @@ -93,18 +91,14 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public final OffsetIndex getOffsetIndex() {
public OffsetIndex getOffsetIndex() {
return offsetIndex;
}

@Override
public ColumnPageReaderIterator getPageIterator() {
final long dataPageOffset = columnChunk.meta_data.getData_page_offset();
if (offsetIndex == null) {
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values());
} else {
return new ColumnPageReaderIteratorIndexImpl();
}
return new ColumnPageReaderIteratorImpl(columnChunk.meta_data.getData_page_offset(),
columnChunk.getMeta_data().getNum_values());
}

@Override
Expand Down Expand Up @@ -217,9 +211,9 @@ private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIter
private long nextHeaderOffset;
private long remainingValues;

ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) {
ColumnPageReaderIteratorImpl(final long dataPageStartOffset, final long numValues) {
this.remainingValues = numValues;
this.nextHeaderOffset = startOffset;
this.nextHeaderOffset = dataPageStartOffset;
}

@Override
Expand All @@ -238,37 +232,34 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) {
ch.position(headerOffset);
final PageHeader pageHeader;
try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) {
pageHeader = Util.readPageHeader(in);
}
final PageHeader pageHeader = readPageHeader(ch);
// relying on exact position of ch
final long dataOffset = ch.position();
nextHeaderOffset = dataOffset + pageHeader.getCompressed_page_size();
if (pageHeader.isSetDictionary_page_header()) {
// Dictionary page; skip it
return next(holder.get());
}
if (!pageHeader.isSetData_page_header() && !pageHeader.isSetData_page_header_v2()) {
throw new IllegalStateException(
"Expected data page, but neither v1 nor v2 data page header is set in file "
+ ch + " at offset " + headerOffset);
final PageType pageType = pageHeader.type;
if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) {
throw new IllegalStateException("Expected data page, but got " + pageType + " in file " + ch +
" at offset " + headerOffset);
}
remainingValues -= getNumValues(pageHeader);
final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader);
final int numValuesInPage = getNumValues(pageHeader);
remainingValues -= numValuesInPage;
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
(encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
? dictionarySupplier
: (SeekableChannelContext context) -> NULL_DICTIONARY;
return new ColumnPageReaderImpl(channelsProvider, decompressor,
pageDictionarySupplier, nullMaterializerFactory, path, getURI(), fieldTypes,
dataOffset, pageHeader, ColumnPageReaderImpl.NULL_NUM_VALUES);
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
} catch (IOException e) {
throw new UncheckedDeephavenException("Error reading page header", e);
}
}
}

/**
 * Selects the dictionary supplier to hand to the page reader for the page described by {@code pageHeader}.
 * Pages whose encoding is {@code PLAIN_DICTIONARY} or {@code RLE_DICTIONARY} receive this column chunk's
 * {@code dictionarySupplier}; every other page receives a supplier that always yields {@code NULL_DICTIONARY}.
 *
 * @param pageHeader header of the data page being opened
 * @return the dictionary supplier matching that page's encoding
 */
private Function<SeekableChannelContext, Dictionary> getPageDictionarySupplier(final PageHeader pageHeader) {
    final org.apache.parquet.format.Encoding pageEncoding = getEncoding(pageHeader);
    final boolean dictionaryEncoded = pageEncoding == PLAIN_DICTIONARY || pageEncoding == RLE_DICTIONARY;
    if (dictionaryEncoded) {
        return dictionarySupplier;
    }
    // Not dictionary-encoded: the supplied context is unused and the null dictionary is always returned.
    return (SeekableChannelContext ignoredContext) -> NULL_DICTIONARY;
}

private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) {
switch (pageHeader.type) {
case DATA_PAGE:
Expand All @@ -281,58 +272,51 @@ private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHea
}
}

/**
 * Decodes a {@link PageHeader} from {@code ch} through a stream obtained from
 * {@code SeekableChannelsProvider.channelPositionInputStream}. The stream is closed before returning.
 * <p>
 * Callers in this file set the channel position to the header offset before calling, and treat the channel
 * position after the call as the start of the page data.
 *
 * @param ch the channel to read the header from
 * @return the decoded page header
 * @throws IOException if reading from the channel fails
 */
private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException {
    try (final InputStream headerStream =
            SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) {
        final PageHeader header = Util.readPageHeader(headerStream);
        return header;
    }
}

/**
 * Returns the value count recorded in a data page header, taken from the v1 data page header when it is set
 * and from the v2 data page header otherwise.
 * <p>
 * NOTE(review): the v2 header is dereferenced without checking that it is set; the callers visible here verify
 * the page type is DATA_PAGE or DATA_PAGE_V2 before calling.
 *
 * @param pageHeader the header of a data page
 * @return the number of values stored in that page
 */
private static int getNumValues(PageHeader pageHeader) {
    if (pageHeader.isSetData_page_header()) {
        return pageHeader.getData_page_header().getNum_values();
    }
    return pageHeader.getData_page_header_v2().getNum_values();
}

/**
 * Page iterator that walks the pages listed in {@code offsetIndex} in order, creating each page reader from the
 * offset recorded in the index, with no page header supplied.
 */
private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
    /** Index into {@code offsetIndex} of the page that the next call to {@code next} will return. */
    private int pos = 0;

    ColumnPageReaderIteratorIndexImpl() {}

    @Override
    public boolean hasNext() {
        return pos < offsetIndex.getPageCount();
    }

    @Override
    public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) {
        if (!hasNext()) {
            throw new NoSuchElementException("No next element");
        }
        // The computation below treats the offset index as storing the number of values per page rather than
        // the number of rows (the two can differ for array and vector columns). That matches a bug on the
        // parquet writing side which was fixed in deephaven-core/pull/4844; the logic is retained only so that
        // parquet files written before deephaven-core/pull/4844 can still be read.
        final long chunkValueCount = columnChunk.getMeta_data().getNum_values();
        final long lastIndex = offsetIndex.getLastRowIndex(pos, chunkValueCount);
        final long firstIndex = offsetIndex.getFirstRowIndex(pos);
        final int numValues = (int) (lastIndex - firstIndex + 1);
        final ColumnPageReader pageReader = new ColumnPageReaderImpl(
                channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
                path, getURI(), fieldTypes, offsetIndex.getOffset(pos), null, numValues);
        // Advance only after the reader has been created successfully.
        pos++;
        return pageReader;
    }
}

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}

@Override
public ColumnPageReader getPageReader(final int pageNum) {
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
// Page header and number of values will be populated later when we read the page header from the file
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
path, getURI(), fieldTypes, offsetIndex.getOffset(pageNum), null,
ColumnPageReaderImpl.NULL_NUM_VALUES);

// Read the page header to determine whether we need to use dictionary for this page
final long headerOffset = offsetIndex.getOffset(pageNum);
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) {
ch.position(headerOffset);
final PageHeader pageHeader = readPageHeader(ch);
final long dataOffset = ch.position();
final PageType pageType = pageHeader.type;
if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) {
throw new IllegalStateException("Expected data page, but got " + pageType + " for page number "
+ pageNum + " for column " + getURI());
}
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
getNumValues(pageHeader));
} catch (final IOException e) {
throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum + " for " +
"column " + getURI(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public interface ColumnPageReader extends AutoCloseable {

/**
* @param channelContext The channel context to use for reading the parquet file
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
* @return The number of rows in this page, or -1 if it's unknown.
*/
default long numRows(final SeekableChannelContext channelContext) throws IOException {
return numValues(channelContext);
return numValues();
}

/**
Expand All @@ -47,10 +47,9 @@ IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder,
SeekableChannelContext channelContext) throws IOException;

/**
* @param channelContext The channel context to use for reading the parquet file
* @return The value stored under number DataPageHeader.num_values
* @return The number of values in this page
*/
int numValues(SeekableChannelContext channelContext) throws IOException;
int numValues();

/**
* @param channelContext The channel context to use for reading the parquet file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.Type;
import org.jetbrains.annotations.NotNull;
Expand All @@ -41,8 +40,7 @@
import static org.apache.parquet.column.ValuesType.VALUES;

final class ColumnPageReaderImpl implements ColumnPageReader {
public static final int NULL_OFFSET = -1;
static final int NULL_NUM_VALUES = -1;
private static final int NULL_OFFSET = -1;

private final SeekableChannelsProvider channelsProvider;
private final CompressorAdapter compressorAdapter;
Expand All @@ -53,10 +51,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
private final List<Type> fieldTypes;

/**
* Stores the offset from where the next byte should be read. Can be the offset of page header if
* {@link #pageHeader} is {@code null}, else will be the offset of data.
* The offset for data following the page header in the file.
*/
private long offset;
private final long dataOffset;
private PageHeader pageHeader;
private int numValues;
private int rowCount = -1;
Expand All @@ -72,11 +69,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
* @param path The path of the column.
* @param uri The uri of the parquet file.
* @param fieldTypes The types of the fields in the column.
* @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data
* following the header in the page.
* @param pageHeader The page header if it is already read from the file. Else, {@code null}.
* @param numValues The number of values in the page if it is already read from the file. Else,
* {@value #NULL_NUM_VALUES}
* @param dataOffset The offset for data following the page header in the file.
* @param pageHeader The page header, should not be {@code null}.
* @param numValues The number of values in the page.
*/
ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider,
CompressorAdapter compressorAdapter,
Expand All @@ -85,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
ColumnDescriptor path,
URI uri,
List<Type> fieldTypes,
long offset,
long dataOffset,
PageHeader pageHeader,
int numValues) {
this.channelsProvider = channelsProvider;
Expand All @@ -95,8 +90,10 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
this.path = path;
this.uri = uri;
this.fieldTypes = fieldTypes;
this.offset = offset;
this.dataOffset = dataOffset;
Assert.neqNull(pageHeader, "pageHeader");
this.pageHeader = pageHeader;
Assert.geqZero(numValues, "numValues");
this.numValues = numValues;
}

Expand All @@ -106,7 +103,7 @@ public Object materialize(@NotNull final Object nullValue,
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) {
ensurePageHeader(channelsProvider, ch);
ch.position(dataOffset);
return readDataPage(nullValue, ch, holder.get());
}
}
Expand All @@ -115,7 +112,7 @@ public int readRowCount(@NotNull final SeekableChannelContext channelContext) th
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) {
ensurePageHeader(channelsProvider, ch);
ch.position(dataOffset);
return readRowCountFromDataPage(ch);
}
}
Expand All @@ -126,51 +123,11 @@ public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder,
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) {
ensurePageHeader(channelsProvider, ch);
ch.position(dataOffset);
return readKeysFromDataPage(keyDest, nullPlaceholder, ch, holder.get());
}
}

/**
 * Ensure {@link #pageHeader} is populated and leave {@code ch} positioned at the beginning of this page's data.
 * <p>
 * If {@link #pageHeader} is {@code null}, the header is read from the channel and {@link #offset} is advanced
 * from the header offset to the data offset. If {@link #numValues} was supplied up front (non-negative), it is
 * checked against the count in the freshly read header; if it was {@code NULL_NUM_VALUES}, it is filled in
 * from the header.
 * <p>
 * The channel is positioned while holding this object's monitor. {@link #offset} is rewritten under that same
 * monitor the first time the header is read, so reading it outside the lock could position a second caller's
 * channel at the stale header offset and then skip the header read, leaving that channel at the header rather
 * than at the data.
 *
 * @param provider the channels provider used to wrap {@code ch} in an input stream
 * @param ch the channel to position and, if needed, read the page header from
 * @throws IOException if positioning or reading the channel fails
 * @throws IllegalStateException if a pre-supplied {@link #numValues} disagrees with the page header
 */
private void ensurePageHeader(SeekableChannelsProvider provider, SeekableByteChannel ch) throws IOException {
    synchronized (this) {
        // If pageHeader is null this is the offset of the page header, otherwise it is the offset of the data.
        // Read it under the lock because it is updated below under the same lock.
        ch.position(offset);
        if (pageHeader == null) {
            try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(provider, ch)) {
                pageHeader = Util.readPageHeader(in);
            }
            // The channel now sits just past the header, i.e. at the start of the data.
            offset = ch.position();
            if (numValues >= 0) {
                final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader);
                if (numValues != numValuesFromHeader) {
                    throw new IllegalStateException(
                            "numValues = " + numValues + " different from number of values " +
                                    "read from the page header = " + numValuesFromHeader + " for column " + path);
                }
            }
        }
        if (numValues == NULL_NUM_VALUES) {
            numValues = readNumValuesFromPageHeader(pageHeader);
        }
    }
}

/**
 * Extracts the value count from a data page header, using the v1 header for {@code DATA_PAGE} and the v2 header
 * for {@code DATA_PAGE_V2}.
 *
 * @param header the page header to inspect
 * @return the number of values recorded in the header
 * @throws IOException if the header's type is neither {@code DATA_PAGE} nor {@code DATA_PAGE_V2}
 */
private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException {
    final int valueCount;
    switch (header.type) {
        case DATA_PAGE:
            valueCount = header.getData_page_header().getNum_values();
            break;
        case DATA_PAGE_V2:
            valueCount = header.getData_page_header_v2().getNum_values();
            break;
        default:
            throw new IOException(String.format("Unexpected page of type {%s}", header.getType()));
    }
    return valueCount;
}

/**
* Callers must ensure resulting data page does not outlive the input stream.
*/
Expand Down Expand Up @@ -588,18 +545,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val
}

@Override
public int numValues(@NotNull final SeekableChannelContext channelContext) throws IOException {
if (numValues >= 0) {
return numValues;
}
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) {
ensurePageHeader(channelsProvider, ch);
// Above will block till it populates numValues
Assert.geqZero(numValues, "numValues");
return numValues;
}
public int numValues() {
return numValues;
}

@NotNull
Expand All @@ -617,7 +564,7 @@ public void close() throws Exception {
public long numRows(@NotNull final SeekableChannelContext channelContext) throws IOException {
if (rowCount == -1) {
if (path.getMaxRepetitionLevel() == 0) {
rowCount = numValues(channelContext);
rowCount = numValues();
} else {
rowCount = readRowCount(channelContext);
}
Expand Down
Loading

0 comments on commit 3087ea9

Please sign in to comment.