Skip to content

Commit

Permalink
endpoint to return backup queue, metatdata and pending files count (#…
Browse files Browse the repository at this point in the history
…1100)

* endpoint to return backup queue, metatdata and pending files count

* Updated backup DirectorySize interface to get the file count for snapshots/backups

---------

Co-authored-by: ayushis <[email protected]>
  • Loading branch information
ayushisingh29 and ayushis authored Dec 19, 2024
1 parent 2a5cc87 commit 7521446
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.google.inject.ImplementedBy;

/** estimates the number of bytes remaining to upload in a snapshot */
@ImplementedBy(SnapshotDirectorySize.class)
/** estimates the number of bytes and files remaining to upload in a snapshot/backup */
public interface DirectorySize {
/** return the total bytes of all snapshot files south of location in the filesystem */
/** return the total bytes of all snapshot/backup files south of location in the filesystem */
long getBytes(String location);
/** return the total files of all snapshot/backup files south of location in the filesystem */
int getFiles(String location);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.netflix.priam.backup;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class IncrementalBackupDirectorySize implements DirectorySize {

public long getBytes(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalBytes();
}

public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.INCREMENTAL_BACKUP_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
return FileVisitResult.CONTINUE;
}

long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() {
return totalFiles;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes to upload in a backup by looking at the file system */
/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class SnapshotDirectorySize implements DirectorySize {

public long getBytes(String location) {
Expand All @@ -17,8 +17,19 @@ public long getBytes(String location) {
return fileVisitor.getTotalBytes();
}

public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand All @@ -29,6 +40,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.SNAPSHOT_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}
Expand All @@ -46,5 +58,9 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() {
return totalFiles;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.priam.backupv2;

import com.netflix.priam.backup.DirectorySize;
import com.netflix.priam.backup.IncrementalBackup;
import com.netflix.priam.config.IBackupRestoreConfig;
import com.netflix.priam.config.IConfiguration;
Expand All @@ -25,8 +26,13 @@
import com.netflix.priam.scheduler.PriamScheduler;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.tuner.CassandraTunerService;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import org.apache.commons.lang3.math.Fraction;
import com.netflix.priam.backup.SnapshotDirectorySize;
import com.netflix.priam.backup.IncrementalBackupDirectorySize;


/**
* Encapsulate the backup service 2.0 - Execute all the tasks required to run backup service.
Expand All @@ -39,6 +45,8 @@ public class BackupV2Service implements IService {
private final SnapshotMetaTask snapshotMetaTask;
private final CassandraTunerService cassandraTunerService;
private final ITokenRetriever tokenRetriever;
private final DirectorySize snapshotDirectorySize = new SnapshotDirectorySize();
private final DirectorySize incrementalBackupDirectorySize = new IncrementalBackupDirectorySize();

@Inject
public BackupV2Service(
Expand Down Expand Up @@ -101,4 +109,11 @@ public void updateServicePre() throws Exception {

@Override
public void updateServicePost() throws Exception {}

public Map<String, Integer> countPendingBackupFiles() throws Exception {
Map<String, Integer> backupFiles = new HashMap<String, Integer>();
backupFiles.put("totalFiles", (snapshotDirectorySize.getFiles(configuration.getDataFileLocation()) +
incrementalBackupDirectorySize.getFiles(configuration.getDataFileLocation())));
return backupFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.netflix.priam.resources;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.priam.PriamServer;
import com.netflix.priam.backup.*;
import com.netflix.priam.backupv2.BackupTTLTask;
import com.netflix.priam.backupv2.BackupV2Service;
Expand All @@ -29,7 +31,9 @@
import com.netflix.priam.utils.GsonJsonSerializer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand All @@ -55,6 +59,8 @@ public class BackupServletV2 {
private final Provider<AbstractBackupPath> pathProvider;
private final BackupV2Service backupService;
private final BackupNotificationMgr backupNotificationMgr;
private final PriamServer priamServer;

private static final String REST_SUCCESS = "[\"ok\"]";

@Inject
Expand All @@ -68,7 +74,8 @@ public BackupServletV2(
@Named("v2") IMetaProxy metaV2Proxy,
Provider<AbstractBackupPath> pathProvider,
BackupV2Service backupService,
BackupNotificationMgr backupNotificationMgr) {
BackupNotificationMgr backupNotificationMgr,
PriamServer priamServer) {
this.backupStatusMgr = backupStatusMgr;
this.backupVerification = backupVerification;
this.snapshotMetaService = snapshotMetaService;
Expand All @@ -78,6 +85,7 @@ public BackupServletV2(
this.pathProvider = pathProvider;
this.backupService = backupService;
this.backupNotificationMgr = backupNotificationMgr;
this.priamServer = priamServer;
}

@GET
Expand Down Expand Up @@ -175,4 +183,26 @@ public Response list(@PathParam("daterange") String daterange) throws Exception
files.stream().map(AbstractBackupPath::getRemotePath).collect(Collectors.toList());
return Response.ok(GsonJsonSerializer.getGson().toJson(remotePaths)).build();
}

@GET
@Path("/state/{hours}")
public Response backupState(@PathParam("hours") int hours) throws Exception {
Map<String, Object> responseMap = new HashMap<>();

responseMap.put("tasksQueued", fs.getUploadTasksQueued());
responseMap.put("queueSize", priamServer.getConfiguration().getBackupQueueSize());
for (Map.Entry<String, Integer> entry :
backupService.countPendingBackupFiles().entrySet()) {
responseMap.put(entry.getKey(), entry.getValue());
}

List<BackupMetadata> latestBackupMetadata =
backupStatusMgr.getLatestBackupMetadata(
new DateRange(Instant.now().minus(hours, ChronoUnit.HOURS), Instant.now()));
responseMap.put("latestBackupMetadata", latestBackupMetadata);

ObjectMapper mapper = new ObjectMapper();
String jsonResponse = mapper.writeValueAsString(responseMap);
return Response.ok(jsonResponse).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public class TestBackupDynamicRateLimiter {
private static final Instant NOW = Instant.ofEpochMilli(1 << 16);
private static final Instant LATER = NOW.plusMillis(Duration.ofHours(1).toMillis());
private static final int DIR_SIZE = 1 << 16;
private static final int DIR_SIZE_BYTES = 1 << 16;
private static final int DIR_SIZE_FILES = 10;

private BackupDynamicRateLimiter rateLimiter;
private FakeConfiguration config;
Expand All @@ -34,22 +35,22 @@ public void setUp() {

@Test
public void sunnyDay() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 21);
Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtLeast(1_000);
Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(2_000);
}

@Test
public void targetSetToEpoch() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), Instant.EPOCH, 20);
assertNoRateLimiting(timer);
}

@Test
public void pathIsNotASnapshot() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
AbstractBackupPath path =
getBackupPath(
"target/data/Keyspace1/Standard1/backups/Keyspace1-Standard1-ia-4-Data.db");
Expand All @@ -59,47 +60,47 @@ public void pathIsNotASnapshot() {

@Test
public void targetIsNow() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), NOW, 20);
assertNoRateLimiting(timer);
}

@Test
public void targetIsInThePast() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Instant target = NOW.minus(Duration.ofHours(1L));
Stopwatch timer = timePermitAcquisition(getBackupPath(), target, 20);
assertNoRateLimiting(timer);
}

@Test
public void noBackupThreads() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

@Test
public void negativeBackupThreads() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

@Test
public void noData() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0, 0);
Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 20);
assertNoRateLimiting(timer);
}

@Test
public void noPermitsRequested() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, 0));
}

@Test
public void negativePermitsRequested() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, -1));
}

Expand All @@ -123,12 +124,12 @@ private Stopwatch timePermitAcquisition(AbstractBackupPath path, Instant now, in
}

private BackupDynamicRateLimiter getRateLimiter(
Map<String, Object> properties, Instant now, long directorySize) {
Map<String, Object> properties, Instant now, long directorySizeBytes, int directorySizeFiles) {
properties.forEach(config::setFakeConfig);
return new BackupDynamicRateLimiter(
config,
Clock.fixed(now, ZoneId.systemDefault()),
new FakeDirectorySize(directorySize));
new FakeDirectorySize(directorySizeBytes, directorySizeFiles));
}

private void assertNoRateLimiting(Stopwatch timer) {
Expand All @@ -155,14 +156,21 @@ private void assertIllegalArgument(Runnable method) {

private static final class FakeDirectorySize implements DirectorySize {
private final long size;
private final int fileCount;

FakeDirectorySize(long size) {
FakeDirectorySize(long size, int fileCount) {
this.size = size;
this.fileCount = fileCount;
}

@Override
public long getBytes(String location) {
return size;
}

@Override
public int getFiles(String location) {
return fileCount;
}
}
}

0 comments on commit 7521446

Please sign in to comment.