Rewrite superpage writes to fix bugs involving logical ID reuse and to make the logic cleaner. Correctness test is stronger, with pipelined writes/commits during verification.

This commit is contained in:
Stephen Atherton 2018-10-01 16:51:57 -07:00
parent a9f467c502
commit 15c1f87681
2 changed files with 72 additions and 58 deletions

View File

@ -386,7 +386,13 @@ void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version)
PageVersionMap &pageVersionMap = pageTable[pageID];
ASSERT(!pageVersionMap.empty());
// 0 will mean delete as of latest version, similar to write at latest version
if(version == 0) {
version = pageVersionMap.back().first;
}
auto itr = pageVersionMapLowerBound(pageVersionMap, version);
// TODO: Is this correct, that versions from the past *forward* can be deleted?
for(auto i = itr; i != pageVersionMap.end(); ++i) {
freePhysicalPageID(i->second);
}
@ -584,6 +590,7 @@ ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Ref
auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version);
if(itr == pageVersionMap.begin()) {
debug_printf("Page version map empty! op=error id=%u @%lld\n", logicalPageID, version);
ASSERT(false);
}
@ -636,7 +643,10 @@ ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Ref
}
Future<Reference<const IPage>> IndirectShadowPager::getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
ASSERT(recovery.isReady());
if(!recovery.isReady()) {
debug_printf("getPage failure, recovery not ready - op=error id=%u @%lld\n", pageID, version);
ASSERT(false);
}
Future<Reference<const IPage>> f = getPageImpl(this, snapshot, pageID, version);
operations.add(success(f));

View File

