Bug fixes in lazy subtree deletion, queue pushFront(), queue flush(), and advancing the oldest pager version. CommitSubtree no longer forces page rewrites due to boundary changes. IPager2 and IVersionedStore now have explicit async init() functions to avoid returning futures from some frequently used functions.

This commit is contained in:
Stephen Atherton 2019-10-22 17:17:29 -07:00
parent 44175e0921
commit 6a57fab431
3 changed files with 103 additions and 65 deletions

View File

@ -209,16 +209,21 @@ public:
virtual StorageBytes getStorageBytes() = 0;
// Future returned is ready when pager has been initialized from disk and is ready for reads and writes.
// It is invalid to call most other functions until init() is ready.
// TODO: Document further.
virtual Future<Void> init() = 0;
// Returns latest committed version
// After the returned future is ready, future calls must not wait.
virtual Future<Version> getLatestVersion() = 0;
virtual Version getLatestVersion() = 0;
// Returns the oldest readable version as of the most recent committed version
virtual Future<Version> getOldestVersion() = 0;
virtual Version getOldestVersion() = 0;
// Sets the oldest readable version to be put into effect at the next commit.
// The pager can reuse pages that were freed at a version less than v.
// If any snapshots are in use at a version less than v, the pager can invalidate them
// or keep their versions around until the snapshots are no longer in use.
// If any snapshots are in use at a version less than v, the pager can either forcefully
// invalidate them or keep their versions around until the snapshots are no longer in use.
virtual void setOldestVersion(Version v) = 0;
protected:

View File

@ -62,7 +62,8 @@ public:
virtual Version getOldestVersion() = 0; // Get oldest readable version
virtual Future<Void> commit() = 0;
virtual Future<Version> getLatestVersion() = 0;
virtual Future<Void> init() = 0;
virtual Version getLatestVersion() = 0;
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations.

View File

