Checkpointing progress on single-version mode in VersionedBTree. Subtree clears now work, preserving internal page boundary keys when necessary. Multi-version mode is unfortunately now broken, in addition to being incomplete. Added serial and simple btree unit test options.

This commit is contained in:
Stephen Atherton 2019-05-21 19:16:32 -07:00
parent 2801298ae8
commit 3f8fce0296
1 changed files with 277 additions and 125 deletions

View File

@ -52,6 +52,10 @@ struct RedwoodRecordRef {
}
}
RedwoodRecordRef withoutValue() const {
return RedwoodRecordRef(key, version, {}, chunk.total, chunk.start);
}
KeyRef key;
Version version;
Optional<ValueRef> value;
@ -138,20 +142,19 @@ struct RedwoodRecordRef {
#pragma pack(pop)
int compare(const RedwoodRecordRef &rhs) const {
//printf("compare %s to %s\n", toString().c_str(), rhs.toString().c_str());
int cmp = key.compare(rhs.key);
if(cmp == 0) {
cmp = version - rhs.version;
if(cmp == 0) {
// Absent value sorts higher than present (for reasons)
if(value.present() != rhs.value.present()) {
cmp = value.present() ? -1 : 1;
}
else {
// Chunked (represented by chunk.total > 0) sorts higher than whole
// It is assumed that in any data set there will never be more than one
// unique chunk total size for the same key and version, so sort by start, total
// Chunked (represented by chunk.total > 0) sorts higher than whole
cmp = chunk.start - rhs.chunk.start;
if(cmp == 0) {
cmp = chunk.total - rhs.chunk.total;
if(cmp == 0) {
cmp = chunk.start - rhs.chunk.start;
// No-value sorts AFTER having a value.
cmp = (value.present() ? 0 : 1) - (rhs.value.present() ? 0 : 1);
}
}
}
@ -235,14 +238,12 @@ struct RedwoodRecordRef {
std::string toString(int hexLimit = 15) const {
std::string r;
r += format("'%s' @%lld ", kvformat(key, hexLimit).c_str(), version);
if(isMultiPart()) {
r += format("[%d/%d] ", chunk.start, chunk.total);
}
r += format("[%d/%d] ", chunk.start, chunk.total);
if(value.present()) {
r += format("-> '%s'", kvformat(value.get(), hexLimit).c_str());
}
else {
r += "-> <cleared>";
r += "-> <not_present>";
}
return r;
}
@ -303,10 +304,13 @@ struct BTreePage {
r += " ";
if(!(flags & IS_LEAF)) {
RedwoodRecordRef rec = c.get();
ASSERT(rec.value.present() && rec.value.get().size() == sizeof(uint32_t));
uint32_t id = *(const uint32_t *)rec.value.get().begin();
std::string val = format("[Page id=%u]", id);
rec.value = val;
std::string val;
if(rec.value.present()) {
ASSERT(rec.value.get().size() == sizeof(uint32_t));
uint32_t id = *(const uint32_t *)rec.value.get().begin();
val = format("[Page id=%u]", id);
rec.value = val;
}
r += rec.toString();
}
else {
@ -315,6 +319,8 @@ struct BTreePage {
r += "\n";
ASSERT(c.get().key >= lowerBound->key && c.get().key <= upperBound->key);
} while(c.moveNext());
}
} catch(Error &e) {
@ -327,7 +333,7 @@ struct BTreePage {
}
};
static void writeEmptyPage(Reference<IPage> page, uint8_t newFlags, int pageSize) {
static void makeEmptyPage(Reference<IPage> page, uint8_t newFlags, int pageSize) {
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
BTreePage *btpage = (BTreePage *)page->begin();
btpage->flags = newFlags;
@ -371,7 +377,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
int i = 0;
const int iEnd = entries.size();
// Lower bound of the page being added to
RedwoodRecordRef pageLowerBound = lowerBound;
RedwoodRecordRef pageLowerBound = lowerBound.withoutValue();
RedwoodRecordRef pageUpperBound;
while(i <= iEnd) {
@ -380,7 +386,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
// If not the end, add i to the page if necessary
if(end) {
pageUpperBound = upperBound;
pageUpperBound = upperBound.withoutValue();
}
else {
// Get delta from previous record
@ -420,6 +426,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
if(!fits) {
// Flush page
if(minimalBoundaries) {
// TODO: Write minimal boundaries
// Note that prefixLen is guaranteed to be < entry.key.size() because entries are in increasing order and cannot repeat.
// int len = prefixLen + 1;
// if(entry.key[prefixLen] == 0)
@ -427,7 +434,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
// pageUpperBound = entry.key.substr(0, len);
}
else {
pageUpperBound = entry;
pageUpperBound = entry.withoutValue();
}
}
}
@ -450,7 +457,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
// If not writing the final page, reduce entry count of page by a third
if(!end) {
i -= count / 3;
pageUpperBound = entries[i];
pageUpperBound = entries[i].withoutValue();
}
debug_printf("Flushing page start=%d i=%d count=%d\nlower: %s\nupper: %s\n", start, i, count, pageLowerBound.toString().c_str(), pageUpperBound.toString().c_str());
@ -458,11 +465,11 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
for(int j = start; j < i; ++j) {
debug_printf(" %3d: %s\n", j, entries[j].toString().c_str());
if(j > start) {
ASSERT(entries[j] > entries[j - 1]);
//ASSERT(entries[j] > entries[j - 1]);
}
}
ASSERT(pageLowerBound.key <= pageUpperBound.key);
#endif
ASSERT(pageLowerBound <= pageUpperBound);
union {
BTreePage *btPage;
@ -523,7 +530,7 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
start = i;
kvBytes = 0;
compressedBytes = BTreePage::BinaryTree::GetTreeOverhead();
pageLowerBound = pageUpperBound;
pageLowerBound = pageUpperBound.withoutValue();
}
}
@ -620,7 +627,7 @@ public:
changes[0] = SingleKeyMutation(keyValue.value);
}
else {
changes.begin()->second.value = keyValue.value;
changes.begin()->second = SingleKeyMutation(keyValue.value);
}
}
else {
@ -639,6 +646,7 @@ public:
if(singleVersion) {
RangeMutation &range = iBegin->second;
range.startKeyMutations.clear();
range.startKeyMutations[0] = SingleKeyMutation();
range.rangeClearVersion = 0;
++iBegin;
m_pBuffer->erase(iBegin, iEnd);
@ -701,7 +709,7 @@ public:
if(latest == 0) {
++latest;
Reference<IPage> page = self->m_pager->newPageBuffer();
writeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride);
makeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride);
self->writePage(self->m_root, page, latest, &dbBegin, &dbEnd);
self->m_pager->setLatestVersion(latest);
wait(self->m_pager->commit());
@ -1091,7 +1099,7 @@ private:
};
ACTOR static Future<Reference<const IPage>> readPage(Reference<IPagerSnapshot> snapshot, LogicalPageID id, int usablePageSize, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) {
debug_printf("readPage() op=read id=%u @%lld\n", id, snapshot->getVersion());
debug_printf("readPage() op=read id=%u @%lld lower=%s upper=%s\n", id, snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str());
wait(delay(0, TaskDiskRead));
state Reference<const IPage> result = wait(snapshot->getPhysicalPage(id));
@ -1117,6 +1125,7 @@ private:
}
if(result->userData == nullptr) {
debug_printf("readPage() Creating Reader for page id=%u @%lld lower=%s upper=%s\n", id, snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str());
result->userData = new BTreePage::BinaryTree::Reader(&pTreePage->tree(), lowerBound, upperBound);
result->userDataDestructor = [](void *ptr) { delete (BTreePage::BinaryTree::Reader *)ptr; };
}
@ -1131,47 +1140,47 @@ private:
// Returns list of (version, list of (lower_bound, list of children) )
// TODO: Probably should pass prev/next records by pointer in many places
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference<IPagerSnapshot> snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, const RedwoodRecordRef *newLowerBound = nullptr, const RedwoodRecordRef *newUpperBound = nullptr) {
debug_printf("%p commitSubtree: root=%d lower='%s' upper='%s'\n", THIS, root, lowerBound->toString().c_str(), upperBound->toString().c_str());
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference<IPagerSnapshot> snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, const RedwoodRecordRef *decodeLowerBound, const RedwoodRecordRef *decodeUpperBound) {
debug_printf("%p commitSubtree: root=%d lower=%s upper=%s\n", THIS, root, lowerBound->toString().c_str(), upperBound->toString().c_str());
debug_printf("%p commitSubtree: root=%d decodeLower=%s decodeUpper=%s\n", THIS, root, decodeLowerBound->toString().c_str(), decodeUpperBound->toString().c_str());
self->counts.commitToPageStart++;
// If the lower bound key and the upper bound key are the same then there can't be any changes to
// this subtree since changes would happen after the upper bound key as the mutated versions would
// necessarily be higher than all previous versions
// TODO: Avoid calling commitSubtree() when this is true to avoid creating the rather large state of this actor
if(lowerBound->key == upperBound->key) {
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p id=%u no changes, lower and upper bound keys are the same, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
// If a boundary changed, the page must be rewritten regardless of KV mutations
state bool boundaryChanged = (lowerBound != decodeLowerBound) || (upperBound != decodeUpperBound);
debug_printf("%p id=%u boundaryChanged=%d\n", THIS, root, boundaryChanged);
// Find the slice of the mutation buffer that is relevant to this subtree
// TODO: Rather than two lower_bound searches, perhaps just compare each mutation to the upperBound key while iterating
state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->lower_bound(lowerBound->key);
state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key);
--iMutationBoundary;
state MutationBufferT::const_iterator iMutationBoundaryEnd = mutationBuffer->lower_bound(upperBound->key);
// If the mutation buffer key found is greater than the lower bound key then go to the previous mutation
// buffer key because it may cover deletion of some keys at the start of this subtree.
if(iMutationBoundary != mutationBuffer->begin() && iMutationBoundary->first > lowerBound->key) {
--iMutationBoundary;
if(REDWOOD_DEBUG) {
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
}
else {
// If the there are no mutations, we're done
if(iMutationBoundary == iMutationBoundaryEnd) {
// If the boundary range iterators are the same then upperbound and lowerbound have the same key.
// If the key is being mutated, them remove this subtree.
if(iMutationBoundary == iMutationBoundaryEnd) {
if(!iMutationBoundary->second.startKeyMutations.empty()) {
VersionedChildrenT c;
debug_printf("%p id=%u lower and upper bound key/version match and key is modified so deleting page, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
// If there are no forced boundary changes then this subtree is unchanged.
if(!boundaryChanged) {
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p id=%d no changes, mutation buffer start/end are the same, returning %s\n", THIS, root, toString(c).c_str());
debug_printf("%p id=%d page contains a single key '%s' which is not changing, returning %s\n", THIS, root, lowerBound->key.toString().c_str(), toString(c).c_str());
return c;
}
}
// TODO: Check if entire subtree is erased and return no pages, also have the previous pages deleted as of
// the cleared version.
// Another way to have no mutations is to have a single mutation range cover this
// subtree but have no changes in it
MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary;
++iMutationBoundaryNext;
if(iMutationBoundaryNext == iMutationBoundaryEnd &&
if(!boundaryChanged && iMutationBoundaryNext == iMutationBoundaryEnd &&
( iMutationBoundary->second.noChanges() ||
( !iMutationBoundary->second.rangeClearVersion.present() &&
iMutationBoundary->first < lowerBound->key)
@ -1183,9 +1192,9 @@ private:
}
self->counts.commitToPage++;
state Reference<const IPage> rawPage = wait(readPage(snapshot, root, self->m_usablePageSizeOverride, lowerBound, upperBound));
state Reference<const IPage> rawPage = wait(readPage(snapshot, root, self->m_usablePageSizeOverride, decodeLowerBound, decodeUpperBound));
state BTreePage *page = (BTreePage *) rawPage->begin();
debug_printf("%p commitSubtree(): %s\n", THIS, page->toString(false, root, snapshot->getVersion(), lowerBound, upperBound).c_str());
debug_printf("%p commitSubtree(): %s\n", THIS, page->toString(false, root, snapshot->getVersion(), decodeLowerBound, decodeUpperBound).c_str());
BTreePage::BinaryTree::Cursor cursor = getReader(rawPage)->getCursor();
cursor.moveFirst();
@ -1195,7 +1204,7 @@ private:
VersionedChildrenT results;
std::vector<RedwoodRecordRef> merged;
debug_printf("%p MERGING EXISTING DATA WITH MUTATIONS:\n", THIS);
debug_printf("%p id=%u MERGING EXISTING DATA WITH MUTATIONS:\n", THIS, root);
if(REDWOOD_DEBUG) {
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
}
@ -1253,7 +1262,7 @@ private:
if(iMutations->first < minVersion || minVersion == invalidVersion)
minVersion = iMutations->first;
++changes;
merged.push_back(iMutations->second.toRecord(iMutationBoundary->first, iMutations->first));
merged.push_back(m.toRecord(iMutationBoundary->first, iMutations->first));
debug_printf("%p: Added non-split %s [mutation, boundary start]\n", THIS, merged.back().toString().c_str());
}
else {
@ -1333,7 +1342,7 @@ private:
debug_printf("%p Done merging mutations into existing leaf contents, made %d changes\n", THIS, changes);
// No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records.
if(minVersion == invalidVersion) {
if(!boundaryChanged && minVersion == invalidVersion) {
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p No changes were made during mutation merge, returning %s\n", THIS, toString(c).c_str());
ASSERT(changes == 0);
@ -1344,10 +1353,10 @@ private:
// If everything in the page was deleted then this page should be deleted as of the new version
// Note that if a single range clear covered the entire page then we should not get this far
if(merged.empty()) {
if(merged.empty() && root != 0) {
// TODO: For multi version mode only delete this page as of the new version
VersionedChildrenT c({});
debug_printf("%p All leaf page contents were cleared, returning %s\n", THIS, toString(c).c_str());
debug_printf("%p id=%u All leaf page contents were cleared, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
@ -1359,7 +1368,7 @@ private:
// The new split pages will be valid as of minVersion, but the old page remains valid at the old version
if(pages.size() != 1) {
results.push_back( {0, {{*lowerBound, root}}} );
debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str());
debug_printf("%p Added versioned child set #1: %s\n", THIS, toString(results.back()).c_str());
}
else {
// The page was updated but not size-split or version-split so the last page version's data
@ -1379,16 +1388,14 @@ private:
self->buildNewRoot(writeVersion, pages, newPageIDs, page);
}
// TODO: Can this be moved into writePages?
// TODO: This can probably be skipped for root
results.push_back({writeVersion, {}});
for(int i=0; i<pages.size(); i++) {
// The lower bound of the first page is the lower bound of the subtree, not the first entry in the page
const RedwoodRecordRef &lower = (i == 0) ? *lowerBound : pages[i].lowerBound;
debug_printf("%p Adding page to results: %s => %d\n", THIS, lower.toString().c_str(), newPageIDs[i]);
debug_printf("%p Adding page to results: %s => Page %d\n", THIS, lower.toString().c_str(), newPageIDs[i]);
results.back().second.push_back( {lower, newPageIDs[i]} );
}
debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str());
debug_printf("%p Added versioned child set #2: %s\n", THIS, toString(results.back()).c_str());
debug_printf("%p DONE.\n", THIS);
return results;
@ -1396,25 +1403,53 @@ private:
else {
// Internal Page
// TODO: Combine these into one vector and/or do something more elegant
state std::vector<Future<VersionedChildrenT>> futureChildren;
state std::vector<LogicalPageID> childPageIDs;
state std::vector<const RedwoodRecordRef *> lowerBoundaries;
state std::vector<const RedwoodRecordRef *> decodeLowerBoundaries;
// Track whether or not any child has had its boundaries forcibly changed
state bool childBoundariesChanged = false;
// TODO: Make this much more efficient with a skip-merge through the two sorted sets (mutations, existing cursor)
bool first = true;
while(cursor.valid()) {
// The lower bound for the first child is the lowerBound arg
const RedwoodRecordRef &childLowerBound = first ? *lowerBound : cursor.get();
if(first)
first = false;
first = false;
uint32_t pageID = *(uint32_t*)cursor.get().value.get().begin();
// Skip over any children that do not link to a page. They exist to preserve the ancestors from
// which adjacent children can borrow prefix bytes.
// If there are any, then the first valid child page will incur a boundary change to move
// its lower bound to the left so we can delete the non-linking entry from this page to free up space.
while(!cursor.get().value.present()) {
// There should be an internal page written that has no valid child pages. This loop will find
// the first valid child link, and if there are no more then execution will not return to this loop.
ASSERT(cursor.moveNext());
childBoundariesChanged = true;
}
ASSERT(cursor.valid());
const RedwoodRecordRef &decodeChildLowerBound = cursor.get();
const Optional<ValueRef> &childValue = cursor.get().value;
uint32_t pageID = *(uint32_t*)childValue.get().begin();
ASSERT(pageID != 0);
const RedwoodRecordRef &childUpperBound = cursor.moveNext() ? cursor.get() : *upperBound;
const RedwoodRecordRef &decodeChildUpperBound = cursor.moveNext() ? cursor.get() : *upperBound;
debug_printf("%p lower '%s'\n", THIS, childLowerBound.toString().c_str());
debug_printf("%p upper '%s'\n", THIS, childUpperBound.toString().c_str());
ASSERT(childLowerBound <= childUpperBound);
// Skip over any next-children which do not actually link to child pages
while(cursor.valid() && !cursor.get().value.present()) {
cursor.moveNext();
childBoundariesChanged = true;
}
const RedwoodRecordRef &childUpperBound = cursor.valid() ? cursor.get() : *upperBound;
debug_printf("%p internal page id=%d child page id=%u lower=%s upper=%s decodeLower=%s decodeUpper=%s\n",
THIS, root, pageID, childLowerBound.toString().c_str(), childUpperBound.toString().c_str(), decodeChildLowerBound.toString().c_str(), decodeChildUpperBound.toString().c_str());
/*
// TODO: If lower bound and upper bound have the same key, do something intelligent if possible
@ -1446,8 +1481,10 @@ private:
futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound));
}
*/
futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound));
futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound, &decodeChildLowerBound, &decodeChildUpperBound));
childPageIDs.push_back(pageID);
lowerBoundaries.push_back(&childLowerBound);
decodeLowerBoundaries.push_back(&decodeChildLowerBound);
}
// Waiting one at a time makes debugging easier
@ -1457,28 +1494,67 @@ private:
wait(success(futureChildren[k]));
}
// Were any children modified?
bool modified = false;
// Were all children deleted?
bool all_deleted = true;
if(REDWOOD_DEBUG) {
debug_printf("%p Subtree update results for id=%d\n", THIS, root);
for(int i = 0; i < futureChildren.size(); ++i) {
const VersionedChildrenT &children = futureChildren[i].get();
debug_printf("%p subtree for child page id=%u: %s\n", THIS, childPageIDs[i], toString(children).c_str());
debug_printf("%p subtree for child page id=%u lowerBound=%s: %s\n", THIS, childPageIDs[i], lowerBoundaries[i]->toString(50).c_str(), toString(children).c_str());
}
}
for(int i = 0; i < futureChildren.size(); ++i) {
const VersionedChildrenT &children = futureChildren[i].get();
// If the merge resulted in 1 versioned child set with exactly one child
// page, and its id is the same as the original, then no changes were made.
if(!(children.size() == 1 && children.front().second.size() == 1 && children.front().second.front().second == childPageIDs[i])) {
if(children.empty()) {
modified = true;
break;
}
else {
// Not all children were deleted since this branch has a child
all_deleted = false;
// If the merge resulted in 1 versioned child set with exactly one child
// page, and its page ID and lower bound are the same as the original, then no changes were made.
// Otherwise, a change was made so we know the page was modified and we can stop iterating.
if(!( children.size() == 1
&& children.front().second.size() == 1
&& children.front().second.front().second == childPageIDs[i]
&& children.front().second.front().first == *decodeLowerBoundaries[i]
)
)
{
modified = true;
break;
}
}
}
if(!modified) {
if(childBoundariesChanged) {
modified = true;
}
if(all_deleted) {
ASSERT(modified);
if(root == 0) {
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF, self->m_usablePageSizeOverride);
self->writePage(0, page, self->getLastCommittedVersion() + 1, &dbBegin, &dbEnd);
VersionedChildrenT c({ {0, { {dbBegin, 0} } } });
debug_printf("%p id=%u All root page children were deleted, rewrote root as leaf, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
else {
VersionedChildrenT c({});
debug_printf("%p id=%u All internal page children were deleted #1 so deleting this page too, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
}
if(!boundaryChanged && !modified) {
VersionedChildrenT c({{0, {{*lowerBound, root}}}});
debug_printf("%p not modified, returning %s\n", THIS, toString(c).c_str());
return c;
@ -1497,14 +1573,23 @@ private:
// For each Future<VersionedChildrenT>
debug_printf("%p creating replacement pages for id=%d at Version %lld\n", THIS, root, version);
// If we're writing version 0, there is a chance that we don't have to write ourselves, if there are no changes
bool modified = version != 0;
// In multi version mode if we're writing version 0 there is a chance that we don't have to write ourselves, if there are no changes in any child subtrees
bool modified = self->singleVersion || version != 0;
for(int i = 0; i < futureChildren.size(); ++i) {
LogicalPageID pageID = childPageIDs[i];
const VersionedChildrenT &children = futureChildren[i].get();
if(children.empty()) {
// Subtree was deleted, but due to prefix dependencies the boundary key might still need to exist
modified = true;
// If there is a previous child and it has a subtree then this boundary key must still exist
// to continue being the upper bound of the previous child
if(!childEntries.empty() && childEntries.back().value.present()) {
RedwoodRecordRef entry(*lowerBoundaries[i]);
entry.value = Optional<ValueRef>();
childEntries.push_back(entry);
}
continue;
}
@ -1556,9 +1641,9 @@ private:
// Add the children at this version to the child entries list for the current version being built.
for (auto &childPage : cv->second) {
debug_printf("%p Adding child page %s\n", THIS, childPage.first.toString().c_str());
RedwoodRecordRef entry = childPage.first;
entry.value = StringRef((unsigned char *)&childPage.second, sizeof(uint32_t));
debug_printf("%p Adding child page %s\n", THIS, entry.toString().c_str());
childEntries.push_back(entry);
}
}
@ -1569,43 +1654,44 @@ private:
// If all children were deleted then this page should be deleted as of the new version
// Note that if a single range clear covered the entire page then we should not get this far
if(childEntries.empty()) {
// TODO: delete page as of new version
VersionedChildrenT c({});
debug_printf("%p All internal page children were deleted, returning %s\n", THIS, toString(c).c_str());
return c;
if(self->singleVersion) {
debug_printf("%p All internal page children were deleted #2 at version %lld\n", THIS, version);
}
else {
VersionedKeyToPageSetT c({version, {} });
debug_printf("%p All internal page children were deleted #3 at version %lld, adding %s\n", THIS, version, toString(c).c_str());
result.push_back(c);
}
}
else {
// TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and
// cause unnecessary path copying
// TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and
// cause unnecessary path copying
IPager *pager = self->m_pager;
std::vector<BoundaryAndPage> pages = buildPages(false, *lowerBound, *upperBound, childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride);
IPager *pager = self->m_pager;
std::vector<BoundaryAndPage> pages = buildPages(false, *lowerBound, *upperBound, childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride);
// Write page(s), use version 0 to replace latest version if only writing one page
Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : version;
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS);
// Write page(s), use version 0 to replace latest version if only writing one page
Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : version;
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS);
// If this commitSubtree() is operating on the root, write new levels if needed until until we're returning a single page
if(root == self->m_root) {
self->buildNewRoot(writeVersion, pages, newPageIDs, page);
}
// If this commitSubtree() is operating on the root, write new levels if needed until until we're returning a single page
if(root == self->m_root)
self->buildNewRoot(writeVersion, pages, newPageIDs, page);
result.resize(result.size()+1);
result.back().first = writeVersion;
result.resize(result.size()+1);
result.back().first = writeVersion;
for(int i=0; i<pages.size(); i++)
result.back().second.push_back( {pages[i].lowerBound, newPageIDs[i]} );
for(int i=0; i<pages.size(); i++)
result.back().second.push_back( {pages[i].lowerBound, newPageIDs[i]} );
// TODO: figure this out earlier instead of writing replacement page more than once
if (result.size() > 1 && result.back().second == result.end()[-2].second) {
debug_printf("%p Output same as last version, popping it.\n", THIS);
result.pop_back();
debug_printf("%p Added versioned child set #3: %s\n", THIS, toString(result.back()).c_str());
}
debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str());
}
else {
debug_printf("%p Version 0 has no changes\n", THIS);
result.push_back({0, {{*lowerBound, root}}});
debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str());
debug_printf("%p Added versioned child set #4: %s\n", THIS, toString(result.back()).c_str());
}
if (nextVersion == std::numeric_limits<Version>::max())
@ -1646,7 +1732,7 @@ private:
self->printMutationBuffer(mutations);
}
VersionedChildrenT _ = wait(commitSubtree(self, mutations, self->m_pager->getReadSnapshot(latestVersion), self->m_root, &dbBegin, &dbEnd));
VersionedChildrenT newRoot = wait(commitSubtree(self, mutations, self->m_pager->getReadSnapshot(latestVersion), self->m_root, &dbBegin, &dbEnd, &dbBegin, &dbEnd));
self->m_pager->setLatestVersion(writeVersion);
debug_printf("%s: Committing pager %lld\n", self->m_name.c_str(), writeVersion);
@ -1809,13 +1895,22 @@ private:
self->ensureUnshared();
loop {
if(self->pageCursor->cursor.seekLessThanOrEqual(query)) {
bool success = self->pageCursor->cursor.seekLessThanOrEqual(query);
// Skip backwards over internal page entries that do not link to child pages
if(!self->pageCursor->isLeaf()) {
// While record has no value, move again
while(success && !self->pageCursor->cursor.get().value.present()) {
success = self->pageCursor->cursor.movePrev();
}
}
if(success) {
// If we found a record <= query at a leaf page then return success
if(self->pageCursor->isLeaf()) {
return true;
}
// Otherwise move to next child page
Reference<PageCursor> child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride));
self->pageCursor = child;
}
@ -1837,6 +1932,14 @@ private:
self->ensureUnshared();
bool success = self->pageCursor->cursor.valid() && (forward ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev());
// Skip over internal page entries that do not link to child pages
if(!self->pageCursor->isLeaf()) {
// While record has no value, move again
while(success && !self->pageCursor->cursor.get().value.present()) {
success = forward ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev();
}
}
// Stop if successful or there's no parent to move to
if(success || !self->pageCursor->parent) {
break;
@ -1853,6 +1956,14 @@ private:
// While not on a leaf page, move down to get to one.
while(!self->pageCursor->isLeaf()) {
// Skip over internal page entries that do not link to child pages
while(!self->pageCursor->cursor.get().value.present()) {
bool success = forward ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev();
if(!success) {
return false;
}
}
Reference<PageCursor> child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride));
bool success = forward ? child->cursor.moveFirst() : child->cursor.moveLast();
self->pageCursor = child;
@ -1887,11 +1998,20 @@ private:
// Move to first or last record in the page
bool success = begin ? self->pageCursor->cursor.moveFirst() : self->pageCursor->cursor.moveLast();
// Skip over internal page entries that do not link to child pages
if(!self->pageCursor->isLeaf()) {
// While record has no value, move past it
while(success && !self->pageCursor->cursor.get().value.present()) {
success = begin ? self->pageCursor->cursor.moveNext() : self->pageCursor->cursor.movePrev();
}
}
// If it worked, return true if we've reached a leaf page otherwise go to the next child
if(success) {
if(self->pageCursor->isLeaf()) {
return true;
}
Reference<PageCursor> child = wait(self->pageCursor->getChild(self->pager, self->usablePageSizeOverride));
self->pageCursor = child;
}
@ -1987,7 +2107,7 @@ private:
// for equal use cmp == 0
ACTOR static Future<Void> find_impl(Cursor *self, KeyRef key, bool needValue, int cmp) {
// Search for the last key at or before (key, version, \xff)
state RedwoodRecordRef query(key, self->m_version);
state RedwoodRecordRef query(key, self->m_version, {}, 0, std::numeric_limits<int32_t>::max());
self->m_kv.reset();
wait(success(self->m_cur1.seekLTE(query)));
@ -2380,8 +2500,8 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
// Randomly use the cursor for something else first.
if(g_random->coinflip()) {
debug_printf("VerifyRange(@%lld, %s, %s): Dummy seek\n", v, start.toString().c_str(), end.toString().c_str());
state Key randomKey = randomKV().key;
debug_printf("VerifyRange(@%lld, %s, %s): Dummy seek to '%s'\n", v, start.toString().c_str(), end.toString().c_str(), randomKey.toString().c_str());
wait(g_random->coinflip() ? cur->findFirstEqualOrGreater(randomKey, true, 0) : cur->findLastLessOrEqual(randomKey, true, 0));
}
@ -2397,6 +2517,7 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
if(i == iEnd)
break;
++i;
if(iLast->first.second <= v
&& iLast->second.present()
&& (
@ -2404,8 +2525,10 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
|| i->first.first != iLast->first.first
|| i->first.second > v
)
)
) {
debug_printf("VerifyRange(@%lld, %s, %s) Found key in written map: %s\n", v, start.toString().c_str(), end.toString().c_str(), iLast->first.first.c_str());
break;
}
}
if(iLast == iEnd) {
@ -2428,6 +2551,8 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
break;
}
ASSERT(errors == 0);
results.push_back(KeyValue(KeyValueRef(cur->getKey(), cur->getValue())));
wait(cur->next(true));
}
@ -2545,7 +2670,7 @@ ACTOR Future<int> verifyAll(VersionedBTree *btree, Version maxCommittedVersion,
return errors;
}
ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount, bool serial) {
state Future<int> vall;
state Future<int> vrange;
@ -2557,12 +2682,24 @@ ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream,
v = btree->getLastCommittedVersion();
debug_printf("Verifying at latest committed version %lld\n", v);
vall = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount);
if(serial) {
wait(success(vall));
}
vrange = verifyRange(btree, randomKV().key, randomKV().key, v, written, pErrorCount);
if(serial) {
wait(success(vrange));
}
}
else {
debug_printf("Verifying through version %lld\n", v);
vall = verifyAll(btree, v, written, pErrorCount);
if(serial) {
wait(success(vall));
}
vrange = verifyRange(btree, randomKV().key, randomKV().key, g_random->randomInt(1, v + 1), written, pErrorCount);
if(serial) {
wait(success(vrange));
}
}
wait(success(vall) && success(vrange));
@ -2880,6 +3017,12 @@ TEST_CASE("!/redwood/correctness") {
state std::string pagerFile = "unittest_pageFile";
IPager *pager;
state bool serialTest = g_random->coinflip();
state bool shortTest = g_random->coinflip();
state bool singleVersion = true; // Multi-version mode is broken / not finished
printf("serialTest: %d shortTest: %d singleVersion: %d\n", serialTest, shortTest, singleVersion);
if(useDisk) {
printf("Deleting existing test data...\n");
deleteFile(pagerFile);
@ -2891,8 +3034,7 @@ TEST_CASE("!/redwood/correctness") {
pager = createMemoryPager();
printf("Initializing...\n");
state int pageSize = g_random->coinflip() ? pager->getUsablePageSize() : g_random->randomInt(200, 400);
state bool singleVersion = g_random->random01() < .05;
state int pageSize = shortTest ? 200 : (g_random->coinflip() ? pager->getUsablePageSize() : g_random->randomInt(200, 400));
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion, pageSize);
wait(btree->init());
@ -2900,10 +3042,10 @@ TEST_CASE("!/redwood/correctness") {
// a situation where the tree cannot be grown upward with decreasing level size.
// TODO: Handle arbitrarily large keys
state int maxKeySize = g_random->randomInt(4, pageSize * 2);
state int maxValueSize = g_random->randomInt(0, pageSize * 2);
state int maxCommitSize = 5e6;
state int mutationBytesTarget = randomSize(50e6);
state double clearChance = g_random->random01() * .001; // at most 1 in 1000
state int maxValueSize = g_random->randomInt(0, pageSize * 4);
state int maxCommitSize = shortTest ? 1000 : randomSize(10e6);
state int mutationBytesTarget = shortTest ? 5000 : randomSize(50e6);
state double clearChance = g_random->random01() * .01; // at most 1 in 100
printf("Using page size %d, max key size %d, max value size %d, clearchance %f, total mutation byte target %d\n", pageSize, maxKeySize, maxValueSize, clearChance, mutationBytesTarget);
@ -2927,8 +3069,8 @@ TEST_CASE("!/redwood/correctness") {
state int mutationBytesTargetThisCommit = randomSize(maxCommitSize);
state PromiseStream<Version> committedVersions;
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount);
state Future<Void> randomTask = randomReader(btree) || btree->getError();
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
state Future<Void> randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError());
state Future<Void> commit = Void();
@ -2964,7 +3106,7 @@ TEST_CASE("!/redwood/correctness") {
++rangeClears;
KeyRangeRef range(start, end);
debug_printf(" Clear '%s' to '%s' @%lld\n", start.toString().c_str(), end.toString().c_str(), version);
debug_printf(" Mutation: Clear '%s' to '%s' @%lld\n", start.toString().c_str(), end.toString().c_str(), version);
auto e = written.lower_bound(std::make_pair(start.toString(), 0));
if(e != written.end()) {
auto last = e;
@ -2974,7 +3116,7 @@ TEST_CASE("!/redwood/correctness") {
++e;
// If e key is different from last and last was present then insert clear for last's key at version
if(last != eEnd && ((e == eEnd || e->first.first != last->first.first) && last->second.present())) {
debug_printf(" Clearing key '%s' @%lld\n", last->first.first.c_str(), version);
debug_printf(" Mutation: Clearing key '%s' @%lld\n", last->first.first.c_str(), version);
keyBytesCleared += last->first.first.size();
mutationBytes += last->first.first.size();
@ -3004,7 +3146,7 @@ TEST_CASE("!/redwood/correctness") {
kv.key = StringRef(kv.arena(), *i);
}
debug_printf(" Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), kv.value.toString().c_str(), version);
debug_printf(" Mutation: Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), kv.value.toString().c_str(), version);
++sets;
keyBytesInserted += kv.key.size();
@ -3039,11 +3181,21 @@ TEST_CASE("!/redwood/correctness") {
return Void();
});
if(serialTest) {
// Wait for commit, wait for verification, then start new verification
wait(commit);
committedVersions.sendError(end_of_stream());
debug_printf("Waiting for verification to complete.\n");
wait(verifyTask);
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
}
mutationBytesThisCommit = 0;
mutationBytesTargetThisCommit = randomSize(maxCommitSize);
// Recover from disk at random
if(useDisk && g_random->random01() < .02) {
if(!serialTest && useDisk && g_random->random01() < .02) {
printf("Recovering from disk.\n");
// Wait for outstanding commit
@ -3070,7 +3222,7 @@ TEST_CASE("!/redwood/correctness") {
// Create new promise stream and start the verifier again
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount);
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
randomTask = randomReader(btree) || btree->getError();
}