
Commit

[apacheGH-3035][ParquetRewriter] replace renamed schema variable with dynamic extraction
maxim_konstantinov committed Oct 31, 2024
1 parent 2aec624 commit 169af05
Showing 3 changed files with 35 additions and 38 deletions.
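In short: the constructor used to precompute the renamed output schema and cache it in the outSchemaWithRenamedColumns field; after this change the renamed view is derived from outSchema and the rename map wherever it is needed. The writer-schema selection below (both expressions taken from the diff) captures the whole refactor:

    // Before: null-checked cached field that had to be kept in sync with outSchema
    outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema
    // After: derived on demand from the single source of truth (outSchema + rename map)
    renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(outSchema)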
ParquetRewriter.java
@@ -145,7 +145,6 @@ public class ParquetRewriter implements Closeable {
 private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
 private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
 private final MessageType outSchema;
-private final MessageType outSchemaWithRenamedColumns;
 // The index cache strategy
 private final IndexCache.CacheStrategy indexCacheStrategy;
 private final boolean overwriteInputWithJoinColumns;
@@ -158,23 +157,21 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
 this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
 this.renamedColumns = options.gerRenameColumns();
 ParquetConfiguration conf = options.getParquetConfiguration();
+OutputFile out = options.getParquetOutputFile();
 inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
 inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
+this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
+this.extraMetaData = getExtraMetadata(options);
 ensureSameSchema(inputFiles);
 ensureSameSchema(inputFilesToJoin);
 ensureRowCount();
-OutputFile out = options.getParquetOutputFile();
+ensureRenamingCorrectness(outSchema, renamedColumns);
 LOG.info(
         "Start rewriting {} input file(s) {} to {}",
         inputFiles.size() + inputFilesToJoin.size(),
         Stream.concat(options.getParquetInputFiles().stream(), options.getParquetInputFilesToJoin().stream())
                 .collect(Collectors.toList()),
-        options.getParquetOutputFile());
-
-this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
-ensureRenamedColumnsCorrectness(outSchema, renamedColumns);
-this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema);
-this.extraMetaData = getExtraMetadata(options);
+        out);
 
 if (options.getMaskColumns() != null) {
     this.maskColumns = new HashMap<>();
@@ -191,7 +188,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
 ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
 writer = new ParquetFileWriter(
         out,
-        outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema,
+        renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema),
         writerMode,
         DEFAULT_BLOCK_SIZE,
         MAX_PADDING_SIZE_DEFAULT,
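getSchemaWithRenamedColumns itself is not shown in this diff. As a minimal sketch of what such a derivation plausibly looks like, assuming only top-level columns can be renamed (which matches the ensureRenamingCorrectness check further down) — this is an illustration, not the parquet-java implementation, and it drops logical-type annotations and fixed-length details for brevity:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;

    // Sketch only: derive a schema whose top-level fields are renamed per the map.
    static MessageType renameTopLevelFields(MessageType schema, Map<String, String> renamedColumns) {
        List<Type> fields = new ArrayList<>();
        for (Type field : schema.getFields()) {
            String name = renamedColumns.getOrDefault(field.getName(), field.getName());
            if (field.isPrimitive()) {
                fields.add(new PrimitiveType(
                        field.getRepetition(),
                        field.asPrimitiveType().getPrimitiveTypeName(),
                        name));
            } else {
                // Group columns keep their children; only the outer name changes.
                fields.add(new GroupType(field.getRepetition(), name, field.asGroupType().getFields()));
            }
        }
        return new MessageType(schema.getName(), fields);
    }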
@@ -227,7 +224,6 @@ public ParquetRewriter(
         MaskMode maskMode) {
     this.writer = writer;
     this.outSchema = outSchema;
-    this.outSchemaWithRenamedColumns = outSchema;
     this.newCodecName = codecName;
     extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
     extraMetaData.put(
@@ -366,7 +362,7 @@ private void ensureSameSchema(Queue<TransParquetFileReader> inputFileReaders) {
     }
 }
 
-private void ensureRenamedColumnsCorrectness(MessageType schema, Map<String, String> renameMap) {
+private void ensureRenamingCorrectness(MessageType schema, Map<String, String> renameMap) {
     Set<String> columns = schema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
     renameMap.forEach((src, dst) -> {
         if (!columns.contains(src)) {
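The hunk is truncated mid-lambda by the diff view. A plausible shape for the rest of the validation, with the exception type and messages assumed rather than taken from the source:

    // Hypothetical continuation of ensureRenamingCorrectness (sketch, not source):
    renameMap.forEach((src, dst) -> {
        if (!columns.contains(src)) {
            throw new IllegalArgumentException("Column to rename is not found in the schema: " + src);
        } else if (columns.contains(dst)) {
            throw new IllegalArgumentException("Rename target already exists in the schema: " + dst);
        }
    });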
Expand Down Expand Up @@ -504,7 +500,7 @@ private void processBlock(

 ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
 ColumnDescriptor descriptorRenamed =
-        outSchemaWithRenamedColumns.getColumns().get(outColumnIdx);
+        getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
 BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
 String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
 
RewriteOptions.java
@@ -573,6 +573,28 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
  * @return a RewriterOptions
  */
 public RewriteOptions build() {
+    checkPreconditions();
+    return new RewriteOptions(
+            conf,
+            inputFiles,
+            (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
+            outputFile,
+            pruneColumns,
+            newCodecName,
+            maskColumns,
+            (renameColumns == null
+                    ? new HashMap<>()
+                    : renameColumns.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    Map.Entry::getKey, x -> x.getValue().trim()))),
+            encryptColumns,
+            fileEncryptionProperties,
+            indexCacheStrategy,
+            overwriteInputWithJoinColumns,
+            ignoreJoinFilesMetadata);
+}
+
+private void checkPreconditions() {
     Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
     Preconditions.checkArgument(outputFile != null, "Output file is required");
 
@@ -619,25 +641,6 @@ public RewriteOptions build() {
             encryptColumns != null && !encryptColumns.isEmpty(),
             "Encrypt columns is required when FileEncryptionProperties is set");
 }
-
-return new RewriteOptions(
-        conf,
-        inputFiles,
-        (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
-        outputFile,
-        pruneColumns,
-        newCodecName,
-        maskColumns,
-        (renameColumns == null
-                ? new HashMap<>()
-                : renameColumns.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                Map.Entry::getKey, x -> x.getValue().trim()))),
-        encryptColumns,
-        fileEncryptionProperties,
-        indexCacheStrategy,
-        overwriteInputWithJoinColumns,
-        ignoreJoinFilesMetadata);
 }
 }
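For context, a usage sketch of the builder path this change touches: build() now delegates validation to checkPreconditions() and trims whitespace from rename targets before constructing the options. The constructor form and the paths below are assumptions for illustration, not taken from this diff:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    // Illustrative paths; replace with real input/output locations.
    Path input = new Path("file:///tmp/in.parquet");
    Path output = new Path("file:///tmp/out.parquet");

    RewriteOptions options = new RewriteOptions.Builder(new Configuration(), input, output)
            .prune(Arrays.asList("Gender"))                                   // drop a column entirely
            .renameColumns(Collections.singletonMap("Name", " NameRenamed ")) // trimmed to "NameRenamed" by build()
            .build();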
ParquetRewriterTest.java
@@ -588,15 +588,14 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
 addGzipInputFile();
 addUncompressedInputFile();
 
-// Only merge two files but do not change anything.
-List<Path> inputPaths = new ArrayList<>();
-for (EncryptionTestFile inputFile : inputFiles) {
-    inputPaths.add(new Path(inputFile.getFileName()));
-}
 Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
+List<String> pruneColumns = ImmutableList.of("Gender");
+List<Path> inputPaths =
+        inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList());
 RewriteOptions.Builder builder = createBuilder(inputPaths);
 RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
         .renameColumns(ImmutableMap.of("Name", "NameRenamed"))
+        .prune(pruneColumns)
         .build();
 
 rewriter = new ParquetRewriter(options);
@@ -622,7 +621,7 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
         null);
 
 // Verify the merged data are not changed
-validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, renameColumns);
+validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns);
 
 // Verify the page index
 validatePageIndex(new HashSet<>(), false, renameColumns);
@@ -941,7 +940,6 @@ private MessageType createSchemaWithRenamed() {
"schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
new GroupType(
