Bug fixes. COWPager initialization was not flushing non-header pages with fsync() before writing and syncing the header. FIFOQueue was writing the initial page of a new queue multiple times. FIFOQueue::writePage() would unnecessarily (and invalidly) attempt to write if the page is not yet loaded.

This commit is contained in:
Stephen Atherton 2019-08-08 02:57:23 -07:00
parent bd8ed07f4d
commit f81eeea495
2 changed files with 28 additions and 25 deletions

View File

@ -40,6 +40,7 @@
#endif
#define BEACON fprintf(stderr, "%s: %s line %d \n", __FUNCTION__, __FILE__, __LINE__)
#define TRACE fprintf(stderr, "%s: %s line %d %s\n", __FUNCTION__, __FILE__, __LINE__, platform::get_backtrace().c_str());
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)

View File

@ -73,7 +73,8 @@ public:
Cursor() : queue(nullptr) {
}
void init(FIFOQueue *q, LogicalPageID p) {
void initNew(FIFOQueue *q, LogicalPageID p) {
debug_printf("New queue cursor at page id=%u write=%d\n", p, write);
queue = q;
pageID = p;
index = 0;
@ -82,7 +83,8 @@ public:
writePage();
}
void init(FIFOQueue *q, LogicalPageID p, int i) {
void initExisting(FIFOQueue *q, LogicalPageID p, int i) {
debug_printf("Loading queue cursor at page id=%u index=%d\n", p, i);
queue = q;
pageID = p;
index = i;
@ -140,7 +142,10 @@ public:
}
void writePage() {
queue->pager->updatePage(pageID, page);
// If the page isn't loaded yet then there can't possibly be anything new to write
if(loading.isReady()) {
queue->pager->updatePage(pageID, page);
}
}
ACTOR static Future<Void> waitThenWriteNext(Cursor *self, T item) {
@ -207,9 +212,9 @@ public:
name = queueName;
numPages = 1;
numEntries = 0;
head.init(this, newPageID);
tail.init(this, newPageID);
stop.init(this, newPageID);
tail.initNew(this, newPageID);
head.initExisting(this, tail.pageID, tail.index);
stop.initExisting(this, tail.pageID, tail.index);
ASSERT(flush().isReady());
}
@ -221,9 +226,9 @@ public:
name = queueName;
numPages = qs.numPages;
numEntries = qs.numEntries;
head.init(this, qs.headPageID, qs.headIndex);
tail.init(this, qs.tailPageID, qs.tailIndex);
stop.init(this, qs.tailPageID, qs.tailIndex);
head.initExisting(this, qs.headPageID, qs.headIndex);
tail.initExisting(this, qs.tailPageID, qs.tailIndex);
stop.initExisting(this, qs.tailPageID, qs.tailIndex);
ASSERT(flush().isReady());
}
@ -246,10 +251,12 @@ public:
}
ACTOR static Future<QueueState> writeActor(FIFOQueue *self, FutureStream<T> queue) {
state bool modified = false;
try {
loop {
state T item = waitNext(queue);
self->tail.writeNext(item);
wait(self->tail.writeNext(item));
modified = true;
}
}
catch(Error &e) {
@ -258,8 +265,10 @@ public:
}
}
self->tail.writePage();
self->stop.init(self, self->tail.pageID, self->tail.index);
if(modified) {
self->tail.writePage();
self->stop.initExisting(self, self->tail.pageID, self->tail.index);
}
return self->getState();
}
@ -494,15 +503,9 @@ public:
// Create a new free list at page 1
self->freeList.init(self, 1, "FreeListNew");
// Flush free list, store state in header
store(self->pHeader->freeList, self->freeList.flush());
// Clear remaining bytes of header
memset(self->headerPage->mutate() + self->pHeader->size(), 0, self->headerPage->size() - self->pHeader->size());
// Update header page on disk and sync
wait(self->writePhysicalPage(0, self->headerPage));
wait(self->commit());
}
@ -552,7 +555,7 @@ public:
void updatePage(LogicalPageID pageID, Reference<IPage> data) {
// Get the cache entry for this page
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("COWPager op=write id=%u cached=%d\n", pageID, cacheEntry.page.isValid());
debug_printf("COWPager op=write id=%u cached=%d reading=%d writing=%d\n", pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
// If the page is still being read then it's not also being written because a write places
// the new content in the cache entry when the write is launched, not when it is completed.
@ -601,6 +604,7 @@ public:
try {
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
debug_printf("op=read_complete id=%u bytes=%d\n", pageID, readBytes);
ASSERT(readBytes == self->physicalPageSize);
ASSERT(((Page *)page.getPtr())->verifyChecksum(pageID));
} catch(Error &e) {
@ -616,7 +620,7 @@ public:
// Reads the most recent version of pageID either committed or written using updatePage()
Future<Reference<IPage>> readPage(LogicalPageID pageID) {
PageCacheEntry &cacheEntry = pageCache.get(pageID);
debug_printf("COWPager op=read id=%u cached=%d ready=%d\n", pageID, cacheEntry.page.isValid(), cacheEntry.page.isValid() && cacheEntry.page.isReady());
debug_printf("COWPager op=read id=%u cached=%d reading=%d writing=%d\n", pageID, cacheEntry.page.isValid(), cacheEntry.reading(), cacheEntry.writing());
if(!cacheEntry.page.isValid()) {
cacheEntry.page = readPhysicalPage(this, (PhysicalPageID)pageID);
@ -644,10 +648,7 @@ public:
ACTOR static Future<Void> commit_impl(COWPager *self) {
// Flush the free list queue to the pager
LogicalPageQueueT::QueueState freeListState = wait(self->freeList.flush());
// Update header in memory
self->pHeader->freeList = freeListState;
wait(store(self->pHeader->freeList, self->freeList.flush()));
// Wait for all outstanding writes to complete
wait(self->writes.signalAndCollapse());
@ -655,10 +656,11 @@ public:
// Sync everything except the header
wait(self->pageFile->sync());
// Update header on disk and sync
// Update header on disk and sync again.
wait(self->writePhysicalPage(0, self->headerPage));
wait(self->pageFile->sync());
// Update last committed state for use in creating snapshots at current version.
self->lastCommittedVersion = self->pHeader->committedVersion;
self->lastCommittedMeta = self->pHeader->getMetaKey();