Bug fix in the design of the COWPager commit sequence. Page 1 is now used to store a copy of the previous committed header rather than the new one, as recovering to an unsync'd new header from Page 1 is incorrect behavior since other pager writes may not have made it to disk. Also fixed header page size handling which would write unusable backup headers when using >4k pages.
This commit is contained in:
parent
ca11845934
commit
61054492b6
|
@ -63,7 +63,7 @@ public:
|
|||
int64_t numPages;
|
||||
int64_t numEntries;
|
||||
std::string toString() const {
|
||||
return format("head: %u:%d tail: %u:%d numPages: %" PRId64 " numEntries: %" PRId64 "\n", headPageID, (int)headIndex, tailPageID, numPages, numEntries);
|
||||
return format("head: %u:%d tail: %u numPages: %" PRId64 " numEntries: %" PRId64 "\n", headPageID, (int)headIndex, tailPageID, numPages, numEntries);
|
||||
}
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
@ -90,7 +90,7 @@ public:
|
|||
// Point cursor to a page which has never been written before, allocate
|
||||
// a page buffer and initialize it
|
||||
void initWrite(FIFOQueue *q, LogicalPageID newPageID) {
|
||||
debug_printf("New queue cursor at page id=%u write=%d\n", newPageID, write);
|
||||
debug_printf("FIFOQueue(%s): New write queue cursor at page id=%u\n", q->name.c_str(), newPageID);
|
||||
queue = q;
|
||||
pageID = newPageID;
|
||||
initNewPageBuffer();
|
||||
|
@ -98,7 +98,7 @@ public:
|
|||
|
||||
// Point cursor to a page to read from. Begin loading the page if beginLoad is set.
|
||||
void initRead(FIFOQueue *q, LogicalPageID p, int i, LogicalPageID endPageID) {
|
||||
debug_printf("Loading queue cursor at page id=%u index=%d\n", p, i);
|
||||
debug_printf("FIFOQueue(%s): New read queue cursor at page id=%u index=%d end page id=%u\n", q->name.c_str(), p, i, endPageID);
|
||||
queue = q;
|
||||
pageID = p;
|
||||
index = i;
|
||||
|
@ -144,7 +144,7 @@ public:
|
|||
}
|
||||
|
||||
Future<Void> loadPage() {
|
||||
debug_printf("queue(%p, %s) loading page id=%u index=%d\n", this, queue->name.c_str(), pageID, index);
|
||||
debug_printf("FIFOQueue(%s): loading page id=%u index=%d\n", queue->name.c_str(), pageID, index);
|
||||
return map(queue->pager->readPage(pageID), [=](Reference<IPage> p) {
|
||||
page = p;
|
||||
return Void();
|
||||
|
@ -154,7 +154,7 @@ public:
|
|||
Future<Void> newPage() {
|
||||
ASSERT(page);
|
||||
return map(queue->pager->newPageID(), [=](LogicalPageID newPageID) {
|
||||
debug_printf("queue(%p, %s) new page id=%u\n", this, queue->name.c_str(), newPageID);
|
||||
debug_printf("FIFOQueue(%s): new page id=%u\n", queue->name.c_str(), newPageID);
|
||||
auto p = raw();
|
||||
p->next = newPageID;
|
||||
writePage();
|
||||
|
@ -177,7 +177,7 @@ public:
|
|||
// Pages are never written after being read, so if the write cursor is not
|
||||
// ready then it is getting a new page ID which must be written to the next
|
||||
// page ID of the page behind it.
|
||||
debug_printf("queue(%p, %s) write page id=%u\n", this, queue->name.c_str(), pageID);
|
||||
debug_printf("FIFOQueue(%s): write page id=%u\n", queue->name.c_str(), pageID);
|
||||
ASSERT(loading.isReady());
|
||||
queue->pager->updatePage(pageID, page);
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ public:
|
|||
|
||||
// Create a new queue at newPageID
|
||||
void create(IPager2 *p, LogicalPageID newPageID, std::string queueName) {
|
||||
debug_printf("FIFOQueue::init(%p, %s) from page id %u\n", this, name.c_str(), newPageID);
|
||||
debug_printf("FIFOQueue(%s): create from page id %u\n", queueName.c_str(), newPageID);
|
||||
pager = p;
|
||||
name = queueName;
|
||||
numPages = 1;
|
||||
|
@ -267,9 +267,8 @@ public:
|
|||
|
||||
// Load an existing queue from its queue state
|
||||
void recover(IPager2 *p, const QueueState &qs, std::string queueName) {
|
||||
debug_printf("FIFOQueue::init(%p, %s) from queue state %u\n", this, name.c_str(), qs.toString().c_str());
|
||||
debug_printf("FIFOQueue(%s): recover from queue state %s\n", queueName.c_str(), qs.toString().c_str());
|
||||
pager = p;
|
||||
this->name = name;
|
||||
name = queueName;
|
||||
numPages = qs.numPages;
|
||||
numEntries = qs.numEntries;
|
||||
|
@ -293,6 +292,8 @@ public:
|
|||
s.tailPageID = tail.pageID;
|
||||
s.numEntries = numEntries;
|
||||
s.numPages = numPages;
|
||||
|
||||
debug_printf("FIFOQueue(%s): getState(): %s\n", name.c_str(), s.toString().c_str());
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -326,13 +327,13 @@ public:
|
|||
|
||||
// Flush changes to the pager and return the resulting queue state.
|
||||
Future<QueueState> flush() {
|
||||
debug_printf("FIFOQueue::flush %p %s\n", this, name.c_str());
|
||||
debug_printf("FIFOQueue(%s): flush\n", name.c_str());
|
||||
Future<QueueState> oldWriter = writer;
|
||||
writeQueue.sendError(end_of_stream());
|
||||
writeQueue = PromiseStream<T>();
|
||||
writer = writeActor(this, writeQueue.getFuture());
|
||||
if(!oldWriter.isValid()) {
|
||||
debug_printf("FIFOQueue::flush %p oldwriter not valid %s\n", this, name.c_str());
|
||||
debug_printf("FIFOQueue(%s): flush, oldwriter not valid\n", name.c_str());
|
||||
return getState();
|
||||
}
|
||||
return oldWriter;
|
||||
|
@ -518,6 +519,10 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
void updateCommittedHeader() {
|
||||
memcpy(lastCommittedHeaderPage->mutate(), headerPage->begin(), smallestPhysicalBlock);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> recover(COWPager *self) {
|
||||
ASSERT(!self->recoverFuture.isValid());
|
||||
|
||||
|
@ -531,8 +536,10 @@ public:
|
|||
|
||||
// Header page is always treated as having a page size of smallestPhysicalBlock
|
||||
self->setPageSize(smallestPhysicalBlock);
|
||||
state int64_t fileSize = 0;
|
||||
self->lastCommittedHeaderPage = self->newPageBuffer();
|
||||
self->pLastCommittedHeader = (Header *)self->lastCommittedHeaderPage->begin();
|
||||
|
||||
state int64_t fileSize = 0;
|
||||
if(exists) {
|
||||
wait(store(fileSize, self->pageFile->size()));
|
||||
}
|
||||
|
@ -547,14 +554,14 @@ public:
|
|||
|
||||
state bool recoveredHeader = false;
|
||||
|
||||
// Read physical page 0 directly, checksum not required
|
||||
wait(store(self->headerPage, self->readPhysicalPage(self, 0, false)));
|
||||
// Read physical page 0 directly
|
||||
wait(store(self->headerPage, self->readHeaderPage(self, 0)));
|
||||
|
||||
// If the checksum fails for the header page, try to recover it from page 1
|
||||
// If the checksum fails for the header page, try to recover committed header backup from page 1
|
||||
if(BUGGIFY || !self->headerPage.castTo<Page>()->verifyChecksum(0)) {
|
||||
TraceEvent(SevWarn, "COWPagerRecoveringHeader").detail("Filename", self->filename);
|
||||
|
||||
wait(store(self->headerPage, self->readPhysicalPage(self, 1, false)));
|
||||
wait(store(self->headerPage, self->readHeaderPage(self, 1)));
|
||||
|
||||
if(!self->headerPage.castTo<Page>()->verifyChecksum(1)) {
|
||||
if(g_network->isSimulated()) {
|
||||
|
@ -562,8 +569,11 @@ public:
|
|||
throw io_error().asInjectedFault();
|
||||
}
|
||||
|
||||
TraceEvent(SevError, "COWPagerRecoveryFailed").detail("Filename", self->filename);
|
||||
throw io_error();
|
||||
Error e = checksum_failed();
|
||||
TraceEvent(SevError, "COWPagerRecoveryFailed")
|
||||
.detail("Filename", self->filename)
|
||||
.error(e);
|
||||
throw e;
|
||||
}
|
||||
recoveredHeader = true;
|
||||
}
|
||||
|
@ -580,10 +590,11 @@ public:
|
|||
|
||||
self->freeList.recover(self, self->pHeader->freeList, "FreeListRecovered");
|
||||
|
||||
// If the header was recovered from Page 1 then write and sync it to Page 0 before continuing.
|
||||
// If the header was recovered from the backup at Page 1 then write and sync it to Page 0 before continuing.
|
||||
// If this fails, the backup header is still in tact for the next recovery attempt.
|
||||
if(recoveredHeader) {
|
||||
// Write the header to page 0
|
||||
wait(self->writePhysicalPage(0, self->headerPage));
|
||||
wait(self->writeHeaderPage(0, self->headerPage));
|
||||
|
||||
// Wait for all outstanding writes to complete
|
||||
wait(self->writes.signalAndCollapse());
|
||||
|
@ -592,8 +603,15 @@ public:
|
|||
wait(self->pageFile->sync());
|
||||
debug_printf("COWPager(%s) Header recovery complete.\n", self->filename.c_str());
|
||||
}
|
||||
|
||||
// Update the last committed header with the one that was recovered (which is the last known committed header)
|
||||
self->updateCommittedHeader();
|
||||
}
|
||||
else {
|
||||
// Note: If the file contains less than 2 pages but more than 0 bytes then the pager was never successfully committed.
|
||||
// A new pager will be created in its place.
|
||||
// TODO: Is the right behavior?
|
||||
|
||||
debug_printf("COWPager(%s) creating new pager\n");
|
||||
|
||||
self->headerPage = self->newPageBuffer();
|
||||
|
@ -610,21 +628,27 @@ public:
|
|||
|
||||
// There are 2 reserved pages:
|
||||
// Page 0 - header
|
||||
// Page 1 - header write-ahead "log"
|
||||
// Page 1 - header backup
|
||||
self->pHeader->pageCount = 2;
|
||||
|
||||
// Create a new free list
|
||||
self->freeList.create(self, self->newPageID().get(), "FreeListNew");
|
||||
|
||||
// Clear remaining bytes of header
|
||||
memset(self->headerPage->mutate() + self->pHeader->size(), 0, self->headerPage->size() - self->pHeader->size());
|
||||
// The first commit() below will flush the queue and update the queue state in the header,
|
||||
// but since the queue will not be used between now and then its state will not change.
|
||||
// In order to populate lastCommittedHeader, update the header now with the queue's state.
|
||||
self->pHeader->freeList = self->freeList.getState();
|
||||
|
||||
// Set remaining header bytes to \xff
|
||||
memset(self->headerPage->mutate() + self->pHeader->size(), 0xff, self->headerPage->size() - self->pHeader->size());
|
||||
|
||||
// Since there is no previously committed header use the initial header for the initial commit.
|
||||
self->updateCommittedHeader();
|
||||
|
||||
wait(self->commit());
|
||||
}
|
||||
|
||||
self->pageCache = PageCacheT(self->pageCacheBytes / self->physicalPageSize);
|
||||
self->lastCommittedVersion = self->pHeader->committedVersion;
|
||||
self->lastCommittedMeta = self->pHeader->getMetaKey();
|
||||
|
||||
debug_printf("COWPager(%s) recovered. LogicalPageSize=%d PhysicalPageSize=%d\n", self->filename.c_str(), self->logicalPageSize, self->physicalPageSize);
|
||||
return Void();
|
||||
|
@ -662,11 +686,16 @@ public:
|
|||
return forwardError(f, errorPromise);
|
||||
};
|
||||
|
||||
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<IPage> page) {
|
||||
debug_printf("COWPager(%s) header op=write id=%u\n", filename.c_str(), pageID);
|
||||
((Page *)page.getPtr())->updateChecksum(pageID);
|
||||
return holdWhile(page, pageFile->write(page->begin(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
|
||||
}
|
||||
|
||||
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<IPage> page) {
|
||||
debug_printf("COWPager(%s) op=write id=%u\n", filename.c_str(), pageID);
|
||||
((Page *)page.getPtr())->updateChecksum(pageID);
|
||||
int physicalSize = (pageID == 0 || pageID == 1) ? smallestPhysicalBlock : physicalPageSize;
|
||||
return holdWhile(page, pageFile->write(page->begin(), physicalSize, (int64_t)pageID * physicalSize));
|
||||
return holdWhile(page, pageFile->write(page->begin(), physicalPageSize, (int64_t)pageID * physicalPageSize));
|
||||
}
|
||||
|
||||
void updatePage(LogicalPageID pageID, Reference<IPage> data) {
|
||||
|
@ -717,13 +746,24 @@ public:
|
|||
freeList.push(pageID);
|
||||
};
|
||||
|
||||
ACTOR static Future<Reference<IPage>> readPhysicalPage(COWPager *self, PhysicalPageID pageID, bool verifyChecksum = true) {
|
||||
// Header pages use a page size of smallestPhysicalBlock
|
||||
// If the user chosen physical page size is larger, then there will be a gap of unused space after
|
||||
// between the end of page 1 and the start of page 2.
|
||||
ACTOR static Future<Reference<IPage>> readHeaderPage(COWPager *self, PhysicalPageID pageID) {
|
||||
state Reference<IPage> page(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock));
|
||||
int readBytes = wait(self->pageFile->read(page->mutate(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
|
||||
debug_printf("COWPager(%s) header op=read_complete id=%u bytes=%d\n", self->filename.c_str(), pageID, readBytes);
|
||||
ASSERT(readBytes == smallestPhysicalBlock);
|
||||
return page;
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<IPage>> readPhysicalPage(COWPager *self, PhysicalPageID pageID) {
|
||||
state Reference<IPage> page = self->newPageBuffer();
|
||||
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
|
||||
debug_printf("COWPager(%s) op=read_complete id=%u bytes=%d\n", self->filename.c_str(), pageID, readBytes);
|
||||
ASSERT(readBytes == self->physicalPageSize);
|
||||
Page *p = (Page *)page.getPtr();
|
||||
if(verifyChecksum && !p->verifyChecksum(pageID)) {
|
||||
if(!p->verifyChecksum(pageID)) {
|
||||
debug_printf("COWPager(%s) checksum failed id=%u\n", self->filename.c_str(), pageID);
|
||||
Error e = checksum_failed();
|
||||
TraceEvent(SevError, "COWPagerChecksumFailed")
|
||||
|
@ -755,11 +795,11 @@ public:
|
|||
Reference<IPagerSnapshot> getReadSnapshot();
|
||||
|
||||
ACTOR static Future<Void> commit_impl(COWPager *self) {
|
||||
// Flush the free list queue to the pager
|
||||
wait(store(self->pHeader->freeList, self->freeList.flush()));
|
||||
// Write old committed header to Page 1
|
||||
self->writes.add(forwardError(self->writeHeaderPage(1, self->lastCommittedHeaderPage), self->errorPromise));
|
||||
|
||||
// Write the header write-ahead "log" at Page 1
|
||||
wait(self->writePhysicalPage(1, self->headerPage));
|
||||
// Flush the free list queue to the pager and get the new queue state into the header
|
||||
wait(store(self->pHeader->freeList, self->freeList.flush()));
|
||||
|
||||
// Wait for all outstanding writes to complete
|
||||
wait(self->writes.signalAndCollapse());
|
||||
|
@ -769,13 +809,12 @@ public:
|
|||
debug_printf("COWPager(%s) commit sync 1\n", self->filename.c_str());
|
||||
|
||||
// Update header on disk and sync again.
|
||||
wait(self->writePhysicalPage(0, self->headerPage));
|
||||
wait(self->writeHeaderPage(0, self->headerPage));
|
||||
wait(self->pageFile->sync());
|
||||
debug_printf("COWPager(%s) commit sync 2\n", self->filename.c_str());
|
||||
|
||||
// Update last committed state for use in creating snapshots at current version.
|
||||
self->lastCommittedVersion = self->pHeader->committedVersion;
|
||||
self->lastCommittedMeta = self->pHeader->getMetaKey();
|
||||
// Update the last committed header for use in the next commit.
|
||||
self->updateCommittedHeader();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -851,7 +890,7 @@ public:
|
|||
|
||||
Future<Version> getLatestVersion() {
|
||||
return map(recoverFuture, [=](Void) {
|
||||
return lastCommittedVersion;
|
||||
return pLastCommittedHeader->committedVersion;
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -925,8 +964,8 @@ private:
|
|||
|
||||
int desiredPageSize;
|
||||
|
||||
Version lastCommittedVersion;
|
||||
Key lastCommittedMeta;
|
||||
Reference<IPage> lastCommittedHeaderPage;
|
||||
Header *pLastCommittedHeader;
|
||||
|
||||
std::string filename;
|
||||
|
||||
|
@ -983,8 +1022,8 @@ private:
|
|||
};
|
||||
|
||||
Reference<IPagerSnapshot> COWPager::getReadSnapshot() {
|
||||
++snapshotsInUse[lastCommittedVersion];
|
||||
return Reference<IPagerSnapshot>(new COWPagerSnapshot(this, lastCommittedMeta, lastCommittedVersion));
|
||||
++snapshotsInUse[pLastCommittedHeader->committedVersion];
|
||||
return Reference<IPagerSnapshot>(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion));
|
||||
}
|
||||
|
||||
// TODO: Move this to a flow header once it is mature.
|
||||
|
@ -2558,7 +2597,7 @@ private:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> buildNewRoot(VersionedBTree *self, Version version, std::vector<BoundaryAndPage> *pages, std::vector<LogicalPageID> *logicalPageIDs, BTreePage *pPage) {
|
||||
debug_printf("buildNewRoot start version %" PRId64 ", %lu pages %s\n", version, pages->size());
|
||||
debug_printf("buildNewRoot start version %" PRId64 ", %lu pages\n", version, pages->size());
|
||||
|
||||
// While there are multiple child pages for this version we must write new tree levels.
|
||||
while(pages->size() > 1) {
|
||||
|
|
Loading…
Reference in New Issue