Many bug fixes in new page API and usages of it.

This commit is contained in:
Steve Atherton 2021-12-09 02:21:50 -08:00
parent d1ede0225b
commit 4bc0cfe4af
4 changed files with 109 additions and 59 deletions

View File

@ -1226,7 +1226,10 @@ public:
return item.get();
}
void switchTree(DeltaTree2* newTree) {
// Switch the cursor to point to a new DeltaTree
// if noReset is true, then the current decoded item will NOT be reset, so be sure that the original tree will
// have a lifetime that exceeds this cursor as the decoded item may point into the original tree.
void switchTree(DeltaTree2* newTree, bool noReset = false) {
tree = newTree;
// Reset item because it may point into tree memory
item.reset();
@ -1652,6 +1655,10 @@ public:
nodeBytesUsed = 0;
}
nodeBytesFree = spaceAvailable - size();
#ifdef VALGRIND
// Mark unused available space as defined
VALGRIND_MAKE_MEM_DEFINED((uint8_t*)this + size(), nodeBytesFree);
#endif
return size();
}

View File

@ -31,14 +31,10 @@
#define XXH_INLINE_ALL
#include "flow/xxhash.h"
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
#endif
typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
#define invalidPhysicalPageID std::numeric_limits<PhysicalPageID>::max()
typedef uint32_t QueueID;
#define invalidQueueID std::numeric_limits<QueueID>::max()
@ -113,8 +109,10 @@ public:
if (bufferSize > 0) {
buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize);
#ifdef VALGRIND
// Mark any unused page portion defined
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
#endif
} else {
buffer = nullptr;
}
@ -231,6 +229,7 @@ public:
// Initialize the header for a new page to be populated soon and written to disk
void init(EncodingType t, PageType pageType, uint8_t pageSubType) {
encodingType = t;
Reader next{ buffer };
VersionHeader* vh = next;
// Only the latest header version is written.
@ -249,7 +248,7 @@ public:
throw unsupported_format_version();
}
next.skip<XXH64_hash_t>();
next.skip<Footer>();
pUsable = next;
usableSize = logicalSize - (pUsable - buffer);
@ -257,7 +256,7 @@ public:
// Get the usable size for a new page of pageSize using HEADER_WRITE_VERSION with encoding type t
static int getUsableSize(int pageSize, EncodingType t) {
int usable = pageSize - sizeof(VersionHeader) - sizeof(Header) - sizeof(XXH64_hash_t);
int usable = pageSize - sizeof(VersionHeader) - sizeof(Header) - sizeof(Footer);
if (t == EncodingType::XXHash64) {
usable -= sizeof(XXHashEncodingHeader);
@ -277,6 +276,14 @@ public:
ArenaPage* p = new ArenaPage(logicalSize, bufferSize);
memcpy(p->buffer, buffer, logicalSize);
p->pUsable = p->buffer + (pUsable - buffer);
p->usableSize = usableSize;
p->encodingType = encodingType;
if (encodingType == EncodingType::XOREncryption) {
p->xorKeyID = xorKeyID;
p->xorKeySecret = xorKeySecret;
}
return Reference<ArenaPage>(p);
}
@ -298,6 +305,12 @@ public:
Header* h = next;
h->firstPhysicalPageID = pageID;
h->writeTime = now();
// TODO: Update these when possible.
h->lastKnownLogicalPageID = invalidLogicalPageID;
h->lastKnownParentID = invalidLogicalPageID;
h->writeVersion = invalidVersion;
if (vh->encodingType == EncodingType::XXHash64) {
XXHashEncodingHeader* xh = next;
@ -367,6 +380,7 @@ private:
uint8_t* pUsable;
int usableSize;
EncodingType encodingType;
// Encoding-specific secrets
uint8_t xorKeyID;
uint8_t xorKeySecret;

View File

@ -439,17 +439,18 @@ public:
#pragma pack(push, 1)
struct QueueState {
bool operator==(const QueueState& rhs) const { return memcmp(this, &rhs, sizeof(QueueState)) == 0; }
QueueID queueID = invalidQueueID;
LogicalPageID headPageID = invalidLogicalPageID;
LogicalPageID tailPageID = invalidLogicalPageID;
QueueID queueID;
LogicalPageID headPageID;
LogicalPageID tailPageID;
uint16_t headOffset;
// Note that there is no tail index because the tail page is always never-before-written and its index will
// start at 0
int64_t numPages;
int64_t numEntries;
bool usesExtents = false; // Is this an extent based queue?
LogicalPageID prevExtentEndPageID = invalidLogicalPageID;
bool tailPageNewExtent = false;
bool usesExtents; // Is this an extent based queue?
LogicalPageID prevExtentEndPageID;
bool tailPageNewExtent; // Tail page points to the start of a new extent
KeyRef asKeyRef() const { return KeyRef((uint8_t*)this, sizeof(QueueState)); }
void fromKeyRef(KeyRef k) { memcpy(this, k.begin(), k.size()); }
@ -467,19 +468,20 @@ public:
}
};
struct PageHeader {
struct QueuePage {
// The next page of the queue after this one
LogicalPageID nextPageID;
PhysicalPageID nextPageID;
// The start offset of the next page
uint16_t nextOffset;
// The end offset of the current page's data entries
uint16_t endOffset;
// Current page within the extent
LogicalPageID extentCurPageID;
PhysicalPageID extentCurPageID;
// The end page within the extent
LogicalPageID extentEndPageID;
uint16_t pageSize;
PhysicalPageID extentEndPageID;
uint16_t itemSpace;
// Get pointer to data after page header
const uint8_t* begin() const { return (const uint8_t*)(this + 1); }
uint8_t* begin() { return (uint8_t*)(this + 1); }
};
#pragma pack(pop)
@ -499,7 +501,7 @@ public:
// The first page ID to be written to the pager, if this cursor has written anything
LogicalPageID firstPageIDWritten;
// Offset after PageHeader header in page to next read from or write to
// Offset after QueuePage::begin() to next read from or write to
int offset;
// A read cursor will not read this page (or beyond)
@ -623,11 +625,11 @@ public:
// Returns true if any items have been written to the last page
bool pendingTailWrites() const { return mode == WRITE && offset != 0; }
PageHeader* header() const { return ((PageHeader*)(page->mutateData())); }
QueuePage* header() const { return ((QueuePage*)(page->mutateData())); }
void setNext(LogicalPageID pageID, int offset) {
ASSERT(mode == WRITE);
PageHeader* p = header();
QueuePage* p = header();
p->nextPageID = pageID;
p->nextOffset = offset;
}
@ -656,6 +658,10 @@ public:
void writePage() {
ASSERT(mode == WRITE);
debug_printf("FIFOQueue::Cursor(%s) writePage\n", toString().c_str());
// Mark the unused part of the page as defined
VALGRIND_MAKE_MEM_DEFINED(header()->begin() + offset, header()->itemSpace - header()->endOffset);
queue->pager->updatePage(
PagerEventReasons::MetaData, nonBtreeLevel, VectorRef<LogicalPageID>(&pageID, 1), page);
if (firstPageIDWritten == invalidLogicalPageID) {
@ -713,16 +719,18 @@ public:
queue->usesExtents,
initializeExtentInfo);
page = queue->pager->newPageBuffer();
page->init(EncodingType::XOREncryption,
page->init(EncodingType::XXHash64,
queue->usesExtents ? PageType::QueuePageInExtent : PageType::QueuePageStandalone,
(uint8_t)queue->queueID);
setNext(0, 0);
auto p = header();
ASSERT(newOffset == 0);
p->endOffset = 0;
p->pageSize = page->dataSize() - sizeof(PageHeader);
p->itemSpace = page->dataSize() - sizeof(QueuePage);
if (BUGGIFY) {
p->itemSpace = deterministicRandom()->randomInt(50, p->itemSpace);
}
// p-> page size end = page->size() - sizeof(RawHeader)
// For extent based queue, update the index of current page within the extent
if (queue->usesExtents) {
debug_printf("FIFOQueue::Cursor(%s) Adding page %s init=%d pageCount %d\n",
@ -748,6 +756,9 @@ public:
::toString(p->extentEndPageID).c_str());
}
}
} else {
p->extentCurPageID = invalidPhysicalPageID;
p->extentEndPageID = invalidPhysicalPageID;
}
} else {
debug_printf("FIFOQueue::Cursor(%s) Clearing new page\n", toString().c_str());
@ -763,7 +774,7 @@ public:
state bool mustWait = self->isBusy();
state int bytesNeeded = Codec::bytesNeeded(item);
state bool needNewPage =
self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->pageSize;
self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->itemSpace;
if (BUGGIFY) {
// Sometimes (1% probability) decide a new page is needed as long as at least 1 item has been
@ -789,7 +800,7 @@ public:
// Otherwise, taking the mutex would be immediate so no other writer could have run
if (mustWait) {
needNewPage =
self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->pageSize;
self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->itemSpace;
if (BUGGIFY) {
// Sometimes (1% probability) decide a new page is needed as long as at least 1 item has been
// written (indicated by non-zero offset) to the current page.
@ -1035,6 +1046,8 @@ public:
numPages = 1;
numEntries = 0;
usesExtents = extent;
tailPageNewExtent = false;
prevExtentEndPageID = invalidLogicalPageID;
pagesPerExtent = pager->getPagesPerExtent();
headReader.init(this, Cursor::POP, newPageID, false, false, newPageID, 0);
tailWriter.init(this, Cursor::WRITE, newPageID, true, true);
@ -1053,6 +1066,7 @@ public:
numPages = qs.numPages;
numEntries = qs.numEntries;
usesExtents = qs.usesExtents;
tailPageNewExtent = qs.tailPageNewExtent;
pagesPerExtent = pager->getPagesPerExtent();
headReader.init(this, Cursor::POP, qs.headPageID, loadExtents, false, qs.tailPageID, qs.headOffset);
tailWriter.init(this,
@ -1133,7 +1147,7 @@ public:
throw;
}
PageHeader* p = (PageHeader*)(page->data());
const QueuePage* p = (const QueuePage*)(page->data());
int bytesRead;
// Now loop over all entries inside the current page
@ -1427,13 +1441,13 @@ public:
int pagesPerExtent;
bool usesExtents;
bool tailPageNewExtent;
LogicalPageID prevExtentEndPageID;
PhysicalPageID prevExtentEndPageID;
Cursor headReader;
Cursor tailWriter;
Cursor headWriter;
Future<LogicalPageID> newTailPage;
Future<PhysicalPageID> newTailPage;
// For debugging
std::string name;
@ -2140,7 +2154,8 @@ public:
}
void updateCommittedHeader() {
memcpy(lastCommittedHeaderPage->mutateData(), headerPage->data(), lastCommittedHeaderPage->dataSize());
lastCommittedHeaderPage = headerPage->cloneContents();
pLastCommittedHeader = (Header*)lastCommittedHeaderPage->mutateData();
}
ACTOR static Future<Void> recover(DWALPager* self) {
@ -2157,11 +2172,9 @@ public:
}
wait(store(self->pageFile, IAsyncFileSystem::filesystem()->open(self->filename, flags, 0644)));
}
// Header page is always treated as having a page size of smallestPhysicalBlock
self->setPageSize(smallestPhysicalBlock);
self->lastCommittedHeaderPage = self->newPageBuffer();
self->lastCommittedHeaderPage->init(EncodingType::XXHash64, PageType::BackupHeaderPage, 0);
self->pLastCommittedHeader = (Header*)self->lastCommittedHeaderPage->data();
state int64_t fileSize = 0;
if (exists) {
@ -2207,7 +2220,7 @@ public:
}
}
self->pHeader = (Header*)self->headerPage->data();
self->pHeader = (Header*)self->headerPage->mutateData();
if (self->pHeader->formatVersion != Header::FORMAT_VERSION) {
Error e = unsupported_format_version();
@ -2288,7 +2301,7 @@ public:
// If this fails, the backup header is still in tact for the next recovery attempt.
if (recoveredHeader) {
// Write the header to page 0
wait(self->writeHeaderPage(0, self->headerPage));
wait(self->writeHeaderPage(primaryHeaderPageID, self->headerPage));
// Wait for all outstanding writes to complete
wait(self->operations.signalAndCollapse());
@ -2316,7 +2329,8 @@ public:
self->headerPage = self->newPageBuffer();
self->headerPage->init(EncodingType::XXHash64, PageType::HeaderPage, 0);
self->pHeader = (Header*)self->headerPage->data();
self->pHeader = (Header*)self->headerPage->mutateData();
self->pHeader->headerSpace = self->headerPage->dataSize();
// Now that the header page has been allocated, set page size to desired
self->setPageSize(self->desiredPageSize);
@ -2552,9 +2566,8 @@ public:
toString(pageIDs).c_str(),
page->data());
VALGRIND_MAKE_MEM_DEFINED(page->rawData(), page->rawSize());
page->preWrite(pageIDs.front());
debug_printf("DWALPager(%s) writePhysicalPage %s\n", filename.c_str(), toString(pageIDs).c_str());
page->preWrite(pageIDs.front());
int blockSize = header ? smallestPhysicalBlock : physicalPageSize;
Future<Void> f;
if (pageIDs.size() == 1) {
@ -3417,7 +3430,7 @@ public:
debug_printf("DWALPager(%s) commit begin %s\n", self->filename.c_str(), ::toString(v).c_str());
// Write old committed header to Page 1
self->writeHeaderPage(1, self->lastCommittedHeaderPage);
self->writeHeaderPage(backupHeaderPageID, self->lastCommittedHeaderPage);
// Trigger the remap eraser to stop and then wait for it.
self->remapCleanupStop = true;
@ -3450,7 +3463,7 @@ public:
}
// Update header on disk and sync again.
wait(self->writeHeaderPage(0, self->headerPage));
wait(self->writeHeaderPage(primaryHeaderPageID, self->headerPage));
if (g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
@ -3621,6 +3634,9 @@ private:
// Header is the format of page 0 of the database
struct Header {
static constexpr int FORMAT_VERSION = 9;
// Total space that Header can take up including members and variable sized metakey
uint16_t headerSpace;
uint16_t formatVersion;
uint32_t queueCount;
uint32_t pageSize;
@ -3638,11 +3654,15 @@ private:
KeyRef getMetaKey() const { return KeyRef((const uint8_t*)(this + 1), metaKeySize); }
void setMetaKey(StringRef key) {
ASSERT(key.size() < (smallestPhysicalBlock - sizeof(Header)));
ASSERT(key.size() < (headerSpace - sizeof(Header)));
metaKeySize = key.size();
if (key.size() > 0) {
memcpy(this + 1, key.begin(), key.size());
}
// Mark the rest of the header as defined
VALGRIND_MAKE_MEM_DEFINED((const uint8_t*)(this + 1) + key.size(),
headerSpace - sizeof(Header) - key.size());
}
int size() const { return sizeof(Header) + metaKeySize; }
@ -4682,7 +4702,7 @@ public:
Reference<ArenaPage> makeEmptyRoot() {
Reference<ArenaPage> page = m_pager->newPageBuffer();
page->init(m_encodingType, PageType::BTreeNode, 1);
BTreePage* btpage = (BTreePage*)page->data();
BTreePage* btpage = (BTreePage*)page->mutateData();
btpage->height = 1;
btpage->kvBytes = 0;
btpage->tree()->build(page->dataSize(), nullptr, nullptr, nullptr, nullptr);
@ -4727,7 +4747,7 @@ public:
for (i = 0; i < entries.size(); ++i) {
Reference<const ArenaPage> p = wait(entries[i].second);
const LazyClearQueueEntry& entry = entries[i].first;
const BTreePage& btPage = *(BTreePage*)p->data();
const BTreePage& btPage = *(const BTreePage*)p->data();
ASSERT(btPage.height == entry.height);
auto& metrics = g_redwoodMetrics.level(entry.height).metrics;
@ -4808,7 +4828,7 @@ public:
if (meta.size() == 0) {
// Create new BTree
self->m_pHeader->formatVersion = MetaKey::FORMAT_VERSION;
self->m_pHeader->encodingType = BUGGIFY ? EncodingType::XOREncryption : EncodingType::XXHash64;
self->m_pHeader->encodingType = self->m_encodingType;
LogicalPageID id = wait(self->m_pager->newPageID());
BTreePageIDRef newRoot((LogicalPageID*)&id, 1);
debug_printf("new root %s\n", toString(newRoot).c_str());
@ -5595,7 +5615,7 @@ private:
}
return BTreePage::BinaryTree::Cursor((BTreePage::BinaryTree::DecodeCache*)page->userData,
((BTreePage*)page->data())->tree());
((BTreePage*)page->mutateData())->tree());
}
// Get cursor into a BTree node from a child link
@ -5606,7 +5626,7 @@ private:
}
return BTreePage::BinaryTree::Cursor((BTreePage::BinaryTree::DecodeCache*)page->userData,
((BTreePage*)page->data())->tree());
((BTreePage*)page->mutateData())->tree());
}
static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef pageIDs, int priority) {
@ -5640,7 +5660,7 @@ private:
newID.resize(*arena, oldID.size());
if (REDWOOD_DEBUG) {
BTreePage* btPage = (BTreePage*)page->data();
const BTreePage* btPage = (const BTreePage*)page->mutateData();
BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData;
debug_printf_always(
"updateBTreePage(%s, %s) %s\n",
@ -5651,7 +5671,7 @@ private:
: btPage->toString(true, oldID, writeVersion, cache->lowerBound, cache->upperBound).c_str());
}
state unsigned int height = (unsigned int)((BTreePage*)page->data())->height;
state unsigned int height = (unsigned int)((const BTreePage*)page->data())->height;
if (oldID.size() == 1) {
LogicalPageID id = wait(
self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion));
@ -5680,10 +5700,12 @@ private:
static Reference<ArenaPage> clonePageForUpdate(Reference<const ArenaPage> page) {
Reference<ArenaPage> newPage = page->cloneContents();
BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData;
cache->addref();
newPage->userData = cache;
newPage->userDataDestructor = [](void* cache) { ((BTreePage::BinaryTree::DecodeCache*)cache)->delref(); };
if (newPage->userData != nullptr) {
BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData;
cache->addref();
newPage->userData = cache;
newPage->userDataDestructor = page->userDataDestructor;
}
debug_printf("cloneForUpdate(%p -> %p size=%d\n", page->data(), newPage->data(), page->dataSize());
return newPage;
@ -5844,7 +5866,7 @@ private:
bool changesMade;
ParentInfo* parentInfo;
BTreePage* btPage() const { return (BTreePage*)page->data(); }
BTreePage* btPage() const { return (BTreePage*)page->mutateData(); }
bool empty() const {
if (updating) {
@ -5867,7 +5889,7 @@ private:
int i = 0;
if (updating) {
// Update must be done in the new tree, not the original tree where the end cursor will be from
end.tree = btPage()->tree();
end.switchTree(btPage()->tree(), true);
// TODO: insert recs in a random order to avoid new subtree being entirely right child links
while (i != recs.size()) {
@ -5939,7 +5961,7 @@ private:
if (c != u.cEnd) {
cloneForUpdate();
// must point c to the tree to erase from
c.tree = btPage()->tree();
c.switchTree(btPage()->tree(), true);
}
while (c != u.cEnd) {
@ -6029,7 +6051,7 @@ private:
// If the page is not in the cache, then no copy is needed so we will initialize pageCopy to page
state Reference<const ArenaPage> pageCopy;
state BTreePage* btPage = (BTreePage*)page->data();
state BTreePage* btPage = (BTreePage*)page->mutateData();
ASSERT(height == btPage->height);
++g_redwoodMetrics.level(height).metrics.pageCommitStart;
@ -6075,7 +6097,7 @@ private:
auto copyForUpdate = [&]() {
if (!pageCopy.isValid()) {
pageCopy = clonePageForUpdate(page);
btPage = (BTreePage*)pageCopy->data();
btPage = (BTreePage*)pageCopy->mutateData();
cursor.switchTree(btPage->tree());
}
};
@ -6828,7 +6850,7 @@ public:
Standalone<BTreePageIDRef> id;
#endif
const BTreePage* btPage() const { return (BTreePage*)page->data(); };
const BTreePage* btPage() const { return (const BTreePage*)page->data(); };
};
private:

View File

@ -73,3 +73,10 @@ T waitNext(const FutureStream<T>&);
#ifdef _MSC_VER
#pragma warning(disable : 4355) // 'this' : used in base member initializer list
#endif
// Currently, #ifdef can't be used inside actors, so define no-op versions of these valgrind
// functions if valgrind is not defined
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
#endif