Skip to content

Commit

Permalink
New backup consolidated commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao authored and flowguru committed Jan 7, 2025
1 parent 2b23111 commit 202d3dc
Show file tree
Hide file tree
Showing 28 changed files with 2,993 additions and 383 deletions.
5 changes: 4 additions & 1 deletion fdbbackup/FileConverter.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
if (fp->empty()) {
self->fileProgress.erase(self->fileProgress.begin());
} else {
// Keep fileProgress sorted
// Keep fileProgress sorted; only the first element can be changed, so this single pass is enough
for (int i = 1; i < self->fileProgress.size(); i++) {
if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) {
break;
Expand Down Expand Up @@ -489,6 +489,9 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}

// keep getting data until a new version is encountered, then flush all buffered data and start buffering
// for the new version.

ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
Expand Down
34 changes: 17 additions & 17 deletions fdbbackup/backup.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2397,23 +2397,23 @@ ACTOR Future<Void> runRestore(Database db,
}

if (performRestore) {
Version restoredVersion = wait(backupAgent.restore(db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
verbose,
KeyRef(addPrefix),
KeyRef(removePrefix),
LockDB::True,
UnlockDB::True,
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
beginVersion,
encryptionKeyFile));
Version restoredVersion = wait(backupAgent.restoreConstructVersion(db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
verbose,
KeyRef(addPrefix),
KeyRef(removePrefix),
LockDB::True,
UnlockDB::True,
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
beginVersion,
encryptionKeyFile));

if (waitForDone && verbose) {
// If restore is now complete then report version restored
Expand Down
80 changes: 77 additions & 3 deletions fdbclient/BackupAgentBase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
return ret;
}

// given a begin and end version, get the prefix in the database for this range
// which is applyLogKeys.begin/backupUid/hash(uint8)/version(64 bits)/part
// returns multiple key ranges, each should be of length APPLY_BLOCK_SIZE
// (64, 200) -> [(64, 128), (128, 192), (192, 200)]
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;

Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);
Expand Down Expand Up @@ -286,12 +289,19 @@ void _addResult(bool* tenantMapChanging,
*mutationSize += logValue.expectedSize();
}

static double testKeyToDouble(const KeyRef& p) {
uint64_t x = 0;
sscanf(p.toString().c_str(), "%" SCNx64, &x);
return *(double*)&x;
}

/*
This actor is responsible for taking an original transaction which was added to the backup mutation log (represented
by "value" parameter), breaking it up into the individual MutationRefs (that constitute the transaction), decrypting
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
*/
// hfu5: value is each Param2
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
Expand All @@ -311,38 +321,53 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001) {
// it fails here now
TraceEvent(SevError, "DecodeBackupLogValue")
.detail("IncompatibleProtocolVersion", protocolVersion)
.detail("ValueSize", value.size())
.detail("Value", value);
throw incompatible_protocol_version();
}

// hfu5: this is the format for Param2
// change total bytes to 64 bytes in generateOldFormatMutations
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t consumed = 0;

if (totalBytes + offset > value.size())
if (totalBytes + offset > value.size()) {
TraceEvent(SevError, "OffsetOutOfBoundary")
.detail("TotalBytes", totalBytes)
.detail("Offset", offset)
.detail("Size", value.size());
throw restore_missing_data();
}

state int originalOffset = offset;
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;

