Merge pull request #2333 from apple/release-6.2

Merge release 6.2 to master
This commit is contained in:
Steve Atherton 2019-11-08 11:01:07 -08:00 committed by GitHub
commit fe79742881
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 223 additions and 107 deletions

View File

@ -680,6 +680,7 @@ public:
// Create a fast-allocated page with size total bytes INCLUDING checksum
// size is stored as logicalSize; bufferSize is the number of bytes actually allocated for buffer.
FastAllocatedPage(int size, int bufferSize) : logicalSize(size), bufferSize(bufferSize) {
buffer = (uint8_t *)allocateFast(bufferSize);
// Tell valgrind that the bytes from logicalSize up to the end of the allocation are defined.
// NOTE(review): presumably so that handling the whole buffer (e.g. writing it to disk) does not
// produce uninitialized-memory reports -- confirm.
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
};
@ -733,10 +734,17 @@ private:
// Holds an index of recently used objects.
// ObjectType must have the method
// bool evictable() const;
// bool evictable() const; // return true if the entry can be evicted
// Future<Void> onEvictable() const; // ready when entry can be evicted
// indicating if it is safe to evict.
template<class IndexType, class ObjectType>
class ObjectCache {
struct Entry : public boost::intrusive::list_base_hook<> {
IndexType index;
ObjectType item;
};
public:
ObjectCache(int sizeLimit = 0) : sizeLimit(sizeLimit) {
}
@ -783,13 +791,34 @@ public:
return entry.item;
}
// Clears the cache and calls destroy() on each ObjectType
void destroy() {
evictionOrder.clear();
for(auto &entry : cache) {
entry.second.item.destroy();
// Clears the cache: moves all entries into local state, then waits for each entry to become
// evictable before evicting it. The cache itself is left empty as soon as the entries are moved out.
// Implementation of clear(). Moves every entry out of self into actor-local state, waits until each
// entry reports that it is evictable, and only then destroys the entries.
ACTOR static Future<Void> clear_impl(ObjectCache *self) {
	state std::unordered_map<IndexType, Entry> cache;
	state boost::intrusive::list<Entry> evictionOrder;
	// Swap cache contents to local state vars; the locals start empty, so self is empty afterwards
	cache.swap(self->cache);
	evictionOrder.swap(self->evictionOrder);
	state typename boost::intrusive::list<Entry>::iterator i = evictionOrder.begin();
	// This must be end(), not begin(): with begin() the loop below never executes and the entries
	// would be destroyed without waiting for their pending operations to finish.
	state typename boost::intrusive::list<Entry>::iterator iEnd = evictionOrder.end();
	while(i != iEnd) {
		// Only wait on entries that still have work in flight
		if(!i->item.evictable()) {
			wait(i->item.onEvictable());
		}
		++i;
	}
	// Unlink the intrusive list before the map destroys the Entry objects it owns
	evictionOrder.clear();
	cache.clear();
	return Void();
}
// Starts clear_impl() on this cache and returns its future.
Future<Void> clear() {
return clear_impl(this);
}
int count() const {
@ -798,16 +827,12 @@ public:
}
private:
struct Entry : public boost::intrusive::list_base_hook<> {
IndexType index;
ObjectType item;
};
int sizeLimit;
// TODO: Use boost intrusive unordered set instead, with a comparator that only considers entry.index
std::unordered_map<IndexType, Entry> cache;
boost::intrusive::list<Entry> evictionOrder;
};
ACTOR template<class T> Future<T> forwardError(Future<T> f, Promise<Void> target) {
@ -1090,37 +1115,52 @@ public:
}
Future<LogicalPageID> newPageID() override {
return forwardError(newPageID_impl(this), errorPromise);
return newPageID_impl(this);
}
// Writes page to the page file at offset pageID * blockSize and returns the future of that write.
// If header is true the block size is smallestPhysicalBlock, otherwise it is physicalPageSize.
// The returned future is also added to operations.
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<IPage> page, bool header = false) {
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", filename.c_str(), (header ? "writePhysicalHeader" : "writePhysical"), toString(pageID).c_str(), page->begin());
// Mark the entire page as defined for valgrind before it is checksummed and written
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
// Update the page's checksum, passing the ID of the page it is being written to
((Page *)page.getPtr())->updateChecksum(pageID);
// Note: Not using forwardError here so a write error won't be discovered until commit time.
int blockSize = header ? smallestPhysicalBlock : physicalPageSize;
// The page reference is passed to holdWhile and is also captured by the completion lambda.
// NOTE(review): presumably this keeps the page buffer alive until the write finishes -- confirm.
Future<Void> f = holdWhile(page, map(pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize), [=](Void) {
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", filename.c_str(), (header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"), toString(pageID).c_str(), page->begin());
return Void();
}));
operations.add(f);
return f;
}
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<IPage> page) {
debug_printf("DWALPager(%s) header op=write %s\n", filename.c_str(), toString(pageID).c_str());
((Page *)page.getPtr())->updateChecksum(pageID);
return holdWhile(page, pageFile->write(page->begin(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
}
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<IPage> page) {
debug_printf("DWALPager(%s) op=write %s\n", filename.c_str(), toString(pageID).c_str());
((Page *)page.getPtr())->updateChecksum(pageID);
return holdWhile(page, pageFile->write(page->begin(), physicalPageSize, (int64_t)pageID * physicalPageSize));
return writePhysicalPage(pageID, page, true);
}
void updatePage(LogicalPageID pageID, Reference<IPage> data) override {
// Get the cache entry for this page
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing());
debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing());
// If the page is still being read then it's not also being written because a write places
// the new content in the cache entry when the write is launched, not when it is completed.
// Any waiting readers should not see this write (though this might change)
if(cacheEntry.reading()) {
// the new content into readFuture when the write is launched, not when it is completed.
// Read/write ordering is enforced, so waiting readers will not see the new write. This
// is necessary for remap erasure to work correctly since the oldest version of a page, located
// at the original page ID, could have a pending read when that version is expired and the write
// of the next newest version over top of the original page begins.
if(!cacheEntry.initialized()) {
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
}
else if(cacheEntry.reading()) {
// Wait for the read to finish, then start the write.
cacheEntry.writeFuture = map(success(cacheEntry.readFuture), [=](Void) {
writePhysicalPage(pageID, data);
return Void();
});
}
// If the page is being written, wait for this write before issuing the new write
// If the page is being written, wait for this write before issuing the new write to ensure the
// writes happen in the correct order
else if(cacheEntry.writing()) {
cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) {
writePhysicalPage(pageID, data);
@ -1131,9 +1171,6 @@ public:
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
}
cacheEntry.writeFuture = forwardError(cacheEntry.writeFuture, errorPromise);
operations.add(cacheEntry.writeFuture);
// Always update the page contents immediately regardless of what happened above.
cacheEntry.readFuture = data;
}
@ -1175,46 +1212,44 @@ public:
}
};
// Header pages use a page size of smallestPhysicalBlock
// If the user chosen physical page size is larger, then there will be a gap of unused space after
// between the end of page 1 and the start of page 2.
ACTOR static Future<Reference<IPage>> readHeaderPage(DWALPager *self, PhysicalPageID pageID) {
// Read a physical page from the page file. Note that header pages use a page size of smallestPhysicalBlock
// If the user chosen physical page size is larger, then there will be a gap of unused space after the header pages
// and before the user-chosen sized pages.
ACTOR static Future<Reference<IPage>> readPhysicalPage(DWALPager *self, PhysicalPageID pageID, bool header = false) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock));
int readBytes = wait(self->pageFile->read(page->mutate(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
debug_printf("DWALPager(%s) header op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes);
ASSERT(readBytes == smallestPhysicalBlock);
state Reference<IPage> page = header ? Reference<IPage>(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock)) : self->newPageBuffer();
debug_printf("DWALPager(%s) op=readPhysicalStart %s ptr=%p\n", self->filename.c_str(), toString(pageID).c_str(), page->begin());
int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize;
// TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled?
int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize));
debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), page->begin(), readBytes);
// Header reads are checked explicitly during recovery
if(!header) {
Page *p = (Page *)page.getPtr();
if(!p->verifyChecksum(pageID)) {
debug_printf("DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
Error e = checksum_failed();
TraceEvent(SevError, "DWALPagerChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)
.detail("PageSize", self->physicalPageSize)
.detail("Offset", pageID * self->physicalPageSize)
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
.detail("ChecksumInPage", p->getChecksum())
.error(e);
throw e;
}
}
return page;
}
ACTOR static Future<Reference<IPage>> readPhysicalPage(DWALPager *self, PhysicalPageID pageID) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page = self->newPageBuffer();
debug_printf("DWALPager(%s) op=read_physical_start %s\n", self->filename.c_str(), toString(pageID).c_str());
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
debug_printf("DWALPager(%s) op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes);
ASSERT(readBytes == self->physicalPageSize);
Page *p = (Page *)page.getPtr();
if(!p->verifyChecksum(pageID)) {
debug_printf("DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
Error e = checksum_failed();
TraceEvent(SevError, "DWALPagerChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)
.detail("PageSize", self->physicalPageSize)
.detail("Offset", pageID * self->physicalPageSize)
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
.detail("ChecksumInPage", p->getChecksum())
.error(e);
throw e;
}
return page;
static Future<Reference<IPage>> readHeaderPage(DWALPager *self, PhysicalPageID pageID) {
return readPhysicalPage(self, pageID, true);
}
// Reads the most recent version of pageID either committed or written using updatePage()
@ -1222,23 +1257,24 @@ public:
// Use cached page if present, without triggering a cache hit.
// Otherwise, read the page and return it but don't add it to the cache
if(!cacheable) {
debug_printf("DWALPager(%s) op=read_nocache %s\n", filename.c_str(), toString(pageID).c_str());
debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageID).c_str());
PageCacheEntry *pCacheEntry = pageCache.getIfExists(pageID);
if(pCacheEntry != nullptr) {
debug_printf("DWALPager(%s) op=read_nocache_hit %s\n", filename.c_str(), toString(pageID).c_str());
debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageID).c_str());
return pCacheEntry->readFuture;
}
debug_printf("DWALPager(%s) op=read_nocache_miss %s\n", filename.c_str(), toString(pageID).c_str());
debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageID).c_str());
return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise);
}
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.readFuture.isValid(), cacheEntry.reading(), cacheEntry.writing());
debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing());
if(!cacheEntry.readFuture.isValid()) {
if(!cacheEntry.initialized()) {
debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str());
cacheEntry.readFuture = readPhysicalPage(this, (PhysicalPageID)pageID);
cacheEntry.writeFuture = Void();
}
cacheEntry.readFuture = forwardError(cacheEntry.readFuture, errorPromise);
@ -1310,10 +1346,6 @@ public:
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.get().newPageID, false));
// Some page reads will mark the unused portion of the page as undefined to catch bugs with valgrind.
// We are blindly copying the page data to a new location regardless of its format so mark all of it defined.
VALGRIND_MAKE_MEM_DEFINED(data->begin(), data->size());
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.get().originalPageID, data);
@ -1335,19 +1367,12 @@ public:
return Void();
}
ACTOR static Future<Void> commit_impl(DWALPager *self) {
debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str());
// Write old committed header to Page 1
self->operations.add(self->writeHeaderPage(1, self->lastCommittedHeaderPage));
// Trigger the remap eraser to stop and then wait for it.
self->remapUndoStop = true;
wait(self->remapUndoFuture);
// Flush all queues so they have no operations pending.
ACTOR static Future<Void> flushQueues(DWALPager *self) {
ASSERT(self->remapUndoFuture.isReady());
// Flush remap queue separately, it's not involved in free page management
wait(self->remapQueue.flush());
self->pHeader->remapQueue = self->remapQueue.getState();
// Flush the free list and delayed free list queues together as they are used by freePage() and newPageID()
loop {
@ -1364,6 +1389,22 @@ public:
self->freeList.finishFlush();
self->delayedFreeList.finishFlush();
return Void();
}
ACTOR static Future<Void> commit_impl(DWALPager *self) {
debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str());
// Write old committed header to Page 1
self->writeHeaderPage(1, self->lastCommittedHeaderPage);
// Trigger the remap eraser to stop and then wait for it.
self->remapUndoStop = true;
wait(self->remapUndoFuture);
wait(flushQueues(self));
self->pHeader->remapQueue = self->remapQueue.getState();
self->pHeader->freeList = self->freeList.getState();
self->pHeader->delayedFreeList = self->delayedFreeList.getState();
@ -1423,21 +1464,30 @@ public:
}
ACTOR void shutdown(DWALPager *self, bool dispose) {
debug_printf("DWALPager(%s) shutdown cancel recovery\n", self->filename.c_str());
self->recoverFuture.cancel();
debug_printf("DWALPager(%s) shutdown cancel commit\n", self->filename.c_str());
self->commitFuture.cancel();
debug_printf("DWALPager(%s) shutdown cancel remap\n", self->filename.c_str());
self->remapUndoFuture.cancel();
if(self->errorPromise.canBeSet()) {
debug_printf("DWALPager(%s) shutdown sending error\n", self->filename.c_str());
self->errorPromise.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress
}
self->operations.clear();
// Destroy the cache, cancelling reads and writes in progress
self->pageCache.destroy();
// Must wait for pending operations to complete, canceling them can cause a crash because the underlying
// operations may be uncancellable and depend on memory from calling scope's page reference
debug_printf("DWALPager(%s) shutdown wait for operations\n", self->filename.c_str());
wait(self->operations.signal());
debug_printf("DWALPager(%s) shutdown destroy page cache\n", self->filename.c_str());
wait(self->pageCache.clear());
// Unreference the file and clear
self->pageFile.clear();
if(dispose) {
debug_printf("DWALPager(%s) shutdown deleting file\n", self->filename.c_str());
wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(self->filename, true));
}
@ -1476,9 +1526,19 @@ public:
return StorageBytes(free, total, pagerSize, free + reusable);
}
// Get the number of pages in use but not by the pager itself.
// Helper for getUserPageCount(): waits for remapUndoFuture and then runs flushQueues(), in that order.
ACTOR static Future<Void> getUserPageCount_cleanup(DWALPager *self) {
// Wait for the remap eraser to finish all of its work (not triggering stop)
wait(self->remapUndoFuture);
// Flush queues so there are no pending freelist operations
wait(flushQueues(self));
return Void();
}
// Get the number of pages in use by the pager's user
Future<int64_t> getUserPageCount() override {
return map(remapUndoFuture, [=](Void) {
return map(getUserPageCount_cleanup(this), [=](Void) {
int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries - delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages;
debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64 " freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64 " remapQueuePages=%" PRId64 " remapQueueCount=%" PRId64 "\n",
filename.c_str(), userPages, pHeader->pageCount, freeList.numPages, freeList.numEntries, delayedFreeList.numPages, delayedFreeList.numEntries, remapQueue.numPages, remapQueue.numEntries);
@ -1538,12 +1598,16 @@ private:
Future<Reference<IPage>> readFuture;
Future<Void> writeFuture;
bool initialized() const {
return readFuture.isValid();
}
bool reading() const {
return readFuture.isValid() && !readFuture.isReady();
return !readFuture.isReady();
}
bool writing() const {
return writeFuture.isValid() && !writeFuture.isReady();
return !writeFuture.isReady();
}
bool evictable() const {
@ -1551,9 +1615,8 @@ private:
return !reading() && !writing();
}
void destroy() {
readFuture.cancel();
writeFuture.cancel();
// Returns a future built from ready(readFuture) combined with writeFuture.
// NOTE(review): presumably this becomes ready once the read has finished and the write has
// completed, i.e. when evictable() would return true -- confirm against ready() and operator&&.
Future<Void> onEvictable() const {
return ready(readFuture) && writeFuture;
}
};
@ -2476,7 +2539,6 @@ static void makeEmptyRoot(Reference<IPage> page) {
btpage->kvBytes = 0;
btpage->itemCount = 0;
btpage->tree().build(nullptr, nullptr, nullptr, nullptr);
VALGRIND_MAKE_MEM_DEFINED(page->begin() + btpage->tree().size(), page->size() - btpage->tree().size());
}
BTreePage::BinaryTree::Reader * getReader(Reference<const IPage> page) {
@ -3354,12 +3416,17 @@ private:
// If flush then write a page using records from start to i. It's guaranteed that pageUpperBound has been set above.
if(flush) {
end = i == entries.size(); // i could have been moved above
int remaining = entries.size() - i;
end = remaining == 0; // i could have been moved above
int count = i - start;
// If not writing the final page, reduce entry count of page by a third
if(!end) {
i -= count / 3;
// If
// - this is not the last page
// - the number of entries remaining after this page is less than the count of the current page
// - the page that would be written ends on a user key boundary
// Then adjust the current page item count to half the amount remaining after the start position.
if(!end && remaining < count && entries[i - 1].key != entries[i].key) {
i = (start + entries.size()) / 2;
pageUpperBound = entries[i].withoutValue();
}
@ -3374,7 +3441,6 @@ private:
if(blockCount == 1) {
Reference<IPage> page = self->m_pager->newPageBuffer();
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
btPage = (BTreePage *)page->mutate();
pages.push_back(std::move(page));
}
@ -3382,7 +3448,6 @@ private:
ASSERT(blockCount > 1);
int size = blockSize * blockCount;
btPage = (BTreePage *)new uint8_t[size];
VALGRIND_MAKE_MEM_DEFINED(btPage, size);
}
btPage->formatVersion = BTreePage::FORMAT_VERSION;
@ -3400,10 +3465,11 @@ private:
// Create chunked pages
// TODO: Avoid copying page bytes, but this is not trivial due to how pager checksums are currently handled.
if(blockCount != 1) {
// Mark the slack in the page buffer as defined
VALGRIND_MAKE_MEM_DEFINED(((uint8_t *)btPage) + written, (blockCount * blockSize) - written);
const uint8_t *rptr = (const uint8_t *)btPage;
for(int b = 0; b < blockCount; ++b) {
Reference<IPage> page = self->m_pager->newPageBuffer();
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
memcpy(page->mutate(), rptr, blockSize);
rptr += blockSize;
pages.push_back(std::move(page));
@ -3571,9 +3637,6 @@ private:
debug_printf("readPage() %s\n", pTreePage->toString(false, id, snapshot->getVersion(), lowerBound, upperBound).c_str());
}
// Nothing should attempt to read bytes in the page outside the BTreePage structure
VALGRIND_MAKE_MEM_UNDEFINED(page->begin() + pTreePage->size(), page->size() - pTreePage->size());
return page;
}
@ -4572,6 +4635,7 @@ private:
wait(success(self->m_cur2.move(true)));
}
self->m_kv.reset();
while(self->m_cur1.valid()) {
if(self->m_cur1.presentAtVersion(self->m_version) &&
@ -4597,7 +4661,6 @@ private:
}
self->m_kv.reset();
debug_printf("Cursor::move(%d): Exit, end of db reached. Cursor = %s\n", fwd, self->toString().c_str());
return Void();
}
@ -5852,6 +5915,7 @@ TEST_CASE("!/redwood/correctness/btree") {
debug_printf("Waiting for verification to complete.\n");
wait(verifyTask);
debug_printf("Closing btree\n");
Future<Void> closedFuture = btree->onClosed();
btree->close();
wait(closedFuture);

View File

@ -7,11 +7,12 @@ setFraction=0.01
nodeCount=20000000
keyBytes=16
valueBytes=96
filename=bttest
setup=true
clear=false
count=false
useDB=false
storeType=ssd
filename=bttest-sqlite
testTitle=Scan
testName=KVStoreTest
@ -22,11 +23,12 @@ setFraction=0.01
nodeCount=20000000
keyBytes=16
valueBytes=96
filename=bttest
setup=false
clear=false
count=true
useDB=false
storeType=ssd
filename=bttest-sqlite
testTitle=RandomWriteSaturation
testName=KVStoreTest
@ -38,8 +40,58 @@ setFraction=1.0
nodeCount=20000000
keyBytes=16
valueBytes=96
filename=bttest
setup=false
clear=false
count=false
useDB=false
storeType=ssd
filename=bttest-sqlite
testTitle=Insert
testName=KVStoreTest
testDuration=0.0
operationsPerSecond=28000
commitFraction=0.001
setFraction=0.01
nodeCount=20000000
keyBytes=16
valueBytes=96
setup=true
clear=false
count=false
useDB=false
storeType=ssd-redwood-experimental
filename=bttest-redwood
testTitle=Scan
testName=KVStoreTest
testDuration=20.0
operationsPerSecond=28000
commitFraction=0.0001
setFraction=0.01
nodeCount=20000000
keyBytes=16
valueBytes=96
setup=false
clear=false
count=true
useDB=false
storeType=ssd-redwood-experimental
filename=bttest-redwood
testTitle=RandomWriteSaturation
testName=KVStoreTest
testDuration=20.0
saturation=true
operationsPerSecond=10000
commitFraction=0.00005
setFraction=1.0
nodeCount=20000000
keyBytes=16
valueBytes=96
setup=false
clear=false
count=false
useDB=false
storeType=ssd-redwood-experimental
filename=bttest-redwood