From 44175e0921949a7dc880331fd30415d8982e1954 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Fri, 18 Oct 2019 01:27:00 -0700 Subject: [PATCH] COWPager will no longer expire read Snapshots that are still in use. --- fdbserver/IPager.h | 11 ++-- fdbserver/IVersionedStore.h | 3 +- fdbserver/VersionedBTree.actor.cpp | 95 ++++++++++++++++++++---------- 3 files changed, 73 insertions(+), 36 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index 508c90cf9b..d6e60fd2fe 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -213,13 +213,14 @@ public: // After the returned future is ready, future calls must not wait. virtual Future getLatestVersion() = 0; - // The pager can invalidate snapshots at versions < v and reuse - // any pages that were freed as of version v - virtual void setOldestVersion(Version v) = 0; - - // Get the oldest readable version + // Returns the oldest readable version as of the most recent committed version virtual Future getOldestVersion() = 0; + // The pager can reuse pages that were freed at a version less than v. + // If any snapshots are in use at a version less than v, the pager can invalidate them + // or keep their versions around until the snapshots are no longer in use. + virtual void setOldestVersion(Version v) = 0; + protected: ~IPager2() {} // Destruction should be done using close()/dispose() from the IClosable interface }; diff --git a/fdbserver/IVersionedStore.h b/fdbserver/IVersionedStore.h index d991073b2d..482a1521a9 100644 --- a/fdbserver/IVersionedStore.h +++ b/fdbserver/IVersionedStore.h @@ -58,7 +58,8 @@ public: virtual void clear(KeyRangeRef range) = 0; virtual void mutate(int op, StringRef param1, StringRef param2) = 0; virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing - virtual void forgetVersions(Version begin, Version end) = 0; // Versions [begin, end) no longer readable + virtual void setOldestVersion(Version v) = 0; // Set oldest readable version to be used in next commit + virtual Version getOldestVersion() = 0; // Get oldest readable version virtual Future commit() = 0; virtual Future getLatestVersion() = 0; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index d4c4ed5c61..bce0462add 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -779,6 +779,8 @@ ACTOR template Future forwardError(Future f, Promise target } } +class COWPagerSnapshot; + class COWPager : public IPager2 { public: typedef FastAllocatedPage Page; @@ -940,8 +942,8 @@ public: self->pHeader->pageCount = 2; // Create a new free list - self->freeList.create(self, self->newPageID().get(), "FreeList"); - self->delayedFreeList.create(self, self->newPageID().get(), "delayedFreeList"); + self->freeList.create(self, self->newLastPageID(), "FreeList"); + self->delayedFreeList.create(self, self->newLastPageID(), "delayedFreeList"); // The first commit() below will flush the queues and update the queue states in the header, // but since the queues will not be used between now and then their states will not change. @@ -982,19 +984,28 @@ public: return freePageID.get(); } - Optional delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{self->pLastCommittedHeader->oldestVersion, 0})); + // Try to reuse pages up to the earlier of the oldest version set by the user or the oldest snapshot still in the snapshots list + ASSERT(!self->snapshots.empty()); + Version oldestVersion = std::min(self->pLastCommittedHeader->oldestVersion, self->snapshots.front().version); + Optional delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{oldestVersion, 0})); if(delayedFreePageID.present()) { debug_printf("COWPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str()); return delayedFreePageID.get().pageID; } - // Lastly, grow the pager file by a page and return it. - LogicalPageID id = self->pHeader->pageCount; - ++self->pHeader->pageCount; + // Lastly, add a new page to the pager + LogicalPageID id = self->newLastPageID(); debug_printf("COWPager(%s) newPageID() returning %s at end of file\n", self->filename.c_str(), toString(id).c_str()); return id; }; + // Grow the pager file by pone page and return it + LogicalPageID newLastPageID() { + LogicalPageID id = pHeader->pageCount; + ++pHeader->pageCount; + return id; + } + Future newPageID() override { return forwardError(newPageID_impl(this), errorPromise); } @@ -1131,7 +1142,7 @@ public: // Get snapshot as of the most recent committed version of the pager Reference getReadSnapshot(Version v) override; - void addLatestSnapshot() override; + void addLatestSnapshot(); void setOldestVersion(Version v) override { ASSERT(v >= pHeader->oldestVersion); @@ -1156,6 +1167,10 @@ public: loop { state bool freeBusy = wait(self->freeList.preFlush()); state bool delayedFreeBusy = wait(self->delayedFreeList.preFlush()); + + // Once preFlush() returns false for both queues then there are no more operations pending + // on either queue. If preFlush() returns true for either queue in one loop execution then + // it could have generated new work for itself or the other queue. if(!freeBusy && !delayedFreeBusy) { break; } @@ -1184,6 +1199,9 @@ public: self->updateCommittedHeader(); self->addLatestSnapshot(); + // Try to expire snapshots up to the oldest version, in case some were being kept around due to being in use, + // because maybe some are no longer in use. + self->expireSnapshots(self->pHeader->oldestVersion); return Void(); } @@ -1268,15 +1286,8 @@ public: private: ~COWPager() {} - // Expire snapshots up to but not including v - void expireSnapshots(Version v) { - debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size()); - while(snapshots.size() > 1 && snapshots.front().version < v) { - debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version); - snapshots.front().expired.sendError(transaction_too_old()); - snapshots.pop_front(); - } - } + // Try to expire snapshots up to but not including v, but do not expire any snapshots that are in use. + void expireSnapshots(Version v); #pragma pack(push, 1) // Header is the format of page 0 of the database @@ -1373,7 +1384,7 @@ private: struct SnapshotEntry { Version version; Promise expired; - Reference snapshot; + Reference snapshot; }; struct SnapshotEntryLessThanVersion { @@ -1390,7 +1401,7 @@ private: }; // Prevents pager from reusing freed pages from version until the snapshot is destroyed -class COWPagerSnapshot : public IPagerSnapshot, ReferenceCounted { +class COWPagerSnapshot : public IPagerSnapshot, public ReferenceCounted { public: COWPagerSnapshot(COWPager *pager, Key meta, Version version, Future expiredFuture) : pager(pager), metaKey(meta), version(version), expired(expiredFuture) { } @@ -1428,6 +1439,18 @@ public: Key metaKey; }; +void COWPager::expireSnapshots(Version v) { + debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size()); + while(snapshots.size() > 1 && snapshots.front().version < v && snapshots.front().snapshot->isSoleOwner()) { + debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version); + // The snapshot contract could be made such that the expired promise isn't need anymore. In practice it + // probably is already not needed but it will gracefully handle the case where a user begins a page read + // with a snapshot reference, keeps the page read future, and drops the snapshot reference. + snapshots.front().expired.sendError(transaction_too_old()); + snapshots.pop_front(); + } +} + Reference COWPager::getReadSnapshot(Version v) { ASSERT(!snapshots.empty()); @@ -1444,7 +1467,7 @@ void COWPager::addLatestSnapshot() { snapshots.push_back({ pLastCommittedHeader->committedVersion, expired, - Reference(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture())) + Reference(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture())) }); } @@ -2479,8 +2502,13 @@ public: virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED - // Versions [begin, end) no longer readable - virtual void forgetVersions(Version begin, Version end) NOT_IMPLEMENTED + virtual void setOldestVersion(Version v) { + m_newOldestVersion = v; + } + + virtual Version getOldestVersion() { + return m_pager->getOldestVersion().get(); + } virtual Future getLatestVersion() { if(m_writeVersion != invalidVersion) @@ -2567,7 +2595,9 @@ public: ACTOR static Future init_impl(VersionedBTree *self) { state Version latest = wait(self->m_pager->getLatestVersion()); - debug_printf("Recovered pager to version %" PRId64 "\n", latest); + self->m_newOldestVersion = self->m_pager->getOldestVersion().get(); + + debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n", self->m_newOldestVersion); state Key meta = self->m_pager->getMetaKey(); if(meta.size() == 0) { @@ -2612,12 +2642,11 @@ public: m_latestCommit.cancel(); } - // readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed - // to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. + // readAtVersion() may only be called on a committed v which has previously been passed to setWriteVersion() and never previously passed + // to setOldestVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. // The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less // than or equal to the given version. - // If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same - // write version, OR it may represent a snapshot as of the call to readAtVersion(). + // v must be a committed version. virtual Reference readAtVersion(Version v) { // Only committed versions can be read. Version recordVersion = singleVersion ? 0 : v; @@ -2909,6 +2938,7 @@ private: Version m_writeVersion; Version m_lastCommittedVersion; + Version m_newOldestVersion; Future m_latestCommit; Future m_init; std::string m_name; @@ -3680,10 +3710,8 @@ private: // Wait for the latest commit that started to be finished. wait(previousCommit); - // Advance oldest version by a random number between 0 and the difference between the latest and oldest versions. - Version newOldestVersion = self->m_pager->getOldestVersion().get() + deterministicRandom()->randomInt(0, self->m_pager->getLatestVersion().get() - self->m_pager->getOldestVersion().get() + 1); - self->m_pager->setOldestVersion(newOldestVersion); - debug_printf("%s: Beginning commit of version %" PRId64 ", oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, newOldestVersion); + self->m_pager->setOldestVersion(self->m_newOldestVersion); + debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, self->m_newOldestVersion); state Future lazyDelete = incrementalLazyDelete(self, 100); @@ -5277,6 +5305,7 @@ TEST_CASE("!/redwood/correctness/btree") { state int mutationBytesTarget = shortTest ? 5000 : randomSize(std::min(maxCommitSize * 100, 100e6)); state double clearProbability = deterministicRandom()->random01() * .1; state double coldStartProbability = deterministicRandom()->random01(); + state double advanceOldVersionProbability = deterministicRandom()->random01(); state double maxWallClockDuration = 60; printf("\n"); @@ -5290,6 +5319,7 @@ TEST_CASE("!/redwood/correctness/btree") { printf("mutationBytesTarget: %d\n", mutationBytesTarget); printf("clearProbability: %f\n", clearProbability); printf("coldStartProbability: %f\n", coldStartProbability); + printf("advanceOldVersionProbability: %f\n", advanceOldVersionProbability); printf("\n"); printf("Deleting existing test data...\n"); @@ -5431,6 +5461,11 @@ TEST_CASE("!/redwood/correctness/btree") { Version v = version; // Avoid capture of version as a member of *this + // Sometimes advance the oldest version to close the gap between the oldest and latest versions by a random amount. + if(deterministicRandom()->random01() < advanceOldVersionProbability) { + btree->setOldestVersion(btree->getLastCommittedVersion() - deterministicRandom()->randomInt(0, btree->getLastCommittedVersion() - btree->getOldestVersion() + 1)); + } + commit = map(btree->commit(), [=](Void) { printf("Committed: %s\n", VersionedBTree::counts.toString(true).c_str()); // Notify the background verifier that version is committed and therefore readable