Bug fix in FIFOQueue pop() when freeing an exhausted page causes a recursive pop() from the same queue, which happens when the queue is the freelist itself and the write cursor is also at the end of its page.
This commit is contained in:
parent
1882b58d21
commit
1bb323fa8c
|
@ -97,11 +97,12 @@ public:
|
|||
}
|
||||
|
||||
// Point cursor to a page to read from. Begin loading the page if beginLoad is set.
|
||||
void initRead(FIFOQueue *q, LogicalPageID p, int i, LogicalPageID endPageID) {
|
||||
debug_printf("FIFOQueue(%s): New read queue cursor at page id=%u index=%d end page id=%u\n", q->name.c_str(), p, i, endPageID);
|
||||
void initRead(FIFOQueue *q, LogicalPageID p, int i, LogicalPageID end) {
|
||||
debug_printf("FIFOQueue(%s): New read queue cursor at page id=%u index=%d end page id=%u\n", q->name.c_str(), p, i, end);
|
||||
queue = q;
|
||||
pageID = p;
|
||||
index = i;
|
||||
endPageID = end;
|
||||
|
||||
// If cursor is not pointed at the end page then start loading it.
|
||||
// The end page will not have been written to disk yet.
|
||||
|
@ -153,8 +154,10 @@ public:
|
|||
|
||||
Future<Void> newPage() {
|
||||
ASSERT(page);
|
||||
return map(queue->pager->newPageID(), [=](LogicalPageID newPageID) {
|
||||
debug_printf("FIFOQueue(%s): new page id=%u\n", queue->name.c_str(), newPageID);
|
||||
ASSERT(loading.isReady());
|
||||
|
||||
loading = map(queue->pager->newPageID(), [=](LogicalPageID newPageID) {
|
||||
debug_printf("FIFOQueue(%s): new page id=%u\n", queue->name.c_str(), newPageID);
|
||||
auto p = raw();
|
||||
p->next = newPageID;
|
||||
writePage();
|
||||
|
@ -163,6 +166,8 @@ public:
|
|||
initNewPageBuffer();
|
||||
return Void();
|
||||
});
|
||||
|
||||
return loading;
|
||||
}
|
||||
|
||||
bool operator== (const Cursor &rhs) {
|
||||
|
@ -174,11 +179,7 @@ public:
|
|||
}
|
||||
|
||||
void writePage() {
|
||||
// Pages are never written after being read, so if the write cursor is not
|
||||
// ready then it is getting a new page ID which must be written to the next
|
||||
// page ID of the page behind it.
|
||||
debug_printf("FIFOQueue(%s): write page id=%u\n", queue->name.c_str(), pageID);
|
||||
ASSERT(loading.isReady());
|
||||
queue->pager->updatePage(pageID, page);
|
||||
}
|
||||
|
||||
|
@ -197,7 +198,7 @@ public:
|
|||
++queue->numEntries;
|
||||
++index;
|
||||
if(index == queue->itemsPerPage) {
|
||||
this->loading = newPage();
|
||||
newPage();
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -222,20 +223,31 @@ public:
|
|||
if(loading.isReady()) {
|
||||
auto p = raw();
|
||||
if(upperBound.present() && p->at(index) >= upperBound.get()) {
|
||||
debug_printf("FIFOQueue(%s) pop upperbound limit exceeded\n", queue->name.c_str());
|
||||
return Optional<T>();
|
||||
}
|
||||
|
||||
debug_printf("FIFOQueue(%s) read cursor pop from page id=%u index=%d count=%d\n", queue->name.c_str(), pageID, index, p->count);
|
||||
T result = p->at(index);
|
||||
--queue->numEntries;
|
||||
++index;
|
||||
debug_printf("FIFOQueue(%s) read cursor popped from page id=%u index=%d count=%d\n", queue->name.c_str(), pageID, index, p->count);
|
||||
|
||||
// If this page is out of items, start reading the next one
|
||||
if(index == p->count) {
|
||||
queue->pager->freePage(pageID);
|
||||
LogicalPageID oldPageID = pageID;
|
||||
pageID = p->next;
|
||||
index = 0;
|
||||
--queue->numPages;
|
||||
debug_printf("FIFOQueue(%s) advancing to next page id=%u endPageID=%u\n", queue->name.c_str(), pageID, endPageID);
|
||||
loading = (pageID == endPageID) ? Future<Void>() : loadPage();
|
||||
|
||||
// freePage() must be called after setting the loading future because freePage() might pop from this
|
||||
// queue recursively if the pager's free list is being stored in this queue.
|
||||
queue->pager->freePage(oldPageID);
|
||||
}
|
||||
else {
|
||||
debug_printf("FIFOQueue(%s) index and count are not the same %d %u\n", queue->name.c_str(), index, p->count);
|
||||
}
|
||||
|
||||
return Optional<T>(result);
|
||||
|
@ -310,12 +322,15 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
// Wait for tail to be ready to write to a page
|
||||
wait(self->tail.ready());
|
||||
|
||||
// If tail page is not empty, link it to a new unwritten/empty page
|
||||
if(!self->tail.empty()) {
|
||||
wait(self->tail.newPage());
|
||||
}
|
||||
|
||||
// After queue is flushed, head may read everything written so far (which will have been committed)
|
||||
self->head.setEnd(self->tail);
|
||||
|
||||
return self->getState();
|
||||
|
@ -671,16 +686,24 @@ public:
|
|||
Future<Optional<LogicalPageID>> nextPageID = freeList.pop();
|
||||
if(nextPageID.isReady()) {
|
||||
if(nextPageID.get().present()) {
|
||||
debug_printf("COWPager(%s) new page id=%u from ready freelist\n", filename.c_str(), nextPageID.get().get());
|
||||
return nextPageID.get().get();
|
||||
}
|
||||
return ++pHeader->pageCount;
|
||||
LogicalPageID id = pHeader->pageCount;
|
||||
++pHeader->pageCount;
|
||||
debug_printf("COWPager(%s) new page id=%u at end of file\n", filename.c_str(), id);
|
||||
return id;
|
||||
}
|
||||
|
||||
Future<LogicalPageID> f = map(nextPageID, [=](Optional<LogicalPageID> nextPageID) {
|
||||
if(nextPageID.present()) {
|
||||
debug_printf("COWPager(%s) new page id=%u from freelist after wait\n", filename.c_str(), nextPageID.get());
|
||||
return nextPageID.get();
|
||||
}
|
||||
return (LogicalPageID)++(pHeader->pageCount);
|
||||
LogicalPageID id = pHeader->pageCount;
|
||||
++pHeader->pageCount;
|
||||
debug_printf("COWPager(%s) new page id=%u at end of file\n", filename.c_str(), id);
|
||||
return id;
|
||||
});
|
||||
|
||||
return forwardError(f, errorPromise);
|
||||
|
|
Loading…
Reference in New Issue