feat: DH-18399: Add ParquetColumnResolver #6558

Merged
ParquetInstructions.java:

@@ -169,7 +169,7 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par

     public abstract Optional<Collection<List<String>>> getIndexColumns();

-    public abstract Optional<ParquetColumnResolver.Factory> getColumnResolver();
+    public abstract Optional<ParquetColumnResolver.Factory> getColumnResolverFactory();

     /**
      * Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
@@ -321,7 +321,7 @@ public Optional<Collection<List<String>>> getIndexColumns() {
     }

     @Override
-    public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
+    public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
         return Optional.empty();
     }

@@ -635,7 +635,7 @@ public Optional<Collection<List<String>>> getIndexColumns() {
     }

     @Override
-    public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
+    public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
         return Optional.ofNullable(columnResolver);
     }

@@ -760,7 +760,7 @@ public Builder(final ParquetInstructions parquetInstructions) {
             tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
             indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
             onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null);
-            columnResolverFactory = readOnlyParquetInstructions.getColumnResolver().orElse(null);
+            columnResolverFactory = readOnlyParquetInstructions.getColumnResolverFactory().orElse(null);
         }

         public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
ParquetColumnResolver.java:

@@ -3,72 +3,42 @@
 //
 package io.deephaven.parquet.table.location;

-import io.deephaven.annotations.BuildableStyle;
 import io.deephaven.engine.table.impl.locations.TableKey;
 import io.deephaven.parquet.table.ParquetInstructions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.MessageType;
-import org.immutables.value.Value;

-import java.util.Map;
 import java.util.Optional;

 /**
- * A mapping between Deephaven column names and Parquet {@link ColumnDescriptor column descriptors}.
- *
- * TODO: describe better
+ * A resolver from Deephaven column names to Parquet paths.
  */
-@Value.Immutable
-@BuildableStyle
-public abstract class ParquetColumnResolver {
+public interface ParquetColumnResolver {

     /**
      * {@link ParquetInstructions.Builder#setColumnResolverFactory(Factory)}
      */
-    public interface Factory {
+    interface Factory {

         /**
-         * TODO: description
-         *
          * @param tableKey the table key
         * @param tableLocationKey the Parquet table location key
         * @return the Parquet column resolver
         */
-        ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
-    }
-
-    public static Builder builder() {
-        return ImmutableParquetColumnResolver.builder();
+        ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
     }

-    // Intentionally not exposed, but necessary to expose to Builder for safety checks.
-    abstract MessageType schema();
-
     /**
-     * TODO: javadoc
-     *
-     * @return
+     * @param columnName the column name
+     * @return the path to the leaf field in the schema
+     * @see ColumnDescriptor#getPath()
+     * @see MessageType#getColumnDescription(String[])
      */
-    public abstract Map<String, ColumnDescriptor> mapping();
-
-    @Value.Check
-    final void checkColumns() {
-        for (ColumnDescriptor columnDescriptor : mapping().values()) {
-            if (!ParquetUtil.contains(schema(), columnDescriptor)) {
-                throw new IllegalArgumentException("schema does not contain column descriptor " + columnDescriptor);
-            }
-        }
-    }
-
-    public interface Builder {
-
-        // TODO: javadoc
-
-        Builder schema(MessageType schema);
-
-        Builder putMapping(String key, ColumnDescriptor value);
-
-        Builder putAllMapping(Map<String, ? extends ColumnDescriptor> entries);
-
-        ParquetColumnResolver build();
-    }
+    Optional<String[]> of(String columnName);
 }
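For orientation, here is a minimal sketch (not part of the diff) of a hand-rolled implementation of this interface. The class name, the fixed "Foo" mapping, and the path are hypothetical; only the two `of` signatures come from the PR:

```java
import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.parquet.table.location.ParquetColumnResolver;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;

import java.util.Map;
import java.util.Optional;

// Hypothetical resolver that maps Deephaven column names to fixed Parquet paths.
public final class FixedPathResolver implements ParquetColumnResolver {

    // Factory that ignores both keys and always returns the same fixed mapping.
    public enum Factory implements ParquetColumnResolver.Factory {
        INSTANCE;

        @Override
        public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
            return new FixedPathResolver(Map.of("Foo", new String[] {"nested", "foo"}));
        }
    }

    private final Map<String, String[]> paths;

    private FixedPathResolver(Map<String, String[]> paths) {
        this.paths = paths;
    }

    @Override
    public Optional<String[]> of(String columnName) {
        // An empty Optional means the column does not resolve to any Parquet path.
        return Optional.ofNullable(paths.get(columnName));
    }
}
```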
ParquetColumnResolverMap.java (new file):

@@ -0,0 +1,63 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.location;

import io.deephaven.annotations.BuildableStyle;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.immutables.value.Value;

import java.util.Map;
import java.util.Optional;

/**
 * A {@link ParquetColumnResolver} implementation based on a map from Deephaven column names to Parquet
 * {@link ColumnDescriptor column descriptors}.
 */
@Value.Immutable
@BuildableStyle
public abstract class ParquetColumnResolverMap implements ParquetColumnResolver {

    public static Builder builder() {
        return ImmutableParquetColumnResolverMap.builder();
    }

    /**
     * The Parquet schema.
     */
    public abstract MessageType schema();

    /**
     * The map from Deephaven column name to {@link ColumnDescriptor}. The {@link #schema()} must contain each column
     * descriptor.
     */
    public abstract Map<String, ColumnDescriptor> mapping();
Review thread on mapping():

Member: Do we need this to keep ColumnDescriptor as part of the interface of the implementation? Do we want the Iceberg implementation using this?

Member Author: I'm very much in favor of keeping this implementation strongly-typed with the safety checks. Iceberg should be able to use this implementation (and even use the same Factory to create it) in some situations.

Member Author: Changed my mind; the map implementation is now very generic, no Parquet specific types.

    @Override
    public final Optional<String[]> of(String columnName) {
        return Optional.ofNullable(mapping().get(columnName)).map(ColumnDescriptor::getPath);
    }

    public interface Builder {

        Builder schema(MessageType schema);

        Builder putMapping(String key, ColumnDescriptor value);

        Builder putAllMapping(Map<String, ? extends ColumnDescriptor> entries);

        ParquetColumnResolverMap build();
    }

    @Value.Check
    final void checkMapping() {
        for (Map.Entry<String, ColumnDescriptor> e : mapping().entrySet()) {
            final ColumnDescriptor columnDescriptor = e.getValue();
            if (!ParquetUtil.contains(schema(), columnDescriptor)) {
                throw new IllegalArgumentException(
                        String.format("schema does not contain Deephaven columnName=%s columnDescriptor=%s",
                                e.getKey(), columnDescriptor));
            }
        }
    }
}
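A usage sketch (not from the PR) of building a map-based resolver directly. The schema string and the "Foo" mapping are made up for illustration; a descriptor absent from schema() would fail the @Value.Check at build():

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Hypothetical schema with a single leaf at path ["nested", "foo"].
MessageType schema = MessageTypeParser.parseMessageType(
        "message root { optional group nested { optional int32 foo; } }");

ParquetColumnResolverMap resolver = ParquetColumnResolverMap.builder()
        .schema(schema)
        // Map the Deephaven column "Foo" to the leaf descriptor from the same schema.
        .putMapping("Foo", schema.getColumnDescription(new String[] {"nested", "foo"}))
        .build();

resolver.of("Foo");  // Optional.of(["nested", "foo"])
resolver.of("Bar");  // Optional.empty()
```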
ParquetFieldIdColumnResolverFactory.java (renamed from ParquetColumnResolverFieldIdFactory.java):

@@ -5,7 +5,6 @@

 import io.deephaven.engine.table.impl.locations.TableKey;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -22,16 +21,16 @@
  * The following is an example {@link ParquetColumnResolver.Factory} that may be useful for testing and debugging
  * purposes, but is not meant to be used for production use cases.
  */
-public final class ParquetColumnResolverFieldIdFactory implements ParquetColumnResolver.Factory {
+public final class ParquetFieldIdColumnResolverFactory implements ParquetColumnResolver.Factory {

     /**
      * TODO: javadoc
      *
      * @param columnNameToFieldId a map from Deephaven column names to field ids
      * @return the column resolver provider
      */
-    public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> columnNameToFieldId) {
-        return new ParquetColumnResolverFieldIdFactory(columnNameToFieldId
+    public static ParquetFieldIdColumnResolverFactory of(Map<String, Integer> columnNameToFieldId) {
+        return new ParquetFieldIdColumnResolverFactory(columnNameToFieldId
                 .entrySet()
                 .stream()
                 .collect(Collectors.groupingBy(
@@ -41,18 +40,22 @@ public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> column

     private final Map<Integer, Set<String>> fieldIdsToDhColumnNames;

-    private ParquetColumnResolverFieldIdFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
+    private ParquetFieldIdColumnResolverFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
         this.fieldIdsToDhColumnNames = Objects.requireNonNull(fieldIdsToDhColumnNames);
     }

     @Override
-    public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
+    public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
         final MessageType schema = tableLocationKey.getFileReader().getSchema();
+        // TODO: note the potential for confusion on where to derive schema from.
+        // final MessageType schema = tableLocationKey.getMetadata().getFileMetaData().getSchema();
+        return of(schema);
+    }
+
+    public ParquetColumnResolverMap of(MessageType schema) {
         final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor();
         ParquetUtil.walk(schema, visitor);
-        return ParquetColumnResolver.builder()
+        return ParquetColumnResolverMap.builder()
                 .schema(schema)
                 .putAllMapping(visitor.nameToColumnDescriptor)
                 .build();
@@ -67,10 +70,11 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
             // field id closest to the leaf. This version, however, takes the most general approach and considers field
             // ids wherever they appear; ultimately, only being resolvable if the field id mapping is unambiguous.
             for (Type type : path) {
-                if (type.getId() == null) {
+                final Type.ID id = type.getId();
+                if (id == null) {
                     continue;
                 }
-                final int fieldId = type.getId().intValue();
+                final int fieldId = id.intValue();
                 final Set<String> set = fieldIdsToDhColumnNames.get(fieldId);
                 if (set == null) {
                     continue;
[...]
                 for (String columnName : set) {
                     final ColumnDescriptor existing = nameToColumnDescriptor.putIfAbsent(columnName, columnDescriptor);
                     if (existing != null) {
-                        throw new IllegalStateException(String.format(
-                                "Parquet columns can't be unambiguously mapped. %d -> %s has multiple paths %s, %s",
-                                fieldId, columnName, Arrays.toString(existing.getPath()),
+                        throw new IllegalArgumentException(String.format(
+                                "Parquet columns can't be unambiguously mapped. %s -> %d has multiple paths %s, %s",
+                                columnName, fieldId, Arrays.toString(existing.getPath()),
                                 Arrays.toString(columnDescriptor.getPath())));
                     }
                 }
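A usage sketch (not from the PR) of resolving by field id. The schema, the field id 42, and the column names are illustrative; the Types builder chain assumes stock parquet-mr:

```java
import java.util.Map;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

// Hypothetical schema whose physical column name differs from the Deephaven
// column name, but which carries field id 42 on the leaf.
MessageType schema = Types.buildMessage()
        .optional(PrimitiveTypeName.INT32).id(42).named("foo_raw")
        .named("root");

ParquetFieldIdColumnResolverFactory factory =
        ParquetFieldIdColumnResolverFactory.of(Map.of("Foo", 42));

ParquetColumnResolverMap resolver = factory.of(schema);
resolver.of("Foo");  // Optional.of(["foo_raw"]) -- resolved via the field id, not the name
```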
ParquetTableLocation.java:

@@ -90,12 +90,9 @@ public ParquetTableLocation(@NotNull final TableKey tableKey,
             parquetMetadata = tableLocationKey.getMetadata();
             rowGroupIndices = tableLocationKey.getRowGroupIndices();
         }
-        {
-            final ParquetColumnResolver.Factory factory = readInstructions.getColumnResolver().orElse(null);
-            resolver = factory == null
-                    ? null
-                    : Objects.requireNonNull(factory.init(tableKey, tableLocationKey));
-        }
+        resolver = readInstructions.getColumnResolverFactory()
+                .map(factory -> factory.of(tableKey, tableLocationKey))
+                .orElse(null);
         final int rowGroupCount = rowGroupIndices.length;
         rowGroups = IntStream.of(rowGroupIndices)
                 .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
@@ -194,12 +191,10 @@ protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
             final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
             nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
         } else {
-            final ColumnDescriptor columnDescriptor = resolver.mapping().get(columnName);
-            if (columnDescriptor == null) {
-                nameList = List.of(); // empty, will not resolve
-            } else {
-                nameList = Arrays.asList(columnDescriptor.getPath());
-            }
+            // empty list will result in exists=false
+            nameList = resolver.of(columnName)
+                    .map(Arrays::asList)
+                    .orElse(List.of());
         }
         final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
                 .map(rgr -> rgr.getColumnChunk(columnName, nameList))
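To see how the pieces fit, here is an end-to-end wiring sketch (not from the PR): the factory goes into the read instructions, each ParquetTableLocation builds its resolver from the table key and location key, and makeColumnLocation consults it. The file path, column name, and field id are placeholders, and ParquetTools.readTable is assumed to accept instructions like this:

```java
import java.util.Map;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.location.ParquetFieldIdColumnResolverFactory;

ParquetInstructions instructions = ParquetInstructions.builder()
        // The definition supplies the Deephaven-side columns.
        .setTableDefinition(TableDefinition.of(ColumnDefinition.ofInt("Foo")))
        // Resolve "Foo" by Parquet field id rather than by column name.
        .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of("Foo", 42)))
        .build();

// Each location calls factory.of(tableKey, tableLocationKey) on construction.
Table table = ParquetTools.readTable("/path/to/example.parquet", instructions);
```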
ParquetUtil.java:

@@ -26,7 +26,6 @@ interface Visitor {
         void accept(Collection<Type> path, PrimitiveType primitiveType);
     }

-
     static class ColumnDescriptorVisitor implements Visitor {

         private final Consumer<ColumnDescriptor> consumer;
ParquetInstructions tests:

@@ -32,7 +32,7 @@ public void empty() {
         assertThat(ParquetInstructions.EMPTY.getFileLayout()).isEmpty();
         assertThat(ParquetInstructions.EMPTY.getTableDefinition()).isEmpty();
         assertThat(ParquetInstructions.EMPTY.getIndexColumns()).isEmpty();
-        assertThat(ParquetInstructions.EMPTY.getColumnResolver()).isEmpty();
+        assertThat(ParquetInstructions.EMPTY.getColumnResolverFactory()).isEmpty();
         assertThat(ParquetInstructions.EMPTY.baseNameForPartitionedParquetData()).isEqualTo("{uuid}");
     }
@@ -152,7 +152,7 @@ public void columnResolver() {
                 .setTableDefinition(TableDefinition.of(ColumnDefinition.ofInt("Foo")))
                 .setColumnResolverFactory(ColumnResolverTestImpl.INSTANCE)
                 .build();
-        assertThat(instructions.getColumnResolver()).hasValue(ColumnResolverTestImpl.INSTANCE);
+        assertThat(instructions.getColumnResolverFactory()).hasValue(ColumnResolverTestImpl.INSTANCE);
     }

     @Test
@@ -171,7 +171,7 @@ private enum ColumnResolverTestImpl implements ParquetColumnResolver.Factory {
         INSTANCE;

         @Override
-        public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
+        public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
             throw new UnsupportedOperationException();
         }
     }