Bug fix, COWPager failed to reopen a created but unsync'd pager file. Added proper checksum error handling.

This commit is contained in:
Stephen Atherton 2019-08-14 03:01:46 -07:00
parent 57f55c1e99
commit 537b8dc7ac
1 changed files with 28 additions and 7 deletions

View File

@ -525,9 +525,16 @@ public:
// Header page is always treated as having a page size of smallestPhysicalBlock
self->setPageSize(smallestPhysicalBlock);
state int64_t fileSize = 0;
if(exists) {
debug_printf("File exists, reading header: %s\n", self->filename.c_str());
wait(store(fileSize, self->pageFile->size()));
}
debug_printf("COWPager(%s) recover exists=%d fileSize=%" PRId64 "\n", self->filename.c_str(), exists, fileSize);
if(exists && fileSize >= self->smallestPhysicalBlock) {
debug_printf("COWPager(%s) recovering using existing file\n");
// Read physical page 0 directly
wait(store(self->headerPage, self->readPhysicalPage(self, 0)));
@ -544,7 +551,7 @@ public:
self->freeList.recover(self, self->pHeader->freeList, "FreeListRecovered");
}
else {
debug_printf("File does not exist, creating header page: %s\n", self->filename.c_str());
debug_printf("COWPager(%s) creating new pager\n");
self->headerPage = self->newPageBuffer();
self->pHeader = (Header *)self->headerPage->begin();
@ -575,7 +582,7 @@ public:
self->lastCommittedVersion = self->pHeader->committedVersion;
self->lastCommittedMeta = self->pHeader->getMetaKey();
debug_printf("Recovered %s\n", self->filename.c_str());
debug_printf("COWPager(%s) recovered. LogicalPageSize=%d PhysicalPageSize=%d\n", self->filename.c_str(), self->logicalPageSize, self->physicalPageSize);
return Void();
}
@ -620,7 +627,7 @@ public:
void updatePage(LogicalPageID pageID, Reference<IPage> data) {
// Get the cache entry for this page
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("COWPager op=write id=%u cached=%d reading=%d writing=%d\n", pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
debug_printf("COWPager(%s) op=write id=%u cached=%d reading=%d writing=%d\n", filename.c_str(), pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
// If the page is still being read then it's not also being written because a write places
// the new content in the cache entry when the write is launched, not when it is completed.
@ -668,16 +675,28 @@ public:
ACTOR static Future<Reference<IPage>> readPhysicalPage(COWPager *self, PhysicalPageID pageID) {
state Reference<IPage> page = self->newPageBuffer();
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
debug_printf("op=read_complete id=%u bytes=%d\n", pageID, readBytes);
debug_printf("COWPager(%s) op=read_complete id=%u bytes=%d\n", self->filename.c_str(), pageID, readBytes);
ASSERT(readBytes == self->physicalPageSize);
ASSERT(((Page *)page.getPtr())->verifyChecksum(pageID));
Page *p = (Page *)page.getPtr();
if(!p->verifyChecksum(pageID)) {
Error e = checksum_failed();
TraceEvent(SevError, "COWPagerChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)
.detail("PageSize", self->physicalPageSize)
.detail("Offset", pageID * self->physicalPageSize)
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
.detail("ChecksumInPage", p->getChecksum())
.error(e);
throw e;
}
return page;
}
// Reads the most recent version of pageID either committed or written using updatePage()
Future<Reference<IPage>> readPage(LogicalPageID pageID) {
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("COWPager op=read id=%u cached=%d reading=%d writing=%d\n", pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
debug_printf("COWPager(%s) op=read id=%u cached=%d reading=%d writing=%d\n", filename.c_str(), pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
if(!cacheEntry.page.isValid()) {
cacheEntry.page = readPhysicalPage(this, (PhysicalPageID)pageID);
@ -698,10 +717,12 @@ public:
// Sync everything except the header
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit sync 1\n", self->filename.c_str());
// Update header on disk and sync again.
wait(self->writePhysicalPage(0, self->headerPage));
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit sync 2\n", self->filename.c_str());
// Update last committed state for use in creating snapshots at current version.
self->lastCommittedVersion = self->pHeader->committedVersion;