From 43c7f1c77ba906637f4929efccdfc934fbca25e8 Mon Sep 17 00:00:00 2001
From: Arun Agrawal <aagrawal@netflix.com>
Date: Fri, 28 Dec 2018 14:23:10 -0800
Subject: [PATCH] Backup 2.0 Restore

---
 CHANGELOG.md                                  |  10 ++
 .../netflix/priam/backup/AbstractBackup.java  |   4 -
 .../priam/backup/AbstractFileSystem.java      |   4 +-
 .../priam/backup/BackupVerification.java      |  16 +-
 .../priam/backup/CommitLogBackupTask.java     |   8 -
 .../priam/backup/IncrementalBackup.java       |  17 +-
 .../com/netflix/priam/backup/MetaData.java    |  29 ---
 .../netflix/priam/backup/SnapshotBackup.java  |  79 +-------
 .../priam/backupv2/BackupValidator.java       |   8 +-
 .../priam/backupv2/ForgottenFilesManager.java | 147 +++++++++++++++
 .../netflix/priam/backupv2/IMetaProxy.java    |  11 ++
 .../netflix/priam/backupv2/MetaV1Proxy.java   |  24 ++-
 .../netflix/priam/backupv2/MetaV2Proxy.java   |  71 +++++++-
 .../priam/config/BackupRestoreConfig.java     |  10 ++
 .../priam/config/IBackupRestoreConfig.java    |  14 +-
 .../netflix/priam/config/IConfiguration.java  |  13 ++
 .../priam/config/PriamConfiguration.java      |  10 ++
 .../priam/restore/AbstractRestore.java        | 116 +++++++-----
 .../priam/services/SnapshotMetaService.java   |  11 +-
 .../priam/backup/FakeBackupFileSystem.java    |   3 +-
 .../priam/backup/TestAbstractFileSystem.java  |  10 +-
 .../priam/backup/TestS3FileSystem.java        |   1 +
 .../backupv2/TestForgottenFileManager.java    | 169 ++++++++++++++++++
 .../priam/backupv2/TestMetaFileManager.java   |  98 ----------
 .../priam/backupv2/TestMetaV2Proxy.java       |  85 ++++++++-
 .../priam/config/FakeBackupRestoreConfig.java |   5 +
 .../{backup => restore}/TestRestore.java      |  52 +++++-
 27 files changed, 712 insertions(+), 313 deletions(-)
 create mode 100644 priam/src/main/java/com/netflix/priam/backupv2/ForgottenFilesManager.java
 create mode 100644 priam/src/test/java/com/netflix/priam/backupv2/TestForgottenFileManager.java
 delete mode 100644 priam/src/test/java/com/netflix/priam/backupv2/TestMetaFileManager.java
 rename priam/src/test/java/com/netflix/priam/{backup => restore}/TestRestore.java (71%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d9629c46..c1926cfce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,14 @@
 # Changelog
+
+## 2018/01/11 3.11.38
+(#761) Add new file format (SST_V2) and methods to get/parse remote locations.
+(#761) Upload files from SnapshotMetaService in backup version 2.0, if enabled.
+(#761) Process older SNAPSHOT_V2 at the restart of Priam.
+(#767) Backup Verification for Backup 2.0.
+(#767) Restore for Backup 2.0
+(#767) Some API changes for Snapshot Verification
+(#767) Remove deprecated code like flush hour or snapshot hour.
+
 ## 2018/10/29 3.11.37
 * Bug Fix: SnapshotMetaService can leave snapshots if there is any error. 
 * Bug Fix: SnapshotMetaService should continue building snapshot even if an unexpected file is found in snapshot. 
diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
index cec5c53c9..3f17facd8 100644
--- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
+++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
@@ -106,7 +106,6 @@ protected List<AbstractBackupPath> upload(
                             true);
 
                 bps.add(bp);
-                addToRemotePath(bp.getRemotePath());
             }
         }
 
@@ -183,7 +182,4 @@ private boolean isValidBackupDir(File keyspaceDir, File backupDir) {
 
         return true;
     }
-
-    /** Adds Remote path to the list of Remote Paths */
-    protected abstract void addToRemotePath(String remotePath);
 }
diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java
index e2b79298a..c8126daff 100644
--- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java
+++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java
@@ -115,8 +115,8 @@ public Future<Path> asyncDownloadFile(
     public void downloadFile(final Path remotePath, final Path localPath, final int retry)
             throws BackupRestoreException {
         // TODO: Should we download the file if localPath already exists?
-        if (remotePath == null) return;
-
+        if (remotePath == null || localPath == null) return;
+        localPath.toFile().getParentFile().mkdirs();
         logger.info("Downloading file: {} to location: {}", remotePath, localPath);
         try {
             new BoundedExponentialRetryCallable<Void>(500, 10000, retry) {
diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java b/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java
index 8e7bd771e..13e8d881c 100644
--- a/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java
+++ b/priam/src/main/java/com/netflix/priam/backup/BackupVerification.java
@@ -21,7 +21,6 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
-import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,22 +45,21 @@ public class BackupVerification {
     }
 
     public Optional<BackupMetadata> getLatestBackupMetaData(List<BackupMetadata> metadata) {
-        metadata =
-                metadata.stream()
-                        .filter(backupMetadata -> backupMetadata.getStatus() == Status.FINISHED)
-                        .collect(Collectors.toList());
-        metadata.sort((o1, o2) -> o2.getStart().compareTo(o1.getStart()));
-        return metadata.stream().findFirst();
+        return metadata.stream()
+                .filter(backupMetadata -> backupMetadata != null)
+                .filter(backupMetadata -> backupMetadata.getStatus() == Status.FINISHED)
+                .sorted(Comparator.comparing(BackupMetadata::getStart).reversed())
+                .findFirst();
     }
 
     public Optional<BackupVerificationResult> verifyBackup(List<BackupMetadata> metadata) {
-        if (metadata == null || metadata.isEmpty()) return null;
+        if (metadata == null || metadata.isEmpty()) return Optional.empty();
 
         Optional<BackupMetadata> latestBackupMetaData = getLatestBackupMetaData(metadata);
 
         if (!latestBackupMetaData.isPresent()) {
             logger.error("No backup found which finished during the time provided.");
-            return null;
+            return Optional.empty();
         }
 
         Path metadataLocation = Paths.get(latestBackupMetaData.get().getSnapshotLocation());
diff --git a/priam/src/main/java/com/netflix/priam/backup/CommitLogBackupTask.java b/priam/src/main/java/com/netflix/priam/backup/CommitLogBackupTask.java
index 894b8da0b..18ca2566c 100644
--- a/priam/src/main/java/com/netflix/priam/backup/CommitLogBackupTask.java
+++ b/priam/src/main/java/com/netflix/priam/backup/CommitLogBackupTask.java
@@ -20,8 +20,6 @@
 import com.netflix.priam.scheduler.SimpleTimer;
 import com.netflix.priam.scheduler.TaskTimer;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +29,6 @@ public class CommitLogBackupTask extends AbstractBackup {
     public static final String JOBNAME = "CommitLogBackup";
 
     private static final Logger logger = LoggerFactory.getLogger(CommitLogBackupTask.class);
-    private final List<String> clRemotePaths = new ArrayList<>();
     private final CommitLogBackup clBackup;
 
     @Inject
@@ -70,9 +67,4 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba
             throws Exception {
         // Do nothing.
     }
-
-    @Override
-    protected void addToRemotePath(String remotePath) {
-        clRemotePaths.add(remotePath);
-    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java
index 610d7f894..627c303e5 100644
--- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java
+++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java
@@ -20,12 +20,12 @@
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
+import com.netflix.priam.config.IBackupRestoreConfig;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.scheduler.SimpleTimer;
 import com.netflix.priam.scheduler.TaskTimer;
 import com.netflix.priam.utils.DateUtil;
 import java.io.File;
-import java.util.ArrayList;
 import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,13 +37,14 @@
 public class IncrementalBackup extends AbstractBackup {
     private static final Logger logger = LoggerFactory.getLogger(IncrementalBackup.class);
     public static final String JOBNAME = "IncrementalBackup";
-    private final List<String> incrementalRemotePaths = new ArrayList<>();
     private final IncrementalMetaData metaData;
     private final BackupRestoreUtil backupRestoreUtil;
+    private final IBackupRestoreConfig backupRestoreConfig;
 
     @Inject
     public IncrementalBackup(
             IConfiguration config,
+            IBackupRestoreConfig backupRestoreConfig,
             Provider<AbstractBackupPath> pathFactory,
             IFileSystemContext backupFileSystemCtx,
             IncrementalMetaData metaData) {
@@ -51,6 +52,7 @@ public IncrementalBackup(
         // a means to upload audit trail (via meta_cf_yyyymmddhhmm.json) of files successfully
         // uploaded)
         this.metaData = metaData;
+        this.backupRestoreConfig = backupRestoreConfig;
         backupRestoreUtil =
                 new BackupRestoreUtil(
                         config.getIncrementalIncludeCFList(), config.getIncrementalExcludeCFList());
@@ -59,7 +61,6 @@ public IncrementalBackup(
     @Override
     public void execute() throws Exception {
         // Clearing remotePath List
-        incrementalRemotePaths.clear();
         initiateBackup(INCREMENTAL_BACKUP_FOLDER, backupRestoreUtil);
     }
 
@@ -76,8 +77,11 @@ public String getName() {
     @Override
     protected void processColumnFamily(String keyspace, String columnFamily, File backupDir)
             throws Exception {
+        BackupFileType fileType = BackupFileType.SST;
+        if (backupRestoreConfig.enableV2Backups()) fileType = BackupFileType.SST_V2;
+
         List<AbstractBackupPath> uploadedFiles =
-                upload(backupDir, BackupFileType.SST, config.enableAsyncIncremental(), true);
+                upload(backupDir, fileType, config.enableAsyncIncremental(), true);
 
         if (!uploadedFiles.isEmpty()) {
             // format of yyyymmddhhmm (e.g. 201505060901)
@@ -90,9 +94,4 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba
             logger.info("Uploaded meta file for incremental backup: {}", metaFileName);
         }
     }
-
-    @Override
-    protected void addToRemotePath(String remotePath) {
-        incrementalRemotePaths.add(remotePath);
-    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/backup/MetaData.java b/priam/src/main/java/com/netflix/priam/backup/MetaData.java
index 67be8ae4c..be7a0bdab 100644
--- a/priam/src/main/java/com/netflix/priam/backup/MetaData.java
+++ b/priam/src/main/java/com/netflix/priam/backup/MetaData.java
@@ -16,14 +16,12 @@
  */
 package com.netflix.priam.backup;
 
-import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.utils.DateUtil;
 import java.io.File;
-import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -32,7 +30,6 @@
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.json.simple.JSONArray;
-import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,30 +114,4 @@ public File createTmpMetaFile() throws IOException {
     private void addToRemotePath(String remotePath) {
         metaRemotePaths.add(remotePath);
     }
-
-    public List<AbstractBackupPath> toJson(File input) {
-        List<AbstractBackupPath> files = Lists.newArrayList();
-        try {
-            JSONArray jsonObj = (JSONArray) new JSONParser().parse(new FileReader(input));
-            for (Object aJsonObj : jsonObj) {
-                AbstractBackupPath p = pathFactory.get();
-                p.parseRemote((String) aJsonObj);
-                files.add(p);
-            }
-
-        } catch (Exception ex) {
-            throw new RuntimeException(
-                    "Error transforming file "
-                            + input.getAbsolutePath()
-                            + " to JSON format.  Msg:"
-                            + ex.getLocalizedMessage(),
-                    ex);
-        }
-
-        logger.debug(
-                "Transformed file {} to JSON.  Number of JSON elements: {}",
-                input.getAbsolutePath(),
-                files.size());
-        return files;
-    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java
index 785079a15..ff5efd488 100644
--- a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java
+++ b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java
@@ -21,6 +21,7 @@
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
+import com.netflix.priam.backupv2.ForgottenFilesManager;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.defaultimpl.CassandraOperations;
 import com.netflix.priam.identity.InstanceIdentity;
@@ -30,6 +31,7 @@
 import com.netflix.priam.utils.DateUtil;
 import com.netflix.priam.utils.ThreadSleeper;
 import java.io.File;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -42,13 +44,14 @@ public class SnapshotBackup extends AbstractBackup {
     private static final Logger logger = LoggerFactory.getLogger(SnapshotBackup.class);
     public static final String JOBNAME = "SnapshotBackup";
     private final MetaData metaData;
-    private final List<String> snapshotRemotePaths = new ArrayList<>();
     private final ThreadSleeper sleeper = new ThreadSleeper();
     private static final long WAIT_TIME_MS = 60 * 1000 * 10;
     private final InstanceIdentity instanceIdentity;
     private final IBackupStatusMgr snapshotStatusMgr;
     private final BackupRestoreUtil backupRestoreUtil;
+    private final ForgottenFilesManager forgottenFilesManager;
     private String snapshotName = null;
+    private Instant snapshotInstant = DateUtil.getInstant();
     private List<AbstractBackupPath> abstractBackupPaths = null;
     private final CassandraOperations cassandraOperations;
     private static final Lock lock = new ReentrantLock();
@@ -61,7 +64,8 @@ public SnapshotBackup(
             IFileSystemContext backupFileSystemCtx,
             IBackupStatusMgr snapshotStatusMgr,
             InstanceIdentity instanceIdentity,
-            CassandraOperations cassandraOperations) {
+            CassandraOperations cassandraOperations,
+            ForgottenFilesManager forgottenFilesManager) {
         super(config, backupFileSystemCtx, pathFactory);
         this.metaData = metaData;
         this.snapshotStatusMgr = snapshotStatusMgr;
@@ -70,6 +74,7 @@ public SnapshotBackup(
         backupRestoreUtil =
                 new BackupRestoreUtil(
                         config.getSnapshotIncludeCFList(), config.getSnapshotExcludeCFList());
+        this.forgottenFilesManager = forgottenFilesManager;
     }
 
     @Override
@@ -100,6 +105,7 @@ public void execute() throws Exception {
     private void executeSnapshot() throws Exception {
         Date startTime = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
         snapshotName = DateUtil.formatyyyyMMddHHmm(startTime);
+        snapshotInstant = DateUtil.getInstant();
         String token = instanceIdentity.getInstance().getToken();
 
         // Save start snapshot status
@@ -108,8 +114,6 @@ private void executeSnapshot() throws Exception {
 
         try {
             logger.info("Starting snapshot {}", snapshotName);
-            // Clearing remotePath List
-            snapshotRemotePaths.clear();
             cassandraOperations.takeSnapshot(snapshotName);
 
             // Collect all snapshot dir's under keyspace dir's
@@ -177,74 +181,9 @@ protected void processColumnFamily(String keyspace, String columnFamily, File ba
             return;
         }
 
+        forgottenFilesManager.findAndMoveForgottenFiles(snapshotInstant, snapshotDir);
         // Add files to this dir
         abstractBackupPaths.addAll(
                 upload(snapshotDir, BackupFileType.SNAP, config.enableAsyncSnapshot(), true));
     }
-
-    //    private void findForgottenFiles(File snapshotDir) {
-    //        try {
-    //            Collection<File> snapshotFiles = FileUtils.listFiles(snapshotDir,
-    // FileFilterUtils.fileFileFilter(), null);
-    //            File columnfamilyDir = snapshotDir.getParentFile().getParentFile();
-    //
-    //            //Find all the files in columnfamily folder which is :
-    //            // 1. Not a temp file.
-    //            // 2. Is a file. (we don't care about directories)
-    //            // 3. Is older than snapshot time, as new files keep getting created after taking
-    // a snapshot.
-    //            IOFileFilter tmpFileFilter1 = FileFilterUtils.suffixFileFilter(TMP_EXT);
-    //            IOFileFilter tmpFileFilter2 = FileFilterUtils.asFileFilter(pathname ->
-    // tmpFilePattern.matcher(pathname.getName()).matches());
-    //            IOFileFilter tmpFileFilter = FileFilterUtils.or(tmpFileFilter1, tmpFileFilter2);
-    //            // Here we are allowing files which were more than
-    // @link{IConfiguration#getForgottenFileGracePeriodDays}. We do this to allow cassandra to
-    //            // clean up any files which were generated as part of repair/compaction and
-    // cleanup thread has not already deleted.
-    //            // Refer to https://issues.apache.org/jira/browse/CASSANDRA-6756 and
-    // https://issues.apache.org/jira/browse/CASSANDRA-7066
-    //            // for more information.
-    //            IOFileFilter ageFilter =
-    // FileFilterUtils.ageFileFilter(snapshotInstant.minus(config.getForgottenFileGracePeriodDays(),
-    // ChronoUnit.DAYS).toEpochMilli());
-    //            IOFileFilter fileFilter =
-    // FileFilterUtils.and(FileFilterUtils.notFileFilter(tmpFileFilter),
-    // FileFilterUtils.fileFileFilter(), ageFilter);
-    //
-    //            Collection<File> columnfamilyFiles = FileUtils.listFiles(columnfamilyDir,
-    // fileFilter, null);
-    //
-    //            //Remove the SSTable(s) which are part of snapshot from the CF file list.
-    //            //This cannot be a simple removeAll as snapshot files have "different" file folder
-    // prefix.
-    //            for (File file : snapshotFiles) {
-    //                //Get its parent directory file based on this file.
-    //                File originalFile = new File(columnfamilyDir, file.getName());
-    //                columnfamilyFiles.remove(originalFile);
-    //            }
-    //
-    //            //If there are no "extra" SSTables in CF data folder, we are done.
-    //            if (columnfamilyFiles.size() == 0)
-    //                return;
-    //
-    //            columnfamilyFiles.parallelStream().forEach(file -> logger.info("Forgotten file: {}
-    // found for CF: {}", file.getAbsolutePath(), columnfamilyDir.getName()));
-    //
-    //            //TODO: The eventual plan is to move the forgotten files to a lost+found directory
-    // and clean the directory after 'x' amount of time. This behavior should be configurable.
-    //            backupMetrics.incrementForgottenFiles(columnfamilyFiles.size());
-    //            logger.warn("# of forgotten files: {} found for CF: {}", columnfamilyFiles.size(),
-    // columnfamilyDir.getName());
-    //        } catch (Exception e) {
-    //            //Eat the exception, if there, for any reason. This should not stop the snapshot
-    // for any reason.
-    //            logger.error("Exception occurred while trying to find forgottenFile. Ignoring the
-    // error and continuing with remaining backup", e);
-    //        }
-    //    }
-
-    @Override
-    protected void addToRemotePath(String remotePath) {
-        snapshotRemotePaths.add(remotePath);
-    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupValidator.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupValidator.java
index bd6b332cf..2b7a7d3e4 100644
--- a/priam/src/main/java/com/netflix/priam/backupv2/BackupValidator.java
+++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupValidator.java
@@ -18,7 +18,6 @@
 package com.netflix.priam.backupv2;
 
 import com.netflix.priam.backup.*;
-import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.utils.DateUtil;
 import java.util.List;
 import java.util.Optional;
@@ -33,15 +32,10 @@
  */
 public class BackupValidator {
     private static final Logger logger = LoggerFactory.getLogger(BackupVerification.class);
-    private final IBackupFileSystem fs;
     private IMetaProxy metaProxy;
 
     @Inject
-    public BackupValidator(
-            IConfiguration configuration,
-            IFileSystemContext backupFileSystemCtx,
-            @Named("v2") IMetaProxy metaProxy) {
-        fs = backupFileSystemCtx.getFileStrategy(configuration);
+    public BackupValidator(@Named("v2") IMetaProxy metaProxy) {
         this.metaProxy = metaProxy;
     }
 
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/ForgottenFilesManager.java b/priam/src/main/java/com/netflix/priam/backupv2/ForgottenFilesManager.java
new file mode 100644
index 000000000..4c8884008
--- /dev/null
+++ b/priam/src/main/java/com/netflix/priam/backupv2/ForgottenFilesManager.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2019 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.netflix.priam.backupv2;
+
+import com.google.inject.Inject;
+import com.netflix.priam.config.IConfiguration;
+import com.netflix.priam.merics.BackupMetrics;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Created by aagrawal on 1/1/19. */
+public class ForgottenFilesManager {
+    private static final Logger logger = LoggerFactory.getLogger(ForgottenFilesManager.class);
+
+    private BackupMetrics backupMetrics;
+    private IConfiguration config;
+    private static final String TMP_EXT = ".tmp";
+
+    @SuppressWarnings("Annotator")
+    private static final Pattern tmpFilePattern =
+            Pattern.compile("^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$");
+
+    protected static final String LOST_FOUND = "lost+found";
+
+    @Inject
+    public ForgottenFilesManager(IConfiguration configuration, BackupMetrics backupMetrics) {
+        this.config = configuration;
+        this.backupMetrics = backupMetrics;
+    }
+
+    public void findAndMoveForgottenFiles(Instant snapshotInstant, File snapshotDir) {
+        try {
+            Collection<File> snapshotFiles =
+                    FileUtils.listFiles(snapshotDir, FileFilterUtils.fileFileFilter(), null);
+            File columnfamilyDir = snapshotDir.getParentFile().getParentFile();
+            Collection<File> columnfamilyFiles =
+                    getColumnfamilyFiles(snapshotInstant, columnfamilyDir);
+
+            // Remove the SSTable(s) which are part of snapshot from the CF file list.
+            // This cannot be a simple removeAll as snapshot files have "different" file folder
+            // prefix.
+            for (File file : snapshotFiles) {
+                // Get its parent directory file based on this file.
+                File originalFile = new File(columnfamilyDir, file.getName());
+                columnfamilyFiles.remove(originalFile);
+            }
+
+            // If there are no "extra" SSTables in CF data folder, we are done.
+            if (columnfamilyFiles.size() == 0) return;
+
+            logger.warn(
+                    "# of forgotten files: {} found for CF: {}",
+                    columnfamilyFiles.size(),
+                    columnfamilyDir.getName());
+            backupMetrics.incrementForgottenFiles(columnfamilyFiles.size());
+
+            // Move the files to lost_found directory if configured.
+            moveForgottenFiles(columnfamilyDir, columnfamilyFiles);
+
+        } catch (Exception e) {
+            // Eat the exception, if there, for any reason. This should not stop the snapshot for
+            // any reason.
+            logger.error(
+                    "Exception occurred while trying to find forgottenFile. Ignoring the error and continuing with remaining backup",
+                    e);
+            e.printStackTrace();
+        }
+    }
+
+    protected Collection<File> getColumnfamilyFiles(Instant snapshotInstant, File columnfamilyDir) {
+        // Find all the files in columnfamily folder which is :
+        // 1. Not a temp file.
+        // 2. Is a file. (we don't care about directories)
+        // 3. Is older than snapshot time, as new files keep getting created after taking a
+        // snapshot.
+        IOFileFilter tmpFileFilter1 = FileFilterUtils.suffixFileFilter(TMP_EXT);
+        IOFileFilter tmpFileFilter2 =
+                FileFilterUtils.asFileFilter(
+                        pathname -> tmpFilePattern.matcher(pathname.getName()).matches());
+        IOFileFilter tmpFileFilter = FileFilterUtils.or(tmpFileFilter1, tmpFileFilter2);
+        // Here we are allowing files which were more than
+        // @link{IConfiguration#getForgottenFileGracePeriodDays}. We do this to allow cassandra
+        // to clean up any files which were generated as part of repair/compaction and cleanup
+        // thread has not already deleted.
+        // Refer to https://issues.apache.org/jira/browse/CASSANDRA-6756 and
+        // https://issues.apache.org/jira/browse/CASSANDRA-7066
+        // for more information.
+        IOFileFilter ageFilter =
+                FileFilterUtils.ageFileFilter(
+                        snapshotInstant
+                                .minus(config.getForgottenFileGracePeriodDays(), ChronoUnit.DAYS)
+                                .toEpochMilli());
+        IOFileFilter fileFilter =
+                FileFilterUtils.and(
+                        FileFilterUtils.notFileFilter(tmpFileFilter),
+                        FileFilterUtils.fileFileFilter(),
+                        ageFilter);
+
+        return FileUtils.listFiles(columnfamilyDir, fileFilter, null);
+    }
+
+    protected void moveForgottenFiles(File columnfamilyDir, Collection<File> columnfamilyFiles) {
+        final Path destDir = Paths.get(columnfamilyDir.getAbsolutePath(), LOST_FOUND);
+        for (File file : columnfamilyFiles) {
+            logger.warn(
+                    "Forgotten file: {} found for CF: {}",
+                    file.getAbsolutePath(),
+                    columnfamilyDir.getName());
+            if (config.isForgottenFileMoveEnabled()) {
+                try {
+                    FileUtils.moveFileToDirectory(file, destDir.toFile(), true);
+                } catch (IOException e) {
+                    logger.error(
+                            "Exception occurred while trying to move forgottenFile: {}. Ignoring the error and continuing with remaining backup/forgotten files.",
+                            file);
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java
index 4c173b599..99c52abb2 100644
--- a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java
+++ b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java
@@ -22,6 +22,7 @@
 import com.netflix.priam.backup.BackupVerificationResult;
 import com.netflix.priam.utils.DateUtil;
 import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.List;
 
 /** Proxy to do management tasks for meta files. Created by aagrawal on 12/18/18. */
@@ -71,6 +72,16 @@ public interface IMetaProxy {
      */
     List<String> getSSTFilesFromMeta(Path localMetaPath) throws Exception;
 
+    /**
+     * Get the list of incremental files given the daterange.
+     *
+     * @param dateRange the time period to scan in the remote file system for incremental files.
+     * @return iterator containing the list of path on the remote file system satisfying criteria.
+     * @throws BackupRestoreException if there is an issue contacting remote file system.
+     */
+    Iterator<AbstractBackupPath> getIncrementals(DateUtil.DateRange dateRange)
+            throws BackupRestoreException;
+
     /**
      * Validate that all the files mentioned in the meta file actually exists on remote file system.
      *
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java
index 3c71101c6..cfce6ae81 100644
--- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java
+++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java
@@ -29,6 +29,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.*;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.iterators.FilterIterator;
 import org.apache.commons.io.FileUtils;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
@@ -38,11 +39,9 @@
 public class MetaV1Proxy implements IMetaProxy {
     private static final Logger logger = LoggerFactory.getLogger(MetaV1Proxy.class);
     private final IBackupFileSystem fs;
-    private final IConfiguration configuration;
 
     @Inject
     MetaV1Proxy(IConfiguration configuration, IFileSystemContext backupFileSystemCtx) {
-        this.configuration = configuration;
         fs = backupFileSystemCtx.getFileStrategy(configuration);
     }
 
@@ -71,12 +70,12 @@ public List<AbstractBackupPath> findMetaFiles(DateUtil.DateRange dateRange) {
             if (path.getType() == AbstractBackupPath.BackupFileType.META)
                 // Since there are now meta file for incrementals as well as snapshot, we need to
                 // find the correct one (i.e. the snapshot meta file (meta.json))
-                if (path.getType() == AbstractBackupPath.BackupFileType.META) {
+                if (path.getFileName().equalsIgnoreCase("meta.json")) {
                     metas.add(path);
                 }
         }
 
-        Collections.sort(metas, Collections.reverseOrder());
+        metas.sort(Collections.reverseOrder());
 
         if (metas.size() == 0) {
             logger.info(
@@ -102,7 +101,7 @@ public BackupVerificationResult isMetaFileValid(AbstractBackupPath metaBackupPat
             result.manifestAvailable = true;
 
             // List the remote file system to validate the backup.
-            String prefix = configuration.getBackupPrefix();
+            String prefix = fs.getPrefix().toString();
             Date strippedMsSnapshotTime =
                     new Date(result.snapshotInstant.truncatedTo(ChronoUnit.MINUTES).toEpochMilli());
             Iterator<AbstractBackupPath> backupfiles =
@@ -170,6 +169,21 @@ public List<String> getSSTFilesFromMeta(Path localMetaPath) throws Exception {
         return result;
     }
 
+    @Override
+    public Iterator<AbstractBackupPath> getIncrementals(DateUtil.DateRange dateRange)
+            throws BackupRestoreException {
+        String prefix = fs.getPrefix().toString();
+        Iterator<AbstractBackupPath> iterator =
+                fs.list(
+                        prefix,
+                        new Date(dateRange.getStartTime().toEpochMilli()),
+                        new Date(dateRange.getEndTime().toEpochMilli()));
+        return new FilterIterator<>(
+                iterator,
+                abstractBackupPath ->
+                        abstractBackupPath.getType() == AbstractBackupPath.BackupFileType.SST);
+    }
+
     @Override
     public void cleanupOldMetaFiles() {}
 }
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java
index fa9df765d..2a4167b1f 100644
--- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java
+++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java
@@ -28,6 +28,8 @@
 import java.nio.file.Paths;
 import java.util.*;
 import javax.inject.Inject;
+import org.apache.commons.collections4.iterators.FilterIterator;
+import org.apache.commons.collections4.iterators.TransformIterator;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
@@ -59,18 +61,61 @@ public Path getLocalMetaFileDirectory() {
 
     @Override
     public String getMetaPrefix(DateUtil.DateRange dateRange) {
+        return getMatch(dateRange, AbstractBackupPath.BackupFileType.META_V2);
+    }
+
+    private String getMatch(
+            DateUtil.DateRange dateRange, AbstractBackupPath.BackupFileType backupFileType) {
         Path location = fs.getPrefix();
         AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get();
         String match = StringUtils.EMPTY;
         if (dateRange != null) match = dateRange.match();
+        if (dateRange != null && dateRange.getEndTime() == null)
+            match = dateRange.getStartTime().toEpochMilli() + "";
         return Paths.get(
-                        abstractBackupPath
-                                .remoteV2Prefix(location, AbstractBackupPath.BackupFileType.META_V2)
-                                .toString(),
+                        abstractBackupPath.remoteV2Prefix(location, backupFileType).toString(),
                         match)
                 .toString();
     }
 
+    @Override
+    public Iterator<AbstractBackupPath> getIncrementals(DateUtil.DateRange dateRange)
+            throws BackupRestoreException {
+        String incrementalPrefix = getMatch(dateRange, AbstractBackupPath.BackupFileType.SST_V2);
+        String marker =
+                getMatch(
+                        new DateUtil.DateRange(dateRange.getStartTime(), null),
+                        AbstractBackupPath.BackupFileType.SST_V2);
+        logger.info(
+                "Listing filesystem with prefix: {}, marker: {}, daterange: {}",
+                incrementalPrefix,
+                marker,
+                dateRange);
+        Iterator<String> iterator = fs.listFileSystem(incrementalPrefix, null, marker);
+        Iterator<AbstractBackupPath> transformIterator =
+                new TransformIterator<>(
+                        iterator,
+                        s -> {
+                            AbstractBackupPath path = abstractBackupPathProvider.get();
+                            path.parseRemote(s);
+                            return path;
+                        });
+
+        return new FilterIterator<>(
+                transformIterator,
+                abstractBackupPath ->
+                        (abstractBackupPath.getLastModified().isAfter(dateRange.getStartTime())
+                                        && abstractBackupPath
+                                                .getLastModified()
+                                                .isBefore(dateRange.getEndTime()))
+                                || abstractBackupPath
+                                        .getLastModified()
+                                        .equals(dateRange.getStartTime())
+                                || abstractBackupPath
+                                        .getLastModified()
+                                        .equals(dateRange.getEndTime()));
+    }
+
     @Override
     public List<AbstractBackupPath> findMetaFiles(DateUtil.DateRange dateRange) {
         ArrayList<AbstractBackupPath> metas = new ArrayList<>();
@@ -95,7 +140,7 @@ public List<AbstractBackupPath> findMetaFiles(DateUtil.DateRange dateRange) {
             }
         }
 
-        Collections.sort(metas, Collections.reverseOrder());
+        metas.sort(Collections.reverseOrder());
 
         if (metas.size() == 0) {
             logger.info(
@@ -136,7 +181,9 @@ public void cleanupOldMetaFiles() {
 
     @Override
     public List<String> getSSTFilesFromMeta(Path localMetaPath) throws Exception {
-        return null;
+        MetaFileBackupWalker metaFileBackupWalker = new MetaFileBackupWalker();
+        metaFileBackupWalker.readMeta(localMetaPath);
+        return metaFileBackupWalker.backupRemotePaths;
     }
 
     @Override
@@ -191,4 +238,18 @@ public void process(ColumnfamilyResult columnfamilyResult) {
             }
         }
     }
+
+    private class MetaFileBackupWalker extends MetaFileReader {
+        private List<String> backupRemotePaths = new ArrayList<>();
+
+        @Override
+        public void process(ColumnfamilyResult columnfamilyResult) {
+            for (ColumnfamilyResult.SSTableResult ssTableResult :
+                    columnfamilyResult.getSstables()) {
+                for (FileUploadResult fileUploadResult : ssTableResult.getSstableComponents()) {
+                    backupRemotePaths.add(fileUploadResult.getBackupPath());
+                }
+            }
+        }
+    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/config/BackupRestoreConfig.java b/priam/src/main/java/com/netflix/priam/config/BackupRestoreConfig.java
index a0e0957ac..aae6b6739 100644
--- a/priam/src/main/java/com/netflix/priam/config/BackupRestoreConfig.java
+++ b/priam/src/main/java/com/netflix/priam/config/BackupRestoreConfig.java
@@ -35,4 +35,14 @@ public String getSnapshotMetaServiceCronExpression() {
     public boolean enableV2Backups() {
         return config.get("priam.enableV2Backups", false);
     }
+
+    @Override
+    public boolean enableV2Restore() {
+        return config.get("priam.enableV2Restore", false);
+    }
+
+    @Override
+    public String getBackupTTLCronExpression() {
+        return config.get("priam.backupTTLCronExpression", "0 0 0/6 1/1 * ? *");
+    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/config/IBackupRestoreConfig.java b/priam/src/main/java/com/netflix/priam/config/IBackupRestoreConfig.java
index 86d366a0e..bfdfbca4f 100644
--- a/priam/src/main/java/com/netflix/priam/config/IBackupRestoreConfig.java
+++ b/priam/src/main/java/com/netflix/priam/config/IBackupRestoreConfig.java
@@ -35,8 +35,8 @@ default String getSnapshotMetaServiceCronExpression() {
     }
 
     /**
-     * Enable the backup version 2.0 in new format. This will start uploading of backups in new
-     * format. This is to be used for migration from backup version 1.0.
+     * Enable the backup version 2.0 in new format. This will start uploads of "incremental" backups
+     * in new format. This is to be used for migration from backup version 1.0.
      *
      * @return boolean value indicating if backups in version 2.0 should be started.
      */
@@ -59,4 +59,14 @@ default boolean enableV2Backups() {
     default String getBackupTTLCronExpression() {
         return "0 0 0/6 1/1 * ? *";
     }
+
+    /**
+     * If restore is enabled and if this flag is enabled, we will try to restore using Backup V2.0.
+     *
+     * @return if restore should be using backup version 2.0. If this is false we will use backup
+     *     version 1.0.
+     */
+    default boolean enableV2Restore() {
+        return false;
+    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
index 5fdd0be8f..b5def0c87 100644
--- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
+++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
@@ -980,6 +980,19 @@ default int getForgottenFileGracePeriodDays() {
         return 1;
     }
 
+    /**
+     * If any forgotten file is found in Cassandra, it is usually good practice to move/delete them
+     * so when cassandra restarts, it does not load old data which should be removed else you may
+     * run into data resurrection issues. This behavior is fixed in 3.x. This configuration will
+     * allow Priam to move the forgotten files to a "lost_found" directory for user to review at
+     * later time at the same time ensuring that Cassandra does not resurrect data.
+     *
+     * @return true if Priam should move forgotten file to "lost_found" directory of that CF.
+     */
+    default boolean isForgottenFileMoveEnabled() {
+        return false;
+    }
+
     /**
      * A method for allowing access to outside programs to Priam configuration when paired with the
      * Priam configuration HTTP endpoint at /v1/config/structured/all/property
diff --git a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java
index 242dc0861..09704ec6d 100644
--- a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java
+++ b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java
@@ -738,4 +738,14 @@ public String getMergedConfigurationCronExpression() {
         // Every minute on the top of the minute.
         return config.get(PRIAM_PRE + ".configMerge.cron", "0 * * * * ? *");
     }
+
+    @Override
+    public int getForgottenFileGracePeriodDays() {
+        return config.get(PRIAM_PRE + ".forgottenFileGracePeriodDays", 1);
+    }
+
+    @Override
+    public boolean isForgottenFileMoveEnabled() {
+        return config.get(PRIAM_PRE + ".forgottenFileMoveEnabled", false);
+    }
 }
diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java
index 8e6d3a338..574005351 100644
--- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java
+++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java
@@ -21,6 +21,7 @@
 import com.netflix.priam.backup.*;
 import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
 import com.netflix.priam.backupv2.IMetaProxy;
+import com.netflix.priam.config.IBackupRestoreConfig;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.defaultimpl.ICassandraProcess;
 import com.netflix.priam.health.InstanceState;
@@ -32,10 +33,12 @@
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.*;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 import javax.inject.Named;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -73,6 +76,8 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy {
     @Named("v2")
     IMetaProxy metaV2Proxy;
 
+    @Inject IBackupRestoreConfig backupRestoreConfig;
+
     public AbstractRestore(
             IConfiguration config,
             IBackupFileSystem fs,
@@ -113,10 +118,7 @@ public void setRestoreConfiguration(String restoreIncludeCFList, String restoreE
     }
 
     private List<Future<Path>> download(
-            Iterator<AbstractBackupPath> fsIterator,
-            BackupFileType bkupFileType,
-            boolean waitForCompletion)
-            throws Exception {
+            Iterator<AbstractBackupPath> fsIterator, boolean waitForCompletion) throws Exception {
         List<Future<Path>> futureList = new ArrayList<>();
         while (fsIterator.hasNext()) {
             AbstractBackupPath temp = fsIterator.next();
@@ -130,16 +132,14 @@ private List<Future<Path>> download(
                 continue;
             }
 
-            if (temp.getType() == bkupFileType) {
-                File localFileHandler = temp.newRestoreFile();
-                if (logger.isDebugEnabled())
-                    logger.debug(
-                            "Created local file name: "
-                                    + localFileHandler.getAbsolutePath()
-                                    + File.pathSeparator
-                                    + localFileHandler.getName());
-                futureList.add(downloadFile(temp, localFileHandler));
-            }
+            File localFileHandler = temp.newRestoreFile();
+            if (logger.isDebugEnabled())
+                logger.debug(
+                        "Created local file name: "
+                                + localFileHandler.getAbsolutePath()
+                                + File.pathSeparator
+                                + localFileHandler.getName());
+            futureList.add(downloadFile(temp, localFileHandler));
         }
 
         // Wait for all download to finish that were started from this method.
@@ -153,24 +153,19 @@ private void waitForCompletion(List<Future<Path>> futureList) throws Exception {
     }
 
     private List<Future<Path>> downloadCommitLogs(
-            Iterator<AbstractBackupPath> fsIterator,
-            BackupFileType filter,
-            int lastN,
-            boolean waitForCompletion)
+            Iterator<AbstractBackupPath> fsIterator, int lastN, boolean waitForCompletion)
             throws Exception {
         if (fsIterator == null) return null;
 
         BoundedList<AbstractBackupPath> bl = new BoundedList(lastN);
         while (fsIterator.hasNext()) {
             AbstractBackupPath temp = fsIterator.next();
-            if (temp.getType() == BackupFileType.SST) continue;
-
-            if (temp.getType() == filter) {
+            if (temp.getType() == BackupFileType.CL) {
                 bl.add(temp);
             }
         }
 
-        return download(bl.iterator(), filter, waitForCompletion);
+        return download(bl.iterator(), waitForCompletion);
     }
 
     private void stopCassProcess() throws IOException {
@@ -196,6 +191,22 @@ public Void retriableCall() throws Exception {
         }.call();
     }
 
+    private Optional<AbstractBackupPath> getLatestValidMetaPath(
+            IMetaProxy metaProxy, DateUtil.DateRange dateRange) {
+        // Get a list of manifest files.
+        List<AbstractBackupPath> metas = metaProxy.findMetaFiles(dateRange);
+
+        // Find a valid manifest file.
+        for (AbstractBackupPath meta : metas) {
+            BackupVerificationResult result = metaProxy.isMetaFileValid(meta);
+            if (result.valid) {
+                return Optional.of(meta);
+            }
+        }
+
+        return Optional.empty();
+    }
+
     public void restore(DateUtil.DateRange dateRange) throws Exception {
         // fail early if post restore hook has invalid parameters
         if (!postRestoreHook.hasValidParameters()) {
@@ -203,6 +214,8 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
         }
 
         Date endTime = new Date(dateRange.getEndTime().toEpochMilli());
+        IMetaProxy metaProxy = metaV1Proxy;
+        if (backupRestoreConfig.enableV2Restore()) metaProxy = metaV2Proxy;
 
         // Set the restore status.
         instanceState.getRestoreStatus().resetStatus();
@@ -231,38 +244,60 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
             File dataDir = new File(config.getDataFileLocation());
             if (dataDir.exists() && dataDir.isDirectory()) FileUtils.cleanDirectory(dataDir);
 
-            // Try and read the Meta file.
-            List<AbstractBackupPath> metas = metaV1Proxy.findMetaFiles(dateRange);
+            // Find latest valid meta file.
+            Optional<AbstractBackupPath> latestValidMetaFile =
+                    getLatestValidMetaPath(metaProxy, dateRange);
 
-            if (metas.size() == 0) {
-                logger.info("No snapshot meta file found, Restore Failed.");
+            if (!latestValidMetaFile.isPresent()) {
+                logger.info("No valid snapshot meta file found, Restore Failed.");
                 instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now());
                 instanceState.setRestoreStatus(Status.FAILED);
                 return;
             }
 
-            AbstractBackupPath meta = metas.get(0);
-            logger.info("Snapshot Meta file for restore {}", meta.getRemotePath());
-            instanceState.getRestoreStatus().setSnapshotMetaFile(meta.getRemotePath());
+            logger.info(
+                    "Snapshot Meta file for restore {}", latestValidMetaFile.get().getRemotePath());
+            instanceState
+                    .getRestoreStatus()
+                    .setSnapshotMetaFile(latestValidMetaFile.get().getRemotePath());
 
-            // TODO: Validate that manifest file is valid.
             // Download the meta.json file.
-            Path metaFile = metaV1Proxy.downloadMetaFile(meta);
+            Path metaFile = metaProxy.downloadMetaFile(latestValidMetaFile.get());
             // Parse meta.json file to find the files required to download from this snapshot.
-            List<AbstractBackupPath> snapshots = metaData.toJson(metaFile.toFile());
+            List<AbstractBackupPath> snapshots =
+                    metaProxy
+                            .getSSTFilesFromMeta(metaFile)
+                            .stream()
+                            .map(
+                                    value -> {
+                                        AbstractBackupPath path = pathProvider.get();
+                                        path.parseRemote(value);
+                                        return path;
+                                    })
+                            .collect(Collectors.toList());
+
             FileUtils.deleteQuietly(metaFile.toFile());
 
             // Download snapshot which is listed in the meta file.
             List<Future<Path>> futureList = new ArrayList<>();
-            futureList.addAll(download(snapshots.iterator(), BackupFileType.SNAP, false));
+            futureList.addAll(download(snapshots.iterator(), false));
 
             logger.info("Downloading incrementals");
+
             // Download incrementals (SST) after the snapshot meta file.
-            String prefix = fs.getPrefix().toString();
-            Iterator<AbstractBackupPath> incrementals = fs.list(prefix, meta.getTime(), endTime);
-            futureList.addAll(download(incrementals, BackupFileType.SST, false));
+            Instant snapshotTime;
+            if (backupRestoreConfig.enableV2Restore())
+                snapshotTime = latestValidMetaFile.get().getLastModified();
+            else snapshotTime = latestValidMetaFile.get().getTime().toInstant();
+
+            DateUtil.DateRange incrementalDateRange =
+                    new DateUtil.DateRange(snapshotTime, dateRange.getEndTime());
+            Iterator<AbstractBackupPath> incrementals =
+                    metaProxy.getIncrementals(incrementalDateRange);
+            futureList.addAll(download(incrementals, false));
 
             // Downloading CommitLogs
+            // Note for Backup V2.0 we do not backup commit logs, as saving them is cost-expensive.
             if (config.isBackingUpCommitLogs()) {
                 logger.info(
                         "Delete all backuped commitlog files in {}",
@@ -271,15 +306,12 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
 
                 logger.info("Delete all commitlog files in {}", config.getCommitLogLocation());
                 SystemUtils.cleanupDir(config.getCommitLogLocation(), null);
-
+                String prefix = fs.getPrefix().toString();
                 Iterator<AbstractBackupPath> commitLogPathIterator =
-                        fs.list(prefix, meta.getTime(), endTime);
+                        fs.list(prefix, latestValidMetaFile.get().getTime(), endTime);
                 futureList.addAll(
                         downloadCommitLogs(
-                                commitLogPathIterator,
-                                BackupFileType.CL,
-                                config.maxCommitLogsRestore(),
-                                false));
+                                commitLogPathIterator, config.maxCommitLogsRestore(), false));
             }
 
             // Wait for all the futures to finish.
diff --git a/priam/src/main/java/com/netflix/priam/services/SnapshotMetaService.java b/priam/src/main/java/com/netflix/priam/services/SnapshotMetaService.java
index 63d695d9d..1cf62e885 100644
--- a/priam/src/main/java/com/netflix/priam/services/SnapshotMetaService.java
+++ b/priam/src/main/java/com/netflix/priam/services/SnapshotMetaService.java
@@ -61,7 +61,6 @@ public class SnapshotMetaService extends AbstractBackup {
     private static final String SNAPSHOT_PREFIX = "snap_v2_";
     private static final String CASSANDRA_MANIFEST_FILE = "manifest.json";
     private static final String CASSANDRA_SCHEMA_FILE = "schema.cql";
-    private final IBackupRestoreConfig backupRestoreConfig;
     private final BackupRestoreUtil backupRestoreUtil;
     private final MetaFileWriterBuilder metaFileWriter;
     private MetaFileWriterBuilder.DataStep dataStep;
@@ -80,14 +79,12 @@ private enum MetaStep {
     @Inject
     SnapshotMetaService(
             IConfiguration config,
-            IBackupRestoreConfig backupRestoreConfig,
             IFileSystemContext backupFileSystemCtx,
             Provider<AbstractBackupPath> pathFactory,
             MetaFileWriterBuilder metaFileWriter,
             @Named("v2") IMetaProxy metaProxy,
             CassandraOperations cassandraOperations) {
         super(config, backupFileSystemCtx, pathFactory);
-        this.backupRestoreConfig = backupRestoreConfig;
         this.cassandraOperations = cassandraOperations;
         backupRestoreUtil =
                 new BackupRestoreUtil(
@@ -247,8 +244,7 @@ private void uploadAllFiles(
                 if (!snapshotDirectory.getName().startsWith(SNAPSHOT_PREFIX)
                         || !snapshotDirectory.isDirectory()) continue;
 
-                if (snapshotDirectory.list().length == 0
-                        || !backupRestoreConfig.enableV2Backups()) {
+                if (snapshotDirectory.list().length == 0) {
                     FileUtils.cleanDirectory(snapshotDirectory);
                     FileUtils.deleteDirectory(snapshotDirectory);
                     continue;
@@ -369,11 +365,6 @@ private void generateMetaFile(
                 columnfamilyResult.getColumnfamilyName());
     }
 
-    @Override
-    protected void addToRemotePath(String remotePath) {
-        // Do nothing
-    }
-
     // For testing purposes only.
     void setSnapshotName(String snapshotName) {
         this.snapshotName = snapshotName;
diff --git a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java
index b65589cd6..b55d385c5 100644
--- a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java
+++ b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java
@@ -151,7 +151,8 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe
             try (FileWriter fr = new FileWriter(localPath.toFile())) {
                 JSONArray jsonObj = new JSONArray();
                 for (AbstractBackupPath filePath : flist) {
-                    if (filePath.type == AbstractBackupPath.BackupFileType.SNAP) {
+                    if (filePath.type == AbstractBackupPath.BackupFileType.SNAP
+                            && filePath.time.equals(path.time)) {
                         jsonObj.add(filePath.getRemotePath());
                     }
                 }
diff --git a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java
index b05fe0d78..7bfeab6df 100644
--- a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java
+++ b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java
@@ -139,7 +139,7 @@ public void testUpload() throws Exception {
     @Test
     public void testDownload() throws Exception {
         // Dummy download
-        myFileSystem.downloadFile(Paths.get(""), null, 2);
+        myFileSystem.downloadFile(Paths.get(""), Paths.get(configuration.getDataFileLocation()), 2);
         // Verify the success metric for download is incremented.
         Assert.assertEquals(1, (int) backupMetrics.getValidDownloads().actualCount());
     }
@@ -254,7 +254,9 @@ public void testAsyncUploadFailure() throws Exception {
     @Test
     public void testAsyncDownload() throws Exception {
         // Testing single async download.
-        Future<Path> future = myFileSystem.asyncDownloadFile(Paths.get(""), null, 2);
+        Future<Path> future =
+                myFileSystem.asyncDownloadFile(
+                        Paths.get(""), Paths.get(configuration.getDataFileLocation()), 2);
         future.get();
         // 1. Verify the success metric for download is incremented.
         Assert.assertEquals(1, (int) backupMetrics.getValidDownloads().actualCount());
@@ -269,7 +271,9 @@ public void testAsyncDownloadBulk() throws Exception {
         int totalFiles = 1000;
         List<Future<Path>> futureList = new ArrayList<>();
         for (int i = 0; i < totalFiles; i++)
-            futureList.add(myFileSystem.asyncDownloadFile(Paths.get("" + i), null, 2));
+            futureList.add(
+                    myFileSystem.asyncDownloadFile(
+                            Paths.get("" + i), Paths.get(configuration.getDataFileLocation()), 2));
 
         // Ensure processing is finished.
         for (Future future1 : futureList) {
diff --git a/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java b/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java
index c78e818fa..e8f2f92e2 100644
--- a/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java
+++ b/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java
@@ -134,6 +134,7 @@ public void testFileUploadCompleteFailure() throws Exception {
         MockS3PartUploader.setup();
         MockS3PartUploader.completionFailure = true;
         S3FileSystem fs = injector.getInstance(S3FileSystem.class);
+        fs.setS3Client(new MockAmazonS3Client().getMockInstance());
         String snapshotfile =
                 "target/data/Keyspace1/Standard1/backups/201108082320/Keyspace1-Standard1-ia-1-Data.db";
         RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class);
diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestForgottenFileManager.java b/priam/src/test/java/com/netflix/priam/backupv2/TestForgottenFileManager.java
new file mode 100644
index 000000000..2f566d328
--- /dev/null
+++ b/priam/src/test/java/com/netflix/priam/backupv2/TestForgottenFileManager.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2019 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.netflix.priam.backupv2;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.netflix.priam.backup.BRTestModule;
+import com.netflix.priam.config.FakeConfiguration;
+import com.netflix.priam.config.IConfiguration;
+import com.netflix.priam.merics.BackupMetrics;
+import com.netflix.priam.utils.DateUtil;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Created by aagrawal on 1/1/19. */
+public class TestForgottenFileManager {
+    private ForgottenFilesManager forgottenFilesManager;
+    private TestBackupUtils testBackupUtils;
+    private IConfiguration configuration;
+    private List<Path> allFiles = new ArrayList<>();
+    private Instant snapshotInstant;
+    private Path snapshotDir;
+
+    public TestForgottenFileManager() {
+        Injector injector = Guice.createInjector(new BRTestModule());
+        BackupMetrics backupMetrics = injector.getInstance(BackupMetrics.class);
+        configuration = new ForgottenFilesConfiguration();
+        forgottenFilesManager = new ForgottenFilesManager(configuration, backupMetrics);
+        testBackupUtils = injector.getInstance(TestBackupUtils.class);
+    }
+
+    @Before
+    public void prep() throws Exception {
+        cleanup();
+        Instant now = DateUtil.getInstant();
+        snapshotInstant = now;
+        Path file1 = Paths.get(testBackupUtils.createFile("file1", now.minus(5, ChronoUnit.DAYS)));
+        Path file2 = Paths.get(testBackupUtils.createFile("file2", now.minus(3, ChronoUnit.DAYS)));
+        Path file3 = Paths.get(testBackupUtils.createFile("file3", now.minus(2, ChronoUnit.DAYS)));
+        Path file4 = Paths.get(testBackupUtils.createFile("file4", now.minus(6, ChronoUnit.HOURS)));
+        Path file5 =
+                Paths.get(testBackupUtils.createFile("file5", now.minus(1, ChronoUnit.MINUTES)));
+        Path file6 =
+                Paths.get(
+                        testBackupUtils.createFile(
+                                "tmplink-lb-59516-big-Index.db", now.minus(3, ChronoUnit.DAYS)));
+        Path file7 =
+                Paths.get(testBackupUtils.createFile("file7.tmp", now.minus(3, ChronoUnit.DAYS)));
+
+        allFiles.add(file1);
+        allFiles.add(file2);
+        allFiles.add(file3);
+        allFiles.add(file4);
+        allFiles.add(file5);
+        allFiles.add(file6);
+        allFiles.add(file7);
+
+        // Create a snapshot with file2, file3, file4.
+        Path columnfamilyDir = file1.getParent();
+        snapshotDir =
+                Paths.get(
+                        columnfamilyDir.toString(),
+                        "snapshot",
+                        "snap_v2_" + DateUtil.formatInstant(DateUtil.yyyyMMddHHmm, now));
+        snapshotDir.toFile().mkdirs();
+        Files.createLink(Paths.get(snapshotDir.toString(), file2.getFileName().toString()), file2);
+        Files.createLink(Paths.get(snapshotDir.toString(), file3.getFileName().toString()), file3);
+        Files.createLink(Paths.get(snapshotDir.toString(), file4.getFileName().toString()), file4);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        String dataDir = configuration.getDataFileLocation();
+        org.apache.commons.io.FileUtils.cleanDirectory(new File(dataDir));
+    }
+
+    @Test
+    public void testMoveForgottenFiles() {
+        Collection<File> files = allFiles.stream().map(Path::toFile).collect(Collectors.toList());
+        forgottenFilesManager.moveForgottenFiles(
+                new File(configuration.getDataFileLocation()), files);
+        Path lostFoundDir =
+                Paths.get(configuration.getDataFileLocation(), forgottenFilesManager.LOST_FOUND);
+        Collection<File> movedFiles = FileUtils.listFiles(lostFoundDir.toFile(), null, false);
+        Assert.assertEquals(allFiles.size(), movedFiles.size());
+        for (Path file : allFiles)
+            Assert.assertTrue(
+                    movedFiles.contains(
+                            Paths.get(lostFoundDir.toString(), file.getFileName().toString())
+                                    .toFile()));
+    }
+
+    @Test
+    public void getColumnfamilyFiles() {
+        Path columnfamilyDir = allFiles.get(0).getParent();
+        Collection<File> columnfamilyFiles =
+                forgottenFilesManager.getColumnfamilyFiles(
+                        snapshotInstant, columnfamilyDir.toFile());
+        Assert.assertEquals(3, columnfamilyFiles.size());
+        Assert.assertTrue(columnfamilyFiles.contains(allFiles.get(0).toFile()));
+        Assert.assertTrue(columnfamilyFiles.contains(allFiles.get(1).toFile()));
+        Assert.assertTrue(columnfamilyFiles.contains(allFiles.get(2).toFile()));
+    }
+
+    @Test
+    public void findAndMoveForgottenFiles() {
+        Path lostFoundDir =
+                Paths.get(allFiles.get(0).getParent().toString(), forgottenFilesManager.LOST_FOUND);
+        forgottenFilesManager.findAndMoveForgottenFiles(snapshotInstant, snapshotDir.toFile());
+
+        // Only one forgotten file - file1.
+        Collection<File> movedFiles = FileUtils.listFiles(lostFoundDir.toFile(), null, false);
+        Assert.assertEquals(1, movedFiles.size());
+        Assert.assertTrue(
+                movedFiles
+                        .iterator()
+                        .next()
+                        .getName()
+                        .equals(allFiles.get(0).getFileName().toString()));
+
+        // All other files still remain in columnfamily dir.
+        Collection<File> cfFiles =
+                FileUtils.listFiles(new File(allFiles.get(0).getParent().toString()), null, false);
+        Assert.assertEquals(6, cfFiles.size());
+        int temp_file_name = 1;
+        for (File file : cfFiles) {
+            file.getName().equals(allFiles.get(temp_file_name++).getFileName().toString());
+        }
+
+        // Snapshot is untouched.
+        Collection<File> snapshotFiles = FileUtils.listFiles(snapshotDir.toFile(), null, false);
+        Assert.assertEquals(3, snapshotFiles.size());
+    }
+
+    private class ForgottenFilesConfiguration extends FakeConfiguration {
+        @Override
+        public boolean isForgottenFileMoveEnabled() {
+            return true;
+        }
+    }
+}
diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaFileManager.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaFileManager.java
deleted file mode 100644
index d816b3ac8..000000000
--- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaFileManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2018 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.netflix.priam.backupv2;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.netflix.priam.backup.BRTestModule;
-import com.netflix.priam.config.IConfiguration;
-import com.netflix.priam.utils.DateUtil;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.temporal.ChronoUnit;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-/** Created by aagrawal on 11/28/18. */
-public class TestMetaFileManager {
-
-    private MetaV2Proxy metaV2Proxy;
-    private IConfiguration configuration;
-
-    public TestMetaFileManager() {
-        Injector injector = Guice.createInjector(new BRTestModule());
-        metaV2Proxy = injector.getInstance(MetaV2Proxy.class);
-        configuration = injector.getInstance(IConfiguration.class);
-    }
-
-    @After
-    public void cleanup() throws IOException {
-        FileUtils.cleanDirectory(new File(configuration.getDataFileLocation()));
-    }
-
-    @Test
-    public void testCleanupOldMetaFiles() throws IOException {
-        generateDummyMetaFiles();
-        Path dataDir = Paths.get(configuration.getDataFileLocation());
-        Assert.assertEquals(4, dataDir.toFile().listFiles().length);
-
-        // clean the directory
-        metaV2Proxy.cleanupOldMetaFiles();
-
-        Assert.assertEquals(1, dataDir.toFile().listFiles().length);
-        Path dummy = Paths.get(dataDir.toString(), "dummy.tmp");
-        Assert.assertTrue(dummy.toFile().exists());
-    }
-
-    private void generateDummyMetaFiles() throws IOException {
-        Path dataDir = Paths.get(configuration.getDataFileLocation());
-        FileUtils.write(
-                Paths.get(
-                                configuration.getDataFileLocation(),
-                                MetaFileInfo.getMetaFileName(DateUtil.getInstant()))
-                        .toFile(),
-                "dummy",
-                "UTF-8");
-
-        FileUtils.write(
-                Paths.get(
-                                configuration.getDataFileLocation(),
-                                MetaFileInfo.getMetaFileName(
-                                        DateUtil.getInstant().minus(10, ChronoUnit.MINUTES)))
-                        .toFile(),
-                "dummy",
-                "UTF-8");
-
-        FileUtils.write(
-                Paths.get(
-                                configuration.getDataFileLocation(),
-                                MetaFileInfo.getMetaFileName(DateUtil.getInstant()) + ".tmp")
-                        .toFile(),
-                "dummy",
-                "UTF-8");
-
-        FileUtils.write(
-                Paths.get(configuration.getDataFileLocation(), "dummy.tmp").toFile(),
-                "dummy",
-                "UTF-8");
-    }
-}
diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java
index 5fe5ba14b..f434204d7 100644
--- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java
+++ b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java
@@ -26,13 +26,18 @@
 import com.netflix.priam.backup.FakeBackupFileSystem;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.utils.DateUtil;
+import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,10 +63,11 @@ public TestMetaV2Proxy() {
     public void testMetaPrefix() {
         // Null date range
         Assert.assertEquals(getPrefix() + "/META_V2", metaProxy.getMetaPrefix(null));
+        Instant now = Instant.now();
         // No end date.
         Assert.assertEquals(
-                getPrefix() + "/META_V2",
-                metaProxy.getMetaPrefix(new DateUtil.DateRange(Instant.now(), null)));
+                getPrefix() + "/META_V2/" + now.toEpochMilli(),
+                metaProxy.getMetaPrefix(new DateUtil.DateRange(now, null)));
         // No start date
         Assert.assertEquals(
                 getPrefix() + "/META_V2",
@@ -106,6 +112,28 @@ public void testIsMetaFileValid() throws Exception {
         Assert.assertFalse(metaProxy.isMetaFileValid(abstractBackupPath).valid);
     }
 
+    @Test
+    public void testGetSSTFilesFromMeta() throws Exception {
+        Instant snapshotInstant = DateUtil.getInstant();
+        List<String> remoteFiles = getRemoteFakeFiles();
+        Path metaPath = backupUtils.createMeta(remoteFiles, snapshotInstant);
+        List<String> filesFromMeta = metaProxy.getSSTFilesFromMeta(metaPath);
+        filesFromMeta.removeAll(remoteFiles);
+        Assert.assertTrue(filesFromMeta.isEmpty());
+    }
+
+    @Test
+    public void testGetIncrementalFiles() throws Exception {
+        DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229");
+        Iterator<AbstractBackupPath> incrementals = metaProxy.getIncrementals(dateRange);
+        int i = 0;
+        while (incrementals.hasNext()) {
+            System.out.println(incrementals.next());
+            i++;
+        }
+        Assert.assertEquals(3, i);
+    }
+
     @Test
     public void testFindMetaFiles() throws BackupRestoreException {
         List<AbstractBackupPath> metas =
@@ -203,4 +231,57 @@ private List<String> getRemoteFakeFiles() {
                         "meta_v2_202812071901.json"));
         return files.stream().map(Path::toString).collect(Collectors.toList());
     }
+
+    @After
+    public void cleanup() throws IOException {
+        FileUtils.cleanDirectory(new File(configuration.getDataFileLocation()));
+    }
+
+    @Test
+    public void testCleanupOldMetaFiles() throws IOException {
+        generateDummyMetaFiles();
+        Path dataDir = Paths.get(configuration.getDataFileLocation());
+        Assert.assertEquals(4, dataDir.toFile().listFiles().length);
+
+        // clean the directory
+        metaProxy.cleanupOldMetaFiles();
+
+        Assert.assertEquals(1, dataDir.toFile().listFiles().length);
+        Path dummy = Paths.get(dataDir.toString(), "dummy.tmp");
+        Assert.assertTrue(dummy.toFile().exists());
+    }
+
+    private void generateDummyMetaFiles() throws IOException {
+        Path dataDir = Paths.get(configuration.getDataFileLocation());
+        FileUtils.cleanDirectory(dataDir.toFile());
+        FileUtils.write(
+                Paths.get(
+                                configuration.getDataFileLocation(),
+                                MetaFileInfo.getMetaFileName(DateUtil.getInstant()))
+                        .toFile(),
+                "dummy",
+                "UTF-8");
+
+        FileUtils.write(
+                Paths.get(
+                                configuration.getDataFileLocation(),
+                                MetaFileInfo.getMetaFileName(
+                                        DateUtil.getInstant().minus(10, ChronoUnit.MINUTES)))
+                        .toFile(),
+                "dummy",
+                "UTF-8");
+
+        FileUtils.write(
+                Paths.get(
+                                configuration.getDataFileLocation(),
+                                MetaFileInfo.getMetaFileName(DateUtil.getInstant()) + ".tmp")
+                        .toFile(),
+                "dummy",
+                "UTF-8");
+
+        FileUtils.write(
+                Paths.get(configuration.getDataFileLocation(), "dummy.tmp").toFile(),
+                "dummy",
+                "UTF-8");
+    }
 }
diff --git a/priam/src/test/java/com/netflix/priam/config/FakeBackupRestoreConfig.java b/priam/src/test/java/com/netflix/priam/config/FakeBackupRestoreConfig.java
index 7339d5b04..246a91a86 100644
--- a/priam/src/test/java/com/netflix/priam/config/FakeBackupRestoreConfig.java
+++ b/priam/src/test/java/com/netflix/priam/config/FakeBackupRestoreConfig.java
@@ -24,4 +24,9 @@ public String getSnapshotMetaServiceCronExpression() {
     public boolean enableV2Backups() {
         return true;
     }
+
+    @Override
+    public boolean enableV2Restore() {
+        return false;
+    }
 }
diff --git a/priam/src/test/java/com/netflix/priam/backup/TestRestore.java b/priam/src/test/java/com/netflix/priam/restore/TestRestore.java
similarity index 71%
rename from priam/src/test/java/com/netflix/priam/backup/TestRestore.java
rename to priam/src/test/java/com/netflix/priam/restore/TestRestore.java
index 3a931f6bc..06a21b1a6 100644
--- a/priam/src/test/java/com/netflix/priam/backup/TestRestore.java
+++ b/priam/src/test/java/com/netflix/priam/restore/TestRestore.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Netflix, Inc.
+ * Copyright 2018 Netflix, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,15 +15,17 @@
  *
  */
 
-package com.netflix.priam.backup;
+package com.netflix.priam.restore;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.netflix.priam.backup.BRTestModule;
+import com.netflix.priam.backup.FakeBackupFileSystem;
+import com.netflix.priam.backup.Status;
 import com.netflix.priam.config.FakeConfiguration;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.health.InstanceState;
 import com.netflix.priam.identity.config.InstanceInfo;
-import com.netflix.priam.restore.Restore;
 import com.netflix.priam.utils.DateUtil;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -78,20 +80,56 @@ public void testRestore() throws Exception {
         Assert.assertEquals(Status.FINISHED, instanceState.getRestoreStatus().getStatus());
     }
 
-    // Pick latest file
+    @Test
+    public void testRestoreWithIncremental() throws Exception {
+        populateBackupFileSystem("test_backup");
+        String dateRange = "201108110030,201108110730";
+        restore.restore(new DateUtil.DateRange(dateRange));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(0)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(1)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(2)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(3)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(4)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(5)));
+        Assert.assertEquals(Status.FINISHED, instanceState.getRestoreStatus().getStatus());
+    }
+
+    @Test
+    public void testRestoreLatestWithEmptyMeta() throws Exception {
+        populateBackupFileSystem("test_backup");
+        String metafile =
+                "test_backup/" + region + "/fakecluster/123456/201108110130/META/meta.json";
+        filesystem.addFile(metafile);
+        String dateRange = "201108110030,201108110530";
+        restore.restore(new DateUtil.DateRange(dateRange));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(0)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(metafile));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(1)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(2)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(3)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(4)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(5)));
+        Assert.assertEquals(Status.FINISHED, instanceState.getRestoreStatus().getStatus());
+        Assert.assertEquals(metafile, instanceState.getRestoreStatus().getSnapshotMetaFile());
+    }
+
     @Test
     public void testRestoreLatest() throws Exception {
         populateBackupFileSystem("test_backup");
         String metafile =
                 "test_backup/" + region + "/fakecluster/123456/201108110130/META/meta.json";
         filesystem.addFile(metafile);
+        String snapFile =
+                "test_backup/" + region + "/fakecluster/123456/201108110130/SNAP/ks1/cf1/f9.db";
+        filesystem.addFile(snapFile);
         String dateRange = "201108110030,201108110530";
         restore.restore(new DateUtil.DateRange(dateRange));
         Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(0)));
         Assert.assertTrue(filesystem.downloadedFiles.contains(metafile));
-        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(1)));
-        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(2)));
-        Assert.assertTrue(filesystem.downloadedFiles.contains(fileList.get(3)));
+        Assert.assertTrue(filesystem.downloadedFiles.contains(snapFile));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(1)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(2)));
+        Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(3)));
         Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(4)));
         Assert.assertFalse(filesystem.downloadedFiles.contains(fileList.get(5)));
         Assert.assertEquals(Status.FINISHED, instanceState.getRestoreStatus().getStatus());