From 6a57fab43145526858169424056307ee3be0d8de Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Tue, 22 Oct 2019 17:17:29 -0700 Subject: [PATCH] Bug fixes in lazy subtree deletion, queue pushFront(), queue flush(), and advancing the oldest pager version. CommitSubtree no longer forces page rewrites due to boundary changes. IPager2 and IVersionedStore now have explicit async init() functions to avoid returning futures from some frequently used functions. --- fdbserver/IPager.h | 15 ++- fdbserver/IVersionedStore.h | 3 +- fdbserver/VersionedBTree.actor.cpp | 150 +++++++++++++++++------------ 3 files changed, 103 insertions(+), 65 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index d6e60fd2fe..35549ac096 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -209,16 +209,21 @@ public: virtual StorageBytes getStorageBytes() = 0; + // Future returned is ready when pager has been initialized from disk and is ready for reads and writes. + // It is invalid to call most other functions until init() is ready. + // TODO: Document further. + virtual Future init() = 0; + // Returns latest committed version - // After the returned future is ready, future calls must not wait. - virtual Future getLatestVersion() = 0; + virtual Version getLatestVersion() = 0; // Returns the oldest readable version as of the most recent committed version - virtual Future getOldestVersion() = 0; + virtual Version getOldestVersion() = 0; + // Sets the oldest readable version to be put into affect at the next commit. // The pager can reuse pages that were freed at a version less than v. - // If any snapshots are in use at a version less than v, the pager can invalidate them - // or keep their versions around until the snapshots are no longer in use. + // If any snapshots are in use at a version less than v, the pager can either forcefully + // invalidate them or keep their versions around until the snapshots are no longer in use. virtual void setOldestVersion(Version v) = 0; protected: diff --git a/fdbserver/IVersionedStore.h b/fdbserver/IVersionedStore.h index 482a1521a9..de4cfd2084 100644 --- a/fdbserver/IVersionedStore.h +++ b/fdbserver/IVersionedStore.h @@ -62,7 +62,8 @@ public: virtual Version getOldestVersion() = 0; // Get oldest readable version virtual Future commit() = 0; - virtual Future getLatestVersion() = 0; + virtual Future init() = 0; + virtual Version getLatestVersion() = 0; // readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed // to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index bce0462add..ab06953722 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -354,13 +354,13 @@ public: wait(previous); state int bytesNeeded = Codec::bytesNeeded(item); - if(self->offset + bytesNeeded > self->queue->dataBytesPerPage) { + if(self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->queue->dataBytesPerPage) { debug_printf("FIFOQueue::Cursor(%s) write(%s) page is full, adding new page\n", self->toString().c_str(), ::toString(item).c_str()); LogicalPageID newPageID = wait(self->queue->pager->newPageID()); self->addNewPage(newPageID, 0, true); wait(yield()); } - debug_printf("FIFOQueue::Cursor(%s) write(%s)\n", self->toString().c_str(), ::toString(item).c_str()); + debug_printf("FIFOQueue::Cursor(%s) before write(%s)\n", self->toString().c_str(), ::toString(item).c_str()); auto p = self->raw(); Codec::writeToBytes(p->begin() + self->offset, item); self->offset += bytesNeeded; @@ -410,7 +410,7 @@ public: self->offset += bytesRead; --self->queue->numEntries; - debug_printf("FIFOQueue::Cursor(%s) popped %s\n", self->toString().c_str(), ::toString(result).c_str()); + debug_printf("FIFOQueue::Cursor(%s) after read of %s\n", self->toString().c_str(), ::toString(result).c_str()); ASSERT(self->offset <= p->endOffset); if(self->offset == p->endOffset) { @@ -425,9 +425,11 @@ public: // Freeing the old page must happen after advancing the cursor and clearing the page reference because // freePage() could cause a push onto a queue that causes a newPageID() call which could pop() from this // very same queue. + // Queue pages are freed at page 0 because they can be reused after the next commit. self->queue->pager->freePage(oldPageID, 0); } + debug_printf("FIFOQueue(%s) pop(upperBound=%s) -> %s\n", self->queue->name.c_str(), ::toString(upperBound).c_str(), ::toString(result).c_str()); return result; } @@ -584,6 +586,7 @@ public: headWriter.addNewPage(headReader.pageID, headReader.offset, false); headReader.pageID = headWriter.firstPageIDWritten; headReader.offset = 0; + headReader.page.clear(); } // Update headReader's end page to the new tail page @@ -986,8 +989,7 @@ public: // Try to reuse pages up to the earlier of the oldest version set by the user or the oldest snapshot still in the snapshots list ASSERT(!self->snapshots.empty()); - Version oldestVersion = std::min(self->pLastCommittedHeader->oldestVersion, self->snapshots.front().version); - Optional delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{oldestVersion, 0})); + Optional delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{self->effectiveOldestVersion(), 0})); if(delayedFreePageID.present()) { debug_printf("COWPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str()); return delayedFreePageID.get().pageID; @@ -1070,13 +1072,13 @@ public: void freePage(LogicalPageID pageID, Version v) override { // If v is older than the oldest version still readable then mark pageID as free as of the next commit - if(v < pLastCommittedHeader->oldestVersion) { - debug_printf("COWPager(%s) op=freeNow %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v); + if(v < effectiveOldestVersion()) { + debug_printf("COWPager(%s) op=freeNow %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); freeList.pushBack(pageID); } else { // Otherwise add it to the delayed free list - debug_printf("COWPager(%s) op=freeLater %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v); + debug_printf("COWPager(%s) op=freeLater %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); delayedFreeList.pushBack({v, pageID}); } }; @@ -1144,6 +1146,7 @@ public: Reference getReadSnapshot(Version v) override; void addLatestSnapshot(); + // Set the pending oldest versiont to keep as of the next commit void setOldestVersion(Version v) override { ASSERT(v >= pHeader->oldestVersion); ASSERT(v <= pHeader->committedVersion); @@ -1151,12 +1154,17 @@ public: expireSnapshots(v); }; - Future getOldestVersion() override { - return map(recoverFuture, [=](Void) { - return pLastCommittedHeader->oldestVersion; - }); + // Get the oldest version set as of the last commit. + Version getOldestVersion() override { + return pLastCommittedHeader->oldestVersion; }; + // Calculate the *effective* oldest version, which can be older than the one set in the last commit since we + // are allowing active snapshots to temporarily delay page reuse. + Version effectiveOldestVersion() { + return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version); + } + ACTOR static Future commit_impl(COWPager *self) { debug_printf("COWPager(%s) commit begin\n", self->filename.c_str()); @@ -1277,10 +1285,12 @@ public: return StorageBytes(free, total, pagerSize, free + reusable); } - Future getLatestVersion() override { - return map(recoverFuture, [=](Void) { - return pLastCommittedHeader->committedVersion; - }); + Future init() override { + return recoverFuture; + } + + Version getLatestVersion() override { + return pLastCommittedHeader->committedVersion; } private: @@ -1442,7 +1452,7 @@ public: void COWPager::expireSnapshots(Version v) { debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size()); while(snapshots.size() > 1 && snapshots.front().version < v && snapshots.front().snapshot->isSoleOwner()) { - debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version); + debug_printf("COWPager(%s) expiring snapshot for %" PRId64 " soleOwner=%d\n", filename.c_str(), snapshots.front().version, snapshots.front().snapshot->isSoleOwner()); // The snapshot contract could be made such that the expired promise isn't need anymore. In practice it // probably is already not needed but it will gracefully handle the case where a user begins a page read // with a snapshot reference, keeps the page read future, and drops the snapshot reference. @@ -2507,10 +2517,10 @@ public: } virtual Version getOldestVersion() { - return m_pager->getOldestVersion().get(); + return m_pager->getOldestVersion(); } - virtual Future getLatestVersion() { + virtual Version getLatestVersion() { if(m_writeVersion != invalidVersion) return m_writeVersion; return m_pager->getLatestVersion(); @@ -2536,9 +2546,9 @@ public: m_latestCommit = m_init; } - ACTOR static Future incrementalLazyDelete(VersionedBTree *self, int minPages) { + ACTOR static Future incrementalLazyDelete(VersionedBTree *self, bool *stop, unsigned int minPages = 0, int maxPages = std::numeric_limits::max()) { // TODO: Is it contractually okay to always to read at the latest version? - state Reference snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion().get()); + state Reference snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion()); state int freedPages = 0; loop { @@ -2546,7 +2556,7 @@ public: state Optional q = wait(self->m_lazyDeleteQueue.pop()); debug_printf("LazyDelete: popped %s\n", toString(q).c_str()); if(!q.present()) { - return Void(); + break; } // Read the page without caching @@ -2587,15 +2597,20 @@ public: self->freeBtreePage(q.get().pageID, v); freedPages += q.get().pageID.size(); - if(freedPages >= minPages) { - return Void(); + // If stop is set and we've freed the minimum number of pages required, or the maximum is exceeded, return. + if((freedPages >= minPages && *stop) || freedPages >= maxPages) { + break; } } + + return freedPages; } ACTOR static Future init_impl(VersionedBTree *self) { - state Version latest = wait(self->m_pager->getLatestVersion()); - self->m_newOldestVersion = self->m_pager->getOldestVersion().get(); + wait(self->m_pager->init()); + + state Version latest = self->m_pager->getLatestVersion(); + self->m_newOldestVersion = self->m_pager->getOldestVersion(); debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n", self->m_newOldestVersion); @@ -2632,7 +2647,9 @@ public: return Void(); } - Future init() { return m_init; } + Future init() override { + return m_init; + } virtual ~VersionedBTree() { // This probably shouldn't be called directly (meaning deleting an instance directly) but it should be safe, @@ -3325,10 +3342,6 @@ private: debug_printf("%s decodeLower=%s decodeUpper=%s\n", context.c_str(), decodeLowerBound->toString().c_str(), decodeUpperBound->toString().c_str()); self->counts.commitToPageStart++; - // If a boundary changed, the page must be rewritten regardless of KV mutations - state bool boundaryChanged = (lowerBound != decodeLowerBound) || (upperBound != decodeUpperBound); - debug_printf("%s boundaryChanged=%d\n", context.c_str(), boundaryChanged); - // Find the slice of the mutation buffer that is relevant to this subtree // TODO: Rather than two lower_bound searches, perhaps just compare each mutation to the upperBound key while iterating state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key); @@ -3354,27 +3367,43 @@ private: return results; } - // If there are no forced boundary changes then this subtree is unchanged. - if(!boundaryChanged) { - results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); - debug_printf("%s page contains a single key '%s' which is not changing, returning %s\n", context.c_str(), lowerBound->key.toString().c_str(), toString(results).c_str()); - return results; - } + // Otherwise, no changes to this subtree + results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); + debug_printf("%s page contains a single key '%s' which is not changing, returning %s\n", context.c_str(), lowerBound->key.toString().c_str(), toString(results).c_str()); + return results; } - // Another way to have no mutations is to have a single mutation range cover this - // subtree but have no changes in it MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary; ++iMutationBoundaryNext; - if(!boundaryChanged && iMutationBoundaryNext == iMutationBoundaryEnd && - ( iMutationBoundary->second.noChanges() || - ( !iMutationBoundary->second.rangeClearVersion.present() && - iMutationBoundary->first < lowerBound->key) - ) - ) { - results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); - debug_printf("%s no changes because sole mutation range was not cleared, returning %s\n", context.c_str(), toString(results).c_str()); - return results; + // If one mutation range covers the entire page + if(iMutationBoundaryNext == iMutationBoundaryEnd) { + // If there are no changes in the range (no clear, no boundary key mutations) + // OR there are changes but for a key that is less than the page lower boundary and therefore not part of this page + if(iMutationBoundary->second.noChanges() || + ( !iMutationBoundary->second.rangeClearVersion.present() && iMutationBoundary->first < lowerBound->key) + ) { + results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); + debug_printf("%s no changes on this subtree, returning %s\n", context.c_str(), toString(results).c_str()); + return results; + } + + // If the range is cleared and there either no sets or the sets aren't relevant to this subtree then delete it + // The last if subexpression is checking that either the next key in the mutation buffer is being changed or + // the upper bound key of this page isn't the same. + if(iMutationBoundary->second.rangeClearVersion.present() + && (iMutationBoundary->second.startKeyMutations.empty() || iMutationBoundary->first < lowerBound->key) + && (!iMutationBoundaryEnd->second.startKeyMutations.empty() || upperBound->key != iMutationBoundaryEnd->first) + ) { + debug_printf("%s %s cleared, deleting it, returning %s\n", context.c_str(), isLeaf ? "Page" : "Subtree", toString(results).c_str()); + Version clearVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : iMutationBoundary->second.rangeClearVersion.get(); + if(isLeaf) { + self->freeBtreePage(rootID, clearVersion); + } + else { + self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{clearVersion, rootID}); + } + return results; + } } self->counts.commitToPage++; @@ -3530,8 +3559,7 @@ private: debug_printf("%s Done merging mutations into existing leaf contents, made %d changes\n", context.c_str(), changes); // No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records. - // But if a boundary was changed then we must rewrite the page anyway. - if(!boundaryChanged && minVersion == invalidVersion) { + if(minVersion == invalidVersion) { results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); debug_printf("%s No changes were made during mutation merge, returning %s\n", context.c_str(), toString(results).c_str()); ASSERT(changes == 0); @@ -3713,10 +3741,11 @@ private: self->m_pager->setOldestVersion(self->m_newOldestVersion); debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, self->m_newOldestVersion); - state Future lazyDelete = incrementalLazyDelete(self, 100); + state bool lazyDeleteStop = false; + state Future lazyDelete = incrementalLazyDelete(self, &lazyDeleteStop); // Get the latest version from the pager, which is what we will read at - state Version latestVersion = wait(self->m_pager->getLatestVersion()); + state Version latestVersion = self->m_pager->getLatestVersion(); debug_printf("%s: pager latestVersion %" PRId64 "\n", self->m_name.c_str(), latestVersion); if(REDWOOD_DEBUG) { @@ -3755,7 +3784,9 @@ private: self->m_header.root.set(rootPageID, sizeof(headerSpace) - sizeof(m_header)); - wait(lazyDelete); + lazyDeleteStop = true; + wait(success(lazyDelete)); + debug_printf("Lazy delete freed %u pages\n", lazyDelete.get()); self->m_pager->setCommitVersion(writeVersion); @@ -4336,7 +4367,7 @@ public: ACTOR Future init_impl(KeyValueStoreRedwoodUnversioned *self) { TraceEvent(SevInfo, "RedwoodInit").detail("FilePrefix", self->m_filePrefix); wait(self->m_tree->init()); - Version v = wait(self->m_tree->getLatestVersion()); + Version v = self->m_tree->getLatestVersion(); self->m_tree->setWriteVersion(v + 1); TraceEvent(SevInfo, "RedwoodInitComplete").detail("FilePrefix", self->m_filePrefix); return Void(); @@ -4373,6 +4404,7 @@ public: Future commit(bool sequential = false) { Future c = m_tree->commit(); + m_tree->setOldestVersion(m_tree->getLatestVersion()); m_tree->setWriteVersion(m_tree->getWriteVersion() + 1); return catchError(c); } @@ -5334,7 +5366,7 @@ TEST_CASE("!/redwood/correctness/btree") { state std::map, Optional> written; state std::set keys; - state Version lastVer = wait(btree->getLatestVersion()); + state Version lastVer = btree->getLatestVersion(); printf("Starting from version: %" PRId64 "\n", lastVer); state Version version = lastVer + 1; @@ -5508,7 +5540,7 @@ TEST_CASE("!/redwood/correctness/btree") { btree = new VersionedBTree(pager, pagerFile, singleVersion); wait(btree->init()); - Version v = wait(btree->getLatestVersion()); + Version v = btree->getLatestVersion(); ASSERT(v == version); printf("Recovered from disk. Latest version %" PRId64 "\n", v); @@ -5545,7 +5577,7 @@ TEST_CASE("!/redwood/correctness/btree") { } ACTOR Future randomSeeks(VersionedBTree *btree, int count, char firstChar, char lastChar) { - state Version readVer = wait(btree->getLatestVersion()); + state Version readVer = btree->getLatestVersion(); state int c = 0; state double readStart = timer(); printf("Executing %d random seeks\n", count); @@ -5569,7 +5601,7 @@ TEST_CASE("!/redwood/correctness/pager/cow") { int pageSize = 4096; state IPager2 *pager = new COWPager(pageSize, pagerFile, 0); - wait(success(pager->getLatestVersion())); + wait(success(pager->init())); state LogicalPageID id = wait(pager->newPageID()); Reference p = pager->newPageBuffer(); memset(p->mutate(), (char)id, p->size()); @@ -5622,7 +5654,7 @@ TEST_CASE("!/redwood/performance/set") { while(kvBytesTotal < kvBytesTarget) { wait(yield()); - Version lastVer = wait(btree->getLatestVersion()); + Version lastVer = btree->getLatestVersion(); state Version version = lastVer + 1; btree->setWriteVersion(version); int changes = deterministicRandom()->randomInt(0, maxChangesPerVersion);