From 3f8fce029693db87ba5884480ce4dd931563d487 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Tue, 21 May 2019 19:16:32 -0700 Subject: [PATCH] Checkpointing progress on single-version mode in VersionedBTree. Subtree clears now work, preserving internal page boundary keys when necessary. Multi-version mode is unfortunately now broken, in addition to being incomplete. Added serial and simple btree unit test options. --- fdbserver/VersionedBTree.actor.cpp | 402 ++++++++++++++++++++--------- 1 file changed, 277 insertions(+), 125 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 5c926f1b52..f66ab5aa9f 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -52,6 +52,10 @@ struct RedwoodRecordRef { } } + RedwoodRecordRef withoutValue() const { + return RedwoodRecordRef(key, version, {}, chunk.total, chunk.start); + } + KeyRef key; Version version; Optional value; @@ -138,20 +142,19 @@ struct RedwoodRecordRef { #pragma pack(pop) int compare(const RedwoodRecordRef &rhs) const { - //printf("compare %s to %s\n", toString().c_str(), rhs.toString().c_str()); int cmp = key.compare(rhs.key); if(cmp == 0) { cmp = version - rhs.version; if(cmp == 0) { - // Absent value sorts higher than present (for reasons) - if(value.present() != rhs.value.present()) { - cmp = value.present() ? -1 : 1; - } - else { - // Chunked (represented by chunk.total > 0) sorts higher than whole + // It is assumed that in any data set there will never be more than one + // unique chunk total size for the same key and version, so sort by start, total + // Chunked (represented by chunk.total > 0) sorts higher than whole + cmp = chunk.start - rhs.chunk.start; + if(cmp == 0) { cmp = chunk.total - rhs.chunk.total; if(cmp == 0) { - cmp = chunk.start - rhs.chunk.start; + // No-value sorts AFTER having a value. + cmp = (value.present() ? 0 : 1) - (rhs.value.present() ? 
0 : 1); } } } @@ -235,14 +238,12 @@ struct RedwoodRecordRef { std::string toString(int hexLimit = 15) const { std::string r; r += format("'%s' @%lld ", kvformat(key, hexLimit).c_str(), version); - if(isMultiPart()) { - r += format("[%d/%d] ", chunk.start, chunk.total); - } + r += format("[%d/%d] ", chunk.start, chunk.total); if(value.present()) { r += format("-> '%s'", kvformat(value.get(), hexLimit).c_str()); } else { - r += "-> "; + r += "-> "; } return r; } @@ -303,10 +304,13 @@ struct BTreePage { r += " "; if(!(flags & IS_LEAF)) { RedwoodRecordRef rec = c.get(); - ASSERT(rec.value.present() && rec.value.get().size() == sizeof(uint32_t)); - uint32_t id = *(const uint32_t *)rec.value.get().begin(); - std::string val = format("[Page id=%u]", id); - rec.value = val; + std::string val; + if(rec.value.present()) { + ASSERT(rec.value.get().size() == sizeof(uint32_t)); + uint32_t id = *(const uint32_t *)rec.value.get().begin(); + val = format("[Page id=%u]", id); + rec.value = val; + } r += rec.toString(); } else { @@ -315,6 +319,8 @@ struct BTreePage { r += "\n"; + ASSERT(c.get().key >= lowerBound->key && c.get().key <= upperBound->key); + } while(c.moveNext()); } } catch(Error &e) { @@ -327,7 +333,7 @@ struct BTreePage { } }; -static void writeEmptyPage(Reference page, uint8_t newFlags, int pageSize) { +static void makeEmptyPage(Reference page, uint8_t newFlags, int pageSize) { VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); BTreePage *btpage = (BTreePage *)page->begin(); btpage->flags = newFlags; @@ -371,7 +377,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red int i = 0; const int iEnd = entries.size(); // Lower bound of the page being added to - RedwoodRecordRef pageLowerBound = lowerBound; + RedwoodRecordRef pageLowerBound = lowerBound.withoutValue(); RedwoodRecordRef pageUpperBound; while(i <= iEnd) { @@ -380,7 +386,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red // If not the end, add i to the page if necessary 
if(end) { - pageUpperBound = upperBound; + pageUpperBound = upperBound.withoutValue(); } else { // Get delta from previous record @@ -420,6 +426,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red if(!fits) { // Flush page if(minimalBoundaries) { + // TODO: Write minimal boundaries // Note that prefixLen is guaranteed to be < entry.key.size() because entries are in increasing order and cannot repeat. // int len = prefixLen + 1; // if(entry.key[prefixLen] == 0) @@ -427,7 +434,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red // pageUpperBound = entry.key.substr(0, len); } else { - pageUpperBound = entry; + pageUpperBound = entry.withoutValue(); } } } @@ -450,7 +457,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red // If not writing the final page, reduce entry count of page by a third if(!end) { i -= count / 3; - pageUpperBound = entries[i]; + pageUpperBound = entries[i].withoutValue(); } debug_printf("Flushing page start=%d i=%d count=%d\nlower: %s\nupper: %s\n", start, i, count, pageLowerBound.toString().c_str(), pageUpperBound.toString().c_str()); @@ -458,11 +465,11 @@ static std::vector buildPages(bool minimalBoundaries, const Red for(int j = start; j < i; ++j) { debug_printf(" %3d: %s\n", j, entries[j].toString().c_str()); if(j > start) { - ASSERT(entries[j] > entries[j - 1]); + //ASSERT(entries[j] > entries[j - 1]); } } + ASSERT(pageLowerBound.key <= pageUpperBound.key); #endif - ASSERT(pageLowerBound <= pageUpperBound); union { BTreePage *btPage; @@ -523,7 +530,7 @@ static std::vector buildPages(bool minimalBoundaries, const Red start = i; kvBytes = 0; compressedBytes = BTreePage::BinaryTree::GetTreeOverhead(); - pageLowerBound = pageUpperBound; + pageLowerBound = pageUpperBound.withoutValue(); } } @@ -620,7 +627,7 @@ public: changes[0] = SingleKeyMutation(keyValue.value); } else { - changes.begin()->second.value = keyValue.value; + changes.begin()->second = SingleKeyMutation(keyValue.value); } } else { 
@@ -639,6 +646,7 @@ public: if(singleVersion) { RangeMutation &range = iBegin->second; range.startKeyMutations.clear(); + range.startKeyMutations[0] = SingleKeyMutation(); range.rangeClearVersion = 0; ++iBegin; m_pBuffer->erase(iBegin, iEnd); @@ -701,7 +709,7 @@ public: if(latest == 0) { ++latest; Reference page = self->m_pager->newPageBuffer(); - writeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride); + makeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride); self->writePage(self->m_root, page, latest, &dbBegin, &dbEnd); self->m_pager->setLatestVersion(latest); wait(self->m_pager->commit()); @@ -1091,7 +1099,7 @@ private: }; ACTOR static Future> readPage(Reference snapshot, LogicalPageID id, int usablePageSize, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) { - debug_printf("readPage() op=read id=%u @%lld\n", id, snapshot->getVersion()); + debug_printf("readPage() op=read id=%u @%lld lower=%s upper=%s\n", id, snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str()); wait(delay(0, TaskDiskRead)); state Reference result = wait(snapshot->getPhysicalPage(id)); @@ -1117,6 +1125,7 @@ private: } if(result->userData == nullptr) { + debug_printf("readPage() Creating Reader for page id=%u @%lld lower=%s upper=%s\n", id, snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str()); result->userData = new BTreePage::BinaryTree::Reader(&pTreePage->tree(), lowerBound, upperBound); result->userDataDestructor = [](void *ptr) { delete (BTreePage::BinaryTree::Reader *)ptr; }; } @@ -1131,47 +1140,47 @@ private: // Returns list of (version, list of (lower_bound, list of children) ) // TODO: Probably should pass prev/next records by pointer in many places - ACTOR static Future commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, const 
RedwoodRecordRef *newLowerBound = nullptr, const RedwoodRecordRef *newUpperBound = nullptr) { - debug_printf("%p commitSubtree: root=%d lower='%s' upper='%s'\n", THIS, root, lowerBound->toString().c_str(), upperBound->toString().c_str()); + ACTOR static Future commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, const RedwoodRecordRef *decodeLowerBound, const RedwoodRecordRef *decodeUpperBound) { + debug_printf("%p commitSubtree: root=%d lower=%s upper=%s\n", THIS, root, lowerBound->toString().c_str(), upperBound->toString().c_str()); + debug_printf("%p commitSubtree: root=%d decodeLower=%s decodeUpper=%s\n", THIS, root, decodeLowerBound->toString().c_str(), decodeUpperBound->toString().c_str()); self->counts.commitToPageStart++; - // If the lower bound key and the upper bound key are the same then there can't be any changes to - // this subtree since changes would happen after the upper bound key as the mutated versions would - // necessarily be higher than all previous versions - // TODO: Avoid calling commitSubtree() when this is true to avoid creating the rather large state of this actor - if(lowerBound->key == upperBound->key) { - VersionedChildrenT c({ {0,{{*lowerBound,root}}} }); - debug_printf("%p id=%u no changes, lower and upper bound keys are the same, returning %s\n", THIS, root, toString(c).c_str()); - return c; - } + // If a boundary changed, the page must be rewritten regardless of KV mutations + state bool boundaryChanged = (lowerBound != decodeLowerBound) || (upperBound != decodeUpperBound); + debug_printf("%p id=%u boundaryChanged=%d\n", THIS, root, boundaryChanged); // Find the slice of the mutation buffer that is relevant to this subtree // TODO: Rather than two lower_bound searches, perhaps just compare each mutation to the upperBound key while iterating - state MutationBufferT::const_iterator iMutationBoundary = 
mutationBuffer->lower_bound(lowerBound->key); + state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key); + --iMutationBoundary; state MutationBufferT::const_iterator iMutationBoundaryEnd = mutationBuffer->lower_bound(upperBound->key); - // If the mutation buffer key found is greater than the lower bound key then go to the previous mutation - // buffer key because it may cover deletion of some keys at the start of this subtree. - if(iMutationBoundary != mutationBuffer->begin() && iMutationBoundary->first > lowerBound->key) { - --iMutationBoundary; + if(REDWOOD_DEBUG) { + self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd); } - else { - // If the there are no mutations, we're done - if(iMutationBoundary == iMutationBoundaryEnd) { + + // If the boundary range iterators are the same then upperbound and lowerbound have the same key. + // If the key is being mutated, then remove this subtree. + if(iMutationBoundary == iMutationBoundaryEnd) { + if(!iMutationBoundary->second.startKeyMutations.empty()) { + VersionedChildrenT c; + debug_printf("%p id=%u lower and upper bound key/version match and key is modified so deleting page, returning %s\n", THIS, root, toString(c).c_str()); + return c; + } + + // If there are no forced boundary changes then this subtree is unchanged. + if(!boundaryChanged) { VersionedChildrenT c({ {0,{{*lowerBound,root}}} }); - debug_printf("%p id=%d no changes, mutation buffer start/end are the same, returning %s\n", THIS, root, toString(c).c_str()); + debug_printf("%p id=%d page contains a single key '%s' which is not changing, returning %s\n", THIS, root, lowerBound->key.toString().c_str(), toString(c).c_str()); return c; } } - // TODO: Check if entire subtree is erased and return no pages, also have the previous pages deleted as of - // the cleared version. 
- // Another way to have no mutations is to have a single mutation range cover this // subtree but have no changes in it MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary; ++iMutationBoundaryNext; - if(iMutationBoundaryNext == iMutationBoundaryEnd && + if(!boundaryChanged && iMutationBoundaryNext == iMutationBoundaryEnd && ( iMutationBoundary->second.noChanges() || ( !iMutationBoundary->second.rangeClearVersion.present() && iMutationBoundary->first < lowerBound->key) @@ -1183,9 +1192,9 @@ private: } self->counts.commitToPage++; - state Reference rawPage = wait(readPage(snapshot, root, self->m_usablePageSizeOverride, lowerBound, upperBound)); + state Reference rawPage = wait(readPage(snapshot, root, self->m_usablePageSizeOverride, decodeLowerBound, decodeUpperBound)); state BTreePage *page = (BTreePage *) rawPage->begin(); - debug_printf("%p commitSubtree(): %s\n", THIS, page->toString(false, root, snapshot->getVersion(), lowerBound, upperBound).c_str()); + debug_printf("%p commitSubtree(): %s\n", THIS, page->toString(false, root, snapshot->getVersion(), decodeLowerBound, decodeUpperBound).c_str()); BTreePage::BinaryTree::Cursor cursor = getReader(rawPage)->getCursor(); cursor.moveFirst(); @@ -1195,7 +1204,7 @@ private: VersionedChildrenT results; std::vector merged; - debug_printf("%p MERGING EXISTING DATA WITH MUTATIONS:\n", THIS); + debug_printf("%p id=%u MERGING EXISTING DATA WITH MUTATIONS:\n", THIS, root); if(REDWOOD_DEBUG) { self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd); } @@ -1253,7 +1262,7 @@ private: if(iMutations->first < minVersion || minVersion == invalidVersion) minVersion = iMutations->first; ++changes; - merged.push_back(iMutations->second.toRecord(iMutationBoundary->first, iMutations->first)); + merged.push_back(m.toRecord(iMutationBoundary->first, iMutations->first)); debug_printf("%p: Added non-split %s [mutation, boundary start]\n", THIS, merged.back().toString().c_str()); } else { @@ -1333,7 +1342,7 
@@ private: debug_printf("%p Done merging mutations into existing leaf contents, made %d changes\n", THIS, changes); // No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records. - if(minVersion == invalidVersion) { + if(!boundaryChanged && minVersion == invalidVersion) { VersionedChildrenT c({ {0,{{*lowerBound,root}}} }); debug_printf("%p No changes were made during mutation merge, returning %s\n", THIS, toString(c).c_str()); ASSERT(changes == 0); @@ -1344,10 +1353,10 @@ private: // If everything in the page was deleted then this page should be deleted as of the new version // Note that if a single range clear covered the entire page then we should not get this far - if(merged.empty()) { + if(merged.empty() && root != 0) { // TODO: For multi version mode only delete this page as of the new version VersionedChildrenT c({}); - debug_printf("%p All leaf page contents were cleared, returning %s\n", THIS, toString(c).c_str()); + debug_printf("%p id=%u All leaf page contents were cleared, returning %s\n", THIS, root, toString(c).c_str()); return c; } @@ -1359,7 +1368,7 @@ private: // The new split pages will be valid as of minVersion, but the old page remains valid at the old version if(pages.size() != 1) { results.push_back( {0, {{*lowerBound, root}}} ); - debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str()); + debug_printf("%p Added versioned child set #1: %s\n", THIS, toString(results.back()).c_str()); } else { // The page was updated but not size-split or version-split so the last page version's data @@ -1379,16 +1388,14 @@ private: self->buildNewRoot(writeVersion, pages, newPageIDs, page); } - // TODO: Can this be moved into writePages? 
- // TODO: This can probably be skipped for root results.push_back({writeVersion, {}}); for(int i=0; i %d\n", THIS, lower.toString().c_str(), newPageIDs[i]); + debug_printf("%p Adding page to results: %s => Page %d\n", THIS, lower.toString().c_str(), newPageIDs[i]); results.back().second.push_back( {lower, newPageIDs[i]} ); } - debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str()); + debug_printf("%p Added versioned child set #2: %s\n", THIS, toString(results.back()).c_str()); debug_printf("%p DONE.\n", THIS); return results; @@ -1396,25 +1403,53 @@ private: else { // Internal Page + // TODO: Combine these into one vector and/or do something more elegant state std::vector> futureChildren; state std::vector childPageIDs; + state std::vector lowerBoundaries; + state std::vector decodeLowerBoundaries; + + // Track whether or not any child has had its boundaries forcibly changed + state bool childBoundariesChanged = false; // TODO: Make this much more efficient with a skip-merge through the two sorted sets (mutations, existing cursor) bool first = true; while(cursor.valid()) { // The lower bound for the first child is the lowerBound arg const RedwoodRecordRef &childLowerBound = first ? *lowerBound : cursor.get(); - if(first) - first = false; + first = false; - uint32_t pageID = *(uint32_t*)cursor.get().value.get().begin(); + // Skip over any children that do not link to a page. They exist to preserve the ancestors from + // which adjacent children can borrow prefix bytes. + // If there are any, then the first valid child page will incur a boundary change to move + // its lower bound to the left so we can delete the non-linking entry from this page to free up space. + while(!cursor.get().value.present()) { + // There should never be an internal page written that has no valid child pages. This loop will find + // the first valid child link, and if there are no more then execution will not return to this loop. 
+ ASSERT(cursor.moveNext()); + childBoundariesChanged = true; + } + + ASSERT(cursor.valid()); + + const RedwoodRecordRef &decodeChildLowerBound = cursor.get(); + + const Optional &childValue = cursor.get().value; + uint32_t pageID = *(uint32_t*)childValue.get().begin(); ASSERT(pageID != 0); - const RedwoodRecordRef &childUpperBound = cursor.moveNext() ? cursor.get() : *upperBound; + const RedwoodRecordRef &decodeChildUpperBound = cursor.moveNext() ? cursor.get() : *upperBound; - debug_printf("%p lower '%s'\n", THIS, childLowerBound.toString().c_str()); - debug_printf("%p upper '%s'\n", THIS, childUpperBound.toString().c_str()); - ASSERT(childLowerBound <= childUpperBound); + // Skip over any next-children which do not actually link to child pages + while(cursor.valid() && !cursor.get().value.present()) { + cursor.moveNext(); + childBoundariesChanged = true; + } + + const RedwoodRecordRef &childUpperBound = cursor.valid() ? cursor.get() : *upperBound; + + debug_printf("%p internal page id=%d child page id=%u lower=%s upper=%s decodeLower=%s decodeUpper=%s\n", + THIS, root, pageID, childLowerBound.toString().c_str(), childUpperBound.toString().c_str(), decodeChildLowerBound.toString().c_str(), decodeChildUpperBound.toString().c_str()); /* // TODO: If lower bound and upper bound have the same key, do something intelligent if possible @@ -1446,8 +1481,10 @@ private: futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound)); } */ - futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound)); + futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound, &decodeChildLowerBound, &decodeChildUpperBound)); childPageIDs.push_back(pageID); + lowerBoundaries.push_back(&childLowerBound); + decodeLowerBoundaries.push_back(&decodeChildLowerBound); } // Waiting one at a time makes debugging easier 
@@ -1457,28 +1494,67 @@ private: wait(success(futureChildren[k])); } + // Were any children modified? bool modified = false; + // Were all children deleted? + bool all_deleted = true; if(REDWOOD_DEBUG) { debug_printf("%p Subtree update results for id=%d\n", THIS, root); for(int i = 0; i < futureChildren.size(); ++i) { const VersionedChildrenT &children = futureChildren[i].get(); - debug_printf("%p subtree for child page id=%u: %s\n", THIS, childPageIDs[i], toString(children).c_str()); + debug_printf("%p subtree for child page id=%u lowerBound=%s: %s\n", THIS, childPageIDs[i], lowerBoundaries[i]->toString(50).c_str(), toString(children).c_str()); } } for(int i = 0; i < futureChildren.size(); ++i) { const VersionedChildrenT &children = futureChildren[i].get(); - // If the merge resulted in 1 versioned child set with exactly one child - // page, and its id is the same as the original, then no changes were made. - if(!(children.size() == 1 && children.front().second.size() == 1 && children.front().second.front().second == childPageIDs[i])) { + if(children.empty()) { modified = true; - break; + } + else { + // Not all children were deleted since this branch has a child + all_deleted = false; + + // If the merge resulted in 1 versioned child set with exactly one child + // page, and its page ID and lower bound are the same as the original, then no changes were made. + // Otherwise, a change was made so we know the page was modified and we can stop iterating. 
+ if(!( children.size() == 1 + && children.front().second.size() == 1 + && children.front().second.front().second == childPageIDs[i] + && children.front().second.front().first == *decodeLowerBoundaries[i] + ) + ) + { + modified = true; + break; + } } } - if(!modified) { + if(childBoundariesChanged) { + modified = true; + } + + if(all_deleted) { + ASSERT(modified); + if(root == 0) { + Reference page = self->m_pager->newPageBuffer(); + makeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride); + self->writePage(0, page, self->getLastCommittedVersion() + 1, &dbBegin, &dbEnd); + VersionedChildrenT c({ {0, { {dbBegin, 0} } } }); + debug_printf("%p id=%u All root page children were deleted, rewrote root as leaf, returning %s\n", THIS, root, toString(c).c_str()); + return c; + } + else { + VersionedChildrenT c({}); + debug_printf("%p id=%u All internal page children were deleted #1 so deleting this page too, returning %s\n", THIS, root, toString(c).c_str()); + return c; + } + } + + if(!boundaryChanged && !modified) { VersionedChildrenT c({{0, {{*lowerBound, root}}}}); debug_printf("%p not modified, returning %s\n", THIS, toString(c).c_str()); return c; @@ -1497,14 +1573,23 @@ private: // For each Future debug_printf("%p creating replacement pages for id=%d at Version %lld\n", THIS, root, version); - // If we're writing version 0, there is a chance that we don't have to write ourselves, if there are no changes - bool modified = version != 0; + // In multi version mode if we're writing version 0 there is a chance that we don't have to write ourselves, if there are no changes in any child subtrees + bool modified = self->singleVersion || version != 0; for(int i = 0; i < futureChildren.size(); ++i) { LogicalPageID pageID = childPageIDs[i]; const VersionedChildrenT &children = futureChildren[i].get(); if(children.empty()) { + // Subtree was deleted, but due to prefix dependencies the boundary key might still need to exist modified = true; + + // If there is a 
previous child and it has a subtree then this boundary key must still exist + // to continue being the upper bound of the previous child + if(!childEntries.empty() && childEntries.back().value.present()) { + RedwoodRecordRef entry(*lowerBoundaries[i]); + entry.value = Optional(); + childEntries.push_back(entry); + } continue; } @@ -1556,9 +1641,9 @@ private: // Add the children at this version to the child entries list for the current version being built. for (auto &childPage : cv->second) { - debug_printf("%p Adding child page %s\n", THIS, childPage.first.toString().c_str()); RedwoodRecordRef entry = childPage.first; entry.value = StringRef((unsigned char *)&childPage.second, sizeof(uint32_t)); + debug_printf("%p Adding child page %s\n", THIS, entry.toString().c_str()); childEntries.push_back(entry); } } @@ -1569,43 +1654,44 @@ private: // If all children were deleted then this page should be deleted as of the new version // Note that if a single range clear covered the entire page then we should not get this far if(childEntries.empty()) { - // TODO: delete page as of new version - VersionedChildrenT c({}); - debug_printf("%p All internal page children were deleted, returning %s\n", THIS, toString(c).c_str()); - return c; + if(self->singleVersion) { + debug_printf("%p All internal page children were deleted #2 at version %lld\n", THIS, version); + } + else { + VersionedKeyToPageSetT c({version, {} }); + debug_printf("%p All internal page children were deleted #3 at version %lld, adding %s\n", THIS, version, toString(c).c_str()); + result.push_back(c); + } } + else { + // TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and + // cause unnecessary path copying - // TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and - // cause unnecessary path copying + IPager *pager = self->m_pager; + std::vector pages = buildPages(false, *lowerBound, *upperBound, childEntries, 0, 
[pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride); - IPager *pager = self->m_pager; - std::vector pages = buildPages(false, *lowerBound, *upperBound, childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride); + // Write page(s), use version 0 to replace latest version if only writing one page + Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : version; + std::vector newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS); - // Write page(s), use version 0 to replace latest version if only writing one page - Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : version; - std::vector newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS); + // If this commitSubtree() is operating on the root, write new levels if needed until we're returning a single page + if(root == self->m_root) { + self->buildNewRoot(writeVersion, pages, newPageIDs, page); + } - // If this commitSubtree() is operating on the root, write new levels if needed until until we're returning a single page - if(root == self->m_root) - self->buildNewRoot(writeVersion, pages, newPageIDs, page); + result.resize(result.size()+1); + result.back().first = writeVersion; - result.resize(result.size()+1); - result.back().first = writeVersion; + for(int i=0; i 1 && result.back().second == result.end()[-2].second) { - debug_printf("%p Output same as last version, popping it.\n", THIS); - result.pop_back(); + debug_printf("%p Added versioned child set #3: %s\n", THIS, toString(result.back()).c_str()); } - debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str()); } else { debug_printf("%p Version 0 has no changes\n", THIS); result.push_back({0, {{*lowerBound, root}}}); - debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str()); + debug_printf("%p Added versioned child set 
#4: %s\n", THIS, toString(result.back()).c_str()); } if (nextVersion == std::numeric_limits::max()) @@ -1646,7 +1732,7 @@ private: self->printMutationBuffer(mutations); } - VersionedChildrenT _ = wait(commitSubtree(self, mutations, self->m_pager->getReadSnapshot(latestVersion), self->m_root, &dbBegin, &dbEnd)); + VersionedChildrenT newRoot = wait(commitSubtree(self, mutations, self->m_pager->getReadSnapshot(latestVersion), self->m_root, &dbBegin, &dbEnd, &dbBegin, &dbEnd)); self->m_pager->setLatestVersion(writeVersion); debug_printf("%s: Committing pager %lld\n", self->m_name.c_str(), writeVersion); @@ -1809,13 +1895,22 @@ private: self->ensureUnshared(); loop { - if(self->pageCursor->cursor.seekLessThanOrEqual(query)) { + bool success = self->pageCursor->cursor.seekLessThanOrEqual(query); + + // Skip backwards over internal page entries that do not link to child pages + if(!self->pageCursor->isLeaf()) { + // While record has no value, move again + while(success && !self->pageCursor->cursor.get().value.present()) { + success = self->pageCursor->cursor.movePrev(); + } + } + + if(success) { // If we found a record <= query at a leaf page then return success if(self->pageCursor->isLeaf()) { return true; } - // Otherwise move to next child page Reference child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride)); self->pageCursor = child; } @@ -1837,6 +1932,14 @@ private: self->ensureUnshared(); bool success = self->pageCursor->cursor.valid() && (forward ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev()); + // Skip over internal page entries that do not link to child pages + if(!self->pageCursor->isLeaf()) { + // While record has no value, move again + while(success && !self->pageCursor->cursor.get().value.present()) { + success = forward ? 
self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev(); + } + } + // Stop if successful or there's no parent to move to if(success || !self->pageCursor->parent) { break; @@ -1853,6 +1956,14 @@ private: // While not on a leaf page, move down to get to one. while(!self->pageCursor->isLeaf()) { + // Skip over internal page entries that do not link to child pages + while(!self->pageCursor->cursor.get().value.present()) { + bool success = forward ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev(); + if(!success) { + return false; + } + } + Reference child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride)); bool success = forward ? child->cursor.moveFirst() : child->cursor.moveLast(); self->pageCursor = child; @@ -1887,11 +1998,20 @@ private: // Move to first or last record in the page bool success = begin ? self->pageCursor->cursor.moveFirst() : self->pageCursor->cursor.moveLast(); + // Skip over internal page entries that do not link to child pages + if(!self->pageCursor->isLeaf()) { + // While record has no value, move past it + while(success && !self->pageCursor->cursor.get().value.present()) { + success = begin ? 
self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev(); + } + } + // If it worked, return true if we've reached a leaf page otherwise go to the next child if(success) { if(self->pageCursor->isLeaf()) { return true; } + Reference child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride)); self->pageCursor = child; } @@ -1987,7 +2107,7 @@ private: // for equal use cmp == 0 ACTOR static Future find_impl(Cursor *self, KeyRef key, bool needValue, int cmp) { // Search for the last key at or before (key, version, \xff) - state RedwoodRecordRef query(key, self->m_version); + state RedwoodRecordRef query(key, self->m_version, {}, 0, std::numeric_limits::max()); self->m_kv.reset(); wait(success(self->m_cur1.seekLTE(query))); @@ -2380,8 +2500,8 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version // Randomly use the cursor for something else first. if(g_random->coinflip()) { - debug_printf("VerifyRange(@%lld, %s, %s): Dummy seek\n", v, start.toString().c_str(), end.toString().c_str()); state Key randomKey = randomKV().key; + debug_printf("VerifyRange(@%lld, %s, %s): Dummy seek to '%s'\n", v, start.toString().c_str(), end.toString().c_str(), randomKey.toString().c_str()); wait(g_random->coinflip() ? 
cur->findFirstEqualOrGreater(randomKey, true, 0) : cur->findLastLessOrEqual(randomKey, true, 0)); } @@ -2397,6 +2517,7 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version if(i == iEnd) break; ++i; + if(iLast->first.second <= v && iLast->second.present() && ( @@ -2404,8 +2525,10 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version || i->first.first != iLast->first.first || i->first.second > v ) - ) + ) { + debug_printf("VerifyRange(@%lld, %s, %s) Found key in written map: %s\n", v, start.toString().c_str(), end.toString().c_str(), iLast->first.first.c_str()); break; + } } if(iLast == iEnd) { @@ -2428,6 +2551,8 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version break; } + ASSERT(errors == 0); + results.push_back(KeyValue(KeyValueRef(cur->getKey(), cur->getValue()))); wait(cur->next(true)); } @@ -2545,7 +2670,7 @@ ACTOR Future verifyAll(VersionedBTree *btree, Version maxCommittedVersion, return errors; } -ACTOR Future verify(VersionedBTree *btree, FutureStream vStream, std::map, Optional> *written, int *pErrorCount) { +ACTOR Future verify(VersionedBTree *btree, FutureStream vStream, std::map, Optional> *written, int *pErrorCount, bool serial) { state Future vall; state Future vrange; @@ -2557,12 +2682,24 @@ ACTOR Future verify(VersionedBTree *btree, FutureStream vStream, v = btree->getLastCommittedVersion(); debug_printf("Verifying at latest committed version %lld\n", v); vall = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount); + if(serial) { + wait(success(vall)); + } vrange = verifyRange(btree, randomKV().key, randomKV().key, v, written, pErrorCount); + if(serial) { + wait(success(vrange)); + } } else { debug_printf("Verifying through version %lld\n", v); vall = verifyAll(btree, v, written, pErrorCount); + if(serial) { + wait(success(vall)); + } vrange = verifyRange(btree, randomKV().key, randomKV().key, g_random->randomInt(1, v + 1), 
written, pErrorCount); + if(serial) { + wait(success(vrange)); + } } wait(success(vall) && success(vrange)); @@ -2880,6 +3017,12 @@ TEST_CASE("!/redwood/correctness") { state std::string pagerFile = "unittest_pageFile"; IPager *pager; + state bool serialTest = g_random->coinflip(); + state bool shortTest = g_random->coinflip(); + state bool singleVersion = true; // Multi-version mode is broken / not finished + + printf("serialTest: %d shortTest: %d singleVersion: %d\n", serialTest, shortTest, singleVersion); + if(useDisk) { printf("Deleting existing test data...\n"); deleteFile(pagerFile); @@ -2891,8 +3034,7 @@ TEST_CASE("!/redwood/correctness") { pager = createMemoryPager(); printf("Initializing...\n"); - state int pageSize = g_random->coinflip() ? pager->getUsablePageSize() : g_random->randomInt(200, 400); - state bool singleVersion = g_random->random01() < .05; + state int pageSize = shortTest ? 200 : (g_random->coinflip() ? pager->getUsablePageSize() : g_random->randomInt(200, 400)); state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion, pageSize); wait(btree->init()); @@ -2900,10 +3042,10 @@ TEST_CASE("!/redwood/correctness") { // a situation where the tree cannot be grown upward with decreasing level size. // TODO: Handle arbitrarily large keys state int maxKeySize = g_random->randomInt(4, pageSize * 2); - state int maxValueSize = g_random->randomInt(0, pageSize * 2); - state int maxCommitSize = 5e6; - state int mutationBytesTarget = randomSize(50e6); - state double clearChance = g_random->random01() * .001; // at most 1 in 1000 + state int maxValueSize = g_random->randomInt(0, pageSize * 4); + state int maxCommitSize = shortTest ? 1000 : randomSize(10e6); + state int mutationBytesTarget = shortTest ? 
5000 : randomSize(50e6); + state double clearChance = g_random->random01() * .01; // at most 1 in 100 printf("Using page size %d, max key size %d, max value size %d, clearchance %f, total mutation byte target %d\n", pageSize, maxKeySize, maxValueSize, clearChance, mutationBytesTarget); @@ -2927,8 +3069,8 @@ TEST_CASE("!/redwood/correctness") { state int mutationBytesTargetThisCommit = randomSize(maxCommitSize); state PromiseStream committedVersions; - state Future verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount); - state Future randomTask = randomReader(btree) || btree->getError(); + state Future verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest); + state Future randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError()); state Future commit = Void(); @@ -2964,7 +3106,7 @@ TEST_CASE("!/redwood/correctness") { ++rangeClears; KeyRangeRef range(start, end); - debug_printf(" Clear '%s' to '%s' @%lld\n", start.toString().c_str(), end.toString().c_str(), version); + debug_printf(" Mutation: Clear '%s' to '%s' @%lld\n", start.toString().c_str(), end.toString().c_str(), version); auto e = written.lower_bound(std::make_pair(start.toString(), 0)); if(e != written.end()) { auto last = e; @@ -2974,7 +3116,7 @@ TEST_CASE("!/redwood/correctness") { ++e; // If e key is different from last and last was present then insert clear for last's key at version if(last != eEnd && ((e == eEnd || e->first.first != last->first.first) && last->second.present())) { - debug_printf(" Clearing key '%s' @%lld\n", last->first.first.c_str(), version); + debug_printf(" Mutation: Clearing key '%s' @%lld\n", last->first.first.c_str(), version); keyBytesCleared += last->first.first.size(); mutationBytes += last->first.first.size(); @@ -3004,7 +3146,7 @@ TEST_CASE("!/redwood/correctness") { kv.key = StringRef(kv.arena(), *i); } - debug_printf(" Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), 
kv.value.toString().c_str(), version); + debug_printf(" Mutation: Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), kv.value.toString().c_str(), version); ++sets; keyBytesInserted += kv.key.size(); @@ -3039,11 +3181,21 @@ TEST_CASE("!/redwood/correctness") { return Void(); }); + if(serialTest) { + // Wait for commit, wait for verification, then start new verification + wait(commit); + committedVersions.sendError(end_of_stream()); + debug_printf("Waiting for verification to complete.\n"); + wait(verifyTask); + committedVersions = PromiseStream(); + verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest); + } + mutationBytesThisCommit = 0; mutationBytesTargetThisCommit = randomSize(maxCommitSize); // Recover from disk at random - if(useDisk && g_random->random01() < .02) { + if(!serialTest && useDisk && g_random->random01() < .02) { printf("Recovering from disk.\n"); // Wait for outstanding commit @@ -3070,7 +3222,7 @@ TEST_CASE("!/redwood/correctness") { // Create new promise stream and start the verifier again committedVersions = PromiseStream(); - verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount); + verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest); randomTask = randomReader(btree) || btree->getError(); }