while (consumed < totalBytes) {
// fmt::print(stderr, "DecodeStartRound, offset={}\n", offset);
uint32_t type = 0;
// hfu5: format should be type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

// guru: here len1 is 24, but initially it is 16. so i am not sure if restore can handle tenant mode correctly
// fmt::print(stderr, "DecodeProcess, offset={}, len1={}, len2={}, size={}, type={}, valid={}\n",
// offset, len1, len2, value.size(), type, isValidMutationType(type));
ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

// mutationref is constructed here
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
Expand Down Expand Up @@ -448,6 +473,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
// version is the version of this mutation decoded from log
// ver is the old version stored in keyVersionMap
// as a result, only add this mutation in log when the version is larger(to work with range file)
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
Expand All @@ -471,9 +499,12 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,

ASSERT(consumed == totalBytes);
if (value.size() != offset) {
// UnexpectedExtraDataSize="5069" Offset="3826" TotalBytes="3814" Consumed="3814" OriginalOffset="12"
// UnexpectedExtraDataSize="4592" Offset="3429" GroupKey="462625367" TotalBytes="3417" Consumed="3417"
TraceEvent(SevError, "BA_DecodeBackupLogValue")
.detail("UnexpectedExtraDataSize", value.size())
.detail("Offset", offset)
.detail("GroupKey", version)
.detail("TotalBytes", totalBytes)
.detail("Consumed", consumed)
.detail("OriginalOffset", originalOffset);
Expand Down Expand Up @@ -587,6 +618,7 @@ ACTOR Future<Void> readCommitted(Database cx,
}
}

// hfu5: read each version, potentially multiple part within the same version
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
Expand Down Expand Up @@ -639,14 +671,23 @@ ACTOR Future<Void> readCommitted(Database cx,
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

// iterate on a version range.
// each version - partition is a key-value pair
// hfu5 question: in the edge case where two partitions of the same key go to two different blocks, they
// cannot be combined here — what happens then?
for (auto& s : rangevalue) {
// hfu5 : (version, part)
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("Begin", range.begin).detail("End", range.end).detail("Size",s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// hfu5: on seeing a different version, send the current result directly and then create another
// rcGroup. As a result, each rcGroup is for a single version, but a single version can span
// multiple rcGroups.

//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
Expand All @@ -665,6 +706,7 @@ ACTOR Future<Void> readCommitted(Database cx,
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
// this is each item, so according to kvMutationLogToTransactions, each item should be a partition
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
Expand Down Expand Up @@ -706,6 +748,8 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

// the restore transaction has to be first in the batch, or be the only txn in the batch, to make sure it never
// conflicts with others.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
Expand All @@ -722,6 +766,7 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,

// mutations and encrypted mutations (and their relationship) are described in greater detail in the definition of
// CommitTransactionRef in CommitTransaction.h

req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
Expand Down Expand Up @@ -759,25 +804,34 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
// two layers of loops: the outer loop iterates over each file range,
// the inner loop over each transaction (version)
// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions-beforeLoop\n");
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
state bool tenantMapChanging = false;
loop {
try {
// fmt::print(stderr, "BackupAgentBase-RCGroup-Before\n");
state RCGroup group = waitNext(results.getFuture());
// fmt::print(stderr, "BackupAgentBase-RCGroup-After group={}\n", group.groupKey);
state CommitTransactionRequest curReq;
lock->release(group.items.expectedSize());
state int curBatchMutationSize = 0;
tenantMapChanging = false;

BinaryWriter bw(Unversioned());
// BinaryWriter bw(IncludeVersion(ProtocolVersion::withBackupMutations()));

for (int i = 0; i < group.items.size(); ++i) {
// each value should be a partition
bw.serializeBytes(group.items[i].value);
}
// Parse a single transaction from the backup mutation log
Standalone<StringRef> value = bw.toValue();
// ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md
wait(decodeBackupLogValue(&curReq.arena,
&curReq.transaction.mutations,
&curReq.transaction.encryptedMutations,
Expand Down Expand Up @@ -821,12 +875,15 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,

state int i;
for (i = 0; i < curReq.transaction.mutations.size(); i++) {
MutationRef mutation = curReq.transaction.mutations[i];
req.transaction.mutations.push_back_deep(req.arena, curReq.transaction.mutations[i]);
req.transaction.encryptedMutations.push_back_deep(req.arena,
curReq.transaction.encryptedMutations[i]);
}
mutationSize += curBatchMutationSize;
newBeginVersion = group.groupKey + 1;

// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions: newBeginVersion={}, groupKey={}\n", newBeginVersion, group.groupKey);

// At this point if the tenant map changed we would have already sent any normalKey mutations
// accumulated thus far, so all thats left to do is to send all the mutations in the the offending
Expand All @@ -836,6 +893,7 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
break;
}
} catch (Error& e) {
// fmt::print(stderr, "BackupAgentBaseError error={}\n", e.code());
if (e.code() == error_code_end_of_stream) {
if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
Expand Down Expand Up @@ -882,6 +940,13 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
lastVersion = it.value();
} else {
Version ver = it.value();
// ver: version from keyVersion
// endVersion: after applying a batch of versions from log files, the largest version
// if ver < endVersion, that means this key in keyVersion is outdated
// in this case, runClearRange on the keyVersionMapRange prefix for this key,
// so that the alog key is the truth, otherwise, keyVersionMapRange should be the truth
// each key needs to be individually checked, because even though a range file covers a whole range, a log
// file does not
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
Expand Down Expand Up @@ -928,6 +993,7 @@ ACTOR Future<Void> applyMutations(Database cx,
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;

keyVersion->insert(metadataVersionKey, 0);
// fmt::print(stderr, "BackupAgentBaseApplyMutationBegin: begin={}, end={}\n", beginVersion, *endVersion);

try {
loop {
Expand All @@ -940,15 +1006,22 @@ ACTOR Future<Void> applyMutations(Database cx,
}

int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// this means newEndVersion can only be at most of size APPLY_BLOCK_SIZE
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);

// ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400]
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
// ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64 bits)/part
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;

// each RCGroup is for a single version, each results[i] is for a single range
// one range might have multiple versions
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
Expand All @@ -957,6 +1030,7 @@ ACTOR Future<Void> applyMutations(Database cx,
}

maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);

for (idx = 0; idx < ranges.size(); ++idx) {
int bytes =
wait(kvMutationLogToTransactions(cx,
Expand Down
Loading

0 comments on commit 202d3dc

Please sign in to comment.