Replaced IPage with ArenaPage, a new Arena-based page class which enables StringRefs to hold a reference to Redwood page memory.
This commit is contained in:
parent
a8d7f7748c
commit
06e8caa0aa
|
@ -26,6 +26,7 @@
|
|||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "flow/crc32c.h"
|
||||
|
||||
#ifndef VALGRIND
|
||||
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
|
||||
|
@ -36,36 +37,97 @@ typedef uint32_t LogicalPageID;
|
|||
typedef uint32_t PhysicalPageID;
|
||||
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
|
||||
|
||||
class IPage {
|
||||
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
|
||||
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
|
||||
public:
|
||||
IPage() : userData(nullptr) {}
|
||||
// The page's logical size includes an opaque checksum; use size() to get the usable size.
|
||||
ArenaPage(int logicalSize, int bufferSize) : logicalSize(logicalSize), bufferSize(bufferSize), userData(nullptr) {
|
||||
if (bufferSize > 0) {
|
||||
// Get a 4096-byte aligned buffer of bufferSize bytes from the arena, wastefully, by allocating twice the space needed
|
||||
// TODO: Use arena.allocateAlignedBuffer()
|
||||
size_t space = bufferSize * 2;
|
||||
void* pSpace = new (arena) uint8_t[space];
|
||||
buffer = (uint8_t*)std::align(4096, bufferSize, pSpace, space);
|
||||
|
||||
virtual uint8_t const* begin() const = 0;
|
||||
virtual uint8_t* mutate() = 0;
|
||||
// Mark any unused page portion defined
|
||||
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
|
||||
} else {
|
||||
buffer = nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
// Must return the same size for all pages created by the same pager instance
|
||||
virtual int size() const = 0;
|
||||
|
||||
StringRef asStringRef() const { return StringRef(begin(), size()); }
|
||||
|
||||
virtual ~IPage() {
|
||||
~ArenaPage() {
|
||||
if (userData != nullptr && userDataDestructor != nullptr) {
|
||||
userDataDestructor(userData);
|
||||
}
|
||||
}
|
||||
|
||||
virtual Reference<IPage> clone() const = 0;
|
||||
uint8_t const* begin() const { return (uint8_t*)buffer; }
|
||||
|
||||
virtual void addref() const = 0;
|
||||
virtual void delref() const = 0;
|
||||
uint8_t* mutate() { return (uint8_t*)buffer; }
|
||||
|
||||
typedef uint32_t Checksum;
|
||||
|
||||
// Usable size, without checksum
|
||||
int size() const { return logicalSize - sizeof(Checksum); }
|
||||
|
||||
Standalone<StringRef> asStringRef() const { return Standalone<StringRef>(StringRef(begin(), size()), arena); }
|
||||
|
||||
// Get an ArenaPage which is a copy of this page, in its own Arena
|
||||
Reference<ArenaPage> cloneContents() const {
|
||||
ArenaPage* p = new ArenaPage(logicalSize, bufferSize);
|
||||
memcpy(p->buffer, buffer, logicalSize);
|
||||
return Reference<ArenaPage>(p);
|
||||
}
|
||||
|
||||
// Get an ArenaPage which depends on this page's Arena and references some of its memory
|
||||
Reference<ArenaPage> subPage(int offset, int len) const {
|
||||
ArenaPage* p = new ArenaPage(len, 0);
|
||||
p->buffer = buffer + offset;
|
||||
p->arena.dependsOn(arena);
|
||||
return Reference<ArenaPage>(p);
|
||||
}
|
||||
|
||||
// Given a vector of pages with the same ->size(), create a new ArenaPage with a ->size() that is
|
||||
// equal to the combined ->size() of all of the input pages and has all of their contents copied into it.
|
||||
static Reference<ArenaPage> concatPages(const std::vector<Reference<const ArenaPage>>& pages) {
|
||||
int usableSize = pages.front()->size();
|
||||
int totalUsableSize = pages.size() * usableSize;
|
||||
int totalBufferSize = pages.front()->bufferSize * pages.size();
|
||||
ArenaPage* p = new ArenaPage(totalUsableSize + sizeof(Checksum), totalBufferSize);
|
||||
|
||||
uint8_t* wptr = p->mutate();
|
||||
for (auto& p : pages) {
|
||||
ASSERT(p->size() == usableSize);
|
||||
memcpy(wptr, p->begin(), usableSize);
|
||||
wptr += usableSize;
|
||||
}
|
||||
|
||||
return Reference<ArenaPage>(p);
|
||||
}
|
||||
|
||||
Checksum& getChecksum() { return *(Checksum*)(buffer + size()); }
|
||||
|
||||
Checksum calculateChecksum(LogicalPageID pageID) { return crc32c_append(pageID, buffer, size()); }
|
||||
|
||||
void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); }
|
||||
|
||||
bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); }
|
||||
|
||||
private:
|
||||
Arena arena;
|
||||
int logicalSize;
|
||||
int bufferSize;
|
||||
uint8_t* buffer;
|
||||
|
||||
public:
|
||||
mutable void* userData;
|
||||
mutable void (*userDataDestructor)(void*);
|
||||
};
|
||||
|
||||
class IPagerSnapshot {
|
||||
public:
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
|
||||
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
|
||||
virtual Version getVersion() const = 0;
|
||||
|
||||
virtual Key getMetaKey() const = 0;
|
||||
|
@ -79,8 +141,8 @@ public:
|
|||
// This API is probably customized to the behavior of DWALPager and probably needs some changes to be more generic.
|
||||
class IPager2 : public IClosable {
|
||||
public:
|
||||
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
|
||||
virtual Reference<IPage> newPageBuffer() = 0;
|
||||
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
|
||||
virtual Reference<ArenaPage> newPageBuffer() = 0;
|
||||
|
||||
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
|
||||
// For a given pager instance, separate calls to this function must return the same value.
|
||||
|
@ -94,13 +156,13 @@ public:
|
|||
// Replace the contents of a page with new data across *all* versions.
|
||||
// Existing holders of a page reference for pageID, read from any version,
|
||||
// may see the effects of this write.
|
||||
virtual void updatePage(LogicalPageID pageID, Reference<IPage> data) = 0;
|
||||
virtual void updatePage(LogicalPageID pageID, Reference<ArenaPage> data) = 0;
|
||||
|
||||
// Try to atomically update the contents of a page as of version v in the next commit.
|
||||
// If the pager is unable to do this at this time, it may choose to write the data to a new page ID
|
||||
// instead and return the new page ID to the caller. Otherwise the original pageID argument will be returned.
|
||||
// If a new page ID is returned, the old page ID will be freed as of version v
|
||||
virtual Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<IPage> data, Version v) = 0;
|
||||
virtual Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<ArenaPage> data, Version v) = 0;
|
||||
|
||||
// Free pageID to be used again after the commit that moves oldestVersion past v
|
||||
virtual void freePage(LogicalPageID pageID, Version v) = 0;
|
||||
|
@ -116,7 +178,7 @@ public:
|
|||
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
|
||||
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
|
||||
// considered likely to be needed soon.
|
||||
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
|
||||
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
|
||||
|
||||
// Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion()
|
||||
// Note that snapshots at any version may still see the results of updatePage() calls.
|
||||
|
|
|
@ -27,7 +27,6 @@
|
|||
#include "flow/UnitTest.h"
|
||||
#include "fdbserver/IPager.h"
|
||||
#include "fdbrpc/IAsyncFile.h"
|
||||
#include "flow/crc32c.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include <map>
|
||||
#include <vector>
|
||||
|
@ -244,7 +243,7 @@ public:
|
|||
|
||||
// The current page and pageID being read or written to
|
||||
LogicalPageID pageID;
|
||||
Reference<IPage> page;
|
||||
Reference<ArenaPage> page;
|
||||
|
||||
// The first page ID to be written to the pager, if this cursor has written anything
|
||||
LogicalPageID firstPageIDWritten;
|
||||
|
@ -257,7 +256,7 @@ public:
|
|||
|
||||
// Page future and corresponding page ID for the expected next page to be used. It may not
|
||||
// match the current page's next page link because queues can be prepended with new front pages.
|
||||
Future<Reference<IPage>> nextPageReader;
|
||||
Future<Reference<ArenaPage>> nextPageReader;
|
||||
LogicalPageID nextPageID;
|
||||
|
||||
// Future that represents all outstanding write operations previously issued
|
||||
|
@ -856,50 +855,6 @@ int nextPowerOf2(uint32_t x) {
|
|||
return 1 << (32 - clz(x - 1));
|
||||
}
|
||||
|
||||
class FastAllocatedPage : public IPage, public FastAllocated<FastAllocatedPage>, ReferenceCounted<FastAllocatedPage> {
|
||||
public:
|
||||
// Create a fast-allocated page with size total bytes INCLUDING checksum
|
||||
FastAllocatedPage(int size, int bufferSize) : logicalSize(size), bufferSize(bufferSize) {
|
||||
buffer = (uint8_t*)allocateFast(bufferSize);
|
||||
// Mark any unused page portion defined
|
||||
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
|
||||
};
|
||||
|
||||
~FastAllocatedPage() override { freeFast(bufferSize, buffer); }
|
||||
|
||||
Reference<IPage> clone() const override {
|
||||
FastAllocatedPage* p = new FastAllocatedPage(logicalSize, bufferSize);
|
||||
memcpy(p->buffer, buffer, logicalSize);
|
||||
return Reference<IPage>(p);
|
||||
}
|
||||
|
||||
// Usable size, without checksum
|
||||
int size() const override { return logicalSize - sizeof(Checksum); }
|
||||
|
||||
uint8_t const* begin() const override { return buffer; }
|
||||
|
||||
uint8_t* mutate() override { return buffer; }
|
||||
|
||||
void addref() const override { ReferenceCounted<FastAllocatedPage>::addref(); }
|
||||
|
||||
void delref() const override { ReferenceCounted<FastAllocatedPage>::delref(); }
|
||||
|
||||
typedef uint32_t Checksum;
|
||||
|
||||
Checksum& getChecksum() { return *(Checksum*)(buffer + size()); }
|
||||
|
||||
Checksum calculateChecksum(LogicalPageID pageID) { return crc32c_append(pageID, buffer, size()); }
|
||||
|
||||
void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); }
|
||||
|
||||
bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); }
|
||||
|
||||
private:
|
||||
int logicalSize;
|
||||
int bufferSize;
|
||||
uint8_t* buffer;
|
||||
};
|
||||
|
||||
struct RedwoodMetrics {
|
||||
static constexpr int btreeLevels = 5;
|
||||
|
||||
|
@ -1287,7 +1242,6 @@ class DWALPagerSnapshot;
|
|||
// back to their original location once the original version is no longer needed.
|
||||
class DWALPager : public IPager2 {
|
||||
public:
|
||||
typedef FastAllocatedPage Page;
|
||||
typedef FIFOQueue<LogicalPageID> LogicalPageQueueT;
|
||||
typedef std::map<Version, LogicalPageID> VersionToPageMapT;
|
||||
typedef std::unordered_map<LogicalPageID, VersionToPageMapT> PageToVersionedMapT;
|
||||
|
@ -1419,12 +1373,12 @@ public:
|
|||
wait(store(self->headerPage, self->readHeaderPage(self, 0)));
|
||||
|
||||
// If the checksum fails for the header page, try to recover committed header backup from page 1
|
||||
if (!self->headerPage.castTo<Page>()->verifyChecksum(0)) {
|
||||
if (!self->headerPage->verifyChecksum(0)) {
|
||||
TraceEvent(SevWarn, "DWALPagerRecoveringHeader").detail("Filename", self->filename);
|
||||
|
||||
wait(store(self->headerPage, self->readHeaderPage(self, 1)));
|
||||
|
||||
if (!self->headerPage.castTo<Page>()->verifyChecksum(1)) {
|
||||
if (!self->headerPage->verifyChecksum(1)) {
|
||||
if (g_network->isSimulated()) {
|
||||
// TODO: Detect if process is being restarted and only throw injected if so?
|
||||
throw io_error().asInjectedFault();
|
||||
|
@ -1542,13 +1496,15 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
Reference<IPage> newPageBuffer() override {
|
||||
return Reference<IPage>(new FastAllocatedPage(logicalPageSize, physicalPageSize));
|
||||
Reference<ArenaPage> newPageBuffer() override {
|
||||
return Reference<ArenaPage>(new ArenaPage(logicalPageSize, physicalPageSize));
|
||||
}
|
||||
|
||||
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
|
||||
// For a given pager instance, separate calls to this function must return the same value.
|
||||
int getUsablePageSize() const override { return logicalPageSize - sizeof(FastAllocatedPage::Checksum); }
|
||||
// TODO: This is abstraction breaking. This should probably be stored as a member, calculated once on construction
|
||||
// by creating an ArenaPage and getting its usable size.
|
||||
int getUsablePageSize() const override { return logicalPageSize - sizeof(ArenaPage::Checksum); }
|
||||
|
||||
// Get a new, previously available page ID. The page will be considered in-use after the next commit
|
||||
// regardless of whether or not it was written to, until it is returned to the pager via freePage()
|
||||
|
@ -1590,7 +1546,7 @@ public:
|
|||
|
||||
Future<LogicalPageID> newPageID() override { return newPageID_impl(this); }
|
||||
|
||||
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<IPage> page, bool header = false) {
|
||||
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<ArenaPage> page, bool header = false) {
|
||||
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n",
|
||||
filename.c_str(),
|
||||
(header ? "writePhysicalHeader" : "writePhysical"),
|
||||
|
@ -1599,7 +1555,7 @@ public:
|
|||
|
||||
++g_redwoodMetrics.pagerDiskWrite;
|
||||
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
|
||||
((Page*)page.getPtr())->updateChecksum(pageID);
|
||||
page->updateChecksum(pageID);
|
||||
|
||||
if (memoryOnly) {
|
||||
return Void();
|
||||
|
@ -1620,11 +1576,11 @@ public:
|
|||
return f;
|
||||
}
|
||||
|
||||
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<IPage> page) {
|
||||
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<ArenaPage> page) {
|
||||
return writePhysicalPage(pageID, page, true);
|
||||
}
|
||||
|
||||
void updatePage(LogicalPageID pageID, Reference<IPage> data) override {
|
||||
void updatePage(LogicalPageID pageID, Reference<ArenaPage> data) override {
|
||||
// Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now
|
||||
// or as a cache miss because there is no benefit to the page already being in cache
|
||||
PageCacheEntry& cacheEntry = pageCache.get(pageID, true, true);
|
||||
|
@ -1666,7 +1622,7 @@ public:
|
|||
cacheEntry.readFuture = data;
|
||||
}
|
||||
|
||||
Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<IPage> data, Version v) override {
|
||||
Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<ArenaPage> data, Version v) override {
|
||||
debug_printf("DWALPager(%s) op=writeAtomic %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v);
|
||||
Future<LogicalPageID> f = map(newPageID(), [=](LogicalPageID newPageID) {
|
||||
updatePage(newPageID, data);
|
||||
|
@ -1761,9 +1717,9 @@ public:
|
|||
// Read a physical page from the page file. Note that header pages use a page size of smallestPhysicalBlock.
|
||||
// If the user-chosen physical page size is larger, then there will be a gap of unused space after the header pages
|
||||
// and before the user-chosen sized pages.
|
||||
ACTOR static Future<Reference<IPage>> readPhysicalPage(DWALPager* self,
|
||||
PhysicalPageID pageID,
|
||||
bool header = false) {
|
||||
ACTOR static Future<Reference<ArenaPage>> readPhysicalPage(DWALPager* self,
|
||||
PhysicalPageID pageID,
|
||||
bool header = false) {
|
||||
ASSERT(!self->memoryOnly);
|
||||
++g_redwoodMetrics.pagerDiskRead;
|
||||
|
||||
|
@ -1771,8 +1727,8 @@ public:
|
|||
wait(delay(0, TaskPriority::DiskRead));
|
||||
}
|
||||
|
||||
state Reference<IPage> page =
|
||||
header ? Reference<IPage>(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock))
|
||||
state Reference<ArenaPage> page =
|
||||
header ? Reference<ArenaPage>(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock))
|
||||
: self->newPageBuffer();
|
||||
debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n",
|
||||
self->filename.c_str(),
|
||||
|
@ -1790,8 +1746,7 @@ public:
|
|||
|
||||
// Header reads are checked explicitly during recovery
|
||||
if (!header) {
|
||||
Page* p = (Page*)page.getPtr();
|
||||
if (!p->verifyChecksum(pageID)) {
|
||||
if (!page->verifyChecksum(pageID)) {
|
||||
debug_printf(
|
||||
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
|
||||
Error e = checksum_failed();
|
||||
|
@ -1800,8 +1755,8 @@ public:
|
|||
.detail("PageID", pageID)
|
||||
.detail("PageSize", self->physicalPageSize)
|
||||
.detail("Offset", pageID * self->physicalPageSize)
|
||||
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
|
||||
.detail("ChecksumInPage", p->getChecksum())
|
||||
.detail("CalculatedChecksum", page->calculateChecksum(pageID))
|
||||
.detail("ChecksumInPage", page->getChecksum())
|
||||
.error(e);
|
||||
ASSERT(false);
|
||||
throw e;
|
||||
|
@ -1810,13 +1765,13 @@ public:
|
|||
return page;
|
||||
}
|
||||
|
||||
static Future<Reference<IPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
|
||||
static Future<Reference<ArenaPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
|
||||
return readPhysicalPage(self, pageID, true);
|
||||
}
|
||||
|
||||
// Reads the most recent version of pageID, either previously committed or written using updatePage() in the current
|
||||
// commit
|
||||
Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable, bool noHit = false) override {
|
||||
Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable, bool noHit = false) override {
|
||||
// Use cached page if present, without triggering a cache hit.
|
||||
// Otherwise, read the page and return it but don't add it to the cache
|
||||
if (!cacheable) {
|
||||
|
@ -1849,7 +1804,7 @@ public:
|
|||
return cacheEntry.readFuture;
|
||||
}
|
||||
|
||||
Future<Reference<IPage>> readPageAtVersion(LogicalPageID pageID, Version v, bool cacheable, bool noHit) {
|
||||
Future<Reference<ArenaPage>> readPageAtVersion(LogicalPageID pageID, Version v, bool cacheable, bool noHit) {
|
||||
auto i = remappedPages.find(pageID);
|
||||
|
||||
if (i != remappedPages.end()) {
|
||||
|
@ -1978,7 +1933,7 @@ public:
|
|||
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
|
||||
// Read the data from the page that the original was mapped to
|
||||
Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
|
||||
Reference<ArenaPage> data = wait(self->readPage(p.newPageID, false, true));
|
||||
|
||||
// Write the data to the original page so it can be read using its original pageID
|
||||
self->updatePage(p.originalPageID, data);
|
||||
|
@ -2324,7 +2279,7 @@ private:
|
|||
#pragma pack(pop)
|
||||
|
||||
struct PageCacheEntry {
|
||||
Future<Reference<IPage>> readFuture;
|
||||
Future<Reference<ArenaPage>> readFuture;
|
||||
Future<Void> writeFuture;
|
||||
|
||||
bool initialized() const { return readFuture.isValid(); }
|
||||
|
@ -2351,12 +2306,12 @@ private:
|
|||
int64_t pageCacheBytes;
|
||||
|
||||
// The header will be written to / read from disk as a smallestPhysicalBlock sized chunk.
|
||||
Reference<IPage> headerPage;
|
||||
Reference<ArenaPage> headerPage;
|
||||
Header* pHeader;
|
||||
|
||||
int desiredPageSize;
|
||||
|
||||
Reference<IPage> lastCommittedHeaderPage;
|
||||
Reference<ArenaPage> lastCommittedHeaderPage;
|
||||
Header* pLastCommittedHeader;
|
||||
|
||||
std::string filename;
|
||||
|
@ -2410,12 +2365,12 @@ public:
|
|||
: pager(pager), metaKey(meta), version(version), expired(expiredFuture) {}
|
||||
~DWALPagerSnapshot() override {}
|
||||
|
||||
Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool noHit) override {
|
||||
Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool noHit) override {
|
||||
if (expired.isError()) {
|
||||
throw expired.getError();
|
||||
}
|
||||
return map(pager->readPageAtVersion(pageID, version, cacheable, noHit),
|
||||
[=](Reference<IPage> p) { return Reference<const IPage>(p); });
|
||||
[=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(p); });
|
||||
}
|
||||
|
||||
Key getMetaKey() const override { return metaKey; }
|
||||
|
@ -3191,21 +3146,21 @@ struct BTreePage {
|
|||
}
|
||||
};
|
||||
|
||||
static void makeEmptyRoot(Reference<IPage> page) {
|
||||
static void makeEmptyRoot(Reference<ArenaPage> page) {
|
||||
BTreePage* btpage = (BTreePage*)page->begin();
|
||||
btpage->height = 1;
|
||||
btpage->kvBytes = 0;
|
||||
btpage->tree().build(page->size(), nullptr, nullptr, nullptr, nullptr);
|
||||
}
|
||||
|
||||
BTreePage::BinaryTree::Cursor getCursor(const Reference<const IPage>& page) {
|
||||
BTreePage::BinaryTree::Cursor getCursor(const Reference<const ArenaPage>& page) {
|
||||
return ((BTreePage::BinaryTree::Mirror*)page->userData)->getCursor();
|
||||
}
|
||||
|
||||
struct BoundaryRefAndPage {
|
||||
Standalone<RedwoodRecordRef> lowerBound;
|
||||
Reference<IPage> firstPage;
|
||||
std::vector<Reference<IPage>> extPages;
|
||||
Reference<ArenaPage> firstPage;
|
||||
std::vector<Reference<ArenaPage>> extPages;
|
||||
|
||||
std::string toString() const {
|
||||
return format("[%s, %d pages]", lowerBound.toString().c_str(), extPages.size() + (firstPage ? 1 : 0));
|
||||
|
@ -3417,7 +3372,7 @@ public:
|
|||
|
||||
loop {
|
||||
state int toPop = SERVER_KNOBS->REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES;
|
||||
state std::vector<std::pair<LazyClearQueueEntry, Future<Reference<const IPage>>>> entries;
|
||||
state std::vector<std::pair<LazyClearQueueEntry, Future<Reference<const ArenaPage>>>> entries;
|
||||
entries.reserve(toPop);
|
||||
|
||||
// Take up to batchSize pages from front of queue
|
||||
|
@ -3436,7 +3391,7 @@ public:
|
|||
|
||||
state int i;
|
||||
for (i = 0; i < entries.size(); ++i) {
|
||||
Reference<const IPage> p = wait(entries[i].second);
|
||||
Reference<const ArenaPage> p = wait(entries[i].second);
|
||||
const LazyClearQueueEntry& entry = entries[i].first;
|
||||
const BTreePage& btPage = *(BTreePage*)p->begin();
|
||||
auto& metrics = g_redwoodMetrics.level(btPage.height);
|
||||
|
@ -3519,7 +3474,7 @@ public:
|
|||
self->m_header.root.set(newRoot, sizeof(headerSpace) - sizeof(m_header));
|
||||
self->m_header.height = 1;
|
||||
++latest;
|
||||
Reference<IPage> page = self->m_pager->newPageBuffer();
|
||||
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
|
||||
makeEmptyRoot(page);
|
||||
self->m_pager->updatePage(id, page);
|
||||
self->m_pager->setCommitVersion(latest);
|
||||
|
@ -3867,7 +3822,7 @@ private:
|
|||
Future<int> m_lazyClearActor;
|
||||
bool m_lazyClearStop;
|
||||
|
||||
// Writes entries to 1 or more pages and return a vector of boundary keys with their IPage(s)
|
||||
// Writes entries to 1 or more pages and returns a vector of boundary keys with their ArenaPage(s)
|
||||
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>> writePages(VersionedBTree* self,
|
||||
const RedwoodRecordRef* lowerBound,
|
||||
const RedwoodRecordRef* upperBound,
|
||||
|
@ -3987,12 +3942,12 @@ private:
|
|||
pageUpperBound.truncate(commonPrefix + 1);
|
||||
}
|
||||
|
||||
state std::vector<Reference<IPage>> pages;
|
||||
state std::vector<Reference<ArenaPage>> pages;
|
||||
BTreePage* btPage;
|
||||
|
||||
int capacity = blockSize * blockCount;
|
||||
if (blockCount == 1) {
|
||||
Reference<IPage> page = self->m_pager->newPageBuffer();
|
||||
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
|
||||
btPage = (BTreePage*)page->mutate();
|
||||
pages.push_back(std::move(page));
|
||||
} else {
|
||||
|
@ -4049,7 +4004,7 @@ private:
|
|||
VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (blockCount * blockSize) - written);
|
||||
const uint8_t* rptr = (const uint8_t*)btPage;
|
||||
for (int b = 0; b < blockCount; ++b) {
|
||||
Reference<IPage> page = self->m_pager->newPageBuffer();
|
||||
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
|
||||
memcpy(page->mutate(), rptr, blockSize);
|
||||
rptr += blockSize;
|
||||
pages.push_back(std::move(page));
|
||||
|
@ -4156,46 +4111,11 @@ private:
|
|||
return records;
|
||||
}
|
||||
|
||||
class SuperPage : public IPage, ReferenceCounted<SuperPage>, public FastAllocated<SuperPage> {
|
||||
public:
|
||||
SuperPage(std::vector<Reference<const IPage>> pages) {
|
||||
int blockSize = pages.front()->size();
|
||||
m_size = blockSize * pages.size();
|
||||
m_data = new uint8_t[m_size];
|
||||
uint8_t* wptr = m_data;
|
||||
for (auto& p : pages) {
|
||||
ASSERT(p->size() == blockSize);
|
||||
memcpy(wptr, p->begin(), blockSize);
|
||||
wptr += blockSize;
|
||||
}
|
||||
}
|
||||
|
||||
~SuperPage() override { delete[] m_data; }
|
||||
|
||||
Reference<IPage> clone() const override {
|
||||
return Reference<IPage>(new SuperPage({ Reference<const IPage>::addRef(this) }));
|
||||
}
|
||||
|
||||
void addref() const override { ReferenceCounted<SuperPage>::addref(); }
|
||||
|
||||
void delref() const override { ReferenceCounted<SuperPage>::delref(); }
|
||||
|
||||
int size() const override { return m_size; }
|
||||
|
||||
uint8_t const* begin() const override { return m_data; }
|
||||
|
||||
uint8_t* mutate() override { return m_data; }
|
||||
|
||||
private:
|
||||
uint8_t* m_data;
|
||||
int m_size;
|
||||
};
|
||||
|
||||
ACTOR static Future<Reference<const IPage>> readPage(Reference<IPagerSnapshot> snapshot,
|
||||
BTreePageIDRef id,
|
||||
const RedwoodRecordRef* lowerBound,
|
||||
const RedwoodRecordRef* upperBound,
|
||||
bool forLazyClear = false) {
|
||||
ACTOR static Future<Reference<const ArenaPage>> readPage(Reference<IPagerSnapshot> snapshot,
|
||||
BTreePageIDRef id,
|
||||
const RedwoodRecordRef* lowerBound,
|
||||
const RedwoodRecordRef* upperBound,
|
||||
bool forLazyClear = false) {
|
||||
if (!forLazyClear) {
|
||||
debug_printf("readPage() op=read %s @%" PRId64 " lower=%s upper=%s\n",
|
||||
toString(id).c_str(),
|
||||
|
@ -4209,20 +4129,20 @@ private:
|
|||
|
||||
wait(yield());
|
||||
|
||||
state Reference<const IPage> page;
|
||||
state Reference<const ArenaPage> page;
|
||||
|
||||
if (id.size() == 1) {
|
||||
Reference<const IPage> p = wait(snapshot->getPhysicalPage(id.front(), !forLazyClear, false));
|
||||
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(id.front(), !forLazyClear, false));
|
||||
page = p;
|
||||
} else {
|
||||
ASSERT(!id.empty());
|
||||
std::vector<Future<Reference<const IPage>>> reads;
|
||||
std::vector<Future<Reference<const ArenaPage>>> reads;
|
||||
for (auto& pageID : id) {
|
||||
reads.push_back(snapshot->getPhysicalPage(pageID, !forLazyClear, false));
|
||||
}
|
||||
std::vector<Reference<const IPage>> pages = wait(getAll(reads));
|
||||
std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
|
||||
// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
|
||||
page = Reference<const IPage>(new SuperPage(pages));
|
||||
page = ArenaPage::concatPages(pages);
|
||||
}
|
||||
|
||||
debug_printf("readPage() op=readComplete %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion());
|
||||
|
@ -4270,7 +4190,7 @@ private:
|
|||
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self,
|
||||
BTreePageIDRef oldID,
|
||||
Arena* arena,
|
||||
Reference<IPage> page,
|
||||
Reference<ArenaPage> page,
|
||||
Version writeVersion) {
|
||||
state BTreePageIDRef newID;
|
||||
newID.resize(*arena, oldID.size());
|
||||
|
@ -4279,11 +4199,11 @@ private:
|
|||
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(oldID.front(), page, writeVersion));
|
||||
newID.front() = id;
|
||||
} else {
|
||||
state std::vector<Reference<IPage>> pages;
|
||||
state std::vector<Reference<ArenaPage>> pages;
|
||||
const uint8_t* rptr = page->begin();
|
||||
int bytesLeft = page->size();
|
||||
while (bytesLeft > 0) {
|
||||
Reference<IPage> p = self->m_pager->newPageBuffer();
|
||||
Reference<ArenaPage> p = self->m_pager->newPageBuffer();
|
||||
int blockSize = p->size();
|
||||
memcpy(p->mutate(), rptr, blockSize);
|
||||
rptr += blockSize;
|
||||
|
@ -4304,8 +4224,8 @@ private:
|
|||
}
|
||||
|
||||
// Copy page and initialize a Mirror for reading it.
|
||||
Reference<IPage> cloneForUpdate(Reference<const IPage> page) {
|
||||
Reference<IPage> newPage = page->clone();
|
||||
Reference<ArenaPage> cloneForUpdate(Reference<const ArenaPage> page) {
|
||||
Reference<ArenaPage> newPage = page->cloneContents();
|
||||
|
||||
auto oldMirror = (const BTreePage::BinaryTree::Mirror*)page->userData;
|
||||
auto newBTPage = (BTreePage*)newPage->mutate();
|
||||
|
@ -4616,7 +4536,7 @@ private:
|
|||
state Reference<FlowLock> commitReadLock = self->m_commitReadLock;
|
||||
wait(commitReadLock->take());
|
||||
state FlowLock::Releaser readLock(*commitReadLock);
|
||||
state Reference<const IPage> page =
|
||||
state Reference<const ArenaPage> page =
|
||||
wait(readPage(snapshot, rootID, update->decodeLowerBound, update->decodeUpperBound));
|
||||
readLock.release();
|
||||
|
||||
|
@ -4874,7 +4794,7 @@ private:
|
|||
} else {
|
||||
// Otherwise update it.
|
||||
BTreePageIDRef newID = wait(self->updateBTreePage(
|
||||
self, rootID, &update->newLinks.arena(), page.castTo<IPage>(), writeVersion));
|
||||
self, rootID, &update->newLinks.arena(), page.castTo<ArenaPage>(), writeVersion));
|
||||
|
||||
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
|
||||
debug_printf(
|
||||
|
@ -5180,7 +5100,7 @@ private:
|
|||
}
|
||||
|
||||
BTreePageIDRef newID = wait(self->updateBTreePage(
|
||||
self, rootID, &update->newLinks.arena(), page.castTo<IPage>(), writeVersion));
|
||||
self, rootID, &update->newLinks.arena(), page.castTo<ArenaPage>(), writeVersion));
|
||||
debug_printf(
|
||||
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n",
|
||||
context.c_str(),
|
||||
|
@ -5306,7 +5226,7 @@ private:
|
|||
if (all.newLinks.empty()) {
|
||||
debug_printf("Writing new empty root.\n");
|
||||
LogicalPageID newRootID = wait(self->m_pager->newPageID());
|
||||
Reference<IPage> page = self->m_pager->newPageBuffer();
|
||||
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
|
||||
makeEmptyRoot(page);
|
||||
self->m_header.height = 1;
|
||||
self->m_pager->updatePage(newRootID, page);
|
||||
|
@ -5367,12 +5287,12 @@ public:
|
|||
struct PageCursor : ReferenceCounted<PageCursor>, FastAllocated<PageCursor> {
|
||||
Reference<PageCursor> parent;
|
||||
BTreePageIDRef pageID; // Only needed for debugging purposes
|
||||
Reference<const IPage> page;
|
||||
Reference<const ArenaPage> page;
|
||||
BTreePage::BinaryTree::Cursor cursor;
|
||||
|
||||
// id will normally reference memory owned by the parent, which is okay because a reference to the parent
|
||||
// will be held in the cursor
|
||||
PageCursor(BTreePageIDRef id, Reference<const IPage> page, Reference<PageCursor> parent = {})
|
||||
PageCursor(BTreePageIDRef id, Reference<const ArenaPage> page, Reference<PageCursor> parent = {})
|
||||
: pageID(id), page(page), parent(parent), cursor(getCursor(page)) {}
|
||||
|
||||
PageCursor(const PageCursor& toCopy)
|
||||
|
@ -5391,7 +5311,7 @@ public:
|
|||
next.moveNext();
|
||||
const RedwoodRecordRef& rec = cursor.get();
|
||||
BTreePageIDRef id = rec.getChildPage();
|
||||
Future<Reference<const IPage>> child = readPage(pager, id, &rec, &next.getOrUpperBound());
|
||||
Future<Reference<const ArenaPage>> child = readPage(pager, id, &rec, &next.getOrUpperBound());
|
||||
|
||||
// Read ahead siblings at level 2
|
||||
// TODO: Application of readAheadBytes is not taking into account the size of the current page or any
|
||||
|
@ -5409,7 +5329,7 @@ public:
|
|||
} while (readAheadBytes > 0 && next.moveNext());
|
||||
}
|
||||
|
||||
return map(child, [=](Reference<const IPage> page) {
|
||||
return map(child, [=](Reference<const ArenaPage> page) {
|
||||
return makeReference<PageCursor>(id, page, Reference<PageCursor>::addRef(this));
|
||||
});
|
||||
}
|
||||
|
@ -5485,8 +5405,8 @@ public:
|
|||
}
|
||||
|
||||
// Otherwise read the root page
|
||||
Future<Reference<const IPage>> root = readPage(pager, rootPageID, &dbBegin, &dbEnd);
|
||||
return map(root, [=](Reference<const IPage> p) {
|
||||
Future<Reference<const ArenaPage>> root = readPage(pager, rootPageID, &dbBegin, &dbEnd);
|
||||
return map(root, [=](Reference<const ArenaPage> p) {
|
||||
pageCursor = makeReference<PageCursor>(rootPageID, p);
|
||||
return Void();
|
||||
});
|
||||
|
@ -5628,7 +5548,7 @@ public:
|
|||
class BTreeCursor {
|
||||
Arena arena;
|
||||
Reference<IPagerSnapshot> pager;
|
||||
std::unordered_map<LogicalPageID, Reference<const IPage>> pages;
|
||||
std::unordered_map<LogicalPageID, Reference<const ArenaPage>> pages;
|
||||
VersionedBTree* btree;
|
||||
bool valid;
|
||||
|
||||
|
@ -5675,7 +5595,7 @@ public:
|
|||
Future<Void> pushPage(BTreePageIDRef id,
|
||||
const RedwoodRecordRef& lowerBound,
|
||||
const RedwoodRecordRef& upperBound) {
|
||||
Reference<const IPage>& page = pages[id.front()];
|
||||
Reference<const ArenaPage>& page = pages[id.front()];
|
||||
if (page.isValid()) {
|
||||
// The pager won't see this access so count it as a cache hit
|
||||
++g_redwoodMetrics.pagerCacheHit;
|
||||
|
@ -5683,7 +5603,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
return map(readPage(pager, id, &lowerBound, &upperBound), [this, &page, id](Reference<const IPage> p) {
|
||||
return map(readPage(pager, id, &lowerBound, &upperBound), [this, &page, id](Reference<const ArenaPage> p) {
|
||||
page = p;
|
||||
path.push_back(arena, { (BTreePage*)p->begin(), getCursor(p) });
|
||||
return Void();
|
||||
|
@ -8190,12 +8110,12 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
|
|||
|
||||
wait(success(pager->init()));
|
||||
state LogicalPageID id = wait(pager->newPageID());
|
||||
Reference<IPage> p = pager->newPageBuffer();
|
||||
Reference<ArenaPage> p = pager->newPageBuffer();
|
||||
memset(p->mutate(), (char)id, p->size());
|
||||
pager->updatePage(id, p);
|
||||
pager->setMetaKey(LiteralStringRef("asdfasdf"));
|
||||
wait(pager->commit());
|
||||
Reference<IPage> p2 = wait(pager->readPage(id, true));
|
||||
Reference<ArenaPage> p2 = wait(pager->readPage(id, true));
|
||||
printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str());
|
||||
|
||||
// TODO: Verify reads, do more writes and reads to make this a real pager validator
|
||||
|
|
Loading…
Reference in New Issue