From 702e7ee0af027b79c655bd730393faf0b5f27128 Mon Sep 17 00:00:00 2001
From: mattl-netflix <63665634+mattl-netflix@users.noreply.github.com>
Date: Thu, 2 Jun 2022 16:05:49 -0700
Subject: [PATCH] Optionally Add Content-MD5 header to uploads of files
 smaller than the backupChunkSize. (#985)

* Preliminary refactoring in advance of adding Content-MD5 header to single part uploads. That header is required when objects have a retention period.

* Optionally add md5 header for direct puts.
---
 .../com/netflix/priam/aws/S3FileSystem.java   | 73 +++++++++++--------
 .../netflix/priam/config/IConfiguration.java  |  4 +
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
index e24f68100..ce597c13e 100644
--- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
+++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
@@ -37,6 +37,7 @@
 import com.netflix.priam.merics.BackupMetrics;
 import com.netflix.priam.notification.BackupNotificationMgr;
 import com.netflix.priam.utils.BoundedExponentialRetryCallable;
+import com.netflix.priam.utils.SystemUtils;
 import java.io.*;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -172,45 +173,55 @@ private long uploadMultipart(AbstractBackupPath path, Instant target)
 
     protected long uploadFileImpl(AbstractBackupPath path, Instant target)
             throws BackupRestoreException {
-        Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
-        String remotePath = path.getRemotePath();
-        long chunkSize = config.getBackupChunkSize();
-        File localFile = localPath.toFile();
-        if (localFile.length() >= chunkSize) return uploadMultipart(path, target);
+        File localFile = Paths.get(path.getBackupFile().getAbsolutePath()).toFile();
+        if (localFile.length() >= config.getBackupChunkSize()) return uploadMultipart(path, target);
+        byte[] chunk = getFileContents(path);
+        // C* snapshots may have empty files. That is probably unintentional.
+        if (chunk.length > 0) {
+            rateLimiter.acquire(chunk.length);
+            dynamicRateLimiter.acquire(path, target, chunk.length);
+        }
+        try {
+            new BoundedExponentialRetryCallable<PutObjectResult>(1000, 10000, 5) {
+                @Override
+                public PutObjectResult retriableCall() {
+                    return s3Client.putObject(generatePut(path, chunk));
+                }
+            }.call();
+        } catch (Exception e) {
+            throw new BackupRestoreException("Error uploading file: " + localFile.getName(), e);
+        }
+        return chunk.length;
+    }
 
-        String prefix = config.getBackupPrefix();
-        if (logger.isDebugEnabled()) logger.debug("PUTing {}/{}", prefix, remotePath);
+    private PutObjectRequest generatePut(AbstractBackupPath path, byte[] chunk) {
+        File localFile = Paths.get(path.getBackupFile().getAbsolutePath()).toFile();
+        ObjectMetadata metadata = getObjectMetadata(localFile);
+        metadata.setContentLength(chunk.length);
+        PutObjectRequest put =
+                new PutObjectRequest(
+                        config.getBackupPrefix(),
+                        path.getRemotePath(),
+                        new ByteArrayInputStream(chunk),
+                        metadata);
+        if (config.addMD5ToBackupUploads()) {
+            put.getMetadata().setContentMD5(SystemUtils.toBase64(SystemUtils.md5(chunk)));
+        }
+        return put;
+    }
 
+    private byte[] getFileContents(AbstractBackupPath path) throws BackupRestoreException {
+        File localFile = Paths.get(path.getBackupFile().getAbsolutePath()).toFile();
         try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                 InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
-            Iterator<byte[]> chunks = new ChunkedStream(in, chunkSize, path.getCompression());
+            Iterator<byte[]> chunks =
+                    new ChunkedStream(in, config.getBackupChunkSize(), path.getCompression());
             while (chunks.hasNext()) {
                 byteArrayOutputStream.write(chunks.next());
             }
-            byte[] chunk = byteArrayOutputStream.toByteArray();
-            long compressedFileSize = chunk.length;
-            // C* snapshots may have empty files. That is probably unintentional.
-            if (chunk.length > 0) {
-                rateLimiter.acquire(chunk.length);
-                dynamicRateLimiter.acquire(path, target, chunk.length);
-            }
-            ObjectMetadata objectMetadata = getObjectMetadata(localFile);
-            objectMetadata.setContentLength(chunk.length);
-            ByteArrayInputStream inputStream = new ByteArrayInputStream(chunk);
-            PutObjectRequest putObjectRequest =
-                    new PutObjectRequest(prefix, remotePath, inputStream, objectMetadata);
-            PutObjectResult upload =
-                    new BoundedExponentialRetryCallable<PutObjectResult>(1000, 10000, 5) {
-                        @Override
-                        public PutObjectResult retriableCall() {
-                            return s3Client.putObject(putObjectRequest);
-                        }
-                    }.call();
-            if (logger.isDebugEnabled())
-                logger.debug("Put: {} with etag: {}", remotePath, upload.getETag());
-            return compressedFileSize;
+            return byteArrayOutputStream.toByteArray();
         } catch (Exception e) {
-            throw new BackupRestoreException("Error uploading file: " + localFile.getName(), e);
+            throw new BackupRestoreException("Error reading file: " + localFile.getName(), e);
         }
     }
 }
diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
index b013c62dd..e5365d410 100644
--- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
+++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java
@@ -1161,6 +1161,10 @@ default double getRateLimitChangeThreshold() {
         return 0.1;
     }
 
+    default boolean addMD5ToBackupUploads() {
+        return false;
+    }
+
     /**
      * Escape hatch for getting any arbitrary property by key This is useful so we don't have to
      * keep adding methods to this interface for every single configuration option ever. Also
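
Note (not part of the patch, illustrative only): S3 validates the Content-MD5 header as the base64
encoding of the raw 128-bit MD5 digest of the request body (RFC 1864), not its hex form. The patch
produces that value for the single-part path with Priam's SystemUtils.md5/SystemUtils.toBase64
helpers; the JDK-only sketch below is an assumption about what those helpers yield, and the class
and method names are made up.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Base64;

    public class ContentMd5Sketch {
        // Base64 of the raw MD5 digest, which is what the Content-MD5 header carries.
        static String contentMd5(byte[] payload) throws Exception {
            byte[] digest = MessageDigest.getInstance("MD5").digest(payload);
            return Base64.getEncoder().encodeToString(digest);
        }

        public static void main(String[] args) throws Exception {
            byte[] chunk = "hello".getBytes(StandardCharsets.UTF_8);
            System.out.println(contentMd5(chunk)); // prints XUFAKrxLKna5cZ2REBfFkg==
        }
    }

Because the header is computed over the same byte[] chunk that is streamed to putObject, S3 can
verify the payload was not corrupted in transit; per the commit message, the header is also
required when objects have a retention period.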
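
Also illustrative, not part of the patch: the new IConfiguration default keeps the header off, so
existing deployments are unchanged. A deployment whose backup bucket enforces a retention period
could opt in by overriding the new default in its own configuration binding; the class below is
hypothetical, and the concrete IConfiguration implementation in use may differ.

    import com.netflix.priam.config.IConfiguration;

    // Abstract so this sketch compiles without implementing the rest of the interface.
    public abstract class RetentionAwareConfiguration implements IConfiguration {
        @Override
        public boolean addMD5ToBackupUploads() {
            // Opt in to the Content-MD5 header added by this patch.
            return true;
        }
    }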