diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index 35549ac096..dc58461e47 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -29,7 +29,8 @@ #define REDWOOD_DEBUG 0 -#define debug_printf_always(...) { fprintf(stdout, "%s %f (%s:%d) ", g_network->getLocalAddress().toString().c_str(), now(), __FUNCTION__, __LINE__), fprintf(stdout, __VA_ARGS__); fflush(stdout); } +#define debug_printf_stream stderr +#define debug_printf_always(...) { fprintf(debug_printf_stream, "%s %f %04d ", g_network->getLocalAddress().toString().c_str(), now(), __LINE__); fprintf(debug_printf_stream, __VA_ARGS__); fflush(debug_printf_stream); } #define debug_printf_noop(...) @@ -44,8 +45,8 @@ #define debug_printf printf #endif -#define BEACON fprintf(stderr, "%s: %s line %d \n", __FUNCTION__, __FILE__, __LINE__) -#define TRACE fprintf(stderr, "%s: %s line %d %s\n", __FUNCTION__, __FILE__, __LINE__, platform::get_backtrace().c_str()); +#define BEACON debug_printf_always("HERE\n") +#define TRACE debug_printf_always("%s: %s line %d %s\n", __FUNCTION__, __FILE__, __LINE__, platform::get_backtrace().c_str()); #ifndef VALGRIND #define VALGRIND_MAKE_MEM_UNDEFINED(x, y) @@ -53,7 +54,7 @@ #endif typedef uint32_t LogicalPageID; // uint64_t? -static const int invalidLogicalPageID = LogicalPageID(-1); +static const LogicalPageID invalidLogicalPageID = std::numeric_limits::max(); class IPage { public: @@ -209,6 +210,9 @@ public: virtual StorageBytes getStorageBytes() = 0; + // Count of pages in use by the pager client + virtual Future getUserPageCount() = 0; + // Future returned is ready when pager has been initialized from disk and is ready for reads and writes. // It is invalid to call most other functions until init() is ready. // TODO: Document further. diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 6bc3959854..daf97d46b4 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -189,7 +189,8 @@ public: struct Cursor { enum Mode { NONE, - READ, + POP, + READONLY, WRITE }; @@ -213,7 +214,7 @@ public: Cursor() : mode(NONE) { } - // Initialize a cursor. Since cursors can have async operations pending they can't be copied cleanly. + // Initialize a cursor. void init(FIFOQueue *q = nullptr, Mode m = NONE, LogicalPageID initialPageID = invalidLogicalPageID, int readOffset = 0, LogicalPageID endPage = invalidLogicalPageID) { if(operation.isValid()) { operation.cancel(); @@ -225,7 +226,7 @@ public: endPageID = endPage; page.clear(); - if(mode == READ) { + if(mode == POP || mode == READONLY) { // If cursor is not pointed at the end page then start loading it. // The end page will not have been written to disk yet. pageID = initialPageID; @@ -244,8 +245,15 @@ public: } } + // Since cursors can have async operations pending which modify their state they can't be copied cleanly Cursor(const Cursor &other) = delete; + // A read cursor can be initialized from a pop cursor + void initReadOnly(const Cursor &c) { + ASSERT(c.mode == READONLY || c.mode == POP); + init(c.queue, READONLY, c.pageID, c.offset, c.endPageID); + } + ~Cursor() { operation.cancel(); } @@ -254,7 +262,7 @@ public: if(mode == WRITE) { return format("{WriteCursor %s:%p pos=%s:%d endOffset=%d}", queue->name.c_str(), this, ::toString(pageID).c_str(), offset, page ? raw()->endOffset : -1); } - if(mode == READ) { + if(mode == POP || mode == READONLY) { return format("{ReadCursor %s:%p pos=%s:%d endOffset=%d endPage=%s}", queue->name.c_str(), this, ::toString(pageID).c_str(), offset, page ? 
raw()->endOffset : -1, ::toString(endPageID).c_str()); } ASSERT(mode == NONE); @@ -295,7 +303,7 @@ public: } Future loadPage() { - ASSERT(mode == READ); + ASSERT(mode == POP | mode == READONLY); debug_printf("FIFOQueue::Cursor(%s) loadPage\n", toString().c_str()); return map(queue->pager->readPage(pageID, true), [=](Reference p) { page = p; @@ -380,9 +388,9 @@ public: p.send(Void()); } - // Read the next item at the cursor, moving to a new page first if the current page is exhausted + // Read the next item at the cursor (if <= upperBound), moving to a new page first if the current page is exhausted ACTOR static Future> readNext_impl(Cursor *self, Optional upperBound, Future start) { - ASSERT(self->mode == READ); + ASSERT(self->mode == POP || self->mode == READONLY); // Wait for the previous operation to finish state Future previous = self->operation; @@ -414,7 +422,9 @@ public: } self->offset += bytesRead; - --self->queue->numEntries; + if(self->mode == POP) { + --self->queue->numEntries; + } debug_printf("FIFOQueue::Cursor(%s) after read of %s\n", self->toString().c_str(), ::toString(result).c_str()); ASSERT(self->offset <= p->endOffset); @@ -423,21 +433,26 @@ public: LogicalPageID oldPageID = self->pageID; self->pageID = p->nextPageID; self->offset = p->nextOffset; - --self->queue->numPages; + if(self->mode == POP) { + --self->queue->numPages; + } self->page.clear(); - debug_printf("FIFOQueue::Cursor(%s) Page exhausted, moved to new page\n", self->toString().c_str()); + debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", self->toString().c_str()); - // Freeing the old page must happen after advancing the cursor and clearing the page reference because - // freePage() could cause a push onto a queue that causes a newPageID() call which could pop() from this - // very same queue. - // Queue pages are freed at page 0 because they can be reused after the next commit. - self->queue->pager->freePage(oldPageID, 0); + if(self->mode == POP) { + // Freeing the old page must happen after advancing the cursor and clearing the page reference because + // freePage() could cause a push onto a queue that causes a newPageID() call which could pop() from this + // very same queue. + // Queue pages are freed at page 0 because they can be reused after the next commit. + self->queue->pager->freePage(oldPageID, 0); + } } - debug_printf("FIFOQueue(%s) pop(upperBound=%s) -> %s\n", self->queue->name.c_str(), ::toString(upperBound).c_str(), ::toString(result).c_str()); + debug_printf("FIFOQueue(%s) %s(upperBound=%s) -> %s\n", self->queue->name.c_str(), (self->mode == POP ? 
"pop" : "peek"), ::toString(upperBound).c_str(), ::toString(result).c_str()); return result; } + // Read and move past the next item if is <= upperBound or if upperBound is not present Future> readNext(const Optional &upperBound = {}) { if(mode == NONE) { return Optional(); @@ -463,13 +478,13 @@ public: // Create a new queue at newPageID void create(IPager2 *p, LogicalPageID newPageID, std::string queueName) { - debug_printf("FIFOQueue(%s) create from page id %u\n", queueName.c_str(), newPageID); + debug_printf("FIFOQueue(%s) create from page %s\n", queueName.c_str(), toString(newPageID).c_str()); pager = p; name = queueName; numPages = 1; numEntries = 0; dataBytesPerPage = pager->getUsablePageSize() - sizeof(typename Cursor::RawPage); - headReader.init(this, Cursor::READ, newPageID, 0, newPageID); + headReader.init(this, Cursor::POP, newPageID, 0, newPageID); tailWriter.init(this, Cursor::WRITE, newPageID); headWriter.init(this, Cursor::WRITE); newTailPage = invalidLogicalPageID; @@ -484,13 +499,35 @@ public: numPages = qs.numPages; numEntries = qs.numEntries; dataBytesPerPage = pager->getUsablePageSize() - sizeof(typename Cursor::RawPage); - headReader.init(this, Cursor::READ, qs.headPageID, qs.headOffset, qs.tailPageID); + headReader.init(this, Cursor::POP, qs.headPageID, qs.headOffset, qs.tailPageID); tailWriter.init(this, Cursor::WRITE, qs.tailPageID); headWriter.init(this, Cursor::WRITE); newTailPage = invalidLogicalPageID; debug_printf("FIFOQueue(%s) recovered\n", queueName.c_str()); } + ACTOR static Future>> peekAll_impl(FIFOQueue *self) { + state Standalone> results; + state Cursor c; + c.initReadOnly(self->headReader); + results.reserve(results.arena(), self->numEntries); + + loop { + Optional x = wait(c.readNext()); + if(!x.present()) { + break; + } + results.push_back(results.arena(), x.get()); + } + + return results; + } + + Future>> peekAll() { + return peekAll_impl(this); + } + + // Pop the next item on front of queue if it is <= upperBound or if upperBound is not present Future> pop(Optional upperBound = {}) { return headReader.readNext(upperBound); } @@ -787,13 +824,23 @@ ACTOR template Future forwardError(Future f, Promise target } } -class COWPagerSnapshot; +class DWALPagerSnapshot; -class COWPager : public IPager2 { +// An implementation of IPager2 that supports atomicUpdate() of a page without forcing a change to new page ID. +// It does this internally mapping the original page ID to alternate page IDs by write version. +// The page id remaps are kept in memory and also logged to a "remap queue" which must be reloaded on cold start. +// To prevent the set of remaps from growing unboundedly, once a remap is old enough to be at or before the +// oldest pager version being maintained the remap can be "undone" by popping it from the remap queue, +// copying the alternate page ID's data over top of the original page ID's data, and deleting the remap from memory. +// This process basically describes a "Delayed" Write-Ahead-Log (DWAL) because the remap queue and the newly allocated +// alternate pages it references basically serve as a write ahead log for pages that will eventially be copied +// back to their original location once the original version is no longer needed. 
+class DWALPager : public IPager2 { public: typedef FastAllocatedPage Page; typedef FIFOQueue LogicalPageQueueT; +#pragma pack(push, 1) struct DelayedFreePage { Version version; LogicalPageID pageID; @@ -803,15 +850,32 @@ public: } std::string toString() const { - return format("{%s @%" PRId64 "}", ::toString(pageID).c_str(), version); + return format("DelayedFreePage{%s @%" PRId64 "}", ::toString(pageID).c_str(), version); } }; - typedef FIFOQueue VersionedLogicalPageQueueT; + struct RemappedPage { + Version version; + LogicalPageID originalPageID; + LogicalPageID newPageID; + + bool operator<(const RemappedPage &rhs) { + return version < rhs.version; + } + + std::string toString() const { + return format("RemappedPage(%s -> %s @%" PRId64 "}", ::toString(originalPageID).c_str(), ::toString(newPageID).c_str(), version); + } + }; + +#pragma pack(pop) + + typedef FIFOQueue DelayedFreePageQueueT; + typedef FIFOQueue RemapQueueT; // If the file already exists, pageSize might be different than desiredPageSize // Use pageCacheSizeBytes == 0 for default - COWPager(int desiredPageSize, std::string filename, int pageCacheSizeBytes) + DWALPager(int desiredPageSize, std::string filename, int pageCacheSizeBytes) : desiredPageSize(desiredPageSize), filename(filename), pHeader(nullptr), pageCacheBytes(pageCacheSizeBytes) { if(pageCacheBytes == 0) { @@ -838,9 +902,11 @@ public: memcpy(lastCommittedHeaderPage->mutate(), headerPage->begin(), smallestPhysicalBlock); } - ACTOR static Future recover(COWPager *self) { + ACTOR static Future recover(DWALPager *self) { ASSERT(!self->recoverFuture.isValid()); + self->remapUndoFuture = Void(); + int64_t flags = IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK; state bool exists = fileExists(self->filename); if(!exists) { @@ -859,13 +925,13 @@ public: wait(store(fileSize, self->pageFile->size())); } - debug_printf("COWPager(%s) recover exists=%d fileSize=%" PRId64 "\n", self->filename.c_str(), exists, fileSize); + debug_printf("DWALPager(%s) recover exists=%d fileSize=%" PRId64 "\n", self->filename.c_str(), exists, fileSize); // TODO: If the file exists but appears to never have been successfully committed is this an error or // should recovery proceed with a new pager instance? 
// If there are at least 2 pages then try to recover the existing file if(exists && fileSize >= (self->smallestPhysicalBlock * 2)) { - debug_printf("COWPager(%s) recovering using existing file\n"); + debug_printf("DWALPager(%s) recovering using existing file\n", self->filename.c_str()); state bool recoveredHeader = false; @@ -874,7 +940,7 @@ public: // If the checksum fails for the header page, try to recover committed header backup from page 1 if(!self->headerPage.castTo<Page>()->verifyChecksum(0)) { - TraceEvent(SevWarn, "COWPagerRecoveringHeader").detail("Filename", self->filename); + TraceEvent(SevWarn, "DWALPagerRecoveringHeader").detail("Filename", self->filename); wait(store(self->headerPage, self->readHeaderPage(self, 1))); @@ -885,7 +951,7 @@ } Error e = checksum_failed(); - TraceEvent(SevError, "COWPagerRecoveryFailed") + TraceEvent(SevError, "DWALPagerRecoveryFailed") .detail("Filename", self->filename) .error(e); throw e; @@ -897,7 +963,7 @@ self->setPageSize(self->pHeader->pageSize); if(self->logicalPageSize != self->desiredPageSize) { - TraceEvent(SevWarn, "COWPagerPageSizeNotDesired") + TraceEvent(SevWarn, "DWALPagerPageSizeNotDesired") .detail("Filename", self->filename) .detail("ExistingPageSize", self->logicalPageSize) .detail("DesiredPageSize", self->desiredPageSize); @@ -905,6 +971,14 @@ self->freeList.recover(self, self->pHeader->freeList, "FreeListRecovered"); self->delayedFreeList.recover(self, self->pHeader->delayedFreeList, "DelayedFreeListRecovered"); + self->remapQueue.recover(self, self->pHeader->remapQueue, "RemapQueueRecovered"); + + Standalone<VectorRef<RemappedPage>> remaps = wait(self->remapQueue.peekAll()); + for(auto &r : remaps) { + if(r.newPageID != invalidLogicalPageID) { + self->remappedPages[r.originalPageID][r.version] = r.newPageID; + } + } // If the header was recovered from the backup at Page 1 then write and sync it to Page 0 before continuing. // If this fails, the backup header is still intact for the next recovery attempt. @@ -917,7 +991,7 @@ // Sync header wait(self->pageFile->sync()); - debug_printf("COWPager(%s) Header recovery complete.\n", self->filename.c_str()); + debug_printf("DWALPager(%s) Header recovery complete.\n", self->filename.c_str()); } // Update the last committed header with the one that was recovered (which is the last known committed header) @@ -929,7 +1003,7 @@ // A new pager will be created in its place. // TODO: Is this the right behavior? - debug_printf("COWPager(%s) creating new pager\n"); + debug_printf("DWALPager(%s) creating new pager\n", self->filename.c_str()); self->headerPage = self->newPageBuffer(); self->pHeader = (Header *)self->headerPage->begin(); @@ -949,15 +1023,17 @@ // Page 1 - header backup self->pHeader->pageCount = 2; - // Create a new free list + // Create queues self->freeList.create(self, self->newLastPageID(), "FreeList"); self->delayedFreeList.create(self, self->newLastPageID(), "delayedFreeList"); + self->remapQueue.create(self, self->newLastPageID(), "remapQueue"); // The first commit() below will flush the queues and update the queue states in the header, // but since the queues will not be used between now and then their states will not change. // In order to populate lastCommittedHeader, update the header now with the queue states. 
self->pHeader->freeList = self->freeList.getState(); self->pHeader->delayedFreeList = self->delayedFreeList.getState(); + self->pHeader->remapQueue = self->remapQueue.getState(); // Set remaining header bytes to \xff memset(self->headerPage->mutate() + self->pHeader->size(), 0xff, self->headerPage->size() - self->pHeader->size()); @@ -968,7 +1044,7 @@ public: wait(self->commit()); } - debug_printf("COWPager(%s) recovered. committedVersion=%" PRId64 " logicalPageSize=%d physicalPageSize=%d\n", self->filename.c_str(), self->pHeader->committedVersion, self->logicalPageSize, self->physicalPageSize); + debug_printf("DWALPager(%s) recovered. committedVersion=%" PRId64 " logicalPageSize=%d physicalPageSize=%d\n", self->filename.c_str(), self->pHeader->committedVersion, self->logicalPageSize, self->physicalPageSize); return Void(); } @@ -984,11 +1060,11 @@ public: // Get a new, previously available page ID. The page will be considered in-use after the next commit // regardless of whether or not it was written to, until it is returned to the pager via freePage() - ACTOR static Future newPageID_impl(COWPager *self) { + ACTOR static Future newPageID_impl(DWALPager *self) { // First try the free list Optional freePageID = wait(self->freeList.pop()); if(freePageID.present()) { - debug_printf("COWPager(%s) newPageID() returning %s from free list\n", self->filename.c_str(), toString(freePageID.get()).c_str()); + debug_printf("DWALPager(%s) newPageID() returning %s from free list\n", self->filename.c_str(), toString(freePageID.get()).c_str()); return freePageID.get(); } @@ -996,13 +1072,13 @@ public: ASSERT(!self->snapshots.empty()); Optional delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{self->effectiveOldestVersion(), 0})); if(delayedFreePageID.present()) { - debug_printf("COWPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str()); + debug_printf("DWALPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str()); return delayedFreePageID.get().pageID; } // Lastly, add a new page to the pager LogicalPageID id = self->newLastPageID(); - debug_printf("COWPager(%s) newPageID() returning %s at end of file\n", self->filename.c_str(), toString(id).c_str()); + debug_printf("DWALPager(%s) newPageID() returning %s at end of file\n", self->filename.c_str(), toString(id).c_str()); return id; }; @@ -1018,13 +1094,13 @@ public: } Future writeHeaderPage(PhysicalPageID pageID, Reference page) { - debug_printf("COWPager(%s) header op=write %s\n", filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) header op=write %s\n", filename.c_str(), toString(pageID).c_str()); ((Page *)page.getPtr())->updateChecksum(pageID); return holdWhile(page, pageFile->write(page->begin(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock)); } Future writePhysicalPage(PhysicalPageID pageID, Reference page) { - debug_printf("COWPager(%s) op=write %s\n", filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) op=write %s\n", filename.c_str(), toString(pageID).c_str()); ((Page *)page.getPtr())->updateChecksum(pageID); return holdWhile(page, pageFile->write(page->begin(), physicalPageSize, (int64_t)pageID * physicalPageSize)); } @@ -1032,7 +1108,7 @@ public: void updatePage(LogicalPageID pageID, Reference data) override { // Get the cache entry for this page PageCacheEntry &cacheEntry = pageCache.get(pageID); - debug_printf("COWPager(%s) 
op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing()); + debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing()); // If the page is still being read then it's not also being written because a write places // the new content in the cache entry when the write is launched, not when it is completed. @@ -1044,46 +1120,57 @@ public: return Void(); }); } + // If the page is being written, wait for this write before issuing the new write + else if(cacheEntry.writing()) { + cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) { + writePhysicalPage(pageID, data); + return Void(); + }); + } else { - // If the page is being written, wait for this write before issuing the new write - if(cacheEntry.writing()) { - cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) { - writePhysicalPage(pageID, data); - return Void(); - }); - } - else { - cacheEntry.writeFuture = writePhysicalPage(pageID, data); - } + cacheEntry.writeFuture = writePhysicalPage(pageID, data); } - operations.add(forwardError(cacheEntry.writeFuture, errorPromise)); + cacheEntry.writeFuture = forwardError(cacheEntry.writeFuture, errorPromise); + operations.add(cacheEntry.writeFuture); // Always update the page contents immediately regardless of what happened above. cacheEntry.readFuture = data; } Future atomicUpdatePage(LogicalPageID pageID, Reference data, Version v) override { - debug_printf("COWPager(%s) op=writeAtomic %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v); + debug_printf("DWALPager(%s) op=writeAtomic %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v); // This pager does not support atomic update, so it always allocates and uses a new pageID Future f = map(newPageID(), [=](LogicalPageID newPageID) { updatePage(newPageID, data); - freePage(pageID, v); - return newPageID; + // TODO: Possibly limit size of remap queue since it must be recovered on cold start + RemappedPage r{v, pageID, newPageID}; + remapQueue.pushBack(r); + remappedPages[pageID][v] = newPageID; + debug_printf("DWALPager(%s) pushed %s\n", filename.c_str(), RemappedPage(r).toString().c_str()); + return pageID; }); - return forwardError(f, errorPromise); + // No need for forwardError here because newPageID() is already wrapped in forwardError + return f; } void freePage(LogicalPageID pageID, Version v) override { + // If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone, so queue it for later deletion + if(remappedPages.find(pageID) != remappedPages.end()) { + debug_printf("DWALPager(%s) op=freeRemapped %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); + remapQueue.pushBack(RemappedPage{v, pageID, invalidLogicalPageID}); + return; + } + // If v is older than the oldest version still readable then mark pageID as free as of the next commit if(v < effectiveOldestVersion()) { - debug_printf("COWPager(%s) op=freeNow %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); + debug_printf("DWALPager(%s) op=freeNow %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); 
freeList.pushBack(pageID); } else { // Otherwise add it to the delayed free list - debug_printf("COWPager(%s) op=freeLater %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); + debug_printf("DWALPager(%s) op=freeLater %s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v, pLastCommittedHeader->oldestVersion); delayedFreeList.pushBack({v, pageID}); } }; @@ -1091,33 +1178,33 @@ public: // Header pages use a page size of smallestPhysicalBlock // If the user chosen physical page size is larger, then there will be a gap of unused space after // between the end of page 1 and the start of page 2. - ACTOR static Future> readHeaderPage(COWPager *self, PhysicalPageID pageID) { + ACTOR static Future> readHeaderPage(DWALPager *self, PhysicalPageID pageID) { if(g_network->getCurrentTask() > TaskPriority::DiskRead) { wait(delay(0, TaskPriority::DiskRead)); } state Reference page(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock)); int readBytes = wait(self->pageFile->read(page->mutate(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock)); - debug_printf("COWPager(%s) header op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes); + debug_printf("DWALPager(%s) header op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes); ASSERT(readBytes == smallestPhysicalBlock); return page; } - ACTOR static Future> readPhysicalPage(COWPager *self, PhysicalPageID pageID) { + ACTOR static Future> readPhysicalPage(DWALPager *self, PhysicalPageID pageID) { if(g_network->getCurrentTask() > TaskPriority::DiskRead) { wait(delay(0, TaskPriority::DiskRead)); } state Reference page = self->newPageBuffer(); - debug_printf("COWPager(%s) op=read_physical_start %s\n", self->filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) op=read_physical_start %s\n", self->filename.c_str(), toString(pageID).c_str()); int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize)); - debug_printf("COWPager(%s) op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes); + debug_printf("DWALPager(%s) op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes); ASSERT(readBytes == self->physicalPageSize); Page *p = (Page *)page.getPtr(); if(!p->verifyChecksum(pageID)) { - debug_printf("COWPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str()); Error e = checksum_failed(); - TraceEvent(SevError, "COWPagerChecksumFailed") + TraceEvent(SevError, "DWALPagerChecksumFailed") .detail("Filename", self->filename.c_str()) .detail("PageID", pageID) .detail("PageSize", self->physicalPageSize) @@ -1135,24 +1222,45 @@ public: // Use cached page if present, without triggering a cache hit. 
// Otherwise, read the page and return it but don't add it to the cache if(!cacheable) { - debug_printf("COWPager(%s) op=read_nocache %s\n", filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) op=read_nocache %s\n", filename.c_str(), toString(pageID).c_str()); PageCacheEntry *pCacheEntry = pageCache.getIfExists(pageID); if(pCacheEntry != nullptr) { + debug_printf("DWALPager(%s) op=read_nocache_hit %s\n", filename.c_str(), toString(pageID).c_str()); return pCacheEntry->readFuture; } + debug_printf("DWALPager(%s) op=read_nocache_miss %s\n", filename.c_str(), toString(pageID).c_str()); return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise); } PageCacheEntry &cacheEntry = pageCache.get(pageID); - debug_printf("COWPager(%s) op=read %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing()); + debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing()); if(!cacheEntry.readFuture.isValid()) { - debug_printf("COWPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str()); + debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str()); cacheEntry.readFuture = readPhysicalPage(this, (PhysicalPageID)pageID); } - return forwardError(cacheEntry.readFuture, errorPromise); + cacheEntry.readFuture = forwardError(cacheEntry.readFuture, errorPromise); + return cacheEntry.readFuture; + } + + Future> readPageAtVersion(LogicalPageID pageID, Version v, bool cacheable) { + auto i = remappedPages.find(pageID); + + if(i != remappedPages.end()) { + auto j = i->second.upper_bound(v); + if(j != i->second.begin()) { + --j; + debug_printf("DWALPager(%s) read %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(), v, toString(j->second).c_str()); + pageID = j->second; + } + } + else { + debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(), toString(pageID).c_str(), v); + } + + return readPage(pageID, cacheable); } // Get snapshot as of the most recent committed version of the pager @@ -1178,12 +1286,69 @@ public: return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version); } - ACTOR static Future commit_impl(COWPager *self) { - debug_printf("COWPager(%s) commit begin\n", self->filename.c_str()); + ACTOR static Future undoRemaps(DWALPager *self) { + state RemappedPage cutoff; + cutoff.version = self->effectiveOldestVersion(); + + // TODO: Use parallel reads + // TODO: One run of this actor might write to the same original page more than once, in which case just unmap the latest + loop { + if(self->remapUndoStop) { + break; + } + state Optional p = wait(self->remapQueue.pop(cutoff)); + if(!p.present()) { + break; + } + debug_printf("DWALPager(%s) undoRemaps popped %s\n", self->filename.c_str(), p.get().toString().c_str()); + + if(p.get().newPageID == invalidLogicalPageID) { + debug_printf("DWALPager(%s) undoRemaps freeing %s\n", self->filename.c_str(), p.get().toString().c_str()); + self->freePage(p.get().originalPageID, p.get().version); + } + else { + // Read the data from the page that the original was mapped to + Reference data = wait(self->readPage(p.get().newPageID, false)); + + // Some page reads will mark the unused portion of the page as undefined to catch bugs with valgrind. 
+ // We are blindly copying the page data to a new location regardless of its format so mark all of it defined. + VALGRIND_MAKE_MEM_DEFINED(data->begin(), data->size()); + + // Write the data to the original page so it can be read using its original pageID + self->updatePage(p.get().originalPageID, data); + + // Remove the remap from this page, deleting the entry for the pageID if its map becomes empty + auto i = self->remappedPages.find(p.get().originalPageID); + if(i->second.size() == 1) { + self->remappedPages.erase(i); + } + else { + i->second.erase(p.get().version); + } + + // Now that the remap has been undone nothing will read this page so it can be freed as of the next commit. + self->freePage(p.get().newPageID, 0); + } + } + + debug_printf("DWALPager(%s) undoRemaps stopped, remapQueue size is %d\n", self->filename.c_str(), self->remapQueue.numEntries); + return Void(); + } + + ACTOR static Future commit_impl(DWALPager *self) { + debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str()); // Write old committed header to Page 1 self->operations.add(self->writeHeaderPage(1, self->lastCommittedHeaderPage)); + // Trigger the remap eraser to stop and then wait for it. + self->remapUndoStop = true; + wait(self->remapUndoFuture); + + // Flush remap queue separately, it's not involved in free page management + wait(self->remapQueue.flush()); + self->pHeader->remapQueue = self->remapQueue.getState(); + // Flush the free list and delayed free list queues together as they are used by freePage() and newPageID() loop { state bool freeBusy = wait(self->freeList.preFlush()); @@ -1203,16 +1368,16 @@ public: self->pHeader->delayedFreeList = self->delayedFreeList.getState(); // Wait for all outstanding writes to complete - debug_printf("COWPager(%s) waiting for outstanding writes\n", self->filename.c_str()); + debug_printf("DWALPager(%s) waiting for outstanding writes\n", self->filename.c_str()); wait(self->operations.signalAndCollapse()); - debug_printf("COWPager(%s) Syncing\n", self->filename.c_str()); + debug_printf("DWALPager(%s) Syncing\n", self->filename.c_str()); // Sync everything except the header if(g_network->getCurrentTask() > TaskPriority::DiskWrite) { wait(delay(0, TaskPriority::DiskWrite)); } wait(self->pageFile->sync()); - debug_printf("COWPager(%s) commit version %" PRId64 " sync 1\n", self->filename.c_str(), self->pHeader->committedVersion); + debug_printf("DWALPager(%s) commit version %" PRId64 " sync 1\n", self->filename.c_str(), self->pHeader->committedVersion); // Update header on disk and sync again. wait(self->writeHeaderPage(0, self->headerPage)); @@ -1220,7 +1385,7 @@ public: wait(delay(0, TaskPriority::DiskWrite)); } wait(self->pageFile->sync()); - debug_printf("COWPager(%s) commit version %" PRId64 " sync 2\n", self->filename.c_str(), self->pHeader->committedVersion); + debug_printf("DWALPager(%s) commit version %" PRId64 " sync 2\n", self->filename.c_str(), self->pHeader->committedVersion); // Update the last committed header for use in the next commit. self->updateCommittedHeader(); @@ -1229,6 +1394,11 @@ public: // Try to expire snapshots up to the oldest version, in case some were being kept around due to being in use, // because maybe some are no longer in use. 
self->expireSnapshots(self->pHeader->oldestVersion); + + // Start unmapping pages for expired versions + self->remapUndoStop = false; + self->remapUndoFuture = undoRemaps(self); + return Void(); } @@ -1252,20 +1422,21 @@ public: pHeader->setMetaKey(metaKey); } - ACTOR void shutdown(COWPager *self, bool dispose) { + ACTOR void shutdown(DWALPager *self, bool dispose) { self->recoverFuture.cancel(); self->commitFuture.cancel(); + self->remapUndoFuture.cancel(); - if(self->errorPromise.canBeSet()) + if(self->errorPromise.canBeSet()) { self->errorPromise.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress + } + self->operations.clear(); // Destroy the cache, cancelling reads and writes in progress self->pageCache.destroy(); - wait(ready(self->operations.signal())); - + // Unreference the file and clear self->pageFile.clear(); - if(dispose) { wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(self->filename, true)); } @@ -1306,10 +1477,13 @@ public: } // Get the number of pages in use but not by the pager itself. - int64_t getUserPageCount() { - int userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries - delayedFreeList.numPages - delayedFreeList.numEntries; - debug_printf("COWPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64 " freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64 "\n", filename.c_str(), userPages, pHeader->pageCount, freeList.numPages, freeList.numEntries, delayedFreeList.numPages, delayedFreeList.numEntries); - return userPages; + Future getUserPageCount() override { + return map(remapUndoFuture, [=](Void) { + int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries - delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages; + debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64 " freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64 " remapQueuePages=%" PRId64 " remapQueueCount=%" PRId64 "\n", + filename.c_str(), userPages, pHeader->pageCount, freeList.numPages, freeList.numEntries, delayedFreeList.numPages, delayedFreeList.numEntries, remapQueue.numPages, remapQueue.numEntries); + return userPages; + }); } Future init() override { @@ -1321,7 +1495,7 @@ public: } private: - ~COWPager() {} + ~DWALPager() {} // Try to expire snapshots up to but not including v, but do not expire any snapshots that are in use. void expireSnapshots(Version v); @@ -1335,6 +1509,7 @@ private: int64_t pageCount; FIFOQueue::QueueState freeList; FIFOQueue::QueueState delayedFreeList; + FIFOQueue::QueueState remapQueue; Version committedVersion; Version oldestVersion; int32_t metaKeySize; @@ -1410,18 +1585,23 @@ private: Future commitFuture; SignalableActorCollection operations; Future recoverFuture; + Future remapUndoFuture; + bool remapUndoStop; Reference pageFile; LogicalPageQueueT freeList; + // The delayed free list will be approximately in Version order. // TODO: Make this an ordered container some day. 
- VersionedLogicalPageQueueT delayedFreeList; + DelayedFreePageQueueT delayedFreeList; + + RemapQueueT remapQueue; struct SnapshotEntry { Version version; Promise expired; - Reference snapshot; + Reference snapshot; }; struct SnapshotEntryLessThanVersion { @@ -1434,22 +1614,25 @@ private: } }; + // TODO: Better data structure + std::unordered_map> remappedPages; + std::deque snapshots; }; // Prevents pager from reusing freed pages from version until the snapshot is destroyed -class COWPagerSnapshot : public IPagerSnapshot, public ReferenceCounted { +class DWALPagerSnapshot : public IPagerSnapshot, public ReferenceCounted { public: - COWPagerSnapshot(COWPager *pager, Key meta, Version version, Future expiredFuture) : pager(pager), metaKey(meta), version(version), expired(expiredFuture) { + DWALPagerSnapshot(DWALPager *pager, Key meta, Version version, Future expiredFuture) : pager(pager), metaKey(meta), version(version), expired(expiredFuture) { } - virtual ~COWPagerSnapshot() { + virtual ~DWALPagerSnapshot() { } Future> getPhysicalPage(LogicalPageID pageID, bool cacheable) override { if(expired.isError()) { throw expired.getError(); } - return map(pager->readPage(pageID, cacheable), [=](Reference p) { + return map(pager->readPageAtVersion(pageID, version, cacheable), [=](Reference p) { return Reference(p); }); } @@ -1463,23 +1646,23 @@ public: } void addref() override { - ReferenceCounted::addref(); + ReferenceCounted::addref(); } void delref() override { - ReferenceCounted::delref(); + ReferenceCounted::delref(); } - COWPager *pager; + DWALPager *pager; Future expired; Version version; Key metaKey; }; -void COWPager::expireSnapshots(Version v) { - debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size()); +void DWALPager::expireSnapshots(Version v) { + debug_printf("DWALPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size()); while(snapshots.size() > 1 && snapshots.front().version < v && snapshots.front().snapshot->isSoleOwner()) { - debug_printf("COWPager(%s) expiring snapshot for %" PRId64 " soleOwner=%d\n", filename.c_str(), snapshots.front().version, snapshots.front().snapshot->isSoleOwner()); + debug_printf("DWALPager(%s) expiring snapshot for %" PRId64 " soleOwner=%d\n", filename.c_str(), snapshots.front().version, snapshots.front().snapshot->isSoleOwner()); // The snapshot contract could be made such that the expired promise isn't need anymore. In practice it // probably is already not needed but it will gracefully handle the case where a user begins a page read // with a snapshot reference, keeps the page read future, and drops the snapshot reference. 
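As context for the delayedFreeList and remapQueue members declared above, here is a rough standalone model of the versioned free-list policy used by freePage() and newPageID(). It is illustrative only, using std:: containers instead of the on-disk FIFOQueue and hypothetical names rather than DWALPager's real interface: a page freed at a version that some snapshot may still read is parked on a delayed list and only becomes reusable once the effective oldest readable version has moved past it.

    #include <cstdint>
    #include <deque>
    #include <optional>
    #include <utility>
    #include <vector>

    typedef uint32_t LogicalPageID; // stand-in types, as above
    typedef int64_t Version;

    struct FreeLists {
        std::vector<LogicalPageID> freeList;                        // reusable after the next commit
        std::deque<std::pair<Version, LogicalPageID>> delayedFree;  // freed at a version a snapshot may still read

        // Pages freed at a version older than the oldest readable version can be reused right away;
        // otherwise they must wait until every snapshot that could read them has expired.
        void freePage(LogicalPageID id, Version v, Version oldestReadable) {
            if(v < oldestReadable) {
                freeList.push_back(id);
            }
            else {
                delayedFree.push_back({v, id});
            }
        }

        // Prefer the free list, then any delayed-free page whose version is now safely unreadable.
        std::optional<LogicalPageID> newPageID(Version oldestReadable) {
            if(!freeList.empty()) {
                LogicalPageID id = freeList.back();
                freeList.pop_back();
                return id;
            }
            if(!delayedFree.empty() && delayedFree.front().first <= oldestReadable) {
                LogicalPageID id = delayedFree.front().second;
                delayedFree.pop_front();
                return id;
            }
            return std::nullopt;
        }
    }; 

Keeping the delayed list in roughly version order is what makes the bounded pop with an upper bound (the cutoff at effectiveOldestVersion()) sufficient; the "TODO: Make this an ordered container" comment above acknowledges that the ordering is currently only approximate.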
@@ -1488,7 +1671,7 @@ void COWPager::expireSnapshots(Version v) { } } -Reference COWPager::getReadSnapshot(Version v) { +Reference DWALPager::getReadSnapshot(Version v) { ASSERT(!snapshots.empty()); auto i = std::upper_bound(snapshots.begin(), snapshots.end(), v, SnapshotEntryLessThanVersion()); @@ -1499,12 +1682,12 @@ Reference COWPager::getReadSnapshot(Version v) { return i->snapshot; } -void COWPager::addLatestSnapshot() { +void DWALPager::addLatestSnapshot() { Promise expired; snapshots.push_back({ pLastCommittedHeader->committedVersion, expired, - Reference(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture())) + Reference(new DWALPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture())) }); } @@ -2573,11 +2756,10 @@ public: m_latestCommit = m_init; } - ACTOR static Future incrementalLazyDelete(VersionedBTree *self, bool *pStop = nullptr, unsigned int minPages = 0, int maxPages = std::numeric_limits::max()) { + ACTOR static Future incrementalSubtreeClear(VersionedBTree *self, bool *pStop = nullptr, unsigned int minPages = 0, int maxPages = std::numeric_limits::max()) { // TODO: Is it contractually okay to always to read at the latest version? state Reference snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion()); state int freedPages = 0; - loop { // take a page from front of queue state Optional q = wait(self->m_lazyDeleteQueue.pop()); @@ -2630,6 +2812,7 @@ public: } } + debug_printf("LazyDelete: freed %d pages, %s has %" PRId64 " entries\n", freedPages, self->m_lazyDeleteQueue.name.c_str(), self->m_lazyDeleteQueue.numEntries); return freedPages; } @@ -2712,10 +2895,13 @@ public: // When starting a new mutation buffer its start version must be greater than the last write version ASSERT(v > m_writeVersion); m_pBuffer = &m_mutationBuffers[v]; + // Create range representing the entire keyspace. This reduces edge cases to applying mutations // because now all existing keys are within some range in the mutation map. - (*m_pBuffer)[dbBegin.key]; - (*m_pBuffer)[dbEnd.key]; + (*m_pBuffer)[dbBegin.key] = RangeMutation(); + // Setting the dbEnd key to be cleared prevents having to treat a range clear to dbEnd as a special + // case in order to avoid traversing down the rightmost edge of the tree. + (*m_pBuffer)[dbEnd.key].startKeyMutations[0] = SingleKeyMutation(); } else { // It's OK to set the write version to the same version repeatedly so long as m_pBuffer is not null @@ -2733,27 +2919,40 @@ public: ACTOR static Future destroyAndCheckSanity_impl(VersionedBTree *self) { ASSERT(g_network->isSimulated()); + debug_printf("Clearing tree.\n"); self->setWriteVersion(self->getLatestVersion() + 1); self->clear(KeyRangeRef(dbBegin.key, dbEnd.key)); loop { - int freedPages = wait(self->incrementalLazyDelete(self)); - debug_printf("incrementalLazyDelete freed %d\n", freedPages); + state int freedPages = wait(self->incrementalSubtreeClear(self)); wait(self->commit()); - if(self->m_lazyDeleteQueue.numEntries == 0) { + // Keep looping until the last commit doesn't do anything at all + if(self->m_lazyDeleteQueue.numEntries == 0 && freedPages == 0) { break; } self->setWriteVersion(self->getLatestVersion() + 1); } + // Forget all but the latest version of the tree. 
+ debug_printf("Discarding all old versions.\n"); + self->setOldestVersion(self->getLastCommittedVersion()); + self->setWriteVersion(self->getLatestVersion() + 1); + wait(self->commit()); + + // The lazy delete queue should now be empty and contain only the new page to start writing to + // on the next commit. LazyDeleteQueueT::QueueState s = self->m_lazyDeleteQueue.getState(); ASSERT(s.numEntries == 0); ASSERT(s.numPages == 1); - debug_printf("rootPageCount %d\n", self->m_header.root.count); + // The btree should now be a single non-oversized root page. ASSERT(self->m_header.height == 1); - // All that should be in use now is the root page and the lazy delete queue empty page. - ASSERT(((COWPager *)self->m_pager)->getUserPageCount() == self->m_header.root.count + 1); + ASSERT(self->m_header.root.count == 1); + + // From the pager's perspective the only pages that should be in use are the btree root and + // the previously mentioned lazy delete queue page. + int64_t userPageCount = wait(self->m_pager->getUserPageCount()); + ASSERT(userPageCount == 2); return Void(); } @@ -2938,6 +3137,18 @@ private: // A clear range version, if cleared, for the range starting immediately AFTER the start key Optional rangeClearVersion; + bool keyCleared() const { + return startKeyMutations.size() == 1 && startKeyMutations.begin()->second.isClear(); + } + + bool keyChanged() const { + return !startKeyMutations.empty(); + } + + bool rangeCleared() const { + return rangeClearVersion.present(); + } + // Returns true if this RangeMutation doesn't actually mutate anything bool noChanges() const { return !rangeClearVersion.present() && startKeyMutations.empty(); @@ -3029,22 +3240,6 @@ private: LazyDeleteQueueT m_lazyDeleteQueue; int m_maxPartSize; - void printMutationBuffer(MutationBufferT::const_iterator begin, MutationBufferT::const_iterator end) const { -#if REDWOOD_DEBUG - debug_printf("-------------------------------------\n"); - debug_printf("BUFFER\n"); - while(begin != end) { - debug_printf("'%s': %s\n", printable(begin->first).c_str(), begin->second.toString().c_str()); - ++begin; - } - debug_printf("-------------------------------------\n"); -#endif - } - - void printMutationBuffer(MutationBufferT *buf) const { - return printMutationBuffer(buf->begin(), buf->end()); - } - // Find or create a mutation buffer boundary for bound and return an iterator to it MutationBufferT::iterator insertMutationBoundary(Key boundary) { ASSERT(m_pBuffer != nullptr); @@ -3213,7 +3408,7 @@ private: rptr += blockSize; pages.push_back(std::move(page)); } - delete (uint8_t *)btPage; + delete [] (uint8_t *)btPage; } // Write this btree page, which is made of 1 or more pager pages. 
@@ -3306,7 +3501,7 @@ private: } virtual ~SuperPage() { - delete m_data; + delete [] m_data; } virtual void addref() const { @@ -3409,13 +3604,27 @@ private: state MutationBufferT::const_iterator iMutationBoundaryEnd = mutationBuffer->lower_bound(upperBound->key); if(REDWOOD_DEBUG) { - self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd); + debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str()); + auto begin = iMutationBoundary; + while(1) { + debug_printf("%s Mutation: '%s': %s\n", context.c_str(), printable(begin->first).c_str(), begin->second.toString().c_str()); + if(begin == iMutationBoundaryEnd) { + break; + } + ++begin; + } + debug_printf("%s -------------------------------------\n", context.c_str()); } - // If the boundary range iterators are the same then upperbound and lowerbound have the same key. - // If the key is being mutated, them remove this subtree. + // iMutationBoundary is greatest boundary <= lowerBound->key + // iMutationBoundaryEnd is least boundary >= upperBound->key + + // If the boundary range iterators are the same then this subtree only has one unique key, which is the same key as the boundary + // record the iterators are pointing to. There only two outcomes possible: Clearing the subtree or leaving it alone. + // If there are any changes to the one key then the entire subtree should be deleted as the changes for the key + // do not go into this subtree. if(iMutationBoundary == iMutationBoundaryEnd) { - if(!iMutationBoundary->second.startKeyMutations.empty()) { + if(iMutationBoundary->second.keyChanged()) { debug_printf("%s lower and upper bound key/version match and key is modified so deleting page, returning %s\n", context.c_str(), toString(results).c_str()); Version firstKeyChangeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : iMutationBoundary->second.startKeyMutations.begin()->first; if(isLeaf) { @@ -3433,27 +3642,64 @@ private: return results; } + // If one mutation range covers the entire subtree, then check if the entire subtree is modified, + // unmodified, or possibly/partially modified. MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary; ++iMutationBoundaryNext; - // If one mutation range covers the entire page if(iMutationBoundaryNext == iMutationBoundaryEnd) { - // If there are no changes in the range (no clear, no boundary key mutations) - // OR there are changes but for a key that is less than the page lower boundary and therefore not part of this page - if(iMutationBoundary->second.noChanges() || - ( !iMutationBoundary->second.rangeClearVersion.present() && iMutationBoundary->first < lowerBound->key) - ) { + // Cleared means the entire range covering the subtree was cleared. It is assumed true + // if the range starting after the lower mutation boundary was cleared, and then proven false + // below if possible. + bool cleared = iMutationBoundary->second.rangeCleared(); + // Unchanged means the entire range covering the subtree was unchanged, it is assumed to be the + // opposite of cleared() and then proven false below if possible. + bool unchanged = !cleared; + debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged); + + // If the lower mutation boundary key is the same as the subtree lower bound then whether or not + // that key is being changed or cleared affects this subtree. 
+ if(iMutationBoundary->first == lowerBound->key) { + // If subtree will be cleared (so far) but the lower boundary key is not cleared then the subtree is not cleared + if(cleared && !iMutationBoundary->second.keyCleared()) { + cleared = false; + debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged); + } + // If the subtree looked unchanged (so far) but the lower boundary is is changed then the subtree is changed + if(unchanged && iMutationBoundary->second.keyChanged()) { + unchanged = false; + debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged); + } + } + + // If the higher mutation boundary key is the same as the subtree upper bound key then whether + // or not it is being changed or cleared affects this subtree. + if((cleared || unchanged) && iMutationBoundaryEnd->first == upperBound->key) { + // If the key is being changed then the records in this subtree with the same key must be removed + // so the subtree is definitely not unchanged, though it may be cleared to achieve the same effect. + if(iMutationBoundaryEnd->second.keyChanged()) { + unchanged = false; + debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged); + } + else { + // If the key is not being changed then the records in this subtree can't be removed so the + // subtree is not being cleared. + cleared = false; + debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged); + } + } + + // The subtree cannot be both cleared and unchanged. + ASSERT(!(cleared && unchanged)); + + // If no changes in subtree + if(unchanged) { results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); debug_printf("%s no changes on this subtree, returning %s\n", context.c_str(), toString(results).c_str()); return results; } - // If the range is cleared and there either no sets or the sets aren't relevant to this subtree then delete it - // The last if subexpression is checking that either the next key in the mutation buffer is being changed or - // the upper bound key of this page isn't the same. - if(iMutationBoundary->second.rangeClearVersion.present() - && (iMutationBoundary->second.startKeyMutations.empty() || iMutationBoundary->first < lowerBound->key) - && (!iMutationBoundaryEnd->second.startKeyMutations.empty() || upperBound->key != iMutationBoundaryEnd->first) - ) { + // If subtree is cleared + if(cleared) { debug_printf("%s %s cleared, deleting it, returning %s\n", context.c_str(), isLeaf ? "Page" : "Subtree", toString(results).c_str()); Version clearVersion = self->singleVersion ? 
self->getLastCommittedVersion() + 1 : iMutationBoundary->second.rangeClearVersion.get(); if(isLeaf) { @@ -3475,53 +3721,45 @@ private: cursor.moveFirst(); state Version writeVersion; - state bool isRoot = (rootID == self->m_header.root.get()); // Leaf Page if(page->flags & BTreePage::IS_LEAF) { ASSERT(isLeaf); state Standalone> merged; - debug_printf("%s MERGING EXISTING DATA WITH MUTATIONS:\n", context.c_str()); - if(REDWOOD_DEBUG) { - self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd); - } - - // It's a given that the mutation map is not empty so it's safe to do this - Key mutationRangeStart = iMutationBoundary->first; + debug_printf("%s Leaf page, merging changes.\n", context.c_str()); // If replacement pages are written they will be at the minimum version seen in the mutations for this leaf Version minVersion = invalidVersion; - int changes = 0; // Now, process each mutation range and merge changes with existing data. + bool firstMutationBoundary = true; while(iMutationBoundary != iMutationBoundaryEnd) { debug_printf("%s New mutation boundary: '%s': %s\n", context.c_str(), printable(iMutationBoundary->first).c_str(), iMutationBoundary->second.toString().c_str()); SingleKeyMutationsByVersion::const_iterator iMutations; - // If the mutation boundary key is less than the lower bound key then skip startKeyMutations for - // this bounary, we're only processing this mutation range here to apply any clears to existing data. - if(iMutationBoundary->first < lowerBound->key) { + // For the first mutation boundary only, if the boundary key is less than the lower bound for the page + // then skip startKeyMutations for this boundary, we're only processing this mutation range here to apply + // a possible clear to existing data. + if(firstMutationBoundary && iMutationBoundary->first < lowerBound->key) { iMutations = iMutationBoundary->second.startKeyMutations.end(); } - // If the mutation boundary key is the same as the page lowerBound key then start reading single - // key mutations at the first version greater than the lowerBound key's version. 
- else if(!self->singleVersion && iMutationBoundary->first == lowerBound->key) { - iMutations = iMutationBoundary->second.startKeyMutations.upper_bound(lowerBound->version); - } else { iMutations = iMutationBoundary->second.startKeyMutations.begin(); } + firstMutationBoundary = false; SingleKeyMutationsByVersion::const_iterator iMutationsEnd = iMutationBoundary->second.startKeyMutations.end(); // Iterate over old versions of the mutation boundary key, outputting if necessary + bool boundaryKeyWritten = false; while(cursor.valid() && cursor.get().key == iMutationBoundary->first) { // If not in single version mode or there were no changes to the key if(!self->singleVersion || iMutationBoundary->second.noChanges()) { merged.push_back(merged.arena(), cursor.get()); debug_printf("%s Added %s [existing, boundary start]\n", context.c_str(), merged.back().toString().c_str()); + boundaryKeyWritten = true; } else { ASSERT(self->singleVersion); @@ -3536,16 +3774,26 @@ private: while(iMutations != iMutationsEnd) { const SingleKeyMutation &m = iMutations->second; if(m.isClear() || m.value.size() <= self->m_maxPartSize) { - if(iMutations->first < minVersion || minVersion == invalidVersion) - minVersion = iMutations->first; - ++changes; - merged.push_back(merged.arena(), m.toRecord(iMutationBoundary->first, iMutations->first)); - debug_printf("%s Added non-split %s [mutation, boundary start]\n", context.c_str(), merged.back().toString().c_str()); + // If the boundary key was not yet written to the merged list then clears can be skipped. + // Note that in a more complex scenario where there are multiple sibling pages for the same key, with different + // versions and/or part numbers, this is still a valid thing to do. This is because a changing boundary + // key (set or clear) will result in any instances (different versions, split parts) of this key + // on sibling pages to the left of this page to be removed, so an explicit clear need only be stored + // if a record with the mutation boundary key was already written to this page. 
+ if(!boundaryKeyWritten && iMutations->second.isClear()) { + debug_printf("%s Skipped %s [mutation, unnecessary boundary key clear]\n", context.c_str(), m.toRecord(iMutationBoundary->first, iMutations->first).toString().c_str()); + } + else { + merged.push_back(merged.arena(), m.toRecord(iMutationBoundary->first, iMutations->first)); + debug_printf("%s Added non-split %s [mutation, boundary start]\n", context.c_str(), merged.back().toString().c_str()); + if(iMutations->first < minVersion || minVersion == invalidVersion) + minVersion = iMutations->first; + boundaryKeyWritten = true; + } } else { if(iMutations->first < minVersion || minVersion == invalidVersion) minVersion = iMutations->first; - ++changes; int bytesLeft = m.value.size(); int start = 0; RedwoodRecordRef whole(iMutationBoundary->first, iMutations->first, m.value); @@ -3557,6 +3805,7 @@ private: start += partSize; debug_printf("%s Added split %s [mutation, boundary start] bytesLeft %d\n", context.c_str(), merged.back().toString().c_str(), bytesLeft); } + boundaryKeyWritten = true; } ++iMutations; } @@ -3597,7 +3846,6 @@ private: Version clearVersion = clearRangeVersion.get(); if(clearVersion < minVersion || minVersion == invalidVersion) minVersion = clearVersion; - ++changes; merged.push_back(merged.arena(), RedwoodRecordRef(cursor.get().key, clearVersion)); debug_printf("%s Added %s [existing, middle clear]\n", context.c_str(), merged.back().toString().c_str()); } @@ -3610,28 +3858,38 @@ private: } // Write any remaining existing keys, which are not subject to clears as they are beyond the cleared range. + bool upperMutationBoundaryKeyChanged = iMutationBoundaryEnd->second.keyChanged(); while(cursor.valid()) { + // If the upper mutation boundary is being changed and the cursor's key matches it then stop because none of the earlier + // versions or fragments of that key should be written. + if(upperMutationBoundaryKeyChanged && cursor.get().key == iMutationBoundaryEnd->first) { + debug_printf("%s Skipped %s and beyond [existing, matches changed upper mutation boundary]\n", context.c_str(), cursor.get().toString().c_str()); + Version changedVersion = iMutationBoundaryEnd->second.startKeyMutations.begin()->first; + if(changedVersion < minVersion || minVersion == invalidVersion) + minVersion = changedVersion; + break; + } merged.push_back(merged.arena(), cursor.get()); debug_printf("%s Added %s [existing, tail]\n", context.c_str(), merged.back().toString().c_str()); cursor.moveNext(); } - debug_printf("%s Done merging mutations into existing leaf contents, made %d changes\n", context.c_str(), changes); - // No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records. if(minVersion == invalidVersion) { results.push_back_deep(results.arena(), VersionAndChildrenRef(0, VectorRef((RedwoodRecordRef *)decodeLowerBound, 1), *decodeUpperBound)); debug_printf("%s No changes were made during mutation merge, returning %s\n", context.c_str(), toString(results).c_str()); - ASSERT(changes == 0); return results; } + else { + debug_printf("%s Changes were made, writing.\n", context.c_str()); + } // TODO: Make version and key splits based on contents of merged list, if keeping history writeVersion = self->singleVersion ? 
 			// If everything in the page was deleted then this page should be deleted as of the new version
 			// Note that if a single range clear covered the entire page then we should not get this far
-			if(merged.empty() && !isRoot) {
+			if(merged.empty()) {
 				debug_printf("%s All leaf page contents were cleared, returning %s\n", context.c_str(), toString(results).c_str());
 				self->freeBtreePage(rootID, writeVersion);
 				return results;
@@ -3802,16 +4060,12 @@ private:
 		debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, self->m_newOldestVersion);
 		state bool lazyDeleteStop = false;
-		state Future<int> lazyDelete = incrementalLazyDelete(self, &lazyDeleteStop);
+		state Future<int> lazyDelete = incrementalSubtreeClear(self, &lazyDeleteStop);
 		// Get the latest version from the pager, which is what we will read at
 		state Version latestVersion = self->m_pager->getLatestVersion();
 		debug_printf("%s: pager latestVersion %" PRId64 "\n", self->m_name.c_str(), latestVersion);
-		if(REDWOOD_DEBUG) {
-			self->printMutationBuffer(mutations);
-		}
-
 		state Standalone<BTreePageID> rootPageID = self->m_header.root.get();
 		state RedwoodRecordRef lowerBound = dbBegin.withPageID(rootPageID);
 		Standalone<VersionedChildrenT> versionedRoots = wait(commitSubtree(self, mutations, self->m_pager->getReadSnapshot(latestVersion), rootPageID, self->m_header.height == 1, &lowerBound, &dbEnd, &lowerBound, &dbEnd));
@@ -4364,12 +4618,12 @@ private:
 			return Void();
 		}
+		debug_printf("readFullKVPair: Split, first record %s\n", rec.toString().c_str());
+
 		// Split value, need to coalesce split value parts into a buffer in arena,
 		// after which cur1 will point to the first part and kv.key will reference its key
 		ASSERT(rec.chunk.start + rec.value.get().size() == rec.chunk.total);
-		debug_printf("readFullKVPair: Split, totalsize %d %s\n", rec.chunk.total, self->toString().c_str());
-
 		// Allocate space for the entire value in the same arena as the key
 		state int bytesLeft = rec.chunk.total;
 		state StringRef dst = makeString(bytesLeft, self->m_arena);
@@ -4400,23 +4654,11 @@ RedwoodRecordRef VersionedBTree::dbBegin(StringRef(), 0);
 RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff"));
 VersionedBTree::Counts VersionedBTree::counts;

-ACTOR template<class T>
-Future<T> catchError(Promise<Error> error, Future<T> f) {
-	try {
-		T result = wait(f);
-		return result;
-	} catch(Error &e) {
-		if(e.code() != error_code_actor_cancelled && error.canBeSet())
-			error.sendError(e);
-		throw;
-	}
-}
-
 class KeyValueStoreRedwoodUnversioned : public IKeyValueStore {
 public:
 	KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) : m_filePrefix(filePrefix) {
 		// TODO: This constructor should really just take an IVersionedStore
-		IPager2 *pager = new COWPager(4096, filePrefix, 0);
+		IPager2 *pager = new DWALPager(4096, filePrefix, 0);
 		m_tree = new VersionedBTree(pager, filePrefix, true);
 		m_init = catchError(init_impl(this));
 	}
@@ -4577,7 +4819,7 @@ private:
 	Promise<Error> m_error;
 	template <typename T>
 	inline Future<T> catchError(Future<T> f) {
-		return ::catchError(m_error, f);
+		return forwardError(f, m_error);
 	}
 };
@@ -5397,6 +5639,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 	state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
 	state int mutationBytesTarget = shortTest ? 5000 : randomSize(std::min<int>(maxCommitSize * 100, 100e6));
 	state double clearProbability = deterministicRandom()->random01() * .1;
+	state double clearPostSetProbability = deterministicRandom()->random01() * .1;
 	state double coldStartProbability = deterministicRandom()->random01();
 	state double advanceOldVersionProbability = deterministicRandom()->random01();
 	state double maxWallClockDuration = 60;
@@ -5411,6 +5654,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 	printf("maxCommitSize: %d\n", maxCommitSize);
 	printf("mutationBytesTarget: %d\n", mutationBytesTarget);
 	printf("clearProbability: %f\n", clearProbability);
+	printf("clearPostSetProbability: %f\n", clearPostSetProbability);
 	printf("coldStartProbability: %f\n", coldStartProbability);
 	printf("advanceOldVersionProbability: %f\n", advanceOldVersionProbability);
 	printf("\n");
@@ -5420,7 +5664,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 	printf("Initializing...\n");
 	state double startTime = timer();
-	pager = new COWPager(pageSize, pagerFile, 0);
+	pager = new DWALPager(pageSize, pagerFile, 0);
 	state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion);
 	wait(btree->init());
@@ -5514,6 +5758,22 @@ TEST_CASE("!/redwood/correctness/btree") {
 				}

 				btree->clear(range);
+
+				// Sometimes set the range start after the clear
+				if(deterministicRandom()->random01() < clearPostSetProbability) {
+					KeyValue kv = randomKV(0, maxValueSize);
+					kv.key = range.begin;
+					btree->set(kv);
+					written[std::make_pair(kv.key.toString(), version)] = kv.value.toString();
+				}
+
+				// Sometimes set the range end after the clear
+				if(deterministicRandom()->random01() < clearPostSetProbability) {
+					KeyValue kv = randomKV(0, maxValueSize);
+					kv.key = range.end;
+					btree->set(kv);
+					written[std::make_pair(kv.key.toString(), version)] = kv.value.toString();
+				}
 			}
 			else {
 				// Set a key
@@ -5597,7 +5857,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 		wait(closedFuture);

 		printf("Reopening btree from disk.\n");
-		IPager2 *pager = new COWPager(pageSize, pagerFile, 0);
+		IPager2 *pager = new DWALPager(pageSize, pagerFile, 0);
 		btree = new VersionedBTree(pager, pagerFile, singleVersion);
 		wait(btree->init());
@@ -5623,6 +5883,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 	debug_printf("Waiting for outstanding commit\n");
 	wait(commit);
 	committedVersions.sendError(end_of_stream());
+	randomTask.cancel();
 	debug_printf("Waiting for verification to complete.\n");
 	wait(verifyTask);
@@ -5630,8 +5891,11 @@ TEST_CASE("!/redwood/correctness/btree") {
 	if(errorCount != 0)
 		throw internal_error();

+	wait(btree->destroyAndCheckSanity());
+
 	Future<Void> closedFuture = btree->onClosed();
 	btree->close();
+	debug_printf("Closing.\n");
 	wait(closedFuture);

 	return Void();
@@ -5660,7 +5924,7 @@ TEST_CASE("!/redwood/correctness/pager/cow") {
 	deleteFile(pagerFile);

 	int pageSize = 4096;
-	state IPager2 *pager = new COWPager(pageSize, pagerFile, 0);
+	state IPager2 *pager = new DWALPager(pageSize, pagerFile, 0);
 	wait(success(pager->init()));
 	state LogicalPageID id = wait(pager->newPageID());
@@ -5687,7 +5951,7 @@ TEST_CASE("!/redwood/performance/set") {
 	deleteFile(pagerFile);

 	int pageSize = 4096;
-	IPager2 *pager = new COWPager(pageSize, pagerFile, FLOW_KNOBS->PAGE_CACHE_4K / pageSize);
+	IPager2 *pager = new DWALPager(pageSize, pagerFile, FLOW_KNOBS->PAGE_CACHE_4K / pageSize);
 	state bool singleVersion = true;
 	state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion);
 	wait(btree->init());
@@ -5740,6 +6004,7 @@ TEST_CASE("!/redwood/performance/set") {
 		}
 		if(kvBytes >= 
commitTarget) { + btree->setOldestVersion(btree->getLastCommittedVersion()); wait(commit); printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); @@ -5767,6 +6032,7 @@ TEST_CASE("!/redwood/performance/set") { wait(commit); printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); + printf("Starting random seeks\n"); state int reads = 30000; wait(randomSeeks(btree, reads, firstKeyChar, lastKeyChar) && randomSeeks(btree, reads, firstKeyChar, lastKeyChar) && randomSeeks(btree, reads, firstKeyChar, lastKeyChar));
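
A minimal, self-contained sketch of the scenario the new clearPostSetProbability branches in the correctness test exercise (illustration only; it uses no Redwood APIs, and the std::map simply models the expected key/value state recorded in the test's `written` map): clearing a range and then re-setting the range's begin key at the same version leaves only that key live within the cleared range, which is the boundary-key case the leaf merge earlier in this diff must handle without emitting a redundant clear record.

```cpp
#include <cstdio>
#include <map>
#include <string>

int main() {
	// Expected contents before the clear-then-set pattern.
	std::map<std::string, std::string> expected = { { "a", "1" }, { "b", "2" }, { "c", "3" } };

	// A clear of the range ["a", "c") removes "a" and "b" ...
	expected.erase(expected.lower_bound("a"), expected.lower_bound("c"));

	// ... and the new test branch sometimes re-sets the range's begin key afterwards,
	// at the same version, so only that key survives inside the cleared range.
	expected["a"] = "new";

	for(auto &kv : expected)
		printf("%s=%s\n", kv.first.c_str(), kv.second.c_str());   // prints: a=new, c=3
	return 0;
}
```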