From 27c267e47831c172f0007c84ddc7adedc3f0c893 Mon Sep 17 00:00:00 2001
From: mattl-netflix <63665634+mattl-netflix@users.noreply.github.com>
Date: Thu, 2 Jun 2022 14:59:07 -0700
Subject: [PATCH] Dynamic Rate Limiting of Snapshots (#975)

* CASS-2011 Update interface to accommodate dependency change.

* CASS-2011 Move backup cleaning outside of the getTimer method.

* CASS-2011 Introduce a target time for upload completion. Currently always the epoch, which implies a no-op.

* CASS-2011 Change the AbstractFileSystem API to pass the target instant to uploadFileImpl.

* CASS-2011 Delete already-uploaded files to get accurate estimates of remaining bytes to upload.

* CASS-2011 Create a rate limiter that dynamically adjusts its throttle based on the bytes still to upload in all remaining snapshots and a user-specified target time. Adjust the throttle only when the rate has deviated by a user-configurable threshold, so the limiter is not constantly adjusted; constant adjustment would also be redundant because the limiter does not throttle the file immediately following an adjustment. Ensure that the target does not exceed the earlier of the next scheduled snapshot or the time at which we would fail to meet our backup verification SLO.

* CASS-2011 Add unit tests and associated refactoring for BackupDynamicRateLimiter.
---
 .../priam/aws/S3EncryptedFileSystem.java | 11 +-
 .../com/netflix/priam/aws/S3FileSystem.java | 21 ++-
 .../netflix/priam/aws/S3FileSystemBase.java | 6 +-
 .../netflix/priam/backup/AbstractBackup.java | 15 +-
 .../priam/backup/AbstractFileSystem.java | 12 +-
 .../backup/BackupDynamicRateLimiter.java | 52 ++++++
 .../priam/backup/BackupVerification.java | 21 ++-
 .../netflix/priam/backup/DirectorySize.java | 10 ++
 .../priam/backup/DynamicRateLimiter.java | 9 +
 .../priam/backup/IBackupFileSystem.java | 20 ++-
 .../priam/backup/IncrementalBackup.java | 2 +-
 .../priam/backup/SnapshotDirectorySize.java | 50 ++++++
 .../priam/backupv2/BackupV2Service.java | 5 +-
 .../priam/backupv2/FileUploadResult.java | 8 +
 .../priam/backupv2/MetaFileWriterBuilder.java | 9 +-
 .../priam/backupv2/SnapshotMetaTask.java | 109 ++++++++----
 .../netflix/priam/config/IConfiguration.java | 13 ++
 .../priam/config/PriamConfiguration.java | 10 ++
 .../google/GoogleEncryptedFileSystem.java | 4 +-
 .../netflix/priam/backup/BRTestModule.java | 10 +-
 .../priam/backup/FakeBackupFileSystem.java | 4 +-
 .../priam/backup/FakeDynamicRateLimiter.java | 8 +
 .../priam/backup/NullBackupFileSystem.java | 4 +-
 .../priam/backup/TestAbstractFileSystem.java | 7 +-
 .../backup/TestBackupDynamicRateLimiter.java | 159 ++++++++++++++++++
 .../priam/backupv2/TestSnapshotMetaTask.java | 2 +-
 .../priam/config/FakeConfiguration.java | 7 +
 27 files changed, 518 insertions(+), 70 deletions(-)
 create mode 100644 priam/src/main/java/com/netflix/priam/backup/BackupDynamicRateLimiter.java
 create mode 100644 priam/src/main/java/com/netflix/priam/backup/DirectorySize.java
 create mode 100644 priam/src/main/java/com/netflix/priam/backup/DynamicRateLimiter.java
 create mode 100644 priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java
 create mode 100644 priam/src/test/java/com/netflix/priam/backup/FakeDynamicRateLimiter.java
 create mode 100644 priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java

diff --git a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java
index 9012af932..eeeaa2ca4 100755
--- a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java
+++ 
b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java @@ -25,6 +25,7 @@ import com.google.inject.name.Named; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; +import com.netflix.priam.backup.DynamicRateLimiter; import com.netflix.priam.backup.RangeReadInputStream; import com.netflix.priam.compress.ChunkedStream; import com.netflix.priam.compress.ICompression; @@ -37,6 +38,7 @@ import java.io.*; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.Iterator; import java.util.List; import org.apache.commons.io.IOUtils; @@ -49,6 +51,7 @@ public class S3EncryptedFileSystem extends S3FileSystemBase { private static final Logger logger = LoggerFactory.getLogger(S3EncryptedFileSystem.class); private final IFileCryptography encryptor; + private final DynamicRateLimiter dynamicRateLimiter; @Inject public S3EncryptedFileSystem( @@ -59,10 +62,12 @@ public S3EncryptedFileSystem( @Named("filecryptoalgorithm") IFileCryptography fileCryptography, BackupMetrics backupMetrics, BackupNotificationMgr backupNotificationMgr, - InstanceInfo instanceInfo) { + InstanceInfo instanceInfo, + DynamicRateLimiter dynamicRateLimiter) { super(pathProvider, compress, config, backupMetrics, backupNotificationMgr); this.encryptor = fileCryptography; + this.dynamicRateLimiter = dynamicRateLimiter; super.s3Client = AmazonS3Client.builder() .withCredentials(cred.getAwsCredentialProvider()) @@ -97,7 +102,8 @@ s3Client, getShard(), super.getFileSize(remotePath), remotePath)) { } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); String remotePath = path.getRemotePath(); @@ -150,6 +156,7 @@ protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreExcep byte[] chunk = chunks.next(); // throttle upload to endpoint rateLimiter.acquire(chunk.length); + dynamicRateLimiter.acquire(path, target, chunk.length); DataPart dp = new DataPart( diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java index 363058d76..e24f68100 100644 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java @@ -27,6 +27,7 @@ import com.netflix.priam.aws.auth.IS3Credential; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; +import com.netflix.priam.backup.DynamicRateLimiter; import com.netflix.priam.backup.RangeReadInputStream; import com.netflix.priam.compress.ChunkedStream; import com.netflix.priam.compress.CompressionType; @@ -39,6 +40,7 @@ import java.io.*; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -53,6 +55,7 @@ public class S3FileSystem extends S3FileSystemBase { private static final Logger logger = LoggerFactory.getLogger(S3FileSystem.class); private static final long MAX_BUFFER_SIZE = 5L * 1024L * 1024L; + private final DynamicRateLimiter dynamicRateLimiter; @Inject public S3FileSystem( @@ -62,13 +65,15 @@ public S3FileSystem( final IConfiguration config, BackupMetrics backupMetrics, BackupNotificationMgr backupNotificationMgr, - InstanceInfo instanceInfo) { + 
InstanceInfo instanceInfo, + DynamicRateLimiter dynamicRateLimiter) { super(pathProvider, compress, config, backupMetrics, backupNotificationMgr); s3Client = AmazonS3Client.builder() .withCredentials(cred.getAwsCredentialProvider()) .withRegion(instanceInfo.getRegion()) .build(); + this.dynamicRateLimiter = dynamicRateLimiter; } @Override @@ -113,7 +118,8 @@ private ObjectMetadata getObjectMetadata(File file) { return ret; } - private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreException { + private long uploadMultipart(AbstractBackupPath path, Instant target) + throws BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); String remotePath = path.getRemotePath(); long chunkSize = getChunkSize(localPath); @@ -137,6 +143,7 @@ private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreExcept while (chunks.hasNext()) { byte[] chunk = chunks.next(); rateLimiter.acquire(chunk.length); + dynamicRateLimiter.acquire(path, target, chunk.length); DataPart dp = new DataPart(++partNum, chunk, prefix, remotePath, uploadId); S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags, partsPut); compressedFileSize += chunk.length; @@ -163,12 +170,13 @@ private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreExcept } } - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); String remotePath = path.getRemotePath(); long chunkSize = config.getBackupChunkSize(); File localFile = localPath.toFile(); - if (localFile.length() >= chunkSize) return uploadMultipart(path); + if (localFile.length() >= chunkSize) return uploadMultipart(path, target); String prefix = config.getBackupPrefix(); if (logger.isDebugEnabled()) logger.debug("PUTing {}/{}", prefix, remotePath); @@ -182,7 +190,10 @@ protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreExcep byte[] chunk = byteArrayOutputStream.toByteArray(); long compressedFileSize = chunk.length; // C* snapshots may have empty files. That is probably unintentional. 
- if (chunk.length > 0) rateLimiter.acquire(chunk.length); + if (chunk.length > 0) { + rateLimiter.acquire(chunk.length); + dynamicRateLimiter.acquire(path, target, chunk.length); + } ObjectMetadata objectMetadata = getObjectMetadata(localFile); objectMetadata.setContentLength(chunk.length); ByteArrayInputStream inputStream = new ByteArrayInputStream(chunk); diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java index 4ff3d10df..5703424f2 100755 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java @@ -136,13 +136,13 @@ public void visit(LifecyclePrefixPredicate lifecyclePrefixPredicate) { @Override public void visit(LifecycleTagPredicate lifecycleTagPredicate) {} - @Override - public void visit(LifecycleAndOperator lifecycleAndOperator) {} - @Override public void visit( LifecycleObjectSizeGreaterThanPredicate lifecycleObjectSizeGreaterThanPredicate) {} + @Override + public void visit(LifecycleAndOperator lifecycleAndOperator) {} + @Override public void visit( LifecycleObjectSizeLessThanPredicate lifecycleObjectSizeLessThanPredicate) {} diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java index 583a5d3f3..282eecf5c 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java @@ -37,6 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -65,6 +66,12 @@ public AbstractBackup( this.fs = backupFileSystemCtx.getFileStrategy(config); } + /** Overload that uploads files without any custom throttling */ + protected ImmutableList> uploadAndDeleteAllFiles( + final File parent, final BackupFileType type, boolean async) throws Exception { + return uploadAndDeleteAllFiles(parent, type, async, Instant.EPOCH); + } + /** * Upload files in the specified dir. Does not delete the file in case of error. The files are * uploaded serially or async based on flag provided. @@ -72,18 +79,20 @@ public AbstractBackup( * @param parent Parent dir * @param type Type of file (META, SST, SNAP etc) * @param async Upload the file(s) in async fashion if enabled. + * @param target target time of completion of the batch of files * @return List of files that are successfully uploaded as part of backup * @throws Exception when there is failure in uploading files. 
*/ protected ImmutableList> uploadAndDeleteAllFiles( - final File parent, final BackupFileType type, boolean async) throws Exception { + final File parent, final BackupFileType type, boolean async, Instant target) + throws Exception { ImmutableSet backupPaths = getBackupPaths(parent, type); final ImmutableList.Builder> futures = ImmutableList.builder(); for (AbstractBackupPath bp : backupPaths) { - if (async) futures.add(fs.asyncUploadAndDelete(bp, 10)); + if (async) futures.add(fs.asyncUploadAndDelete(bp, 10, target)); else { - fs.uploadAndDelete(bp, 10); + fs.uploadAndDelete(bp, 10, target); futures.add(Futures.immediateFuture(bp)); } } diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index 84b74d9bc..0737ac639 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -38,6 +38,7 @@ import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -157,16 +158,17 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s @Override public ListenableFuture asyncUploadAndDelete( - final AbstractBackupPath path, final int retry) throws RejectedExecutionException { + final AbstractBackupPath path, final int retry, Instant target) + throws RejectedExecutionException { return fileUploadExecutor.submit( () -> { - uploadAndDelete(path, retry); + uploadAndDelete(path, retry, target); return path; }); } @Override - public void uploadAndDelete(final AbstractBackupPath path, final int retry) + public void uploadAndDelete(final AbstractBackupPath path, final int retry, Instant target) throws BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); File localFile = localPath.toFile(); @@ -189,7 +191,7 @@ public void uploadAndDelete(final AbstractBackupPath path, final int retry) new BoundedExponentialRetryCallable(500, 10000, retry) { @Override public Long retriableCall() throws Exception { - return uploadFileImpl(path); + return uploadFileImpl(path, target); } }.call(); @@ -270,7 +272,7 @@ public void deleteRemoteFiles(List remotePaths) throws BackupRestoreExcept protected abstract boolean doesRemoteFileExist(Path remotePath); - protected abstract long uploadFileImpl(final AbstractBackupPath path) + protected abstract long uploadFileImpl(final AbstractBackupPath path, Instant target) throws BackupRestoreException; @Override diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupDynamicRateLimiter.java b/priam/src/main/java/com/netflix/priam/backup/BackupDynamicRateLimiter.java new file mode 100644 index 000000000..4b301e321 --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/backup/BackupDynamicRateLimiter.java @@ -0,0 +1,52 @@ +package com.netflix.priam.backup; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.RateLimiter; +import com.netflix.priam.config.IConfiguration; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import javax.inject.Inject; + +public class BackupDynamicRateLimiter implements DynamicRateLimiter { + + private final Clock clock; + private final IConfiguration config; + private final DirectorySize dirSize; + private final RateLimiter rateLimiter; + + @Inject + BackupDynamicRateLimiter(IConfiguration config, Clock clock, 
DirectorySize dirSize) { + this.clock = clock; + this.config = config; + this.dirSize = dirSize; + this.rateLimiter = RateLimiter.create(Double.MAX_VALUE); + } + + @Override + public void acquire(AbstractBackupPath path, Instant target, int permits) { + if (target.equals(Instant.EPOCH) + || !path.getBackupFile() + .getAbsolutePath() + .contains(AbstractBackup.SNAPSHOT_FOLDER)) { + return; + } + long secondsRemaining = Duration.between(clock.instant(), target).getSeconds(); + if (secondsRemaining < 1) { + // skip file system checks when unnecessary + return; + } + int backupThreads = config.getBackupThreads(); + Preconditions.checkState(backupThreads > 0); + long bytesPerThread = this.dirSize.getBytes(config.getDataFileLocation()) / backupThreads; + if (bytesPerThread < 1) { + return; + } + double newRate = (double) bytesPerThread / secondsRemaining; + double oldRate = rateLimiter.getRate(); + if ((Math.abs(newRate - oldRate) / oldRate) > config.getRateLimitChangeThreshold()) { + rateLimiter.setRate(newRate); + } + rateLimiter.acquire(permits); + } +} diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java b/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java index 71102f3a7..3dfc1bdfb 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java @@ -23,6 +23,7 @@ import com.netflix.priam.utils.DateUtil.DateRange; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ public class BackupVerification { private final IMetaProxy metaV2Proxy; private final IBackupStatusMgr backupStatusMgr; private final Provider abstractBackupPathProvider; + private BackupVerificationResult latestResult; @Inject BackupVerification( @@ -83,14 +85,14 @@ public Optional verifyBackup( for (BackupMetadata backupMetadata : metadata) { if (backupMetadata.getLastValidated() != null && !force) { // Backup is already validated. Nothing to do. - BackupVerificationResult result = new BackupVerificationResult(); - result.valid = true; - result.manifestAvailable = true; - result.snapshotInstant = backupMetadata.getStart().toInstant(); + latestResult = new BackupVerificationResult(); + latestResult.valid = true; + latestResult.manifestAvailable = true; + latestResult.snapshotInstant = backupMetadata.getStart().toInstant(); Path snapshotLocation = Paths.get(backupMetadata.getSnapshotLocation()); - result.remotePath = + latestResult.remotePath = snapshotLocation.subpath(1, snapshotLocation.getNameCount()).toString(); - return Optional.of(result); + return Optional.of(latestResult); } BackupVerificationResult backupVerificationResult = verifyBackup(metaProxy, backupMetadata); @@ -102,9 +104,11 @@ public Optional verifyBackup( if (backupVerificationResult.valid) { backupMetadata.setLastValidated(new Date(DateUtil.getInstant().toEpochMilli())); backupStatusMgr.update(backupMetadata); + latestResult = backupVerificationResult; return Optional.of(backupVerificationResult); } } + latestResult = null; return Optional.empty(); } @@ -145,6 +149,11 @@ public List verifyAllBackups( return result; } + /** returns the latest valid backup verification result if we have found one within the SLO * */ + public Optional getLatestVerfifiedBackupTime() { + return latestResult == null ? 
Optional.empty() : Optional.of(latestResult.snapshotInstant); + } + private BackupVerificationResult verifyBackup( IMetaProxy metaProxy, BackupMetadata latestBackupMetaData) { Path metadataLocation = Paths.get(latestBackupMetaData.getSnapshotLocation()); diff --git a/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java b/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java new file mode 100644 index 000000000..1dc44ec35 --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java @@ -0,0 +1,10 @@ +package com.netflix.priam.backup; + +import com.google.inject.ImplementedBy; + +/** estimates the number of bytes remaining to upload in a snapshot */ +@ImplementedBy(SnapshotDirectorySize.class) +public interface DirectorySize { + /** return the total bytes of all snapshot files south of location in the filesystem */ + long getBytes(String location); +} diff --git a/priam/src/main/java/com/netflix/priam/backup/DynamicRateLimiter.java b/priam/src/main/java/com/netflix/priam/backup/DynamicRateLimiter.java new file mode 100644 index 000000000..dd1417d7f --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/backup/DynamicRateLimiter.java @@ -0,0 +1,9 @@ +package com.netflix.priam.backup; + +import com.google.inject.ImplementedBy; +import java.time.Instant; + +@ImplementedBy(BackupDynamicRateLimiter.class) +public interface DynamicRateLimiter { + void acquire(AbstractBackupPath dir, Instant target, int tokens); +} diff --git a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java index 7999958a2..176958e2b 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import java.io.FileNotFoundException; import java.nio.file.Path; +import java.time.Instant; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -55,6 +56,12 @@ void downloadFile(AbstractBackupPath path, String suffix, int retry) Future asyncDownloadFile(final AbstractBackupPath path, final int retry) throws BackupRestoreException, RejectedExecutionException; + /** Overload that uploads as fast as possible without any custom throttling */ + default void uploadAndDelete(AbstractBackupPath path, int retry) + throws FileNotFoundException, BackupRestoreException { + uploadAndDelete(path, retry, Instant.EPOCH); + } + /** * Upload the local file to its remote counterpart. Both locations are embedded within the path * parameter. De-duping of the file to upload will always be done by comparing the @@ -66,14 +73,22 @@ Future asyncDownloadFile(final AbstractBackupPath path, final int retry) * @param path Backup path representing a local and remote file pair * @param retry No of times to retry to upload a file. If <1, it will try to upload file * exactly once. + * @param target The target time of completion of all files in the upload. * @throws BackupRestoreException in case of failure to upload for any reason including file not * readable or remote file system errors. * @throws FileNotFoundException If a file as denoted by localPath is not available or is a * directory. 
*/ - void uploadAndDelete(AbstractBackupPath path, int retry) + void uploadAndDelete(AbstractBackupPath path, int retry, Instant target) throws FileNotFoundException, BackupRestoreException; + /** Overload that uploads as fast as possible without any custom throttling */ + default ListenableFuture asyncUploadAndDelete( + final AbstractBackupPath path, final int retry) + throws FileNotFoundException, RejectedExecutionException, BackupRestoreException { + return asyncUploadAndDelete(path, retry, Instant.EPOCH); + } + /** * Upload the local file denoted by localPath in async fashion to the remote file system at * location denoted by remotePath. @@ -81,6 +96,7 @@ void uploadAndDelete(AbstractBackupPath path, int retry) * @param path AbstractBackupPath to be used to send backup notifications only. * @param retry No of times to retry to upload a file. If <1, it will try to upload file * exactly once. + * @param target The target time of completion of all files in the upload. * @return The future of the async job to monitor the progress of the job. This will be null if * file was de-duped for upload. * @throws BackupRestoreException in case of failure to upload for any reason including file not @@ -91,7 +107,7 @@ void uploadAndDelete(AbstractBackupPath path, int retry) * to add the work to the queue. */ ListenableFuture asyncUploadAndDelete( - final AbstractBackupPath path, final int retry) + final AbstractBackupPath path, final int retry, Instant target) throws FileNotFoundException, RejectedExecutionException, BackupRestoreException; /** diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java index 92907859a..2326b1618 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java @@ -93,7 +93,7 @@ public static boolean isEnabled( && (SnapshotBackup.isBackupEnabled(configuration) || (backupRestoreConfig.enableV2Backups() && SnapshotMetaTask.isBackupEnabled( - configuration, backupRestoreConfig)))); + backupRestoreConfig)))); logger.info("Incremental backups are enabled: {}", enabled); if (!enabled) { diff --git a/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java b/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java new file mode 100644 index 000000000..ab77cfe4a --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java @@ -0,0 +1,50 @@ +package com.netflix.priam.backup; + +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; + +/** Estimates remaining bytes to upload in a backup by looking at the file system */ +public class SnapshotDirectorySize implements DirectorySize { + + public long getBytes(String location) { + SummingFileVisitor fileVisitor = new SummingFileVisitor(); + try { + Files.walkFileTree(Paths.get(location), fileVisitor); + } catch (IOException e) { + // BackupFileVisitor is happy with an estimate and won't produce these in practice. 
+ } + return fileVisitor.getTotalBytes(); + } + + private static final class SummingFileVisitor implements FileVisitor { + private long totalBytes; + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (file.toString().contains(AbstractBackup.SNAPSHOT_FOLDER) && attrs.isRegularFile()) { + totalBytes += attrs.size(); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { + return FileVisitResult.CONTINUE; + } + + long getTotalBytes() { + return totalBytes; + } + } +} diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java index 9b74047cb..5c02b396e 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java @@ -56,7 +56,10 @@ public BackupV2Service( @Override public void scheduleService() throws Exception { - TaskTimer snapshotMetaTimer = SnapshotMetaTask.getTimer(configuration, backupRestoreConfig); + TaskTimer snapshotMetaTimer = SnapshotMetaTask.getTimer(backupRestoreConfig); + if (snapshotMetaTimer == null) { + SnapshotMetaTask.cleanOldBackups(configuration); + } scheduleTask(scheduler, SnapshotMetaTask.class, snapshotMetaTimer); if (snapshotMetaTimer != null) { diff --git a/priam/src/main/java/com/netflix/priam/backupv2/FileUploadResult.java b/priam/src/main/java/com/netflix/priam/backupv2/FileUploadResult.java index f9c5da37d..dc01a5a81 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/FileUploadResult.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/FileUploadResult.java @@ -73,6 +73,14 @@ public void setUploaded(Boolean uploaded) { isUploaded = uploaded; } + public Boolean getIsUploaded() { + return isUploaded; + } + + public Path getFileName() { + return fileName; + } + public String getBackupPath() { return backupPath; } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java index 489d087f8..3bda21e45 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java @@ -65,7 +65,7 @@ public interface StartStep { } public interface DataStep { - DataStep addColumnfamilyResult( + ColumnfamilyResult addColumnfamilyResult( String keyspace, String columnFamily, ImmutableMultimap sstables) @@ -142,7 +142,7 @@ public DataStep startMetaFileGeneration(Instant snapshotInstant) throws IOExcept * * @throws IOException if unable to write to the file or if JSON is not valid */ - public DataStep addColumnfamilyResult( + public ColumnfamilyResult addColumnfamilyResult( String keyspace, String columnFamily, ImmutableMultimap sstables) @@ -151,8 +151,9 @@ public DataStep addColumnfamilyResult( if (jsonWriter == null) throw new NullPointerException( "addColumnfamilyResult: Json Writer in MetaFileWriter is null. 
This should not happen!"); - jsonWriter.jsonValue(toColumnFamilyResult(keyspace, columnFamily, sstables).toString()); - return this; + ColumnfamilyResult result = toColumnFamilyResult(keyspace, columnFamily, sstables); + jsonWriter.jsonValue(result.toString()); + return result; } /** diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index 4c85a6bb1..10f3d4821 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -36,10 +36,13 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.text.ParseException; +import java.time.Clock; +import java.time.Duration; import java.time.Instant; -import java.util.Date; -import java.util.Optional; -import java.util.Set; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; @@ -48,6 +51,7 @@ import javax.inject.Named; import javax.inject.Singleton; import org.apache.commons.io.FileUtils; +import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +77,7 @@ public class SnapshotMetaTask extends AbstractBackup { private static final String SNAPSHOT_PREFIX = "snap_v2_"; private static final String CASSANDRA_MANIFEST_FILE = "manifest.json"; private static final String CASSANDRA_SCHEMA_FILE = "schema.cql"; + private static final TimeZone UTC = TimeZone.getTimeZone(ZoneId.of("UTC")); private final BackupRestoreUtil backupRestoreUtil; private final MetaFileWriterBuilder metaFileWriter; private MetaFileWriterBuilder.DataStep dataStep; @@ -83,6 +88,10 @@ public class SnapshotMetaTask extends AbstractBackup { private final IBackupStatusMgr snapshotStatusMgr; private final InstanceIdentity instanceIdentity; private final ExecutorService threadPool; + private final IConfiguration config; + private final Clock clock; + private final IBackupRestoreConfig backupRestoreConfig; + private final BackupVerification backupVerification; private enum MetaStep { META_GENERATION, @@ -100,11 +109,18 @@ private enum MetaStep { @Named("v2") IMetaProxy metaProxy, InstanceIdentity instanceIdentity, IBackupStatusMgr snapshotStatusMgr, - CassandraOperations cassandraOperations) { + CassandraOperations cassandraOperations, + Clock clock, + IBackupRestoreConfig backupRestoreConfig, + BackupVerification backupVerification) { super(config, backupFileSystemCtx, pathFactory); + this.config = config; this.instanceIdentity = instanceIdentity; this.snapshotStatusMgr = snapshotStatusMgr; this.cassandraOperations = cassandraOperations; + this.clock = clock; + this.backupRestoreConfig = backupRestoreConfig; + this.backupVerification = backupVerification; backupRestoreUtil = new BackupRestoreUtil( config.getSnapshotIncludeCFList(), config.getSnapshotExcludeCFList()); @@ -116,31 +132,22 @@ private enum MetaStep { /** * Interval between generating snapshot meta file using {@link SnapshotMetaTask}. * - * @param backupRestoreConfig {@link - * IBackupRestoreConfig#getSnapshotMetaServiceCronExpression()} to get configuration details - * from priam. Use "-1" to disable the service. - * @param config configuration to get the data folder. 
+ * @param config {@link IBackupRestoreConfig#getSnapshotMetaServiceCronExpression()} to get + * configuration details from priam. Use "-1" to disable the service. * @return the timer to be used for snapshot meta service. - * @throws Exception if the configuration is not set correctly or are not valid. This is to - * ensure we fail-fast. + * @throws IllegalArgumentException if the configuration is not set correctly or are not valid. + * This is to ensure we fail-fast. */ - public static TaskTimer getTimer( - IConfiguration config, IBackupRestoreConfig backupRestoreConfig) throws Exception { - TaskTimer timer = - CronTimer.getCronTimer( - JOBNAME, backupRestoreConfig.getSnapshotMetaServiceCronExpression()); - if (timer == null) { - cleanOldBackups(config); - } - return timer; + public static TaskTimer getTimer(IBackupRestoreConfig config) throws IllegalArgumentException { + return CronTimer.getCronTimer(JOBNAME, config.getSnapshotMetaServiceCronExpression()); } - private static void cleanOldBackups(IConfiguration config) throws Exception { + static void cleanOldBackups(IConfiguration config) throws Exception { // Clean up all the backup directories, if any. Set backupPaths = AbstractBackup.getBackupDirectories(config, SNAPSHOT_FOLDER); for (Path backupDirPath : backupPaths) try (DirectoryStream directoryStream = - Files.newDirectoryStream(backupDirPath, path -> Files.isDirectory(path))) { + Files.newDirectoryStream(backupDirPath, Files::isDirectory)) { for (Path backupDir : directoryStream) { if (backupDir.toFile().getName().startsWith(SNAPSHOT_PREFIX)) { FileUtils.deleteDirectory(backupDir.toFile()); @@ -149,10 +156,9 @@ private static void cleanOldBackups(IConfiguration config) throws Exception { } } - public static boolean isBackupEnabled( - IConfiguration configuration, IBackupRestoreConfig backupRestoreConfig) + public static boolean isBackupEnabled(IBackupRestoreConfig backupRestoreConfig) throws Exception { - return (getTimer(configuration, backupRestoreConfig) != null); + return (getTimer(backupRestoreConfig) != null); } String generateSnapshotName(Instant snapshotInstant) { @@ -194,7 +200,7 @@ public void execute() throws Exception { } // Save start snapshot status - Instant snapshotInstant = DateUtil.getInstant(); + Instant snapshotInstant = clock.instant(); String token = instanceIdentity.getInstance().getToken(); BackupMetadata backupMetadata = new BackupMetadata( @@ -264,6 +270,7 @@ private void uploadAllFiles(final File backupDir) throws Exception { // (like we exhausted the wait time for upload) File[] snapshotDirectories = backupDir.listFiles(); if (snapshotDirectories != null) { + Instant target = getUploadTarget(); for (File snapshotDirectory : snapshotDirectories) { // Is it a valid SNAPSHOT_PREFIX if (!snapshotDirectory.getName().startsWith(SNAPSHOT_PREFIX) @@ -278,13 +285,13 @@ private void uploadAllFiles(final File backupDir) throws Exception { // We do not want to wait for completion and we just want to add them to queue. This // is to ensure that next run happens on time. 
AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2; - uploadAndDeleteAllFiles(snapshotDirectory, type, true); + uploadAndDeleteAllFiles(snapshotDirectory, type, true, target); // Next, upload secondary indexes type = AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2; ImmutableList> futures; for (File subDir : getSecondaryIndexDirectories(snapshotDirectory)) { - futures = uploadAndDeleteAllFiles(subDir, type, true); + futures = uploadAndDeleteAllFiles(subDir, type, true, target); if (futures.isEmpty()) { deleteIfEmpty(subDir); } @@ -294,6 +301,35 @@ private void uploadAllFiles(final File backupDir) throws Exception { } } + private Instant getUploadTarget() { + Instant now = clock.instant(); + Instant target = + now.plus(config.getTargetMinutesToCompleteSnaphotUpload(), ChronoUnit.MINUTES); + Duration verificationSLO = + Duration.ofHours(backupRestoreConfig.getBackupVerificationSLOInHours()); + Instant verificationDeadline = + backupVerification + .getLatestVerfifiedBackupTime() + .map(backupTime -> backupTime.plus(verificationSLO)) + .orElse(Instant.MAX); + Instant nextSnapshotTime; + try { + CronExpression snapshotCron = + new CronExpression(backupRestoreConfig.getSnapshotMetaServiceCronExpression()); + snapshotCron.setTimeZone(UTC); + Date nextSnapshotDate = snapshotCron.getNextValidTimeAfter(Date.from(Instant.now())); + nextSnapshotTime = + nextSnapshotDate == null ? Instant.MAX : nextSnapshotDate.toInstant(); + } catch (ParseException e) { + nextSnapshotTime = Instant.MAX; + } + return earliest(target, verificationDeadline, nextSnapshotTime); + } + + private Instant earliest(Instant... instants) { + return Arrays.stream(instants).min(Instant::compareTo).get(); + } + private Void deleteIfEmpty(File dir) { if (FileUtils.sizeOfDirectory(dir) == 0) FileUtils.deleteQuietly(dir); return null; @@ -305,7 +341,8 @@ protected void processColumnFamily(File backupDir) throws Exception { String columnFamily = getColumnFamily(backupDir); switch (metaStep) { case META_GENERATION: - generateMetaFile(keyspace, columnFamily, backupDir); + generateMetaFile(keyspace, columnFamily, backupDir) + .ifPresent(this::deleteUploadedFiles); break; case UPLOAD_FILES: uploadAllFiles(backupDir); @@ -315,14 +352,14 @@ protected void processColumnFamily(File backupDir) throws Exception { } } - private void generateMetaFile( + private Optional generateMetaFile( final String keyspace, final String columnFamily, final File backupDir) throws Exception { File snapshotDir = getValidSnapshot(backupDir, snapshotName); // Process this snapshot folder for the given columnFamily if (snapshotDir == null) { logger.warn("{} folder does not contain {} snapshots", backupDir, snapshotName); - return; + return Optional.empty(); } logger.debug("Scanning for all SSTables in: {}", snapshotDir.getAbsolutePath()); @@ -338,8 +375,18 @@ private void generateMetaFile( ImmutableSetMultimap sstables = builder.build(); logger.debug("Processing {} sstables from {}.{}", keyspace, columnFamily, sstables.size()); - dataStep.addColumnfamilyResult(keyspace, columnFamily, sstables); + ColumnfamilyResult result = + dataStep.addColumnfamilyResult(keyspace, columnFamily, sstables); logger.debug("Finished processing KS: {}, CF: {}", keyspace, columnFamily); + return Optional.of(result); + } + + private void deleteUploadedFiles(ColumnfamilyResult result) { + result.getSstables() + .stream() + .flatMap(sstable -> sstable.getSstableComponents().stream()) + .filter(file -> Boolean.TRUE.equals(file.getIsUploaded())) + 
.forEach(file -> FileUtils.deleteQuietly(file.getFileName().toFile())); } private ImmutableSetMultimap getSSTables( diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java index 35abf7818..b013c62dd 100644 --- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java @@ -1148,6 +1148,19 @@ default boolean permitDirectTokenAssignmentWithGossipMismatch() { return false; } + /** returns how long a snapshot backup should take to upload in minutes */ + default int getTargetMinutesToCompleteSnaphotUpload() { + return 0; + } + + /** + * @return the percentage off of the old rate that the current rate must be to trigger a new + * rate in the dynamic rate limiter + */ + default double getRateLimitChangeThreshold() { + return 0.1; + } + /** * Escape hatch for getting any arbitrary property by key This is useful so we don't have to * keep adding methods to this interface for every single configuration option ever. Also diff --git a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java index d0d449f50..a7193655e 100644 --- a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java @@ -789,4 +789,14 @@ public boolean checkThriftServerIsListening() { public boolean permitDirectTokenAssignmentWithGossipMismatch() { return config.get(PRIAM_PRE + ".permitDirectTokenAssignmentWithGossipMismatch", false); } + + @Override + public int getTargetMinutesToCompleteSnaphotUpload() { + return config.get(PRIAM_PRE + ".snapshotUploadDuration", 0); + } + + @Override + public double getRateLimitChangeThreshold() { + return config.get(PRIAM_PRE + ".rateLimitChangeThreshold", 0.1); + } } diff --git a/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java b/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java index 49455517b..f008e5726 100755 --- a/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java @@ -34,6 +34,7 @@ import com.netflix.priam.notification.BackupNotificationMgr; import java.io.*; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -236,7 +237,8 @@ public void shutdown() { } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { throw new UnsupportedOperationException(); } diff --git a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java index 29c87b051..6a1425f49 100644 --- a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java +++ b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java @@ -34,7 +34,10 @@ import com.netflix.priam.cryptography.pgp.PgpCryptography; import com.netflix.priam.defaultimpl.FakeCassandraProcess; import com.netflix.priam.defaultimpl.ICassandraProcess; -import com.netflix.priam.identity.*; +import com.netflix.priam.identity.FakeMembership; +import com.netflix.priam.identity.FakePriamInstanceFactory; +import com.netflix.priam.identity.IMembership; +import 
com.netflix.priam.identity.IPriamInstanceFactory; import com.netflix.priam.identity.config.FakeInstanceInfo; import com.netflix.priam.identity.config.InstanceInfo; import com.netflix.priam.restore.IPostRestoreHook; @@ -42,6 +45,9 @@ import com.netflix.priam.utils.Sleeper; import com.netflix.spectator.api.DefaultRegistry; import com.netflix.spectator.api.Registry; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.Collections; import org.junit.Ignore; import org.quartz.SchedulerFactory; @@ -83,5 +89,7 @@ protected void configure() { bind(Registry.class).toInstance(new DefaultRegistry()); bind(IMetaProxy.class).annotatedWith(Names.named("v1")).to(MetaV1Proxy.class); bind(IMetaProxy.class).annotatedWith(Names.named("v2")).to(MetaV2Proxy.class); + bind(DynamicRateLimiter.class).to(FakeDynamicRateLimiter.class); + bind(Clock.class).toInstance(Clock.fixed(Instant.EPOCH, ZoneId.systemDefault())); } } diff --git a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java index 86d5d822e..8807e4b4d 100644 --- a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java @@ -28,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.nio.file.Path; +import java.time.Instant; import java.util.*; import org.json.simple.JSONArray; @@ -166,7 +167,8 @@ protected void downloadFileImpl(AbstractBackupPath path, String suffix) } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { uploadedFiles.add(path.getBackupFile().getAbsolutePath()); addFile(path.getRemotePath()); return path.getBackupFile().length(); diff --git a/priam/src/test/java/com/netflix/priam/backup/FakeDynamicRateLimiter.java b/priam/src/test/java/com/netflix/priam/backup/FakeDynamicRateLimiter.java new file mode 100644 index 000000000..2108d3567 --- /dev/null +++ b/priam/src/test/java/com/netflix/priam/backup/FakeDynamicRateLimiter.java @@ -0,0 +1,8 @@ +package com.netflix.priam.backup; + +import java.time.Instant; + +public class FakeDynamicRateLimiter implements DynamicRateLimiter { + @Override + public void acquire(AbstractBackupPath dir, Instant target, int tokens) {} +} diff --git a/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java index d074a5e99..6dbf3941f 100644 --- a/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java @@ -23,6 +23,7 @@ import com.netflix.priam.merics.BackupMetrics; import com.netflix.priam.notification.BackupNotificationMgr; import java.nio.file.Path; +import java.time.Instant; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -72,7 +73,8 @@ protected boolean doesRemoteFileExist(Path remotePath) { } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { return 0; } } diff --git a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java index e042cd0ac..47cf1d1fa 100644 --- 
a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java @@ -29,6 +29,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.text.ParseException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -284,7 +285,8 @@ protected void downloadFileImpl(AbstractBackupPath path, String suffix) } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { throw new BackupRestoreException( "User injected failure file system error for testing upload. Local path: " + path.getBackupFile().getAbsolutePath()); @@ -315,7 +317,8 @@ protected void downloadFileImpl(AbstractBackupPath path, String suffix) } @Override - protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath path, Instant target) + throws BackupRestoreException { try { Thread.sleep(random.nextInt(20)); } catch (InterruptedException e) { diff --git a/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java b/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java new file mode 100644 index 000000000..1c607ba7d --- /dev/null +++ b/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java @@ -0,0 +1,159 @@ +package com.netflix.priam.backup; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.truth.Truth; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.priam.aws.RemoteBackupPath; +import com.netflix.priam.config.FakeConfiguration; +import java.nio.file.Paths; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestBackupDynamicRateLimiter { + private static final Instant NOW = Instant.ofEpochMilli(1 << 16); + private static final Instant LATER = NOW.plusMillis(Duration.ofHours(1).toMillis()); + private static final int DIR_SIZE = 1 << 16; + + private BackupDynamicRateLimiter rateLimiter; + private FakeConfiguration config; + private Injector injector; + + @Before + public void setUp() { + injector = Guice.createInjector(new BRTestModule()); + config = injector.getInstance(FakeConfiguration.class); + } + + @Test + public void sunnyDay() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 21); + Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtLeast(1_000); + Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(2_000); + } + + @Test + public void targetSetToEpoch() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Stopwatch timer = timePermitAcquisition(getBackupPath(), Instant.EPOCH, 20); + assertNoRateLimiting(timer); + } + + @Test + public void pathIsNotASnapshot() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + AbstractBackupPath path = + getBackupPath( + "target/data/Keyspace1/Standard1/backups/Keyspace1-Standard1-ia-4-Data.db"); + Stopwatch timer = 
timePermitAcquisition(path, LATER, 20); + assertNoRateLimiting(timer); + } + + @Test + public void targetIsNow() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Stopwatch timer = timePermitAcquisition(getBackupPath(), NOW, 20); + assertNoRateLimiting(timer); + } + + @Test + public void targetIsInThePast() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Instant target = NOW.minus(Duration.ofHours(1L)); + Stopwatch timer = timePermitAcquisition(getBackupPath(), target, 20); + assertNoRateLimiting(timer); + } + + @Test + public void noBackupThreads() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE); + Assert.assertThrows( + IllegalStateException.class, + () -> timePermitAcquisition(getBackupPath(), LATER, 20)); + } + + @Test + public void negativeBackupThreads() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE); + Assert.assertThrows( + IllegalStateException.class, + () -> timePermitAcquisition(getBackupPath(), LATER, 20)); + } + + @Test + public void noData() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0); + Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 20); + assertNoRateLimiting(timer); + } + + @Test + public void noPermitsRequested() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Assert.assertThrows( + IllegalArgumentException.class, + () -> timePermitAcquisition(getBackupPath(), LATER, 0)); + } + + @Test + public void negativePermitsRequested() { + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + Assert.assertThrows( + IllegalArgumentException.class, + () -> timePermitAcquisition(getBackupPath(), LATER, -1)); + } + + private RemoteBackupPath getBackupPath() { + return getBackupPath( + "target/data/Keyspace1/Standard1/snapshots/snap_v2_202201010000/.STANDARD1_field1_idx_1/Keyspace1-Standard1-ia-4-Data.db"); + } + + private RemoteBackupPath getBackupPath(String filePath) { + RemoteBackupPath path = injector.getInstance(RemoteBackupPath.class); + path.parseLocal(Paths.get(filePath).toFile(), AbstractBackupPath.BackupFileType.SST_V2); + return path; + } + + private Stopwatch timePermitAcquisition(AbstractBackupPath path, Instant now, int permits) { + rateLimiter.acquire(path, now, permits); // Do this once first or else it won't throttle. 
+ Stopwatch timer = Stopwatch.createStarted(); + rateLimiter.acquire(path, now, permits); + timer.stop(); + return timer; + } + + private BackupDynamicRateLimiter getRateLimiter( + Map properties, Instant now, long directorySize) { + properties.forEach(config::setFakeConfig); + return new BackupDynamicRateLimiter( + config, + Clock.fixed(now, ZoneId.systemDefault()), + new FakeDirectorySize(directorySize)); + } + + private void assertNoRateLimiting(Stopwatch timer) { + Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(1); + } + + private static final class FakeDirectorySize implements DirectorySize { + private final long size; + + FakeDirectorySize(long size) { + this.size = size; + } + + @Override + public long getBytes(String location) { + return size; + } + } +} diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestSnapshotMetaTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestSnapshotMetaTask.java index 4ae4de2c1..913b09364 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestSnapshotMetaTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestSnapshotMetaTask.java @@ -64,7 +64,7 @@ public void setUp() { @Test public void testSnapshotMetaServiceEnabled() throws Exception { - TaskTimer taskTimer = SnapshotMetaTask.getTimer(configuration, backupRestoreConfig); + TaskTimer taskTimer = SnapshotMetaTask.getTimer(backupRestoreConfig); Assert.assertNotNull(taskTimer); } diff --git a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java index e9b61cfff..046b367f5 100644 --- a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java +++ b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java @@ -285,4 +285,11 @@ public boolean skipIngressUnlessIPIsPublic() { public void setSkipIngressUnlessIPIsPublic(boolean skipIngressUnlessIPIsPublic) { this.skipIngressUnlessIPIsPublic = skipIngressUnlessIPIsPublic; } + + @Override + public int getBackupThreads() { + return (Integer) + fakeConfig.getOrDefault( + "Priam.backup.threads", IConfiguration.super.getBackupThreads()); + } }
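
Note for reviewers: a worked example of the throttle math in BackupDynamicRateLimiter.acquire() may help. The inputs below (64 GiB remaining under snapshots/, 2 backup threads, a 4-hour target, a previously applied rate of 2 MB/s) are hypothetical and are not defaults introduced by this patch; only the arithmetic mirrors the new class.

import java.time.Duration;
import java.time.Instant;

/** Standalone sketch of the rate computation in BackupDynamicRateLimiter, with made-up inputs. */
public class RateMathExample {
    public static void main(String[] args) {
        long snapshotBytesRemaining = 64L * 1024 * 1024 * 1024; // hypothetical: 64 GiB left to upload
        int backupThreads = 2;                                   // Priam.backup.threads
        Instant now = Instant.parse("2022-06-02T00:00:00Z");
        Instant target = now.plus(Duration.ofHours(4));          // desired completion time

        long secondsRemaining = Duration.between(now, target).getSeconds();
        long bytesPerThread = snapshotBytesRemaining / backupThreads;
        double newRate = (double) bytesPerThread / secondsRemaining; // bytes per second

        // setRate() is only invoked when the relative change exceeds
        // getRateLimitChangeThreshold() (default 0.1, i.e. 10%).
        double oldRate = 2_000_000d; // hypothetical previously applied rate
        boolean adjust = Math.abs(newRate - oldRate) / oldRate > 0.1;

        System.out.printf("newRate=%.0f bytes/s adjust=%b%n", newRate, adjust);
        // Roughly: newRate=2386093 bytes/s (~2.3 MiB/s), adjust=true
    }
}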
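
A second standalone sketch motivates the warm-up acquire() in TestBackupDynamicRateLimiter#timePermitAcquisition: Guava's RateLimiter charges the cost of a permit to the next caller, so the first acquisition returns immediately regardless of the configured rate. The rate and permit counts below are arbitrary; only Guava classes already on Priam's classpath are used.

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

/** Shows that the first acquire() is free and only the following acquire() is throttled. */
public class WarmUpAcquireDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(10); // 10 permits per second
        Stopwatch first = Stopwatch.createStarted();
        limiter.acquire(20);                          // returns immediately; the debt is deferred
        first.stop();

        Stopwatch second = Stopwatch.createStarted();
        limiter.acquire(20);                          // pays the deferred 20-permit debt: ~2 seconds
        second.stop();

        System.out.printf("first=%dms second=%dms%n",
                first.elapsed(TimeUnit.MILLISECONDS),
                second.elapsed(TimeUnit.MILLISECONDS));
    }
}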
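
Finally, a sketch of how SnapshotMetaTask#getUploadTarget caps the deadline at the earliest of the configured upload budget, the backup-verification SLO, and the next snapshot cron fire. The cron expression, SLO, and timestamps are hypothetical examples, not project defaults; the Quartz CronExpression calls are the same ones the patch uses.

import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;
import org.quartz.CronExpression;

/** Picks the earliest of three candidate deadlines, as getUploadTarget() does. */
public class UploadTargetExample {
    public static void main(String[] args) throws ParseException {
        Instant now = Instant.parse("2022-06-02T01:00:00Z");
        Instant uploadBudget = now.plus(Duration.ofMinutes(480));          // 8h budget (hypothetical)
        Instant sloDeadline =
                Instant.parse("2022-06-01T12:00:00Z")                      // last verified backup time
                        .plus(Duration.ofHours(24));                       // 24h verification SLO

        CronExpression snapshotCron = new CronExpression("0 0 */6 * * ?"); // snapshot every 6 hours
        snapshotCron.setTimeZone(TimeZone.getTimeZone(ZoneId.of("UTC")));
        Instant nextSnapshot =
                snapshotCron.getNextValidTimeAfter(Date.from(now)).toInstant();

        Instant target =
                Arrays.stream(new Instant[] {uploadBudget, sloDeadline, nextSnapshot})
                        .min(Instant::compareTo)
                        .get();
        System.out.println(target); // 2022-06-02T06:00:00Z -> capped by the next scheduled snapshot
    }
}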