Skip to content

Commit

Permalink
Dynamic Rate Limiting of Snapshots (#975)
Browse files Browse the repository at this point in the history
* CASS-2011 update interface to accommodate dependency change

* CASS-2011 move backup cleaning outside of getTimer method

* CASS-2011 introduce a target time for upload completion. Currently always the epoch which implies a no-op

* CASS-2011 Change AbstractFileSystem API to pass target instant to fileUploadImpl

* CASS-2011 Delete already-uploaded files to get accurate estimates of remaining bytes to upload.

* CASS-2011 create a rate limiter that dynamically adjusts its throttle based on the bytes still to upload in all remaining snapshots and a user-specified target time. Adjust the throttle only when we've deviated by a user-configurable threshold to ensure the rate limiter is not constantly adjusted. In that case, it would be redundant as it does not throttle the subsequent file after an adjustment. Ensure that the target does not exceed the earlier of the next scheduled snapshot or the time at which we would fail to meet our backup verification SLO.

* CASS-2011 Add unit tests and associated refactoring for BackupDynamicRateLimiter.
  • Loading branch information
mattl-netflix authored Jun 2, 2022
1 parent 99df803 commit 27c267e
Show file tree
Hide file tree
Showing 27 changed files with 518 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.inject.name.Named;
import com.netflix.priam.backup.AbstractBackupPath;
import com.netflix.priam.backup.BackupRestoreException;
import com.netflix.priam.backup.DynamicRateLimiter;
import com.netflix.priam.backup.RangeReadInputStream;
import com.netflix.priam.compress.ChunkedStream;
import com.netflix.priam.compress.ICompression;
Expand All @@ -37,6 +38,7 @@
import java.io.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
Expand All @@ -49,6 +51,7 @@ public class S3EncryptedFileSystem extends S3FileSystemBase {

private static final Logger logger = LoggerFactory.getLogger(S3EncryptedFileSystem.class);
private final IFileCryptography encryptor;
private final DynamicRateLimiter dynamicRateLimiter;

@Inject
public S3EncryptedFileSystem(
Expand All @@ -59,10 +62,12 @@ public S3EncryptedFileSystem(
@Named("filecryptoalgorithm") IFileCryptography fileCryptography,
BackupMetrics backupMetrics,
BackupNotificationMgr backupNotificationMgr,
InstanceInfo instanceInfo) {
InstanceInfo instanceInfo,
DynamicRateLimiter dynamicRateLimiter) {

super(pathProvider, compress, config, backupMetrics, backupNotificationMgr);
this.encryptor = fileCryptography;
this.dynamicRateLimiter = dynamicRateLimiter;
super.s3Client =
AmazonS3Client.builder()
.withCredentials(cred.getAwsCredentialProvider())
Expand Down Expand Up @@ -97,7 +102,8 @@ s3Client, getShard(), super.getFileSize(remotePath), remotePath)) {
}

@Override
protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException {
protected long uploadFileImpl(AbstractBackupPath path, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
String remotePath = path.getRemotePath();

Expand Down Expand Up @@ -150,6 +156,7 @@ protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreExcep
byte[] chunk = chunks.next();
// throttle upload to endpoint
rateLimiter.acquire(chunk.length);
dynamicRateLimiter.acquire(path, target, chunk.length);

DataPart dp =
new DataPart(
Expand Down
21 changes: 16 additions & 5 deletions priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.priam.aws.auth.IS3Credential;
import com.netflix.priam.backup.AbstractBackupPath;
import com.netflix.priam.backup.BackupRestoreException;
import com.netflix.priam.backup.DynamicRateLimiter;
import com.netflix.priam.backup.RangeReadInputStream;
import com.netflix.priam.compress.ChunkedStream;
import com.netflix.priam.compress.CompressionType;
Expand All @@ -39,6 +40,7 @@
import java.io.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -53,6 +55,7 @@
public class S3FileSystem extends S3FileSystemBase {
private static final Logger logger = LoggerFactory.getLogger(S3FileSystem.class);
private static final long MAX_BUFFER_SIZE = 5L * 1024L * 1024L;
private final DynamicRateLimiter dynamicRateLimiter;

@Inject
public S3FileSystem(
Expand All @@ -62,13 +65,15 @@ public S3FileSystem(
final IConfiguration config,
BackupMetrics backupMetrics,
BackupNotificationMgr backupNotificationMgr,
InstanceInfo instanceInfo) {
InstanceInfo instanceInfo,
DynamicRateLimiter dynamicRateLimiter) {
super(pathProvider, compress, config, backupMetrics, backupNotificationMgr);
s3Client =
AmazonS3Client.builder()
.withCredentials(cred.getAwsCredentialProvider())
.withRegion(instanceInfo.getRegion())
.build();
this.dynamicRateLimiter = dynamicRateLimiter;
}

@Override
Expand Down Expand Up @@ -113,7 +118,8 @@ private ObjectMetadata getObjectMetadata(File file) {
return ret;
}

private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreException {
private long uploadMultipart(AbstractBackupPath path, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
String remotePath = path.getRemotePath();
long chunkSize = getChunkSize(localPath);
Expand All @@ -137,6 +143,7 @@ private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreExcept
while (chunks.hasNext()) {
byte[] chunk = chunks.next();
rateLimiter.acquire(chunk.length);
dynamicRateLimiter.acquire(path, target, chunk.length);
DataPart dp = new DataPart(++partNum, chunk, prefix, remotePath, uploadId);
S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags, partsPut);
compressedFileSize += chunk.length;
Expand All @@ -163,12 +170,13 @@ private long uploadMultipart(AbstractBackupPath path) throws BackupRestoreExcept
}
}

protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreException {
protected long uploadFileImpl(AbstractBackupPath path, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
String remotePath = path.getRemotePath();
long chunkSize = config.getBackupChunkSize();
File localFile = localPath.toFile();
if (localFile.length() >= chunkSize) return uploadMultipart(path);
if (localFile.length() >= chunkSize) return uploadMultipart(path, target);

String prefix = config.getBackupPrefix();
if (logger.isDebugEnabled()) logger.debug("PUTing {}/{}", prefix, remotePath);
Expand All @@ -182,7 +190,10 @@ protected long uploadFileImpl(AbstractBackupPath path) throws BackupRestoreExcep
byte[] chunk = byteArrayOutputStream.toByteArray();
long compressedFileSize = chunk.length;
// C* snapshots may have empty files. That is probably unintentional.
if (chunk.length > 0) rateLimiter.acquire(chunk.length);
if (chunk.length > 0) {
rateLimiter.acquire(chunk.length);
dynamicRateLimiter.acquire(path, target, chunk.length);
}
ObjectMetadata objectMetadata = getObjectMetadata(localFile);
objectMetadata.setContentLength(chunk.length);
ByteArrayInputStream inputStream = new ByteArrayInputStream(chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ public void visit(LifecyclePrefixPredicate lifecyclePrefixPredicate) {
@Override
public void visit(LifecycleTagPredicate lifecycleTagPredicate) {}

@Override
public void visit(LifecycleAndOperator lifecycleAndOperator) {}

@Override
public void visit(
LifecycleObjectSizeGreaterThanPredicate lifecycleObjectSizeGreaterThanPredicate) {}

@Override
public void visit(LifecycleAndOperator lifecycleAndOperator) {}

@Override
public void visit(
LifecycleObjectSizeLessThanPredicate lifecycleObjectSizeLessThanPredicate) {}
Expand Down
15 changes: 12 additions & 3 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -65,25 +66,33 @@ public AbstractBackup(
this.fs = backupFileSystemCtx.getFileStrategy(config);
}

/** Overload that uploads files without any custom throttling */
protected ImmutableList<ListenableFuture<AbstractBackupPath>> uploadAndDeleteAllFiles(
final File parent, final BackupFileType type, boolean async) throws Exception {
return uploadAndDeleteAllFiles(parent, type, async, Instant.EPOCH);
}

/**
* Upload files in the specified dir. Does not delete the file in case of error. The files are
* uploaded serially or async based on flag provided.
*
* @param parent Parent dir
* @param type Type of file (META, SST, SNAP etc)
* @param async Upload the file(s) in async fashion if enabled.
* @param target target time of completion of the batch of files
* @return List of files that are successfully uploaded as part of backup
* @throws Exception when there is failure in uploading files.
*/
protected ImmutableList<ListenableFuture<AbstractBackupPath>> uploadAndDeleteAllFiles(
final File parent, final BackupFileType type, boolean async) throws Exception {
final File parent, final BackupFileType type, boolean async, Instant target)
throws Exception {
ImmutableSet<AbstractBackupPath> backupPaths = getBackupPaths(parent, type);
final ImmutableList.Builder<ListenableFuture<AbstractBackupPath>> futures =
ImmutableList.builder();
for (AbstractBackupPath bp : backupPaths) {
if (async) futures.add(fs.asyncUploadAndDelete(bp, 10));
if (async) futures.add(fs.asyncUploadAndDelete(bp, 10, target));
else {
fs.uploadAndDelete(bp, 10);
fs.uploadAndDelete(bp, 10, target);
futures.add(Futures.immediateFuture(bp));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -157,16 +158,17 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s

@Override
public ListenableFuture<AbstractBackupPath> asyncUploadAndDelete(
final AbstractBackupPath path, final int retry) throws RejectedExecutionException {
final AbstractBackupPath path, final int retry, Instant target)
throws RejectedExecutionException {
return fileUploadExecutor.submit(
() -> {
uploadAndDelete(path, retry);
uploadAndDelete(path, retry, target);
return path;
});
}

@Override
public void uploadAndDelete(final AbstractBackupPath path, final int retry)
public void uploadAndDelete(final AbstractBackupPath path, final int retry, Instant target)
throws BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
File localFile = localPath.toFile();
Expand All @@ -189,7 +191,7 @@ public void uploadAndDelete(final AbstractBackupPath path, final int retry)
new BoundedExponentialRetryCallable<Long>(500, 10000, retry) {
@Override
public Long retriableCall() throws Exception {
return uploadFileImpl(path);
return uploadFileImpl(path, target);
}
}.call();

Expand Down Expand Up @@ -270,7 +272,7 @@ public void deleteRemoteFiles(List<Path> remotePaths) throws BackupRestoreExcept

protected abstract boolean doesRemoteFileExist(Path remotePath);

protected abstract long uploadFileImpl(final AbstractBackupPath path)
protected abstract long uploadFileImpl(final AbstractBackupPath path, Instant target)
throws BackupRestoreException;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.netflix.priam.backup;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.netflix.priam.config.IConfiguration;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import javax.inject.Inject;

public class BackupDynamicRateLimiter implements DynamicRateLimiter {

private final Clock clock;
private final IConfiguration config;
private final DirectorySize dirSize;
private final RateLimiter rateLimiter;

@Inject
BackupDynamicRateLimiter(IConfiguration config, Clock clock, DirectorySize dirSize) {
this.clock = clock;
this.config = config;
this.dirSize = dirSize;
this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
}

@Override
public void acquire(AbstractBackupPath path, Instant target, int permits) {
if (target.equals(Instant.EPOCH)
|| !path.getBackupFile()
.getAbsolutePath()
.contains(AbstractBackup.SNAPSHOT_FOLDER)) {
return;
}
long secondsRemaining = Duration.between(clock.instant(), target).getSeconds();
if (secondsRemaining < 1) {
// skip file system checks when unnecessary
return;
}
int backupThreads = config.getBackupThreads();
Preconditions.checkState(backupThreads > 0);
long bytesPerThread = this.dirSize.getBytes(config.getDataFileLocation()) / backupThreads;
if (bytesPerThread < 1) {
return;
}
double newRate = (double) bytesPerThread / secondsRemaining;
double oldRate = rateLimiter.getRate();
if ((Math.abs(newRate - oldRate) / oldRate) > config.getRateLimitChangeThreshold()) {
rateLimiter.setRate(newRate);
}
rateLimiter.acquire(permits);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.priam.utils.DateUtil.DateRange;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +41,7 @@ public class BackupVerification {
private final IMetaProxy metaV2Proxy;
private final IBackupStatusMgr backupStatusMgr;
private final Provider<AbstractBackupPath> abstractBackupPathProvider;
private BackupVerificationResult latestResult;

@Inject
BackupVerification(
Expand Down Expand Up @@ -83,14 +85,14 @@ public Optional<BackupVerificationResult> verifyBackup(
for (BackupMetadata backupMetadata : metadata) {
if (backupMetadata.getLastValidated() != null && !force) {
// Backup is already validated. Nothing to do.
BackupVerificationResult result = new BackupVerificationResult();
result.valid = true;
result.manifestAvailable = true;
result.snapshotInstant = backupMetadata.getStart().toInstant();
latestResult = new BackupVerificationResult();
latestResult.valid = true;
latestResult.manifestAvailable = true;
latestResult.snapshotInstant = backupMetadata.getStart().toInstant();
Path snapshotLocation = Paths.get(backupMetadata.getSnapshotLocation());
result.remotePath =
latestResult.remotePath =
snapshotLocation.subpath(1, snapshotLocation.getNameCount()).toString();
return Optional.of(result);
return Optional.of(latestResult);
}
BackupVerificationResult backupVerificationResult =
verifyBackup(metaProxy, backupMetadata);
Expand All @@ -102,9 +104,11 @@ public Optional<BackupVerificationResult> verifyBackup(
if (backupVerificationResult.valid) {
backupMetadata.setLastValidated(new Date(DateUtil.getInstant().toEpochMilli()));
backupStatusMgr.update(backupMetadata);
latestResult = backupVerificationResult;
return Optional.of(backupVerificationResult);
}
}
latestResult = null;
return Optional.empty();
}

Expand Down Expand Up @@ -145,6 +149,11 @@ public List<BackupVerificationResult> verifyAllBackups(
return result;
}

/** returns the latest valid backup verification result if we have found one within the SLO * */
public Optional<Instant> getLatestVerfifiedBackupTime() {
return latestResult == null ? Optional.empty() : Optional.of(latestResult.snapshotInstant);
}

private BackupVerificationResult verifyBackup(
IMetaProxy metaProxy, BackupMetadata latestBackupMetaData) {
Path metadataLocation = Paths.get(latestBackupMetaData.getSnapshotLocation());
Expand Down
10 changes: 10 additions & 0 deletions priam/src/main/java/com/netflix/priam/backup/DirectorySize.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netflix.priam.backup;

import com.google.inject.ImplementedBy;

/** estimates the number of bytes remaining to upload in a snapshot */
@ImplementedBy(SnapshotDirectorySize.class)
public interface DirectorySize {
/** return the total bytes of all snapshot files south of location in the filesystem */
long getBytes(String location);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.netflix.priam.backup;

import com.google.inject.ImplementedBy;
import java.time.Instant;

@ImplementedBy(BackupDynamicRateLimiter.class)
public interface DynamicRateLimiter {
void acquire(AbstractBackupPath dir, Instant target, int tokens);
}
Loading

0 comments on commit 27c267e

Please sign in to comment.