Added format versioning to COWPager page, header, BTreePage, BTree meta record. Added height to BTree pages.

This commit is contained in:
Stephen Atherton 2019-09-05 00:47:57 -07:00
parent b19ef86ab9
commit be37a7c01d
2 changed files with 100 additions and 24 deletions

View File

@ -174,8 +174,8 @@ public:
// call freePage(pageID), and return the new page id. Otherwise the pageID argument will be returned.
virtual Future<LogicalPageID> atomicUpdatePage(LogicalPageID pageID, Reference<IPage> data) = 0;
// Free pageID to be used again after the next commit
virtual void freePage(LogicalPageID pageID) = 0;
// Free pageID to be used again after version v is durable
virtual void freePage(LogicalPageID pageID, Version v) = 0;
// Returns the data for a page by LogicalPageID
// The data returned will be the later of

View File

@ -148,13 +148,17 @@ public:
index = 0;
page = queue->pager->newPageBuffer();
setNext(0, 0);
raw()->endIndex = 0;
auto p = raw();
p->formatVersion = RawPage::FORMAT_VERSION;
p->endIndex = 0;
}
void initNewHeadPage(LogicalPageID newPageID) {
page = queue->pager->newPageBuffer();
setNext(pageID, index);
raw()->endIndex = queue->itemsPerPage;
auto p = raw();
p->formatVersion = RawPage::FORMAT_VERSION;
p->endIndex = queue->itemsPerPage;
pageID = newPageID;
index = queue->itemsPerPage;
}
@ -169,6 +173,8 @@ public:
#pragma pack(push, 1)
struct RawPage {
static constexpr int FORMAT_VERSION = 1;
uint16_t formatVersion;
LogicalPageID nextPageID;
uint16_t nextIndex;
uint16_t endIndex;
@ -197,6 +203,7 @@ public:
debug_printf("FIFOQueue(%s): loading page id=%u index=%d\n", queue->name.c_str(), pageID, index);
return map(queue->pager->readPage(pageID), [=](Reference<IPage> p) {
page = p;
ASSERT(raw()->formatVersion == RawPage::FORMAT_VERSION);
return Void();
});
}
@ -346,7 +353,7 @@ public:
// freePage() must be called after setting the loading future because freePage() might pop from this
// queue recursively if the pager's free list is being stored in this queue.
queue->pager->freePage(oldPageID);
queue->pager->freePage(oldPageID, 0);
}
return Optional<T>(result);
@ -840,7 +847,7 @@ public:
self->setPageSize(self->desiredPageSize);
// Write new header using desiredPageSize
self->pHeader->formatVersion = 1;
self->pHeader->formatVersion = Header::FORMAT_VERSION;
self->pHeader->committedVersion = 1;
// No meta key until a user sets one and commits
self->pHeader->setMetaKey(Key());
@ -971,8 +978,15 @@ public:
}
// Free pageID to be used again after the next commit
void freePage(LogicalPageID pageID) {
void freePage(LogicalPageID pageID, Version v) {
// If v is older than the oldest version still readable then mark pageID as free as of the next commit
if(v < oldestVersion.get()) {
freeList.pushBack(pageID);
}
else {
// Otherwise add it to the delayed free list
delayedFreeList.pushBack({v, pageID});
}
};
// Header pages use a page size of smallestPhysicalBlock
@ -1035,6 +1049,28 @@ public:
};
ACTOR static Future<Void> commit_impl(COWPager *self) {
state int addFront = 10 * deterministicRandom()->randomInt(0, 10);
state int addBack = 10 * deterministicRandom()->randomInt(0, 10);
state int remove = 10 * deterministicRandom()->randomInt(0, 20);
state int i;
for(i = 0; i < addBack; ++i) {
LogicalPageID id = wait(self->newPageID());
self->freeList.pushBack(id);
}
for(i = 0; i < addFront; ++i) {
LogicalPageID id = wait(self->newPageID());
self->freeList.pushFront(id);
}
for(i = 0; i < remove; ++i) {
Optional<LogicalPageID> id = wait(self->freeList.pop());
if(!id.present()) {
break;
}
}
// Write old committed header to Page 1
self->operations.add(self->writeHeaderPage(1, self->lastCommittedHeaderPage));
@ -1182,7 +1218,8 @@ private:
#pragma pack(push, 1)
// Header is the format of page 0 of the database
struct Header {
Version formatVersion;
static constexpr int FORMAT_VERSION = 1;
uint16_t formatVersion;
uint32_t pageSize;
int64_t pageCount;
FIFOQueue<LogicalPageID>::QueueState freeList;
@ -1198,6 +1235,7 @@ private:
ASSERT(key.size() < (smallestPhysicalBlock - sizeof(Header)));
metaKeySize = key.size();
memcpy((uint8_t *)this + sizeof(Header), key.begin(), key.size());
ASSERT(formatVersion == FORMAT_VERSION);
}
int size() const {
@ -1267,6 +1305,8 @@ private:
Reference<IAsyncFile> pageFile;
LogicalPageQueueT freeList;
// The delayed free list will be approximately in Version order.
// TODO: Make this an ordered container some day.
VersionedLogicalPageQueueT delayedFreeList;
struct SnapshotEntry {
@ -1619,7 +1659,7 @@ struct RedwoodRecordRef {
uint32_t start;
} chunk;
// If the value is a page ID it will be stored here
// If the value is a single page ID it will be stored here
uint8_t bigEndianPageIDSpace[sizeof(LogicalPageID)];
int expectedSize() const {
@ -2077,10 +2117,13 @@ struct BTreePage {
typedef DeltaTree<RedwoodRecordRef> BinaryTree;
static constexpr int FORMAT_VERSION = 1;
#pragma pack(push,1)
struct {
uint16_t formatVersion;
uint8_t flags;
uint16_t count;
uint8_t height;
uint16_t itemCount;
uint32_t kvBytes;
uint8_t extensionPageCount;
};
@ -2117,11 +2160,11 @@ struct BTreePage {
std::string toString(bool write, LogicalPageID id, Version ver, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) const {
std::string r;
r += format("BTreePage op=%s id=%d ver=%" PRId64 " ptr=%p flags=0x%X count=%d kvBytes=%d extPages=%d\n lowerBound: %s\n upperBound: %s\n",
write ? "write" : "read", id, ver, this, (int)flags, (int)count, (int)kvBytes, (int)extensionPageCount,
r += format("BTreePage op=%s id=%d ver=%" PRId64 " ptr=%p flags=0x%X count=%d kvBytes=%d\n lowerBound: %s\n upperBound: %s\n",
write ? "write" : "read", id, ver, this, (int)flags, (int)itemCount, (int)kvBytes,
lowerBound->toString().c_str(), upperBound->toString().c_str());
try {
if(count > 0) {
if(itemCount > 0) {
// This doesn't use the cached reader for the page but it is only for debugging purposes
BinaryTree::Reader reader(&tree(), lowerBound, upperBound);
BinaryTree::Cursor c = reader.getCursor();
@ -2162,10 +2205,12 @@ struct BTreePage {
static void makeEmptyPage(Reference<IPage> page, uint8_t newFlags) {
BTreePage *btpage = (BTreePage *)page->begin();
btpage->formatVersion = BTreePage::FORMAT_VERSION;
btpage->flags = newFlags;
btpage->height = 1;
btpage->kvBytes = 0;
btpage->count = 0;
btpage->extensionPageCount = 0;
btpage->itemCount = 0;
btpage->tree().build(nullptr, nullptr, nullptr, nullptr);
VALGRIND_MAKE_MEM_DEFINED(page->begin() + btpage->tree().size(), page->size() - btpage->tree().size());
}
@ -2186,7 +2231,7 @@ struct BoundaryAndPage {
// Returns a std::vector of pairs of lower boundary key indices within kvPairs and encoded pages.
// TODO: Refactor this as an accumulator you add sorted keys to which makes pages.
static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const RedwoodRecordRef &lowerBound, const RedwoodRecordRef &upperBound, std::vector<RedwoodRecordRef> entries, uint8_t newFlags, IPager2 *pager) {
static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const RedwoodRecordRef &lowerBound, const RedwoodRecordRef &upperBound, const std::vector<RedwoodRecordRef> &entries, uint8_t newFlags, int height, IPager2 *pager) {
ASSERT(entries.size() > 0);
int usablePageSize = pager->getUsablePageSize();
@ -2315,10 +2360,12 @@ static std::vector<BoundaryAndPage> buildPages(bool minimalBoundaries, const Red
VALGRIND_MAKE_MEM_DEFINED(btPageMem, allocatedSize);
}
btPage->formatVersion = BTreePage::FORMAT_VERSION;
btPage->flags = newFlags;
btPage->height = height;
btPage->kvBytes = kvBytes;
btPage->count = i - start;
btPage->extensionPageCount = blockCount - 1;
btPage->itemCount = i - start;
int written = btPage->tree().build(&entries[start], &entries[i], &pageLowerBound, &pageUpperBound);
if(written > pageSize) {
@ -2378,17 +2425,25 @@ public:
typedef FIFOQueue<LazyDeleteQueueEntry> LazyDeleteQueueT;
#pragma pack(push, 1)
struct MetaKey {
static constexpr int FORMAT_VERSION = 1;
uint16_t formatVersion;
LogicalPageID root;
uint8_t height;
LazyDeleteQueueT::QueueState lazyDeleteQueue;
KeyRef asKeyRef() const {
return KeyRef((uint8_t *)this, sizeof(MetaKey));
}
void fromKeyRef(KeyRef k) {
ASSERT(k.size() == sizeof(MetaKey));
memcpy(this, k.begin(), k.size());
ASSERT(formatVersion == FORMAT_VERSION);
}
};
#pragma pack(pop)
struct Counts {
Counts() {
@ -2545,13 +2600,15 @@ public:
ACTOR static Future<Void> init_impl(VersionedBTree *self) {
state Version latest = wait(self->m_pager->getLatestVersion());
debug_printf("Recovered to version %" PRId64 "\n", latest);
debug_printf("Recovered pager to version %" PRId64 "\n", latest);
state Key meta = self->m_pager->getMetaKey();
if(meta.size() == 0) {
self->m_header.formatVersion = MetaKey::FORMAT_VERSION;
LogicalPageID newRoot = wait(self->m_pager->newPageID());
debug_printf("new root page id=%u\n", newRoot);
self->m_header.root = newRoot;
self->m_header.height = 1;
++latest;
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF);
@ -2570,6 +2627,9 @@ public:
self->m_header.fromKeyRef(meta);
self->m_lazyDeleteQueue.recover(self->m_pager, self->m_header.lazyDeleteQueue, "LazyDeleteQueueRecovered");
}
debug_printf("Recovered btree at version %" PRId64 " height=%d\n", latest, self->m_header.);
self->m_maxPartSize = std::min(255, self->m_pager->getUsablePageSize() / 5);
self->m_lastCommittedVersion = latest;
return Void();
@ -2771,6 +2831,19 @@ private:
return r + " }";
}
template<typename T>
static std::string toString(const VectorRef<T> &v) {
std::string r = "{ ";
for(auto &o : v) {
r += toString(o) + ", ";
}
return r + " }";
}
static std::string toString(LogicalPageID id) {
return format("%" PRId64, id);
}
// Represents a change to a single key - set, clear, or atomic op
struct SingleKeyMutation {
// Clear
@ -2957,10 +3030,11 @@ private:
childEntries.push_back(entry);
}
*pages = buildPages(false, dbBegin, dbEnd, childEntries, 0, self->m_pager);
debug_printf("Writing a new root level at version %" PRId64 " with %lu children across %lu pages\n", version, childEntries.size(), pages->size());
int newHeight = pPage->height + 1;
self->m_header.height = newHeight;
*pages = buildPages(false, dbBegin, dbEnd, childEntries, 0, newHeight, self->m_pager);
debug_printf_always("Writing a new root level at version %" PRId64 " height %d with %lu children across %lu pages\n", version, newHeight, childEntries.size(), pages->size());
std::vector<LogicalPageID> ids = wait(writePages(self, *pages, version, self->m_header.root, pPage, &dbEnd, nullptr));
*logicalPageIDs = std::move(ids);
}
@ -2968,6 +3042,7 @@ private:
return Void();
}
// Write replacement pages for the given originalID, return a set of internal page records that point to the pages.
ACTOR static Future<std::vector<LogicalPageID>> writePages(VersionedBTree *self, std::vector<BoundaryAndPage> pages, Version version, LogicalPageID originalID, const BTreePage *originalPage, const RedwoodRecordRef *upperBound, void *actor_debug) {
debug_printf("%p: writePages(): %u @%" PRId64 " -> %lu replacement pages\n", actor_debug, originalID, version, pages.size());
@ -3072,13 +3147,14 @@ private:
};
ACTOR static Future<Reference<const IPage>> readPage(Reference<IPagerSnapshot> snapshot, LogicalPageID id, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) {
debug_printf("readPage() op=read id=%u @%" PRId64 " lower=%s upper=%s\n", id, snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str());
debug_printf("readPage() op=read id=%s @%" PRId64 " lower=%s upper=%s\n", toString(id).c_str(), snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str());
wait(delay(0, TaskPriority::DiskRead));
state Reference<const IPage> result = wait(snapshot->getPhysicalPage(id));
state int usablePageSize = result->size();
++counts.pageReads;
state const BTreePage *pTreePage = (const BTreePage *)result->begin();
ASSERT(pTreePage->formatVersion == BTreePage::FORMAT_VERSION);
if(pTreePage->extensionPageCount == 0) {
debug_printf("readPage() Found normal page for op=read id=%u @%" PRId64 "\n", id, snapshot->getVersion());
@ -3343,7 +3419,7 @@ private:
return c;
}
std::vector<BoundaryAndPage> newPages = buildPages(true, *lowerBound, *upperBound, merged, BTreePage::IS_LEAF, self->m_pager);
std::vector<BoundaryAndPage> newPages = buildPages(true, *lowerBound, *upperBound, merged, BTreePage::IS_LEAF, page->height, self->m_pager);
pages = std::move(newPages);
if(!self->singleVersion) {
@ -3522,7 +3598,7 @@ private:
entries.push_back(o);
}
std::vector<BoundaryAndPage> newPages = buildPages(false, *lowerBound, *upperBound, entries, 0, self->m_pager);
std::vector<BoundaryAndPage> newPages = buildPages(false, *lowerBound, *upperBound, entries, 0, page->height, self->m_pager);
pages = std::move(newPages);
writeVersion = self->getLastCommittedVersion() + 1;