From 103b82228874079443d7bf2d3bd5b604fdd606e2 Mon Sep 17 00:00:00 2001
From: KnightChess <981159963@qq.com>
Date: Wed, 17 Jul 2024 15:41:46 +0800
Subject: [PATCH] [HUDI-7988] ListingBasedRollbackStrategy support log compact
 (#11631)

---
 .../ListingBasedRollbackStrategy.java         |  8 ++
 ...TestMergeOnReadRollbackActionExecutor.java | 96 ++++++++++++++++++-
 .../timeline/MetadataConversionUtils.java     |  4 -
 3 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 5f9b3dcfb220f..af3a88643a21d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -103,6 +103,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
       if (commitMetadataOptional.isPresent()) {
         isCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.COMPACT);
       }
+      AtomicBoolean isLogCompaction = new AtomicBoolean(false);
+      if (commitMetadataOptional.isPresent()) {
+        isLogCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.LOG_COMPACT);
+      }
 
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
@@ -125,6 +129,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
         if (isCompaction.get()) { // compaction's action in hoodie instant will be "commit". So, we might need to override.
           action = HoodieTimeline.COMPACTION_ACTION;
         }
+        if (isLogCompaction.get()) {
+          action = HoodieTimeline.LOG_COMPACTION_ACTION;
+        }
         switch (action) {
           case HoodieTimeline.COMMIT_ACTION:
           case HoodieTimeline.REPLACE_COMMIT_ACTION:
@@ -152,6 +159,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
             }
             break;
           case HoodieTimeline.DELTA_COMMIT_ACTION:
+          case HoodieTimeline.LOG_COMPACTION_ACTION:
 
             // In case all data was inserts and the commit failed, delete the file belonging to that commit
             // We do not know fileIds for inserts (first inserts are either log files or base files),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 4113dddda937b..da43f5b1578f0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.FileSlice;
@@ -36,12 +38,14 @@
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -152,6 +156,96 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testMergeOnReadRollbackLogCompactActionExecutorWithListingStrategy(boolean isComplete) throws IOException {
+    //1. prepare data and assert data result
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(false).withAutoCommit(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder().enable(true).build())
+        .build();
+    twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, true);
+    List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+    List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+    firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, firstPartitionCommit2LogFiles.size());
+    secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, secondPartitionCommit2LogFiles.size());
+
+    //2. log compact
+    cfg = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withLogCompactionBlocksThreshold(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS).build())
+        .withRollbackUsingMarkers(false).withAutoCommit(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    String action = HoodieTimeline.LOG_COMPACTION_ACTION;
+    if (isComplete) {
+      cfg.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+      action = HoodieTimeline.DELTA_COMMIT_ACTION;
+    }
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    client.scheduleLogCompactionAtInstant("003", Option.empty());
+    client.logCompact("003");
+
+    //3. rollback log compact
+    metaClient.reloadActiveTimeline();
+    HoodieInstant rollBackInstant = new HoodieInstant(!isComplete, action, "003");
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "004", rollBackInstant, false,
+            cfg.shouldRollbackUsingMarkers(), false);
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
+        context,
+        cfg,
+        table,
+        "004",
+        rollBackInstant,
+        true,
+        false);
+    //4. assert the rollback stat
+    final HoodieRollbackMetadata execute = mergeOnReadRollbackActionExecutor.execute();
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = execute.getPartitionMetadata();
+    assertEquals(2, rollbackMetadata.size());
+
+    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
+      HoodieRollbackPartitionMetadata meta = entry.getValue();
+      assertEquals(0, meta.getFailedDeleteFiles().size());
+      assertEquals(1, meta.getSuccessDeleteFiles().size());
+    }
+
+    //4. assert file group after rollback, and compare to the rollbackstat
+    // assert the first partition data and log file size
+    metaClient.reloadActiveTimeline();
+    table = this.getHoodieTable(metaClient, cfg);
+    List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileGroups.size());
+    List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileSlices.size());
+    FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBackLogFiles.size());
+
+    // assert the second partition data and log file size
+    List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileGroups.size());
+    List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileSlices.size());
+    FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBackLogFiles.size());
+
+    assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "003").doesMarkerDirExist());
+  }
+
   @Test
   public void testMergeOnReadRestoreCompactionCommit() throws IOException {
     boolean isUsingMarkers = false;
@@ -159,7 +253,7 @@ public void testMergeOnReadRestoreCompactionCommit() throws IOException {
 
     // 1. ingest data to partition 3.
     //just generate two partitions
-    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[] {DEFAULT_THIRD_PARTITION_PATH});
     HoodieTestDataGenerator.writePartitionMetadataDeprecated(storage, new String[] {DEFAULT_THIRD_PARTITION_PATH}, basePath);
 
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 7bd4068dab3da..ed741acf3650b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -29,7 +29,6 @@
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -262,9 +261,6 @@ public static <T extends SpecificRecordBase> T convertCommitMetadata(HoodieCommi
     }
     hoodieCommitMetadata.getPartitionToWriteStats().remove(null);
     org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
         JsonUtils.getObjectMapper().convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
-    if (hoodieCommitMetadata.getCompacted()) {
-      avroMetaData.setOperationType(WriteOperationType.COMPACT.name());
-    }
     return (T) avroMetaData;
   }
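For reference, a condensed sketch (not part of the patch above) of how the new listing-based rollback path for log compaction is exercised. It reuses only calls that appear in the test added by this patch, assumes it runs from the same test class, and assumes marker-based rollback is disabled so that ListingBasedRollbackStrategy is the strategy in use:

    // Sketch only: mirrors the inflight case of the test above.
    HoodieWriteConfig cfg = getConfigBuilder()
        .withRollbackUsingMarkers(false)   // listing-based rollback instead of markers
        .withAutoCommit(false)             // leave the log compaction inflight
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withLogCompactionBlocksThreshold(1)
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS).build())
        .build();
    SparkRDDWriteClient client = getHoodieWriteClient(cfg);

    // Schedule and execute a log compaction at instant "003"; without auto-commit
    // it stays inflight and is therefore a rollback candidate.
    client.scheduleLogCompactionAtInstant("003", Option.empty());
    client.logCompact("003");

    // Plan and execute the rollback at "004". ListingBasedRollbackStrategy reads the
    // commit metadata, sees operation type LOG_COMPACT, overrides the action to
    // LOG_COMPACTION_ACTION and, through the shared DELTA_COMMIT_ACTION branch,
    // issues delete requests for the log files written by "003".
    metaClient.reloadActiveTimeline();
    HoodieInstant rollBackInstant = new HoodieInstant(true, HoodieTimeline.LOG_COMPACTION_ACTION, "003");
    HoodieTable table = getHoodieTable(metaClient, cfg);
    new BaseRollbackPlanActionExecutor(context, cfg, table, "004", rollBackInstant, false,
        cfg.shouldRollbackUsingMarkers(), false).execute();
    new MergeOnReadRollbackActionExecutor(context, cfg, table, "004", rollBackInstant, true, false).execute();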