Skip to content

Commit

Permalink
Delete secondary index backup directories after uploading files (#941)
Browse files Browse the repository at this point in the history
* CASS-2201 Consolidate logic to find secondary index directories

* CASS-2201 use correct file type for secondary index files in IncrementalBackup

* CASS-2201 Delete empty secondary index backup directories

* Responding to review comments and ensuring we wait for completion in IncrementalBackup.
  • Loading branch information
mattl-netflix authored May 25, 2021
1 parent 858fe1c commit 392633b
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 63 deletions.
46 changes: 26 additions & 20 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import static java.util.stream.Collectors.toSet;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
Expand All @@ -29,15 +31,16 @@
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.utils.SystemUtils;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -70,28 +73,22 @@ public AbstractBackup(
* @param parent Parent dir
* @param type Type of file (META, SST, SNAP etc)
* @param async Upload the file(s) in async fashion if enabled.
* @param waitForCompletion wait for completion for all files to upload if using async API. If
* `false` it will queue the files and return with no guarantee to upload.
* @return List of files that are successfully uploaded as part of backup
* @throws Exception when there is failure in uploading files.
*/
protected ImmutableSet<AbstractBackupPath> upload(
final File parent, final BackupFileType type, boolean async, boolean waitForCompletion)
throws Exception {
ImmutableSet<AbstractBackupPath> bps = getBackupPaths(parent, type);
final List<Future<AbstractBackupPath>> futures = Lists.newArrayList();
for (AbstractBackupPath bp : bps) {
protected ImmutableList<ListenableFuture<AbstractBackupPath>> uploadAndDeleteAllFiles(
final File parent, final BackupFileType type, boolean async) throws Exception {
ImmutableSet<AbstractBackupPath> backupPaths = getBackupPaths(parent, type);
final ImmutableList.Builder<ListenableFuture<AbstractBackupPath>> futures =
ImmutableList.builder();
for (AbstractBackupPath bp : backupPaths) {
if (async) futures.add(fs.asyncUploadAndDelete(bp, 10));
else fs.uploadAndDelete(bp, 10);
}

// Wait for all files to be uploaded.
if (async && waitForCompletion) {
for (Future<AbstractBackupPath> future : futures)
future.get(); // This might throw exception if there is any error
else {
fs.uploadAndDelete(bp, 10);
futures.add(Futures.immediateFuture(bp));
}
}

return bps;
return futures.build();
}

protected ImmutableSet<AbstractBackupPath> getBackupPaths(File dir, BackupFileType type)
Expand Down Expand Up @@ -219,6 +216,15 @@ public static Set<Path> getBackupDirectories(IConfiguration config, String monit
return backupPaths;
}

/**
 * Lists the secondary index directories directly under {@code backupDir} for the given
 * column family. Secondary index directories are named "." followed by the column family
 * name (compared case-insensitively).
 *
 * @param backupDir directory whose immediate children are scanned.
 * @param columnFamily column family owning the secondary indexes.
 * @return matching readable directories; an empty array when none match or the listing fails.
 */
protected static File[] getSecondaryIndexDirectories(File backupDir, String columnFamily) {
    String prefix = "." + columnFamily.toLowerCase(Locale.ROOT);
    File[] matches =
            backupDir.listFiles(
                    candidate ->
                            isAReadableDirectory(candidate)
                                    && candidate
                                            .getName()
                                            .toLowerCase(Locale.ROOT)
                                            .startsWith(prefix));
    // listFiles returns null on I/O error or when backupDir is not a directory.
    return matches == null ? new File[] {} : matches;
}

/**
 * Reports whether the given path is an existing, readable directory.
 *
 * @param dir candidate path to check.
 * @return true only when the path exists, is a directory, and is readable.
 */
protected static boolean isAReadableDirectory(File dir) {
    if (!dir.exists()) {
        return false;
    }
    return dir.isDirectory() && dir.canRead();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
Expand Down Expand Up @@ -61,7 +64,7 @@ public abstract class AbstractFileSystem implements IBackupFileSystem, EventGene
private final IConfiguration configuration;
protected final BackupMetrics backupMetrics;
private final Set<Path> tasksQueued;
private final ThreadPoolExecutor fileUploadExecutor;
private final ListeningExecutorService fileUploadExecutor;
private final ThreadPoolExecutor fileDownloadExecutor;

// This is going to be a write-thru cache containing the most frequently used items from remote
Expand Down Expand Up @@ -93,10 +96,11 @@ Also, we may want to have different TIMEOUT for each kind of operation (upload/d
.withName(backupMetrics.uploadQueueSize)
.monitorSize(uploadQueue);
this.fileUploadExecutor =
new BlockingSubmitThreadPoolExecutor(
configuration.getBackupThreads(),
uploadQueue,
configuration.getUploadTimeout());
MoreExecutors.listeningDecorator(
new BlockingSubmitThreadPoolExecutor(
configuration.getBackupThreads(),
uploadQueue,
configuration.getUploadTimeout()));

BlockingQueue<Runnable> downloadQueue =
new ArrayBlockingQueue<>(configuration.getDownloadQueueSize());
Expand Down Expand Up @@ -152,7 +156,7 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s
throws BackupRestoreException;

@Override
public Future<AbstractBackupPath> asyncUploadAndDelete(
public ListenableFuture<AbstractBackupPath> asyncUploadAndDelete(
final AbstractBackupPath path, final int retry) throws RejectedExecutionException {
return fileUploadExecutor.submit(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.priam.backup;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.Date;
Expand Down Expand Up @@ -89,7 +90,8 @@ void uploadAndDelete(AbstractBackupPath path, int retry)
* @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying
* to add the work to the queue.
*/
Future<AbstractBackupPath> asyncUploadAndDelete(final AbstractBackupPath path, final int retry)
ListenableFuture<AbstractBackupPath> asyncUploadAndDelete(
final AbstractBackupPath path, final int retry)
throws FileNotFoundException, RejectedExecutionException, BackupRestoreException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.netflix.priam.backup;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
Expand All @@ -26,9 +30,7 @@
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -118,14 +120,22 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba
backupRestoreConfig.enableV2Backups() ? BackupFileType.SST_V2 : BackupFileType.SST;

// upload SSTables and components
upload(backupDir, fileType, config.enableAsyncIncremental(), true);
ImmutableList<ListenableFuture<AbstractBackupPath>> futures =
uploadAndDeleteAllFiles(backupDir, fileType, config.enableAsyncIncremental());
Futures.whenAllComplete(futures).call(() -> null, MoreExecutors.directExecutor());

// Next, upload secondary indexes
FileFilter filter =
(file) ->
file.getName().startsWith("." + columnFamily) && isAReadableDirectory(file);
for (File subDir : Optional.ofNullable(backupDir.listFiles(filter)).orElse(new File[] {})) {
upload(subDir, fileType, config.enableAsyncIncremental(), true);
fileType = BackupFileType.SECONDARY_INDEX_V2;
for (File dir : getSecondaryIndexDirectories(backupDir, columnFamily)) {
futures = uploadAndDeleteAllFiles(dir, fileType, config.enableAsyncIncremental());
Futures.whenAllComplete(futures)
.call(
() -> {
if (FileUtils.sizeOfDirectory(dir) == 0)
FileUtils.deleteQuietly(dir);
return null;
},
MoreExecutors.directExecutor());
}
}
}
12 changes: 10 additions & 2 deletions priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.netflix.priam.backup;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
Expand All @@ -36,6 +38,7 @@
import java.nio.file.Path;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -209,8 +212,13 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba

forgottenFilesManager.findAndMoveForgottenFiles(snapshotInstant, snapshotDir);
// Add files to this dir
abstractBackupPaths.addAll(
upload(snapshotDir, BackupFileType.SNAP, config.enableAsyncSnapshot(), true));

ImmutableList<ListenableFuture<AbstractBackupPath>> futures =
uploadAndDeleteAllFiles(
snapshotDir, BackupFileType.SNAP, config.enableAsyncSnapshot());
for (Future<AbstractBackupPath> future : futures) {
abstractBackupPaths.add(future.get());
}
}

private static boolean isValidBackupDir(Path backupDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package com.netflix.priam.backupv2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Provider;
import com.netflix.priam.backup.*;
import com.netflix.priam.config.IBackupRestoreConfig;
Expand All @@ -35,12 +38,12 @@
import java.nio.file.Path;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
Expand Down Expand Up @@ -79,6 +82,7 @@ public class SnapshotMetaTask extends AbstractBackup {
private static final Lock lock = new ReentrantLock();
private final IBackupStatusMgr snapshotStatusMgr;
private final InstanceIdentity instanceIdentity;
private final ExecutorService threadPool;

private enum MetaStep {
META_GENERATION,
Expand Down Expand Up @@ -106,6 +110,7 @@ private enum MetaStep {
config.getSnapshotIncludeCFList(), config.getSnapshotExcludeCFList());
this.metaFileWriter = metaFileWriter;
this.metaProxy = metaProxy;
this.threadPool = Executors.newSingleThreadExecutor();
}

/**
Expand Down Expand Up @@ -273,15 +278,22 @@ private void uploadAllFiles(final String columnFamily, final File backupDir) thr
// Process each snapshot of SNAPSHOT_PREFIX
// We do not want to wait for completion and we just want to add them to queue. This
// is to ensure that next run happens on time.
upload(snapshotDirectory, AbstractBackupPath.BackupFileType.SST_V2, true, false);
AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2;
uploadAndDeleteAllFiles(snapshotDirectory, type, true);

// Next, upload secondary indexes
type = AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2;
ImmutableList<ListenableFuture<AbstractBackupPath>> futures;
for (File subDir : getSecondaryIndexDirectories(snapshotDirectory, columnFamily)) {
upload(
subDir,
AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2,
true,
false);
futures = uploadAndDeleteAllFiles(subDir, type, true);
Futures.whenAllComplete(futures)
.call(
() -> {
if (FileUtils.sizeOfDirectory(subDir) == 0)
FileUtils.deleteQuietly(subDir);
return null;
},
threadPool);
}
}
}
Expand Down Expand Up @@ -363,14 +375,6 @@ private static Optional<String> getPrefix(File file) {
return Optional.ofNullable(prefix);
}

/**
 * Finds secondary index directories anywhere under the given snapshot directory.
 *
 * @param snapshotDir root of the tree to search.
 * @param columnFamily column family whose index directories (named "." + columnFamily) are
 *     wanted.
 * @return readable directories whose names start with "." + columnFamily.
 * @throws IOException if the directory tree cannot be walked.
 */
private List<File> getSecondaryIndexDirectories(File snapshotDir, String columnFamily)
        throws IOException {
    String prefix = "." + columnFamily;
    // Files.walk returns a lazy Stream backed by open directory handles; close it with
    // try-with-resources to avoid leaking file descriptors (the original leaked here).
    try (java.util.stream.Stream<Path> paths =
            Files.walk(snapshotDir.toPath(), Integer.MAX_VALUE)) {
        return paths.map(Path::toFile)
                .filter(f -> f.getName().startsWith(prefix) && isAReadableDirectory(f))
                .collect(Collectors.toList());
    }
}

@VisibleForTesting
void setSnapshotName(String snapshotName) {
this.snapshotName = snapshotName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.netflix.priam.backup;

import com.google.appengine.repackaged.com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provider;
Expand Down Expand Up @@ -115,13 +115,15 @@ public String getName() {
/** Verifies an upload reports the compression type configured for the uploaded file. */
public void testCorrectCompressionType() throws Exception {
    File parent = new File(DIRECTORY);
    AbstractBackupPath.BackupFileType backupFileType = AbstractBackupPath.BackupFileType.SST_V2;
    ImmutableList<ListenableFuture<AbstractBackupPath>> futures =
            abstractBackup.uploadAndDeleteAllFiles(parent, backupFileType, false);
    AbstractBackupPath abstractBackupPath = null;
    for (ListenableFuture<AbstractBackupPath> future : futures) {
        // async=false above means every future is already complete; get() will not block.
        AbstractBackupPath path = future.get();
        if (path.getFileName().equals(tablePart)) {
            abstractBackupPath = path;
            break;
        }
    }
    // Fail with a clear assertion instead of an NPE if the expected file was never uploaded.
    Truth.assertThat(abstractBackupPath).isNotNull();
    Truth.assertThat(abstractBackupPath.getCompression()).isEqualTo(compressionAlgorithm);
}
}
Loading

0 comments on commit 392633b

Please sign in to comment.