diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 84cded889c2..733a00651ce 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -489,6 +489,8 @@ ACTOR Future convert(ConvertParams params) { arena = Arena(); } + // keep getting data until a new version is encounter, then flush all data buffered and start to buffer for a + // new version. ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion())); MutationRef m; rd >> m; diff --git a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp index 201e33b3505..ad8a89aca2d 100644 --- a/fdbclient/BackupAgentBase.actor.cpp +++ b/fdbclient/BackupAgentBase.actor.cpp @@ -187,9 +187,12 @@ Standalone> getLogRanges(Version beginVersion, return ret; } +// Given a begin and end version, get the prefix in the database for this range +// i.e. applyLogKeys.begin/backupUid/hash(uint8)/version(64bites)/part +// returns multiple key ranges, each of length APPLY_BLOCK_SIZE +// e.g. (64, 200) -> [(64, 128), (128, 192), (192, 200)] Standalone> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) { Standalone> ret; - Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin); //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix); @@ -291,6 +294,7 @@ void _addResult(bool* tenantMapChanging, by "value" parameter), breaking it up into the individual MutationRefs (that constitute the transaction), decrypting each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the "result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector) + Each `value` is a param2 */ ACTOR static Future decodeBackupLogValue(Arena* arena, VectorRef* result, @@ -323,8 +327,13 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, offset += sizeof(uint32_t); state uint32_t consumed = 0; - if (totalBytes + offset > value.size()) + if (totalBytes + offset > value.size()) { + TraceEvent(SevError, "OffsetOutOfBoundary") + .detail("TotalBytes", totalBytes) + .detail("Offset", offset) + .detail("Size", value.size()); throw restore_missing_data(); + } state int originalOffset = offset; state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); @@ -332,6 +341,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, while (consumed < totalBytes) { uint32_t type = 0; + // encoding format: type|kLen|vLen|Key|Value memcpy(&type, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); state uint32_t len1 = 0; @@ -343,6 +353,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type)); + // Construct MutationRef from StringRef state MutationRef logValue; state Arena tempArena; logValue.type = type; @@ -448,6 +459,9 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, } else { Version ver = key_version->rangeContaining(logValue.param1).value(); //TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion); + // versio: version of this mutation decoded from log + // ver: the old version stored in keyVersionMap + // as a result, only add this mutation in log when the version is larger(to work with range file) if (version > ver && ver != invalidVersion) { if (removePrefix.size()) { logValue.param1 = logValue.param1.removePrefix(removePrefix); @@ -474,6 +488,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, TraceEvent(SevError, "BA_DecodeBackupLogValue") .detail("UnexpectedExtraDataSize", value.size()) .detail("Offset", offset) + .detail("GroupKey", version) .detail("TotalBytes", totalBytes) .detail("Consumed", consumed) .detail("OriginalOffset", originalOffset); @@ -587,6 +602,7 @@ ACTOR Future readCommitted(Database cx, } } +// hfu5: read each version, potentially multiple part within the same version ACTOR Future readCommitted(Database cx, PromiseStream results, Future active, @@ -639,14 +655,24 @@ ACTOR Future readCommitted(Database cx, wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize())); releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize()); + // iterate on a version range, each key-value pair is (version, part) for (auto& s : rangevalue) { uint64_t groupKey = groupBy(s.key).first; - //TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size()); + TraceEvent("Log_ReadCommitted") + .detail("GroupKey", groupKey) + .detail("SkipGroup", skipGroup) + .detail("Begin", range.begin) + .detail("End", range.end) + .detail("Size", s.value.size()); if (groupKey != skipGroup) { if (rcGroup.version == -1) { rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } else if (rcGroup.groupKey != groupKey) { + // if seeing a different version, then send result directly, and then create another + // rcGroup as a result, each rcgroup is for a single version, but a single version can span + // different rcgroups + //TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size()); // state uint32_t len(0); // for (size_t j = 0; j < rcGroup.items.size(); ++j) { @@ -665,6 +691,7 @@ ACTOR Future readCommitted(Database cx, rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } + // each item is a partition, ref: kvMutationLogToTransactions, rcGroup.items.push_back_deep(rcGroup.items.arena(), s); } } @@ -706,6 +733,8 @@ Future readCommitted(Database cx, cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True); } +// restore transaction has to be first in the batch, or it is the only txn in batch to make sure it never conflicts with +// others. ACTOR Future sendCommitTransactionRequest(CommitTransactionRequest req, Key uid, Version newBeginVersion, @@ -759,6 +788,7 @@ ACTOR Future kvMutationLogToTransactions(Database cx, state Version lastVersion = invalidVersion; state bool endOfStream = false; state int totalBytes = 0; + // outer loop to batch multiple versions while inner loop for each version loop { state CommitTransactionRequest req; state Version newBeginVersion = invalidVersion; @@ -773,7 +803,10 @@ ACTOR Future kvMutationLogToTransactions(Database cx, tenantMapChanging = false; BinaryWriter bw(Unversioned()); + for (int i = 0; i < group.items.size(); ++i) { + // each value is a partition + // ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md bw.serializeBytes(group.items[i].value); } // Parse a single transaction from the backup mutation log @@ -882,6 +915,12 @@ ACTOR Future coalesceKeyVersionCache(Key uid, lastVersion = it.value(); } else { Version ver = it.value(); + // ver: version from keyVersion + // endVersion: after applying a batch of versions from log files, the largest version + // if ver < endVersion, that means this key in keyVersion is outdated + // in this case, run ClearRange on the keyVersionMapRange prefix for this key, + // so that the alog key is the truth, otherwise, keyVersionMapRange should be the truth + // each key needs to be individually checked, though range file is for a range, log file is not if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion && lastVersion != invalidVersion) { Key removeKey = it.range().begin.withPrefix(mapPrefix); @@ -940,15 +979,22 @@ ACTOR Future applyMutations(Database cx, } int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes); + // newEndVersion can only be at most of size APPLY_BLOCK_SIZE state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE); + + // ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400] + // (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64 + // ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64bites)/part state Standalone> ranges = getApplyRanges(beginVersion, newEndVersion, uid); state size_t idx; state std::vector> results; state std::vector> rc; state std::vector> locks; + // each RCGroup is for a single version, each results[i] is for a single range who can have multiple + // versions for (int i = 0; i < ranges.size(); ++i) { results.push_back(PromiseStream()); locks.push_back(makeReference( diff --git a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp index 62d35d82549..cc07c3817b7 100644 --- a/fdbclient/BackupContainerFileSystem.actor.cpp +++ b/fdbclient/BackupContainerFileSystem.actor.cpp @@ -259,12 +259,15 @@ class BackupContainerFileSystemImpl { for (int idx : indices) { const LogFile& file = files[idx]; if (lastEnd == invalidVersion) { - if (file.beginVersion > begin) + if (file.beginVersion > begin) { + // the first version of the first file must be smaller or equal to the desired beginVersion return false; + } if (file.endVersion > begin) { lastBegin = begin; lastTags = file.totalTags; } else { + // if endVerison of file is smaller than desired beginVersion, then do not include this file continue; } } else if (lastEnd < file.beginVersion) { @@ -904,7 +907,6 @@ class BackupContainerFileSystemImpl { // If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup. // It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for // restore times. - // // If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored, // because the log can contain mutations of the whole key space, unlike range files that each // is limited to a smaller key range. @@ -974,6 +976,7 @@ class BackupContainerFileSystemImpl { if (restorable.targetVersion < maxKeyRangeVersion) continue; + // restorable.snapshot.beginVersion is set to the smallest(oldest) snapshot's beginVersion restorable.snapshot = snapshots[i]; // No logs needed if there is a complete filtered key space snapshot at the target version. diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index a53847c9d40..1d668567bb0 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -183,6 +183,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 ); init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 ); init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20; + init (RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 10000000); // each step restores 10s worth of data init( RESTORE_WRITE_TX_SIZE, 256 * 1024 ); init( APPLY_MAX_LOCK_BYTES, 1e9 ); init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index f0f02877e0c..228eaa4b14c 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -40,6 +40,7 @@ #include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" #include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/PartitionedLogIterator.h" #include "fdbclient/RestoreInterface.h" #include "fdbclient/Status.h" #include "fdbclient/SystemData.h" @@ -156,7 +157,6 @@ ACTOR Future> TagUidMap::getAll_impl(TagUidMap* tagsMa KeyBackedTag::KeyBackedTag(std::string tagName, StringRef tagMapPrefix) : KeyBackedProperty(TagUidMap(tagMapPrefix).getProperty(tagName)), tagName(tagName), tagMapPrefix(tagMapPrefix) {} - class RestoreConfig : public KeyBackedTaskConfig { public: RestoreConfig(UID uid = UID()) : KeyBackedTaskConfig(fileRestorePrefixRange.begin, uid) {} @@ -172,6 +172,7 @@ class RestoreConfig : public KeyBackedTaskConfig { KeyBackedProperty onlyApplyMutationLogs() { return configSpace.pack(__FUNCTION__sr); } KeyBackedProperty inconsistentSnapshotOnly() { return configSpace.pack(__FUNCTION__sr); } KeyBackedProperty unlockDBAfterRestore() { return configSpace.pack(__FUNCTION__sr); } + KeyBackedProperty transformPartitionedLog() { return configSpace.pack(__FUNCTION__sr); } // XXX: Remove restoreRange() once it is safe to remove. It has been changed to restoreRanges KeyBackedProperty restoreRange() { return configSpace.pack(__FUNCTION__sr); } // XXX: Changed to restoreRangeSet. It can be removed. @@ -233,15 +234,18 @@ class RestoreConfig : public KeyBackedTaskConfig { // Describes a file to load blocks from during restore. Ordered by version and then fileName to enable // incrementally advancing through the map, saving the version and path of the next starting point. struct RestoreFile { - Version version; + Version version; // this is beginVersion, not endVersion std::string fileName; bool isRange{ false }; // false for log file int64_t blockSize{ 0 }; int64_t fileSize{ 0 }; Version endVersion{ ::invalidVersion }; // not meaningful for range files + int64_t tagId = -1; // only meaningful to log files, Log router tag. Non-negative for new backup format. + int64_t totalTags = -1; // only meaningful to log files, Total number of log router tags. Tuple pack() const { - return Tuple::makeTuple(version, fileName, (int)isRange, fileSize, blockSize, endVersion); + return Tuple::makeTuple( + version, fileName, (int64_t)isRange, fileSize, blockSize, endVersion, tagId, totalTags); } static RestoreFile unpack(Tuple const& t) { RestoreFile r; @@ -252,6 +256,8 @@ class RestoreConfig : public KeyBackedTaskConfig { r.fileSize = t.getInt(i++); r.blockSize = t.getInt(i++); r.endVersion = t.getInt(i++); + r.tagId = t.getInt(i++); + r.totalTags = t.getInt(i++); return r; } }; @@ -259,6 +265,9 @@ class RestoreConfig : public KeyBackedTaskConfig { typedef KeyBackedSet FileSetT; FileSetT fileSet() { return configSpace.pack(__FUNCTION__sr); } + FileSetT logFileSet() { return configSpace.pack(__FUNCTION__sr); } + FileSetT rangeFileSet() { return configSpace.pack(__FUNCTION__sr); } + Future isRunnable(Reference tr) { return map(stateEnum().getD(tr), [](ERestoreState s) -> bool { return s != ERestoreState::ABORTED && s != ERestoreState::COMPLETED && s != ERestoreState::UNITIALIZED; @@ -444,7 +453,8 @@ ACTOR Future RestoreConfig::getProgress_impl(RestoreConfig restore, .detail("ApplyLag", lag.get()) .detail("TaskInstance", THIS_ADDR); - return format("Tag: %s UID: %s State: %s Blocks: %lld/%lld BlocksInProgress: %lld Files: %lld BytesWritten: " + return format("Tag: %s UID: %s State: %s Blocks: %lld/%lld BlocksInProgress: %lld Files: %lld BytesWritten: + " "%lld CurrentVersion: %lld FirstConsistentVersion: %lld ApplyVersionLag: %lld LastError: %s", tag.get().c_str(), uid.toString().c_str(), @@ -488,6 +498,667 @@ ACTOR Future RestoreConfig::getFullStatus_impl(RestoreConfig restor return returnStr; } +// two buffers are alternatively serving data and reading data from file +// thus when one buffer is serving data through peek() +// the other buffer is reading data from file to provide pipelining. +class TwoBuffers : public ReferenceCounted, NonCopyable { +public: + class IteratorBuffer : public ReferenceCounted { + public: + std::shared_ptr data; + // has_value means there is data, otherwise it means there is no data being fetched or ready + // is_valid means data is being fetched, is_ready means data is ready + std::optional> fetchingData; + size_t size; + int index; + int capacity; + IteratorBuffer(int _capacity) { + capacity = _capacity; + data = std::shared_ptr(new char[capacity]()); + fetchingData.reset(); + size = 0; + } + bool is_valid() { return fetchingData.has_value(); } + void reset() { + size = 0; + index = 0; + fetchingData.reset(); + } + }; + TwoBuffers(int capacity, Reference _bc, std::vector& _files, int tag); + // ready need to be called first before calling peek + // because a shared_ptr cannot be wrapped by a Future + // this method ensures the current buffer has available data + Future ready(); + ACTOR static Future ready(Reference self); + // fill buffer[index] with the next block of file + // it has side effects to change currentFileIndex and currentFilePosition + ACTOR static Future readNextBlock(Reference self, int index); + // peek can only be called after ready is called + // it returns the pointer to the active buffer + std::shared_ptr peek(); + + int getFileIndex(); + void setFileIndex(int); + + bool hasNext(); + + void reset(); + + // discard the current buffer and swap to the next one + void discardAndSwap(); + + // try to fill the buffer[index] + // but no-op if the buffer have valid data or it is actively being filled + void fillBufferIfAbsent(int index); + + size_t getBufferSize(); + +private: + Reference buffers[2]; // Two buffers for alternating + size_t bufferCapacity; // Size of each buffer in bytes + Reference bc; + std::vector files; + int tag; + + int cur; // Index of the current active buffer (0 or 1) + size_t currentFileIndex; // Index of the current file being read + size_t currentFilePosition; // Current read position in the current file +}; + +TwoBuffers::TwoBuffers(int capacity, + Reference _bc, + std::vector& _files, + int _tag) + : currentFileIndex(0), currentFilePosition(0), cur(0) { + bufferCapacity = capacity; + files = _files; + bc = _bc; + tag = _tag; + buffers[0] = makeReference(capacity); + buffers[1] = makeReference(capacity); +} + +bool TwoBuffers::hasNext() { + // if it is being load (valid but not ready, what would be the size?) + while (currentFileIndex < files.size() && currentFilePosition >= files[currentFileIndex].fileSize) { + currentFileIndex++; + currentFilePosition = 0; + } + + if (buffers[0]->is_valid() || buffers[1]->is_valid()) { + return true; + } + + return currentFileIndex != files.size(); +} + +Future TwoBuffers::ready() { + return ready(Reference::addRef(this)); +} + +ACTOR Future TwoBuffers::ready(Reference self) { + // if cur is not ready, then wait + if (!self->hasNext()) { + return Void(); + } + // try to fill the current buffer, and wait before it is filled + self->fillBufferIfAbsent(self->cur); + wait(self->buffers[self->cur]->fetchingData.value()); + // try to fill the next buffer, do not wait for the filling + if (self->hasNext()) { + self->fillBufferIfAbsent(1 - self->cur); + } + return Void(); +} + +std::shared_ptr TwoBuffers::peek() { + return buffers[cur]->data; +} + +int TwoBuffers::getFileIndex() { + return buffers[cur]->index; +} + +void TwoBuffers::setFileIndex(int newIndex) { + if (newIndex < 0 || newIndex >= files.size()) { + TraceEvent(SevError, "TwoBuffersFileIndexOutOfBound") + .detail("FilesSize", files.size()) + .detail("NewIndex", newIndex) + .log(); + } + currentFileIndex = newIndex; +} + +void TwoBuffers::discardAndSwap() { + // invalidate cur and change cur to next + buffers[cur]->fetchingData.reset(); + cur = 1 - cur; +} + +void TwoBuffers::reset() { + // invalidate cur and change cur to next + buffers[0]->reset(); + buffers[1]->reset(); + cur = 0; + currentFileIndex = 0; + currentFilePosition = 0; +} + +size_t TwoBuffers::getBufferSize() { + return buffers[cur]->size; +} + +static double testKeyToDouble(const KeyRef& p) { + uint64_t x = 0; + sscanf(p.toString().c_str(), "%" SCNx64, &x); + return *(double*)&x; +} + +// only one readNextBlock can be run at a single time, otherwie the same block might be loaded twice +ACTOR Future TwoBuffers::readNextBlock(Reference self, int index) { + state Reference asyncFile; + if (self->currentFileIndex >= self->files.size()) { + TraceEvent(SevError, "ReadNextBlockOutOfBound") + .detail("FileIndex", self->currentFileIndex) + .detail("Tag", self->tag) + .detail("Position", self->currentFilePosition) + .detail("FileSize", self->files[self->currentFileIndex].fileSize) + .detail("FilesCount", self->files.size()) + .log(); + return Void(); + } + Reference asyncFileTmp = wait(self->bc->readFile(self->files[self->currentFileIndex].fileName)); + asyncFile = asyncFileTmp; + state size_t fileSize = self->files[self->currentFileIndex].fileSize; + size_t remaining = fileSize - self->currentFilePosition; + state size_t bytesToRead = std::min(self->bufferCapacity, remaining); + state int bytesRead = wait( + asyncFile->read(static_cast(self->buffers[index]->data.get()), bytesToRead, self->currentFilePosition)); + if (bytesRead != bytesToRead) + throw restore_bad_read(); + self->buffers[index]->index = self->currentFileIndex; + self->buffers[index]->size = bytesRead; // Set to actual bytes read + self->currentFilePosition += bytesRead; + + return Void(); +} + +void TwoBuffers::fillBufferIfAbsent(int index) { + auto self = Reference::addRef(this); + + if (self->buffers[index]->is_valid()) { + // if this buffer is valid, then do not overwrite it + return; + } + if (self->currentFileIndex == self->files.size()) { + // quit if no more contents + return; + } + self->buffers[index]->fetchingData = readNextBlock(self, index); + return; +} + +bool endOfBlock(char* start, int offset) { + unsigned char paddingChar = '\xff'; + return (unsigned char)*(start + offset) == paddingChar; +} + +class PartitionedLogIteratorSimple : public PartitionedLogIterator { +public: + const int BATCH_READ_BLOCK_COUNT = 1; + const int BLOCK_SIZE = CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE; + const int mutationHeaderBytes = sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); + Reference bc; + size_t bufferCapacity; + int tag; + std::vector files; + size_t bufferOffset; // Current read offset + int bufferSize; + int fileOffset; + int fileIndex; + std::shared_ptr buffer; + std::vector endVersions; + + PartitionedLogIteratorSimple(Reference _bc, + int _tag, + std::vector _files, + std::vector _endVersions); + + bool hasNext(); + Future loadNextBlock(); + ACTOR static Future loadNextBlock(Reference self); + void removeBlockHeader(); + + Standalone> consumeData(Version firstVersion); + + // find the next version without advanding the iterator + Future peekNextVersion(); + ACTOR static Future peekNextVersion(Reference iterator); + + // get all the mutations of next version and advance the iterator + // this might issue multiple consumeData() if the data of a version cross buffer boundary + Future>> getNext(); + ACTOR static Future>> getNext( + Reference iterator); +}; + +PartitionedLogIteratorSimple::PartitionedLogIteratorSimple(Reference _bc, + int _tag, + std::vector _files, + std::vector _endVersions) + : bc(_bc), tag(_tag), endVersions(_endVersions), files(std::move(_files)), bufferOffset(0) { + bufferCapacity = BATCH_READ_BLOCK_COUNT * BLOCK_SIZE; + buffer = std::shared_ptr(new char[bufferCapacity]()); + fileOffset = 0; + fileIndex = 0; + bufferSize = 0; +} + +// it will set fileOffset and fileIndex +bool PartitionedLogIteratorSimple::hasNext() { + if (bufferOffset < bufferSize) { + return true; + } + while (fileIndex < files.size() && fileOffset >= files[fileIndex].fileSize) { + TraceEvent("ReachEndOfLogFiles") + .detail("BufferOffset", bufferOffset) + .detail("BufferSize", bufferSize) + .detail("FileOffset", fileOffset) + .detail("FileSize", files[fileIndex].fileSize) + .detail("FileName", files[fileIndex].fileName) + .detail("Tag", tag) + .detail("Index", fileIndex) + .log(); + fileOffset = 0; + fileIndex++; + } + return fileIndex < files.size() && fileOffset < files[fileIndex].fileSize; +} + +void PartitionedLogIteratorSimple::removeBlockHeader() { + if (bufferOffset % BLOCK_SIZE == 0) { + bufferOffset += sizeof(uint32_t); + } +} + +Standalone> PartitionedLogIteratorSimple::consumeData(Version firstVersion) { + Standalone> mutations = Standalone>(); + char* start = buffer.get(); + bool foundNewVersion = false; + while (bufferOffset < bufferSize) { + while (bufferOffset < bufferSize && !endOfBlock(start, bufferOffset)) { + // for each block + removeBlockHeader(); + + // encoding format: + // wr << bigEndian64(message.version.version) << bigEndian32(message.version.sub) << + // bigEndian32(mutation.size()); + Version version; + std::memcpy(&version, start + bufferOffset, sizeof(Version)); + version = bigEndian64(version); + if (version != firstVersion) { + foundNewVersion = true; + break; // Different version, stop here + } + + int32_t subsequence; + std::memcpy(&subsequence, start + bufferOffset + sizeof(Version), sizeof(int32_t)); + subsequence = bigEndian32(subsequence); + + int32_t mutationSize; + std::memcpy(&mutationSize, start + bufferOffset + sizeof(Version) + sizeof(int32_t), sizeof(int32_t)); + mutationSize = bigEndian32(mutationSize); + + // assumption: the entire mutation is within the buffer + size_t mutationTotalSize = mutationHeaderBytes + mutationSize; + ASSERT(bufferOffset + mutationTotalSize <= bufferSize); + + // transform from stringref to mutationref here + Standalone mutationData = makeString(mutationSize); + std::memcpy(mutateString(mutationData), start + bufferOffset + mutationHeaderBytes, mutationSize); + ArenaReader reader(mutationData.arena(), mutationData, AssumeVersion(g_network->protocolVersion())); + MutationRef mutation; + reader >> mutation; + + VersionedMutation vm; + vm.version = version; + vm.subsequence = subsequence; + vm.mutation = mutation; + mutations.push_back_deep(mutations.arena(), vm); + // Move the bufferOffset to include this mutation + bufferOffset += mutationTotalSize; + } + + if (bufferOffset < bufferSize && endOfBlock(start, bufferOffset)) { + // there are paddings + int remain = BLOCK_SIZE - (bufferOffset % BLOCK_SIZE); + bufferOffset += remain; + } + if (foundNewVersion) { + break; + } + } + + return mutations; +} + +Future PartitionedLogIteratorSimple::loadNextBlock() { + return loadNextBlock(Reference::addRef(this)); +} + +ACTOR Future PartitionedLogIteratorSimple::loadNextBlock(Reference self) { + if (self->bufferOffset < self->bufferSize) { + // do nothing + return Void(); + } + if (!self->hasNext()) { + return Void(); + } + state Reference asyncFile; + Reference asyncFileTmp = wait(self->bc->readFile(self->files[self->fileIndex].fileName)); + asyncFile = asyncFileTmp; + state size_t fileSize = self->files[self->fileIndex].fileSize; + size_t remaining = fileSize - self->fileOffset; + state size_t bytesToRead = std::min(self->bufferCapacity, remaining); + state int bytesRead = + wait(asyncFile->read(static_cast((self->buffer.get())), bytesToRead, self->fileOffset)); + if (bytesRead != bytesToRead) + throw restore_bad_read(); + self->bufferSize = bytesRead; // Set to actual bytes read + self->bufferOffset = 0; // Reset bufferOffset for the new data + self->fileOffset += bytesRead; + return Void(); +} + +Future PartitionedLogIteratorSimple::peekNextVersion() { + return peekNextVersion(Reference::addRef(this)); +} + +ACTOR Future PartitionedLogIteratorSimple::peekNextVersion(Reference self) { + // Read the first mutation's version + if (!self->hasNext()) { + return Version(0); + } + wait(self->loadNextBlock()); + self->removeBlockHeader(); + state Version version; + std::memcpy(&version, self->buffer.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + + while (self->fileIndex < self->endVersions.size() - 1 && version >= self->endVersions[self->fileIndex]) { + TraceEvent("SimpleIteratorFindOverlapAndSkip") + .detail("Version", version) + .detail("FileIndex", self->fileIndex) + .log(); + self->bufferOffset = 0; + self->bufferSize = 0; + self->fileOffset = 0; + self->fileIndex += 1; + wait(self->loadNextBlock()); + self->removeBlockHeader(); + std::memcpy(&version, self->buffer.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + } + return version; +} + +ACTOR Future>> PartitionedLogIteratorSimple::getNext( + Reference self) { + state Standalone> mutations; + if (!self->hasNext()) { + TraceEvent(SevWarn, "SimpleIteratorExhausted") + .detail("BufferOffset", self->bufferOffset) + .detail("BufferSize", self->bufferSize) + .detail("Tag", self->tag) + .log(); + return mutations; + } + state Version firstVersion = wait(self->peekNextVersion()); + Standalone> firstBatch = self->consumeData(firstVersion); + mutations = firstBatch; + // If the current buffer is fully consumed, then we need to check the next buffer in case + // the version is sliced across this buffer boundary + + while (self->bufferOffset >= self->bufferSize) { + // data for one version cannot exceed single buffer size + // if hitting the end of a batch, check the next batch in case version is + if (self->hasNext()) { + // now this is run for each block, but it is not necessary if it is the last block of a file + // cannot check hasMoreData here because other buffer might have the last piece + wait(self->loadNextBlock()); + Standalone> batch = self->consumeData(firstVersion); + for (const VersionedMutation& vm : batch) { + mutations.push_back_deep(mutations.arena(), vm); + } + } else { + break; + } + } + return mutations; +} + +Future>> PartitionedLogIteratorSimple::getNext() { + return getNext(Reference::addRef(this)); +} + +class PartitionedLogIteratorTwoBuffers : public PartitionedLogIterator { +private: + Reference twobuffer; + + // consume single version data upto the end of the current batch + // stop if seeing a different version from the parameter. + // it has side effects to update bufferOffset after reading the data + Future>> consumeData(Version firstVersion); + ACTOR static Future>> consumeData( + Reference self, + Version v); + + // each block has a format of {
[mutations]}, need to skip the header to read mutations + // this method check if bufferOffset is at the boundary and advance it if necessary + void removeBlockHeader(); + +public: + // read up to a fixed number of block count + // noted that each version has to be contained within 2 blocks + const int BATCH_READ_BLOCK_COUNT = 1; + const int BLOCK_SIZE = CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE; + const int mutationHeaderBytes = sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); + Reference bc; + int tag; + std::vector files; + std::vector endVersions; + bool hasMoreData; // Flag indicating if more data is available + size_t bufferOffset; // Current read offset + // empty means no data, future is valid but not ready means being fetched + // future is ready means it currently holds data + + PartitionedLogIteratorTwoBuffers(Reference _bc, + int _tag, + std::vector _files, + std::vector _endVersions); + + // whether there are more contents for this tag in all files specified + bool hasNext(); + + // find the next version without advanding the iterator + Future peekNextVersion(); + ACTOR static Future peekNextVersion(Reference iterator); + + // get all the mutations of next version and advance the iterator + // this might issue multiple consumeData() if the data of a version cross buffer boundary + Future>> getNext(); + ACTOR static Future>> getNext( + Reference iterator); +}; + +Future>> PartitionedLogIteratorTwoBuffers::consumeData(Version firstVersion) { + return consumeData(Reference::addRef(this), firstVersion); +} + +ACTOR Future>> PartitionedLogIteratorTwoBuffers::consumeData( + Reference self, + Version firstVersion) { + state Standalone> mutations = Standalone>(); + wait(self->twobuffer->ready()); + std::shared_ptr start = self->twobuffer->peek(); + int size = self->twobuffer->getBufferSize(); + bool foundNewVersion = false; + while (self->bufferOffset < size) { + while (self->bufferOffset < size && !endOfBlock(start.get(), self->bufferOffset)) { + // for each block + self->removeBlockHeader(); + + // encoding is: + // wr << bigEndian64(message.version.version) << bigEndian32(message.version.sub) << + // bigEndian32(mutation.size()); + Version version; + std::memcpy(&version, start.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + if (version != firstVersion) { + foundNewVersion = true; + break; // Different version, stop here + } + + int32_t subsequence; + std::memcpy(&subsequence, start.get() + self->bufferOffset + sizeof(Version), sizeof(int32_t)); + subsequence = bigEndian32(subsequence); + + int32_t mutationSize; + std::memcpy( + &mutationSize, start.get() + self->bufferOffset + sizeof(Version) + sizeof(int32_t), sizeof(int32_t)); + mutationSize = bigEndian32(mutationSize); + + // assumption: the entire mutation is within the buffer + size_t mutationTotalSize = self->mutationHeaderBytes + mutationSize; + ASSERT(self->bufferOffset + mutationTotalSize <= size); + + Standalone mutationData = makeString(mutationSize); + std::memcpy( + mutateString(mutationData), start.get() + self->bufferOffset + self->mutationHeaderBytes, mutationSize); + // transform from stringref to mutationref here + ArenaReader reader(mutationData.arena(), mutationData, AssumeVersion(g_network->protocolVersion())); + MutationRef mutation; + reader >> mutation; + + VersionedMutation vm; + vm.version = version; + vm.subsequence = subsequence; + vm.mutation = mutation; + mutations.push_back_deep(mutations.arena(), vm); + // Move the bufferOffset to include this mutation + self->bufferOffset += mutationTotalSize; + } + + if (self->bufferOffset < size && endOfBlock(start.get(), self->bufferOffset)) { + // there are paddings, skip them + int remain = self->BLOCK_SIZE - (self->bufferOffset % self->BLOCK_SIZE); + self->bufferOffset += remain; + } + if (foundNewVersion) { + break; + } + } + return mutations; +} + +void PartitionedLogIteratorTwoBuffers::removeBlockHeader() { + if (bufferOffset % BLOCK_SIZE == 0) { + bufferOffset += sizeof(uint32_t); + } +} + +PartitionedLogIteratorTwoBuffers::PartitionedLogIteratorTwoBuffers(Reference _bc, + int _tag, + std::vector _files, + std::vector _endVersions) + : bc(_bc), tag(_tag), files(std::move(_files)), endVersions(_endVersions), bufferOffset(0) { + int bufferCapacity = BATCH_READ_BLOCK_COUNT * BLOCK_SIZE; + twobuffer = makeReference(bufferCapacity, _bc, files, tag); +} + +bool PartitionedLogIteratorTwoBuffers::hasNext() { + return twobuffer->hasNext(); +} + +Future PartitionedLogIteratorTwoBuffers::peekNextVersion() { + return peekNextVersion(Reference::addRef(this)); +} +ACTOR Future PartitionedLogIteratorTwoBuffers::peekNextVersion( + Reference self) { + // Read the first mutation's version + state std::shared_ptr start; + state Version version; + state int fileIndex; + if (!self->hasNext()) { + return Version(0); + } + wait(self->twobuffer->ready()); + start = self->twobuffer->peek(); + self->removeBlockHeader(); + std::memcpy(&version, start.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + fileIndex = self->twobuffer->getFileIndex(); + while (fileIndex < self->endVersions.size() - 1 && version >= self->endVersions[fileIndex]) { + TraceEvent("RestoreLogFilesFoundOverlapAndSkip") + .detail("Version", version) + .detail("FileIndex", fileIndex) + .log(); + // need to read from next file in the case of overlap range versions between log files + self->twobuffer->reset(); + self->bufferOffset = 0; + self->twobuffer->setFileIndex(fileIndex + 1); + wait(self->twobuffer->ready()); + start = self->twobuffer->peek(); + self->removeBlockHeader(); + std::memcpy(&version, start.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + fileIndex = self->twobuffer->getFileIndex(); + } + return version; +} + +ACTOR Future>> PartitionedLogIteratorTwoBuffers::getNext( + Reference self) { + state Standalone> mutations; + if (!self->hasNext()) { + TraceEvent(SevWarn, "IteratorExhausted").log(); + return mutations; + } + state Version firstVersion = wait(self->peekNextVersion()); + + Standalone> firstBatch = wait(self->consumeData(firstVersion)); + mutations = firstBatch; + // If the current buffer is fully consumed, then we need to check the next buffer in case + // the version is sliced across this buffer boundary + while (self->bufferOffset >= self->twobuffer->getBufferSize()) { + // fmt::print(stderr, "getNext: offset={}, size={}\n", self->bufferOffset, self->twobuffer->getBufferSize()); + self->twobuffer->discardAndSwap(); + self->bufferOffset = 0; + // data for one version cannot exceed single buffer size + // if hitting the end of a batch, check the next batch in case version is + if (self->twobuffer->hasNext()) { + // now this is run for each block, but it is not necessary if it is the last block of a file + // cannot check hasMoreData here because other buffer might have the last piece + Version nextVersion = wait(self->peekNextVersion()); + if (nextVersion != firstVersion) { + break; + } + Standalone> batch = wait(self->consumeData(firstVersion)); + for (const VersionedMutation& vm : batch) { + mutations.push_back_deep(mutations.arena(), vm); + } + } else { + break; + } + } + return mutations; +} + +Future>> PartitionedLogIteratorTwoBuffers::getNext() { + return getNext(Reference::addRef(this)); +} + FileBackupAgent::FileBackupAgent() : subspace(Subspace(fileBackupPrefixRange.begin)) // The other subspaces have logUID -> value @@ -1368,6 +2039,8 @@ struct LogFileWriter { int64_t blockEnd; }; +// input: a string of [param1, param2], [param1, param2] ..., [param1, param2] +// output: a vector of [param1, param2] after removing the length info Standalone> decodeMutationLogFileBlock(const Standalone& buf) { Standalone> results({}, buf.arena()); StringRefReader reader(buf, restore_corrupted_data()); @@ -1640,6 +2313,7 @@ ACTOR static Future addBackupTask(StringRef name, state Reference task(new Task(name, version, doneKey, priority)); // Bind backup config to new task + // allow this new task to find the config(keyspace) of the parent task wait(config.toTask(tr, task, setValidation)); // Set task specific params @@ -3058,6 +3732,7 @@ struct BackupLogsDispatchTask : BackupTaskFuncBase { if (!partitionedLog.present() || !partitionedLog.get()) { // Add the initial log range task to read/copy the mutations and the next logs dispatch task which will // run after this batch is done + // read blog/ prefix and write those (param1, param2) into files wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, @@ -3065,6 +3740,7 @@ struct BackupLogsDispatchTask : BackupTaskFuncBase { beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture)))); + // issue the next key range wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, @@ -3771,6 +4447,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { state Reference inFile = wait(bc.get()->readFile(rangeFile.fileName)); state Standalone> blockData; try { + // data is each real KV, not encoded mutations Standalone> data = wait(decodeRangeFileBlock(inFile, readOffset, readLen, cx)); blockData = data; } catch (Error& e) { @@ -3858,6 +4535,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Clear the range we are about to set. // If start == 0 then use fileBegin for the start of the range, else data[start] // If iend == end then use fileEnd for the end of the range, else data[iend] + // clear the raw key(without alog prefix) state KeyRange trRange = KeyRangeRef( (start == 0) ? fileRange.begin : data[start].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get()), @@ -3945,6 +4623,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Update the KV range map if originalFileRange is set std::vector> updateMap; std::vector ranges = Params.getOriginalFileRanges(task); + // if to restore((a, b), (e, f), (x, y)), then there are 3 ranges for (auto& range : ranges) { Value versionEncoded = BinaryWriter::toValue(Params.inputFile().get(task).version, Unversioned()); updateMap.push_back(krmSetRange(tr, restore.applyMutationsMapPrefix(), range, versionEncoded)); @@ -4025,6 +4704,7 @@ std::pair decodeMutationLogKey(const StringRef& key) { // [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k], // where a mutation is encoded as: // [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2] +// noted version needs to be included here(0x0FDB00A200090001) std::vector decodeMutationLogValue(const StringRef& value) { StringRefReader reader(value, restore_corrupted_data()); @@ -4061,6 +4741,7 @@ std::vector decodeMutationLogValue(const StringRef& value) { } void AccumulatedMutations::addChunk(int chunkNumber, const KeyValueRef& kv) { + // here it validates that partition(chunk) number has to be continuous if (chunkNumber == lastChunkNumber + 1) { lastChunkNumber = chunkNumber; serializedMutations += kv.value.toString(); @@ -4091,6 +4772,7 @@ bool AccumulatedMutations::isComplete() const { // range in ranges. // It is undefined behavior to run this if isComplete() does not return true. bool AccumulatedMutations::matchesAnyRange(const RangeMapFilters& filters) const { + // decode param2, so that each actual mutations are in mutations variable std::vector mutations = decodeMutationLogValue(serializedMutations); for (auto& m : mutations) { if (m.type == MutationRef::Encrypted) { @@ -4143,13 +4825,17 @@ bool RangeMapFilters::match(const KeyRangeRef& range) const { std::vector filterLogMutationKVPairs(VectorRef data, const RangeMapFilters& filters) { std::unordered_map mutationBlocksByVersion; + // group mutations by version for (auto& kv : data) { + // each kv is a [param1, param2] auto versionAndChunkNumber = decodeMutationLogKey(kv.key); mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv); } std::vector output; + // then add each version to the output, and now each K in output is also a KeyValueRef, + // but mutations of the same versions stay together for (auto& vb : mutationBlocksByVersion) { AccumulatedMutations& m = vb.second; @@ -4233,7 +4919,6 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { try { if (start == end) return Void(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -4241,7 +4926,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state int txBytes = 0; for (; i < end && txBytes < dataSizeLimit; ++i) { Key k = dataFiltered[i].key.withPrefix(mutationLogPrefix); - ValueRef v = dataFiltered[i].value; + ValueRef v = dataFiltered[i].value; // each KV is a [param1 with added prefix -> param2] tr->set(k, v); txBytes += k.expectedSize(); txBytes += v.expectedSize(); @@ -4313,6 +4998,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state Reference task(new Task(RestoreLogDataTaskFunc::name, RestoreLogDataTaskFunc::version, doneKey)); // Create a restore config from the current task and bind it to the new task. + // RestoreConfig(parentTask) creates prefix of : fileRestorePrefixRange.begin/uid->config/[uid] wait(RestoreConfig(parentTask).toTask(tr, task)); Params.inputFile().set(task, lf); Params.readOffset().set(task, offset); @@ -4342,6 +5028,618 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { StringRef RestoreLogDataTaskFunc::name = "restore_log_data"_sr; REGISTER_TASKFUNC(RestoreLogDataTaskFunc); +// this method takes a version and a list of list of mutations of this verison, +// each list is returned from a iterator sorted by sub +// it will first add all mutations in subsequence order +// then combine them in old-format (param1, parma2) and return +// this method assumes that iterator can return a list of mutations +/* + mutations are serialized in file as below format: + `` + `` + `` + `…` + ` + + for now, assume each iterator returns a vector > + noted that the mutation's arena has to be valid during the execution + + according to BackupWorker::addMutation, version has 64, sub has 32 and mutation length has 32 + So iterator will combine all mutations in the same version and return a vector + iterator should also return the subsequence together with each mutation + as here we will do another mergeSort for subsequence again to decide the order + and here we will decode the stringref + + + Version currentVersion; + uint32_t sub; + uint32_t mutationSize; + BinaryReader rd(str, Unversioned()); + rd >> currentVersion >> sub >> mutationSize; +*/ + +// type|kLen|vLen|Key|Value +// ref: decodeBackupLogValue() +Standalone transformMutationToOldFormat(MutationRef m) { + BinaryWriter bw(Unversioned()); + uint32_t len1, len2, type; + type = m.type; + len1 = m.param1.size(); + len2 = m.param2.size(); + bw << type; + bw << len1; + bw << len2; + // do not use <<, it is overloaded for stringref to write its size first + bw.serializeBytes(m.param1); + bw.serializeBytes(m.param2); + return bw.toValue(); +} + +Standalone> generateOldFormatMutations( + Version commitVersion, + std::vector>>& newFormatMutations) { + Standalone> results; + std::vector>> oldFormatMutations; + // mergeSort subversion here + // just do a global sort for everyone + int32_t totalBytes = 0; + std::map>> mutationsBySub; + std::map>> tmpMap; + for (auto& eachTagMutations : newFormatMutations) { + for (auto& vm : eachTagMutations) { + uint32_t sub = vm.subsequence; + Standalone mutationOldFormat = transformMutationToOldFormat(vm.mutation); + mutationsBySub[sub].push_back(mutationOldFormat); + tmpMap[sub].push_back(vm.mutation); + totalBytes += mutationOldFormat.size(); + } + } + // the list of param2 needs to have the first 64 bites as 0x0FDB00A200090001 + BinaryWriter param2Writer(IncludeVersion(ProtocolVersion::withBackupMutations())); + param2Writer << totalBytes; + + for (auto& mutationsForSub : mutationsBySub) { + // concatenate them to param2Str + for (auto& m : mutationsForSub.second) { + // refer to transformMutationToOldFormat + param2Writer.serializeBytes(m); + } + } + Key param2Concat = param2Writer.toValue(); + + // deal with param1 + int32_t hashBase = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; + + BinaryWriter wrParam1(Unversioned()); // hash/commitVersion/part + wrParam1 << (uint8_t)hashlittle(&hashBase, sizeof(hashBase), 0); + wrParam1 << bigEndian64(commitVersion); + uint32_t* partBuffer = nullptr; + + // generate a list of (param1, param2) + // param2 has format: length_of_the_mutation_group | encoded_mutation_1 | … | encoded_mutation_k + // each mutation has format type|kLen|vLen|Key|Value + for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < param2Concat.size(); part++) { + KeyValueRef backupKV; + // Assign the second parameter as the part + backupKV.value = param2Concat.substr(part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + std::min(param2Concat.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + CLIENT_KNOBS->MUTATION_BLOCK_SIZE)); + // Write the last part of the mutation to the serialization, if the buffer is not defined + if (!partBuffer) { + // part = 0 + wrParam1 << bigEndian32(part); + partBuffer = (uint32_t*)((char*)wrParam1.getData() + wrParam1.getLength() - sizeof(uint32_t)); + } else { + // part > 0 + *partBuffer = bigEndian32(part); + } + backupKV.key = wrParam1.toValue(); + results.push_back_deep(results.arena(), backupKV); + } + return results; +} + +struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase { + static StringRef name; + static constexpr uint32_t version = 1; + StringRef getName() const override { return name; }; + + static struct { + static TaskParam maxTagID() { return __FUNCTION__sr; } + static TaskParam beginVersion() { return __FUNCTION__sr; } + static TaskParam endVersion() { return __FUNCTION__sr; } + static TaskParam> logs() { return __FUNCTION__sr; } + } Params; + + static std::string printVec(std::vector& vec) { + std::string str = ""; + for (int i : vec) { + str += std::to_string(i); + str += ", "; + } + return str; + } + ACTOR static Future _execute(Database cx, + Reference taskBucket, + Reference futureBucket, + Reference task) { + state RestoreConfig restore(task); + + state int64_t maxTagID = Params.maxTagID().get(task); + state std::vector logs = Params.logs().get(task); + state Version begin = Params.beginVersion().get(task); + state Version end = Params.endVersion().get(task); + + state Reference tr(new ReadYourWritesTransaction(cx)); + state Reference bc; + state std::vector ranges; // this is the actual KV, not version + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + Reference _bc = wait(restore.sourceContainer().getOrThrow(tr)); + bc = getBackupContainerWithProxy(_bc); + + wait(store(ranges, restore.getRestoreRangesOrDefault(tr))); + + wait(checkTaskVersion(tr->getDatabase(), task, name, version)); + wait(taskBucket->keepRunning(tr, task)); + + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + std::vector> filesByTag(maxTagID + 1); + std::vector> fileEndVersionByTag(maxTagID + 1); + + for (RestoreConfig::RestoreFile& f : logs) { + // find the tag, aggregate files by tags + if (f.tagId == -1) { + // inconsistent data + TraceEvent(SevError, "PartitionedLogFileNoTag") + .detail("FileName", f.fileName) + .detail("FileSize", f.fileSize) + .log(); + } else { + filesByTag[f.tagId].push_back(f); + } + } + for (int i = 0; i < maxTagID + 1; i++) { + std::vector& files = filesByTag[i]; + int cnt = files.size(); + fileEndVersionByTag[i].resize(cnt); + for (int j = 0; j < cnt; j++) { + // [10, 20), [18, 30) -> endVersion is (18, 30) for 2 files so we have [10, 18) and [18, 30) + // greedy algorithm, because sorted by beginVersion and to minimize duplicate reading + if (j != cnt - 1 && files[j].endVersion < files[j + 1].version) { + TraceEvent(SevError, "NonContinuousLog").detail("Tag", i).detail("Index", j).log(); + } + fileEndVersionByTag[i][j] = (j == cnt - 1 ? end : files[j + 1].version); + } + } + + state std::vector> iterators(maxTagID + 1); + // for each tag, create an iterator + for (int k = 0; k < filesByTag.size(); k++) { + iterators[k] = + makeReference(bc, k, filesByTag[k], fileEndVersionByTag[k]); + } + + // mergeSort all iterator until all are exhausted + state int totalItereators = iterators.size(); + // it stores all mutations for the next min version, in new format + state std::vector>> mutationsSingleVersion; + state bool atLeastOneIteratorHasNext = true; + state Version minVersion; + state int k; + state int versionRestored = 0; + state std::vector minVs(totalItereators, 0); + while (atLeastOneIteratorHasNext) { + minVs.resize(totalItereators, 0); + atLeastOneIteratorHasNext = false; + minVersion = std::numeric_limits::max(); + k = 0; + for (; k < totalItereators; k++) { + if (!iterators[k]->hasNext()) { + continue; + } + atLeastOneIteratorHasNext = true; + Version v = wait(iterators[k]->peekNextVersion()); + minVs[k] = v; + + if (v <= minVersion) { + minVersion = v; + } + } + if (atLeastOneIteratorHasNext) { + k = 0; + for (; k < totalItereators; k++) { + if (!iterators[k]->hasNext()) { + continue; + } + Version v = wait(iterators[k]->peekNextVersion()); + if (v == minVersion) { + Standalone> tmp = wait(iterators[k]->getNext()); + mutationsSingleVersion.push_back(tmp); + } + } + + if (minVersion < begin) { + // skip generating mutations, because this is not within desired range + // this is already handled by the previous taskfunc + mutationsSingleVersion.clear(); + continue; + } else if (minVersion >= end) { + // all valid data has been consumed + break; + } + + // transform from new format to old format(param1, param2) + // in the current implementation, each version will trigger a mutation + // if each version data is too small, we might want to combine multiple versions + // for a single mutation + state Standalone> oldFormatMutations = + generateOldFormatMutations(minVersion, mutationsSingleVersion); + state int mutationIndex = 0; + state int txnCount = 0; + state int txBytes = 0; + state int totalMutation = oldFormatMutations.size(); + state int txBytesLimit = CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE; + state Key mutationLogPrefix = restore.mutationLogPrefix(); + + // multiple transactions are needed, so this has to be in a _execute method rather than + // a _finish method + loop { + try { + if (mutationIndex == totalMutation) { + break; + } + txBytes = 0; + txnCount = 0; + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + while (mutationIndex + txnCount < totalMutation && txBytes < txBytesLimit) { + Key k = oldFormatMutations[mutationIndex + txnCount].key.withPrefix(mutationLogPrefix); + ValueRef v = oldFormatMutations[mutationIndex + txnCount] + .value; // each KV is a [param1 with added prefix -> param2] + tr->set(k, v); + txBytes += k.expectedSize(); + txBytes += v.expectedSize(); + ++txnCount; + } + wait(tr->commit()); + mutationIndex += txnCount; // update mutationIndex after the commit succeeds + } catch (Error& e) { + if (e.code() == error_code_transaction_too_large) { + txBytesLimit /= 2; + } else { + wait(tr->onError(e)); + } + } + } + ++versionRestored; + } + mutationsSingleVersion.clear(); + } + return Void(); + } + + ACTOR static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + RestoreConfig(task).fileBlocksFinished().atomicOp(tr, 1, MutationRef::Type::AddValue); + + state Reference taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task)); + + return Void(); + } + + ACTOR static Future addTask(Reference tr, + Reference taskBucket, + Reference parentTask, + int64_t maxTagID, + std::vector logs, + Version begin, + Version end, + TaskCompletionKey completionKey, + Reference waitFor = Reference()) { + Key doneKey = wait(completionKey.get(tr, taskBucket)); + state Reference task( + new Task(RestoreLogDataPartitionedTaskFunc::name, RestoreLogDataPartitionedTaskFunc::version, doneKey)); + + // Create a restore config from the current task and bind it to the new task. + // RestoreConfig(parentTask) createsa prefix of : fileRestorePrefixRange.begin/uid->config/[uid] + wait(RestoreConfig(parentTask).toTask(tr, task)); + Params.maxTagID().set(task, maxTagID); + Params.beginVersion().set(task, begin); + Params.endVersion().set(task, end); + Params.logs().set(task, logs); + + if (!waitFor) { + return taskBucket->addTask(tr, task); + } + + wait(waitFor->onSetAddTask(tr, taskBucket, task)); + return "OnSetAddTask"_sr; + } + + Future execute(Database cx, + Reference tb, + Reference fb, + Reference task) override { + return _execute(cx, tb, fb, task); + }; + Future finish(Reference tr, + Reference tb, + Reference fb, + Reference task) override { + return _finish(tr, tb, fb, task); + }; +}; +StringRef RestoreLogDataPartitionedTaskFunc::name = "restore_log_data_partitioned"_sr; +REGISTER_TASKFUNC(RestoreLogDataPartitionedTaskFunc); + +// each task can be partitioned to smaller ranges because commit proxy would +// only start to commit alog/ prefix mutations to original prefix when +// the final version is set, but do it in a single task for now for simplicity +struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase { + static StringRef name; + static constexpr uint32_t version = 1; + StringRef getName() const override { return name; }; + + static struct { + static TaskParam beginVersion() { return __FUNCTION__sr; } + static TaskParam firstVersion() { return __FUNCTION__sr; } + static TaskParam endVersion() { return __FUNCTION__sr; } + } Params; + + ACTOR static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + state RestoreConfig restore(task); + + state Version beginVersion = Params.beginVersion().get(task); + state Version firstVersion = Params.firstVersion().get(task); + state Version endVersion = Params.endVersion().get(task); + Reference _bc = wait(restore.sourceContainer().getOrThrow(tr)); + state Reference bc = getBackupContainerWithProxy(_bc); + state Reference onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + + state Version restoreVersion; + state int fileLimit = 1000; + + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && + checkTaskVersion(tr->getDatabase(), task, name, version)); + + // if current is [40, 50] and restore version is 50, we need another [50, 51] task to process data at version 50 + state Version nextEndVersion = + std::min(restoreVersion + 1, endVersion + CLIENT_KNOBS->RESTORE_PARTITIONED_BATCH_VERSION_SIZE); + // update the apply mutations end version so the mutations from the previous batch can be applied. + // Only do this once beginVersion is > 0 (it will be 0 for the initial dispatch). + if (beginVersion > firstVersion) { + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this is an additional taskFunc after last file + restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); + } + + // The applyLag must be retrieved AFTER potentially updating the apply end version. + state int64_t applyLag = wait(restore.getApplyVersionLag(tr)); + // this is to guarantee commit proxy is catching up doing apply alog -> normal key + // with this backupFile -> alog process + // If starting a new batch and the apply lag is too large then re-queue and wait + if (applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) { + // Wait a small amount of time and then re-add this same task. + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask( + tr, taskBucket, task, firstVersion, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "too_far_behind") + .detail("TaskInstance", THIS_ADDR); + + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // Get a batch of files. We're targeting batchSize blocks(30k) being dispatched so query for batchSize(150) + // files (each of which is 0 or more blocks). + // lets say files have [10, 20), [20, 30) then if our range is [15, 25], we need to include both files, + // because [15, 20] is included in the first file, and [20, 25] is included in the second file + // say we have b and e + // as a result, the first file(inclusive): largest file whose begin <= b, + // the last file(exclusive): smallest file whose begin > e + // because of the encoding, version comes before the type of files(log or range), + // begin needs to start from the beginning -- + // if we have a 3 files, (log,100,200), (range, 180), (log, 200, 300), then if we want to restore[190, 250] + // it would stop at (range, 180) and not looking at (log, 100, 200) + state Optional beginLogInclude = Optional{}; + // because of the encoding of RestoreFile, we use greaterThanOrEqual(end + 1) instead of greaterThan(end) + // because RestoreFile::pack has the version at the most significant position, and keyAfter(end) does not result + // in a end+1 + state Optional endLogExclude = wait( + restore.logFileSet().seekGreaterOrEqual(tr, RestoreConfig::RestoreFile({ endVersion + 1, "", false }))); + state RestoreConfig::FileSetT::RangeResultType logFiles = + wait(restore.logFileSet().getRange(tr, beginLogInclude, endLogExclude, fileLimit)); + state Optional beginRangeInclude = + wait(restore.rangeFileSet().seekGreaterOrEqual(tr, RestoreConfig::RestoreFile({ beginVersion, "", true }))); + // greaterThanOrEqual(end + 1) instead of greaterThan(end) + // because RestoreFile::pack has the version at the most significant position, and keyAfter(end) does not result + // in a end+1 + state Optional endRangeExclude = wait( + restore.rangeFileSet().seekGreaterOrEqual(tr, RestoreConfig::RestoreFile({ endVersion + 1, "", true }))); + state RestoreConfig::FileSetT::RangeResultType rangeFiles = + wait(restore.rangeFileSet().getRange(tr, beginRangeInclude, endRangeExclude, fileLimit)); + state int64_t maxTagID = 0; + state std::vector logs; + state std::vector ranges; + for (auto f : logFiles.results) { + if (f.endVersion > beginVersion) { + // skip all files whose endVersion is smaller or equal to beginVersion + logs.push_back(f); + maxTagID = std::max(maxTagID, f.tagId); + } + } + for (auto f : rangeFiles.results) { + // the getRange might get out-of-bound range file because log files need them to work + if (f.version >= beginVersion && f.version < endVersion) { + ranges.push_back(f); + } + } + // allPartsDone will be set once all block tasks in the current batch are finished. + // create a new future for the new batch + state Reference allPartsDone = futureBucket->future(tr); + restore.batchFuture().set(tr, allPartsDone->pack()); + + // if there are no files, if i am not the last batch, then on to the next batch + // if there are no files and i am the last batch, then just wait for applying to finish + // do we need this files.results.size() == 0 at all? + // if (files.results.size() == 0 && beginVersion >= restoreVersion) { + // fmt::print(stderr, "CheckBegin and restore, begin={}, restore={}, applyLag={}\n", beginVersion, + // restoreVersion, applyLag); + if (beginVersion > restoreVersion) { + if (applyLag == 0) { + // i am the last batch + // If apply lag is 0 then we are done so create the completion task + wait(success(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "restore_complete") + .detail("TaskInstance", THIS_ADDR); + } else { + // i am the last batch, and applyLag is not zero, then I will create another dummy task to wait + // for apply log to be zero, then it will go into the branch above. + // Applying of mutations is not yet finished so wait a small amount of time and then re-add this + // same task. + // this is only to create a dummy one wait for it to finish + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask( + tr, taskBucket, task, firstVersion, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "apply_still_behind") + .detail("TaskInstance", THIS_ADDR); + } + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // if we reach here, this batch is not empty(i.e. we have range and/or mutation files in this) + // Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE + // blocks per Dispatch task and target batchSize total per batch but a batch must end on a complete version + // boundary so exceed the limit if necessary to reach the end of a version of files. + state std::vector> addTaskFutures; + state int i = 0; + // need to process all range files, keep using the same RestoreRangeTaskFunc as non-partitioned restore. + // this can be done first, because they are not overlap within a restore uid + // each task will read the file, restore those key to their original keys after clear that range + // also it will update the keyVersionMap[key -> versionFromRangeFile] + // by this time, corresponding mutation files within the same version range has not been applied yet + // because they are waiting for the singal of this RestoreDispatchPartitionedTaskFunc + // when log are being applied, they will compare version of key to the keyVersionMap updated by range file + // after each RestoreDispatchPartitionedTaskFunc, keyVersionMap will be clear if mutation version is larger. + for (; i < ranges.size(); ++i) { + RestoreConfig::RestoreFile& f = ranges[i]; + // For each block of the file + for (int64_t j = 0; j < f.fileSize; j += f.blockSize) { + addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, + taskBucket, + task, + f, + j, + std::min(f.blockSize, f.fileSize - j), + TaskCompletionKey::joinWith(allPartsDone))); + } + } + bool is_set = wait(allPartsDone->isSet(tr)); + addTaskFutures.push_back(RestoreLogDataPartitionedTaskFunc::addTask( + tr, taskBucket, task, maxTagID, logs, beginVersion, endVersion, TaskCompletionKey::joinWith(allPartsDone))); + // even if file exsists, but they are empty, in this case just start the next batch + + addTaskFutures.push_back(RestoreDispatchPartitionedTaskFunc::addTask(tr, + taskBucket, + task, + firstVersion, + endVersion, + nextEndVersion, + TaskCompletionKey::noSignal(), + allPartsDone)); + + wait(waitForAll(addTaskFutures)); + wait(taskBucket->finish(tr, task)); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("EndVersion", endVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "dispatch_batch_complete") + .detail("TaskInstance", THIS_ADDR) + .log(); + + return Void(); + } + + ACTOR static Future addTask(Reference tr, + Reference taskBucket, + Reference parentTask, + Version firstVersion, + Version beginVersion, + Version endVersion, + TaskCompletionKey completionKey = TaskCompletionKey::noSignal(), + Reference waitFor = Reference()) { + Key doneKey = wait(completionKey.get(tr, taskBucket)); + + // Use high priority for dispatch tasks that have to queue more blocks for the current batch + unsigned int priority = 0; + state Reference task(new Task( + RestoreDispatchPartitionedTaskFunc::name, RestoreDispatchPartitionedTaskFunc::version, doneKey, priority)); + + // Create a config from the parent task and bind it to the new task + wait(RestoreConfig(parentTask).toTask(tr, task)); + Params.firstVersion().set(task, firstVersion); + Params.beginVersion().set(task, beginVersion); + Params.endVersion().set(task, endVersion); + + if (!waitFor) { + return taskBucket->addTask(tr, task); + } + + wait(waitFor->onSetAddTask(tr, taskBucket, task)); + return "OnSetAddTask"_sr; + } + + Future execute(Database cx, + Reference tb, + Reference fb, + Reference task) override { + return Void(); + }; + Future finish(Reference tr, + Reference tb, + Reference fb, + Reference task) override { + return _finish(tr, tb, fb, task); + }; +}; +StringRef RestoreDispatchPartitionedTaskFunc::name = "restore_dispatch_partitioned"_sr; +REGISTER_TASKFUNC(RestoreDispatchPartitionedTaskFunc); struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { static StringRef name; static constexpr uint32_t version = 1; @@ -4367,15 +5665,17 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t remainingInBatch = Params.remainingInBatch().get(task); state bool addingToExistingBatch = remainingInBatch > 0; state Version restoreVersion; - state Future> onlyApplyMutationLogs = restore.onlyApplyMutationLogs().get(tr); - wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && success(onlyApplyMutationLogs) && + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && checkTaskVersion(tr->getDatabase(), task, name, version)); // If not adding to an existing batch then update the apply mutations end version so the mutations from the // previous batch can be applied. Only do this once beginVersion is > 0 (it will be 0 for the initial // dispatch). if (!addingToExistingBatch && beginVersion > 0) { + // hfu5 : unblock apply alog to normal key space + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this call an additional call after last file restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); } @@ -4402,6 +5702,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { return Void(); } + // need beginFile to handle stop in the middle of version case state std::string beginFile = Params.beginFile().getOrDefault(task); // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files // (each of which is 0 or more blocks). @@ -4453,6 +5754,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { .detail("TaskInstance", THIS_ADDR); } else if (beginVersion < restoreVersion) { // If beginVersion is less than restoreVersion then do one more dispatch task to get there + // there are no more files between beginVersion and restoreVersion wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize))); TraceEvent("FileRestoreDispatch") @@ -4479,6 +5781,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { } else { // Applying of mutations is not yet finished so wait a small amount of time and then re-add this // same task. + // this is only to create a dummy one wait for it to finish wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize))); @@ -4508,6 +5811,9 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t beginBlock = Params.beginBlock().getOrDefault(task); state int i = 0; + // for each file + // not creating a new task at this level because restore files are read back together -- both range and log + // so i have to process range files anyway. for (; i < files.results.size(); ++i) { RestoreConfig::RestoreFile& f = files.results[i]; @@ -4517,6 +5823,9 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { if (f.version != endVersion && remainingInBatch <= 0) { // Next start will be at the first version after endVersion at the first file first block ++endVersion; + // beginFile set to empty to indicate we are not in the middle of a range + // by middle of a range, we mean that we have rangeFile v=80, and logFile v=[80, 100], + // then we have to include this log file too in this batch beginFile = ""; beginBlock = 0; break; @@ -4626,8 +5935,12 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { // If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we // cannot end the batch here because we do not know if we got all of the files and blocks from the last // version queued, so make sure remainingInBatch is at least 1. - if (!beginFile.empty()) + if (!beginFile.empty()) { + // this is to make sure if we stop in the middle of a version, we do not end this batch + // instead next RestoreDispatchTaskFunc should have addingToExistingBatch as true + // thus they are considered the same batch and alog will be committed only when all of them succeed remainingInBatch = std::max(1, remainingInBatch); + } // If more blocks need to be dispatched in this batch then add a follow-on task that is part of the // allPartsDone group which will won't wait to run and will add more block tasks. @@ -4914,11 +6227,15 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { // Convert the two lists in restorable (logs and ranges) to a single list of RestoreFiles. // Order does not matter, they will be put in order when written to the restoreFileMap below. state std::vector files; + state std::vector logFiles; + state std::vector rangeFiles; if (!logsOnly) { beginVersion = restorable.get().snapshot.beginVersion; + if (!inconsistentSnapshotOnly) { for (const RangeFile& f : restorable.get().ranges) { files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); + rangeFiles.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); // In a restore with both snapshots and logs, the firstConsistentVersion is the highest version // of any range file. firstConsistentVersion = std::max(firstConsistentVersion, f.version); @@ -4927,6 +6244,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { for (int i = 0; i < restorable.get().ranges.size(); ++i) { const RangeFile& f = restorable.get().ranges[i]; files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); + rangeFiles.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); // In inconsistentSnapshotOnly mode, if all range files have the same version, then it is the // firstConsistentVersion, otherwise unknown (use -1). if (i != 0 && f.version != firstConsistentVersion) { @@ -4942,7 +6260,10 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } if (!inconsistentSnapshotOnly) { for (const LogFile& f : restorable.get().logs) { - files.push_back({ f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion }); + files.push_back( + { f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion, f.tagId, f.totalTags }); + logFiles.push_back( + { f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion, f.tagId, f.totalTags }); } } // First version for which log data should be applied @@ -4961,6 +6282,81 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } } + // add log files + state std::vector::iterator logStart = logFiles.begin(); + state std::vector::iterator logEnd = logFiles.end(); + + tr->reset(); + while (logStart != logEnd) { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + wait(taskBucket->keepRunning(tr, task)); + + state std::vector::iterator i = logStart; + + state int txBytes = 0; + state int logFileCount = 0; + auto fileSet = restore.logFileSet(); + // as a result, fileSet has everything, including [beginVersion, endVersion] for each tag + for (; i != logEnd && txBytes < 1e6; ++i) { + txBytes += fileSet.insert(tr, *i); + ++logFileCount; + } + wait(tr->commit()); + + TraceEvent("FileRestoreLoadedLogFiles") + .detail("RestoreUID", restore.getUid()) + .detail("FileCount", logFileCount) + .detail("TransactionBytes", txBytes) + .detail("TaskInstance", THIS_ADDR); + + logStart = i; + tr->reset(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + state std::vector::iterator rangeStart = rangeFiles.begin(); + state std::vector::iterator rangeEnd = rangeFiles.end(); + + tr->reset(); + while (rangeStart != rangeEnd) { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + wait(taskBucket->keepRunning(tr, task)); + + i = rangeStart; + + txBytes = 0; + state int rangeFileCount = 0; + auto fileSet = restore.rangeFileSet(); + // as a result, fileSet has everything, including [beginVersion, endVersion] for each tag + for (; i != rangeEnd && txBytes < 1e6; ++i) { + txBytes += fileSet.insert(tr, *i); + // handle the remaining + ++rangeFileCount; + } + wait(tr->commit()); + + TraceEvent("FileRestoreLoadedRangeFiles") + .detail("RestoreUID", restore.getUid()) + .detail("FileCount", rangeFileCount) + .detail("TransactionBytes", txBytes) + .detail("TaskInstance", THIS_ADDR); + + rangeStart = i; + tr->reset(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + // add files state std::vector::iterator start = files.begin(); state std::vector::iterator end = files.end(); @@ -4972,14 +6368,16 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { wait(taskBucket->keepRunning(tr, task)); - state std::vector::iterator i = start; + i = start; - state int txBytes = 0; + txBytes = 0; state int nFileBlocks = 0; state int nFiles = 0; auto fileSet = restore.fileSet(); + // as a result, fileSet has everything, including [beginVersion, endVersion] for each tag for (; i != end && txBytes < 1e6; ++i) { txBytes += fileSet.insert(tr, *i); + // handle the remaining nFileBlocks += (i->fileSize + i->blockSize - 1) / i->blockSize; ++nFiles; } @@ -5011,7 +6409,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { Reference futureBucket, Reference task) { state RestoreConfig restore(task); - + state bool transformPartitionedLog; + state Version restoreVersion; state Version firstVersion = Params.firstVersion().getOrDefault(task, invalidVersion); if (firstVersion == invalidVersion) { wait(restore.logError( @@ -5029,8 +6428,22 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { restore.setApplyEndVersion(tr, firstVersion); // Apply range data and log data in order - wait(success(RestoreDispatchTaskFunc::addTask( - tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE))); + wait(store(transformPartitionedLog, restore.transformPartitionedLog().getD(tr, Snapshot::False, false))); + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr))); + + if (transformPartitionedLog) { + Version endVersion = + std::min(firstVersion + CLIENT_KNOBS->RESTORE_PARTITIONED_BATCH_VERSION_SIZE, restoreVersion); + fmt::print("Start Initial task, firstVersion={}, endVersion={}, restoreVersion={}\n", + firstVersion, + endVersion, + restoreVersion); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask( + tr, taskBucket, task, firstVersion, firstVersion, endVersion))); + } else { + wait(success(RestoreDispatchTaskFunc::addTask( + tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE))); + } wait(taskBucket->finish(tr, task)); return Void(); @@ -5443,7 +6856,8 @@ class FileBackupAgentImpl { OnlyApplyMutationLogs onlyApplyMutationLogs, InconsistentSnapshotOnly inconsistentSnapshotOnly, Version beginVersion, - UID uid) { + UID uid, + TransformPartitionedLog transformPartitionedLog) { KeyRangeMap restoreRangeSet; for (auto& range : ranges) { restoreRangeSet.insert(range, 1); @@ -5516,6 +6930,7 @@ class FileBackupAgentImpl { restore.inconsistentSnapshotOnly().set(tr, inconsistentSnapshotOnly); restore.beginVersion().set(tr, beginVersion); restore.unlockDBAfterRestore().set(tr, unlockDB); + restore.transformPartitionedLog().set(tr, transformPartitionedLog); if (BUGGIFY && restoreRanges.size() == 1) { restore.restoreRange().set(tr, restoreRanges[0]); } else { @@ -6099,25 +7514,27 @@ class FileBackupAgentImpl { // When set to true, gives an inconsistent snapshot, thus not recommended // beginVersions: restore's begin version for each range // randomUid: the UID for lock the database - ACTOR static Future restore(FileBackupAgent* backupAgent, - Database cx, - Optional cxOrig, - Key tagName, - Key url, - Optional proxy, - Standalone> ranges, - Standalone> beginVersions, - WaitForComplete waitForComplete, - Version targetVersion, - Verbose verbose, - Key addPrefix, - Key removePrefix, - LockDB lockDB, - UnlockDB unlockDB, - OnlyApplyMutationLogs onlyApplyMutationLogs, - InconsistentSnapshotOnly inconsistentSnapshotOnly, - Optional encryptionKeyFileName, - UID randomUid) { + ACTOR static Future restore( + FileBackupAgent* backupAgent, + Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + Standalone> ranges, + Standalone> beginVersions, + WaitForComplete waitForComplete, + Version targetVersion, + Verbose verbose, + Key addPrefix, + Key removePrefix, + LockDB lockDB, + UnlockDB unlockDB, + OnlyApplyMutationLogs onlyApplyMutationLogs, + InconsistentSnapshotOnly inconsistentSnapshotOnly, + Optional encryptionKeyFileName, + UID randomUid, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False) { // The restore command line tool won't allow ranges to be empty, but correctness workloads somehow might. if (ranges.empty()) { throw restore_error(); @@ -6150,7 +7567,6 @@ class FileBackupAgentImpl { .detail("BackupContainer", bc->getURL()) .detail("BeginVersion", beginVersion) .detail("TargetVersion", targetVersion); - fmt::print(stderr, "ERROR: Restore version {0} is not possible from {1}\n", targetVersion, bc->getURL()); throw restore_invalid_version(); } @@ -6183,7 +7599,8 @@ class FileBackupAgentImpl { onlyApplyMutationLogs, inconsistentSnapshotOnly, beginVersion, - randomUid)); + randomUid, + transformPartitionedLog)); wait(tr->commit()); break; } catch (Error& e) { @@ -6545,7 +7962,8 @@ Future FileBackupAgent::restore(Database cx, UnlockDB unlockDB, OnlyApplyMutationLogs onlyApplyMutationLogs, InconsistentSnapshotOnly inconsistentSnapshotOnly, - Optional const& encryptionKeyFileName) { + Optional const& encryptionKeyFileName, + TransformPartitionedLog transformPartitionedLog) { return FileBackupAgentImpl::restore(this, cx, cxOrig, @@ -6564,7 +7982,8 @@ Future FileBackupAgent::restore(Database cx, onlyApplyMutationLogs, inconsistentSnapshotOnly, encryptionKeyFileName, - deterministicRandom()->randomUniqueID()); + deterministicRandom()->randomUniqueID(), + transformPartitionedLog); } Future FileBackupAgent::restore(Database cx, @@ -6583,7 +8002,8 @@ Future FileBackupAgent::restore(Database cx, OnlyApplyMutationLogs onlyApplyMutationLogs, InconsistentSnapshotOnly inconsistentSnapshotOnly, Version beginVersion, - Optional const& encryptionKeyFileName) { + Optional const& encryptionKeyFileName, + TransformPartitionedLog transformPartitionedLog) { Standalone> beginVersions; for (auto i = 0; i < ranges.size(); ++i) { beginVersions.push_back(beginVersions.arena(), beginVersion); @@ -6604,7 +8024,8 @@ Future FileBackupAgent::restore(Database cx, unlockDB, onlyApplyMutationLogs, inconsistentSnapshotOnly, - encryptionKeyFileName); + encryptionKeyFileName, + transformPartitionedLog); } Future FileBackupAgent::restore(Database cx, diff --git a/fdbclient/KeyRangeMap.actor.cpp b/fdbclient/KeyRangeMap.actor.cpp index a678c28e4a3..0d57cc280e8 100644 --- a/fdbclient/KeyRangeMap.actor.cpp +++ b/fdbclient/KeyRangeMap.actor.cpp @@ -199,10 +199,12 @@ ACTOR Future krmSetRange(Transaction* tr, Key mapPrefix, KeyRange range, V } ACTOR Future krmSetRange(Reference tr, Key mapPrefix, KeyRange range, Value value) { + // keyVersionMap, (a, b), v1 state KeyRange withPrefix = KeyRangeRef(mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString()); RangeResult old = wait(tr->getRange(lastLessOrEqual(withPrefix.end), firstGreaterThan(withPrefix.end), 1, Snapshot::True)); + // fetch [keyVersionMap/end, keyVersionMap/inc(end)] Value oldValue; bool hasResult = old.size() > 0 && old[0].key.startsWith(mapPrefix); @@ -213,8 +215,10 @@ ACTOR Future krmSetRange(Reference tr, Key mapP if (!conflictRange.empty()) tr->addReadConflictRange(conflictRange); - tr->clear(withPrefix); - tr->set(withPrefix.begin, value); + tr->clear(withPrefix); // clear [keyVersionMap/a, keyVersionMap/b) + tr->set(withPrefix.begin, value); // set [keyVersionMap/a, v1) + // set [keyVersionMap/b, preveiousVersion], because end is exclusive here, + // but starting from end it might be covered by another range file, so set it to old value tr->set(withPrefix.end, oldValue); return Void(); diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index 203b64453aa..6d277ccf141 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -1078,6 +1078,9 @@ class TaskFutureImpl { taskFuture->futureBucket->setOptions(tr); bool is_set = wait(isSet(tr, taskFuture)); + // this means that if the task future is already set, then just return + // so the taskFuture cannot be completed already if we want to join + // vectorFuture with taskFuture if (is_set) { return Void(); } @@ -1262,6 +1265,8 @@ class TaskFutureImpl { taskFuture->futureBucket->setOptions(tr); std::vector> vectorFuture; + // the next line means generate a new task future with different key, + // but share the same prefix of futureBucket with the input taskFuture state Reference future = taskFuture->futureBucket->future(tr); vectorFuture.push_back(future); wait(join(tr, taskBucket, taskFuture, vectorFuture)); @@ -1276,7 +1281,7 @@ TaskFuture::TaskFuture(const Reference bucket, Key k) : futureBuck key = deterministicRandom()->randomUniqueID().toString(); } - prefix = futureBucket->prefix.get(key); + prefix = futureBucket->prefix.get(key); // this ::get actually append the key to the taskBucket prefix blocks = prefix.get("bl"_sr); callbacks = prefix.get("cb"_sr); } @@ -1347,5 +1352,6 @@ ACTOR Future getCompletionKey(TaskCompletionKey* self, Future TaskCompletionKey::get(Reference tr, Reference taskBucket) { ASSERT(key.present() == (joinFuture.getPtr() == nullptr)); + // from the parent bucket, it generate a new taskfuture and returns the key of the new taskfuture return key.present() ? key.get() : getCompletionKey(this, joinFuture->joinedFuture(tr, taskBucket)); } diff --git a/fdbclient/include/fdbclient/BackupAgent.actor.h b/fdbclient/include/fdbclient/BackupAgent.actor.h index c8931750aad..e5714b8fe47 100644 --- a/fdbclient/include/fdbclient/BackupAgent.actor.h +++ b/fdbclient/include/fdbclient/BackupAgent.actor.h @@ -45,6 +45,7 @@ FDB_BOOLEAN_PARAM(ForceAction); FDB_BOOLEAN_PARAM(Terminator); FDB_BOOLEAN_PARAM(IncrementalBackupOnly); FDB_BOOLEAN_PARAM(UsePartitionedLog); +FDB_BOOLEAN_PARAM(TransformPartitionedLog); FDB_BOOLEAN_PARAM(OnlyApplyMutationLogs); FDB_BOOLEAN_PARAM(SnapshotBackupUseTenantCache); FDB_BOOLEAN_PARAM(InconsistentSnapshotOnly); @@ -203,8 +204,10 @@ class FileBackupAgent : public BackupAgentBase { UnlockDB = UnlockDB::True, OnlyApplyMutationLogs = OnlyApplyMutationLogs::False, InconsistentSnapshotOnly = InconsistentSnapshotOnly::False, - Optional const& encryptionKeyFileName = {}); + Optional const& encryptionKeyFileName = {}, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False); + // this method will construct range and version vectors and then call restore() Future restore(Database cx, Optional cxOrig, Key tagName, @@ -222,6 +225,7 @@ class FileBackupAgent : public BackupAgentBase { Version beginVersion = ::invalidVersion, Optional const& encryptionKeyFileName = {}); + // create a version vector of size ranges.size(), all elements are the same, i.e. beginVersion Future restore(Database cx, Optional cxOrig, Key tagName, @@ -238,7 +242,8 @@ class FileBackupAgent : public BackupAgentBase { OnlyApplyMutationLogs onlyApplyMutationLogs = OnlyApplyMutationLogs::False, InconsistentSnapshotOnly inconsistentSnapshotOnly = InconsistentSnapshotOnly::False, Version beginVersion = ::invalidVersion, - Optional const& encryptionKeyFileName = {}); + Optional const& encryptionKeyFileName = {}, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False); Future atomicRestore(Database cx, Key tagName, @@ -522,8 +527,8 @@ using RangeResultWithVersion = std::pair; struct RCGroup { RangeResult items; - Version version; - uint64_t groupKey; + Version version; // this is read version for this group + uint64_t groupKey; // this is the original version for this group RCGroup() : version(-1), groupKey(ULLONG_MAX){}; @@ -676,6 +681,7 @@ class KeyBackedTaskConfig : public KeyBackedClass { Reference task, SetValidation setValidation = SetValidation::True) { // Set the uid task parameter + // task's uid is set to my uid TaskParams.uid().set(task, uid); if (!setValidation) { diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 29509e7dd99..070748843c1 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -184,6 +184,7 @@ class ClientKnobs : public KnobsImpl { int RESTORE_DISPATCH_ADDTASK_SIZE; int RESTORE_DISPATCH_BATCH_SIZE; int RESTORE_WRITE_TX_SIZE; + int RESTORE_PARTITIONED_BATCH_VERSION_SIZE; int APPLY_MAX_LOCK_BYTES; int APPLY_MIN_LOCK_BYTES; int APPLY_BLOCK_SIZE; diff --git a/fdbclient/include/fdbclient/KeyBackedTypes.actor.h b/fdbclient/include/fdbclient/KeyBackedTypes.actor.h index 0281e98a6a1..4d62569127a 100644 --- a/fdbclient/include/fdbclient/KeyBackedTypes.actor.h +++ b/fdbclient/include/fdbclient/KeyBackedTypes.actor.h @@ -976,7 +976,7 @@ class KeyBackedSet { return Optional(); } - return self.unpackKey(kvs.front()); + return self.unpackKey(kvs.front().key); } template diff --git a/fdbclient/include/fdbclient/PartitionedLogIterator.h b/fdbclient/include/fdbclient/PartitionedLogIterator.h new file mode 100644 index 00000000000..03c6f6d194a --- /dev/null +++ b/fdbclient/include/fdbclient/PartitionedLogIterator.h @@ -0,0 +1,28 @@ +#ifndef FDBCLIENT_PARTITIONED_LOG_ITERATOR_H +#define FDBCLIENT_PARTITIONED_LOG_ITERATOR_H + +#include "fdbclient/FDBTypes.h" + +// Structure to represent each mutation entity +struct VersionedMutation { + Version version; + int32_t subsequence; + MutationRef mutation; + VersionedMutation(Arena& p, const VersionedMutation& toCopy) : mutation(p, toCopy.mutation) { + version = toCopy.version; + subsequence = toCopy.subsequence; + } + VersionedMutation() {} +}; + +class PartitionedLogIterator : public ReferenceCounted { +public: + virtual bool hasNext() = 0; + + virtual Future peekNextVersion() = 0; + + virtual Future>> getNext() = 0; + + virtual ~PartitionedLogIterator() = default; +}; +#endif \ No newline at end of file diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 4a5b247428f..5fcc957127d 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -510,6 +510,7 @@ class ApplyMetadataMutationsImpl { } void checkSetApplyMutationsEndRange(MutationRef m) { + // only proceed when see mutation with applyMutationsEndRange if (!m.param1.startsWith(applyMutationsEndRange.begin)) { return; } @@ -532,6 +533,11 @@ class ApplyMetadataMutationsImpl { auto addPrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get(); auto removePrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get(); auto beginValue = txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get(); + TraceEvent("BackupAgentBaseApplyMutationsBegin") + .detail("BeginVersion", + beginValue.present() ? BinaryReader::fromStringRef(beginValue.get(), Unversioned()) : 0) + .detail("EndVersion", p.endVersion) + .log(); p.worker = applyMutations( cx, uid, diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cbed292db6b..8ea98d9a0c7 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -671,6 +671,7 @@ ACTOR Future addMutation(Reference logFile, StringRef mutation, int64_t* blockEnd, int blockSize) { + // format: version, subversion, messageSize, message state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size(); // Convert to big Endianness for version.version, version.sub, and msgSize diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 894b403a76e..14b230d882d 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -512,8 +512,9 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) { // FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield + // this is 0x0FDB00A200090001 valueWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withBackupMutations())); - valueWriter << logRangeMutation->second.totalSize(); + valueWriter << logRangeMutation->second.totalSize(); // this is int32 by default state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin; while (blobIter) { @@ -532,7 +533,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, Key val = valueWriter.toValue(); - BinaryWriter wr(Unversioned()); + BinaryWriter wr(Unversioned()); // backupName/hash/commitVersion/part, so wr is param1 // Serialize the log destination wr.serializeBytes(logRangeMutation->first); diff --git a/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp b/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp new file mode 100644 index 00000000000..d72c0fc00a3 --- /dev/null +++ b/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp @@ -0,0 +1,808 @@ +/* + * BackupCorrectness.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/DatabaseConfiguration.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include "fdbrpc/simulator.h" +#include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/BackupContainer.h" +#include "fdbclient/BackupContainerFileSystem.h" +#include "fdbclient/TenantManagement.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "flow/IRandom.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +// A workload which test the correctness of backup and restore process +struct BackupAndRestorePartitionedCorrectnessWorkload : TestWorkload { + static constexpr auto NAME = "BackupAndRestorePartitionedCorrectness"; + double backupAfter, restoreAfter, abortAndRestartAfter; + double minBackupAfter; + double backupStartAt, restoreStartAfterBackupFinished, stopDifferentialAfter; + Key backupTag; + int backupRangesCount, backupRangeLengthMax; + bool differentialBackup, performRestore, agentRequest; + Standalone> backupRanges; + std::vector skippedRestoreRanges; + Standalone> restoreRanges; + static int backupAgentRequests; + LockDB locked{ false }; + bool allowPauses; + bool shareLogRange; + bool shouldSkipRestoreRanges; + bool defaultBackup; + Optional encryptionKeyFileName; + + BackupAndRestorePartitionedCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + locked.set(sharedRandomNumber % 2); + backupAfter = getOption(options, "backupAfter"_sr, 10.0); + double minBackupAfter = getOption(options, "minBackupAfter"_sr, backupAfter); + if (backupAfter > minBackupAfter) { + backupAfter = deterministicRandom()->random01() * (backupAfter - minBackupAfter) + minBackupAfter; + } + restoreAfter = getOption(options, "restoreAfter"_sr, 35.0); + performRestore = getOption(options, "performRestore"_sr, true); + backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag()); + backupRangesCount = getOption(options, "backupRangesCount"_sr, 5); + backupRangeLengthMax = getOption(options, "backupRangeLengthMax"_sr, 1); + abortAndRestartAfter = + getOption(options, + "abortAndRestartAfter"_sr, + deterministicRandom()->random01() < 0.5 + ? deterministicRandom()->random01() * (restoreAfter - backupAfter) + backupAfter + : 0.0); + differentialBackup = + getOption(options, "differentialBackup"_sr, deterministicRandom()->random01() < 0.5 ? true : false); + stopDifferentialAfter = + getOption(options, + "stopDifferentialAfter"_sr, + differentialBackup ? deterministicRandom()->random01() * + (restoreAfter - std::max(abortAndRestartAfter, backupAfter)) + + std::max(abortAndRestartAfter, backupAfter) + : 0.0); + agentRequest = getOption(options, "simBackupAgents"_sr, true); + allowPauses = getOption(options, "allowPauses"_sr, true); + shareLogRange = getOption(options, "shareLogRange"_sr, false); + defaultBackup = getOption(options, "defaultBackup"_sr, false); + + std::vector restorePrefixesToInclude = + getOption(options, "restorePrefixesToInclude"_sr, std::vector()); + + shouldSkipRestoreRanges = deterministicRandom()->random01() < 0.3 ? true : false; + if (getOption(options, "encrypted"_sr, deterministicRandom()->random01() < 0.1)) { + encryptionKeyFileName = "simfdb/" + getTestEncryptionFileName(); + } + + TraceEvent("BARW_ClientId").detail("Id", wcx.clientId); + UID randomID = nondeterministicRandom()->randomUniqueID(); + TraceEvent("BARW_PerformRestore", randomID).detail("Value", performRestore); + if (defaultBackup) { + addDefaultBackupRanges(backupRanges); + } else if (shareLogRange) { + bool beforePrefix = sharedRandomNumber & 1; + if (beforePrefix) + backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, "\xfe\xff\xfe"_sr)); + else + backupRanges.push_back_deep(backupRanges.arena(), + KeyRangeRef(strinc("\x00\x00\x01"_sr), normalKeys.end)); + } else if (backupRangesCount <= 0) { + backupRanges.push_back_deep(backupRanges.arena(), normalKeys); + } else { + // Add backup ranges + std::set rangeEndpoints; + while (rangeEndpoints.size() < backupRangesCount * 2) { + rangeEndpoints.insert(deterministicRandom()->randomAlphaNumeric( + deterministicRandom()->randomInt(1, backupRangeLengthMax + 1))); + } + + // Create ranges from the keys, in order, to prevent overlaps + std::vector sortedEndpoints(rangeEndpoints.begin(), rangeEndpoints.end()); + sort(sortedEndpoints.begin(), sortedEndpoints.end()); + for (auto i = sortedEndpoints.begin(); i != sortedEndpoints.end(); ++i) { + const std::string& start = *i++; + backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(start, *i)); + + // Track the added range + TraceEvent("BARW_BackupCorrectnessRange", randomID).detail("RangeBegin", start).detail("RangeEnd", *i); + } + } + + if (performRestore && !restorePrefixesToInclude.empty() && shouldSkipRestoreRanges) { + for (auto& range : backupRanges) { + bool intersection = false; + for (auto& prefix : restorePrefixesToInclude) { + KeyRange prefixRange(KeyRangeRef(prefix, strinc(prefix))); + if (range.intersects(prefixRange)) { + intersection = true; + } + TraceEvent("BARW_PrefixSkipRangeDetails") + .detail("PrefixMandatory", printable(prefix)) + .detail("BackupRange", printable(range)) + .detail("Intersection", intersection); + } + // If the backup range intersects with restorePrefixesToInclude or a coin flip is true then use it as a + // restore range as well, otherwise skip it. + if (intersection || deterministicRandom()->coinflip()) { + restoreRanges.push_back_deep(restoreRanges.arena(), range); + } else { + skippedRestoreRanges.push_back(range); + } + } + } else { + restoreRanges = backupRanges; + } + + // If no random backup ranges intersected with restorePrefixesToInclude or won the coin flip then restoreRanges + // will be empty, so move an item from skippedRestoreRanges to restoreRanges. + if (restoreRanges.empty()) { + ASSERT(!skippedRestoreRanges.empty()); + restoreRanges.push_back_deep(restoreRanges.arena(), skippedRestoreRanges.back()); + skippedRestoreRanges.pop_back(); + } + + for (auto& range : restoreRanges) { + TraceEvent("BARW_RestoreRange", randomID) + .detail("RangeBegin", printable(range.begin)) + .detail("RangeEnd", printable(range.end)); + } + for (auto& range : skippedRestoreRanges) { + TraceEvent("BARW_SkipRange", randomID) + .detail("RangeBegin", printable(range.begin)) + .detail("RangeEnd", printable(range.end)); + } + } + + Future setup(Database const& cx) override { + if (clientId != 0) { + return Void(); + } + + return _setup(cx, this); + } + + ACTOR Future _setup(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state bool adjusted = false; + state TenantMapEntry entry; + + if (!self->defaultBackup && (cx->defaultTenant.present() || BUGGIFY)) { + if (cx->defaultTenant.present()) { + wait(store(entry, TenantAPI::getTenant(cx.getReference(), cx->defaultTenant.get()))); + + // If we are specifying sub-ranges (or randomly, if backing up normal keys), adjust them to be relative + // to the tenant + if (self->backupRanges.size() != 1 || self->backupRanges[0] != normalKeys || + deterministicRandom()->coinflip()) { + adjusted = true; + Standalone> modifiedBackupRanges; + for (int i = 0; i < self->backupRanges.size(); ++i) { + modifiedBackupRanges.push_back_deep( + modifiedBackupRanges.arena(), + self->backupRanges[i].withPrefix(entry.prefix, self->backupRanges.arena())); + } + self->backupRanges = modifiedBackupRanges; + } + } + for (auto r : getSystemBackupRanges()) { + self->backupRanges.push_back_deep(self->backupRanges.arena(), r); + } + + if (adjusted) { + Standalone> modifiedRestoreRanges; + for (int i = 0; i < self->restoreRanges.size(); ++i) { + modifiedRestoreRanges.push_back_deep( + modifiedRestoreRanges.arena(), + self->restoreRanges[i].withPrefix(entry.prefix, self->restoreRanges.arena())); + } + self->restoreRanges = modifiedRestoreRanges; + + for (int i = 0; i < self->skippedRestoreRanges.size(); ++i) { + self->skippedRestoreRanges[i] = self->skippedRestoreRanges[i].withPrefix(entry.prefix); + } + } + for (auto r : getSystemBackupRanges()) { + self->restoreRanges.push_back_deep(self->restoreRanges.arena(), r); + } + } + + return Void(); + } + + Future start(Database const& cx) override { + if (clientId != 0) + return Void(); + + TraceEvent(SevInfo, "BARW_Param").detail("Locked", locked); + TraceEvent(SevInfo, "BARW_Param").detail("BackupAfter", backupAfter); + TraceEvent(SevInfo, "BARW_Param").detail("RestoreAfter", restoreAfter); + TraceEvent(SevInfo, "BARW_Param").detail("PerformRestore", performRestore); + TraceEvent(SevInfo, "BARW_Param").detail("BackupTag", printable(backupTag).c_str()); + TraceEvent(SevInfo, "BARW_Param").detail("BackupRangesCount", backupRangesCount); + TraceEvent(SevInfo, "BARW_Param").detail("BackupRangeLengthMax", backupRangeLengthMax); + TraceEvent(SevInfo, "BARW_Param").detail("AbortAndRestartAfter", abortAndRestartAfter); + TraceEvent(SevInfo, "BARW_Param").detail("DifferentialBackup", differentialBackup); + TraceEvent(SevInfo, "BARW_Param").detail("StopDifferentialAfter", stopDifferentialAfter); + TraceEvent(SevInfo, "BARW_Param").detail("AgentRequest", agentRequest); + TraceEvent(SevInfo, "BARW_Param").detail("Encrypted", encryptionKeyFileName.present()); + + return _start(cx, this); + } + + Future check(Database const& cx) override { + if (clientId != 0) + return true; + else + return _check(cx, this); + } + + ACTOR static Future _check(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state Transaction tr(cx); + loop { + try { + state int restoreIndex; + for (restoreIndex = 0; restoreIndex < self->skippedRestoreRanges.size(); restoreIndex++) { + state KeyRangeRef range = self->skippedRestoreRanges[restoreIndex]; + Standalone restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex)); + RangeResult res = wait(tr.getRange(range, GetRangeLimits::ROW_LIMIT_UNLIMITED)); + if (!res.empty()) { + TraceEvent(SevError, "BARW_UnexpectedRangePresent").detail("Range", printable(range)); + return false; + } + } + TraceEvent("BackupCorrectnessFinish").log(); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + return true; + } + + void getMetrics(std::vector& m) override {} + + ACTOR static Future changePaused(Database cx, FileBackupAgent* backupAgent) { + loop { + wait(backupAgent->changePause(cx, true)); + wait(delay(30 * deterministicRandom()->random01())); + wait(backupAgent->changePause(cx, false)); + wait(delay(120 * deterministicRandom()->random01())); + } + } + + ACTOR static Future statusLoop(Database cx, std::string tag) { + state FileBackupAgent agent; + loop { + bool active = wait(agent.checkActive(cx)); + TraceEvent("BARW_AgentActivityCheck").detail("IsActive", active); + std::string status = wait(agent.getStatus(cx, ShowErrors::True, tag)); + puts(status.c_str()); + std::string statusJSON = wait(agent.getStatusJSON(cx, tag)); + puts(statusJSON.c_str()); + wait(delay(2.0)); + } + } + + ACTOR static Future doBackup(BackupAndRestorePartitionedCorrectnessWorkload* self, + double startDelay, + FileBackupAgent* backupAgent, + Database cx, + Key tag, + Standalone> backupRanges, + double stopDifferentialDelay, + Promise submittted) { + + state UID randomID = nondeterministicRandom()->randomUniqueID(); + + state Future stopDifferentialFuture = delay(stopDifferentialDelay); + wait(delay(startDelay)); + + if (startDelay || BUGGIFY) { + TraceEvent("BARW_DoBackupAbortBackup1", randomID) + .detail("Tag", printable(tag)) + .detail("StartDelay", startDelay); + + try { + wait(backupAgent->abortBackup(cx, tag.toString())); + } catch (Error& e) { + TraceEvent("BARW_DoBackupAbortBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded) + throw; + } + } + + TraceEvent("BARW_DoBackupSubmitBackup", randomID) + .detail("Tag", printable(tag)) + .detail("StopWhenDone", stopDifferentialDelay ? "False" : "True"); + + state std::string backupContainer = "file://simfdb/backups/"; + state Future status = statusLoop(cx, tag.toString()); + try { + wait(backupAgent->submitBackup(cx, + StringRef(backupContainer), + {}, + deterministicRandom()->randomInt(0, 60), + deterministicRandom()->randomInt(0, 2000), + tag.toString(), + backupRanges, + true, + StopWhenDone{ !stopDifferentialDelay }, + UsePartitionedLog::True, // enable partitioned log here + IncrementalBackupOnly::False, + self->encryptionKeyFileName)); + } catch (Error& e) { + TraceEvent("BARW_DoBackupSubmitBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) + throw; + } + + submittted.send(Void()); + + // Stop the differential backup, if enabled + if (stopDifferentialDelay) { + CODE_PROBE(!stopDifferentialFuture.isReady(), + "Restore starts at specified time - stopDifferential not ready"); + wait(stopDifferentialFuture); + TraceEvent("BARW_DoBackupWaitToDiscontinue", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + + try { + if (BUGGIFY) { + state KeyBackedTag backupTag = makeBackupTag(tag.toString()); + TraceEvent("BARW_DoBackupWaitForRestorable", randomID).detail("Tag", backupTag.tagName); + + // Wait until the backup is in a restorable state and get the status, URL, and UID atomically + state Reference lastBackupContainer; + state UID lastBackupUID; + state EBackupState resultWait = wait(backupAgent->waitBackup( + cx, backupTag.tagName, StopWhenDone::False, &lastBackupContainer, &lastBackupUID)); + + TraceEvent("BARW_DoBackupWaitForRestorable", randomID) + .detail("Tag", backupTag.tagName) + .detail("Result", BackupAgentBase::getStateText(resultWait)); + + state bool restorable = false; + if (lastBackupContainer) { + state Future fdesc = lastBackupContainer->describeBackup(); + wait(ready(fdesc)); + + if (!fdesc.isError()) { + state BackupDescription desc = fdesc.get(); + wait(desc.resolveVersionTimes(cx)); + printf("BackupDescription:\n%s\n", desc.toString().c_str()); + restorable = desc.maxRestorableVersion.present(); + } + } + + TraceEvent("BARW_LastBackupContainer", randomID) + .detail("BackupTag", printable(tag)) + .detail("LastBackupContainer", lastBackupContainer ? lastBackupContainer->getURL() : "") + .detail("LastBackupUID", lastBackupUID) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)) + .detail("Restorable", restorable); + + // Do not check the backup, if aborted + if (resultWait == EBackupState::STATE_ABORTED) { + } + // Ensure that a backup container was found + else if (!lastBackupContainer) { + TraceEvent(SevError, "BARW_MissingBackupContainer", randomID) + .detail("LastBackupUID", lastBackupUID) + .detail("BackupTag", printable(tag)) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)); + printf("BackupCorrectnessMissingBackupContainer tag: %s status: %s\n", + printable(tag).c_str(), + BackupAgentBase::getStateText(resultWait)); + } + // Check that backup is restorable + else if (!restorable) { + TraceEvent(SevError, "BARW_NotRestorable", randomID) + .detail("LastBackupUID", lastBackupUID) + .detail("BackupTag", printable(tag)) + .detail("BackupFolder", lastBackupContainer->getURL()) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)); + printf("BackupCorrectnessNotRestorable: tag: %s\n", printable(tag).c_str()); + } + + // Abort the backup, if not the first backup because the second backup may have aborted the backup + // by now + if (startDelay) { + TraceEvent("BARW_DoBackupAbortBackup2", randomID) + .detail("Tag", printable(tag)) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)) + .detail("LastBackupContainer", lastBackupContainer ? lastBackupContainer->getURL() : "") + .detail("Restorable", restorable); + wait(backupAgent->abortBackup(cx, tag.toString())); + } else { + TraceEvent("BARW_DoBackupDiscontinueBackup", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + wait(backupAgent->discontinueBackup(cx, tag)); + } + } + + else { + TraceEvent("BARW_DoBackupDiscontinueBackup", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + wait(backupAgent->discontinueBackup(cx, tag)); + } + } catch (Error& e) { + TraceEvent("BARW_DoBackupDiscontinueBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) + throw; + } + } + + // Wait for the backup to complete + TraceEvent("BARW_DoBackupWaitBackup", randomID).detail("Tag", printable(tag)); + state EBackupState statusValue = wait(backupAgent->waitBackup(cx, tag.toString(), StopWhenDone::True)); + + state std::string statusText; + + std::string _statusText = wait(backupAgent->getStatus(cx, ShowErrors::True, tag.toString())); + statusText = _statusText; + // Can we validate anything about status? + + TraceEvent("BARW_DoBackupComplete", randomID) + .detail("Tag", printable(tag)) + .detail("Status", statusText) + .detail("StatusValue", BackupAgentBase::getStateText(statusValue)); + + return Void(); + } + + ACTOR static Future clearAndRestoreSystemKeys(Database cx, + BackupAndRestorePartitionedCorrectnessWorkload* self, + FileBackupAgent* backupAgent, + Version targetVersion, + Reference lastBackupContainer, + Standalone> systemRestoreRanges) { + // restore system keys before restoring any other ranges + wait(runRYWTransaction(cx, [=](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + for (auto& range : systemRestoreRanges) { + tr->clear(range); + } + return Void(); + })); + state Standalone restoreTag(self->backupTag.toString() + "_system"); + printf("BackupCorrectness, backupAgent.restore is called for tag:%s\n", restoreTag.toString().c_str()); + wait(success(backupAgent->restore(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + systemRestoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName, + TransformPartitionedLog::True))); + printf("BackupCorrectness, backupAgent.restore finished for tag:%s\n", restoreTag.toString().c_str()); + return Void(); + } + + ACTOR static Future _start(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state FileBackupAgent backupAgent; + state bool extraTasks = false; + state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); + TraceEvent("BARW_Arguments") + .detail("BackupTag", printable(self->backupTag)) + .detail("PerformRestore", self->performRestore) + .detail("BackupAfter", self->backupAfter) + .detail("RestoreAfter", self->restoreAfter) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter) + .detail("DifferentialAfter", self->stopDifferentialAfter); + + state UID randomID = nondeterministicRandom()->randomUniqueID(); + if (self->allowPauses && BUGGIFY) { + state Future cp = changePaused(cx, &backupAgent); + } + + // Increment the backup agent requests + if (self->agentRequest) { + BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests++; + } + + if (self->encryptionKeyFileName.present()) { + wait(BackupContainerFileSystem::createTestEncryptionKeyFile(self->encryptionKeyFileName.get())); + } + + try { + state Future startRestore = delay(self->restoreAfter); + + // backup + wait(delay(self->backupAfter)); + + TraceEvent("BARW_DoBackup1", randomID).detail("Tag", printable(self->backupTag)); + state Promise submitted; + state Future b = doBackup( + self, 0, &backupAgent, cx, self->backupTag, self->backupRanges, self->stopDifferentialAfter, submitted); + + TraceEvent("BARW_DoBackupWait", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter); + try { + wait(b); + } catch (Error& e) { + if (e.code() != error_code_database_locked) + throw; + if (self->performRestore) + throw; + return Void(); + } + TraceEvent("BARW_DoBackupDone", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter); + + state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); + UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference())); + state UID logUid = uidFlag.first; + state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference())); + state Reference lastBackupContainer = + wait(BackupConfig(logUid).backupContainer().getD(cx.getReference())); + + CODE_PROBE(!startRestore.isReady(), "Restore starts at specified time"); + wait(startRestore); + // between here + if (lastBackupContainer && self->performRestore) { + wait(runRYWTransaction(cx, [=](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + for (auto& kvrange : self->backupRanges) + tr->clear(kvrange); + return Void(); + })); + + // restore database + TraceEvent("BARW_Restore", randomID) + .detail("LastBackupContainer", lastBackupContainer->getURL()) + .detail("RestoreAfter", self->restoreAfter) + .detail("BackupTag", printable(self->backupTag)); + + auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(), + lastBackupContainer->getProxy(), + lastBackupContainer->getEncryptionKeyFileName()); + BackupDescription desc = wait(container->describeBackup()); + + state Version targetVersion = -1; + if (desc.maxRestorableVersion.present()) { + if (deterministicRandom()->random01() < 0.1) { + targetVersion = desc.minRestorableVersion.get(); + } else if (deterministicRandom()->random01() < 0.1) { + targetVersion = desc.maxRestorableVersion.get(); + } else if (deterministicRandom()->random01() < 0.5) { + targetVersion = deterministicRandom()->randomInt64(desc.minRestorableVersion.get(), + desc.contiguousLogEnd.get()); + } + } + + TraceEvent("BARW_RestoreDebug").detail("TargetVersion", targetVersion); + + state std::vector> restores; + state std::vector> restoreTags; + state bool multipleRangesInOneTag = false; + state int restoreIndex = 0; + // make sure system keys are not present in the restoreRanges as they will get restored first separately + // from the rest + Standalone> modifiedRestoreRanges; + Standalone> systemRestoreRanges; + for (int i = 0; i < self->restoreRanges.size(); ++i) { + if (config.tenantMode != TenantMode::REQUIRED || + !self->restoreRanges[i].intersects(getSystemBackupRanges())) { + modifiedRestoreRanges.push_back_deep(modifiedRestoreRanges.arena(), self->restoreRanges[i]); + } else { + KeyRangeRef normalKeyRange = self->restoreRanges[i] & normalKeys; + KeyRangeRef systemKeyRange = self->restoreRanges[i] & systemKeys; + if (!normalKeyRange.empty()) { + modifiedRestoreRanges.push_back_deep(modifiedRestoreRanges.arena(), normalKeyRange); + } + if (!systemKeyRange.empty()) { + systemRestoreRanges.push_back_deep(systemRestoreRanges.arena(), systemKeyRange); + } + } + } + self->restoreRanges = modifiedRestoreRanges; + if (!systemRestoreRanges.empty()) { + // We are able to restore system keys first since we restore an entire cluster at once rather than + // partial key ranges. + // this is where it fails + wait(clearAndRestoreSystemKeys( + cx, self, &backupAgent, targetVersion, lastBackupContainer, systemRestoreRanges)); + } + // and here + + multipleRangesInOneTag = true; + Standalone restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex)); + restoreTags.push_back(restoreTag); + printf("BackupCorrectness, backupAgent.restore is called for restoreIndex:%d tag:%s\n", + restoreIndex, + restoreTag.toString().c_str()); + restores.push_back(backupAgent.restore(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + self->restoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName, + TransformPartitionedLog::True)); + + wait(waitForAll(restores)); + + for (auto& restore : restores) { + ASSERT(!restore.isError()); + } + } + state Key backupAgentKey = uidPrefixKey(logRangesRange.begin, logUid); + state Key backupLogValuesKey = destUidValue.withPrefix(backupLogKeys.begin); + state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix); + state Key backupLatestVersionsKey = uidPrefixKey(backupLatestVersionsPath, logUid); + state int displaySystemKeys = 0; + + // Ensure that there is no left over key within the backup subspace + loop { + state Reference tr(new ReadYourWritesTransaction(cx)); + + TraceEvent("BARW_CheckLeftoverKeys", randomID).detail("BackupTag", printable(self->backupTag)); + + try { + // Check the left over tasks + // We have to wait for the list to empty since an abort and get status + // can leave extra tasks in the queue + TraceEvent("BARW_CheckLeftoverTasks", randomID).detail("BackupTag", printable(self->backupTag)); + state int64_t taskCount = wait(backupAgent.getTaskCount(tr)); + state int waitCycles = 0; + + if ((taskCount) && false) { + TraceEvent("BARW_EndingNonzeroTaskCount", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("TaskCount", taskCount) + .detail("WaitCycles", waitCycles); + printf("EndingNonZeroTasks: %ld\n", (long)taskCount); + wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef())); + } + + while (taskCount > 0) { + waitCycles++; + + TraceEvent("BARW_NonzeroTaskWait", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("TaskCount", taskCount) + .detail("WaitCycles", waitCycles); + printf("%.6f %-10s Wait #%4d for %lld tasks to end\n", + now(), + randomID.toString().c_str(), + waitCycles, + (long long)taskCount); + + wait(delay(5.0)); + + tr = makeReference(cx); + wait(store(taskCount, backupAgent.getTaskCount(tr))); + } + + RangeResult agentValues = + wait(tr->getRange(KeyRange(KeyRangeRef(backupAgentKey, strinc(backupAgentKey))), 100)); + + // Error if the system keyspace for the backup tag is not empty + if (agentValues.size() > 0) { + displaySystemKeys++; + printf("BackupCorrectnessLeftOverMutationKeys: (%d) %s\n", + agentValues.size(), + printable(backupAgentKey).c_str()); + TraceEvent(SevError, "BackupCorrectnessLeftOverMutationKeys", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("LeftOverKeys", agentValues.size()) + .detail("KeySpace", printable(backupAgentKey)); + for (auto& s : agentValues) { + TraceEvent("BARW_LeftOverKey", randomID) + .detail("Key", printable(StringRef(s.key.toString()))) + .detail("Value", printable(StringRef(s.value.toString()))); + printf(" Key: %-50s Value: %s\n", + printable(StringRef(s.key.toString())).c_str(), + printable(StringRef(s.value.toString())).c_str()); + } + } else { + printf("No left over backup agent configuration keys\n"); + } + + Optional latestVersion = wait(tr->get(backupLatestVersionsKey)); + if (latestVersion.present()) { + TraceEvent(SevError, "BackupCorrectnessLeftOverVersionKey", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("BackupLatestVersionsKey", backupLatestVersionsKey.printable()) + .detail("DestUidValue", destUidValue.printable()); + } else { + printf("No left over backup version key\n"); + } + + RangeResult versions = wait(tr->getRange( + KeyRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath))), 1)); + if (!self->shareLogRange || !versions.size()) { + RangeResult logValues = wait( + tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100)); + + // Error if the log/mutation keyspace for the backup tag is not empty + if (logValues.size() > 0) { + displaySystemKeys++; + printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", + logValues.size(), + printable(backupLogValuesKey).c_str()); + TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("LeftOverKeys", logValues.size()) + .detail("KeySpace", printable(backupLogValuesKey)); + } else { + printf("No left over backup log keys\n"); + } + } + + break; + } catch (Error& e) { + TraceEvent("BARW_CheckException", randomID).error(e); + wait(tr->onError(e)); + } + } + + if (displaySystemKeys) { + wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef())); + } + + TraceEvent("BARW_Complete", randomID).detail("BackupTag", printable(self->backupTag)); + + // Decrement the backup agent requets + if (self->agentRequest) { + BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests--; + } + + // SOMEDAY: Remove after backup agents can exist quiescently + if ((g_simulator->backupAgents == ISimulator::BackupAgentType::BackupToFile) && + (!BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests)) { + g_simulator->backupAgents = ISimulator::BackupAgentType::NoBackupAgents; + } + } catch (Error& e) { + TraceEvent(SevError, "BackupAndRestorePartitionedCorrectness").error(e).GetLastError(); + throw; + } + return Void(); + } +}; + +int BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests = 0; + +WorkloadFactory BackupAndRestorePartitionedCorrectnessWorkloadFactory; diff --git a/fdbserver/workloads/Cycle.actor.cpp b/fdbserver/workloads/Cycle.actor.cpp index c522e8c86d7..86c3635614d 100644 --- a/fdbserver/workloads/Cycle.actor.cpp +++ b/fdbserver/workloads/Cycle.actor.cpp @@ -147,6 +147,7 @@ struct CycleWorkload : TestWorkload, CycleMembers { ACTOR Future cycleClient(Database cx, CycleWorkload* self, double delay) { state double lastTime = now(); + TraceEvent("CycleClientStart").log(); try { loop { wait(poisson(&lastTime, delay)); @@ -165,8 +166,9 @@ struct CycleWorkload : TestWorkload, CycleMembers { self->setAuthToken(tr); // Reverse next and next^2 node Optional v = wait(tr.get(self->key(r))); - if (!v.present()) + if (!v.present()) { self->badRead("KeyR", r, tr); + } state int r2 = self->fromValue(v.get()); Optional v2 = wait(tr.get(self->key(r2))); if (!v2.present()) @@ -175,18 +177,23 @@ struct CycleWorkload : TestWorkload, CycleMembers { Optional v3 = wait(tr.get(self->key(r3))); if (!v3.present()) self->badRead("KeyR3", r3, tr); - int r4 = self->fromValue(v3.get()); + state int r4 = self->fromValue(v3.get()); tr.clear(self->key(r)); //< Shouldn't have an effect, but will break with wrong ordering tr.set(self->key(r), self->value(r3)); tr.set(self->key(r2), self->value(r4)); tr.set(self->key(r3), self->value(r2)); - // TraceEvent("CyclicTest").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString()); - // TraceEvent("CyclicTest").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString()); - // TraceEvent("CyclicTest").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString()); + // TraceEvent("CyclicTest1").detail("RawKey", r).detail("RawValue", r3).detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString()).log(); + // TraceEvent("CyclicTest2").detail("RawKey", r2).detail("RawValue", r4).detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString()).log(); + // TraceEvent("CyclicTest3").detail("RawKey", r3).detail("RawValue", r2).detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString()).log(); wait(tr.commit()); - // TraceEvent("CycleCommit"); + // TraceEvent("CyclicTestCommit") + // .detail("R1", r) + // .detail("R2", r2) + // .detail("R3", r3) + // .detail("R4", r4) + // .log(); break; } catch (Error& e) { if (e.code() == error_code_transaction_too_old) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f93605683f9..ae97f177f05 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -459,6 +459,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ApiCorrectness.toml) add_fdb_test(TEST_FILES slow/ApiCorrectnessAtomicRestore.toml) add_fdb_test(TEST_FILES slow/ApiCorrectnessSwitchover.toml) + add_fdb_test(TEST_FILES slow/BackupCorrectnessPartitioned.toml) add_fdb_test(TEST_FILES slow/BlobGranuleVerifyLarge.toml IGNORE) add_fdb_test(TEST_FILES slow/BlobGranuleVerifyLargeClean.toml IGNORE) add_fdb_test(TEST_FILES slow/BlobGranuleVerifyBalance.toml) diff --git a/tests/fast/BackupCorrectness.toml b/tests/fast/BackupCorrectness.toml index 0582bfa0ebc..7c9ffbf5695 100644 --- a/tests/fast/BackupCorrectness.toml +++ b/tests/fast/BackupCorrectness.toml @@ -40,3 +40,13 @@ simBackupAgents = 'BackupToFile' machinesToLeave = 3 reboot = true testDuration = 90.0 + +[[test]] +testTitle = 'PostBackupAndRestore' +clearAfterTest = false + [[test.workload]] + testName = 'Cycle' + nodeCount = 30000 + transactionsPerSecond = 2500.0 + testDuration = 30.0 + expectedRate = 0 diff --git a/tests/slow/BackupCorrectnessPartitioned.toml b/tests/slow/BackupCorrectnessPartitioned.toml new file mode 100644 index 00000000000..5f1a7b1fbd3 --- /dev/null +++ b/tests/slow/BackupCorrectnessPartitioned.toml @@ -0,0 +1,26 @@ +testClass = "Backup" + +[configuration] +buggify = false +tenantModes = ['disabled'] # Do not support tenant +encryptModes = ['disabled'] # Do not support encryption + +[[test]] +testTitle = 'BackupAndRestorePartitioned' +clearAfterTest = false +simBackupAgents = 'BackupToFile' + + [[test.workload]] + testName = 'Cycle' + nodeCount = 3000 + transactionsPerSecond = 2500.0 + testDuration = 30.0 + expectedRate = 0 + + [[test.workload]] + testName = 'BackupAndRestorePartitionedCorrectness' + backupAfter = 10.0 + restoreAfter = 60.0 + backupRangesCount = -1 + +