@ -354,13 +354,13 @@ public:
wait(previous);
state int bytesNeeded = Codec::bytesNeeded(item);
if(self->offset + bytesNeeded > self->queue->dataBytesPerPage) {
if(self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->queue->dataBytesPerPage) {
debug_printf("FIFOQueue::Cursor(%s) write(%s) page is full, adding new page\n", self->toString().c_str(), ::toString(item).c_str());
LogicalPageID newPageID = wait(self->queue->pager->newPageID());
self->addNewPage(newPageID, 0, true);
wait(yield());
}
debug_printf("FIFOQueue::Cursor(%s) write(%s)\n", self->toString().c_str(), ::toString(item).c_str());
debug_printf("FIFOQueue::Cursor(%s) before write(%s)\n", self->toString().c_str(), ::toString(item).c_str());
auto p = self->raw();
Codec::writeToBytes(p->begin() + self->offset, item);
self->offset += bytesNeeded;
@ -410,7 +410,7 @@ public:
self->offset += bytesRead;
--self->queue->numEntries;
debug_printf("FIFOQueue::Cursor(%s) popped %s\n", self->toString().c_str(), ::toString(result).c_str());
debug_printf("FIFOQueue::Cursor(%s) after read of %s\n", self->toString().c_str(), ::toString(result).c_str());
ASSERT(self->offset <= p->endOffset);
if(self->offset == p->endOffset) {
@ -425,9 +425,11 @@ public:
// Freeing the old page must happen after advancing the cursor and clearing the page reference because
// freePage() could cause a push onto a queue that causes a newPageID() call which could pop() from this
// very same queue.
// Queue pages are freed at version 0 because they can be reused after the next commit.
self->queue->pager->freePage(oldPageID, 0);
}
debug_printf("FIFOQueue(%s) pop(upperBound=%s) -> %s\n", self->queue->name.c_str(), ::toString(upperBound).c_str(), ::toString(result).c_str());
return result;
}
@ -584,6 +586,7 @@ public:
headWriter.addNewPage(headReader.pageID, headReader.offset, false);
headReader.pageID = headWriter.firstPageIDWritten;
headReader.offset = 0;
headReader.page.clear();
}
// Update headReader's end page to the new tail page
@ -986,8 +989,7 @@ public:
// Try to reuse pages up to the earlier of the oldest version set by the user or the oldest snapshot still in the snapshots list
ASSERT(!self->snapshots.empty());
Version oldestVersion = std::min(self->pLastCommittedHeader->oldestVersion, self->snapshots.front().version);
Optional<DelayedFreePage> delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{oldestVersion, 0}));
Optional<DelayedFreePage> delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{self->effectiveOldestVersion(), 0}));
if(delayedFreePageID.present()) {
debug_printf("COWPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str());
return delayedFreePageID.get().pageID;
@ -1070,13 +1072,13 @@ public:
void freePage(LogicalPageID pageID, Version v) override {
// If v is older than the oldest version still readable then mark pageID as free as of the next commit
if(v < pLastCommittedHeader->oldestVersion) {
debug_printf("COWPager(%s) op=freeNow %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v);
if(v < effectiveOldestVersion()) {
debug_printf("COWPager(%s) op=freeNow %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion);
freeList.pushBack(pageID);
}
else {
// Otherwise add it to the delayed free list
debug_printf("COWPager(%s) op=freeLater %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v);
debug_printf("COWPager(%s) op=freeLater %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion);
delayedFreeList.pushBack({v, pageID});
}
};
@ -1144,6 +1146,7 @@ public:
Reference<IPagerSnapshot> getReadSnapshot(Version v) override;
void addLatestSnapshot();
// Set the pending oldest version to keep as of the next commit
void setOldestVersion(Version v) override {
ASSERT(v >= pHeader->oldestVersion);
ASSERT(v <= pHeader->committedVersion);
@ -1151,12 +1154,17 @@ public:
expireSnapshots(v);
};
Future<Version> getOldestVersion() override {
return map(recoverFuture, [=](Void) {
return pLastCommittedHeader->oldestVersion;
});
// Get the oldest version set as of the last commit.
Version getOldestVersion() override {
return pLastCommittedHeader->oldestVersion;
};
// Calculate the *effective* oldest version, which can be older than the one set in the last commit since we
// are allowing active snapshots to temporarily delay page reuse.
Version effectiveOldestVersion() {
return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version);
}
ACTOR static Future<Void> commit_impl(COWPager *self) {
debug_printf("COWPager(%s) commit begin\n", self->filename.c_str());
@ -1277,10 +1285,12 @@ public:
return StorageBytes(free, total, pagerSize, free + reusable);
}
Future<Version> getLatestVersion() override {
return map(recoverFuture, [=](Void) {
return pLastCommittedHeader->committedVersion;
});
Future<Void> init() override {
return recoverFuture;
}
Version getLatestVersion() override {
return pLastCommittedHeader->committedVersion;
}
private:
@ -1442,7 +1452,7 @@ public:
void COWPager::expireSnapshots(Version v) {
debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size());
while(snapshots.size() > 1 && snapshots.front().version < v && snapshots.front().snapshot->isSoleOwner()) {
debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version);
debug_printf("COWPager(%s) expiring snapshot for %" PRId64 " soleOwner=%d\n", filename.c_str(), snapshots.front().version, snapshots.front().snapshot->isSoleOwner());
// The snapshot contract could be made such that the expired promise isn't needed anymore. In practice it
// probably is already not needed but it will gracefully handle the case where a user begins a page read
// with a snapshot reference, keeps the page read future, and drops the snapshot reference.
@ -2507,10 +2517,10 @@ public:
}
virtual Version getOldestVersion() {
return m_pager->getOldestVersion().get();
return m_pager->getOldestVersion();
}
virtual Future<Version> getLatestVersion() {
virtual Version getLatestVersion() {
if(m_writeVersion != invalidVersion)
return m_writeVersion;
return m_pager->getLatestVersion();
@ -2536,9 +2546,9 @@ public:
m_latestCommit = m_init;
}
ACTOR static Future<Void> incrementalLazyDelete(VersionedBTree *self, int minPages) {
ACTOR static Future<int> incrementalLazyDelete(VersionedBTree *self, bool *stop, unsigned int minPages = 0, int maxPages = std::numeric_limits<int>::max()) {
// TODO: Is it contractually okay to always read at the latest version?
state Reference<IPagerSnapshot> snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion().get());
state Reference<IPagerSnapshot> snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion());
state int freedPages = 0;
loop {
@ -2546,7 +2556,7 @@ public:
state Optional<LazyDeleteQueueEntry> q = wait(self->m_lazyDeleteQueue.pop());
debug_printf("LazyDelete: popped %s\n", toString(q).c_str());
if(!q.present()) {
return Void();
break;
}
// Read the page without caching
@ -2587,15 +2597,20 @@ public:
self->freeBtreePage(q.get().pageID, v);
freedPages += q.get().pageID.size();
if(freedPages >= minPages) {
return Void();
// If stop is set and we've freed the minimum number of pages required, or the maximum is exceeded, return.
if((freedPages >= minPages && *stop) || freedPages >= maxPages) {
break;
}
}
return freedPages;
}
ACTOR static Future<Void> init_impl(VersionedBTree *self) {
state Version latest = wait(self->m_pager->getLatestVersion());
self->m_newOldestVersion = self->m_pager->getOldestVersion().get();
wait(self->m_pager->init());
state Version latest = self->m_pager->getLatestVersion();
self->m_newOldestVersion = self->m_pager->getOldestVersion();
debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n", self->m_newOldestVersion);
@ -2632,7 +2647,9 @@ public:
return Void();
}
Future<Void> init() { return m_init; }
Future<Void> init() override {
return m_init;
}
virtual ~VersionedBTree() {
// This probably shouldn't be called directly (meaning deleting an instance directly) but it should be safe,
@ -3325,10 +3342,6 @@ private:
debug_printf("%s decodeLower=%s decodeUpper=%s\n", context.c_str(), decodeLowerBound->toString().c_str(), decodeUpperBound->toString().c_str());
self->counts.commitToPageStart++;
// If a boundary changed, the page must be rewritten regardless of KV mutations
state bool boundaryChanged = (lowerBound != decodeLowerBound) || (upperBound != decodeUpperBound);
debug_printf("%s boundaryChanged=%d\n", context.c_str(), boundaryChanged);
// Find the slice of the mutation buffer that is relevant to this subtree
// TODO: Rather than two lower_bound searches, perhaps just compare each mutation to the upperBound key while iterating
state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key);
@ -3354,27 +3367,43 @@ private:
return results;
}
// If there are no forced boundary changes then this subtree is unchanged.
if(!boundaryChanged) {
results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef<RedwoodRecordRef>((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound));
debug_printf("%s page contains a single key '%s' which is not changing, returning %s\n", context.c_str(), lowerBound->key.toString().c_str(), toString(results).c_str());
return results;
}
// Otherwise, no changes to this subtree
results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef<RedwoodRecordRef>((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound));
debug_printf("%s page contains a single key '%s' which is not changing, returning %s\n", context.c_str(), lowerBound->key.toString().c_str(), toString(results).c_str());
return results;
}
// Another way to have no mutations is to have a single mutation range cover this
// subtree but have no changes in it
MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary;
++iMutationBoundaryNext;
if(!boundaryChanged && iMutationBoundaryNext == iMutationBoundaryEnd &&
( iMutationBoundary->second.noChanges() ||
( !iMutationBoundary->second.rangeClearVersion.present() &&
iMutationBoundary->first < lowerBound->key)
)
) {
results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef<RedwoodRecordRef>((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound));
debug_printf("%s no changes because sole mutation range was not cleared, returning %s\n", context.c_str(), toString(results).c_str());
return results;
// If one mutation range covers the entire page
if(iMutationBoundaryNext == iMutationBoundaryEnd) {
// If there are no changes in the range (no clear, no boundary key mutations)
// OR there are changes but for a key that is less than the page lower boundary and therefore not part of this page
if(iMutationBoundary->second.noChanges() ||
( !iMutationBoundary->second.rangeClearVersion.present() && iMutationBoundary->first < lowerBound->key)
) {
results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef<RedwoodRecordRef>((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound));
debug_printf("%s no changes on this subtree, returning %s\n", context.c_str(), toString(results).c_str());
return results;
}
// If the range is cleared and there are either no sets or the sets aren't relevant to this subtree then delete it
// The last if subexpression is checking that either the next key in the mutation buffer is being changed or
// the upper bound key of this page isn't the same.
if(iMutationBoundary->second.rangeClearVersion.present()
&& (iMutationBoundary->second.startKeyMutations.empty() || iMutationBoundary->first < lowerBound->key)
&& (!iMutationBoundaryEnd->second.startKeyMutations.empty() || upperBound->key != iMutationBoundaryEnd->first)
) {
debug_printf("%s %s cleared, deleting it, returning %s\n", context.c_str(), isLeaf ? "Page" : "Subtree", toString(results).c_str());
Version clearVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : iMutationBoundary->second.rangeClearVersion.get();
if(isLeaf) {
self->freeBtreePage(rootID, clearVersion);
}
else {
self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{clearVersion, rootID});
}
return results;
}
}
self->counts.commitToPage++;
@ -3530,8 +3559,7 @@ private:
debug_printf("%s Done merging mutations into existing leaf contents, made %d changes\n", context.c_str(), changes);
// No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records.
// But if a boundary was changed then we must rewrite the page anyway.
if(!boundaryChanged && minVersion == invalidVersion) {
if(minVersion == invalidVersion) {
results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef<RedwoodRecordRef>((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound));
debug_printf("%s No changes were made during mutation merge, returning %s\n", context.c_str(), toString(results).c_str());
ASSERT(changes == 0);
@ -3713,10 +3741,11 @@ private:
self->m_pager->setOldestVersion(self->m_newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, self->m_newOldestVersion);
state Future<Void> lazyDelete = incrementalLazyDelete(self, 100);
state bool lazyDeleteStop = false;
state Future<int> lazyDelete = incrementalLazyDelete(self, &lazyDeleteStop);
// Get the latest version from the pager, which is what we will read at
state Version latestVersion = wait(self->m_pager->getLatestVersion());
state Version latestVersion = self->m_pager->getLatestVersion();
debug_printf("%s: pager latestVersion %" PRId64 "\n", self->m_name.c_str(), latestVersion);
if(REDWOOD_DEBUG) {
@ -3755,7 +3784,9 @@ private:
self->m_header.root.set(rootPageID, sizeof(headerSpace) - sizeof(m_header));
wait(lazyDelete);
lazyDeleteStop = true;
wait(success(lazyDelete));
debug_printf("Lazy delete freed %u pages\n", lazyDelete.get());
self->m_pager->setCommitVersion(writeVersion);
@ -4336,7 +4367,7 @@ public:
ACTOR Future<Void> init_impl(KeyValueStoreRedwoodUnversioned *self) {
TraceEvent(SevInfo, "RedwoodInit").detail("FilePrefix", self->m_filePrefix);
wait(self->m_tree->init());
Version v = wait(self->m_tree->getLatestVersion());
Version v = self->m_tree->getLatestVersion();
self->m_tree->setWriteVersion(v + 1);
TraceEvent(SevInfo, "RedwoodInitComplete").detail("FilePrefix", self->m_filePrefix);
return Void();
@ -4373,6 +4404,7 @@ public:
Future<Void> commit(bool sequential = false) {
Future<Void> c = m_tree->commit();
m_tree->setOldestVersion(m_tree->getLatestVersion());
m_tree->setWriteVersion(m_tree->getWriteVersion() + 1);
return catchError(c);
}
@ -5334,7 +5366,7 @@ TEST_CASE("!/redwood/correctness/btree") {
state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
state std::set<Key> keys;
state Version lastVer = wait(btree->getLatestVersion());
state Version lastVer = btree->getLatestVersion();
printf("Starting from version: %" PRId64 "\n", lastVer);
state Version version = lastVer + 1;
@ -5508,7 +5540,7 @@ TEST_CASE("!/redwood/correctness/btree") {
btree = new VersionedBTree(pager, pagerFile, singleVersion);
wait(btree->init());
Version v = wait(btree->getLatestVersion());
Version v = btree->getLatestVersion();
ASSERT(v == version);
printf("Recovered from disk. Latest version %" PRId64 "\n", v);
@ -5545,7 +5577,7 @@ TEST_CASE("!/redwood/correctness/btree") {
}
ACTOR Future<Void> randomSeeks(VersionedBTree *btree, int count, char firstChar, char lastChar) {
state Version readVer = wait(btree->getLatestVersion());
state Version readVer = btree->getLatestVersion();
state int c = 0;
state double readStart = timer();
printf("Executing %d random seeks\n", count);
@ -5569,7 +5601,7 @@ TEST_CASE("!/redwood/correctness/pager/cow") {
int pageSize = 4096;
state IPager2 *pager = new COWPager(pageSize, pagerFile, 0);
wait(success(pager->getLatestVersion()));
wait(success(pager->init()));
state LogicalPageID id = wait(pager->newPageID());
Reference<IPage> p = pager->newPageBuffer();
memset(p->mutate(), (char)id, p->size());
@ -5622,7 +5654,7 @@ TEST_CASE("!/redwood/performance/set") {
while(kvBytesTotal < kvBytesTarget) {
wait(yield());
Version lastVer = wait(btree->getLatestVersion());
Version lastVer = btree->getLatestVersion();
state Version version = lastVer + 1;
btree->setWriteVersion(version);
int changes = deterministicRandom()->randomInt(0, maxChangesPerVersion);