Merge pull request #4780 from sfc-gh-satherton/arena-page

Zero-copy reads in Redwood via Arena-based pages and ability to allocate 4k-aligned memory in Arenas
Commit 0752fb693e by Josh Slocum, 2021-05-18 16:31:34 -07:00, committed by GitHub
8 changed files with 380 additions and 270 deletions

View File

@ -630,7 +630,14 @@ void showArena(ArenaBlock* a, ArenaBlock* parent) {
int o = a->nextBlockOffset;
while (o) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)a->getData() + o);
showArena(r->next, a);
// If aligned4kBuffer is valid then print its pointer and size, else recurse
if (r->aligned4kBufferSize != 0) {
printf("AlignedBuffer %p (<-%p) %u bytes\n", r->aligned4kBuffer, a, r->aligned4kBufferSize);
} else {
showArena(r->next, a);
}
o = r->nextBlockOffset;
}
}
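The new branch relies on ArenaBlockRef becoming a tagged union (see the ArenaBlockRef hunk further down): aligned4kBufferSize doubles as the discriminant, so a nonzero value means the ref holds a raw 4k-aligned buffer rather than a child ArenaBlock. A minimal sketch of reading the tag, using only names from this commit:

// Sketch: deciding which union member of an ArenaBlockRef is live.
void visitRef(ArenaBlockRef* r) {
	if (r->aligned4kBufferSize != 0) {
		// The ref owns a raw 4k-aligned buffer of this size
		printf("aligned buffer %p, %u bytes\n", r->aligned4kBuffer, r->aligned4kBufferSize);
	} else {
		// Ordinary reference to another ArenaBlock
		printf("child block %p\n", (void*)r->next);
	}
}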

View File

@ -371,9 +371,9 @@ public:
const T* upperBound() const { return upper; }
DeltaTree* tree;
Arena arena;
private:
Arena arena;
DecodedNode* root;
const T* lower;
const T* upper;
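Hoisting the Mirror's Arena into the public section is part of the zero-copy plumbing: it lets readers take a dependsOn() reference on the decoded-node arena (the range-read path further down pins leafCursor.mirror->arena for exactly this reason).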

View File

@ -26,6 +26,7 @@
#include "flow/flow.h"
#include "fdbclient/FDBTypes.h"
#include "flow/crc32c.h"
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
@ -36,39 +37,101 @@ typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
class IPage {
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
public:
IPage() : userData(nullptr) {}
// The page's logical size includes an opaque checksum; use size() to get the usable size
ArenaPage(int logicalSize, int bufferSize) : logicalSize(logicalSize), bufferSize(bufferSize), userData(nullptr) {
if (bufferSize > 0) {
buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize);
virtual uint8_t const* begin() const = 0;
virtual uint8_t* mutate() = 0;
// Mark any unused page portion defined
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
} else {
buffer = nullptr;
}
};
// Must return the same size for all pages created by the same pager instance
virtual int size() const = 0;
StringRef asStringRef() const { return StringRef(begin(), size()); }
virtual ~IPage() {
~ArenaPage() {
if (userData != nullptr && userDataDestructor != nullptr) {
userDataDestructor(userData);
}
if (buffer != nullptr) {
VALGRIND_MAKE_MEM_UNDEFINED(buffer, bufferSize);
}
}
virtual Reference<IPage> clone() const = 0;
uint8_t const* begin() const { return (uint8_t*)buffer; }
virtual void addref() const = 0;
virtual void delref() const = 0;
uint8_t* mutate() { return (uint8_t*)buffer; }
typedef uint32_t Checksum;
// Usable size, without checksum
int size() const { return logicalSize - sizeof(Checksum); }
Standalone<StringRef> asStringRef() const { return Standalone<StringRef>(StringRef(begin(), size()), arena); }
// Get an ArenaPage which is a copy of this page, in its own Arena
Reference<ArenaPage> cloneContents() const {
ArenaPage* p = new ArenaPage(logicalSize, bufferSize);
memcpy(p->buffer, buffer, logicalSize);
return Reference<ArenaPage>(p);
}
// Get an ArenaPage which depends on this page's Arena and references some of its memory
Reference<ArenaPage> subPage(int offset, int len) const {
ArenaPage* p = new ArenaPage(len, 0);
p->buffer = buffer + offset;
p->arena.dependsOn(arena);
return Reference<ArenaPage>(p);
}
// Given a vector of pages with the same ->size(), create a new ArenaPage whose ->size() equals
// the sum of the input pages' sizes and which has all of their contents copied into it.
static Reference<ArenaPage> concatPages(const std::vector<Reference<const ArenaPage>>& pages) {
int usableSize = pages.front()->size();
int totalUsableSize = pages.size() * usableSize;
int totalBufferSize = pages.front()->bufferSize * pages.size();
ArenaPage* superpage = new ArenaPage(totalUsableSize + sizeof(Checksum), totalBufferSize);
uint8_t* wptr = superpage->mutate();
for (auto& p : pages) {
ASSERT(p->size() == usableSize);
memcpy(wptr, p->begin(), usableSize);
wptr += usableSize;
}
return Reference<ArenaPage>(superpage);
}
Checksum& getChecksum() { return *(Checksum*)(buffer + size()); }
Checksum calculateChecksum(LogicalPageID pageID) { return crc32c_append(pageID, buffer, size()); }
void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); }
bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); }
const Arena& getArena() const { return arena; }
private:
Arena arena;
int logicalSize;
int bufferSize;
uint8_t* buffer;
public:
mutable void* userData;
mutable void (*userDataDestructor)(void*);
};
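A usage sketch of the class above (sizes illustrative, not from the commit). cloneContents() deep-copies into a fresh Arena, while subPage() aliases the same buffer and pins the parent page's Arena via dependsOn():

// Sketch: logicalSize includes the 4-byte checksum, so size() == 4096 - sizeof(Checksum).
Reference<ArenaPage> page(new ArenaPage(4096, 4096));
memset(page->mutate(), 0, page->size());
page->updateChecksum(42); // checksum written at buffer + size()
ASSERT(page->verifyChecksum(42));

Reference<ArenaPage> copy = page->cloneContents(); // deep copy, independent Arena
Reference<ArenaPage> view = page->subPage(0, 256); // zero-copy view of the first 256
                                                   // bytes; keeps page's Arena alive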
class IPagerSnapshot {
public:
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID,
bool cacheable,
bool nohit,
bool* fromCache = nullptr) = 0;
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID,
bool cacheable,
bool nohit,
bool* fromCache = nullptr) = 0;
virtual bool tryEvictPage(LogicalPageID id) = 0;
virtual Version getVersion() const = 0;
@ -83,8 +146,8 @@ public:
// This API is probably too customized to the behavior of DWALPager and needs some changes to be more generic.
class IPager2 : public IClosable {
public:
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
virtual Reference<IPage> newPageBuffer() = 0;
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
virtual Reference<ArenaPage> newPageBuffer() = 0;
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
// For a given pager instance, separate calls to this function must return the same value.
@ -98,13 +161,13 @@ public:
// Replace the contents of a page with new data across *all* versions.
// Existing holders of a page reference for pageID, read from any version,
// may see the effects of this write.
virtual void updatePage(LogicalPageID pageID, Reference<IPage> data) = 0;
virtual void updatePage(LogicalPageID pageID, Reference<ArenaPage> data) = 0;
// Try to atomically update the contents of a page as of version v in the next commit.
// If the pager is unable to do this at this time, it may choose to write the data to a new page ID
// instead and return the new page ID to the caller. Otherwise the original pageID argument will be returned.
// If a new page ID is returned, the old page ID will be freed as of version v
virtual Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<IPage> data, Version v) = 0;
virtual Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<ArenaPage> data, Version v) = 0;
// Free pageID to be used again after the commit that moves oldestVersion past v
virtual void freePage(LogicalPageID pageID, Version v) = 0;
@ -120,10 +183,10 @@ public:
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
// considered likely to be needed soon.
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID,
bool cacheable = true,
bool noHit = false,
bool* fromCache = nullptr) = 0;
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID,
bool cacheable = true,
bool noHit = false,
bool* fromCache = nullptr) = 0;
// Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion()
// Note that snapshots at any version may still see the results of updatePage() calls.
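Taken together, the signature changes above give the following write path (a hedged sketch; pager, pageID, and v are assumed to be an IPager2*, a LogicalPageID, and a Version already in scope):

Reference<ArenaPage> p = pager->newPageBuffer(); // contents may be uninitialized
memset(p->mutate(), 0, p->size());               // caller fills the usable region
pager->updatePage(pageID, p);                    // replace contents at all versions

// Copy-on-write variant: may hand back a new page ID if an atomic
// in-place update is not possible as of version v.
Future<LogicalPageID> newID = pager->atomicUpdatePage(pageID, p, v);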

View File

@ -27,7 +27,6 @@
#include "flow/UnitTest.h"
#include "fdbserver/IPager.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/crc32c.h"
#include "flow/ActorCollection.h"
#include <map>
#include <vector>
@ -244,7 +243,7 @@ public:
// The current page and pageID being read or written to
LogicalPageID pageID;
Reference<IPage> page;
Reference<ArenaPage> page;
// The first page ID to be written to the pager, if this cursor has written anything
LogicalPageID firstPageIDWritten;
@ -257,7 +256,7 @@ public:
// Page future and corresponding page ID for the expected next page to be used. It may not
// match the current page's next page link because queues can be prepended with new front pages.
Future<Reference<IPage>> nextPageReader;
Future<Reference<ArenaPage>> nextPageReader;
LogicalPageID nextPageID;
// Future that represents all outstanding write operations previously issued
@ -265,6 +264,7 @@ public:
Future<Void> writeOperations;
FlowLock mutex;
Future<Void> killMutex;
Cursor() : mode(NONE) {}
@ -275,6 +275,14 @@ public:
int readOffset = 0,
LogicalPageID endPage = invalidLogicalPageID) {
queue = q;
// If the pager gets an error, which includes shutdown, kill the mutex so any waiters can no longer run.
// This avoids having every mutex wait also wait on pagerError.
killMutex = map(ready(queue->pagerError), [=](Void e) {
mutex.kill();
return Void();
});
mode = m;
firstPageIDWritten = invalidLogicalPageID;
offset = readOffset;
@ -856,50 +864,6 @@ int nextPowerOf2(uint32_t x) {
return 1 << (32 - clz(x - 1));
}
class FastAllocatedPage : public IPage, public FastAllocated<FastAllocatedPage>, ReferenceCounted<FastAllocatedPage> {
public:
// Create a fast-allocated page with size total bytes INCLUDING checksum
FastAllocatedPage(int size, int bufferSize) : logicalSize(size), bufferSize(bufferSize) {
buffer = (uint8_t*)allocateFast(bufferSize);
// Mark any unused page portion defined
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
};
~FastAllocatedPage() override { freeFast(bufferSize, buffer); }
Reference<IPage> clone() const override {
FastAllocatedPage* p = new FastAllocatedPage(logicalSize, bufferSize);
memcpy(p->buffer, buffer, logicalSize);
return Reference<IPage>(p);
}
// Usable size, without checksum
int size() const override { return logicalSize - sizeof(Checksum); }
uint8_t const* begin() const override { return buffer; }
uint8_t* mutate() override { return buffer; }
void addref() const override { ReferenceCounted<FastAllocatedPage>::addref(); }
void delref() const override { ReferenceCounted<FastAllocatedPage>::delref(); }
typedef uint32_t Checksum;
Checksum& getChecksum() { return *(Checksum*)(buffer + size()); }
Checksum calculateChecksum(LogicalPageID pageID) { return crc32c_append(pageID, buffer, size()); }
void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); }
bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); }
private:
int logicalSize;
int bufferSize;
uint8_t* buffer;
};
struct RedwoodMetrics {
static constexpr int btreeLevels = 5;
@ -1303,7 +1267,6 @@ class DWALPagerSnapshot;
// back to their original location once the original version is no longer needed.
class DWALPager : public IPager2 {
public:
typedef FastAllocatedPage Page;
typedef FIFOQueue<LogicalPageID> LogicalPageQueueT;
typedef std::map<Version, LogicalPageID> VersionToPageMapT;
typedef std::unordered_map<LogicalPageID, VersionToPageMapT> PageToVersionedMapT;
@ -1435,12 +1398,12 @@ public:
wait(store(self->headerPage, self->readHeaderPage(self, 0)));
// If the checksum fails for the header page, try to recover committed header backup from page 1
if (!self->headerPage.castTo<Page>()->verifyChecksum(0)) {
if (!self->headerPage->verifyChecksum(0)) {
TraceEvent(SevWarn, "DWALPagerRecoveringHeader").detail("Filename", self->filename);
wait(store(self->headerPage, self->readHeaderPage(self, 1)));
if (!self->headerPage.castTo<Page>()->verifyChecksum(1)) {
if (!self->headerPage->verifyChecksum(1)) {
if (g_network->isSimulated()) {
// TODO: Detect if process is being restarted and only throw injected if so?
throw io_error().asInjectedFault();
@ -1558,13 +1521,15 @@ public:
return Void();
}
Reference<IPage> newPageBuffer() override {
return Reference<IPage>(new FastAllocatedPage(logicalPageSize, physicalPageSize));
Reference<ArenaPage> newPageBuffer() override {
return Reference<ArenaPage>(new ArenaPage(logicalPageSize, physicalPageSize));
}
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
// For a given pager instance, separate calls to this function must return the same value.
int getUsablePageSize() const override { return logicalPageSize - sizeof(FastAllocatedPage::Checksum); }
// TODO: This is abstraction breaking. This should probably be stored as a member, calculated once on construction
// by creating an ArenaPage and getting its usable size.
int getUsablePageSize() const override { return logicalPageSize - sizeof(ArenaPage::Checksum); }
// Get a new, previously available page ID. The page will be considered in-use after the next commit
// regardless of whether or not it was written to, until it is returned to the pager via freePage()
@ -1606,7 +1571,7 @@ public:
Future<LogicalPageID> newPageID() override { return newPageID_impl(this); }
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<IPage> page, bool header = false) {
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<ArenaPage> page, bool header = false) {
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n",
filename.c_str(),
(header ? "writePhysicalHeader" : "writePhysical"),
@ -1615,7 +1580,7 @@ public:
++g_redwoodMetrics.pagerDiskWrite;
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
((Page*)page.getPtr())->updateChecksum(pageID);
page->updateChecksum(pageID);
if (memoryOnly) {
return Void();
@ -1636,11 +1601,11 @@ public:
return f;
}
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<IPage> page) {
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<ArenaPage> page) {
return writePhysicalPage(pageID, page, true);
}
void updatePage(LogicalPageID pageID, Reference<IPage> data) override {
void updatePage(LogicalPageID pageID, Reference<ArenaPage> data) override {
// Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now
// or as a cache miss because there is no benefit to the page already being in cache
PageCacheEntry& cacheEntry = pageCache.get(pageID, true, true);
@ -1682,7 +1647,7 @@ public:
cacheEntry.readFuture = data;
}
Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<IPage> data, Version v) override {
Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<ArenaPage> data, Version v) override {
debug_printf("DWALPager(%s) op=writeAtomic %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v);
Future<LogicalPageID> f = map(newPageID(), [=](LogicalPageID newPageID) {
updatePage(newPageID, data);
@ -1777,9 +1742,9 @@ public:
// Read a physical page from the page file. Note that header pages use a page size of smallestPhysicalBlock
// If the user chosen physical page size is larger, then there will be a gap of unused space after the header pages
// and before the user-chosen sized pages.
ACTOR static Future<Reference<IPage>> readPhysicalPage(DWALPager* self,
PhysicalPageID pageID,
bool header = false) {
ACTOR static Future<Reference<ArenaPage>> readPhysicalPage(DWALPager* self,
PhysicalPageID pageID,
bool header = false) {
ASSERT(!self->memoryOnly);
++g_redwoodMetrics.pagerDiskRead;
@ -1787,8 +1752,8 @@ public:
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page =
header ? Reference<IPage>(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock))
state Reference<ArenaPage> page =
header ? Reference<ArenaPage>(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock))
: self->newPageBuffer();
debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n",
self->filename.c_str(),
@ -1806,8 +1771,7 @@ public:
// Header reads are checked explicitly during recovery
if (!header) {
Page* p = (Page*)page.getPtr();
if (!p->verifyChecksum(pageID)) {
if (!page->verifyChecksum(pageID)) {
debug_printf(
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
Error e = checksum_failed();
@ -1816,8 +1780,8 @@ public:
.detail("PageID", pageID)
.detail("PageSize", self->physicalPageSize)
.detail("Offset", pageID * self->physicalPageSize)
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
.detail("ChecksumInPage", p->getChecksum())
.detail("CalculatedChecksum", page->calculateChecksum(pageID))
.detail("ChecksumInPage", page->getChecksum())
.error(e);
ASSERT(false);
throw e;
@ -1826,7 +1790,7 @@ public:
return page;
}
static Future<Reference<IPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
static Future<Reference<ArenaPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
return readPhysicalPage(self, pageID, true);
}
@ -1840,10 +1804,10 @@ public:
// If cacheable is false then if fromCache is valid it will be set to true if the page is from cache, otherwise
// false. If cacheable is true, fromCache is ignored as the result is automatically from cache by virtue of being
// cacheable.
Future<Reference<IPage>> readPage(LogicalPageID pageID,
bool cacheable,
bool noHit = false,
bool* fromCache = nullptr) override {
Future<Reference<ArenaPage>> readPage(LogicalPageID pageID,
bool cacheable,
bool noHit = false,
bool* fromCache = nullptr) override {
// Use cached page if present, without triggering a cache hit.
// Otherwise, read the page and return it but don't add it to the cache
if (!cacheable) {
@ -1905,11 +1869,11 @@ public:
return (PhysicalPageID)pageID;
}
Future<Reference<IPage>> readPageAtVersion(LogicalPageID logicalID,
Version v,
bool cacheable,
bool noHit,
bool* fromCache) {
Future<Reference<ArenaPage>> readPageAtVersion(LogicalPageID logicalID,
Version v,
bool cacheable,
bool noHit,
bool* fromCache) {
PhysicalPageID physicalID = getPhysicalPageID(logicalID, v);
return readPage(physicalID, cacheable, noHit, fromCache);
}
@ -2018,7 +1982,7 @@ public:
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
Reference<ArenaPage> data = wait(self->readPage(p.newPageID, false, true));
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
@ -2365,7 +2329,7 @@ private:
#pragma pack(pop)
struct PageCacheEntry {
Future<Reference<IPage>> readFuture;
Future<Reference<ArenaPage>> readFuture;
Future<Void> writeFuture;
bool initialized() const { return readFuture.isValid(); }
@ -2392,12 +2356,12 @@ private:
int64_t pageCacheBytes;
// The header will be written to / read from disk as a smallestPhysicalBlock sized chunk.
Reference<IPage> headerPage;
Reference<ArenaPage> headerPage;
Header* pHeader;
int desiredPageSize;
Reference<IPage> lastCommittedHeaderPage;
Reference<ArenaPage> lastCommittedHeaderPage;
Header* pLastCommittedHeader;
std::string filename;
@ -2451,15 +2415,15 @@ public:
: pager(pager), metaKey(meta), version(version), expired(expiredFuture) {}
~DWALPagerSnapshot() override {}
Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID,
bool cacheable,
bool noHit,
bool* fromCache) override {
Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID,
bool cacheable,
bool noHit,
bool* fromCache) override {
if (expired.isError()) {
throw expired.getError();
}
return map(pager->readPageAtVersion(pageID, version, cacheable, noHit, fromCache),
[=](Reference<IPage> p) { return Reference<const IPage>(std::move(p)); });
[=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(std::move(p)); });
}
bool tryEvictPage(LogicalPageID id) override { return pager->tryEvictPage(id, version); }
@ -3238,21 +3202,21 @@ struct BTreePage {
}
};
static void makeEmptyRoot(Reference<IPage> page) {
static void makeEmptyRoot(Reference<ArenaPage> page) {
BTreePage* btpage = (BTreePage*)page->begin();
btpage->height = 1;
btpage->kvBytes = 0;
btpage->tree().build(page->size(), nullptr, nullptr, nullptr, nullptr);
}
BTreePage::BinaryTree::Cursor getCursor(const Reference<const IPage>& page) {
BTreePage::BinaryTree::Cursor getCursor(const Reference<const ArenaPage>& page) {
return ((BTreePage::BinaryTree::Mirror*)page->userData)->getCursor();
}
struct BoundaryRefAndPage {
Standalone<RedwoodRecordRef> lowerBound;
Reference<IPage> firstPage;
std::vector<Reference<IPage>> extPages;
Reference<ArenaPage> firstPage;
std::vector<Reference<ArenaPage>> extPages;
std::string toString() const {
return format("[%s, %d pages]", lowerBound.toString().c_str(), extPages.size() + (firstPage ? 1 : 0));
@ -3464,7 +3428,7 @@ public:
loop {
state int toPop = SERVER_KNOBS->REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES;
state std::vector<std::pair<LazyClearQueueEntry, Future<Reference<const IPage>>>> entries;
state std::vector<std::pair<LazyClearQueueEntry, Future<Reference<const ArenaPage>>>> entries;
entries.reserve(toPop);
// Take up to batchSize pages from front of queue
@ -3483,7 +3447,7 @@ public:
state int i;
for (i = 0; i < entries.size(); ++i) {
Reference<const IPage> p = wait(entries[i].second);
Reference<const ArenaPage> p = wait(entries[i].second);
const LazyClearQueueEntry& entry = entries[i].first;
const BTreePage& btPage = *(BTreePage*)p->begin();
auto& metrics = g_redwoodMetrics.level(btPage.height);
@ -3566,7 +3530,7 @@ public:
self->m_header.root.set(newRoot, sizeof(headerSpace) - sizeof(m_header));
self->m_header.height = 1;
++latest;
Reference<IPage> page = self->m_pager->newPageBuffer();
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
makeEmptyRoot(page);
self->m_pager->updatePage(id, page);
self->m_pager->setCommitVersion(latest);
@ -4100,7 +4064,7 @@ private:
return pages;
}
// Writes entries to 1 or more pages and return a vector of boundary keys with their IPage(s)
// Writes entries to 1 or more pages and returns a vector of boundary keys with their ArenaPage(s)
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>> writePages(VersionedBTree* self,
const RedwoodRecordRef* lowerBound,
const RedwoodRecordRef* upperBound,
@ -4167,11 +4131,11 @@ private:
pageUpperBound.truncate(commonPrefix + 1);
}
state std::vector<Reference<IPage>> pages;
state std::vector<Reference<ArenaPage>> pages;
BTreePage* btPage;
if (p.blockCount == 1) {
Reference<IPage> page = self->m_pager->newPageBuffer();
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
btPage = (BTreePage*)page->mutate();
pages.push_back(std::move(page));
} else {
@ -4216,7 +4180,7 @@ private:
VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (p.blockCount * p.blockSize) - written);
const uint8_t* rptr = (const uint8_t*)btPage;
for (int b = 0; b < p.blockCount; ++b) {
Reference<IPage> page = self->m_pager->newPageBuffer();
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
memcpy(page->mutate(), rptr, p.blockSize);
rptr += p.blockSize;
pages.push_back(std::move(page));
@ -4298,44 +4262,9 @@ private:
return records;
}
class SuperPage : public IPage, ReferenceCounted<SuperPage>, public FastAllocated<SuperPage> {
public:
SuperPage(std::vector<Reference<const IPage>> pages) {
int blockSize = pages.front()->size();
m_size = blockSize * pages.size();
m_data = new uint8_t[m_size];
uint8_t* wptr = m_data;
for (auto& p : pages) {
ASSERT(p->size() == blockSize);
memcpy(wptr, p->begin(), blockSize);
wptr += blockSize;
}
}
~SuperPage() override { delete[] m_data; }
Reference<IPage> clone() const override {
return Reference<IPage>(new SuperPage({ Reference<const IPage>::addRef(this) }));
}
void addref() const override { ReferenceCounted<SuperPage>::addref(); }
void delref() const override { ReferenceCounted<SuperPage>::delref(); }
int size() const override { return m_size; }
uint8_t const* begin() const override { return m_data; }
uint8_t* mutate() override { return m_data; }
private:
uint8_t* m_data;
int m_size;
};
// Try to evict a BTree page from the pager cache.
// Returns true if, at the end of the call, the page is no longer in cache,
// so the caller can assume its IPage reference is the only one.
// so the caller can assume its ArenaPage reference is the only one.
bool tryEvictPage(IPagerSnapshot* pager, BTreePageIDRef id) {
// If it's an oversized page, currently it cannot be in the cache
if (id.size() > 0) {
@ -4344,15 +4273,13 @@ private:
return pager->tryEvictPage(id.front());
}
ACTOR static Future<Reference<const IPage>> readPage(Reference<IPagerSnapshot> snapshot,
BTreePageIDRef id,
const RedwoodRecordRef* lowerBound,
const RedwoodRecordRef* upperBound,
bool forLazyClear = false,
bool cacheable = true,
bool* fromCache = nullptr)
{
ACTOR static Future<Reference<const ArenaPage>> readPage(Reference<IPagerSnapshot> snapshot,
BTreePageIDRef id,
const RedwoodRecordRef* lowerBound,
const RedwoodRecordRef* upperBound,
bool forLazyClear = false,
bool cacheable = true,
bool* fromCache = nullptr) {
if (!forLazyClear) {
debug_printf("readPage() op=read %s @%" PRId64 " lower=%s upper=%s\n",
toString(id).c_str(),
@ -4366,20 +4293,20 @@ private:
wait(yield());
state Reference<const IPage> page;
state Reference<const ArenaPage> page;
if (id.size() == 1) {
Reference<const IPage> p = wait(snapshot->getPhysicalPage(id.front(), cacheable, false, fromCache));
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(id.front(), cacheable, false, fromCache));
page = std::move(p);
} else {
ASSERT(!id.empty());
std::vector<Future<Reference<const IPage>>> reads;
std::vector<Future<Reference<const ArenaPage>>> reads;
for (auto& pageID : id) {
reads.push_back(snapshot->getPhysicalPage(pageID, cacheable, false));
}
std::vector<Reference<const IPage>> pages = wait(getAll(reads));
std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
page = Reference<const IPage>(new SuperPage(pages));
page = ArenaPage::concatPages(pages);
// In the current implementation, SuperPages are never present in the cache
if (fromCache != nullptr) {
@ -4432,7 +4359,7 @@ private:
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self,
BTreePageIDRef oldID,
Arena* arena,
Reference<IPage> page,
Reference<ArenaPage> page,
Version writeVersion) {
state BTreePageIDRef newID;
newID.resize(*arena, oldID.size());
@ -4441,11 +4368,11 @@ private:
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(oldID.front(), page, writeVersion));
newID.front() = id;
} else {
state std::vector<Reference<IPage>> pages;
state std::vector<Reference<ArenaPage>> pages;
const uint8_t* rptr = page->begin();
int bytesLeft = page->size();
while (bytesLeft > 0) {
Reference<IPage> p = self->m_pager->newPageBuffer();
Reference<ArenaPage> p = self->m_pager->newPageBuffer();
int blockSize = p->size();
memcpy(p->mutate(), rptr, blockSize);
rptr += blockSize;
@ -4466,8 +4393,8 @@ private:
}
// Copy page and initialize a Mirror for reading it.
Reference<IPage> cloneForUpdate(Reference<const IPage> page) {
Reference<IPage> newPage = page->clone();
Reference<ArenaPage> cloneForUpdate(Reference<const ArenaPage> page) {
Reference<ArenaPage> newPage = page->cloneContents();
auto oldMirror = (const BTreePage::BinaryTree::Mirror*)page->userData;
auto newBTPage = (BTreePage*)newPage->mutate();
@ -4780,7 +4707,7 @@ private:
wait(commitReadLock->take());
state FlowLock::Releaser readLock(*commitReadLock);
state bool fromCache = false;
state Reference<const IPage> page = wait(
state Reference<const ArenaPage> page = wait(
readPage(snapshot, rootID, update->decodeLowerBound, update->decodeUpperBound, false, false, &fromCache));
readLock.release();
@ -5040,7 +4967,7 @@ private:
} else {
// Otherwise update it.
BTreePageIDRef newID = wait(self->updateBTreePage(
self, rootID, &update->newLinks.arena(), page.castTo<IPage>(), writeVersion));
self, rootID, &update->newLinks.arena(), page.castTo<ArenaPage>(), writeVersion));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf(
@ -5352,7 +5279,7 @@ private:
}
BTreePageIDRef newID = wait(self->updateBTreePage(
self, rootID, &update->newLinks.arena(), page.castTo<IPage>(), writeVersion));
self, rootID, &update->newLinks.arena(), page.castTo<ArenaPage>(), writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n",
context.c_str(),
@ -5478,7 +5405,7 @@ private:
if (all.newLinks.empty()) {
debug_printf("Writing new empty root.\n");
LogicalPageID newRootID = wait(self->m_pager->newPageID());
Reference<IPage> page = self->m_pager->newPageBuffer();
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
makeEmptyRoot(page);
self->m_header.height = 1;
self->m_pager->updatePage(newRootID, page);
@ -5539,12 +5466,12 @@ public:
struct PageCursor : ReferenceCounted<PageCursor>, FastAllocated<PageCursor> {
Reference<PageCursor> parent;
BTreePageIDRef pageID; // Only needed for debugging purposes
Reference<const IPage> page;
Reference<const ArenaPage> page;
BTreePage::BinaryTree::Cursor cursor;
// id will normally reference memory owned by the parent, which is okay because a reference to the parent
// will be held in the cursor
PageCursor(BTreePageIDRef id, Reference<const IPage> page, Reference<PageCursor> parent = {})
PageCursor(BTreePageIDRef id, Reference<const ArenaPage> page, Reference<PageCursor> parent = {})
: pageID(id), page(page), parent(parent), cursor(getCursor(page)) {}
PageCursor(const PageCursor& toCopy)
@ -5563,7 +5490,7 @@ public:
next.moveNext();
const RedwoodRecordRef& rec = cursor.get();
BTreePageIDRef id = rec.getChildPage();
Future<Reference<const IPage>> child = readPage(pager, id, &rec, &next.getOrUpperBound());
Future<Reference<const ArenaPage>> child = readPage(pager, id, &rec, &next.getOrUpperBound());
// Read ahead siblings at level 2
// TODO: Application of readAheadBytes is not taking into account the size of the current page or any
@ -5581,7 +5508,7 @@ public:
} while (readAheadBytes > 0 && next.moveNext());
}
return map(child, [=](Reference<const IPage> page) {
return map(child, [=](Reference<const ArenaPage> page) {
return makeReference<PageCursor>(id, page, Reference<PageCursor>::addRef(this));
});
}
@ -5657,8 +5584,8 @@ public:
}
// Otherwise read the root page
Future<Reference<const IPage>> root = readPage(pager, rootPageID, &dbBegin, &dbEnd);
return map(root, [=](Reference<const IPage> p) {
Future<Reference<const ArenaPage>> root = readPage(pager, rootPageID, &dbBegin, &dbEnd);
return map(root, [=](Reference<const ArenaPage> p) {
pageCursor = makeReference<PageCursor>(rootPageID, p);
return Void();
});
@ -5798,17 +5725,19 @@ public:
// Holds references to all pages touched.
// All record references returned from it are valid until the cursor is destroyed.
class BTreeCursor {
Arena arena;
Reference<IPagerSnapshot> pager;
std::unordered_map<LogicalPageID, Reference<const IPage>> pages;
VersionedBTree* btree;
bool valid;
public:
struct PathEntry {
BTreePage* btPage;
Reference<const ArenaPage> page;
BTreePage::BinaryTree::Cursor cursor;
const BTreePage* btPage() const { return (BTreePage*)page->begin(); };
};
VectorRef<PathEntry> path;
private:
VersionedBTree* btree;
Reference<IPagerSnapshot> pager;
bool valid;
std::vector<PathEntry> path;
public:
BTreeCursor() {}
@ -5821,7 +5750,7 @@ public:
r += format("[%d/%d: %s] ",
i + 1,
path.size(),
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage()->isLeaf()).c_str()
: "<invalid>");
}
if (!valid) {
@ -5835,29 +5764,17 @@ public:
bool inRoot() const { return path.size() == 1; }
// Pop and return the page cursor at the end of the path.
// This is meant to enable range scans to consume the contents of a leaf page more efficiently.
// Can only be used when inRoot() is true.
BTreePage::BinaryTree::Cursor popPath() {
BTreePage::BinaryTree::Cursor c = path.back().cursor;
path.pop_back();
return c;
}
// To enable more efficient range scans, the caller can read the lowest page
// of the cursor and pop it.
PathEntry& back() { return path.back(); }
void popPath() { path.pop_back(); }
Future<Void> pushPage(BTreePageIDRef id,
const RedwoodRecordRef& lowerBound,
const RedwoodRecordRef& upperBound) {
Reference<const IPage>& page = pages[id.front()];
if (page.isValid()) {
// The pager won't see this access so count it as a cache hit
++g_redwoodMetrics.pagerCacheHit;
path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
return Void();
}
return map(readPage(pager, id, &lowerBound, &upperBound), [this, &page, id](Reference<const IPage> p) {
page = p;
path.push_back(arena, { (BTreePage*)p->begin(), getCursor(p) });
return map(readPage(pager, id, &lowerBound, &upperBound), [this, id](Reference<const ArenaPage> p) {
path.push_back({ p, getCursor(p) });
return Void();
});
}
@ -5873,7 +5790,7 @@ public:
Future<Void> init(VersionedBTree* btree_in, Reference<IPagerSnapshot> pager_in, BTreePageIDRef root) {
btree = btree_in;
pager = pager_in;
path.reserve(arena, 6);
path.reserve(6);
valid = false;
return pushPage(root, dbBegin, dbEnd);
}
@ -5889,13 +5806,13 @@ public:
// to query.compare(cursor.get())
ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
self->path = self->path.slice(0, 1);
self->path.resize(1);
debug_printf(
"seek(%s, %d) start cursor = %s\n", query.toString().c_str(), prefetchBytes, self->toString().c_str());
loop {
auto& entry = self->path.back();
if (entry.btPage->isLeaf()) {
if (entry.btPage()->isLeaf()) {
int cmp = entry.cursor.seek(query);
self->valid = entry.cursor.valid() && !entry.cursor.node->isDeleted();
debug_printf("seek(%s, %d) loop exit cmp=%d cursor=%s\n",
@ -5919,7 +5836,7 @@ public:
// Prefetch siblings, at least prefetchBytes, at level 2 but without jumping to another level 2
// sibling
if (prefetchBytes != 0 && entry.btPage->height == 2) {
if (prefetchBytes != 0 && entry.btPage()->height == 2) {
auto c = entry.cursor;
bool fwd = prefetchBytes > 0;
prefetchBytes = abs(prefetchBytes);
@ -5988,7 +5905,7 @@ public:
}
// Skip over internal page entries that do not link to child pages. There should never be two in a row.
if (success && !entry.btPage->isLeaf() && !entry.cursor.get().value.present()) {
if (success && !entry.btPage()->isLeaf() && !entry.cursor.get().value.present()) {
success = forward ? entry.cursor.moveNext() : entry.cursor.movePrev();
ASSERT(!success || entry.cursor.get().value.present());
}
@ -6004,14 +5921,14 @@ public:
}
// Move to parent
self->path = self->path.slice(0, self->path.size() - 1);
self->path.pop_back();
}
// While not on a leaf page, move down to get to one.
while (1) {
debug_printf("move%s() second loop cursor=%s\n", forward ? "Next" : "Prev", self->toString().c_str());
auto& entry = self->path.back();
if (entry.btPage->isLeaf()) {
if (entry.btPage()->isLeaf()) {
break;
}
@ -6365,58 +6282,84 @@ public:
wait(cur.seekGTE(keys.begin, prefetchBytes));
while (cur.isValid()) {
// Read page contents without using waits
bool isRoot = cur.inRoot();
BTreePage::BinaryTree::Cursor leafCursor = cur.popPath();
BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor;
// we can bypass the bounds check for each key in the leaf if the entire leaf is in range
// > because both query end and page upper bound are exclusive of the query results and page contents,
// respectively
bool boundsCheck = leafCursor.upperBound() > keys.end;
// Whether or not any results from this page were added to results
bool usedPage = false;
while (leafCursor.valid()) {
KeyValueRef kv = leafCursor.get().toKeyValueRef();
if (boundsCheck && kv.key.compare(keys.end) >= 0) {
break;
}
accumulatedBytes += kv.expectedSize();
result.push_back_deep(result.arena(), kv);
result.push_back(result.arena(), kv);
usedPage = true;
if (--rowLimit == 0 || accumulatedBytes >= byteLimit) {
break;
}
leafCursor.moveNext();
}
// If the page was used, results must depend on the ArenaPage arena and the Mirror arena.
// This must be done after visiting all the results in case the Mirror arena changes.
if (usedPage) {
result.arena().dependsOn(leafCursor.mirror->arena);
result.arena().dependsOn(cur.back().page->getArena());
}
// Stop if the leaf cursor is still valid which means we hit a key or size limit or
// if we started in the root page
if (leafCursor.valid() || isRoot) {
// if the cursor is in the root page, in which case there are no more pages.
if (leafCursor.valid() || cur.inRoot()) {
break;
}
cur.popPath();
wait(cur.moveNext());
}
} else {
wait(cur.seekLT(keys.end, prefetchBytes));
while (cur.isValid()) {
// Read page contents without using waits
bool isRoot = cur.inRoot();
BTreePage::BinaryTree::Cursor leafCursor = cur.popPath();
BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor;
// we can bypass the bounds check for each key in the leaf if the entire leaf is in range
// < because both query begin and page lower bound are inclusive of the query results and page contents,
// respectively
bool boundsCheck = leafCursor.lowerBound() < keys.begin;
// Whether or not any results from this page were added to results
bool usedPage = false;
while (leafCursor.valid()) {
KeyValueRef kv = leafCursor.get().toKeyValueRef();
if (boundsCheck && kv.key.compare(keys.begin) < 0) {
break;
}
accumulatedBytes += kv.expectedSize();
result.push_back_deep(result.arena(), kv);
result.push_back(result.arena(), kv);
usedPage = true;
if (++rowLimit == 0 || accumulatedBytes >= byteLimit) {
break;
}
leafCursor.movePrev();
}
// If the page was used, results must depend on the ArenaPage arena and the Mirror arena.
// This must be done after visiting all the results in case the Mirror arena changes.
if (usedPage) {
result.arena().dependsOn(leafCursor.mirror->arena);
result.arena().dependsOn(cur.back().page->getArena());
}
// Stop if the leaf cursor is still valid which means we hit a key or size limit or
// if we started in the root page
if (leafCursor.valid() || isRoot) {
if (leafCursor.valid() || cur.inRoot()) {
break;
}
cur.popPath();
wait(cur.movePrev());
}
}
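Both scan directions now use the same zero-copy recipe: shallow push_back() replaces push_back_deep(), and the result arena pins every arena owning bytes the KeyValueRefs point into. A self-contained sketch of the two append modes, with pageArena standing in for the page and Mirror arenas:

// Sketch: deep vs. shallow append on a Standalone<VectorRef<KeyValueRef>>.
Arena pageArena;
KeyValueRef kv(StringRef(pageArena, LiteralStringRef("k")),
               StringRef(pageArena, LiteralStringRef("v")));

Standalone<VectorRef<KeyValueRef>> out;
out.push_back_deep(out.arena(), kv); // old path: copies key and value bytes
out.push_back(out.arena(), kv);      // new path: aliases page memory, so...
out.arena().dependsOn(pageArena);    // ...the owner must be pinned explicitly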
@ -6442,8 +6385,13 @@ public:
wait(cur.seekGTE(key, 0));
if (cur.isValid() && cur.get().key == key) {
return cur.get().value.get();
// Return a Value whose arena depends on the source page arena
Value v;
v.arena().dependsOn(cur.back().page->getArena());
v.contents() = cur.get().value.get();
return v;
}
return Optional<Value>();
}
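The same idea in generic form (a sketch, not an API from this commit): alias bytes owned by another Arena and extend their lifetime with dependsOn() instead of copying them.

// Sketch: zero-copy aliasing helper in the style of readValue_impl above.
Standalone<StringRef> alias(StringRef bytes, const Arena& owner) {
	Standalone<StringRef> s;
	s.arena().dependsOn(owner); // keep owner's blocks and 4k buffers alive
	s.contents() = bytes;       // point at the original memory; no memcpy
	return s;
}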
@ -6451,32 +6399,15 @@ public:
return catchError(readValue_impl(this, key, debugID));
}
ACTOR static Future<Optional<Value>> readValuePrefix_impl(KeyValueStoreRedwoodUnversioned* self,
Key key,
int maxLength,
Optional<UID> debugID) {
state VersionedBTree::BTreeCursor cur;
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
state Reference<FlowLock> readLock = self->m_concurrentReads;
wait(readLock->take());
state FlowLock::Releaser releaser(*readLock);
++g_redwoodMetrics.opGet;
wait(cur.seekGTE(key, 0));
if (cur.isValid() && cur.get().key == key) {
Value v = cur.get().value.get();
int len = std::min(v.size(), maxLength);
return Value(v.substr(0, len));
}
return Optional<Value>();
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
return catchError(readValuePrefix_impl(this, key, maxLength, debugID));
return catchError(map(readValue_impl(this, key, debugID), [maxLength](Optional<Value> v) {
if (v.present() && v.get().size() > maxLength) {
v.get().contents() = v.get().substr(0, maxLength);
}
return v;
}));
}
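Folding readValuePrefix() into readValue_impl() via map() removes a near-duplicate seek path, and because substr() only narrows the StringRef while the Optional keeps its arena dependency, the prefix is still returned without a copy.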
~KeyValueStoreRedwoodUnversioned() override{};
@ -6575,7 +6506,7 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
"VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(), end.printable().c_str());
wait(cur.seekGTE(start, 0));
state std::vector<KeyValue> results;
state Standalone<VectorRef<KeyValueRef>> results;
while (cur.isValid() && cur.get().key < end) {
// Find the next written kv pair that would be present at this version
@ -6633,7 +6564,10 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
ASSERT(errors == 0);
results.push_back(KeyValue(KeyValueRef(cur.get().key, cur.get().value.get())));
results.push_back(results.arena(), cur.get().toKeyValueRef());
results.arena().dependsOn(cur.back().cursor.mirror->arena);
results.arena().dependsOn(cur.back().page->getArena());
wait(cur.moveNext());
}
@ -6672,7 +6606,7 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
// Now read the range from the tree in reverse order and compare to the saved results
wait(cur.seekLT(end, 0));
state std::vector<KeyValue>::const_reverse_iterator r = results.rbegin();
state std::reverse_iterator<const KeyValueRef*> r = results.rbegin();
while (cur.isValid() && cur.get().key >= start) {
if (r == results.rend()) {
@ -7974,6 +7908,27 @@ TEST_CASE(":/redwood/performance/mutationBuffer") {
return Void();
}
// This test is only useful with Arena debug statements, which show when aligned buffers are allocated and freed.
TEST_CASE(":/redwood/pager/ArenaPage") {
Arena x;
printf("Making p\n");
Reference<ArenaPage> p(new ArenaPage(4096, 4096));
printf("Made p=%p\n", p->begin());
printf("Clearing p\n");
p.clear();
printf("Making p\n");
p = Reference<ArenaPage>(new ArenaPage(4096, 4096));
printf("Made p=%p\n", p->begin());
printf("Making x depend on p\n");
x.dependsOn(p->getArena());
printf("Clearing p\n");
p.clear();
printf("Clearing x\n");
x = Arena();
printf("Pointer should be freed\n");
return Void();
}
TEST_CASE("/redwood/correctness/btree") {
g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting
g_redwoodMetrics.clear();
@ -8362,12 +8317,12 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
wait(success(pager->init()));
state LogicalPageID id = wait(pager->newPageID());
Reference<IPage> p = pager->newPageBuffer();
Reference<ArenaPage> p = pager->newPageBuffer();
memset(p->mutate(), (char)id, p->size());
pager->updatePage(id, p);
pager->setMetaKey(LiteralStringRef("asdfasdf"));
wait(pager->commit());
Reference<IPage> p2 = wait(pager->readPage(id, true));
Reference<ArenaPage> p2 = wait(pager->readPage(id, true));
printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str());
// TODO: Verify reads, do more writes and reads to make this a real pager validator

View File

@ -101,6 +101,11 @@ void Arena::dependsOn(const Arena& p) {
}
}
}
void* Arena::allocate4kAlignedBuffer(uint32_t size) {
return ArenaBlock::dependOn4kAlignedBuffer(impl, size);
}
size_t Arena::getSize() const {
if (impl) {
allowAccess(impl.getPtr());
@ -172,9 +177,13 @@ size_t ArenaBlock::totalSize() {
while (o) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o);
makeDefined(r, sizeof(ArenaBlockRef));
allowAccess(r->next);
s += r->next->totalSize();
disallowAccess(r->next);
if (r->aligned4kBufferSize != 0) {
s += r->aligned4kBufferSize;
} else {
allowAccess(r->next);
s += r->next->totalSize();
disallowAccess(r->next);
}
o = r->nextBlockOffset;
makeNoAccess(r, sizeof(ArenaBlockRef));
}
@ -190,7 +199,12 @@ void ArenaBlock::getUniqueBlocks(std::set<ArenaBlock*>& a) {
while (o) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o);
makeDefined(r, sizeof(ArenaBlockRef));
r->next->getUniqueBlocks(a);
// If next is valid, recursively collect its unique blocks
if (r->aligned4kBufferSize == 0) {
r->next->getUniqueBlocks(a);
}
o = r->nextBlockOffset;
makeNoAccess(r, sizeof(ArenaBlockRef));
}
@ -212,6 +226,7 @@ int ArenaBlock::addUsed(int bytes) {
void ArenaBlock::makeReference(ArenaBlock* next) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + bigUsed);
makeDefined(r, sizeof(ArenaBlockRef));
r->aligned4kBufferSize = 0;
r->next = next;
r->nextBlockOffset = nextBlockOffset;
makeNoAccess(r, sizeof(ArenaBlockRef));
@ -219,6 +234,19 @@ void ArenaBlock::makeReference(ArenaBlock* next) {
bigUsed += sizeof(ArenaBlockRef);
}
void* ArenaBlock::make4kAlignedBuffer(uint32_t size) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + bigUsed);
makeDefined(r, sizeof(ArenaBlockRef));
r->aligned4kBufferSize = size;
r->aligned4kBuffer = allocateFast4kAligned(size);
// printf("Arena::aligned4kBuffer alloc size=%u ptr=%p\n", size, r->aligned4kBuffer);
r->nextBlockOffset = nextBlockOffset;
makeNoAccess(r, sizeof(ArenaBlockRef));
nextBlockOffset = bigUsed;
bigUsed += sizeof(ArenaBlockRef);
return r->aligned4kBuffer;
}
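make4kAlignedBuffer() reuses the bookkeeping of makeReference() just above it: the new ArenaBlockRef is written at offset bigUsed inside the block's own data region, chained to the previous head through nextBlockOffset, and then becomes the new head. The refs thus form an intrusive singly linked list, which destroy() and totalSize() walk.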
void ArenaBlock::dependOn(Reference<ArenaBlock>& self, ArenaBlock* other) {
other->addref();
if (!self || self->isTiny() || self->unused() < sizeof(ArenaBlockRef))
@ -227,6 +255,14 @@ void ArenaBlock::dependOn(Reference<ArenaBlock>& self, ArenaBlock* other) {
self->makeReference(other);
}
void* ArenaBlock::dependOn4kAlignedBuffer(Reference<ArenaBlock>& self, uint32_t size) {
if (!self || self->isTiny() || self->unused() < sizeof(ArenaBlockRef)) {
return create(SMALL, self)->make4kAlignedBuffer(size);
} else {
return self->make4kAlignedBuffer(size);
}
}
void* ArenaBlock::allocate(Reference<ArenaBlock>& self, int bytes) {
ArenaBlock* b = self.getPtr();
allowAccess(b);
@ -359,10 +395,18 @@ void ArenaBlock::destroy() {
while (o) {
ArenaBlockRef* br = (ArenaBlockRef*)((char*)b->getData() + o);
makeDefined(br, sizeof(ArenaBlockRef));
allowAccess(br->next);
if (br->next->delref_no_destroy())
stack.push_back(stackArena, br->next);
disallowAccess(br->next);
// If aligned4kBuffer is valid, free it
if (br->aligned4kBufferSize != 0) {
// printf("Arena::aligned4kBuffer free %p\n", br->aligned4kBuffer);
freeFast4kAligned(br->aligned4kBufferSize, br->aligned4kBuffer);
} else {
allowAccess(br->next);
if (br->next->delref_no_destroy())
stack.push_back(stackArena, br->next);
disallowAccess(br->next);
}
o = br->nextBlockOffset;
}
}
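Putting the pieces together, the aligned-buffer lifecycle looks like this (a sketch mirroring the :/redwood/pager/ArenaPage unit test earlier; the size is illustrative):

Arena a;
void* buf = a.allocate4kAlignedBuffer(8192); // records an ArenaBlockRef tagged with size 8192
ASSERT((uintptr_t)buf % 4096 == 0);

Arena b;
b.dependsOn(a); // b now keeps buf alive as well
a = Arena();    // buf survives; b still references a's blocks
b = Arena();    // last reference gone: destroy() calls freeFast4kAligned(8192, buf)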

View File

@ -102,6 +102,7 @@ public:
Arena& operator=(Arena&&) noexcept;
void dependsOn(const Arena& p);
void* allocate4kAlignedBuffer(uint32_t size);
size_t getSize() const;
bool hasFree(size_t size, const void* address);
@ -129,7 +130,15 @@ struct scalar_traits<Arena> : std::true_type {
};
struct ArenaBlockRef {
ArenaBlock* next;
union {
ArenaBlock* next;
void* aligned4kBuffer;
};
// Only one of (next, aligned4kBuffer) is valid at any one time, as they occupy the same space.
// If aligned4kBufferSize is not 0, aligned4kBuffer is valid, otherwise next is valid.
uint32_t aligned4kBufferSize;
uint32_t nextBlockOffset;
};
@ -160,7 +169,9 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted<ArenaBlock> {
void getUniqueBlocks(std::set<ArenaBlock*>& a);
int addUsed(int bytes);
void makeReference(ArenaBlock* next);
void* make4kAlignedBuffer(uint32_t size);
static void dependOn(Reference<ArenaBlock>& self, ArenaBlock* other);
static void* dependOn4kAlignedBuffer(Reference<ArenaBlock>& self, uint32_t size);
static void* allocate(Reference<ArenaBlock>& self, int bytes);
// Return an appropriately-sized ArenaBlock to store the given data
static ArenaBlock* create(int dataSize, Reference<ArenaBlock>& next);

View File

@ -266,4 +266,26 @@ inline void freeFast(int size, void* ptr) {
delete[](uint8_t*) ptr;
}
[[nodiscard]] inline void* allocateFast4kAligned(int size) {
// Use FastAllocator for sizes it supports to avoid internal fragmentation in some implementations of aligned_alloc
if (size <= 4096)
return FastAllocator<4096>::allocate();
if (size <= 8192)
return FastAllocator<8192>::allocate();
if (size <= 16384)
return FastAllocator<16384>::allocate();
return aligned_alloc(4096, size);
}
inline void freeFast4kAligned(int size, void* ptr) {
// Sizes supported by FastAllocator must be released via FastAllocator
if (size <= 4096)
return FastAllocator<4096>::release(ptr);
if (size <= 8192)
return FastAllocator<8192>::release(ptr);
if (size <= 16384)
return FastAllocator<16384>::release(ptr);
aligned_free(ptr);
}
#endif
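A usage note for the helpers above (sketch): the caller must retain the requested size so the buffer is released through the same FastAllocator magazine, or through aligned_free for sizes above 16384.

void* buf = allocateFast4kAligned(10000); // 8192 < 10000 <= 16384, so FastAllocator<16384>
ASSERT((uintptr_t)buf % 4096 == 0);
freeFast4kAligned(10000, buf);            // the size must match the allocation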

View File

@ -1334,6 +1334,14 @@ struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
int64_t activePermits() const { return active; }
int waiters() const { return takers.size(); }
// Try to send an error to all current and future waiters
// Only works if broken_on_destruct.canBeSet()
void kill(Error e = broken_promise()) {
if (broken_on_destruct.canBeSet()) {
broken_on_destruct.sendError(e);
}
}
private:
std::list<std::pair<Promise<Void>, int64_t>> takers;
const int64_t permits;
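kill() is the hook FIFOQueue::Cursor uses above: route a single error future into the lock so every current and future take() fails fast, instead of making each waiter also wait on pagerError. A minimal sketch of the pattern, assuming error is some Future<Void> that may throw:

FlowLock lock(1);
Future<Void> killer = map(ready(error), [&lock](Void) {
	lock.kill(); // sends broken_promise() to all current and future takers
	return Void();
});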