@ -759,54 +759,36 @@ private:
}
}
std::vector<LogicalPageID> writePages(std::vector<BoundaryAndPage> pages, Version version, LogicalPageID previousID, const BTreePage *originalPage, StringRef upperBound, void *actor_debug) {
debug_printf("%p: writePages(): %u @%lld -> %lu replacement pages\n", actor_debug, previousID, version, pages.size());
std::vector<LogicalPageID> writePages(std::vector<BoundaryAndPage> pages, Version version, LogicalPageID originalID, const BTreePage *originalPage, StringRef upperBound, void *actor_debug) {
debug_printf("%p: writePages(): %u @%lld -> %lu replacement pages\n", actor_debug, originalID, version, pages.size());
// For each IPage of data, assign a logical pageID.
std::vector<LogicalPageID> logicalPageIDs;
std::vector<LogicalPageID> firstLogicalPageIDs;
ASSERT(version != 0 || pages.size() == 1);
// Gather the old page IDs
logicalPageIDs.push_back(previousID);
std::vector<LogicalPageID> primaryLogicalPageIDs;
// Extension pages
for(int i = 0; i < originalPage->extensionPageCount; ++i) {
logicalPageIDs.push_back(originalPage->extensionPages[i]);
// Reuse original primary page ID if it's not the root or if only one page is being written.
if(originalID != m_root || pages.size() == 1)
primaryLogicalPageIDs.push_back(originalID);
// Allocate a primary page ID for each page to be written
while(primaryLogicalPageIDs.size() < pages.size()) {
primaryLogicalPageIDs.push_back(m_pager->allocateLogicalPage());
}
// If writing multiple replacement pages then don't reuse the original IDs.
// Also, if the original page ID is not root (and >1 page is being written) then
// delete the pages as of version
// TODO: It's probably okay to reuse IDs in a split so long as the version being written
// is not 0 (meaning a replacement of the most recent version)
if(pages.size() != 1) {
if(previousID != m_root) {
for(auto id : logicalPageIDs) {
//debug_printf("%p: writePages(): Deleting logical page not to be reused, op=del id=%u @%lld\n", actor_debug, id, version);
//m_pager->freeLogicalPage(id, version);
}
}
logicalPageIDs.clear();
}
auto p = logicalPageIDs.begin();
auto pEnd = logicalPageIDs.end();
debug_printf("%p: writePages(): Writing %lu replacement pages for %d at version %lld\n", actor_debug, pages.size(), previousID, version);
debug_printf("%p: writePages(): Writing %lu replacement pages for %d at version %lld\n", actor_debug, pages.size(), originalID, version);
for(int i=0; i<pages.size(); i++) {
// Allocate page number for main page first
LogicalPageID id = (p != pEnd) ? *p++ : m_pager->allocateLogicalPage();
firstLogicalPageIDs.push_back(id);
LogicalPageID id = primaryLogicalPageIDs[i];
// Check for extension pages, if they exist assign IDs for them and write them at version
BTreePage *newPage = (BTreePage *)pages[i].firstPage->mutate();
// If there are extension pages, write through pager directly because this->writePage() will fail to dump it to text
if(newPage->extensionPageCount != 0) {
auto const &extPages = pages[i].extPages;
auto const &extPages = pages[i].extPages;
// If there are extension pages, write all pages using pager directly because this->writePage() is for whole primary pages
if(extPages.size() != 0) {
BTreePage *newPage = (BTreePage *)pages[i].firstPage->mutate();
ASSERT(newPage->extensionPageCount == extPages.size());
for(int e = 0, eEnd = extPages.size(); e < eEnd; ++e) {
LogicalPageID eid = (p != pEnd) ? *p++ : m_pager->allocateLogicalPage();
LogicalPageID eid = m_pager->allocateLogicalPage();
debug_printf("%p: writePages(): Writing extension page op=write id=%u @%lld (%d of %lu) referencePage=%u\n", actor_debug, eid, version, e + 1, extPages.size(), id);
newPage->extensionPages[e] = eid;
// If replacing the primary page below (version == 0) then pass the primary page's ID as the reference page ID
@ -817,20 +799,18 @@ private:
m_pager->writePage(id, pages[i].firstPage, version);
}
else {
ASSERT(pages[i].extPages.size() == 0);
debug_printf("%p: writePages(): Writing normal page op=write id=%u @%lld\n", actor_debug, id, version);
writePage(id, pages[i].firstPage, version, pages[i].lowerBound, (i == pages.size() - 1) ? upperBound : pages[i + 1].lowerBound);
}
}
// If there were unused logical IDs that were available for reuse, delete them
while(p != pEnd) {
//debug_printf("%p: writePages(): Deleting logical page left unused, op=del id=%u @%lld\n", actor_debug, *p, version);
//m_pager->freeLogicalPage(*p++, version);
++p;
// Free the old extension pages now that all replacement pages have been written
for(int i = 0; i < originalPage->extensionPageCount; ++i) {
//debug_printf("%p: writePages(): Freeing old extension op=del id=%u @latest\n", actor_debug, originalPage->extensionPages[i]);
//m_pager->freeLogicalPage(originalPage->extensionPages[i], version);
}
return firstLogicalPageIDs;
return primaryLogicalPageIDs;
}
class SuperPage : public IPage, ReferenceCounted<SuperPage> {
@ -2140,13 +2120,17 @@ ACTOR Future<int> verifyAll(VersionedBTree *btree, Version maxCommittedVersion,
ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
try {
loop {
Version v = waitNext(vStream);
state Version v = waitNext(vStream);
debug_printf("Verifying through version %lld\n", v);
state Future<int> vall = verifyAll(btree, v, written);
state Future<int> vrange = verifyRandomRange(btree, g_random->randomInt(0, v + 1), written);
state Future<int> vrange = verifyRandomRange(btree, g_random->randomInt(1, v + 1), written);
wait(success(vall) && success(vrange));
*pErrorCount += (vall.get() + vrange.get());
int errors = vall.get() + vrange.get();
*pErrorCount += errors;
debug_printf("Verified through version %lld, %d errors\n", v, errors);
if(*pErrorCount != 0)
break;
@ -2205,6 +2189,8 @@ TEST_CASE("/redwood/correctness") {
state PromiseStream<Version> committedVersions;
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount);
state Future<Void> commit = Void();
while(mutationBytes < mutationBytesTarget) {
// Sometimes advance the version
if(g_random->random01() < 0.10) {
@ -2282,36 +2268,49 @@ TEST_CASE("/redwood/correctness") {
// Sometimes (and at end) commit then check all results
if(mutationBytes >= std::min(mutationBytesTarget, (int)20e6) || g_random->random01() < .002) {
wait(btree->commit());
// Wait for btree commit and send the new version to committedVersions.
// Avoid capture of version as a member of *this
Version v = version;
commit = map(commit && btree->commit(), [=](Void) {
// Notify the background verifier that version is committed and therefore readable
committedVersions.send(v);
return Void();
});
printf("Cumulative: %d total mutation bytes, %lu key changes, %lld key bytes, %lld value bytes\n", mutationBytes, written.size(), keyBytesInserted, ValueBytesInserted);
// Recover from disk at random
if(useDisk && g_random->random01() < .1) {
printf("Recovering from disk.\n");
// Wait for outstanding commit
debug_printf("Waiting for outstanding commit\n");
wait(commit);
// Stop and wait for the verifier task
committedVersions.sendError(end_of_stream());
debug_printf("Waiting for verification to complete.\n");
wait(verifyTask);
printf("Reopening disk btree\n");
delete btree;
Future<Void> closedFuture = pager->onClosed();
pager->close();
wait(closedFuture);
debug_printf_always("Opening btree\n");
pager = new IndirectShadowPager(pagerFile);
btree = new VersionedBTree(pager, pagerFile, pageSize);
wait(btree->init());
Version v = wait(btree->getLatestVersion());
ASSERT(v == version);
printf("Recovered from disk. Latest version %lld\n", v);
// Create new promise stream and start the verifier again
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount);
}
// Notify the background verifier that version is committed and therefore readable
committedVersions.send(version);
// Check for errors
if(errorCount != 0)
throw internal_error();
@ -2322,7 +2321,10 @@ TEST_CASE("/redwood/correctness") {
}
debug_printf("Waiting for outstanding commit\n");
wait(commit);
committedVersions.sendError(end_of_stream());
debug_printf("Waiting for verification to complete.\n");
wait(verifyTask);
Future<Void> closedFuture = pager->onClosed();
@ -2337,16 +2339,17 @@ TEST_CASE("/redwood/performance/set") {
state VersionedBTree *btree = new VersionedBTree(pager, "unittest_pageFile");
wait(btree->init());
state int nodeCount = 100000;
state int maxChangesPerVersion = 100;
state int nodeCount = 10000000;
state int maxChangesPerVersion = 1000;
state int versions = 5000;
int maxKeySize = 50;
int maxValueSize = 500;
int maxValueSize = 100;
state std::string key(maxKeySize, 'k');
state std::string value(maxKeySize, 'v');
state int64_t kvBytes = 0;
state int records = 0;
state Future<Void> commit = Void();
state double startTime = now();
while(--versions) {
@ -2365,8 +2368,9 @@ TEST_CASE("/redwood/performance/set") {
++records;
}
if(g_random->random01() < .01) {
wait(btree->commit());
if(g_random->random01() < (1.0 / 300)) {
wait(commit);
commit = btree->commit();
double elapsed = now() - startTime;
printf("Committed (cumulative) %lld bytes in %d records in %f seconds, %.2f MB/s\n", kvBytes, records, elapsed, kvBytes / elapsed / 1e6);
}