diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 95203945ed..41242359c6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -145,7 +145,6 @@ public class ParquetRewriter implements Closeable {
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
   private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
   private final MessageType outSchema;
-  private final MessageType outSchemaWithRenamedColumns;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
   private final boolean overwriteInputWithJoinColumns;
@@ -158,23 +157,21 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
     this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
     this.renamedColumns = options.gerRenameColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
-    OutputFile out = options.getParquetOutputFile();
     inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
     inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
+    this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
+    this.extraMetaData = getExtraMetadata(options);
     ensureSameSchema(inputFiles);
     ensureSameSchema(inputFilesToJoin);
     ensureRowCount();
+    ensureRenamingCorrectness(outSchema, renamedColumns);
+    OutputFile out = options.getParquetOutputFile();
     LOG.info(
         "Start rewriting {} input file(s) {} to {}",
         inputFiles.size() + inputFilesToJoin.size(),
         Stream.concat(options.getParquetInputFiles().stream(), options.getParquetInputFilesToJoin().stream())
             .collect(Collectors.toList()),
-        out);
-
-    this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
-    ensureRenamedColumnsCorrectness(outSchema, renamedColumns);
-    this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema);
-    this.extraMetaData = getExtraMetadata(options);
+        options.getParquetOutputFile());
 
     if (options.getMaskColumns() != null) {
       this.maskColumns = new HashMap<>();
@@ -191,7 +188,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(
         out,
-        outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema,
+        renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema),
         writerMode,
         DEFAULT_BLOCK_SIZE,
         MAX_PADDING_SIZE_DEFAULT,
@@ -227,7 +224,6 @@ public ParquetRewriter(
       MaskMode maskMode) {
     this.writer = writer;
     this.outSchema = outSchema;
-    this.outSchemaWithRenamedColumns = outSchema;
     this.newCodecName = codecName;
     extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
     extraMetaData.put(
@@ -366,7 +362,7 @@ private void ensureSameSchema(Queue<TransParquetFileReader> inputFileReaders) {
     }
   }
 
-  private void ensureRenamedColumnsCorrectness(MessageType schema, Map<String, String> renameMap) {
+  private void ensureRenamingCorrectness(MessageType schema, Map<String, String> renameMap) {
     Set<String> columns = schema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
     renameMap.forEach((src, dst) -> {
       if (!columns.contains(src)) {
@@ -504,7 +500,7 @@ private void processBlock(
         ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
         ColumnDescriptor descriptorRenamed =
-            outSchemaWithRenamedColumns.getColumns().get(outColumnIdx);
+            getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
         BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
         String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 66c68286d8..2a39b0eaeb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -573,6 +573,28 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
      * @return a RewriterOptions
      */
     public RewriteOptions build() {
+      checkPreconditions();
+      return new RewriteOptions(
+          conf,
+          inputFiles,
+          (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
+          outputFile,
+          pruneColumns,
+          newCodecName,
+          maskColumns,
+          (renameColumns == null
+              ? new HashMap<>()
+              : renameColumns.entrySet().stream()
+                  .collect(Collectors.toMap(
+                      Map.Entry::getKey, x -> x.getValue().trim()))),
+          encryptColumns,
+          fileEncryptionProperties,
+          indexCacheStrategy,
+          overwriteInputWithJoinColumns,
+          ignoreJoinFilesMetadata);
+    }
+
+    private void checkPreconditions() {
       Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
       Preconditions.checkArgument(outputFile != null, "Output file is required");
@@ -619,25 +641,6 @@ public RewriteOptions build() {
             encryptColumns != null && !encryptColumns.isEmpty(),
             "Encrypt columns is required when FileEncryptionProperties is set");
       }
-
-      return new RewriteOptions(
-          conf,
-          inputFiles,
-          (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
-          outputFile,
-          pruneColumns,
-          newCodecName,
-          maskColumns,
-          (renameColumns == null
-              ? new HashMap<>()
-              : renameColumns.entrySet().stream()
-                  .collect(Collectors.toMap(
-                      Map.Entry::getKey, x -> x.getValue().trim()))),
-          encryptColumns,
-          fileEncryptionProperties,
-          indexCacheStrategy,
-          overwriteInputWithJoinColumns,
-          ignoreJoinFilesMetadata);
     }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index fc79a47b58..dca7d42762 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -588,15 +588,14 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
     addGzipInputFile();
     addUncompressedInputFile();
 
-    // Only merge two files but do not change anything.
-    List<Path> inputPaths = new ArrayList<>();
-    for (EncryptionTestFile inputFile : inputFiles) {
-      inputPaths.add(new Path(inputFile.getFileName()));
-    }
     Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
+    List<String> pruneColumns = ImmutableList.of("Gender");
+    List<Path> inputPaths =
+        inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList());
     RewriteOptions.Builder builder = createBuilder(inputPaths);
     RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
         .renameColumns(ImmutableMap.of("Name", "NameRenamed"))
+        .prune(pruneColumns)
         .build();
 
     rewriter = new ParquetRewriter(options);
@@ -622,7 +621,7 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
         null);
 
     // Verify the merged data are not changed
-    validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, renameColumns);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns);
 
     // Verify the page index
     validatePageIndex(new HashSet<>(), false, renameColumns);
@@ -941,7 +940,6 @@ private MessageType createSchemaWithRenamed() {
         "schema",
         new PrimitiveType(OPTIONAL, INT64, "DocId"),
         new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
-        new PrimitiveType(OPTIONAL, BINARY, "Gender"),
         new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
         new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
         new GroupType(
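For context beyond the diff itself, here is a minimal usage sketch of the rename-plus-prune path exercised by the updated test. It assumes the existing RewriteOptions.Builder overload that takes a Hadoop Configuration, a list of input Paths, and an output Path, and the public ParquetRewriter.processBlocks() method; the class name and file paths are illustrative only.

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameAndPruneExample {
  public static void main(String[] args) throws Exception {
    // Illustrative paths; replace with real input/output locations.
    RewriteOptions options = new RewriteOptions.Builder(
            new Configuration(),
            Arrays.asList(new Path("file:/tmp/input1.parquet"), new Path("file:/tmp/input2.parquet")),
            new Path("file:/tmp/output.parquet"))
        .renameColumns(Collections.singletonMap("Name", "NameRenamed")) // rename "Name" -> "NameRenamed"
        .prune(Collections.singletonList("Gender")) // drop the "Gender" column
        .build(); // build() runs the precondition checks before constructing RewriteOptions

    // The rewriter validates the rename map against the pruned output schema before writing.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}
```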