Commit
Continue trying to upload incrementals all the time (#727)
We have reports of missing incrementals. The incremental upload path is
something of a maze of retries, and it turns out that we ultimately were
not uploading with retries, and we would not re-enqueue failed uploads
because the hash set that shadows the queue was never updated.

We should ... not do those things.

This cherry-picks 3f21bc7
jolynch authored and arunagrawal-84 committed Oct 2, 2018
1 parent 294519e commit d615e6a
Showing 12 changed files with 120 additions and 102 deletions.
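The failure mode described in the commit message can be illustrated with a minimal sketch (hypothetical class and path names, not Priam's actual code): a set of remote paths shadows the work queue for de-duplication, but nothing removes a path from that set when its upload fails, so the file can never be re-enqueued.

import java.util.HashSet;
import java.util.Set;

public class StaleShadowSetSketch {
    // Shadows the work queue so the same file is not enqueued twice.
    private static final Set<String> tasksQueued = new HashSet<>();

    static void enqueue(String remotePath) {
        if (tasksQueued.add(remotePath)) {
            System.out.println("queued " + remotePath);
        } else {
            System.out.println("already queued, no-op: " + remotePath);
        }
    }

    public static void main(String[] args) {
        enqueue("cluster/token/201810020000/SST/ks/cf/file-1.db");
        // The upload fails, and nothing removes the key from tasksQueued ...
        enqueue("cluster/token/201810020000/SST/ks/cf/file-1.db"); // no-op: the incremental is silently lost
    }
}

The changes below address both halves of the problem: the uploads themselves retry, and the post-processing that clears this set runs whether the upload succeeds or fails.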
4 changes: 2 additions & 2 deletions priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
@@ -162,12 +162,12 @@ public void uploadFile(AbstractBackupPath path, InputStream in, long chunkSize)
objectMetadata.setContentLength(chunk.length);
PutObjectRequest putObjectRequest = new PutObjectRequest(config.getBackupPrefix(), path.getRemotePath(), new ByteArrayInputStream(chunk), objectMetadata);
//Retry if failed.
PutObjectResult upload = new BoundedExponentialRetryCallable<PutObjectResult>() {
PutObjectResult upload = new BoundedExponentialRetryCallable<PutObjectResult>(1000, 10000, 5) {
@Override
public PutObjectResult retriableCall() throws Exception {
return s3Client.putObject(putObjectRequest);
}
}.retriableCall();
}.call();

bytesUploaded.addAndGet(chunk.length);

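For context, a bounded exponential retry wrapper in the spirit of BoundedExponentialRetryCallable might look roughly like the sketch below. The semantics (doubling sleep capped at a maximum, a fixed number of attempts) are assumptions for illustration; the real Priam class may differ in details such as jitter.

import java.util.concurrent.Callable;

public abstract class BoundedRetrySketch<T> implements Callable<T> {
    private final long minSleepMs;
    private final long maxSleepMs;
    private final int maxAttempts;

    protected BoundedRetrySketch(long minSleepMs, long maxSleepMs, int maxAttempts) {
        this.minSleepMs = minSleepMs;
        this.maxSleepMs = maxSleepMs;
        this.maxAttempts = maxAttempts;
    }

    public abstract T retriableCall() throws Exception;

    @Override
    public T call() throws Exception {
        long sleep = minSleepMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return retriableCall();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                Thread.sleep(sleep);
                sleep = Math.min(sleep * 2, maxSleepMs);  // exponential backoff, bounded
            }
        }
        throw last;
    }
}

Under those assumptions, arguments of (1000, 10000, 5) mean a failing putObject is attempted up to 5 times with sleeps of roughly 1 s, 2 s, 4 s and 8 s in between. The second half of the hunk matters just as much: invoking call() runs the retry loop, whereas the old code invoked retriableCall() directly, which performs a single attempt with no retries at all.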
10 changes: 6 additions & 4 deletions priam/src/main/java/com/netflix/priam/aws/S3PartUploader.java
@@ -20,7 +20,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.netflix.priam.backup.BackupRestoreException;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.BoundedExponentialRetryCallable;
import com.netflix.priam.utils.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,24 +29,26 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class S3PartUploader extends RetryableCallable<Void> {
public class S3PartUploader extends BoundedExponentialRetryCallable<Void>
{
private final AmazonS3 client;
private DataPart dataPart;
private List<PartETag> partETags;
private AtomicInteger partsUploaded = null; //num of data parts successfully uploaded

private static final Logger logger = LoggerFactory.getLogger(S3PartUploader.class);
private static final int MAX_RETRIES = 5;
private static final int DEFAULT_MIN_SLEEP_MS = 200;

public S3PartUploader(AmazonS3 client, DataPart dp, List<PartETag> partETags) {
super(MAX_RETRIES, RetryableCallable.DEFAULT_WAIT_TIME);
super(DEFAULT_MIN_SLEEP_MS, BoundedExponentialRetryCallable.MAX_SLEEP, MAX_RETRIES);
this.client = client;
this.dataPart = dp;
this.partETags = partETags;
}

public S3PartUploader(AmazonS3 client, DataPart dp, List<PartETag> partETags, AtomicInteger partsUploaded) {
super(MAX_RETRIES, RetryableCallable.DEFAULT_WAIT_TIME);
super(DEFAULT_MIN_SLEEP_MS, BoundedExponentialRetryCallable.MAX_SLEEP, MAX_RETRIES);
this.client = client;
this.dataPart = dp;
this.partETags = partETags;
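As a quick sanity check of the new constructor arguments (a 200 ms minimum sleep, the class's bounded maximum, and 5 retries), the backoff schedule under the same doubling assumption as above would look roughly like this; the actual MAX_SLEEP constant and any randomization belong to BoundedExponentialRetryCallable and are assumed here, not taken from the diff.

public class PartUploaderBackoffSketch {
    public static void main(String[] args) {
        long sleep = 200;    // DEFAULT_MIN_SLEEP_MS from the change above
        long cap = 10_000;   // assumed cap; the real MAX_SLEEP constant may differ
        for (int attempt = 1; attempt < 5; attempt++) {
            System.out.println("between attempt " + attempt + " and " + (attempt + 1) + ": ~" + sleep + " ms");
            sleep = Math.min(sleep * 2, cap);
        }
    }
}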
44 changes: 21 additions & 23 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
@@ -76,7 +76,12 @@ List<AbstractBackupPath> upload(File parent, final BackupFileType type) throws E
try {
logger.info("About to upload file {} for backup", file.getCanonicalFile());

AbstractBackupPath abp = new RetryableCallable<AbstractBackupPath>(3, RetryableCallable.DEFAULT_WAIT_TIME) {
// Allow up to 30s of arbitrary failures at the top level. The upload call itself typically has retries
// as well so this top level retry is on top of those retries. Assuming that each call to upload has
// ~30s maximum of retries this yields about 3.5 minutes of retries at the top level since
// (6 * (5 + 30) = 210 seconds). Even if this fails, however, higher level schedulers (e.g. in
// incremental) will hopefully re-enqueue.
AbstractBackupPath abp = new RetryableCallable<AbstractBackupPath>(6, 5000) {
public AbstractBackupPath retriableCall() throws Exception {
upload(bp);
file.delete();
@@ -99,33 +104,26 @@ public AbstractBackupPath retriableCall() throws Exception {


/**
* Upload specified file (RandomAccessFile) with retries
* Upload specified file (RandomAccessFile)
*
* @param bp backup path to be uploaded.
*/
protected void upload(final AbstractBackupPath bp) throws Exception {
new RetryableCallable<Void>() {
@Override
public Void retriableCall() throws Exception {
java.io.InputStream is = null;
try {
is = bp.localReader();
if (is == null) {
throw new NullPointerException("Unable to get handle on file: " + bp.fileName);
}
fs.upload(bp, is);
bp.setCompressedFileSize(fs.getBytesUploaded());
return null;
} catch (Exception e) {
logger.error("Exception uploading local file {}, releasing handle, and will retry.", bp.backupFile.getCanonicalFile());
if (is != null) {
is.close();
}
throw e;
}

java.io.InputStream is = null;
try {
is = bp.localReader();
if (is == null) {
throw new NullPointerException("Unable to get handle on file: " + bp.fileName);
}
fs.upload(bp, is);
bp.setCompressedFileSize(fs.getBytesUploaded());
} catch (Exception e) {
logger.error("Exception uploading local file {}, releasing handle, and will retry.", bp.backupFile.getCanonicalFile());
if (is != null) {
is.close();
}
}.call();
throw e;
}
}

protected final void initiateBackup(String monitoringFolder, BackupRestoreUtil backupRestoreUtil) throws Exception {
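The retry budget from the new comment can be checked with a line of arithmetic; the ~30 s figure for the inner upload retries is the assumption stated in the comment itself.

public class RetryBudgetSketch {
    public static void main(String[] args) {
        int topLevelAttempts = 6;          // RetryableCallable(6, 5000)
        int topLevelSleepSeconds = 5;      // 5000 ms between top-level attempts
        int innerUploadRetrySeconds = 30;  // assumed worst case of the upload call's own retries
        int worstCaseSeconds = topLevelAttempts * (topLevelSleepSeconds + innerUploadRetrySeconds);
        System.out.println("worst case ~" + worstCaseSeconds + " s"); // 210 s, about 3.5 minutes
    }
}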
@@ -48,39 +48,30 @@ public class CassandraBackupQueueMgr implements ITaskQueueMgr<AbstractBackupPath

@Inject
public CassandraBackupQueueMgr(IConfiguration config) {
tasks = new ArrayBlockingQueue<AbstractBackupPath>(config.getUncrementalBkupQueueSize());
tasksQueued = new HashSet<String>(config.getUncrementalBkupQueueSize()); //Key to task is the S3 absolute path (BASE/REGION/CLUSTER/TOKEN/[yyyymmddhhmm]/[SST|SNP|META]/KEYSPACE/COLUMNFAMILY/FILE
tasks = new ArrayBlockingQueue<AbstractBackupPath>(config.getIncrementalBkupQueueSize());
// Key to task is the S3 absolute path (BASE/REGION/CLUSTER/TOKEN/[yyyymmddhhmm]/[SST|SNP|META]/KEYSPACE/COLUMNFAMILY/FILE
tasksQueued = new HashSet<String>(config.getIncrementalBkupQueueSize());
}

@Override
/*
* Add task to queue if it does not already exist. For performance reasons, this behavior does not acquire a lock on the queue hence
* it is up to the caller to handle possible duplicate tasks.
*
* Note: will block until there is space in the queue.
*/
public void add(AbstractBackupPath task) {
public void add(AbstractBackupPath task)
{
if (!tasksQueued.contains(task.getRemotePath())) {
tasksQueued.add(task.getRemotePath());
try {
tasks.put(task); //block until space becomes available in queue
// block until space becomes available in queue
tasks.put(task);
logger.debug("Queued file {} within CF {}", task.getFileName(), task.getColumnFamily());

} catch (InterruptedException e) {
logger.warn("Interrupted waiting for the task queue to have free space, not fatal will just move on. Error Msg: {}", e.getLocalizedMessage());
tasksQueued.remove(task.getRemotePath());
}
} else {
logger.debug("Already in queue, no-op. File: {}", task.getRemotePath());
}

}

@Override
/*
* Guarantee delivery of a task to only one consumer.
*
* @return task, null if task in queue.
*/
public AbstractBackupPath take() throws InterruptedException {
AbstractBackupPath task = null;
if (!tasks.isEmpty()) {
@@ -94,34 +85,17 @@ public AbstractBackupPath take() throws InterruptedException {
}

@Override
/*
* @return true if there are more tasks.
*
* Note: this is a best effort so the caller should call me again just before taking a task.
* We anticipate this method will be invoked at a high frequency hence, making it thread-safe will slow down the appliation or
* worse yet, create a deadlock. For example, caller blocks to determine if there are more tasks and also blocks waiting to dequeue
* the task.
*/
public Boolean hasTasks() {
return !tasks.isEmpty();
}

@Override
/*
* A means to perform any post processing once the task has been completed. If post processing is needed,
* the consumer should notify this behavior via callback once the task is completed.
*
* *Note: "completed" here can mean success or failure.
*/
public void taskPostProcessing(AbstractBackupPath completedTask) {
this.tasksQueued.remove(completedTask.getRemotePath());
}

@Override
/*
* @return num of pending tasks. Note, the result is a best guess, don't rely on it to be 100% accurate.
*/
public Integer getNumOfTasksToBeProessed() {
public Integer getNumOfTasksToBeProcessed() {
return tasks.size();
}

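Putting the queue-manager pieces together, a stripped-down sketch of the pattern (generic, with hypothetical names, and using a concurrent set rather than the plain HashSet in the real class) is shown below. The property that matters for this commit is that taskPostProcessing is the only thing that frees a key for re-enqueueing, which is why the consumer-side changes further down make sure it always runs.

import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class DedupTaskQueueSketch<E> {
    private final BlockingQueue<E> tasks;
    // Shadows the queue for de-duplication.
    private final Set<String> queuedKeys = ConcurrentHashMap.newKeySet();

    public DedupTaskQueueSketch(int capacity) {
        this.tasks = new ArrayBlockingQueue<>(capacity);
    }

    public void add(E task, String key) {
        if (queuedKeys.add(key)) {
            try {
                tasks.put(task);            // blocks until space is available
            } catch (InterruptedException e) {
                queuedKeys.remove(key);     // roll back so the producer can try again later
                Thread.currentThread().interrupt();
            }
        }
    }

    public E take() {
        return tasks.poll();                // null if nothing is queued
    }

    // Must be called when a task completes, whether it succeeded or failed;
    // otherwise its key is stuck and the file can never be re-enqueued.
    public void taskPostProcessing(String completedKey) {
        queuedKeys.remove(completedKey);
    }
}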
@@ -26,30 +26,36 @@
*/
public interface ITaskQueueMgr<E> {

/**
* Adds the provided task into the queue if it does not already exist. For performance reasons
* this is best effort and therefore callers are responsible for handling duplicate tasks.
*
* This method will block if the queue of tasks is full
* @param task The task to put onto the queue
*/
void add(E task);

/*
/**
* @return task, null if none is available.
*/
E take() throws InterruptedException;

/*
/**
* @return true if there are tasks within queue to be processed; false otherwise.
*/
Boolean hasTasks();

/*
/**
* A means to perform any post processing once the task has been completed. If post processing is needed,
* the consumer should notify this behavior via callback once the task is completed.
*
* *Note: "completed" here can mean success or failure.
*/
void taskPostProcessing(E completedTask);

Integer getNumOfTasksToBeProcessed();

Integer getNumOfTasksToBeProessed();

/*
/**
* @return true if all tasks completed (includes failures) for a date; false, if at least 1 task is still in queue.
*/
Boolean tasksCompleted(java.util.Date date);
@@ -70,7 +70,8 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba
try {
final AbstractBackupPath bp = pathFactory.get();
bp.parseLocal(file, BackupFileType.SST);
this.taskQueueMgr.add(bp); //producer -- populate the queue of files. *Note: producer will block if queue is full.
// producer -- populate the queue of files. *Note: producer will block if queue is full.
this.taskQueueMgr.add(bp);
} catch (Exception e) {
logger.warn("Unable to queue incremental file, treating as non-fatal and moving on to next. Msg: {} Fail to queue file: {}",
e.getLocalizedMessage(), file.getAbsolutePath());
@@ -48,16 +48,19 @@ public IncrementalConsumer(AbstractBackupPath bp, IBackupFileSystem fs

@Override
/*
* Upload specified file, with retries logic.
* Upload specified file, with retries logic.
* File will be deleted only if uploaded successfully.
*/
public void run() {

logger.info("Consumer - about to upload file: {}", this.bp.getFileName());

try {

new RetryableCallable<Void>() {
// Allow up to 30s of arbitrary failures at the top level. The upload call itself typically has retries
// as well so this top level retry is on top of those retries. Assuming that each call to upload has
// ~30s maximum of retries this yields about 3.5 minutes of retries at the top level since
// (6 * (5 + 30) = 210 seconds). Even if this fails, however, the upload will be re-enqueued
new RetryableCallable<Void>(6, 5000) {
@Override
public Void retriableCall() throws Exception {

@@ -76,6 +79,9 @@ public Void retriableCall() throws Exception {
if (is == null) {
throw new NullPointerException("Unable to get handle on file: " + bp.getFileName());
}
// Important context: this upload call typically has internal retries but those are only
// to cover over very temporary (<10s) network partitions. For larger partitions we rely on
// higher up retries and re-enqueues.
fs.upload(bp, is);
bp.setCompressedFileSize(fs.getBytesUploaded());
return null;
@@ -88,16 +94,20 @@ public Void retriableCall() throws Exception {
}
}
}.call();

this.bp.getBackupFile().delete(); //resource cleanup
this.callback.postProcessing(bp); //post processing
// Clean up the underlying file.
bp.getBackupFile().delete();
} catch (Exception e) {
if (e instanceof java.util.concurrent.CancellationException) {
logger.debug("Failed to upload local file {}. Ignoring to continue with rest of backup. Msg: {}", this.bp.getFileName(), e.getLocalizedMessage());
} else {
logger.error("Failed to upload local file {}. Ignoring to continue with rest of backup. Msg: {}", this.bp.getFileName(), e.getLocalizedMessage());
}
} finally {
// post processing must happen regardless of the outcome of the upload. Otherwise we can
// leak tasks into the underlying taskMgr queue and prevent re-enqueues of the upload itself
this.callback.postProcessing(bp);
}
logger.info("Consumer - done with upload file: {}", this.bp.getFileName());
}

}
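The consumer change boils down to a familiar control-flow shape; here is a hedged sketch with hypothetical uploader and callback types: delete the local file only on success, and run post-processing in finally so the de-duplicating set is cleared no matter what.

public class IncrementalConsumerSketch {
    interface Uploader { void upload(String file) throws Exception; }
    interface PostProcessing { void done(String file); }

    private final Uploader uploader;
    private final PostProcessing callback;

    IncrementalConsumerSketch(Uploader uploader, PostProcessing callback) {
        this.uploader = uploader;
        this.callback = callback;
    }

    void run(String file) {
        try {
            uploader.upload(file);                          // has its own retries, as in the diff above
            System.out.println("deleting local " + file);   // clean up only after a successful upload
        } catch (Exception e) {
            System.out.println("upload failed, keeping " + file + ": " + e.getMessage());
        } finally {
            callback.done(file);                            // always clear the shadow set so re-enqueue can happen
        }
    }
}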
@@ -25,6 +25,14 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/*
* Monitors files to be uploaded and assigns each file to a worker
*/
@@ -33,7 +41,7 @@ public class IncrementalConsumerMgr implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(IncrementalConsumerMgr.class);

private AtomicBoolean run = new AtomicBoolean(true);
private ThreadPoolExecutor executor;
private ListeningExecutorService executor;
private IBackupFileSystem fs;
private ITaskQueueMgr<AbstractBackupPath> taskQueueMgr;
private BackupPostProcessingCallback<AbstractBackupPath> callback;
@@ -58,8 +66,8 @@ public IncrementalConsumerMgr(ITaskQueueMgr<AbstractBackupPath> taskQueueMgr, IB
* worker queue. Specifically, the calling will continue to perform the upload unless a worker is avaialble.
*/
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
executor = new ThreadPoolExecutor(maxWorkers, maxWorkers, 60, TimeUnit.SECONDS,
workQueue, rejectedExecutionHandler);
executor = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(maxWorkers, maxWorkers, 60, TimeUnit.SECONDS,
workQueue, rejectedExecutionHandler));

callback = new IncrementalBkupPostProcessing(this.taskQueueMgr);
}
@@ -78,20 +86,31 @@ public void run() {

while (this.taskQueueMgr.hasTasks()) {
try {
AbstractBackupPath bp = this.taskQueueMgr.take();
final AbstractBackupPath bp = this.taskQueueMgr.take();

IncrementalConsumer task = new IncrementalConsumer(bp, this.fs, this.callback);
executor.submit(task); //non-blocking, will be rejected if the task cannot be scheduled


ListenableFuture<?> upload = executor.submit(task); //non-blocking, will be rejected if the task cannot be scheduled
Futures.addCallback(upload, new FutureCallback<Object>()
{
public void onSuccess(@Nullable Object result) { }

public void onFailure(Throwable t) {
// The post processing hook is responsible for removing the task from the de-duplicating
// HashSet, so we want to do the safe thing here and remove it just in case so the
// producers can re-enqueue this file in the next iteration.
// Note that this should be an abundance of caution as the IncrementalConsumer _should_
// have deleted the task from the queue when it internally failed.
taskQueueMgr.taskPostProcessing(bp);
}
});
} catch (InterruptedException e) {
logger.warn("Was interrupted while wating to dequeued a task. Msgl: {}", e.getLocalizedMessage());
}
}

//Lets not overwhelmend the node hence we will pause before checking the work queue again.
// Lets not overwhelm the node hence we will pause before checking the work queue again.
try {
Thread.currentThread().sleep(IIncrementalBackup.INCREMENTAL_INTERVAL_IN_MILLISECS);
Thread.sleep(IIncrementalBackup.INCREMENTAL_INTERVAL_IN_MILLISECS);
} catch (InterruptedException e) {
logger.warn("Was interrupted while sleeping until next interval run. Msgl: {}", e.getLocalizedMessage());
}
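The executor change wraps the pool in Guava's listenable futures so that a task which dies inside the pool still triggers queue cleanup. A minimal sketch of that wiring is below, with a simulated failing upload and a print standing in for taskPostProcessing; it uses the three-argument Futures.addCallback overload with a direct executor, which newer Guava versions prefer over the two-argument form used in the commit.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;

public class ListenableUploadSketch {
    public static void main(String[] args) {
        ListeningExecutorService executor =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

        final String remotePath = "cluster/token/201810020000/SST/ks/cf/file-1.db";
        Runnable failingUpload = () -> {
            throw new RuntimeException("simulated upload failure");
        };

        ListenableFuture<?> upload = executor.submit(failingUpload);
        Futures.addCallback(upload, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                // Nothing to do; the consumer already ran its own post-processing.
            }

            @Override
            public void onFailure(Throwable t) {
                // Belt-and-braces cleanup mirroring the commit: make sure the
                // de-duplicating set forgets this path so it can be re-enqueued.
                System.out.println("post-processing after failure of " + remotePath
                        + ": " + t.getMessage());
            }
        }, MoreExecutors.directExecutor());

        executor.shutdown();
    }
}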
(The remaining changed files in this commit are not shown here.)
