Skip to content

Commit

Permalink
assumeState and catchup tests for Hot Archive BucketList
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Jan 15, 2025
1 parent 05be852 commit 6ffde65
Show file tree
Hide file tree
Showing 42 changed files with 1,977 additions and 948 deletions.
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ TEST_FILES = $(TESTDATA_DIR)/stellar-core_example.cfg $(TESTDATA_DIR)/stellar-co
$(TESTDATA_DIR)/stellar-core_testnet.cfg $(TESTDATA_DIR)/stellar-core_testnet_legacy.cfg \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.json $(TESTDATA_DIR)/stellar-history.livenet.15686975.json \
$(TESTDATA_DIR)/stellar-core_testnet_validator.cfg $(TESTDATA_DIR)/stellar-core_example_validators.cfg \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.v2.json

BUILT_SOURCES = $(SRC_X_FILES:.x=.h) main/StellarCoreVersion.cpp main/XDRFilesSha256.cpp $(TEST_FILES)

Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
}
if (countMergeEvents)
{
bucketManager.incrMergeCounters(mc);
bucketManager.incrMergeCounters<BucketT>(mc);
}

std::vector<Hash> shadowHashes;
Expand Down
159 changes: 108 additions & 51 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,36 @@ BucketManager::getMergeTimer()
return mBucketSnapMerge;
}

template <>
MergeCounters
BucketManager::readMergeCounters<LiveBucket>()
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mLiveMergeCounters;
}

template <>
MergeCounters
BucketManager::readMergeCounters()
BucketManager::readMergeCounters<HotArchiveBucket>()
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mHotArchiveMergeCounters;
}

template <>
void
BucketManager::incrMergeCounters<LiveBucket>(MergeCounters const& delta)
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mMergeCounters;
mLiveMergeCounters += delta;
}

template <>
void
BucketManager::incrMergeCounters(MergeCounters const& delta)
BucketManager::incrMergeCounters<HotArchiveBucket>(MergeCounters const& delta)
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
mMergeCounters += delta;
mHotArchiveMergeCounters += delta;
}

bool
Expand Down Expand Up @@ -623,7 +641,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
auto future = promise.get_future().share();
promise.set_value(bucket);
mc.mFinishedMergeReattachments++;
incrMergeCounters(mc);
incrMergeCounters<BucketT>(mc);
return future;
}
}
Expand All @@ -638,7 +656,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
"BucketManager::getMergeFuture returning running future for merge {}",
key);
mc.mRunningMergeReattachments++;
incrMergeCounters(mc);
incrMergeCounters<BucketT>(mc);
return i->second;
}

Expand Down Expand Up @@ -1013,10 +1031,10 @@ BucketManager::snapshotLedger(LedgerHeader& currentHeader)
currentHeader.ledgerVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
{
// TODO: Hash Archive Bucket
// Dependency: HAS supports Hot Archive BucketList

hash = mLiveBucketList->getHash();
SHA256 hsh;
hsh.add(mLiveBucketList->getHash());
hsh.add(mHotArchiveBucketList->getHash());
hash = hsh.finish();
}
else
{
Expand Down Expand Up @@ -1229,51 +1247,71 @@ BucketManager::assumeState(HistoryArchiveState const& has,
releaseAssert(threadIsMain());
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// TODO: Assume archival bucket state
// Dependency: HAS supports Hot Archive BucketList
for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
{
auto curr = getBucketByHashInternal(
hexToBin256(has.currentBuckets.at(i).curr), mSharedLiveBuckets);
auto snap = getBucketByHashInternal(
hexToBin256(has.currentBuckets.at(i).snap), mSharedLiveBuckets);
if (!(curr && snap))
{
throw std::runtime_error("Missing bucket files while assuming "
"saved live BucketList state");
}

auto const& nextFuture = has.currentBuckets.at(i).next;
std::shared_ptr<LiveBucket> nextBucket = nullptr;
if (nextFuture.hasOutputHash())
auto processBucketList = [&](auto& bl, auto const& hasBuckets) {
auto kNumLevels = std::remove_reference<decltype(bl)>::type::kNumLevels;
using BucketT =
typename std::remove_reference<decltype(bl)>::type::bucket_type;
for (uint32_t i = 0; i < kNumLevels; ++i)
{
nextBucket = getBucketByHashInternal(
hexToBin256(nextFuture.getOutputHash()), mSharedLiveBuckets);
if (!nextBucket)
auto curr =
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).curr));
auto snap =
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).snap));
if (!(curr && snap))
{
throw std::runtime_error(
"Missing future bucket files while "
"assuming saved live BucketList state");
throw std::runtime_error("Missing bucket files while assuming "
"saved live BucketList state");
}
}

// Buckets on the BucketList should always be indexed
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
if (nextBucket)
{
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
auto const& nextFuture = hasBuckets.at(i).next;
std::shared_ptr<BucketT> nextBucket = nullptr;
if (nextFuture.hasOutputHash())
{
nextBucket = getBucketByHash<BucketT>(
hexToBin256(nextFuture.getOutputHash()));
if (!nextBucket)
{
throw std::runtime_error(
"Missing future bucket files while "
"assuming saved live BucketList state");
}
}

// Buckets on the BucketList should always be indexed
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
if (nextBucket)
{
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
}

bl.getLevel(i).setCurr(curr);
bl.getLevel(i).setSnap(snap);
bl.getLevel(i).setNext(nextFuture);
}
};

mLiveBucketList->getLevel(i).setCurr(curr);
mLiveBucketList->getLevel(i).setSnap(snap);
mLiveBucketList->getLevel(i).setNext(nextFuture);
processBucketList(*mLiveBucketList, has.currentBuckets);
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
if (has.hasHotArchiveBuckets())
{
processBucketList(*mHotArchiveBucketList, has.hotArchiveBuckets);
}
#endif

