support superpage without modifying atomic update function

Fuheng Zhao 2021-08-03 11:17:14 -07:00
parent 2cf66abc24
commit aec0d044de
2 changed files with 186 additions and 150 deletions

View File

@@ -165,8 +165,8 @@ public:
class IPagerSnapshot {
public:
virtual Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
VectorRef<LogicalPageID> pageIDs,
int priority,
bool cacheable,
bool nohit) = 0;
@@ -185,7 +185,7 @@ public:
class IPager2 : public IClosable {
public:
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
virtual Reference<ArenaPage> newPageBuffer() = 0;
virtual Reference<ArenaPage> newPageBuffer(size_t size = 1) = 0;
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
// For a given pager instance, separate calls to this function must return the same value.
@@ -198,6 +198,7 @@ public:
// Allocate a new page ID for a subsequent write. The page will be considered in-use after the next commit
// regardless of whether or not it was written to.
virtual Future<LogicalPageID> newPageID() = 0;
virtual Future<Standalone<VectorRef<LogicalPageID>>> newPageIDs(size_t size) = 0;
virtual Future<LogicalPageID> newExtentPageID(QueueID queueID) = 0;
virtual QueueID newLastQueueID() = 0;
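Together with the vectorized updatePage and the sized newPageBuffer above, newPageIDs is the allocation half of a superpage write. A minimal caller sketch, assuming flow ACTOR context with a pager and height in scope (illustrative names, not part of this diff):

    state Standalone<VectorRef<LogicalPageID>> ids = wait(pager->newPageIDs(4));
    state Reference<ArenaPage> superpage = pager->newPageBuffer(4);
    // ... fill superpage->mutate() with the node contents ...
    pager->updatePage(PagerEventReasons::Commit, height, ids, superpage);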
@@ -206,16 +207,19 @@ public:
// Existing holders of a page reference for pageID, read from any version,
// may see the effects of this write.
virtual void updatePage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
Standalone<VectorRef<LogicalPageID>> pageIDs,
Reference<ArenaPage> data) = 0;
void updatePage(PagerEventReasons reason, uint8_t level, LogicalPageID pageID, Reference<ArenaPage> data) {
updatePage(reason, level, VectorRef<LogicalPageID>(&pageID, 1), data);
}
// Try to atomically update the contents of a page as of version v in the next commit.
// If the pager is unable to do this at this time, it may choose to write the data to a new page ID
// instead and return the new page ID to the caller. Otherwise the original pageID argument will be returned.
// If a new page ID is returned, the old page ID will be freed as of version v
virtual Future<LogicalPageID> atomicUpdatePage(PagerEventReasons reason,
unsigned int level,
uint8_t level,
LogicalPageID pageID,
Reference<ArenaPage> data,
Version v) = 0;
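Note that atomicUpdatePage keeps its single-pageID signature, per the commit title: only one-block pages take the atomic path, while multi-block superpages are rewritten under fresh IDs. A sketch of the caller's side of the contract, assuming ACTOR context with oldID, page, and v in scope:

    LogicalPageID id = wait(pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID, page, v));
    if (id != oldID) {
        // The pager wrote to a new page; the parent must now reference id.
        // oldID is freed by the pager as of version v.
    }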
@@ -237,11 +241,19 @@ public:
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
// considered likely to be needed soon.
virtual Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
Standalone<VectorRef<LogicalPageID>> pageIDs,
int priority,
bool cacheable,
bool noHit) = 0;
Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
uint8_t level,
LogicalPageID pageID,
int priority,
bool cacheable,
bool noHit) {
return readPage(reason, level, VectorRef<LogicalPageID>(&pageID, 1), priority, cacheable, noHit);
}
virtual Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) = 0;
virtual void releaseExtentReadLock() = 0;

View File

@@ -1464,7 +1464,7 @@ struct RedwoodMetrics {
return eventReasons[(size_t)event][(size_t)reason];
}
std::string toString(int level, double elapsed) const {
std::string toString(uint8_t level, double elapsed) const {
std::string result;
const auto& pairs = (level == 0 ? L0PossibleEventReasonPairs : possibleEventReasonPairs);
@@ -1488,7 +1488,7 @@ struct RedwoodMetrics {
return result;
}
void toTraceEvent(TraceEvent* t, int level) const {
void toTraceEvent(TraceEvent* t, uint8_t level) const {
const auto& pairs = (level == 0 ? L0PossibleEventReasonPairs : possibleEventReasonPairs);
for (const auto& p : pairs) {
std::string name =
@@ -1528,7 +1528,7 @@ struct RedwoodMetrics {
Level() { clear(); }
void clear(int level = 0) {
void clear(uint8_t level = 0) {
metrics = {};
if (level > 0) {
@@ -1593,7 +1593,7 @@ struct RedwoodMetrics {
}
void clear() {
unsigned int levelCounter = 0;
uint8_t levelCounter = 0;
for (RedwoodMetrics::Level& level : levels) {
level.clear(levelCounter);
++levelCounter;
@@ -1621,12 +1621,12 @@ struct RedwoodMetrics {
return metric.pagerDiskWrite + metric.pagerDiskRead + metric.pagerCacheHit + metric.pagerProbeHit;
}
Level& level(unsigned int level) {
Level& level(uint8_t level) {
// Valid levels are 0 through btreeLevels
// Level 0 is for operations that are not BTree level specific, as many of the metrics are the same
// Levels 0 through btreeLevels correspond to BTree node height; heights above btreeLevels are combined
//          into the level at btreeLevels
return levels[std::min(level, btreeLevels)];
return levels[std::min((unsigned int)level, btreeLevels)];
}
void updateMaxRecordCount(int maxRecords) {
@@ -1788,17 +1788,18 @@ template <class IndexType, class ObjectType>
class ObjectCache : NonCopyable {
struct Entry : public boost::intrusive::list_base_hook<> {
Entry() : hits(0) {}
Entry() : hits(0), size(1) {}
IndexType index;
ObjectType item;
int hits;
int size;
};
typedef std::unordered_map<IndexType, Entry> CacheT;
typedef boost::intrusive::list<Entry> EvictionOrderT;
public:
ObjectCache(int sizeLimit = 1) : sizeLimit(sizeLimit) {}
ObjectCache(int sizeLimit = 1) : sizeLimit(sizeLimit), currentSize(0) {}
void setSizeLimit(int n) {
ASSERT(n > 0);
@@ -1838,6 +1839,7 @@ public:
if (toEvict.hits == 0) {
++g_redwoodMetrics.metric.pagerEvictUnhit;
}
currentSize -= toEvict.size;
evictionOrder.erase(evictionOrder.iterator_to(toEvict));
cache.erase(i);
return true;
@@ -1847,7 +1849,7 @@ public:
// After a get(), the object for i is the last in evictionOrder.
// If noHit is set, do not consider this access to be cache hit if the object is present
// If noMiss is set, do not consider this access to be a cache miss if the object is not present
ObjectType& get(const IndexType& index, bool noHit = false) {
ObjectType& get(const IndexType& index, int size, bool noHit = false) {
Entry& entry = cache[index];
// If entry is linked into evictionOrder then move it to the back of the order
@@ -1863,11 +1865,13 @@ public:
// Finish initializing entry
entry.index = index;
entry.hits = 0;
entry.size = size;
currentSize += size;
// Insert the newly created Entry at the back of the eviction order
evictionOrder.push_back(entry);
// While the cache is too big, evict the oldest entry until the oldest entry can't be evicted.
while (cache.size() > sizeLimit) {
while (currentSize > sizeLimit) {
Entry& toEvict = evictionOrder.front();
// It's critical that we do not evict the item we just added because it would cause the reference
@@ -1892,6 +1896,7 @@ public:
if (toEvict.hits == 0) {
++g_redwoodMetrics.metric.pagerEvictUnhit;
}
currentSize -= toEvict.size;
debug_printf(
"Evicting %s to make room for %s\n", toString(toEvict.index).c_str(), toString(index).c_str());
evictionOrder.pop_front();
@@ -1907,6 +1912,7 @@ public:
ACTOR static Future<Void> clear_impl(ObjectCache* self) {
state ObjectCache::CacheT cache;
state EvictionOrderT evictionOrder;
state int64_t currentSize;
// Swap cache contents to local state vars
// After this, no more entries will be added to or read from these
@@ -1914,20 +1920,23 @@ public:
// after it is either evictable or onEvictable() is ready.
cache.swap(self->cache);
evictionOrder.swap(self->evictionOrder);
currentSize = self->currentSize;
state typename EvictionOrderT::iterator i = evictionOrder.begin();
state typename EvictionOrderT::iterator iEnd = evictionOrder.begin();
state typename EvictionOrderT::iterator iEnd = evictionOrder.end();
while (i != iEnd) {
if (!i->item.evictable()) {
wait(i->item.onEvictable());
}
currentSize -= i->size;
self->currentSize -= i->size;
++i;
}
evictionOrder.clear();
cache.clear();
ASSERT(currentSize == 0);
return Void();
}
@@ -1940,7 +1949,7 @@ public:
private:
int64_t sizeLimit;
int64_t currentSize;
CacheT cache;
EvictionOrderT evictionOrder;
};
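The net effect of the Entry::size and currentSize changes is that eviction pressure is measured in pages rather than entries, so an N-page superpage consumes N units of sizeLimit. A self-contained sketch of that size-weighted LRU policy (hypothetical names; the evictability checks of the real cache are omitted):

    #include <list>
    #include <unordered_map>
    #include <utility>

    struct SizedLru {
        long long limit;
        long long current = 0;
        std::list<int> order; // front = least recently used key
        std::unordered_map<int, std::pair<int, std::list<int>::iterator>> entries; // key -> (size, position)

        explicit SizedLru(long long limit) : limit(limit) {}

        // Insert a new key of the given size, then evict the oldest keys while
        // the total size exceeds the limit, never evicting the key just added.
        void put(int key, int size) {
            order.push_back(key);
            entries[key] = { size, std::prev(order.end()) };
            current += size;
            while (current > limit && order.front() != key) {
                int victim = order.front();
                current -= entries[victim].first;
                entries.erase(victim);
                order.pop_front();
            }
        }
    };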
@@ -2374,8 +2383,8 @@ public:
return id;
}
Reference<ArenaPage> newPageBuffer() override {
return Reference<ArenaPage>(new ArenaPage(logicalPageSize, physicalPageSize));
Reference<ArenaPage> newPageBuffer(size_t size = 1) override {
return Reference<ArenaPage>(new ArenaPage(logicalPageSize * size, physicalPageSize * size));
}
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
@@ -2426,7 +2435,23 @@ public:
}
Future<LogicalPageID> newPageID() override { return newPageID_impl(this); }
Future<Standalone<VectorRef<LogicalPageID>>> newPageIDs(size_t size) override {
return newPageIDs_impl(this, size);
}
ACTOR static Future<Standalone<VectorRef<LogicalPageID>>> newPageIDs_impl(DWALPager* self, size_t size) {
state Standalone<VectorRef<LogicalPageID>> newPages;
state size_t i = 0;
for (; i < size; ++i) {
LogicalPageID id = wait(self->newPageID());
newPages.push_back(newPages.arena(), id);
}
debug_printf("DWALPager(%s) newPageIDs_impl(%d) returning %s at end of file\n",
self->filename.c_str(),
size,
toString(newPages).c_str());
return newPages;
}
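The IDs above are awaited one at a time. If concurrent newPageID calls are safe on DWALPager (an assumption this diff does not establish), the allocations could also be issued in parallel:

    std::vector<Future<LogicalPageID>> fs;
    for (size_t j = 0; j < size; ++j) {
        fs.push_back(self->newPageID());
    }
    std::vector<LogicalPageID> ids = wait(getAll(fs));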
// Get a new, previously available extent and its first page ID. The page will be considered in-use after the next
// commit regardless of whether or not it was written to, until it is returned to the pager via freePage()
ACTOR static Future<LogicalPageID> newExtentPageID_impl(DWALPager* self, QueueID queueID) {
@@ -2464,23 +2489,23 @@ public:
ACTOR static Future<Void> writePhysicalPage_impl(DWALPager* self,
PagerEventReasons reason,
unsigned int level,
PhysicalPageID pageID,
uint8_t level,
Standalone<VectorRef<PhysicalPageID>> pageIDs,
Reference<ArenaPage> page,
bool header = false) {
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n",
self->filename.c_str(),
(header ? "writePhysicalHeader" : "writePhysical"),
toString(pageID).c_str(),
toString(pageIDs).c_str(),
page->begin());
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
page->updateChecksum(pageID);
page->updateChecksum(pageIDs.front());
debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n",
self->filename.c_str(),
toString(pageID).c_str(),
page->calculateChecksum(pageID),
toString(pageIDs).c_str(),
page->calculateChecksum(pageIDs.front()),
page->getChecksum());
state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority));
@@ -2493,43 +2518,56 @@ public:
// Note: Not using forwardError here so a write error won't be discovered until commit time.
state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize;
wait(self->pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize));
debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n",
self->filename.c_str(),
(header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"),
toString(pageID).c_str(),
page->begin(),
(pageID * blockSize));
state std::vector<Future<Void>> writers;
state int i = 0;
for (const auto& pageID : pageIDs) {
Future<Void> p =
self->pageFile->write(page->mutate() + i * blockSize, blockSize, ((int64_t)pageID) * blockSize);
i += 1;
writers.push_back(p);
}
wait(waitForAll(writers));
if (REDWOOD_DEBUG) {
Standalone<VectorRef<PhysicalPageID>> pageIDsCopy = pageIDs;
debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n",
self->filename.c_str(),
(header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"),
toString(pageIDsCopy).c_str(),
page->begin(),
(pageIDsCopy.front() * blockSize));
}
return Void();
}
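The loop above is a scatter write: one contiguous in-memory buffer, one write per physical block, joined with waitForAll, so a superpage's blocks need not be adjacent on disk. Reduced to its core (same names as the diff, assuming ACTOR context):

    state std::vector<Future<Void>> writes;
    for (int b = 0; b < pageIDs.size(); ++b) {
        // Block b comes from offset b * blockSize of the buffer and lands at
        // the (possibly non-contiguous) disk offset pageIDs[b] * blockSize.
        writes.push_back(self->pageFile->write(
            page->mutate() + b * blockSize, blockSize, (int64_t)pageIDs[b] * blockSize));
    }
    wait(waitForAll(writes));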
Future<Void> writePhysicalPage(PagerEventReasons reason,
unsigned int level,
PhysicalPageID pageID,
uint8_t level,
VectorRef<PhysicalPageID> pageIDs,
Reference<ArenaPage> page,
bool header = false) {
Future<Void> f = writePhysicalPage_impl(this, reason, level, pageID, page, header);
Future<Void> f = writePhysicalPage_impl(this, reason, level, pageIDs, page, header);
operations.add(f);
return f;
}
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<ArenaPage> page) {
return writePhysicalPage(PagerEventReasons::MetaData, nonBtreeLevel, pageID, page, true);
return writePhysicalPage(PagerEventReasons::MetaData, nonBtreeLevel, VectorRef<PhysicalPageID>(&pageID, 1), page, true);
}
void updatePage(PagerEventReasons reason, uint8_t level, LogicalPageID pageID, Reference<ArenaPage> data) {
updatePage(reason, level, VectorRef<LogicalPageID>(&pageID, 1), data);
}
void updatePage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
Standalone<VectorRef<LogicalPageID>> pageIDs,
Reference<ArenaPage> data) override {
// Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now
// or as a cache miss because there is no benefit to the page already being in cache
// Similarly, this does not count as a point lookup for reason.
PageCacheEntry& cacheEntry = pageCache.get(pageID, true);
PageCacheEntry& cacheEntry = pageCache.get(pageIDs.front(), pageIDs.size(), true);
debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n",
filename.c_str(),
toString(pageID).c_str(),
toString(pageIDs).c_str(),
cacheEntry.initialized(),
cacheEntry.initialized() && cacheEntry.reading(),
cacheEntry.initialized() && cacheEntry.writing());
@@ -2542,11 +2580,11 @@ public:
// future reads of the version are not allowed) and the write of the next newest version over top
// of the original page begins.
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data);
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageIDs, data);
} else if (cacheEntry.reading()) {
// Wait for the read to finish, then start the write.
cacheEntry.writeFuture = map(success(cacheEntry.readFuture), [=](Void) {
writePhysicalPage(reason, level, pageID, data);
writePhysicalPage(reason, level, pageIDs, data);
return Void();
});
}
@@ -2554,11 +2592,11 @@ public:
// writes happen in the correct order
else if (cacheEntry.writing()) {
cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) {
writePhysicalPage(reason, level, pageID, data);
writePhysicalPage(reason, level, pageIDs, data);
return Void();
});
} else {
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data);
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageIDs, data);
}
// Always update the page contents immediately regardless of what happened above.
@@ -2566,7 +2604,7 @@ public:
}
Future<LogicalPageID> atomicUpdatePage(PagerEventReasons reason,
unsigned int level,
uint8_t level,
LogicalPageID pageID,
Reference<ArenaPage> data,
Version v) override {
@@ -2692,7 +2730,7 @@ public:
// If the user-chosen physical page size is larger, then there will be a gap of unused space after the header pages
// and before the user-chosen sized pages.
ACTOR static Future<Reference<ArenaPage>> readPhysicalPage(DWALPager* self,
PhysicalPageID pageID,
Standalone<VectorRef<PhysicalPageID>> pageIDs,
int priority,
bool header) {
ASSERT(!self->memoryOnly);
@@ -2703,36 +2741,42 @@ public:
state Reference<ArenaPage> page =
header ? Reference<ArenaPage>(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock))
: self->newPageBuffer();
: self->newPageBuffer(pageIDs.size());
debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n",
self->filename.c_str(),
toString(pageID).c_str(),
toString(pageIDs).c_str(),
page->begin());
state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority)));
++g_redwoodMetrics.metric.pagerDiskRead;
// TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled?
int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize;
int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize));
state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize;
state int totalReadBytes = 0;
state int i = 0;
for (i = 0; i < pageIDs.size(); i++) {
int readBytes = wait(
self->pageFile->read(page->mutate() + i * blockSize, blockSize, ((int64_t)pageIDs[i]) * blockSize));
totalReadBytes += readBytes;
}
debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n",
self->filename.c_str(),
toString(pageID).c_str(),
toString(pageIDs).c_str(),
page->begin(),
readBytes);
totalReadBytes);
// Header reads are checked explicitly during recovery
if (!header) {
if (!page->verifyChecksum(pageID)) {
if (!page->verifyChecksum(pageIDs.front())) {
debug_printf(
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageIDs).c_str());
Error e = checksum_failed();
TraceEvent(SevError, "RedwoodChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)
.detail("PageID", pageIDs)
.detail("PageSize", self->physicalPageSize)
.detail("Offset", pageID * self->physicalPageSize)
.detail("CalculatedChecksum", page->calculateChecksum(pageID))
.detail("Offset", pageIDs.front() * self->physicalPageSize)
.detail("CalculatedChecksum", page->calculateChecksum(pageIDs.front()))
.detail("ChecksumInPage", page->getChecksum())
.error(e);
ASSERT(false);
@@ -2743,7 +2787,7 @@ public:
}
static Future<Reference<ArenaPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
return readPhysicalPage(self, pageID, ioMaxPriority, true);
return readPhysicalPage(self, VectorRef<LogicalPageID>(&pageID, 1), ioMaxPriority, true);
}
bool tryEvictPage(LogicalPageID logicalID, Version v) {
@@ -2754,8 +2798,8 @@ public:
// Reads the most recent version of pageID, either previously committed or written using updatePage()
// in the current commit
Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
Standalone<VectorRef<PhysicalPageID>> pageIDs,
int priority,
bool cacheable,
bool noHit) override {
@@ -2764,30 +2808,28 @@ public:
auto& eventReasons = g_redwoodMetrics.level(level).metrics.events;
eventReasons.addEventReason(PagerEvents::CacheLookup, reason);
if (!cacheable) {
debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageID).c_str());
PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageID);
debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageIDs).c_str());
PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageIDs.front());
if (pCacheEntry != nullptr) {
++g_redwoodMetrics.metric.pagerProbeHit;
debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageID).c_str());
return pCacheEntry->readFuture;
debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageIDs).c_str()); return pCacheEntry->readFuture;
}
++g_redwoodMetrics.metric.pagerProbeMiss;
debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageID).c_str());
return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise);
debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageIDs).c_str());
return forwardError(readPhysicalPage(this, pageIDs, priority, false), errorPromise);
}
PageCacheEntry& cacheEntry = pageCache.get(pageID, noHit);
PageCacheEntry& cacheEntry = pageCache.get(pageIDs.front(), pageIDs.size(), noHit);
debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d noHit=%d\n",
filename.c_str(),
toString(pageID).c_str(),
toString(pageIDs).c_str(),
cacheEntry.initialized(),
cacheEntry.initialized() && cacheEntry.reading(),
cacheEntry.initialized() && cacheEntry.writing(),
noHit);
if (!cacheEntry.initialized()) {
debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str());
cacheEntry.readFuture =
forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise);
debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageIDs).c_str());
cacheEntry.readFuture = forwardError(readPhysicalPage(this, pageIDs, priority, false), errorPromise);
cacheEntry.writeFuture = Void();
++g_redwoodMetrics.metric.pagerCacheMiss;
@@ -2828,15 +2870,23 @@ public:
return (PhysicalPageID)pageID;
}
Standalone<VectorRef<PhysicalPageID>> getPhysicalPageIDs(VectorRef<LogicalPageID> logicalIDs, Version v) {
Standalone<VectorRef<PhysicalPageID>> physicalIDs;
for (auto& id : logicalIDs) {
physicalIDs.push_back(physicalIDs.arena(), getPhysicalPageID(id, v));
}
return physicalIDs;
}
Future<Reference<ArenaPage>> readPageAtVersion(PagerEventReasons reason,
unsigned int level,
LogicalPageID logicalID,
uint8_t level,
VectorRef<LogicalPageID> logicalIDs,
int priority,
Version v,
bool cacheable,
bool noHit) {
PhysicalPageID physicalID = getPhysicalPageID(logicalID, v);
return readPage(reason, level, physicalID, priority, cacheable, noHit);
Standalone<VectorRef<PhysicalPageID>> physicalIDs = getPhysicalPageIDs(logicalIDs, v);
return readPage(reason, level, physicalIDs, priority, cacheable, noHit);
}
void releaseExtentReadLock() override { concurrentExtentReads->release(); }
@@ -2919,7 +2969,7 @@ public:
Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) override {
debug_printf("DWALPager(%s) op=readExtent %s\n", filename.c_str(), toString(pageID).c_str());
PageCacheEntry* pCacheEntry = extentCache.getIfExists(pageID);
auto& eventReasons = g_redwoodMetrics.level(0).metrics.events;
auto& eventReasons = g_redwoodMetrics.level(nonBtreeLevel).metrics.events;
if (pCacheEntry != nullptr) {
eventReasons.addEventReason(PagerEvents::CacheLookup, PagerEventReasons::MetaData);
debug_printf("DWALPager(%s) Cache Entry exists for %s\n", filename.c_str(), toString(pageID).c_str());
@@ -2937,18 +2987,23 @@ public:
pagesPerExtent,
toString(headPageID).c_str(),
toString(tailPageID).c_str());
int pageSize = 1;
if (headPageID >= pageID && ((headPageID - pageID) < pagesPerExtent))
headExt = true;
if ((tailPageID - pageID) < pagesPerExtent)
tailExt = true;
if (headExt && tailExt) {
readSize = (tailPageID - headPageID + 1) * physicalPageSize;
} else if (headExt)
pageSize = (tailPageID - headPageID + 1);
} else if (headExt) {
readSize = (pagesPerExtent - (headPageID - pageID)) * physicalPageSize;
else if (tailExt)
pageSize = (pagesPerExtent - (headPageID - pageID));
} else if (tailExt) {
readSize = (tailPageID - pageID + 1) * physicalPageSize;
pageSize = (tailPageID - headPageID + 1);
}
PageCacheEntry& cacheEntry = extentCache.get(pageID);
PageCacheEntry& cacheEntry = extentCache.get(pageID, pageSize);
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = Void();
cacheEntry.readFuture =
@@ -3077,7 +3132,7 @@ public:
// Read the data from the page that the original was mapped to
Reference<ArenaPage> data = wait(
self->readPage(PagerEventReasons::MetaData, nonBtreeLevel, p.newPageID, ioLeafPriority, false, true));
self->readPage(PagerEventReasons::MetaData, nonBtreeLevel, VectorRef<LogicalPageID>(&p.newPageID, 1), ioLeafPriority, false, true));
// Write the data to the original page so it can be read using its original pageID
self->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, p.originalPageID, data);
@@ -3554,15 +3609,15 @@ public:
~DWALPagerSnapshot() override {}
Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
uint8_t level,
VectorRef<LogicalPageID> pageIDs,
int priority,
bool cacheable,
bool noHit) override {
if (expired.isError()) {
throw expired.getError();
}
return map(pager->readPageAtVersion(reason, level, pageID, priority, version, cacheable, noHit),
return map(pager->readPageAtVersion(reason, level, pageIDs, priority, version, cacheable, noHit),
[=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(std::move(p)); });
}
@@ -5062,7 +5117,7 @@ private:
const RedwoodRecordRef* upperBound,
int prefixLen,
VectorRef<RedwoodRecordRef> records,
int height,
uint8_t height,
int blockSize) {
debug_printf("splitPages height=%d records=%d lowerBound=%s upperBound=%s\n",
height,
@@ -5138,7 +5193,7 @@ private:
const RedwoodRecordRef* lowerBound,
const RedwoodRecordRef* upperBound,
VectorRef<RedwoodRecordRef> entries,
int height,
uint8_t height,
Version v,
BTreePageIDRef previousID) {
ASSERT(entries.size() > 0);
@@ -5201,13 +5256,13 @@ private:
pageUpperBound.truncate(commonPrefix + 1);
}
state std::vector<Reference<ArenaPage>> pages;
state Reference<ArenaPage> pages;
BTreePage* btPage;
if (p.blockCount == 1) {
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
btPage = (BTreePage*)page->mutate();
pages.push_back(std::move(page));
pages = std::move(page);
} else {
ASSERT(p.blockCount > 1);
btPage = (BTreePage*)new uint8_t[p.pageSize];
@@ -5250,13 +5305,13 @@ private:
if (p.blockCount != 1) {
// Mark the slack in the page buffer as defined
VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (p.blockCount * p.blockSize) - written);
Reference<ArenaPage> page = self->m_pager->newPageBuffer(p.blockCount);
const uint8_t* rptr = (const uint8_t*)btPage;
for (int b = 0; b < p.blockCount; ++b) {
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
memcpy(page->mutate(), rptr, p.blockSize);
memcpy(page->mutate() + b * p.blockSize, rptr, p.blockSize);
rptr += p.blockSize;
pages.push_back(std::move(page));
}
pages = std::move(page);
delete[](uint8_t*) btPage;
}
@@ -5266,12 +5321,11 @@ private:
// If we are only writing 1 page and it has the same BTreePageID size as the original then try to reuse the
// LogicalPageIDs in previousID and try to update them atomically.
if (pagesToBuild.size() == 1 && previousID.size() == pages.size()) {
for (k = 0; k < pages.size(); ++k) {
LogicalPageID id = wait(
self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID[k], pages[k], v));
childPageID.push_back(records.arena(), id);
}
if (pagesToBuild.size() == 1 && previousID.size() == p.blockCount && p.blockCount == 1) {
LogicalPageID id = wait(
self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID.front(), pages, v));
childPageID.push_back(records.arena(), id);
} else {
// Either the original page is being split, or it's not but it has changed BTreePageID size.
// Either way, there is no point in reusing any of the original page IDs because the parent
@@ -5280,9 +5334,9 @@ private:
if (records.empty()) {
self->freeBTreePage(previousID, v);
}
for (k = 0; k < pages.size(); ++k) {
LogicalPageID id = wait(self->m_pager->newPageID());
self->m_pager->updatePage(PagerEventReasons::Commit, height, id, pages[k]);
Standalone<VectorRef<LogicalPageID>> emptyPages = wait(self->m_pager->newPageIDs(p.blockCount));
self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, pages);
for (const LogicalPageID& id : emptyPages) {
childPageID.push_back(records.arena(), id);
}
}
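In sum, the atomic path is taken only when a single-block page replaces a single previous ID, which is why atomicUpdatePage could keep its scalar signature; everything multi-block flows through newPageIDs plus the vectorized updatePage. A condensed restatement of the branch above (no new behavior):

    if (pagesToBuild.size() == 1 && p.blockCount == 1 && previousID.size() == 1) {
        // Unchanged single-page atomic path: the original ID may be kept.
    } else {
        // Superpage, or a node whose block count changed: allocate fresh IDs
        // and publish with the vectorized updatePage; originals are not reused.
    }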
@@ -5320,7 +5374,7 @@ private:
}
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>>
buildNewRoot(VersionedBTree* self, Version version, Standalone<VectorRef<RedwoodRecordRef>> records, int height) {
buildNewRoot(VersionedBTree* self, Version version, Standalone<VectorRef<RedwoodRecordRef>> records, uint8_t height) {
debug_printf("buildNewRoot start version %" PRId64 ", %lu records\n", version, records.size());
// While there are multiple child pages for this version we must write new tree levels.
@@ -5351,7 +5405,7 @@ private:
}
ACTOR static Future<Reference<const ArenaPage>> readPage(PagerEventReasons reason,
unsigned int level,
uint8_t level,
Reference<IPagerSnapshot> snapshot,
BTreePageIDRef id,
int priority,
@@ -5364,21 +5418,8 @@ private:
snapshot->getVersion());
state Reference<const ArenaPage> page;
if (id.size() == 1) {
Reference<const ArenaPage> p =
wait(snapshot->getPhysicalPage(reason, level, id.front(), priority, cacheable, false));
page = std::move(p);
} else {
ASSERT(!id.empty());
std::vector<Future<Reference<const ArenaPage>>> reads;
for (auto& pageID : id) {
reads.push_back(snapshot->getPhysicalPage(reason, level, pageID, priority, cacheable, false));
}
std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
page = ArenaPage::concatPages(pages);
}
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(reason, level, id, priority, cacheable, false));
page = std::move(p);
debug_printf("readPage() op=readComplete %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion());
const BTreePage* btPage = (const BTreePage*)page->begin();
@@ -5419,14 +5460,11 @@ private:
((BTreePage*)page->begin())->tree());
}
static void preLoadPage(IPagerSnapshot* snapshot, unsigned int l, BTreePageIDRef id, int priority) {
static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef pageIDs, int priority) {
g_redwoodMetrics.metric.btreeLeafPreload += 1;
g_redwoodMetrics.metric.btreeLeafPreloadExt += (id.size() - 1);
g_redwoodMetrics.metric.btreeLeafPreloadExt += (pageIDs.size() - 1);
for (auto pageID : id) {
// Prefetches are always at the Leaf level currently so it isn't part of the per-level metrics set
snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageID, priority, true, true);
}
snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageIDs, priority, true, true);
}
void freeBTreePage(BTreePageIDRef btPageID, Version v) {
@@ -5465,25 +5503,13 @@ private:
self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion));
newID.front() = id;
} else {
state std::vector<Reference<ArenaPage>> pages;
const uint8_t* rptr = page->begin();
int bytesLeft = page->size();
while (bytesLeft > 0) {
Reference<ArenaPage> p = self->m_pager->newPageBuffer();
int blockSize = p->size();
memcpy(p->mutate(), rptr, blockSize);
rptr += blockSize;
bytesLeft -= blockSize;
pages.push_back(p);
}
ASSERT(pages.size() == oldID.size());
// Write pages, trying to reuse original page IDs
Standalone<VectorRef<LogicalPageID>> emptyPages = wait(self->m_pager->newPageIDs(oldID.size()));
self->freeBTreePage(oldID, writeVersion);
self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, page);
state int i = 0;
for (; i < pages.size(); ++i) {
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(
PagerEventReasons::Commit, height, oldID[i], pages[i], writeVersion));
for (const LogicalPageID& id : emptyPages) {
newID[i] = id;
++i;
}
}
@@ -5808,7 +5834,7 @@ private:
Reference<IPagerSnapshot> snapshot,
MutationBuffer* mutationBuffer,
BTreePageIDRef rootID,
int height,
uint8_t height,
MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key
MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key
InternalPageSliceUpdate* update) {
@@ -6801,7 +6827,6 @@ public:
// Note that only immediate siblings under the same parent are considered for prefetch so far.
BTreePage::BinaryTree::Cursor c = path[path.size() - 2].cursor;
ASSERT(path[path.size() - 2].btPage()->height == 2);
constexpr int level = 1;
// The loop conditions are split apart into different if blocks for readability.
// While query limits are not exceeded
@@ -6825,8 +6850,7 @@ public:
// Prefetch the sibling if the link is not null
if (c.get().value.present()) {
BTreePageIDRef childPage = c.get().getChildPage();
// Assertion above enforces that level is 1
preLoadPage(pager.getPtr(), level, childPage, ioLeafPriority);
preLoadPage(pager.getPtr(), childPage, ioLeafPriority);
recordsRead += estRecordsPerPage;
// Use sibling node capacity as an estimate of bytes read.
bytesRead += childPage.size() * this->btree->m_blockSize;