Bug fixes in Redwood. BTree height was not being reset when a new empty root is written. IKeyValueStore wrapper was not obeying the row limit in a reverse range query. Added yields to and delays to break up tasks and set IO priorities.

This commit is contained in:
Stephen Atherton 2019-10-25 14:52:06 -07:00
parent 0e51a248b4
commit 2ee1782c19
1 changed files with 22 additions and 6 deletions

View File

@ -1092,6 +1092,10 @@ public:
// If the user chosen physical page size is larger, then there will be a gap of unused space after
// between the end of page 1 and the start of page 2.
ACTOR static Future<Reference<IPage>> readHeaderPage(COWPager *self, PhysicalPageID pageID) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock));
int readBytes = wait(self->pageFile->read(page->mutate(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
debug_printf("COWPager(%s) header op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes);
@ -1100,6 +1104,10 @@ public:
}
ACTOR static Future<Reference<IPage>> readPhysicalPage(COWPager *self, PhysicalPageID pageID) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page = self->newPageBuffer();
debug_printf("COWPager(%s) op=read_physical_start %s\n", self->filename.c_str(), toString(pageID).c_str());
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
@ -1200,11 +1208,17 @@ public:
debug_printf("COWPager(%s) Syncing\n", self->filename.c_str());
// Sync everything except the header
if(g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit version %" PRId64 " sync 1\n", self->filename.c_str(), self->pHeader->committedVersion);
// Update header on disk and sync again.
wait(self->writeHeaderPage(0, self->headerPage));
if(g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit version %" PRId64 " sync 2\n", self->filename.c_str(), self->pHeader->committedVersion);
@ -2275,10 +2289,10 @@ struct BTreePage {
}
};
static void makeEmptyPage(Reference<IPage> page, uint8_t newFlags) {
static void makeEmptyRoot(Reference<IPage> page) {
BTreePage *btpage = (BTreePage *)page->begin();
btpage->formatVersion = BTreePage::FORMAT_VERSION;
btpage->flags = newFlags;
btpage->flags = BTreePage::IS_LEAF;
btpage->height = 1;
btpage->kvBytes = 0;
btpage->itemCount = 0;
@ -2641,7 +2655,7 @@ public:
self->m_header.height = 1;
++latest;
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF);
makeEmptyRoot(page);
self->m_pager->updatePage(id, page);
self->m_pager->setCommitVersion(latest);
@ -3232,6 +3246,7 @@ private:
childPageID.push_back(records.arena(), id);
}
}
wait(yield());
// Update activity counts
++counts.pageWrites;
@ -3331,7 +3346,7 @@ private:
debug_printf("readPage() op=readForDeferredClear %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion());
}
wait(delay(0, TaskPriority::DiskRead));
wait(yield());
state Reference<const IPage> page;
@ -3815,7 +3830,8 @@ private:
debug_printf("Writing new empty root.\n");
LogicalPageID newRootID = wait(self->m_pager->newPageID());
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF);
makeEmptyRoot(page);
self->m_header.height = 1;
self->m_pager->updatePage(newRootID, page);
rootPageID = BTreePageID((LogicalPageID *)&newRootID, 1);
}
@ -4513,7 +4529,7 @@ public:
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
accumulatedBytes += kv.expectedSize();
result.push_back(result.arena(), kv);
if(--rowLimit == 0 || accumulatedBytes >= byteLimit) {
if(++rowLimit == 0 || accumulatedBytes >= byteLimit) {
break;
}
wait(cur->prev(true));