if (restartMerges)
{
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
if (has.hasHotArchiveBuckets())
{
mHotArchiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
#endif
}
cleanupStaleFiles(has);
}
Expand Down Expand Up @@ -1580,16 +1618,35 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
continue;
}

// TODO: Update verify to for ArchiveBucket
// Dependency: HAS supports Hot Archive BucketList
auto b = getBucketByHashInternal(h, mSharedLiveBuckets);
if (!b)
{
throw std::runtime_error(fmt::format(
FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
}
seq.emplace_back(std::make_shared<VerifyBucketWork>(
mApp, b->getFilename().string(), b->getHash(), nullptr));
auto loadFilenameAndHash = [&]() -> std::pair<std::string, Hash> {
auto live = getBucketByHashInternal(h, mSharedLiveBuckets);
if (!live)
{
auto hot = getBucketByHashInternal(h, mSharedHotArchiveBuckets);

// Check both live and hot archive buckets for hash. If we don't
// find it in either, we're missing a bucket. Note that live and
// hot archive buckets are guaranteed to have no hash collisions
// due to type field in MetaEntry.
if (!hot)
{
throw std::runtime_error(
fmt::format(FMT_STRING("Missing referenced bucket {}"),
binToHex(h)));
}
return std::make_pair(hot->getFilename().string(),
hot->getHash());
}
else
{
return std::make_pair(live->getFilename().string(),
live->getHash());
}
};

auto [filename, hash] = loadFilenameAndHash();
seq.emplace_back(
std::make_shared<VerifyBucketWork>(mApp, filename, hash, nullptr));
}
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
"verify-referenced-buckets", seq);
Expand Down
7 changes: 4 additions & 3 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class BucketManager : NonMovableOrCopyable
medida::Counter& mLiveBucketListSizeCounter;
medida::Counter& mArchiveBucketListSizeCounter;
EvictionCounters mBucketListEvictionCounters;
MergeCounters mMergeCounters;
MergeCounters mLiveMergeCounters;
MergeCounters mHotArchiveMergeCounters;
std::shared_ptr<EvictionStatistics> mEvictionStatistics{};
std::map<LedgerEntryTypeAndDurability, medida::Counter&>
mBucketListEntryCountCounters;
Expand Down Expand Up @@ -203,8 +204,8 @@ class BucketManager : NonMovableOrCopyable

// Reading and writing the merge counters is done in bulk, and takes a lock
// briefly; this can be done from any thread.
MergeCounters readMergeCounters();
void incrMergeCounters(MergeCounters const& delta);
template <class BucketT> MergeCounters readMergeCounters();
template <class BucketT> void incrMergeCounters(MergeCounters const& delta);

// Get a reference to a persistent bucket (in the BucketManager's bucket
// directory), from the BucketManager's shared bucket-set.
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/HotArchiveBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ HotArchiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,

if (countMergeEvents)
{
bucketManager.incrMergeCounters(mc);
bucketManager.incrMergeCounters<HotArchiveBucket>(mc);
}

return out.getBucket(bucketManager);
Expand Down
10 changes: 5 additions & 5 deletions src/bucket/HotArchiveBucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ typedef BucketOutputIterator<HotArchiveBucket> HotArchiveBucketOutputIterator;
class HotArchiveBucket : public BucketBase,
public std::enable_shared_from_this<HotArchiveBucket>
{
static std::vector<HotArchiveBucketEntry>
convertToBucketEntry(std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries);

public:
// Entry type that this bucket stores
using EntryT = HotArchiveBucketEntry;
Expand Down Expand Up @@ -91,6 +86,11 @@ class HotArchiveBucket : public BucketBase,
static std::shared_ptr<LoadT>
bucketEntryToLoadResult(std::shared_ptr<EntryT> const& be);

static std::vector<HotArchiveBucketEntry>
convertToBucketEntry(std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries);

friend class HotArchiveBucketSnapshot;
};
}
2 changes: 2 additions & 0 deletions src/bucket/HotArchiveBucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace stellar
class HotArchiveBucketList : public BucketListBase<HotArchiveBucket>
{
public:
using bucket_type = HotArchiveBucket;

void addBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& archiveEntries,
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/LiveBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ LiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,

if (countMergeEvents)
{
bucketManager.incrMergeCounters(mc);
bucketManager.incrMergeCounters<LiveBucket>(mc);
}

return out.getBucket(bucketManager);
Expand Down
2 changes: 2 additions & 0 deletions src/bucket/LiveBucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace stellar
class LiveBucketList : public BucketListBase<LiveBucket>
{
public:
using bucket_type = LiveBucket;

// Reset Eviction Iterator position if an incoming spill or upgrade has
// invalidated the previous position
static void updateStartingEvictionIterator(EvictionIterator& iter,
Expand Down
7 changes: 4 additions & 3 deletions src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,10 @@ TEST_CASE_VERSIONS("eviction scan", "[bucketlist][archival]")
// Close ledgers until evicted DEADENTRYs merge with
// original INITENTRYs. This checks that BucketList
// invariants are respected
for (auto initialDeadMerges =
bm.readMergeCounters().mOldInitEntriesMergedWithNewDead;
bm.readMergeCounters().mOldInitEntriesMergedWithNewDead <
for (auto initialDeadMerges = bm.readMergeCounters<LiveBucket>()
.mOldInitEntriesMergedWithNewDead;
bm.readMergeCounters<LiveBucket>()
.mOldInitEntriesMergedWithNewDead <
initialDeadMerges + tempEntries.size();
++ledgerSeq)
{
Expand Down
Loading

0 comments on commit 6ffde65

Please sign in to comment.