From aec0d044de9ec257eb28f780efdcee3a2ac5c317 Mon Sep 17 00:00:00 2001 From: Fuheng Zhao Date: Tue, 3 Aug 2021 11:17:14 -0700 Subject: [PATCH] support superpage without modifying atomic update function --- fdbserver/IPager.h | 28 ++- fdbserver/VersionedBTree.actor.cpp | 308 ++++++++++++++++------------- 2 files changed, 186 insertions(+), 150 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index c5558613bc..0fe88e1c73 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -165,8 +165,8 @@ public: class IPagerSnapshot { public: virtual Future> getPhysicalPage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + VectorRef pageIDs, int priority, bool cacheable, bool nohit) = 0; @@ -185,7 +185,7 @@ public: class IPager2 : public IClosable { public: // Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed. - virtual Reference newPageBuffer() = 0; + virtual Reference newPageBuffer(size_t size= 1) = 0; // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). // For a given pager instance, separate calls to this function must return the same value. @@ -198,6 +198,7 @@ public: // Allocate a new page ID for a subsequent write. The page will be considered in-use after the next commit // regardless of whether or not it was written to. virtual Future newPageID() = 0; + virtual Future>> newPageIDs(size_t size) = 0; virtual Future newExtentPageID(QueueID queueID) = 0; virtual QueueID newLastQueueID() = 0; @@ -206,16 +207,19 @@ public: // Existing holders of a page reference for pageID, read from any version, // may see the effects of this write. 
virtual void updatePage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + Standalone> pageIDs, Reference data) = 0; + void updatePage(PagerEventReasons reason, uint8_t level, LogicalPageID pageID, Reference data) { + updatePage(reason, level, VectorRef(&pageID, 1), data); + } // Try to atomically update the contents of a page as of version v in the next commit. // If the pager is unable to do this at this time, it may choose to write the data to a new page ID // instead and return the new page ID to the caller. Otherwise the original pageID argument will be returned. // If a new page ID is returned, the old page ID will be freed as of version v virtual Future atomicUpdatePage(PagerEventReasons reason, - unsigned int level, + uint8_t level, LogicalPageID pageID, Reference data, Version v) = 0; @@ -237,11 +241,19 @@ public: // NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are // considered likely to be needed soon. 
virtual Future> readPage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + Standalone> pageIDs, int priority, bool cacheable, bool noHit) = 0; + Future> readPage(PagerEventReasons reason, + uint8_t level, + LogicalPageID pageID, + int priority, + bool cacheable, + bool noHit) { + return readPage(reason, level, VectorRef(&pageID, 1), priority, cacheable, noHit); + } virtual Future> readExtent(LogicalPageID pageID) = 0; virtual void releaseExtentReadLock() = 0; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 1ae59e720d..73742d192e 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -1464,7 +1464,7 @@ struct RedwoodMetrics { return eventReasons[(size_t)event][(size_t)reason]; } - std::string toString(int level, double elapsed) const { + std::string toString(uint8_t level, double elapsed) const { std::string result; const auto& pairs = (level == 0 ? L0PossibleEventReasonPairs : possibleEventReasonPairs); @@ -1488,7 +1488,7 @@ struct RedwoodMetrics { return result; } - void toTraceEvent(TraceEvent* t, int level) const { + void toTraceEvent(TraceEvent* t, uint8_t level) const { const auto& pairs = (level == 0 ? 
L0PossibleEventReasonPairs : possibleEventReasonPairs); for (const auto& p : pairs) { std::string name = @@ -1528,7 +1528,7 @@ struct RedwoodMetrics { Level() { clear(); } - void clear(int level = 0) { + void clear(uint8_t level = 0) { metrics = {}; if (level > 0) { @@ -1593,7 +1593,7 @@ struct RedwoodMetrics { } void clear() { - unsigned int levelCounter = 0; + uint8_t levelCounter = 0; for (RedwoodMetrics::Level& level : levels) { level.clear(levelCounter); ++levelCounter; @@ -1621,12 +1621,12 @@ struct RedwoodMetrics { return metric.pagerDiskWrite + metric.pagerDiskRead + metric.pagerCacheHit + metric.pagerProbeHit; } - Level& level(unsigned int level) { + Level& level(uint8_t level) { // Valid levels are from 0 - btreeLevels // Level 0 is for operations that are not BTree level specific, as many of the metrics are the same // Level 0 - btreeLevels correspond to BTree node height, however heights above btreeLevels are combined // into the level at btreeLevels - return levels[std::min(level, btreeLevels)]; + return levels[std::min((unsigned int)level, btreeLevels)]; } void updateMaxRecordCount(int maxRecords) { @@ -1788,17 +1788,18 @@ template class ObjectCache : NonCopyable { struct Entry : public boost::intrusive::list_base_hook<> { - Entry() : hits(0) {} + Entry() : hits(0), size(1) {} IndexType index; ObjectType item; int hits; + int size; }; typedef std::unordered_map CacheT; typedef boost::intrusive::list EvictionOrderT; public: - ObjectCache(int sizeLimit = 1) : sizeLimit(sizeLimit) {} + ObjectCache(int sizeLimit = 1) : sizeLimit(sizeLimit), currentSize(0) {} void setSizeLimit(int n) { ASSERT(n > 0); @@ -1838,6 +1839,7 @@ public: if (toEvict.hits == 0) { ++g_redwoodMetrics.metric.pagerEvictUnhit; } + currentSize -= toEvict.size; evictionOrder.erase(evictionOrder.iterator_to(toEvict)); cache.erase(i); return true; @@ -1847,7 +1849,7 @@ public: // After a get(), the object for i is the last in evictionOrder. 
// If noHit is set, do not consider this access to be cache hit if the object is present // If noMiss is set, do not consider this access to be a cache miss if the object is not present - ObjectType& get(const IndexType& index, bool noHit = false) { + ObjectType& get(const IndexType& index, int size, bool noHit = false) { Entry& entry = cache[index]; // If entry is linked into evictionOrder then move it to the back of the order @@ -1863,11 +1865,13 @@ public: // Finish initializing entry entry.index = index; entry.hits = 0; + entry.size = size; + currentSize += size; // Insert the newly created Entry at the back of the eviction order evictionOrder.push_back(entry); // While the cache is too big, evict the oldest entry until the oldest entry can't be evicted. - while (cache.size() > sizeLimit) { + while (currentSize > sizeLimit) { Entry& toEvict = evictionOrder.front(); // It's critical that we do not evict the item we just added because it would cause the reference @@ -1892,6 +1896,7 @@ public: if (toEvict.hits == 0) { ++g_redwoodMetrics.metric.pagerEvictUnhit; } + currentSize -= toEvict.size; debug_printf( "Evicting %s to make room for %s\n", toString(toEvict.index).c_str(), toString(index).c_str()); evictionOrder.pop_front(); @@ -1907,6 +1912,7 @@ public: ACTOR static Future clear_impl(ObjectCache* self) { state ObjectCache::CacheT cache; state EvictionOrderT evictionOrder; + state int64_t currentSize; // Swap cache contents to local state vars // After this, no more entries will be added to or read from these @@ -1914,20 +1920,23 @@ public: // after it is either evictable or onEvictable() is ready. 
cache.swap(self->cache); evictionOrder.swap(self->evictionOrder); + currentSize = self->currentSize; state typename EvictionOrderT::iterator i = evictionOrder.begin(); - state typename EvictionOrderT::iterator iEnd = evictionOrder.begin(); + state typename EvictionOrderT::iterator iEnd = evictionOrder.end(); while (i != iEnd) { if (!i->item.evictable()) { wait(i->item.onEvictable()); } + currentSize -= i->size; + self->currentSize -= i->size; ++i; } evictionOrder.clear(); cache.clear(); - + ASSERT(currentSize == 0); return Void(); } @@ -1940,7 +1949,7 @@ public: private: int64_t sizeLimit; - + int64_t currentSize; CacheT cache; EvictionOrderT evictionOrder; }; @@ -2374,8 +2383,8 @@ public: return id; } - Reference newPageBuffer() override { - return Reference(new ArenaPage(logicalPageSize, physicalPageSize)); + Reference newPageBuffer(size_t size=1) override { + return Reference(new ArenaPage(logicalPageSize * size, physicalPageSize * size)); } // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). @@ -2426,7 +2435,23 @@ public: } Future newPageID() override { return newPageID_impl(this); } - + + Future>> newPageIDs(size_t size) override { + return newPageIDs_impl(this, size); + } + ACTOR static Future>> newPageIDs_impl(DWALPager* self, size_t size) { + state Standalone> newPages; + state size_t i = 0; + for (; i < size; ++i) { + LogicalPageID id = wait(self->newPageID()); + newPages.push_back(newPages.arena(), id); + } + debug_printf("DWALPager(%s) newPageIDs_impl(%d) returning %s at end of file\n", + self->filename.c_str(), + size, + toString(newPages).c_str()); + return newPages; + } // Get a new, previously available extent and it's first page ID. 
The page will be considered in-use after the next // commit regardless of whether or not it was written to, until it is returned to the pager via freePage() ACTOR static Future newExtentPageID_impl(DWALPager* self, QueueID queueID) { @@ -2464,23 +2489,23 @@ public: ACTOR static Future writePhysicalPage_impl(DWALPager* self, PagerEventReasons reason, - unsigned int level, - PhysicalPageID pageID, + uint8_t level, + Standalone> pageIDs, Reference page, bool header = false) { debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", self->filename.c_str(), (header ? "writePhysicalHeader" : "writePhysical"), - toString(pageID).c_str(), + toString(pageIDs).c_str(), page->begin()); VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); - page->updateChecksum(pageID); + page->updateChecksum(pageIDs.front()); debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n", self->filename.c_str(), - toString(pageID).c_str(), - page->calculateChecksum(pageID), + toString(pageIDs).c_str(), + page->calculateChecksum(pageIDs.front()), page->getChecksum()); state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority)); @@ -2493,43 +2518,56 @@ public: // Note: Not using forwardError here so a write error won't be discovered until commit time. state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; - wait(self->pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize)); - - debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", - self->filename.c_str(), - (header ? 
"writePhysicalHeaderComplete" : "writePhysicalComplete"), - toString(pageID).c_str(), - page->begin(), - (pageID * blockSize)); - + state std::vector> writers; + state int i = 0; + for (const auto& pageID : pageIDs) { + Future p = + self->pageFile->write(page->mutate() + i * blockSize, blockSize, ((int64_t)pageID) * blockSize); + i += 1; + writers.push_back(p); + } + wait(waitForAll(writers)); + if (REDWOOD_DEBUG) { + Standalone> pageIDsCopy = pageIDs; + debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", + self->filename.c_str(), + (header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"), + toString(pageIDsCopy).c_str(), + page->begin(), + (pageIDsCopy.front() * blockSize)); + } return Void(); } Future writePhysicalPage(PagerEventReasons reason, - unsigned int level, - PhysicalPageID pageID, + uint8_t level, + VectorRef pageIDs, Reference page, bool header = false) { - Future f = writePhysicalPage_impl(this, reason, level, pageID, page, header); + Future f = writePhysicalPage_impl(this, reason, level, pageIDs, page, header); operations.add(f); return f; } Future writeHeaderPage(PhysicalPageID pageID, Reference page) { - return writePhysicalPage(PagerEventReasons::MetaData, nonBtreeLevel, pageID, page, true); + return writePhysicalPage(PagerEventReasons::MetaData, nonBtreeLevel, VectorRef(&pageID, 1), page, true); + } + + void updatePage(PagerEventReasons reason, uint8_t level, LogicalPageID pageID, Reference data) { + updatePage(reason, level, VectorRef(&pageID, 1), data); } void updatePage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + Standalone> pageIDs, Reference data) override { // Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now // or as a cache miss because there is no benefit to the page already being in cache // Similarly, this does not count as a point lookup for reason. 
- PageCacheEntry& cacheEntry = pageCache.get(pageID, true); + PageCacheEntry& cacheEntry = pageCache.get(pageIDs.front(), pageIDs.size(), true); debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), - toString(pageID).c_str(), + toString(pageIDs).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing()); @@ -2542,11 +2580,11 @@ public: // future reads of the version are not allowed) and the write of the next newest version over top // of the original page begins. if (!cacheEntry.initialized()) { - cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data); + cacheEntry.writeFuture = writePhysicalPage(reason, level, pageIDs, data); } else if (cacheEntry.reading()) { // Wait for the read to finish, then start the write. cacheEntry.writeFuture = map(success(cacheEntry.readFuture), [=](Void) { - writePhysicalPage(reason, level, pageID, data); + writePhysicalPage(reason, level, pageIDs, data); return Void(); }); } @@ -2554,11 +2592,11 @@ public: // writes happen in the correct order else if (cacheEntry.writing()) { cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) { - writePhysicalPage(reason, level, pageID, data); + writePhysicalPage(reason, level, pageIDs, data); return Void(); }); } else { - cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data); + cacheEntry.writeFuture = writePhysicalPage(reason, level, pageIDs, data); } // Always update the page contents immediately regardless of what happened above. @@ -2566,7 +2604,7 @@ public: } Future atomicUpdatePage(PagerEventReasons reason, - unsigned int level, + uint8_t level, LogicalPageID pageID, Reference data, Version v) override { @@ -2692,7 +2730,7 @@ public: // If the user chosen physical page size is larger, then there will be a gap of unused space after the header pages // and before the user-chosen sized pages. 
ACTOR static Future> readPhysicalPage(DWALPager* self, - PhysicalPageID pageID, + Standalone> pageIDs, int priority, bool header) { ASSERT(!self->memoryOnly); @@ -2703,36 +2741,42 @@ public: state Reference page = header ? Reference(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock)) - : self->newPageBuffer(); + : self->newPageBuffer(pageIDs.size()); debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n", self->filename.c_str(), - toString(pageID).c_str(), + toString(pageIDs).c_str(), page->begin()); state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); ++g_redwoodMetrics.metric.pagerDiskRead; // TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled? - int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; - int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize)); + state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; + state int totalReadBytes = 0; + state int i = 0; + for (i = 0; i < pageIDs.size(); i++) { + int readBytes = wait( + self->pageFile->read(page->mutate() + i * blockSize, blockSize, ((int64_t)pageIDs[i]) * blockSize)); + totalReadBytes += readBytes; + } debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n", self->filename.c_str(), - toString(pageID).c_str(), + toString(pageIDs).c_str(), page->begin(), - readBytes); + totalReadBytes); // Header reads are checked explicitly during recovery if (!header) { - if (!page->verifyChecksum(pageID)) { + if (!page->verifyChecksum(pageIDs.front())) { debug_printf( - "DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str()); + "DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageIDs).c_str()); Error e = checksum_failed(); TraceEvent(SevError, "RedwoodChecksumFailed") .detail("Filename", self->filename.c_str()) - .detail("PageID", pageID) + 
.detail("PageID", pageIDs) .detail("PageSize", self->physicalPageSize) - .detail("Offset", pageID * self->physicalPageSize) - .detail("CalculatedChecksum", page->calculateChecksum(pageID)) + .detail("Offset", pageIDs.front() * self->physicalPageSize) + .detail("CalculatedChecksum", page->calculateChecksum(pageIDs.front())) .detail("ChecksumInPage", page->getChecksum()) .error(e); ASSERT(false); @@ -2743,7 +2787,7 @@ public: } static Future> readHeaderPage(DWALPager* self, PhysicalPageID pageID) { - return readPhysicalPage(self, pageID, ioMaxPriority, true); + return readPhysicalPage(self, VectorRef(&pageID, 1), ioMaxPriority, true); } bool tryEvictPage(LogicalPageID logicalID, Version v) { @@ -2754,8 +2798,8 @@ public: // Reads the most recent version of pageID, either previously committed or written using updatePage() // in the current commit Future> readPage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + Standalone> pageIDs, int priority, bool cacheable, bool noHit) override { @@ -2764,30 +2808,28 @@ public: auto& eventReasons = g_redwoodMetrics.level(level).metrics.events; eventReasons.addEventReason(PagerEvents::CacheLookup, reason); if (!cacheable) { - debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageID).c_str()); - PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageID); + debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageIDs).c_str()); + PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageIDs.front()); if (pCacheEntry != nullptr) { ++g_redwoodMetrics.metric.pagerProbeHit; - debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageID).c_str()); - return pCacheEntry->readFuture; + debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageIDs).c_str()); return pCacheEntry->readFuture; } ++g_redwoodMetrics.metric.pagerProbeMiss; - debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", 
filename.c_str(), toString(pageID).c_str()); - return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise); + debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageIDs).c_str()); + return forwardError(readPhysicalPage(this, pageIDs, priority, false), errorPromise); } - PageCacheEntry& cacheEntry = pageCache.get(pageID, noHit); + PageCacheEntry& cacheEntry = pageCache.get(pageIDs.front(), pageIDs.size(), noHit); debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d noHit=%d\n", filename.c_str(), - toString(pageID).c_str(), + toString(pageIDs).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing(), noHit); if (!cacheEntry.initialized()) { - debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str()); - cacheEntry.readFuture = - forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise); + debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageIDs).c_str()); + cacheEntry.readFuture = forwardError(readPhysicalPage(this, pageIDs, priority, false), errorPromise); cacheEntry.writeFuture = Void(); ++g_redwoodMetrics.metric.pagerCacheMiss; @@ -2828,15 +2870,23 @@ public: return (PhysicalPageID)pageID; } + Standalone> getPhysicalPageIDs(VectorRef logicalIDs, Version v) { + Standalone> physicalIDs; + for (auto& id : logicalIDs) { + physicalIDs.push_back(physicalIDs.arena(), getPhysicalPageID(id, v)); + } + return physicalIDs; + } + Future> readPageAtVersion(PagerEventReasons reason, - unsigned int level, - LogicalPageID logicalID, + uint8_t level, + VectorRef logicalIDs, int priority, Version v, bool cacheable, bool noHit) { - PhysicalPageID physicalID = getPhysicalPageID(logicalID, v); - return readPage(reason, level, physicalID, priority, cacheable, noHit); + Standalone> physicalIDs = 
getPhysicalPageIDs(logicalIDs, v); + return readPage(reason, level, physicalIDs, priority, cacheable, noHit); } void releaseExtentReadLock() override { concurrentExtentReads->release(); } @@ -2919,7 +2969,7 @@ public: Future> readExtent(LogicalPageID pageID) override { debug_printf("DWALPager(%s) op=readExtent %s\n", filename.c_str(), toString(pageID).c_str()); PageCacheEntry* pCacheEntry = extentCache.getIfExists(pageID); - auto& eventReasons = g_redwoodMetrics.level(0).metrics.events; + auto& eventReasons = g_redwoodMetrics.level(nonBtreeLevel).metrics.events; if (pCacheEntry != nullptr) { eventReasons.addEventReason(PagerEvents::CacheLookup, PagerEventReasons::MetaData); debug_printf("DWALPager(%s) Cache Entry exists for %s\n", filename.c_str(), toString(pageID).c_str()); @@ -2937,18 +2987,23 @@ public: pagesPerExtent, toString(headPageID).c_str(), toString(tailPageID).c_str()); + int pageSize = 1; if (headPageID >= pageID && ((headPageID - pageID) < pagesPerExtent)) headExt = true; if ((tailPageID - pageID) < pagesPerExtent) tailExt = true; if (headExt && tailExt) { readSize = (tailPageID - headPageID + 1) * physicalPageSize; - } else if (headExt) + pageSize = (tailPageID - headPageID + 1); + } else if (headExt) { readSize = (pagesPerExtent - (headPageID - pageID)) * physicalPageSize; - else if (tailExt) + pageSize = (pagesPerExtent - (headPageID - pageID)); + } else if (tailExt) { readSize = (tailPageID - pageID + 1) * physicalPageSize; + pageSize = (tailPageID - headPageID + 1); + } - PageCacheEntry& cacheEntry = extentCache.get(pageID); + PageCacheEntry& cacheEntry = extentCache.get(pageID, pageSize); if (!cacheEntry.initialized()) { cacheEntry.writeFuture = Void(); cacheEntry.readFuture = @@ -3077,7 +3132,7 @@ public: // Read the data from the page that the original was mapped to Reference data = wait( - self->readPage(PagerEventReasons::MetaData, nonBtreeLevel, p.newPageID, ioLeafPriority, false, true)); + self->readPage(PagerEventReasons::MetaData, 
nonBtreeLevel, VectorRef(&p.newPageID,1), ioLeafPriority, false, true)); // Write the data to the original page so it can be read using its original pageID self->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, p.originalPageID, data); @@ -3554,15 +3609,15 @@ public: ~DWALPagerSnapshot() override {} Future> getPhysicalPage(PagerEventReasons reason, - unsigned int level, - LogicalPageID pageID, + uint8_t level, + VectorRef pageIDs, int priority, bool cacheable, bool noHit) override { if (expired.isError()) { throw expired.getError(); } - return map(pager->readPageAtVersion(reason, level, pageID, priority, version, cacheable, noHit), + return map(pager->readPageAtVersion(reason, level, pageIDs, priority, version, cacheable, noHit), [=](Reference p) { return Reference(std::move(p)); }); } @@ -5062,7 +5117,7 @@ private: const RedwoodRecordRef* upperBound, int prefixLen, VectorRef records, - int height, + uint8_t height, int blockSize) { debug_printf("splitPages height=%d records=%d lowerBound=%s upperBound=%s\n", height, @@ -5138,7 +5193,7 @@ private: const RedwoodRecordRef* lowerBound, const RedwoodRecordRef* upperBound, VectorRef entries, - int height, + uint8_t height, Version v, BTreePageIDRef previousID) { ASSERT(entries.size() > 0); @@ -5201,13 +5256,13 @@ private: pageUpperBound.truncate(commonPrefix + 1); } - state std::vector> pages; + state Reference pages; BTreePage* btPage; if (p.blockCount == 1) { Reference page = self->m_pager->newPageBuffer(); btPage = (BTreePage*)page->mutate(); - pages.push_back(std::move(page)); + pages = std::move(page); } else { ASSERT(p.blockCount > 1); btPage = (BTreePage*)new uint8_t[p.pageSize]; @@ -5250,13 +5305,13 @@ private: if (p.blockCount != 1) { // Mark the slack in the page buffer as defined VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (p.blockCount * p.blockSize) - written); + Reference page = self->m_pager->newPageBuffer(p.blockCount); const uint8_t* rptr = (const uint8_t*)btPage; for (int b = 0; b < 
p.blockCount; ++b) { - Reference page = self->m_pager->newPageBuffer(); - memcpy(page->mutate(), rptr, p.blockSize); + memcpy(page->mutate() + b * p.blockSize, rptr, p.blockSize); rptr += p.blockSize; - pages.push_back(std::move(page)); } + pages = std::move(page); delete[](uint8_t*) btPage; } @@ -5266,12 +5321,11 @@ private: // If we are only writing 1 page and it has the same BTreePageID size as the original then try to reuse the // LogicalPageIDs in previousID and try to update them atomically. - if (pagesToBuild.size() == 1 && previousID.size() == pages.size()) { - for (k = 0; k < pages.size(); ++k) { - LogicalPageID id = wait( - self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID[k], pages[k], v)); - childPageID.push_back(records.arena(), id); - } + if (pagesToBuild.size() == 1 && previousID.size() == p.blockSize && p.blockSize == 1) { + + LogicalPageID id = wait( + self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID.front(), pages, v)); + childPageID.push_back(records.arena(), id); } else { // Either the original page is being split, or it's not but it has changed BTreePageID size. 
// Either way, there is no point in reusing any of the original page IDs because the parent @@ -5280,9 +5334,9 @@ private: if (records.empty()) { self->freeBTreePage(previousID, v); } - for (k = 0; k < pages.size(); ++k) { - LogicalPageID id = wait(self->m_pager->newPageID()); - self->m_pager->updatePage(PagerEventReasons::Commit, height, id, pages[k]); + Standalone> emptyPages = wait(self->m_pager->newPageIDs(p.blockCount)); + self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, pages); + for (const LogicalPageID& id : emptyPages) { childPageID.push_back(records.arena(), id); } } @@ -5320,7 +5374,7 @@ private: } ACTOR static Future>> - buildNewRoot(VersionedBTree* self, Version version, Standalone> records, int height) { + buildNewRoot(VersionedBTree* self, Version version, Standalone> records, uint8_t height) { debug_printf("buildNewRoot start version %" PRId64 ", %lu records\n", version, records.size()); // While there are multiple child pages for this version we must write new tree levels. @@ -5351,7 +5405,7 @@ private: } ACTOR static Future> readPage(PagerEventReasons reason, - unsigned int level, + uint8_t level, Reference snapshot, BTreePageIDRef id, int priority, @@ -5364,21 +5418,8 @@ private: snapshot->getVersion()); state Reference page; - - if (id.size() == 1) { - Reference p = - wait(snapshot->getPhysicalPage(reason, level, id.front(), priority, cacheable, false)); - page = std::move(p); - } else { - ASSERT(!id.empty()); - std::vector>> reads; - for (auto& pageID : id) { - reads.push_back(snapshot->getPhysicalPage(reason, level, pageID, priority, cacheable, false)); - } - std::vector> pages = wait(getAll(reads)); - // TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager. 
- page = ArenaPage::concatPages(pages); - } + Reference p = wait(snapshot->getPhysicalPage(reason, level, id, priority, cacheable, false)); + page = std::move(p); debug_printf("readPage() op=readComplete %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion()); const BTreePage* btPage = (const BTreePage*)page->begin(); @@ -5419,14 +5460,11 @@ private: ((BTreePage*)page->begin())->tree()); } - static void preLoadPage(IPagerSnapshot* snapshot, unsigned int l, BTreePageIDRef id, int priority) { + static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef pageIDs, int priority) { g_redwoodMetrics.metric.btreeLeafPreload += 1; - g_redwoodMetrics.metric.btreeLeafPreloadExt += (id.size() - 1); + g_redwoodMetrics.metric.btreeLeafPreloadExt += (pageIDs.size() - 1); - for (auto pageID : id) { - // Prefetches are always at the Leaf level currently so it isn't part of the per-level metrics set - snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageID, priority, true, true); - } + snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageIDs, priority, true, true); } void freeBTreePage(BTreePageIDRef btPageID, Version v) { @@ -5465,25 +5503,13 @@ private: self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion)); newID.front() = id; } else { - state std::vector> pages; - const uint8_t* rptr = page->begin(); - int bytesLeft = page->size(); - while (bytesLeft > 0) { - Reference p = self->m_pager->newPageBuffer(); - int blockSize = p->size(); - memcpy(p->mutate(), rptr, blockSize); - rptr += blockSize; - bytesLeft -= blockSize; - pages.push_back(p); - } - ASSERT(pages.size() == oldID.size()); - - // Write pages, trying to reuse original page IDs + Standalone> emptyPages = wait(self->m_pager->newPageIDs(oldID.size())); + self->freeBTreePage(oldID, writeVersion); + self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, page); state int i = 0; - for (; i < 
pages.size(); ++i) { - LogicalPageID id = wait(self->m_pager->atomicUpdatePage( - PagerEventReasons::Commit, height, oldID[i], pages[i], writeVersion)); + for (const LogicalPageID& id : emptyPages) { newID[i] = id; + ++i; } } @@ -5808,7 +5834,7 @@ private: Reference snapshot, MutationBuffer* mutationBuffer, BTreePageIDRef rootID, - int height, + uint8_t height, MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key InternalPageSliceUpdate* update) { @@ -6801,7 +6827,6 @@ public: // Note that only immediate siblings under the same parent are considered for prefetch so far. BTreePage::BinaryTree::Cursor c = path[path.size() - 2].cursor; ASSERT(path[path.size() - 2].btPage()->height == 2); - constexpr int level = 1; // The loop conditions are split apart into different if blocks for readability. // While query limits are not exceeded @@ -6825,8 +6850,7 @@ public: // Prefetch the sibling if the link is not null if (c.get().value.present()) { BTreePageIDRef childPage = c.get().getChildPage(); - // Assertion above enforces that level is 1 - preLoadPage(pager.getPtr(), level, childPage, ioLeafPriority); + preLoadPage(pager.getPtr(), childPage, ioLeafPriority); recordsRead += estRecordsPerPage; // Use sibling node capacity as an estimate of bytes read. bytesRead += childPage.size() * this->btree->m_blockSize;