Refactored page rebuild logic to bulk build pages full and split pages more evenly.

Steve Atherton 2021-04-03 19:54:49 -07:00
parent 79fed2cd7c
commit 90ebf90c8b
5 changed files with 267 additions and 180 deletions

View File

@@ -230,8 +230,6 @@ struct DeltaTree {
inline Node& newNode() { return *(Node*)((uint8_t*)this + size()); }
public:
// Get count of total overhead bytes (everything but the user-formatted Delta) for a tree given size n
static int emptyTreeSize() { return sizeof(DeltaTree); }
struct DecodedNode {
DecodedNode() {}

View File

@@ -76,7 +76,7 @@ public:
virtual void delref() = 0;
};
// This API is probably customized to the behavior of DWALPager and probably needs some changes to be more generic.
// This API is probably too customized to the behavior of DWALPager and probably needs some changes to be more generic.
class IPager2 : public IClosable {
public:
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.

View File

@@ -703,7 +703,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
init( REDWOOD_COMMIT_CONCURRENT_READS, 64 );
init( REDWOOD_PAGE_REBUILD_FILL_FACTOR, 0.66 );
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
init( REDWOOD_LAZY_CLEAR_MIN_PAGES, 0 );
init( REDWOOD_LAZY_CLEAR_MAX_PAGES, 1e6 );
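The fill-factor knob is replaced by its complement: rather than starting a new page once usage reaches REDWOOD_PAGE_REBUILD_FILL_FACTOR, the rebuild now forces a record into the current page while the page's slack (unused fraction) exceeds REDWOOD_PAGE_REBUILD_MAX_SLACK. A minimal sketch of the two predicates, where used and capacity are hypothetical stand-ins for the real page accounting:

// Illustrative only; used/capacity stand in for the accounting done in PageToBuild.
// Old policy: keep adding records while the page is below the fill target.
bool belowFillTarget(int used, int capacity, double fillFactor /*0.66*/) {
    return used < capacity * fillFactor;
}
// New policy: force the next record in while the page still has too much slack.
bool forceNextRecord(int used, int capacity, double maxSlack /*0.33*/) {
    return (double)(capacity - used) / capacity > maxSlack;
}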

View File

@@ -636,7 +636,7 @@ public:
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
int REDWOOD_COMMIT_CONCURRENT_READS; // Max number of concurrent reads done to support commit operations
double REDWOOD_PAGE_REBUILD_FILL_FACTOR; // When rebuilding pages, start a new page after this capacity
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
// once
int REDWOOD_LAZY_CLEAR_MIN_PAGES; // Minimum number of pages to free before ending a lazy clear cycle, unless the

View File

@@ -2864,7 +2864,8 @@ struct RedwoodRecordRef {
bool operator>=(const RedwoodRecordRef& rhs) const { return compare(rhs) >= 0; }
// Worst case overhead means to assume that either the prefix length or the suffix length
// could contain the full key size
int deltaSize(const RedwoodRecordRef& base, int skipLen, bool worstCaseOverhead) const {
int prefixLen = getCommonPrefixLen(base, skipLen);
int keySuffixLen = key.size() - prefixLen;
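The worst-case flag exists because splitPages() sizes records in a linear left-to-right scan, while the final DeltaTree is balanced, so prefix borrowing can land on different neighbors. A hedged sketch of the resulting bound; lengthFieldBytes() and its widths are hypothetical stand-ins, not the actual encoding:

// Illustrative only: with worstCaseOverhead, charge each length field as if it might
// have to hold the full key size, since the record's neighbors in the balanced tree
// may differ from its neighbors in the linear scan.
int lengthFieldBytes(int n) { return n < 128 ? 1 : (n < 16384 ? 2 : 4); } // hypothetical widths

int deltaSizeUpperBound(int keySize, int valueSize, int prefixLen) {
    int suffixLen = keySize - prefixLen;
    // Worst case: both the prefix length and the suffix length fields sized for the full key.
    int overhead = 2 * lengthFieldBytes(keySize) + lengthFieldBytes(valueSize);
    return overhead + suffixLen + valueSize;
}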
@@ -3732,6 +3733,184 @@ private:
Future<int> m_lazyClearActor;
bool m_lazyClearStop;
struct PageToBuild {
PageToBuild(int index, int blockSize)
: startIndex(index), count(0), pageSize(blockSize),
bytesLeft(blockSize - sizeof(BTreePage) - sizeof(BTreePage::BinaryTree)),
largeDeltaTree(pageSize > BTreePage::BinaryTree::SmallSizeLimit), blockSize(blockSize), blockCount(1),
kvBytes(0) {}
int startIndex;
int count;
int pageSize;
int bytesLeft;
bool largeDeltaTree;
int blockSize;
int blockCount;
int kvBytes;
int size() const { return pageSize - bytesLeft; }
double usedFraction() const { return (double)size() / pageSize; }
double slackFraction() const { return (double)bytesLeft / pageSize; }
double kvFraction() const { return (double)kvBytes / pageSize; }
int endIndex() const { return startIndex + count; }
int lastIndex() const { return endIndex() - 1; }
std::string toString() const {
return format(
"{start=%d count=%d used %d/%d bytes (%.2f%% slack) kvBytes=%d blocks=%d blockSize=%d large=%d}",
startIndex,
count,
size(),
pageSize,
slackFraction() * 100,
kvBytes,
blockCount,
blockSize,
largeDeltaTree);
}
// Move an item from a to b if a has 2 or more items and the item fits in b
// a and b must be consecutive pages from the same array of records
static bool shiftItem(PageToBuild& a, PageToBuild& b, int deltaSize, int kvBytes) {
if (a.count < 2) {
return false;
}
// Size the item occupies in a, and would occupy in b (node header size depends on whether each tree is large)
int aNodeSize = deltaSize + BTreePage::BinaryTree::Node::headerSize(a.largeDeltaTree);
int bNodeSize = deltaSize + BTreePage::BinaryTree::Node::headerSize(b.largeDeltaTree);
if (b.bytesLeft < bNodeSize) {
return false;
}
--a.count;
++b.count;
--b.startIndex;
a.bytesLeft += aNodeSize;
b.bytesLeft -= bNodeSize;
a.kvBytes -= kvBytes;
b.kvBytes += kvBytes;
return true;
}
// Try to add a record of the given delta size to the page.
// If force is true, the page will be expanded to make the record fit if needed.
// Return value is whether or not the record was added to the page.
bool addRecord(const RedwoodRecordRef& rec, int deltaSize, bool force) {
int nodeSize = deltaSize + BTreePage::BinaryTree::Node::headerSize(largeDeltaTree);
// If the record doesn't fit and the page can't be expanded then return false
if (nodeSize > bytesLeft && !force) {
return false;
}
++count;
bytesLeft -= nodeSize;
kvBytes += rec.kvBytes();
// If needed, expand page so that record fits.
// This is a loop because the first expansion may increase per-node overhead which could
// then require a second expansion.
while (bytesLeft < 0) {
int newBlocks = (-bytesLeft + blockSize - 1) / blockSize;
int extraSpace = newBlocks * blockSize;
blockCount += newBlocks;
bytesLeft += extraSpace;
pageSize += extraSpace;
// If size has moved into the "large" range then every node has gotten bigger so adjust bytesLeft
if (!largeDeltaTree && pageSize > BTreePage::BinaryTree::SmallSizeLimit) {
largeDeltaTree = true;
bytesLeft -= (count * BTreePage::BinaryTree::LargeTreePerNodeExtraOverhead);
}
}
return true;
}
};
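The expansion loop in addRecord() above has a subtle feedback: growing the page can push it past BTreePage::BinaryTree::SmallSizeLimit, which retroactively enlarges every node already counted, so bytesLeft can go negative again and require another block. A standalone sketch of just that loop, with made-up constants in place of the DeltaTree limits:

#include <cstdio>

constexpr int kSmallSizeLimit = 64 * 1024; // stand-in for BTreePage::BinaryTree::SmallSizeLimit
constexpr int kLargeExtraPerNode = 2;      // stand-in for LargeTreePerNodeExtraOverhead

// Grow by whole blocks until the pending record fits, re-charging per-node
// overhead once the page crosses into the "large" tree range.
void expandToFit(int& pageSize, int& bytesLeft, int& blockCount, int blockSize, int count,
                 bool& largeDeltaTree) {
    while (bytesLeft < 0) {
        int newBlocks = (-bytesLeft + blockSize - 1) / blockSize; // ceil(-bytesLeft / blockSize)
        blockCount += newBlocks;
        bytesLeft += newBlocks * blockSize;
        pageSize += newBlocks * blockSize;
        if (!largeDeltaTree && pageSize > kSmallSizeLimit) {
            largeDeltaTree = true;
            bytesLeft -= count * kLargeExtraPerNode; // may go negative again, hence the loop
        }
    }
}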
static std::vector<PageToBuild> splitPages(const RedwoodRecordRef* lowerBound,
const RedwoodRecordRef* upperBound,
int prefixLen,
VectorRef<RedwoodRecordRef> records,
int height,
int blockSize) {
debug_printf("splitPages height=%d records=%d lowerBound=%s upperBound=%s\n",
height,
records.size(),
lowerBound->toString(false).c_str(),
upperBound->toString(false).c_str());
ASSERT(!records.empty());
// Leaves can have just one record if it's large, but internal pages should have at least 4
int minRecords = height == 1 ? 1 : 4;
double maxSlack = SERVER_KNOBS->REDWOOD_PAGE_REBUILD_MAX_SLACK;
std::vector<PageToBuild> pages;
// deltaSizes contains pair-wise delta sizes for [lowerBound, records..., upperBound]
std::vector<int> deltaSizes(records.size() + 1);
deltaSizes.front() = records.front().deltaSize(*lowerBound, prefixLen, true);
deltaSizes.back() = records.back().deltaSize(*upperBound, prefixLen, true);
for (int i = 1; i < records.size(); ++i) {
deltaSizes[i] = records[i].deltaSize(records[i - 1], prefixLen, true);
}
PageToBuild p(0, blockSize);
for (int i = 0; i < records.size(); ++i) {
bool force = p.count < minRecords || p.slackFraction() > maxSlack;
debug_printf(
" before addRecord i=%d records=%d deltaSize=%d kvSize=%d force=%d pageToBuild=%s record=%s",
i,
records.size(),
deltaSizes[i],
records[i].kvBytes(),
force,
p.toString().c_str(),
records[i].toString(height == 1).c_str());
if (!p.addRecord(records[i], deltaSizes[i], force)) {
pages.push_back(p);
p = PageToBuild(p.endIndex(), blockSize);
p.addRecord(records[i], deltaSizes[i], true);
}
}
if (p.count > 0) {
pages.push_back(p);
}
debug_printf(" Before shift: %s\n", ::toString(pages).c_str());
// If page count is > 1, try to balance slack between the last two pages.
// BUGGIFY disables this balancing because skipping it produces more edge
// cases of pages with very few records, which is useful test coverage.
if (pages.size() > 1 && !BUGGIFY) {
PageToBuild& a = pages[pages.size() - 2];
PageToBuild& b = pages.back();
// While the last page has too much slack and the second-to-last page
// has more than the minimum record count, shift a record from the
// second-to-last page to the last page.
while (b.slackFraction() > maxSlack && a.count > minRecords) {
int i = a.lastIndex();
if (!PageToBuild::shiftItem(a, b, deltaSizes[i], records[i].kvBytes())) {
break;
}
debug_printf(" After shifting i=%d: a=%s b=%s\n", i, a.toString().c_str(), b.toString().c_str());
}
}
return pages;
}
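End to end, splitPages() greedily packs records until a page rejects one, then rebalances only the last two pages so the final page is not left nearly empty. A simplified, self-contained model of that shape (record sizes and capacity are made up; the real code also tracks per-node overhead and minimum record counts as above):

#include <cstdio>
#include <utility>
#include <vector>

int main() {
    const int capacity = 100;
    const double maxSlack = 0.33;
    std::vector<int> sizes = { 40, 35, 20, 30, 25, 10 }; // made-up record sizes

    // Greedy fill: each page is (startIndex, usedBytes).
    std::vector<std::pair<int, int>> pages{ { 0, 0 } };
    for (int i = 0; i < (int)sizes.size(); ++i) {
        if (pages.back().second + sizes[i] > capacity)
            pages.push_back({ i, 0 });
        pages.back().second += sizes[i];
    }

    // Rebalance the last two pages, as shiftItem() does: move records from the
    // second-to-last page while the last page has too much slack.
    if (pages.size() > 1) {
        auto& a = pages[pages.size() - 2];
        auto& b = pages.back();
        while ((capacity - b.second) / (double)capacity > maxSlack && b.first - 1 > a.first) {
            int moved = sizes[b.first - 1];
            if (b.second + moved > capacity)
                break;
            a.second -= moved;
            b.second += moved;
            --b.first;
        }
    }
    for (auto& p : pages)
        printf("start=%d used=%d/%d (slack %.0f%%)\n", p.first, p.second, capacity,
               100.0 * (capacity - p.second) / capacity);
    // Prints pages {start=0 used=75} and {start=2 used=85} instead of 95/65.
}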
// Writes entries to 1 or more pages and returns a vector of boundary keys with their IPage(s)
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>> writePages(VersionedBTree* self,
const RedwoodRecordRef* lowerBound,
@@ -3741,197 +3920,130 @@ private:
Version v,
BTreePageIDRef previousID) {
ASSERT(entries.size() > 0);
state Standalone<VectorRef<RedwoodRecordRef>> records;
// This is how much space exists in the page for the binary tree, after the header
state int blockSize = self->m_blockSize;
state int pageSize = blockSize - sizeof(BTreePage);
state int pageFillTarget = pageSize * SERVER_KNOBS->REDWOOD_PAGE_REBUILD_FILL_FACTOR;
state int blockCount = 1;
// All records share the prefix shared by the lower and upper boundaries
state int prefixLen = lowerBound->getCommonPrefixLen(*upperBound);
state int kvBytes = 0;
state int compressedBytes = BTreePage::BinaryTree::emptyTreeSize();
state bool largeTree = false;
state int start = 0;
state int i = 0;
// The common prefix of the first and last records is shared by all records
state int skipLen = entries.front().getCommonPrefixLen(entries.back());
// Leaves can have just one record if it's large, but internal pages should have at least 4
state int minimumEntries = (height == 1 ? 1 : 4);
state std::vector<PageToBuild> pagesToBuild =
splitPages(lowerBound, upperBound, prefixLen, entries, height, self->m_blockSize);
debug_printf("splitPages returning %s\n", toString(pagesToBuild).c_str());
// Lower bound of the page being added to
state RedwoodRecordRef pageLowerBound = lowerBound->withoutValue();
state RedwoodRecordRef pageUpperBound;
while (1) {
// While there are still entries to add and the page isn't full enough, add an entry
while (i < entries.size() && (i - start < minimumEntries || compressedBytes < pageFillTarget)) {
const RedwoodRecordRef& entry = entries[i];
state int pageIndex;
// Get delta from previous record or page lower boundary if this is the first item in a page
const RedwoodRecordRef& base = (i == start) ? pageLowerBound : entries[i - 1];
for (pageIndex = 0; pageIndex < pagesToBuild.size(); ++pageIndex) {
auto& p = pagesToBuild[pageIndex];
debug_printf("building page %d of %d %s\n", pageIndex + 1, pagesToBuild.size(), p.toString().c_str());
ASSERT(p.count != 0);
// All record pairs in entries have skipLen bytes in common with each other, but for i == 0 the base is
// lowerBound
int skip = i == 0 ? 0 : skipLen;
// For internal pages, skip first entry if child link is null. Such links only exist
// to maintain a borrow-able prefix for the previous subtree after a subtree deletion.
// If the null link falls on a new page post-split, then the pageLowerBound of the page
// being built now will serve as the previous subtree's upper boundary as it is the same
// key as entries[p.startIndex] and there is no need to actually store the null link in
// the new page.
if (height != 1 && !entries[p.startIndex].value.present()) {
p.kvBytes -= entries[p.startIndex].key.size();
++p.startIndex;
--p.count;
debug_printf("Skipping first null record, new count=%d\n", p.count);
// In a delta tree, all common prefix bytes that can be borrowed will be, but not necessarily
// between the same records as in this linear estimate of the built page size. Since the key
// suffix bytes, and therefore the key prefix lengths, can be distributed differently in the
// balanced tree, worst-case overhead for the delta size must be assumed.
int deltaSize = entry.deltaSize(base, skip, true);
int nodeSize = BTreePage::BinaryTree::Node::headerSize(largeTree) + deltaSize;
debug_printf("Adding %3d of %3lu (i=%3d) klen %4d vlen %5d nodeSize %5d deltaSize %5d page usage: "
"%d/%d (%.2f%%) record=%s\n",
i + 1,
entries.size(),
i,
entry.key.size(),
entry.value.orDefault(StringRef()).size(),
nodeSize,
deltaSize,
compressedBytes,
pageSize,
(float)compressedBytes / pageSize * 100,
entry.toString(height == 1).c_str());
// While the node doesn't fit, expand the page.
// This is a loop because if the page size moves into "large" range for DeltaTree
// then the overhead will increase, which could require another page expansion.
int spaceAvailable = pageSize - compressedBytes;
if (nodeSize > spaceAvailable) {
// Figure out how many additional whole or partial blocks are needed
// newBlocks = ceil ( additional space needed / block size)
int newBlocks = 1 + (nodeSize - spaceAvailable - 1) / blockSize;
int newPageSize = pageSize + (newBlocks * blockSize);
// If we've moved into "large" page range for the delta tree then add additional overhead required
if (!largeTree && newPageSize > BTreePage::BinaryTree::SmallSizeLimit) {
largeTree = true;
// Add increased overhead for the current node to nodeSize
nodeSize += BTreePage::BinaryTree::LargeTreePerNodeExtraOverhead;
// Add increased overhead for all previously added nodes
compressedBytes += (i - start) * BTreePage::BinaryTree::LargeTreePerNodeExtraOverhead;
// Update calculations above made with previous overhead sizes
spaceAvailable = pageSize - compressedBytes;
newBlocks = 1 + (nodeSize - spaceAvailable - 1) / blockSize;
newPageSize = pageSize + (newBlocks * blockSize);
}
blockCount += newBlocks;
pageSize = newPageSize;
pageFillTarget = pageSize * SERVER_KNOBS->REDWOOD_PAGE_REBUILD_FILL_FACTOR;
// If the page is now empty then it must be the last page in pagesToBuild; otherwise it would
// still hold more than one item, since internal pages must have multiple children. While there
// is no page to build here, a record must still be added to the output set, because the upper
// boundary of the last page built does not match the upper boundary of the original page that
// this call to writePages() is replacing. Adding the extra null link preserves that boundary.
if (p.count == 0) {
ASSERT(pageIndex == pagesToBuild.size() - 1);
records.push_back_deep(records.arena(), pageUpperBound);
break;
}
kvBytes += entry.kvBytes();
compressedBytes += nodeSize;
++i;
}
// Flush the accumulated records to a page
state int nextStart = i;
// If we are building internal pages and there is a record after this page (index nextStart) but it has an
// empty childPage value then skip it. It only exists to serve as an upper boundary for a child page that
// has not been rewritten in the current commit, and that purpose will now be served by the upper bound of
// the page we are now building.
if (height != 1 && nextStart < entries.size() && !entries[nextStart].value.present()) {
++nextStart;
}
// Use the next entry as the upper bound, or upperBound if there are no more entries beyond this page
pageUpperBound = (i == entries.size()) ? upperBound->withoutValue() : entries[i].withoutValue();
int endIndex = p.endIndex();
bool lastPage = endIndex == entries.size();
pageUpperBound = lastPage ? upperBound->withoutValue() : entries[endIndex].withoutValue();
// If this is a leaf page, and not the last one to be written, shorten the upper boundary
state bool isLastPage = (nextStart == entries.size());
if (!isLastPage && height == 1) {
int commonPrefix = pageUpperBound.getCommonPrefixLen(entries[i - 1], 0);
if (!lastPage && height == 1) {
int commonPrefix = pageUpperBound.getCommonPrefixLen(entries[endIndex - 1], prefixLen);
pageUpperBound.truncate(commonPrefix + 1);
}
state std::vector<Reference<IPage>> pages;
BTreePage* btPage;
int capacity = blockSize * blockCount;
if (blockCount == 1) {
if (p.blockCount == 1) {
Reference<IPage> page = self->m_pager->newPageBuffer();
btPage = (BTreePage*)page->mutate();
pages.push_back(std::move(page));
} else {
ASSERT(blockCount > 1);
btPage = (BTreePage*)new uint8_t[capacity];
ASSERT(p.blockCount > 1);
btPage = (BTreePage*)new uint8_t[p.pageSize];
}
btPage->height = height;
btPage->kvBytes = kvBytes;
btPage->kvBytes = p.kvBytes;
debug_printf(
"Building tree. start=%d i=%d count=%d page usage: %d/%d (%.2f%%) bytes\nlower: %s\nupper: %s\n",
start,
i,
i - start,
compressedBytes,
pageSize,
(float)compressedBytes / pageSize * 100,
pageLowerBound.toString(false).c_str(),
pageUpperBound.toString(false).c_str());
debug_printf("Building tree for %s\nlower: %s\nupper: %s\n",
p.toString().c_str(),
pageLowerBound.toString(false).c_str(),
pageUpperBound.toString(false).c_str());
int written =
btPage->tree().build(pageSize, &entries[start], &entries[i], &pageLowerBound, &pageUpperBound);
if (written > pageSize) {
debug_printf("ERROR: Wrote %d bytes to %d byte page (%d blocks). recs %d kvBytes %d compressed %d\n",
int deltaTreeSpace = p.pageSize - sizeof(BTreePage);
state int written = btPage->tree().build(
deltaTreeSpace, &entries[p.startIndex], &entries[endIndex], &pageLowerBound, &pageUpperBound);
if (written > deltaTreeSpace) {
debug_printf("ERROR: Wrote %d bytes to page %s deltaTreeSpace=%d\n",
written,
pageSize,
blockCount,
i - start,
kvBytes,
compressedBytes);
fprintf(stderr,
"ERROR: Wrote %d bytes to %d byte page (%d blocks). recs %d kvBytes %d compressed %d\n",
written,
pageSize,
blockCount,
i - start,
kvBytes,
compressedBytes);
p.toString().c_str(),
deltaTreeSpace);
TraceEvent(SevError, "RedwoodDeltaTreeOverflow")
.detail("PageSize", p.pageSize)
.detail("BytesWritten", written);
ASSERT(false);
}
auto& metrics = g_redwoodMetrics.level(btPage->height);
metrics.pageBuild += 1;
metrics.pageBuildExt += blockCount - 1;
metrics.buildFillPct += (double)written / capacity;
metrics.buildStoredPct += (double)btPage->kvBytes / capacity;
metrics.buildItemCount += btPage->tree().numItems;
metrics.pageBuildExt += p.blockCount - 1;
metrics.buildFillPct += p.usedFraction();
metrics.buildStoredPct += p.kvFraction();
metrics.buildItemCount += p.count;
// Create chunked pages
// TODO: Avoid copying page bytes, but this is not trivial due to how pager checksums are currently handled.
if (blockCount != 1) {
if (p.blockCount != 1) {
// Mark the slack in the page buffer as defined
VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (blockCount * blockSize) - written);
VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (p.blockCount * p.blockSize) - written);
const uint8_t* rptr = (const uint8_t*)btPage;
for (int b = 0; b < blockCount; ++b) {
for (int b = 0; b < p.blockCount; ++b) {
Reference<IPage> page = self->m_pager->newPageBuffer();
memcpy(page->mutate(), rptr, blockSize);
rptr += blockSize;
memcpy(page->mutate(), rptr, p.blockSize);
rptr += p.blockSize;
pages.push_back(std::move(page));
}
delete[](uint8_t*) btPage;
}
// Write this btree page, which is made of 1 or more pager pages.
state int p;
state BTreePageIDRef childPageID;
state int k;
// If we are only writing 1 page and it has the same BTreePageID size as the original, then try to reuse
// the LogicalPageIDs in previousID by updating them atomically.
bool isOnlyPage = isLastPage && (start == 0);
if (isOnlyPage && previousID.size() == pages.size()) {
for (p = 0; p < pages.size(); ++p) {
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(previousID[p], pages[p], v));
if (pagesToBuild.size() == 1 && previousID.size() == pages.size()) {
for (k = 0; k < pages.size(); ++k) {
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(previousID[k], pages[k], v));
childPageID.push_back(records.arena(), id);
}
} else {
@@ -3942,31 +4054,25 @@ private:
if (records.empty()) {
self->freeBTreePage(previousID, v);
}
for (p = 0; p < pages.size(); ++p) {
for (k = 0; k < pages.size(); ++k) {
LogicalPageID id = wait(self->m_pager->newPageID());
self->m_pager->updatePage(id, pages[p]);
self->m_pager->updatePage(id, pages[k]);
childPageID.push_back(records.arena(), id);
}
}
wait(yield());
debug_printf("Flushing %s lastPage=%d original=%s start=%d i=%d count=%d page usage: %d/%d (%.2f%%) "
"bytes\nlower: %s\nupper: %s\n",
toString(childPageID).c_str(),
isLastPage,
toString(previousID).c_str(),
start,
i,
i - start,
compressedBytes,
pageSize,
(float)compressedBytes / pageSize * 100,
pageLowerBound.toString(false).c_str(),
pageUpperBound.toString(false).c_str());
if (REDWOOD_DEBUG) {
for (int j = start; j < i; ++j) {
auto& p = pagesToBuild[pageIndex];
debug_printf("Wrote %s original=%s deltaTreeSize=%d for %s\nlower: %s\nupper: %s\n",
toString(childPageID).c_str(),
toString(previousID).c_str(),
written,
p.toString().c_str(),
pageLowerBound.toString(false).c_str(),
pageUpperBound.toString(false).c_str());
for (int j = p.startIndex; j < p.endIndex(); ++j) {
debug_printf(" %3d: %s\n", j, entries[j].toString(height == 1).c_str());
}
ASSERT(pageLowerBound.key <= pageUpperBound.key);
@@ -3978,27 +4084,9 @@ private:
// records.arena() above
records.back().setChildPage(childPageID);
if (isLastPage) {
break;
}
start = nextStart;
kvBytes = 0;
compressedBytes = BTreePage::BinaryTree::emptyTreeSize();
pageLowerBound = pageUpperBound;
}
// When writing internal pages, if the last entry started a new page and had an empty child link
// then it was not written to a page. This means that the upper boundary for the page set being
// built is not the upper bound of the final page in that set, so it must be added to the output set
// to preserve the decodability of the subtree to its left. Fortunately, this is easy to detect
// because the loop above would exit before i has reached the item count.
if (height != 1 && i != entries.size()) {
debug_printf("Adding dummy record to avoid writing useless page containing only one null link: %s\n",
pageUpperBound.toString(false).c_str());
records.push_back_deep(records.arena(), pageUpperBound);
}
return records;
}
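The write path above keeps parent links stable when it can: if the rebuilt set is a single BTree page spanning the same number of pager pages as the original, the existing LogicalPageIDs are updated atomically; otherwise the old page is freed (once) and fresh IDs are allocated. A schematic of that decision with stub types and the ACTOR/wait() plumbing removed; the stub Pager only mirrors the calls used here:

#include <vector>

using LogicalPageID = unsigned int;
using Version = long long;
struct Page {};
struct Pager { // stub mirroring the IPager2 calls used above
    LogicalPageID atomicUpdatePage(LogicalPageID id, const Page&, Version) { return id; }
    LogicalPageID newPageID() { return nextID++; }
    void updatePage(LogicalPageID, const Page&) {}
    void freePage(const std::vector<LogicalPageID>&, Version) {}
    LogicalPageID nextID = 1000;
};

std::vector<LogicalPageID> writeOneBTreePage(Pager& pager, const std::vector<Page>& pages,
                                             const std::vector<LogicalPageID>& previousID,
                                             Version v, bool onlyPageInSet, bool firstPageInSet) {
    std::vector<LogicalPageID> childPageID;
    if (onlyPageInSet && previousID.size() == pages.size()) {
        // Same shape as the page being replaced: reuse its IDs via atomic update.
        for (size_t k = 0; k < pages.size(); ++k)
            childPageID.push_back(pager.atomicUpdatePage(previousID[k], pages[k], v));
    } else {
        // Shape changed: free the old page once, then write to fresh IDs.
        if (firstPageInSet)
            pager.freePage(previousID, v);
        for (const Page& pg : pages) {
            LogicalPageID id = pager.newPageID();
            pager.updatePage(id, pg);
            childPageID.push_back(id);
        }
    }
    return childPageID;
}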
@@ -4294,11 +4382,12 @@ private:
std::string toString() const {
std::string s;
s += format("SubtreeSlice: addr=%p skipLen=%d subtreeCleared=%d childrenChanged=%d\n",
s += format("SubtreeSlice: addr=%p skipLen=%d subtreeCleared=%d childrenChanged=%d inPlaceUpdate=%d\n",
this,
skipLen,
childrenChanged && newLinks.empty(),
childrenChanged);
childrenChanged,
inPlaceUpdate);
s += format("SubtreeLower: %s\n", subtreeLowerBound->toString(false).c_str());
s += format(" DecodeLower: %s\n", decodeLowerBound->toString(false).c_str());
s += format(" DecodeUpper: %s\n", decodeUpperBound->toString(false).c_str());