From e69bc04a3b9157d697a7eb79e06ca92e4948d2b1 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Thu, 4 Nov 2021 00:40:21 -0700 Subject: [PATCH 01/54] Change asserts that should never be disabled to unstoppable asserts since they have critical side effects. --- fdbserver/VersionedBTree.actor.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 7c79052dd3..7d98ce7b63 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -7099,13 +7099,13 @@ public: // The last entry in an internal page could be a null link, if so move back if (!forward && !entry.cursor.get().value.present()) { - ASSERT(entry.cursor.movePrev()); - ASSERT(entry.cursor.get().value.present()); + UNSTOPPABLE_ASSERT(entry.cursor.movePrev()); + UNSTOPPABLE_ASSERT(entry.cursor.get().value.present()); } wait(self->pushPage(entry.cursor)); auto& newEntry = self->path.back(); - ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast()); + UNSTOPPABLE_ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast()); } self->valid = true; From 5c85555ea957304eddb86e80b61e38beea15a748 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Sat, 6 Nov 2021 21:07:00 -0700 Subject: [PATCH 02/54] Refactored mutation application in leaf nodes to do fewer comparisons and do in place value updates if the new value is the same size as the old value. 
--- fdbserver/DeltaTree.h | 6 +- fdbserver/VersionedBTree.actor.cpp | 185 +++++++++++++++-------------- 2 files changed, 104 insertions(+), 87 deletions(-) diff --git a/fdbserver/DeltaTree.h b/fdbserver/DeltaTree.h index 4a2f77c52f..9ea758aee5 100644 --- a/fdbserver/DeltaTree.h +++ b/fdbserver/DeltaTree.h @@ -1226,7 +1226,11 @@ public: return item.get(); } - void switchTree(DeltaTree2* newTree) { tree = newTree; } + void switchTree(DeltaTree2* newTree) { + tree = newTree; + // Reset item because it may point into tree memory + item.reset(); + } // If the cursor is valid, return a reference to the cursor's internal T. // Otherwise, returns a reference to the cache's upper boundary. diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 7c79052dd3..de582fde9e 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -4941,6 +4941,7 @@ private: bool clearAfterBoundary; bool boundaryCleared() const { return boundaryChanged && !boundaryValue.present(); } + bool boundarySet() const { return boundaryChanged && boundaryValue.present(); } // Returns true if this RangeMutation doesn't actually mutate anything bool noChanges() const { return !boundaryChanged && !clearAfterBoundary; } @@ -4960,8 +4961,6 @@ private: boundaryValue = v; } - bool boundarySet() const { return boundaryChanged && boundaryValue.present(); } - std::string toString() const { return format("boundaryChanged=%d clearAfterBoundary=%d boundaryValue=%s", boundaryChanged, @@ -6077,13 +6076,22 @@ private: // Leaf Page if (btPage->isLeaf()) { - bool updating = tryToUpdate; + bool updatingInPlace = tryToUpdate; bool changesMade = false; + // Copy page for modification if not already copied + auto copyForUpdate = [&]() { + if (!pageCopy.isValid()) { + pageCopy = clonePageForUpdate(page); + btPage = (BTreePage*)pageCopy->begin(); + cursor.switchTree(btPage->tree()); + } + }; + state Standalone> merged; auto switchToLinearMerge = [&]() { // 
Couldn't make changes in place, so now do a linear merge and build new pages. - updating = false; + updatingInPlace = false; auto c = cursor; c.moveFirst(); while (c != cursor) { @@ -6100,48 +6108,59 @@ private: // Now, process each mutation range and merge changes with existing data. bool firstMutationBoundary = true; - while (mBegin != mEnd) { - debug_printf("%s New mutation boundary: '%s': %s\n", - context.c_str(), - printable(mBegin.key()).c_str(), - mBegin.mutation().toString().c_str()); + constexpr int maxHeightAllowed = 8; + while (mBegin != mEnd) { // Apply the change to the mutation buffer start boundary key only if - // - there actually is a change (whether a set or a clear, old records are to be removed) + // - there actually is a change (clear or set to new value) // - either this is not the first boundary or it is but its key matches our lower bound key bool applyBoundaryChange = mBegin.mutation().boundaryChanged && (!firstMutationBoundary || mBegin.key() == update->subtreeLowerBound.key); + bool boundaryExists = cursor.valid() && cursor.get().key == mBegin.key(); + + debug_printf("%s New mutation boundary: '%s': %s applyBoundaryChange=%d boundaryExists=%d " + "updatingInPlace=%d\n", + context.c_str(), + printable(mBegin.key()).c_str(), + mBegin.mutation().toString().c_str(), + applyBoundaryChange, + boundaryExists, + updatingInPlace); + firstMutationBoundary = false; - // Iterate over records for the mutation boundary key, keep them unless the boundary key was changed or - // we are not applying it - while (cursor.valid() && cursor.get().key == mBegin.key()) { - // If there were no changes to the key or we're not applying it - if (!applyBoundaryChange) { - // If not updating, add to the output set, otherwise skip ahead past the records for the - // mutation boundary - if (!updating) { - merged.push_back(merged.arena(), cursor.get()); - debug_printf("%s Added %s [existing, boundary start]\n", - context.c_str(), - cursor.get().toString().c_str()); - } - 
cursor.moveNext(); - } else { + if (applyBoundaryChange) { + // If the boundary is being set to a value, the new KV record will be inserted + bool shouldInsertBoundary = mBegin.mutation().boundarySet(); + + // Optimization: In-place value update of new same-sized value + // If the boundary exists in the page and we're in update mode and the boundary is being set to a + // new value of the same length as the old value then just update the value bytes. + if (boundaryExists && updatingInPlace && shouldInsertBoundary && + mBegin.mutation().boundaryValue.get().size() == cursor.get().value.get().size()) { changesMade = true; + shouldInsertBoundary = false; + + debug_printf("%s In-place value update for %s [existing, boundary start]\n", + context.c_str(), + cursor.get().toString().c_str()); + + copyForUpdate(); + memcpy((uint8_t*)cursor.get().value.get().begin(), + mBegin.mutation().boundaryValue.get().begin(), + cursor.get().value.get().size()); + cursor.moveNext(); + } else if (boundaryExists) { + // An in place update can't be done, so if the boundary exists then erase or skip the record + changesMade = true; + // If updating, erase from the page, otherwise do not add to the output set - if (updating) { + if (updatingInPlace) { debug_printf("%s Erasing %s [existing, boundary start]\n", context.c_str(), cursor.get().toString().c_str()); - // Copy page for modification if not already copied - if (!pageCopy.isValid()) { - pageCopy = clonePageForUpdate(page); - btPage = (BTreePage*)pageCopy->begin(); - cursor.tree = btPage->tree(); - } - + copyForUpdate(); btPage->kvBytes -= cursor.get().kvBytes(); cursor.erase(); } else { @@ -6151,42 +6170,46 @@ private: cursor.moveNext(); } } - } - constexpr int maxHeightAllowed = 8; + // If the boundary value is being set and we must insert it, add it to the page or the output set + if (shouldInsertBoundary) { + RedwoodRecordRef rec(mBegin.key(), mBegin.mutation().boundaryValue.get()); + changesMade = true; - // Write the new record(s) 
for the mutation boundary start key if its value has been set - // Clears of this key will have been processed above by not being erased from the updated page or - // excluded from the merge output - if (applyBoundaryChange && mBegin.mutation().boundarySet()) { - RedwoodRecordRef rec(mBegin.key(), mBegin.mutation().boundaryValue.get()); - changesMade = true; - - // If updating, add to the page, else add to the output set - if (updating) { - // Copy page for modification if not already copied - if (!pageCopy.isValid()) { - pageCopy = clonePageForUpdate(page); - btPage = (BTreePage*)pageCopy->begin(); - cursor.tree = btPage->tree(); + // If updating, first try to add the record to the page + if (updatingInPlace) { + copyForUpdate(); + if (cursor.insert(rec, update->skipLen, maxHeightAllowed)) { + btPage->kvBytes += rec.kvBytes(); + debug_printf("%s Inserted %s [mutation, boundary start]\n", + context.c_str(), + rec.toString().c_str()); + } else { + debug_printf("%s Insert failed for %s [mutation, boundary start]\n", + context.c_str(), + rec.toString().c_str()); + switchToLinearMerge(); + } } - if (cursor.insert(rec, update->skipLen, maxHeightAllowed)) { - btPage->kvBytes += rec.kvBytes(); + // If not updating, add record to the output set + if (!updatingInPlace) { + merged.push_back(merged.arena(), rec); debug_printf( - "%s Inserted %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str()); - } else { - debug_printf("%s Insert failed for %s [mutation, boundary start]\n", - context.c_str(), - rec.toString().c_str()); - switchToLinearMerge(); + "%s Added %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str()); } } - - if (!updating) { - merged.push_back(merged.arena(), rec); - debug_printf( - "%s Added %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str()); + } else if (boundaryExists) { + // If the boundary exists in the page but there is no pending change, + // then if updating move past it, otherwise add it to 
the output set. + if (updatingInPlace) { + cursor.moveNext(); + } else { + merged.push_back(merged.arena(), cursor.get()); + debug_printf("%s Added %s [existing, boundary start]\n", + context.c_str(), + cursor.get().toString().c_str()); + cursor.moveNext(); } } @@ -6205,35 +6228,29 @@ private: // If the records are being removed and we're not doing an in-place update // OR if we ARE doing an update but the records are NOT being removed, then just skip them. - if (remove != updating) { + if (remove != updatingInPlace) { // If not updating, then the records, if any exist, are being removed. We don't know if there // actually are any but we must assume there are. - if (!updating) { + if (!updatingInPlace) { changesMade = true; } debug_printf("%s Seeking forward to next boundary (remove=%d updating=%d) %s\n", context.c_str(), remove, - updating, + updatingInPlace, mBegin.key().toString().c_str()); cursor.seekGreaterThanOrEqual(end, update->skipLen); } else { // Otherwise we must visit the records. If updating, the visit is to erase them, and if doing a // linear merge than the visit is to add them to the output set. while (cursor.valid() && cursor.get().compare(end, update->skipLen) < 0) { - if (updating) { + if (updatingInPlace) { debug_printf("%s Erasing %s [existing, boundary start]\n", context.c_str(), cursor.get().toString().c_str()); - // Copy page for modification if not already copied - if (!pageCopy.isValid()) { - pageCopy = clonePageForUpdate(page); - btPage = (BTreePage*)pageCopy->begin(); - cursor.tree = btPage->tree(); - } - + copyForUpdate(); btPage->kvBytes -= cursor.get().kvBytes(); cursor.erase(); changesMade = true; @@ -6257,27 +6274,23 @@ private: // If we don't have to remove the records and we are updating, do nothing. // If we do have to remove the records and we are not updating, do nothing. 
- if (remove != updating) { - debug_printf( - "%s Ignoring remaining records, remove=%d updating=%d\n", context.c_str(), remove, updating); + if (remove != updatingInPlace) { + debug_printf("%s Ignoring remaining records, remove=%d updating=%d\n", + context.c_str(), + remove, + updatingInPlace); } else { // If updating and the key is changing, we must visit the records to erase them. // If not updating and the key is not changing, we must visit the records to add them to the output // set. while (cursor.valid()) { - if (updating) { + if (updatingInPlace) { debug_printf( "%s Erasing %s and beyond [existing, matches changed upper mutation boundary]\n", context.c_str(), cursor.get().toString().c_str()); - // Copy page for modification if not already copied - if (!pageCopy.isValid()) { - pageCopy = clonePageForUpdate(page); - btPage = (BTreePage*)pageCopy->begin(); - cursor.tree = btPage->tree(); - } - + copyForUpdate(); btPage->kvBytes -= cursor.get().kvBytes(); cursor.erase(); } else { @@ -6305,7 +6318,7 @@ private: context.c_str()); } - if (updating) { + if (updatingInPlace) { // If the tree is now empty, delete the page if (cursor.tree->numItems == 0) { update->cleared(); @@ -6581,7 +6594,7 @@ private: if (modifier.clonedPage) { pageCopy = modifier.page; btPage = modifier.btPage(); - cursor.tree = btPage->tree(); + cursor.switchTree(btPage->tree()); } // If page contents have changed From d1ede0225bf94e59df71e8c358505f407a4c22bf Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Sun, 5 Dec 2021 13:39:35 -0800 Subject: [PATCH 03/54] Initial commit of Redwood page header format and API to enable different page encodings to support encryption or different checksum types. 
--- fdbserver/IPager.h | 268 ++++++++++++++-- fdbserver/VersionedBTree.actor.cpp | 498 ++++++++++++++--------------- fdbserver/art_impl.h | 4 +- flow/error_definitions.h | 2 +- 4 files changed, 489 insertions(+), 283 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index 1346279da9..d39122e529 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -20,12 +20,15 @@ #ifndef FDBSERVER_IPAGER_H #define FDBSERVER_IPAGER_H +#include "flow/Error.h" +#include #pragma once #include "fdbserver/IKeyValueStore.h" #include "flow/flow.h" #include "fdbclient/FDBTypes.h" +#define XXH_INLINE_ALL #include "flow/xxhash.h" #ifndef VALGRIND @@ -76,11 +79,37 @@ static const std::vector> L0PossibleEv { PagerEvents::PageWrite, PagerEventReasons::MetaData }, }; +enum EncodingType : uint8_t { + XXHash64 = 0, + // For testing purposes + XOREncryption = 1 +}; + +enum PageType : uint8_t { + HeaderPage = 0, + BackupHeaderPage = 1, + BTreeNode = 2, + BTreeSuperNode = 3, + QueuePageStandalone = 4, + QueuePageInExtent = 5 +}; + // Represents a block of memory in a 4096-byte aligned location held by an Arena. 
+// Page Format: +// VersionHeader +// Header based on headerVersion +// EncodingType-specific Header +// Footer based on headerVersion +// Payload according to encoding +// +// Pages can only be written using the latest HeaderVersion +// +// preWrite() must be called before writing a page to disk, which will do any checksum generation or encryption needed +// postRead() must be called after loading a page from disk, which will do any verification or decryption needed class ArenaPage : public ReferenceCounted, public FastAllocated { public: - // The page's logical size includes an opaque checksum, use size() to get usable size - ArenaPage(int logicalSize, int bufferSize) : logicalSize(logicalSize), bufferSize(bufferSize), userData(nullptr) { + ArenaPage(int logicalSize, int bufferSize) + : logicalSize(logicalSize), bufferSize(bufferSize), pUsable(nullptr), userData(nullptr) { if (bufferSize > 0) { buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize); @@ -91,27 +120,163 @@ public: } }; + // Convenient constructor that returns a reference + static Reference create(int logicalSize, int bufferSize) { + return Reference(new ArenaPage(logicalSize, bufferSize)); + } + ~ArenaPage() { if (userData != nullptr && userDataDestructor != nullptr) { userDataDestructor(userData); } } - uint8_t const* begin() const { return (uint8_t*)buffer; } + // Before using data() or dataSize(), either init() or postRead() must be called + const uint8_t* data() const { return pUsable; } + uint8_t* mutateData() const { return (uint8_t*)pUsable; } + int dataSize() const { return usableSize; } - uint8_t* mutate() { return (uint8_t*)buffer; } + const uint8_t* rawData() const { return buffer; } + uint8_t* rawData() { return buffer; } + int rawSize() const { return bufferSize; } + static constexpr uint8_t HEADER_WRITE_VERSION = 1; - typedef XXH64_hash_t Checksum; +#pragma pack(push, 1) - // Usable size, without checksum - int size() const { return logicalSize - sizeof(Checksum); } + // This can't 
change + struct VersionHeader { + uint8_t headerVersion; + EncodingType encodingType; + }; - Standalone asStringRef() const { return Standalone(StringRef(begin(), size()), arena); } + struct Header { + // pageType Meaning is based on type. + // For Queue pages, pageSubType is QueueID + // For BTree nodes, pageSubType is Height (also stored in BTreeNode) + PageType pageType; + uint8_t pageSubType; + + // Physical page ID of first block on disk of the ArenaPage + PhysicalPageID firstPhysicalPageID; + // The first logical page ID the ArenaPage was referenced by when last written + LogicalPageID lastKnownLogicalPageID; + // The first logical page ID of the parent of this ArenaPage when last written + LogicalPageID lastKnownParentID; + + // Time and write version as of the last update to this page + double writeTime; + Version writeVersion; + }; + + struct XXHashEncodingHeader { + XXH64_hash_t checksum; + void encode(uint8_t* payload, int len, PhysicalPageID seed) { + checksum = XXH3_64bits_withSeed(payload, len, seed); + } + void decode(uint8_t* payload, int len, PhysicalPageID seed) { + if (checksum != XXH3_64bits_withSeed(payload, len, seed)) { + throw checksum_failed(); + } + } + }; + + struct XOREncodingHeader { + XXH64_hash_t checksum; + uint8_t keyID; + void encode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) { + uint8_t key = secret ^ keyID; + for (int i = 0; i < len; ++i) { + payload[i] ^= key; + } + checksum = XXH3_64bits_withSeed(payload, len, seed); + } + void decode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) { + if (checksum != XXH3_64bits_withSeed(payload, len, seed)) { + throw checksum_failed(); + } + uint8_t key = secret ^ keyID; + for (int i = 0; i < len; ++i) { + payload[i] ^= key; + } + } + }; + + struct Footer { + XXH64_hash_t checksum; + void update(uint8_t* payload, int len) { checksum = XXH3_64bits(payload, len); } + void verify(uint8_t* payload, int len) { + if (checksum != XXH3_64bits(payload, len)) { + 
throw checksum_failed(); + } + } + }; + +#pragma pack(pop) + + // Syntactic sugar for getting a series of types from a byte buffer + // The Reader casts to any T * and increments the read pointer by T's size. + struct Reader { + uint8_t* ptr; + template + operator T*() { + T* p = (T*)ptr; + ptr += sizeof(T); + return p; + } + template + void skip() { + ptr += sizeof(T); + } + }; + + // Initialize the header for a new page to be populated soon and written to disk + void init(EncodingType t, PageType pageType, uint8_t pageSubType) { + Reader next{ buffer }; + VersionHeader* vh = next; + // Only the latest header version is written. + vh->headerVersion = HEADER_WRITE_VERSION; + vh->encodingType = t; + + Header* h = next; + h->pageType = pageType; + h->pageSubType = pageSubType; + + if (t == EncodingType::XXHash64) { + next.skip(); + } else if (t == EncodingType::XOREncryption) { + next.skip(); + } else { + throw unsupported_format_version(); + } + + next.skip(); + + pUsable = next; + usableSize = logicalSize - (pUsable - buffer); + } + + // Get the usable size for a new page of pageSize using HEADER_WRITE_VERSION with encoding type t + static int getUsableSize(int pageSize, EncodingType t) { + int usable = pageSize - sizeof(VersionHeader) - sizeof(Header) - sizeof(XXH64_hash_t); + + if (t == EncodingType::XXHash64) { + usable -= sizeof(XXHashEncodingHeader); + } else if (t == EncodingType::XOREncryption) { + usable -= sizeof(XOREncodingHeader); + } else { + throw unsupported_format_version(); + } + + return usable; + } + + Standalone asStringRef() const { return Standalone(StringRef(buffer, logicalSize)); } // Get an ArenaPage which is a copy of this page, in its own Arena Reference cloneContents() const { ArenaPage* p = new ArenaPage(logicalSize, bufferSize); memcpy(p->buffer, buffer, logicalSize); + p->pUsable = p->buffer + (pUsable - buffer); return Reference(p); } @@ -123,41 +288,91 @@ public: return Reference(p); } - // Given a vector of pages with the same 
->size(), create a new ArenaPage with a ->size() that is - // equivalent to all of the input pages and has all of their contents copied into it. - static Reference concatPages(const std::vector>& pages) { - int usableSize = pages.front()->size(); - int totalUsableSize = pages.size() * usableSize; - int totalBufferSize = pages.front()->bufferSize * pages.size(); - ArenaPage* superpage = new ArenaPage(totalUsableSize + sizeof(Checksum), totalBufferSize); + // Must be called before writing to disk to update headers and encrypt page + // Pre: Encoding secrets and other options must be set + // Post: Encoding options will be stored in page if needed, payload will be encrypted + void preWrite(PhysicalPageID pageID) const { + Reader next{ buffer }; + const VersionHeader* vh = next; + ASSERT(vh->headerVersion == HEADER_WRITE_VERSION); - uint8_t* wptr = superpage->mutate(); - for (auto& p : pages) { - ASSERT(p->size() == usableSize); - memcpy(wptr, p->begin(), usableSize); - wptr += usableSize; + Header* h = next; + h->firstPhysicalPageID = pageID; + + if (vh->encodingType == EncodingType::XXHash64) { + XXHashEncodingHeader* xh = next; + xh->encode(pUsable, usableSize, pageID); + } else if (vh->encodingType == EncodingType::XOREncryption) { + XOREncodingHeader* xorh = next; + xorh->keyID = xorKeyID; + xorh->encode(xorKeySecret, pUsable, usableSize, pageID); + } else { + throw unsupported_format_version(); } - return Reference(superpage); + Footer* f = next; + f->update(buffer, (uint8_t*)f - buffer); } - Checksum& getChecksum() { return *(Checksum*)(buffer + size()); } + // Must be called after reading from disk to verify and decrypt page + // Pre: Encoding secrets must be set + // Post: Encoding options that come from page data will be populated, payload will be decrypted + void postRead(PhysicalPageID pageID) { + Reader next{ buffer }; + const VersionHeader* vh = next; - Checksum calculateChecksum(LogicalPageID pageID) { return XXH3_64bits_withSeed(buffer, size(), pageID); 
} + if (vh->headerVersion == 1) { + next.skip
(); + XXHashEncodingHeader* xh = nullptr; + XOREncodingHeader* xorh = nullptr; - void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); } + if (vh->encodingType == EncodingType::XXHash64) { + xh = next; + } else if (vh->encodingType == EncodingType::XOREncryption) { + xorh = next; + } else { + throw unsupported_format_version(); + } - bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); } + Footer* f = next; + f->verify(buffer, (uint8_t*)f - buffer); + + if (xh != nullptr) { + xh->decode(pUsable, usableSize, pageID); + } else if (xorh != nullptr) { + xorh->decode(xorKeySecret, pUsable, usableSize, pageID); + xorKeyID = xorh->keyID; + } + } else { + throw unsupported_format_version(); + } + } const Arena& getArena() const { return arena; } private: Arena arena; + + // The logical size of the page, which can be smaller than bufferSize, which is only of + // practical purpose in simulation to use arbitrarily small page sizes to test edge cases + // with shorter execution time int logicalSize; + + // The physical size of allocated memory for the page which also represents the space + // to be written to disk int bufferSize; uint8_t* buffer; + // Pointer and length of page space available to the user + uint8_t* pUsable; + int usableSize; + + // Encoding-specific secrets + uint8_t xorKeyID; + uint8_t xorKeySecret; + public: + // A metadata object that can be attached to the page and will be deleted with the page mutable void* userData; mutable void (*userDataDestructor)(void*); }; @@ -191,12 +406,11 @@ public: class IPager2 : public IClosable { public: // Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed. - virtual Reference newPageBuffer(size_t size = 1) = 0; + virtual Reference newPageBuffer(size_t blocks = 1) = 0; // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). 
// For a given pager instance, separate calls to this function must return the same value. // Only valid to call after recovery is complete. - virtual int getUsablePageSize() const = 0; virtual int getPhysicalPageSize() const = 0; virtual int getLogicalPageSize() const = 0; virtual int getPagesPerExtent() const = 0; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 696b5145b0..9a90b95d47 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -20,8 +20,10 @@ #include "fdbclient/FDBTypes.h" #include "fdbserver/Knobs.h" +#include "flow/Error.h" #include "flow/IRandom.h" #include "flow/Knobs.h" +#include "flow/Trace.h" #include "flow/flow.h" #include "flow/Histogram.h" #include @@ -46,7 +48,7 @@ #include #include -#define REDWOOD_DEBUG 0 +#define REDWOOD_DEBUG 1 // Only print redwood debug statements for a certain address. Useful in simulation with many redwood processes to reduce // log size. @@ -465,17 +467,18 @@ public: } }; - struct RawPage { + struct PageHeader { // The next page of the queue after this one LogicalPageID nextPageID; // The start offset of the next page uint16_t nextOffset; - // The end offset of the current page + // The end offset of the current page's data entries uint16_t endOffset; // Current page within the extent LogicalPageID extentCurPageID; - // The nd page within the extent + // The end page within the extent LogicalPageID extentEndPageID; + uint16_t pageSize; // Get pointer to data after page header uint8_t* begin() { return (uint8_t*)(this + 1); } }; @@ -496,7 +499,7 @@ public: // The first page ID to be written to the pager, if this cursor has written anything LogicalPageID firstPageIDWritten; - // Offset after RawPage header in page to next read from or write to + // Offset after PageHeader header in page to next read from or write to int offset; // A read cursor will not read this page (or beyond) @@ -587,7 +590,7 @@ public: this, 
::toString(pageID).c_str(), offset, - page ? raw()->endOffset : -1); + page ? header()->endOffset : -1); } if (mode == POP || mode == READONLY) { return format("{ReadCursor %s:%p pos=%s:%d rawEndOffset=%d endPage=%s nextPage=%s}", @@ -595,7 +598,7 @@ public: this, ::toString(pageID).c_str(), offset, - page ? raw()->endOffset : -1, + page ? header()->endOffset : -1, ::toString(endPageID).c_str(), ::toString(nextPageID).c_str()); } @@ -620,11 +623,11 @@ public: // Returns true if any items have been written to the last page bool pendingTailWrites() const { return mode == WRITE && offset != 0; } - RawPage* raw() const { return ((RawPage*)(page->begin())); } + PageHeader* header() const { return ((PageHeader*)(page->mutateData())); } void setNext(LogicalPageID pageID, int offset) { ASSERT(mode == WRITE); - RawPage* p = raw(); + PageHeader* p = header(); p->nextPageID = pageID; p->nextOffset = offset; } @@ -645,7 +648,7 @@ public: debug_printf("FIFOQueue::Cursor(%s) loadExtent\n", toString().c_str()); return map(queue->pager->readExtent(pageID), [=](Reference p) { page = p; - debug_printf("FIFOQueue::Cursor(%s) loadExtent done. Page: %p\n", toString().c_str(), page->begin()); + debug_printf("FIFOQueue::Cursor(%s) loadExtent done. 
Page: %p\n", toString().c_str(), page->data()); return Void(); }); } @@ -653,8 +656,6 @@ public: void writePage() { ASSERT(mode == WRITE); debug_printf("FIFOQueue::Cursor(%s) writePage\n", toString().c_str()); - VALGRIND_MAKE_MEM_DEFINED(raw()->begin(), offset); - VALGRIND_MAKE_MEM_DEFINED(raw()->begin() + offset, queue->dataBytesPerPage - raw()->endOffset); queue->pager->updatePage( PagerEventReasons::MetaData, nonBtreeLevel, VectorRef(&pageID, 1), page); if (firstPageIDWritten == invalidLogicalPageID) { @@ -692,8 +693,7 @@ public: ::toString(newPageID).c_str(), newOffset); writePage(); - auto p = raw(); - prevExtentEndPageID = p->extentEndPageID; + prevExtentEndPageID = header()->extentEndPageID; if (pageID == prevExtentEndPageID) newExtentPage = true; debug_printf( @@ -707,25 +707,22 @@ public: pageID = newPageID; offset = newOffset; - if (BUGGIFY) { - // Randomly change the byte limit for queue pages. The min here must be large enough for at least one - // queue item of any type. This change will suddenly make some pages being written to seem overfilled - // but this won't break anything, the next write will just be detected as not fitting and the page will - // end. - queue->dataBytesPerPage = - deterministicRandom()->randomInt(50, queue->pager->getUsablePageSize() - sizeof(RawPage)); - } - if (initializeNewPage) { debug_printf("FIFOQueue::Cursor(%s) Initializing new page. usesExtents: %d, initializeExtentInfo: %d\n", toString().c_str(), queue->usesExtents, initializeExtentInfo); page = queue->pager->newPageBuffer(); + page->init(EncodingType::XOREncryption, + queue->usesExtents ? 
PageType::QueuePageInExtent : PageType::QueuePageStandalone, + (uint8_t)queue->queueID); setNext(0, 0); - auto p = raw(); + auto p = header(); ASSERT(newOffset == 0); p->endOffset = 0; + p->pageSize = page->dataSize() - sizeof(PageHeader); + + // p-> page size end = page->size() - sizeof(RawHeader) // For extent based queue, update the index of current page within the extent if (queue->usesExtents) { debug_printf("FIFOQueue::Cursor(%s) Adding page %s init=%d pageCount %d\n", @@ -766,7 +763,7 @@ public: state bool mustWait = self->isBusy(); state int bytesNeeded = Codec::bytesNeeded(item); state bool needNewPage = - self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->queue->dataBytesPerPage; + self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->pageSize; if (BUGGIFY) { // Sometimes (1% probability) decide a new page is needed as long as at least 1 item has been @@ -791,8 +788,8 @@ public: // would have changed the cursor state // Otherwise, taking the mutex would be immediate so no other writer could have run if (mustWait) { - needNewPage = self->pageID == invalidLogicalPageID || - self->offset + bytesNeeded > self->queue->dataBytesPerPage; + needNewPage = + self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->header()->pageSize; if (BUGGIFY) { // Sometimes (1% probability) decide a new page is needed as long as at least 1 item has been // written (indicated by non-zero offset) to the current page. 
@@ -807,16 +804,13 @@ public: if (needNewPage) { debug_printf("FIFOQueue::Cursor(%s) write(%s) page is full, adding new page\n", self->toString().c_str(), - ::toString(item).c_str(), - ::toString(self->pageID).c_str(), - bytesNeeded, - self->queue->dataBytesPerPage); + ::toString(item).c_str()); state LogicalPageID newPageID; // If this is an extent based queue, check if there is an available page in current extent if (self->queue->usesExtents) { bool allocateNewExtent = false; if (self->pageID != invalidLogicalPageID) { - auto praw = self->raw(); + auto praw = self->header(); if (praw->extentCurPageID < praw->extentEndPageID) { newPageID = praw->extentCurPageID + 1; } else { @@ -839,7 +833,7 @@ public: debug_printf( "FIFOQueue::Cursor(%s) write(%s) writing\n", self->toString().c_str(), ::toString(item).c_str()); - auto p = self->raw(); + auto p = self->header(); Codec::writeToBytes(p->begin() + self->offset, item); self->offset += bytesNeeded; p->endOffset = self->offset; @@ -944,7 +938,7 @@ public: page = nextPageReader.get(); // Start loading the next page if it's not the end page - auto p = raw(); + auto p = header(); if (p->nextPageID != endPageID) { startNextPageLoad(p->nextPageID); } else { @@ -953,7 +947,7 @@ public: nextPageID = invalidLogicalPageID; } } - auto p = raw(); + auto p = header(); debug_printf("FIFOQueue::Cursor(%s) readNext reading at current position\n", toString().c_str()); ASSERT(offset < p->endOffset); int bytesRead; @@ -1040,7 +1034,6 @@ public: queueID = id; numPages = 1; numEntries = 0; - dataBytesPerPage = pager->getUsablePageSize() - sizeof(RawPage); usesExtents = extent; pagesPerExtent = pager->getPagesPerExtent(); headReader.init(this, Cursor::POP, newPageID, false, false, newPageID, 0); @@ -1059,7 +1052,6 @@ public: queueID = qs.queueID; numPages = qs.numPages; numEntries = qs.numEntries; - dataBytesPerPage = pager->getUsablePageSize() - sizeof(RawPage); usesExtents = qs.usesExtents; pagesPerExtent = pager->getPagesPerExtent(); 
headReader.init(this, Cursor::POP, qs.headPageID, loadExtents, false, qs.tailPageID, qs.headOffset); @@ -1113,37 +1105,35 @@ public: results.reserve(results.arena(), self->pagesPerExtent * self->pager->getPhysicalPageSize() / sizeof(T)); // Loop over all the pages in this extent - int pageIdx = 0; + state int pageIdx = 0; loop { // Position the page pointer to current page in the extent - Reference page = + state Reference page = c.page->subPage(pageIdx++ * self->pager->getPhysicalPageSize(), self->pager->getLogicalPageSize()); - debug_printf("FIFOQueue::Cursor(%s) peekALLExt %s. Offset %d, CalculateChecksum %d ChecksumInPage %d\n", + debug_printf("FIFOQueue::Cursor(%s) peekALLExt %s. Offset %d\n", c.toString().c_str(), toString(c.pageID).c_str(), - c.pageID * self->pager->getPhysicalPageSize(), - page->calculateChecksum(c.pageID), - page->getChecksum()); - if (!page->verifyChecksum(c.pageID)) { - debug_printf("FIFOQueue::Cursor(%s) peekALLExt checksum failed for %s. Offset %d, " - "CalculateChecksum %d ChecksumInPage %d\n", - c.toString().c_str(), - toString(c.pageID).c_str(), - c.pageID * self->pager->getPhysicalPageSize(), - page->calculateChecksum(c.pageID), - page->getChecksum()); - Error e = checksum_failed(); + c.pageID * self->pager->getPhysicalPageSize()); + + try { + page->postRead(c.pageID); + + } catch (Error& e) { TraceEvent(SevError, "RedwoodChecksumFailed") .detail("PageID", c.pageID) .detail("PageSize", self->pager->getPhysicalPageSize()) .detail("Offset", c.pageID * self->pager->getPhysicalPageSize()) - .detail("CalculatedChecksum", page->calculateChecksum(c.pageID)) - .detail("ChecksumInPage", page->getChecksum()) .error(e); - throw e; + + debug_printf("FIFOQueue::Cursor(%s) peekALLExt subPage error=%s for %s. 
Offset %d ", + c.toString().c_str(), + e.what(), + toString(c.pageID).c_str(), + c.pageID * self->pager->getPhysicalPageSize()); + throw; } - RawPage* p = (RawPage*)(page->begin()); + PageHeader* p = (PageHeader*)(page->data()); int bytesRead; // Now loop over all entries inside the current page @@ -1322,7 +1312,7 @@ public: self->tailPageNewExtent = true; self->prevExtentEndPageID = invalidLogicalPageID; } else { - auto p = self->tailWriter.raw(); + auto p = self->tailWriter.header(); debug_printf( "FIFOQueue(%s) newTailPage tailWriterPage %u extentCurPageID %u, extentEndPageID %u\n", self->name.c_str(), @@ -1350,7 +1340,7 @@ public: workPending = true; } else { if (self->usesExtents) { - auto p = self->tailWriter.raw(); + auto p = self->tailWriter.header(); self->prevExtentEndPageID = p->extentEndPageID; self->tailPageNewExtent = false; debug_printf("FIFOQueue(%s) newTailPage tailPageNewExtent: %d prevExtentEndPageID: %u " @@ -1434,7 +1424,6 @@ public: int64_t numPages; int64_t numEntries; - int dataBytesPerPage; int pagesPerExtent; bool usesExtents; bool tailPageNewExtent; @@ -2028,6 +2017,9 @@ public: typedef std::map VersionToPageMapT; typedef std::unordered_map PageToVersionedMapT; + static constexpr PhysicalPageID primaryHeaderPageID = 0; + static constexpr PhysicalPageID backupHeaderPageID = 1; + #pragma pack(push, 1) struct DelayedFreePage { Version version; @@ -2148,7 +2140,7 @@ public: } void updateCommittedHeader() { - memcpy(lastCommittedHeaderPage->mutate(), headerPage->begin(), smallestPhysicalBlock); + memcpy(lastCommittedHeaderPage->mutateData(), headerPage->data(), lastCommittedHeaderPage->dataSize()); } ACTOR static Future recover(DWALPager* self) { @@ -2168,7 +2160,8 @@ public: // Header page is always treated as having a page size of smallestPhysicalBlock self->setPageSize(smallestPhysicalBlock); self->lastCommittedHeaderPage = self->newPageBuffer(); - self->pLastCommittedHeader = (Header*)self->lastCommittedHeaderPage->begin(); + 
self->lastCommittedHeaderPage->init(EncodingType::XXHash64, PageType::BackupHeaderPage, 0); + self->pLastCommittedHeader = (Header*)self->lastCommittedHeaderPage->data(); state int64_t fileSize = 0; if (exists) { @@ -2186,33 +2179,39 @@ public: state bool recoveredHeader = false; - // Read physical page 0 directly - wait(store(self->headerPage, self->readHeaderPage(self, 0))); - - // If the checksum fails for the header page, try to recover committed header backup from page 1 - if (!self->headerPage->verifyChecksum(0)) { - TraceEvent(SevWarn, "RedwoodRecoveringHeader").detail("Filename", self->filename); - - wait(store(self->headerPage, self->readHeaderPage(self, 1))); - - if (!self->headerPage->verifyChecksum(1)) { - if (g_network->isSimulated()) { - // TODO: Detect if process is being restarted and only throw injected if so? - throw io_error().asInjectedFault(); - } - - Error e = checksum_failed(); - TraceEvent(SevError, "RedwoodRecoveryFailed").detail("Filename", self->filename).error(e); - throw e; + // Try to read primary header + try { + wait(store(self->headerPage, self->readHeaderPage(primaryHeaderPageID))); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; } - recoveredHeader = true; + TraceEvent(SevWarn, "RedwoodHeaderError") + .detail("Filename", self->filename) + .detail("PageID", primaryHeaderPageID) + .error(e); } - self->pHeader = (Header*)self->headerPage->begin(); + // If primary header wasn't valid, try backup header + if (!self->headerPage.isValid()) { + // Try to read backup header + try { + wait(store(self->headerPage, self->readHeaderPage(backupHeaderPageID))); + recoveredHeader = true; + } catch (Error& e) { + TraceEvent(SevWarn, "RedwoodRecoveryError") + .detail("Filename", self->filename) + .detail("PageID", backupHeaderPageID) + .error(e); + throw; + } + } + + self->pHeader = (Header*)self->headerPage->data(); if (self->pHeader->formatVersion != Header::FORMAT_VERSION) { - Error e = wrong_format_version(); - 
TraceEvent(SevWarn, "RedwoodRecoveryFailedWrongVersion") + Error e = unsupported_format_version(); + TraceEvent(SevWarn, "RedwoodRecoveryError") .detail("Filename", self->filename) .detail("Version", self->pHeader->formatVersion) .detail("ExpectedVersion", Header::FORMAT_VERSION) @@ -2316,7 +2315,8 @@ public: debug_printf("DWALPager(%s) creating new pager\n", self->filename.c_str()); self->headerPage = self->newPageBuffer(); - self->pHeader = (Header*)self->headerPage->begin(); + self->headerPage->init(EncodingType::XXHash64, PageType::HeaderPage, 0); + self->pHeader = (Header*)self->headerPage->data(); // Now that the header page has been allocated, set page size to desired self->setPageSize(self->desiredPageSize); @@ -2357,9 +2357,9 @@ public: self->pHeader->remapQueue = self->remapQueue.getState(); // Set remaining header bytes to \xff - memset(self->headerPage->mutate() + self->pHeader->size(), + memset(self->headerPage->mutateData() + self->pHeader->size(), 0xff, - self->headerPage->size() - self->pHeader->size()); + self->headerPage->dataSize() - self->pHeader->size()); // There is no previously committed header, but the current header state is sufficient to use as the backup // header for the next commit, which if recovered would result in a valid empty pager at version 0. @@ -2440,15 +2440,10 @@ public: return id; } - Reference newPageBuffer(size_t size = 1) override { - return Reference(new ArenaPage(logicalPageSize * size, physicalPageSize * size)); + Reference newPageBuffer(size_t blocks = 1) override { + return ArenaPage::create(logicalPageSize * blocks, physicalPageSize * blocks); } - // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). - // For a given pager instance, separate calls to this function must return the same value. - // TODO: This is abstraction breaking. 
This should probably be stored as a member, calculated once on construction - // by creating an ArenaPage and getting its usable size. - int getUsablePageSize() const override { return logicalPageSize - sizeof(ArenaPage::Checksum); } int getPhysicalPageSize() const override { return physicalPageSize; } int getLogicalPageSize() const override { return logicalPageSize; } int getPagesPerExtent() const override { return pagesPerExtent; } @@ -2527,13 +2522,13 @@ public: Future newExtentPageID(QueueID queueID) override { return newExtentPageID_impl(this, queueID); } - ACTOR static Future writePhysicalPage_impl(DWALPager* self, - Void* data, - PagerEventReasons reason, - unsigned int level, - PhysicalPageID pageID, - int blockSize, - bool header) { + ACTOR static Future writePhysicalBlock(DWALPager* self, + uint8_t* data, + PagerEventReasons reason, + unsigned int level, + PhysicalPageID pageID, + int blockSize, + bool header) { state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority)); ++g_redwoodMetrics.metric.pagerDiskWrite; @@ -2542,7 +2537,7 @@ public: return Void(); } // Note: Not using forwardError here so a write error won't be discovered until commit time. - wait(self->pageFile->write(data, blockSize, (int64_t)pageID * blockSize)); + wait(self->pageFile->write((void*)data, blockSize, (int64_t)pageID * blockSize)); return Void(); } @@ -2555,25 +2550,22 @@ public: filename.c_str(), (header ? 
"writePhysicalHeader" : "writePhysical"), toString(pageIDs).c_str(), - page->begin()); - VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); - page->updateChecksum(pageIDs.front()); - debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n", - filename.c_str(), - toString(pageIDs).c_str(), - page->calculateChecksum(pageIDs.front()), - page->getChecksum()); + page->data()); + + VALGRIND_MAKE_MEM_DEFINED(page->rawData(), page->rawSize()); + page->preWrite(pageIDs.front()); + debug_printf("DWALPager(%s) writePhysicalPage %s\n", filename.c_str(), toString(pageIDs).c_str()); int blockSize = header ? smallestPhysicalBlock : physicalPageSize; Future f; if (pageIDs.size() == 1) { - f = writePhysicalPage_impl(this, (Void*)page->mutate(), reason, level, pageIDs.front(), blockSize, header); + f = writePhysicalBlock(this, page->rawData(), reason, level, pageIDs.front(), blockSize, header); operations.add(f); return f; } std::vector> writers; for (int i = 0; i < pageIDs.size(); ++i) { - Future p = writePhysicalPage_impl( - this, (Void*)page->mutate() + i * blockSize, reason, level, pageIDs[i], blockSize, header); + Future p = writePhysicalBlock( + this, page->rawData() + (i * blockSize), reason, level, pageIDs[i], blockSize, header); writers.push_back(p); } f = waitForAll(writers); @@ -2756,14 +2748,15 @@ public: } void freeExtent(LogicalPageID pageID) override { freeExtent_impl(this, pageID); } - ACTOR static Future readPhysicalPage_impl(DWALPager* self, - uint8_t* data, - int blockSize, - int64_t offset, - int priority) { + ACTOR static Future readPhysicalBlock(DWALPager* self, + uint8_t* data, + int blockSize, + int64_t offset, + int priority) { state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); - int reader = wait(self->pageFile->read(data, blockSize, offset)); - return reader; + ++g_redwoodMetrics.metric.pagerDiskRead; + int bytes = wait(self->pageFile->read(data, blockSize, offset)); + 
return bytes; } // Read a physical page from the page file. Note that header pages use a page size of smallestPhysicalBlock @@ -2775,48 +2768,43 @@ public: bool header) { ASSERT(!self->memoryOnly); - // if (g_network->getCurrentTask() > TaskPriority::DiskRead) { - // wait(delay(0, TaskPriority::DiskRead)); - // } - state Reference page = - header ? Reference(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock)) - : self->newPageBuffer(); + header ? ArenaPage::create(smallestPhysicalBlock, smallestPhysicalBlock) : self->newPageBuffer(); debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n", self->filename.c_str(), toString(pageID).c_str(), - page->begin()); + page->data()); - state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); - ++g_redwoodMetrics.metric.pagerDiskRead; - - // TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled? - int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; - int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize)); + int readBytes = wait(readPhysicalBlock(self, + page->rawData(), + page->rawSize(), + (int64_t)pageID * page->rawSize(), + std::min(priority, ioMaxPriority))); debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), - page->begin(), + page->data(), readBytes); - // Header reads are checked explicitly during recovery - if (!header) { - if (!page->verifyChecksum(pageID)) { - debug_printf( - "DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str()); - Error e = checksum_failed(); - TraceEvent(SevError, "RedwoodChecksumFailed") - .detail("Filename", self->filename.c_str()) - .detail("PageID", pageID) - .detail("PageSize", self->physicalPageSize) - .detail("Offset", pageID * self->physicalPageSize) - .detail("CalculatedChecksum", 
page->calculateChecksum(pageID)) - .detail("ChecksumInPage", page->getChecksum()) - .error(e); - ASSERT(false); - throw e; - } + try { + page->postRead(pageID); + } catch (Error& e) { + // For header pages, error is a warning because recovery may still be possible + TraceEvent(header ? SevWarnAlways : SevError, "RedwoodPageError") + .detail("Filename", self->filename.c_str()) + .detail("PageID", pageID) + .detail("PageSize", self->physicalPageSize) + .detail("Offset", pageID * self->physicalPageSize) + .error(e); + + debug_printf("DWALPager(%s) checksum failed for %s with %s\n", + self->filename.c_str(), + toString(pageID).c_str(), + e.what()); + + throw; } + return page; } @@ -2833,16 +2821,14 @@ public: debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n", self->filename.c_str(), toString(pageIDs).c_str(), - page->begin()); + page->data()); - ++g_redwoodMetrics.metric.pagerDiskRead; // TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled? 
state int blockSize = self->physicalPageSize; - state uint8_t* data = page->mutate(); std::vector> reads; for (int i = 0; i < pageIDs.size(); ++i) { - reads.push_back(readPhysicalPage_impl(self, data, blockSize, ((int64_t)pageIDs[i]) * blockSize, priority)); - data += blockSize; + reads.push_back(readPhysicalBlock( + self, page->rawData() + (i * blockSize), blockSize, ((int64_t)pageIDs[i]) * blockSize, priority)); } // wait for all the parallel read futures wait(waitForAll(reads)); @@ -2850,29 +2836,34 @@ public: debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n", self->filename.c_str(), toString(pageIDs).c_str(), - page->begin(), + page->data(), pageIDs.size() * blockSize); - // Header reads are checked explicitly during recovery - if (!page->verifyChecksum(pageIDs.front())) { - debug_printf("DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageIDs).c_str()); - Error e = checksum_failed(); - TraceEvent(SevError, "RedwoodChecksumFailed") + + try { + page->postRead(pageIDs.front()); + } catch (Error& e) { + // For header pages, error is a warning because recovery may still be possible + TraceEvent(SevError, "RedwoodPageError") .detail("Filename", self->filename.c_str()) - .detail("PageID", pageIDs) + .detail("PageIDs", pageIDs) .detail("PageSize", self->physicalPageSize) .detail("Offset", pageIDs.front() * self->physicalPageSize) - .detail("CalculatedChecksum", page->calculateChecksum(pageIDs.front())) - .detail("ChecksumInPage", page->getChecksum()) .error(e); - ASSERT(false); - throw e; + + debug_printf("DWALPager(%s) checksum failed for %s with %s\n", + self->filename.c_str(), + toString(pageIDs).c_str(), + e.what()); + + throw; } + return page; } - static Future> readHeaderPage(DWALPager* self, PhysicalPageID pageID) { - debug_printf("DWALPager(%s) readHeaderPage %s\n", self->filename.c_str(), toString(pageID).c_str()); - return readPhysicalPage(self, pageID, ioMaxPriority, true); + Future> 
readHeaderPage(PhysicalPageID pageID) { + debug_printf("DWALPager(%s) readHeaderPage %s\n", filename.c_str(), toString(pageID).c_str()); + return readPhysicalPage(this, pageID, ioMaxPriority, true); } bool tryEvictPage(LogicalPageID logicalID, Version v) { @@ -3051,7 +3042,7 @@ public: if (!readSize) readSize = self->physicalExtentSize; - state Reference extent = Reference(new ArenaPage(self->logicalPageSize, readSize)); + state Reference extent = ArenaPage::create(self->logicalPageSize, readSize); // physicalReadSize is the size of disk read we intend to issue auto physicalReadSize = SERVER_KNOBS->REDWOOD_DEFAULT_EXTENT_READ_SIZE; @@ -3079,7 +3070,7 @@ public: debug_printf("DWALPager(%s) current offset %d\n", self->filename.c_str(), currentOffset); ++g_redwoodMetrics.metric.pagerDiskRead; reads.push_back( - self->pageFile->read(extent->mutate() + currentOffset, physicalReadSize, startOffset + currentOffset)); + self->pageFile->read(extent->rawData() + currentOffset, physicalReadSize, startOffset + currentOffset)); } // Handle the last read separately as it may be smaller than physicalReadSize @@ -3092,7 +3083,7 @@ public: lastReadSize); ++g_redwoodMetrics.metric.pagerDiskRead; reads.push_back( - self->pageFile->read(extent->mutate() + currentOffset, lastReadSize, startOffset + currentOffset)); + self->pageFile->read(extent->rawData() + currentOffset, lastReadSize, startOffset + currentOffset)); } // wait for all the parallel read futures for the given extent @@ -3101,7 +3092,7 @@ public: debug_printf("DWALPager(%s) op=readPhysicalExtentComplete %s ptr=%p bytes=%d file offset=%d\n", self->filename.c_str(), toString(pageID).c_str(), - extent->begin(), + extent->data(), readSize, (pageID * self->physicalPageSize)); @@ -3629,7 +3620,7 @@ private: #pragma pack(push, 1) // Header is the format of page 0 of the database struct Header { - static constexpr int FORMAT_VERSION = 8; + static constexpr int FORMAT_VERSION = 9; uint16_t formatVersion; uint32_t queueCount; 
uint32_t pageSize; @@ -4481,13 +4472,6 @@ struct BTreePage { } }; -static void makeEmptyRoot(Reference page) { - BTreePage* btpage = (BTreePage*)page->begin(); - btpage->height = 1; - btpage->kvBytes = 0; - btpage->tree()->build(page->size(), nullptr, nullptr, nullptr, nullptr); -} - struct BoundaryRefAndPage { Standalone lowerBound; Reference firstPage; @@ -4597,9 +4581,10 @@ public: #pragma pack(push, 1) struct MetaKey { - static constexpr int FORMAT_VERSION = 15; + static constexpr int FORMAT_VERSION = 16; // This serves as the format version for the entire tree, individual pages will not be versioned uint16_t formatVersion; + EncodingType encodingType; uint8_t height; LazyClearQueueT::QueueState lazyDeleteQueue; InPlaceArray root; @@ -4608,7 +4593,14 @@ public: void fromKeyRef(KeyRef k) { memcpy(this, k.begin(), k.size()); - ASSERT(formatVersion == FORMAT_VERSION); + if (formatVersion != FORMAT_VERSION) { + Error e = unsupported_format_version(); + TraceEvent(SevWarn, "RedwoodMetaKeyError") + .detail("Version", formatVersion) + .detail("ExpectedVersion", FORMAT_VERSION) + .error(e); + throw e; + } } std::string toString() { @@ -4678,14 +4670,25 @@ public: Version getLastCommittedVersion() const { return m_pager->getLastCommittedVersion(); } - VersionedBTree(IPager2* pager, std::string name) - : m_pager(pager), m_pBuffer(nullptr), m_mutationCount(0), m_name(name), m_pHeader(nullptr), m_headerSpace(0) { + VersionedBTree(IPager2* pager, std::string name, EncodingType t = EncodingType::XXHash64) + : m_pager(pager), m_pBuffer(nullptr), m_mutationCount(0), m_encodingType(t), m_name(name), m_pHeader(nullptr), + m_headerSpace(0) { m_lazyClearActor = 0; m_init = init_impl(this); m_latestCommit = m_init; } + Reference makeEmptyRoot() { + Reference page = m_pager->newPageBuffer(); + page->init(m_encodingType, PageType::BTreeNode, 1); + BTreePage* btpage = (BTreePage*)page->data(); + btpage->height = 1; + btpage->kvBytes = 0; + btpage->tree()->build(page->dataSize(), 
nullptr, nullptr, nullptr, nullptr); + return page; + } + ACTOR static Future incrementalLazyClear(VersionedBTree* self) { ASSERT(self->m_lazyClearActor.isReady()); self->m_lazyClearStop = false; @@ -4724,7 +4727,7 @@ public: for (i = 0; i < entries.size(); ++i) { Reference p = wait(entries[i].second); const LazyClearQueueEntry& entry = entries[i].first; - const BTreePage& btPage = *(BTreePage*)p->begin(); + const BTreePage& btPage = *(BTreePage*)p->data(); ASSERT(btPage.height == entry.height); auto& metrics = g_redwoodMetrics.level(entry.height).metrics; @@ -4791,13 +4794,13 @@ public: ACTOR static Future init_impl(VersionedBTree* self) { wait(self->m_pager->init()); self->m_pBuffer.reset(new MutationBuffer()); - // TODO: Get actual max MetaKey size limit from Pager - self->m_headerSpace = self->m_pager->getUsablePageSize(); - self->m_pHeader = (MetaKey*)new uint8_t[self->m_headerSpace]; - self->m_blockSize = self->m_pager->getUsablePageSize(); + self->m_blockSize = self->m_pager->getLogicalPageSize(); self->m_newOldestVersion = self->m_pager->getOldestReadableVersion(); + self->m_headerSpace = ArenaPage::getUsableSize(self->m_blockSize, EncodingType::XXHash64); + self->m_pHeader = (MetaKey*)new uint8_t[self->m_headerSpace]; + debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n", self->m_newOldestVersion); @@ -4805,13 +4808,13 @@ public: if (meta.size() == 0) { // Create new BTree self->m_pHeader->formatVersion = MetaKey::FORMAT_VERSION; + self->m_pHeader->encodingType = BUGGIFY ? 
EncodingType::XOREncryption : EncodingType::XXHash64; LogicalPageID id = wait(self->m_pager->newPageID()); BTreePageIDRef newRoot((LogicalPageID*)&id, 1); debug_printf("new root %s\n", toString(newRoot).c_str()); self->m_pHeader->root.set(newRoot, self->m_headerSpace - sizeof(MetaKey)); self->m_pHeader->height = 1; - Reference page = self->m_pager->newPageBuffer(); - makeEmptyRoot(page); + Reference page = self->makeEmptyRoot(); self->m_pager->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, newRoot, page); LogicalPageID newQueuePage = wait(self->m_pager->newPageID()); @@ -5124,6 +5127,7 @@ private: Reference snapshot; }; + EncodingType m_encodingType; Version m_newOldestVersion; Future m_latestCommit; Future m_init; @@ -5139,17 +5143,19 @@ private: Future m_lazyClearActor; bool m_lazyClearStop; - // Describes a range of a vector of records that should be built into a BTreePage + // Describes a range of a vector of records that should be built into a single BTreePage struct PageToBuild { - PageToBuild(int index, int blockSize) - : startIndex(index), count(0), pageSize(blockSize), - bytesLeft(blockSize - sizeof(BTreePage) - sizeof(BTreePage::BinaryTree)), + PageToBuild(int index, int blockSize, EncodingType t) + : startIndex(index), count(0), pageSize(blockSize), bytesLeft(ArenaPage::getUsableSize(blockSize, t)), largeDeltaTree(pageSize > BTreePage::BinaryTree::SmallSizeLimit), blockSize(blockSize), blockCount(1), kvBytes(0) {} + PageToBuild next(EncodingType t) { return PageToBuild(endIndex(), blockSize, t); } + int startIndex; // Index of the first record int count; // Number of records added to the page - int pageSize; // Page size required to hold a BTreePage of the added records, which is a multiple of blockSize + int pageSize; // Page or Multipage size required to hold a BTreePage of the added records, which is a multiple + // of blockSize int bytesLeft; // Bytes in pageSize that are unused by the BTreePage so far bool largeDeltaTree; // Whether or not 
the tree in the generated page is in the 'large' size range int blockSize; // Base block size by which pageSize can be incremented @@ -5249,13 +5255,16 @@ private: } }; +#warning update forensic fields +#warning update forensic fields on second write + // Scans a vector of records and decides on page split points, returning a vector of 1+ pages to build - static std::vector splitPages(const RedwoodRecordRef* lowerBound, - const RedwoodRecordRef* upperBound, - int prefixLen, - VectorRef records, - unsigned int height, - int blockSize) { + std::vector splitPages(const RedwoodRecordRef* lowerBound, + const RedwoodRecordRef* upperBound, + int prefixLen, + VectorRef records, + unsigned int height) { + debug_printf("splitPages height=%d records=%d\n\tlowerBound=%s\n\tupperBound=%s\n", height, records.size(), @@ -5276,7 +5285,7 @@ private: deltaSizes[i] = records[i].deltaSize(records[i - 1], prefixLen, true); } - PageToBuild p(0, blockSize); + PageToBuild p(0, m_blockSize, m_encodingType); for (int i = 0; i < records.size(); ++i) { bool force = p.count < minRecords || p.slackFraction() > maxSlack; @@ -5292,7 +5301,7 @@ private: if (!p.addRecord(records[i], deltaSizes[i], force)) { pages.push_back(p); - p = PageToBuild(p.endIndex(), blockSize); + p = p.next(m_encodingType); p.addRecord(records[i], deltaSizes[i], true); } } @@ -5341,7 +5350,7 @@ private: state int prefixLen = lowerBound->getCommonPrefixLen(*upperBound); state std::vector pagesToBuild = - splitPages(lowerBound, upperBound, prefixLen, entries, height, self->m_blockSize); + self->splitPages(lowerBound, upperBound, prefixLen, entries, height); debug_printf("splitPages returning %s\n", toString(pagesToBuild).c_str()); // Lower bound of the page being added to @@ -5393,17 +5402,11 @@ private: pageUpperBound.truncate(commonPrefix + 1); } - state Reference pages; - BTreePage* btPage; + state Reference page = self->m_pager->newPageBuffer(p.blockCount); + page->init( + self->m_encodingType, (p.blockCount == 1) ? 
PageType::BTreeNode : PageType::BTreeSuperNode, height); - if (p.blockCount == 1) { - Reference page = self->m_pager->newPageBuffer(); - btPage = (BTreePage*)page->mutate(); - pages = std::move(page); - } else { - ASSERT(p.blockCount > 1); - btPage = (BTreePage*)new uint8_t[p.pageSize]; - } + BTreePage* btPage = (BTreePage*)page->mutateData(); btPage->height = height; btPage->kvBytes = p.kvBytes; @@ -5418,6 +5421,10 @@ private: state int written = btPage->tree()->build( deltaTreeSpace, &entries[p.startIndex], &entries[endIndex], &pageLowerBound, &pageUpperBound); + // Mark the slack in the page as defined + VALGRIND_MAKE_MEM_DEFINED((uint8_t*)btPage->tree() + written, + page->dataSize() - written - ((uint8_t*)btPage->tree() - page->data())); + if (written > deltaTreeSpace) { debug_printf("ERROR: Wrote %d bytes to page %s deltaTreeSpace=%d\n", written, @@ -5436,21 +5443,6 @@ private: metrics.buildStoredPctSketch->samplePercentage(p.kvFraction()); metrics.buildItemCountSketch->sampleRecordCounter(p.count); - // Create chunked pages - // TODO: Avoid copying page bytes, but this is not trivial due to how pager checksums are currently handled. - if (p.blockCount != 1) { - // Mark the slack in the page buffer as defined - VALGRIND_MAKE_MEM_DEFINED(((uint8_t*)btPage) + written, (p.blockCount * p.blockSize) - written); - Reference page = self->m_pager->newPageBuffer(p.blockCount); - const uint8_t* rptr = (const uint8_t*)btPage; - for (int b = 0; b < p.blockCount; ++b) { - memcpy(page->mutate() + b * p.blockSize, rptr, p.blockSize); - rptr += p.blockSize; - } - pages = std::move(page); - delete[](uint8_t*) btPage; - } - // Write this btree page, which is made of 1 or more pager pages. 
state BTreePageIDRef childPageID; @@ -5458,7 +5450,7 @@ private: // then try to update the page atomically so its logical page ID does not change if (pagesToBuild.size() == 1 && p.blockCount == 1 && previousID.size() == 1) { LogicalPageID id = wait( - self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID.front(), pages, v)); + self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, previousID.front(), page, v)); childPageID.push_back(records.arena(), id); } else { // Either the original node is being split, or it's not but it has changed BTreePageID size or @@ -5481,7 +5473,7 @@ private: emptyPages[i] = id; } debug_printf("writePages: newPages %s", toString(emptyPages).c_str()); - self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, pages); + self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, page); for (const LogicalPageID id : emptyPages) { childPageID.push_back(records.arena(), id); } @@ -5579,7 +5571,7 @@ private: page = std::move(p); } debug_printf("readPage() op=readComplete %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion()); - const BTreePage* btPage = (const BTreePage*)page->begin(); + const BTreePage* btPage = (const BTreePage*)page->data(); auto& metrics = g_redwoodMetrics.level(btPage->height).metrics; metrics.pageRead += 1; metrics.pageReadExt += (id.size() - 1); @@ -5593,7 +5585,7 @@ private: const RedwoodRecordRef& upperBound) { if (page->userData == nullptr) { debug_printf("Creating DecodeCache for ptr=%p lower=%s upper=%s\n", - page->begin(), + page->data(), lowerBound.toString().c_str(), upperBound.toString().c_str()); @@ -5603,7 +5595,7 @@ private: } return BTreePage::BinaryTree::Cursor((BTreePage::BinaryTree::DecodeCache*)page->userData, - ((BTreePage*)page->begin())->tree()); + ((BTreePage*)page->data())->tree()); } // Get cursor into a BTree node from a child link @@ -5614,7 +5606,7 @@ private: } return 
BTreePage::BinaryTree::Cursor((BTreePage::BinaryTree::DecodeCache*)page->userData, - ((BTreePage*)page->begin())->tree()); + ((BTreePage*)page->data())->tree()); } static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef pageIDs, int priority) { @@ -5648,7 +5640,7 @@ private: newID.resize(*arena, oldID.size()); if (REDWOOD_DEBUG) { - BTreePage* btPage = (BTreePage*)page->begin(); + BTreePage* btPage = (BTreePage*)page->data(); BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData; debug_printf_always( "updateBTreePage(%s, %s) %s\n", @@ -5659,7 +5651,7 @@ private: : btPage->toString(true, oldID, writeVersion, cache->lowerBound, cache->upperBound).c_str()); } - state unsigned int height = (unsigned int)((BTreePage*)page->begin())->height; + state unsigned int height = (unsigned int)((BTreePage*)page->data())->height; if (oldID.size() == 1) { LogicalPageID id = wait( self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion)); @@ -5693,7 +5685,7 @@ private: newPage->userData = cache; newPage->userDataDestructor = [](void* cache) { ((BTreePage::BinaryTree::DecodeCache*)cache)->delref(); }; - debug_printf("cloneForUpdate(%p -> %p size=%d\n", page->begin(), newPage->begin(), page->size()); + debug_printf("cloneForUpdate(%p -> %p size=%d\n", page->data(), newPage->data(), page->dataSize()); return newPage; } @@ -5852,7 +5844,7 @@ private: bool changesMade; ParentInfo* parentInfo; - BTreePage* btPage() const { return (BTreePage*)page->begin(); } + BTreePage* btPage() const { return (BTreePage*)page->data(); } bool empty() const { if (updating) { @@ -6037,7 +6029,7 @@ private: // If the page is not in the cache, then no copy is needed so we will initialize pageCopy to page state Reference pageCopy; - state BTreePage* btPage = (BTreePage*)page->begin(); + state BTreePage* btPage = (BTreePage*)page->data(); ASSERT(height == btPage->height); 
++g_redwoodMetrics.level(height).metrics.pageCommitStart; @@ -6083,7 +6075,7 @@ private: auto copyForUpdate = [&]() { if (!pageCopy.isValid()) { pageCopy = clonePageForUpdate(page); - btPage = (BTreePage*)pageCopy->begin(); + btPage = (BTreePage*)pageCopy->data(); cursor.switchTree(btPage->tree()); } }; @@ -6781,8 +6773,7 @@ private: if (all.newLinks.empty()) { debug_printf("Writing new empty root.\n"); LogicalPageID newRootID = wait(self->m_pager->newPageID()); - Reference page = self->m_pager->newPageBuffer(); - makeEmptyRoot(page); + Reference page = self->makeEmptyRoot(); self->m_pHeader->height = 1; VectorRef rootID((LogicalPageID*)&newRootID, 1); self->m_pager->updatePage(PagerEventReasons::Commit, self->m_pHeader->height, rootID, page); @@ -6837,7 +6828,7 @@ public: Standalone id; #endif - const BTreePage* btPage() const { return (BTreePage*)page->begin(); }; + const BTreePage* btPage() const { return (BTreePage*)page->data(); }; }; private: @@ -6868,7 +6859,7 @@ public: r += format("\n\t[Level=%d ID=%s ptr=%p Cursor=%s] ", height, id.c_str(), - path[i].page->begin(), + path[i].page->data(), path[i].cursor.valid() ? 
path[i].cursor.get().toString(path[i].btPage()->isLeaf()).c_str() : ""); if (height <= dumpHeight) { @@ -9013,13 +9004,13 @@ TEST_CASE(":/redwood/performance/mutationBuffer") { TEST_CASE(":/redwood/pager/ArenaPage") { Arena x; printf("Making p\n"); - Reference p(new ArenaPage(4096, 4096)); - printf("Made p=%p\n", p->begin()); + Reference p = ArenaPage::create(4096, 4096); + printf("Made p=%p\n", p->data()); printf("Clearing p\n"); p.clear(); printf("Making p\n"); - p = Reference(new ArenaPage(4096, 4096)); - printf("Made p=%p\n", p->begin()); + p = ArenaPage::create(4096, 4096); + printf("Made p=%p\n", p->data()); printf("Making x depend on p\n"); x.dependsOn(p->getArena()); printf("Clearing p\n"); @@ -9438,13 +9429,14 @@ TEST_CASE(":/redwood/correctness/pager/cow") { state LogicalPageID id = wait(pager->newPageID()); state VectorRef pageID(&id, 1); Reference p = pager->newPageBuffer(); - memset(p->mutate(), (char)id, p->size()); + p->init(EncodingType::XXHash64, PageType::QueuePageStandalone, 0); + memset(p->mutateData(), (char)id, p->dataSize()); pager->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, pageID, p); pager->setMetaKey(LiteralStringRef("asdfasdf")); wait(pager->commit(pager->getLastCommittedVersion() + 1)); Reference p2 = wait(pager->readPage(PagerEventReasons::PointRead, nonBtreeLevel, id, ioMinPriority, true, false)); - printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str()); + printf("%s\n", StringRef(p2->data(), p2->dataSize()).toHexString().c_str()); // TODO: Verify reads, do more writes and reads to make this a real pager validator diff --git a/fdbserver/art_impl.h b/fdbserver/art_impl.h index 22aef36690..3be4a9e759 100644 --- a/fdbserver/art_impl.h +++ b/fdbserver/art_impl.h @@ -166,7 +166,7 @@ void art_tree::art_bound_iterative(art_node* n, const KeyRef& k, int depth, art_ static stack_entry arena[ART_MAX_KEY_LEN]; // Single threaded implementation. 
- stack_entry *head = nullptr, *tmp, *curr_arena = arena; + stack_entry *head = nullptr, /**tmp,*/ *curr_arena = arena; int ret; art_node** child; unsigned char* key = (unsigned char*)k.begin(); @@ -198,7 +198,7 @@ void art_tree::art_bound_iterative(art_node* n, const KeyRef& k, int depth, art_ *result = minimum(next); break; } else { - tmp = head; + // tmp = head; head = head->prev; } } diff --git a/flow/error_definitions.h b/flow/error_definitions.h index e468e46801..35c947c820 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -78,7 +78,7 @@ ERROR( wrong_connection_file, 1054, "Connection file mismatch") ERROR( version_already_compacted, 1055, "The requested changes have been compacted away") ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" ) ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" ) -ERROR( wrong_format_version, 1058, "Format version not recognized" ) +ERROR( unsupported_format_version, 1058, "Format version not supported" ) ERROR( unknown_change_feed, 1059, "Change feed not found" ) ERROR( change_feed_not_registered, 1060, "Change feed not registered" ) ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob granules" ) From 4bc0cfe4af9ac95a4543f6c8e6cfb728a1f750bc Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Thu, 9 Dec 2021 02:21:50 -0800 Subject: [PATCH 04/54] Many bug fixes in new page API and usages of it. 
--- fdbserver/DeltaTree.h | 9 ++- fdbserver/IPager.h | 28 +++++-- fdbserver/VersionedBTree.actor.cpp | 124 +++++++++++++++++------------ flow/actorcompiler.h | 7 ++ 4 files changed, 109 insertions(+), 59 deletions(-) diff --git a/fdbserver/DeltaTree.h b/fdbserver/DeltaTree.h index 9ea758aee5..62313b7766 100644 --- a/fdbserver/DeltaTree.h +++ b/fdbserver/DeltaTree.h @@ -1226,7 +1226,10 @@ public: return item.get(); } - void switchTree(DeltaTree2* newTree) { + // Switch the cursor to point to a new DeltaTree + // if noReset is true, then the current decoded item will NOT be reset, so be sure that the original tree will + // have a lifetime that exceeds this cursor as the decoded item may point into the original tree. + void switchTree(DeltaTree2* newTree, bool noReset = false) { tree = newTree; // Reset item because it may point into tree memory item.reset(); @@ -1652,6 +1655,10 @@ public: nodeBytesUsed = 0; } nodeBytesFree = spaceAvailable - size(); +#ifdef VALGRIND + // Mark unused available space as defined + VALGRIND_MAKE_MEM_DEFINED((uint8_t*)this + size(), nodeBytesFree); +#endif return size(); } diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index d39122e529..3f7d3af348 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -31,14 +31,10 @@ #define XXH_INLINE_ALL #include "flow/xxhash.h" -#ifndef VALGRIND -#define VALGRIND_MAKE_MEM_UNDEFINED(x, y) -#define VALGRIND_MAKE_MEM_DEFINED(x, y) -#endif - typedef uint32_t LogicalPageID; typedef uint32_t PhysicalPageID; #define invalidLogicalPageID std::numeric_limits::max() +#define invalidPhysicalPageID std::numeric_limits::max() typedef uint32_t QueueID; #define invalidQueueID std::numeric_limits::max() @@ -113,8 +109,10 @@ public: if (bufferSize > 0) { buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize); +#ifdef VALGRIND // Mark any unused page portion defined VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize); +#endif } else { buffer = nullptr; } @@ -231,6 +229,7 @@ 
public: // Initialize the header for a new page to be populated soon and written to disk void init(EncodingType t, PageType pageType, uint8_t pageSubType) { + encodingType = t; Reader next{ buffer }; VersionHeader* vh = next; // Only the latest header version is written. @@ -249,7 +248,7 @@ public: throw unsupported_format_version(); } - next.skip(); + next.skip