New backup consolidated commit #11862

Open · wants to merge 1 commit into base: release-7.3
2 changes: 2 additions & 0 deletions fdbbackup/FileConverter.actor.cpp
@@ -489,6 +489,8 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}

// Keep reading data until a new version is encountered, then flush everything
// buffered and start buffering for the new version.
ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
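As a rough sketch of the buffer-until-new-version pattern described in the comment above (the container and flush function here are illustrative, not the actual FileConverter API):

```cpp
#include <cstdint>
#include <string>
#include <vector>

struct VersionedData {
	int64_t version;
	std::string message;
};

// Write out and clear everything buffered for the current version.
void flushBuffered(std::vector<VersionedData>& buffered) {
	// ... write the buffered mutations ...
	buffered.clear();
}

void consume(const std::vector<VersionedData>& input) {
	std::vector<VersionedData> buffered;
	for (const VersionedData& d : input) {
		// A new version marks the end of the previous one: flush first.
		if (!buffered.empty() && d.version != buffered.back().version)
			flushBuffered(buffered);
		buffered.push_back(d);
	}
	flushBuffered(buffered); // flush the final version
}
```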
52 changes: 49 additions & 3 deletions fdbclient/BackupAgentBase.actor.cpp
@@ -187,9 +187,12 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
return ret;
}

// Given a begin and end version, get the key prefixes in the database for this
// version range, i.e. applyLogKeys.begin/backupUid/hash(uint8)/version(64bit)/part.
// Returns multiple key ranges, each covering at most APPLY_BLOCK_SIZE versions,
// e.g. (64, 200) -> [(64, 128), (128, 192), (192, 200)]
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;

Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);
@@ -291,6 +294,7 @@ void _addResult(bool* tenantMapChanging,
by "value" parameter), breaking it up into the individual MutationRefs (that constitute the transaction), decrypting
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
Each `value` passed in is a param2, i.e. the value half of a backup mutation log entry.
*/
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
@@ -323,15 +327,21 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
offset += sizeof(uint32_t);
state uint32_t consumed = 0;

if (totalBytes + offset > value.size())
if (totalBytes + offset > value.size()) {
TraceEvent(SevError, "OffsetOutOfBoundary")
.detail("TotalBytes", totalBytes)
.detail("Offset", offset)
.detail("Size", value.size());
throw restore_missing_data();
}

state int originalOffset = offset;
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;

while (consumed < totalBytes) {
uint32_t type = 0;
// encoding format: type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len1 = 0;
@@ -343,6 +353,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,

ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

// Construct MutationRef from StringRef
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
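A minimal sketch of pulling one mutation out of the type|kLen|vLen|Key|Value layout noted above, assuming the reader's byte order matches the writer's; the buffer and struct names are illustrative:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

struct DecodedMutation {
	uint32_t type = 0;
	std::string key;
	std::string value;
};

// Reads one type|kLen|vLen|Key|Value record at offset and advances offset past it.
DecodedMutation decodeOne(const uint8_t* buf, size_t& offset) {
	uint32_t type = 0, kLen = 0, vLen = 0;
	memcpy(&type, buf + offset, sizeof(uint32_t)); offset += sizeof(uint32_t);
	memcpy(&kLen, buf + offset, sizeof(uint32_t)); offset += sizeof(uint32_t);
	memcpy(&vLen, buf + offset, sizeof(uint32_t)); offset += sizeof(uint32_t);
	DecodedMutation m;
	m.type = type;
	m.key.assign(reinterpret_cast<const char*>(buf + offset), kLen);
	m.value.assign(reinterpret_cast<const char*>(buf + offset + kLen), vLen);
	offset += kLen + vLen;
	return m;
}
```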
@@ -448,6 +459,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
// version: the version of this mutation, decoded from the log
// ver: the old version stored in keyVersionMap
// Only add this log mutation when its version is newer than ver, so log data
// composes correctly with data already restored from range files; e.g. if a
// range file already restored this key at version 150, a log mutation at
// version 120 is skipped while one at version 180 is applied.
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
@@ -474,6 +488,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
TraceEvent(SevError, "BA_DecodeBackupLogValue")
.detail("UnexpectedExtraDataSize", value.size())
.detail("Offset", offset)
.detail("GroupKey", version)
.detail("TotalBytes", totalBytes)
.detail("Consumed", consumed)
.detail("OriginalOffset", originalOffset);
@@ -587,6 +602,7 @@ ACTOR Future<Void> readCommitted(Database cx,
}
}

// Read each version; a single version may consist of multiple parts.
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
@@ -639,14 +655,24 @@ ACTOR Future<Void> readCommitted(Database cx,
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

// Iterate over the version range; each key-value pair is keyed by (version, part).
for (auto& s : rangevalue) {
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
TraceEvent("Log_ReadCommitted")
.detail("GroupKey", groupKey)
.detail("SkipGroup", skipGroup)
.detail("Begin", range.begin)
.detail("End", range.end)
.detail("Size", s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// If a different version is seen, send the current result right away and start
// a new rcGroup. As a result, each rcGroup holds a single version, but a single
// version can span multiple rcGroups.

//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
Expand All @@ -665,6 +691,7 @@ ACTOR Future<Void> readCommitted(Database cx,
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
// Each item is one part of this version's value; see kvMutationLogToTransactions.
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
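The grouping loop above, reduced to a sketch: pairs arrive ordered by (version, part) and are coalesced into one group per version, with a group emitted as soon as a different version appears (std::function stands in for the PromiseStream send):

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Group {
	int64_t groupKey = -1;
	std::vector<std::string> items;
};

// kvs arrive ordered by (version, part); emit one Group per version.
void groupByVersion(const std::vector<std::pair<int64_t, std::string>>& kvs,
                    const std::function<void(Group&&)>& send) {
	Group g;
	for (const auto& kv : kvs) {
		if (g.groupKey == -1) {
			g.groupKey = kv.first;
		} else if (g.groupKey != kv.first) {
			send(std::move(g)); // a new version begins: emit the finished group
			g = Group{};
			g.groupKey = kv.first;
		}
		g.items.push_back(kv.second); // each item is one part of this version's value
	}
	if (g.groupKey != -1)
		send(std::move(g));
}
```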
@@ -706,6 +733,8 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

// The restore transaction has to be the first in the batch, or the only
// transaction in the batch, to make sure it never conflicts with others.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
@@ -759,6 +788,7 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
// The outer loop batches multiple versions into one commit request; the inner
// loop consumes one version at a time.
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
@@ -773,7 +803,10 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
tenantMapChanging = false;

BinaryWriter bw(Unversioned());

for (int i = 0; i < group.items.size(); ++i) {
// each value is one part of the version's mutation log entry
// ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md
bw.serializeBytes(group.items[i].value);
}
// Parse a single transaction from the backup mutation log
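A sketch of the reassembly the loop above performs: the parts stored under (version, part) keys are concatenated back into one contiguous value before it is decoded (plain std::string stands in for BinaryWriter):

```cpp
#include <string>
#include <vector>

// Concatenate the ordered parts of one version's mutation log entry.
std::string reassembleValue(const std::vector<std::string>& parts) {
	std::string whole;
	for (const std::string& part : parts)
		whole += part; // parts arrive in (version, part) order
	return whole;      // decoded afterwards, as in decodeBackupLogValue
}
```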
@@ -882,6 +915,12 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
lastVersion = it.value();
} else {
Version ver = it.value();
// ver: the version from keyVersionMap
// endVersion: the largest version after applying a batch of versions from log files
// If ver < endVersion, this key's keyVersionMap entry is outdated: run a
// ClearRange on the keyVersionMapRange prefix for this key, so the apply log
// becomes the source of truth; otherwise keyVersionMapRange remains the truth.
// Each key must be checked individually: a range file covers a key range, but
// a log file does not.
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
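Restated as a standalone predicate (plain int64_t versions rather than the Flow types), the clearing condition is:

```cpp
#include <cstdint>

// True when both adjacent keyVersionMap entries are valid and already covered
// by the applied log, so the entry can be cleared and coalesced away.
bool canClearEntry(int64_t ver, int64_t lastVersion, int64_t endVersion, int64_t invalidVersion) {
	return ver < endVersion && lastVersion < endVersion &&
	       ver != invalidVersion && lastVersion != invalidVersion;
}
```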
@@ -940,15 +979,22 @@ ACTOR Future<Void> applyMutations(Database cx,
}

int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// Cap newEndVersion so one iteration covers at most rangeCount blocks of APPLY_BLOCK_SIZE versions
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);

// The ranges partition the version range into block-aligned pieces, e.g.
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming a block size of 64.
// Keys in each range have the format:
// applyLogKeys.begin/uid/hash(uint8)/version(64bit)/part
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;

// Each RCGroup holds a single version; each results[i] serves a single range,
// which can contain multiple versions.
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
7 changes: 5 additions & 2 deletions fdbclient/BackupContainerFileSystem.actor.cpp
@@ -259,12 +259,15 @@ class BackupContainerFileSystemImpl {
for (int idx : indices) {
const LogFile& file = files[idx];
if (lastEnd == invalidVersion) {
if (file.beginVersion > begin)
if (file.beginVersion > begin) {
// the first file's begin version must be at or before the desired begin version
return false;
}
if (file.endVersion > begin) {
lastBegin = begin;
lastTags = file.totalTags;
} else {
// if the file's endVersion is at or before the desired begin version, skip this file
continue;
}
} else if (lastEnd < file.beginVersion) {
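The continuity check, sketched with illustrative types and without the tag bookkeeping of the real implementation:

```cpp
#include <cstdint>
#include <vector>

struct LogFileLike {
	int64_t beginVersion;
	int64_t endVersion;
};

// Check that files, sorted by beginVersion, chain together to cover [begin, end).
bool isContinuous(const std::vector<LogFileLike>& files, int64_t begin, int64_t end) {
	int64_t lastEnd = -1; // stand-in for invalidVersion
	for (const LogFileLike& f : files) {
		if (lastEnd == -1) {
			if (f.beginVersion > begin)
				return false; // first file must start at or before the desired begin
			if (f.endVersion <= begin)
				continue; // file ends before the window; skip it
		} else if (lastEnd < f.beginVersion) {
			return false; // gap between consecutive files
		}
		lastEnd = f.endVersion;
		if (lastEnd >= end)
			return true;
	}
	return false;
}
```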
@@ -904,7 +907,6 @@ class BackupContainerFileSystemImpl {
// If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup.
// It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for
// restore times.
//
// If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored,
// because the log can contain mutations of the whole key space, unlike range files that each
// is limited to a smaller key range.
@@ -974,6 +976,7 @@ class BackupContainerFileSystemImpl {
if (restorable.targetVersion < maxKeyRangeVersion)
continue;

// restorable.snapshot.beginVersion is set to the smallest (oldest) snapshot's beginVersion
restorable.snapshot = snapshots[i];

// No logs needed if there is a complete filtered key space snapshot at the target version.
1 change: 1 addition & 0 deletions fdbclient/ClientKnobs.cpp
@@ -183,6 +183,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 );
init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 );
init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20;
init( RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 10000000 ); // each step restores ~10 seconds' worth of versions (versions advance at roughly 1e6 per second)
init( RESTORE_WRITE_TX_SIZE, 256 * 1024 );
init( APPLY_MAX_LOCK_BYTES, 1e9 );
init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT
Expand Down