Bug fixes in FIFOQueue. Read cursor would not start loading pages again after its end was pushed forward. Queue flushing of the free list queue would leave tail cursor in a bad state.

This commit is contained in:
Stephen Atherton 2019-08-18 22:29:24 -07:00
parent 1bb323fa8c
commit 5384cf8f9c
1 changed files with 23 additions and 12 deletions

View File

@ -80,7 +80,7 @@ public:
// Cursor will not read this page or anything beyond it. // Cursor will not read this page or anything beyond it.
LogicalPageID endPageID; LogicalPageID endPageID;
Cursor() : queue(nullptr) { Cursor() : queue(nullptr), pageID(0), endPageID(0) {
} }
void setEnd(Cursor &end) { void setEnd(Cursor &end) {
@ -94,6 +94,7 @@ public:
queue = q; queue = q;
pageID = newPageID; pageID = newPageID;
initNewPageBuffer(); initNewPageBuffer();
loading = Void();
} }
// Point cursor to a page to read from. Begin loading the page if beginLoad is set. // Point cursor to a page to read from. Begin loading the page if beginLoad is set.
@ -115,7 +116,6 @@ public:
auto p = raw(); auto p = raw();
p->next = 0; p->next = 0;
p->count = 0; p->count = 0;
loading = Void();
} }
Cursor(Cursor &) = delete; Cursor(Cursor &) = delete;
@ -125,8 +125,8 @@ public:
loading.cancel(); loading.cancel();
} }
Future<Void> ready() { Future<Void> notLoading() {
return loading; return loading.isValid() ? loading : Void();
} }
#pragma pack(push, 1) #pragma pack(push, 1)
@ -192,6 +192,7 @@ public:
Future<Void> writeNext(const T &item) { Future<Void> writeNext(const T &item) {
// If the cursor is loaded already, write the item and move to the next slot // If the cursor is loaded already, write the item and move to the next slot
if(loading.isReady()) { if(loading.isReady()) {
debug_printf("FIFOQueue(%s): write next to %u:%d\n", queue->name.c_str(), pageID, index);
auto p = raw(); auto p = raw();
p->at(index) = item; p->at(index) = item;
++p->count; ++p->count;
@ -214,9 +215,18 @@ public:
// Read and moved past the next item if it is < upperBound // Read and moved past the next item if it is < upperBound
Future<Optional<T>> moveNext(const Optional<T> &upperBound = {}) { Future<Optional<T>> moveNext(const Optional<T> &upperBound = {}) {
// If loading is not valid then either the cursor is not initialized or it points to a page not yet durable. // If loading is not valid then either the cursor is not initialized.
// It may have at one time pointed to a page not yet committed.
if(!loading.isValid()) { if(!loading.isValid()) {
return Optional<T>(); // If the pageID isn't the endPageID then start loading the page
if(pageID != endPageID) {
debug_printf("FIFOQueue(%s) starting load of page id=%u which is no longer the end page id=%u\n", queue->name.c_str(), pageID, endPageID);
loading = loadPage();
}
else {
// Otherwise we can't read anymore so return nothing
return Optional<T>();
}
} }
// If loading is ready, read an item and move forward // If loading is ready, read an item and move forward
@ -231,7 +241,6 @@ public:
T result = p->at(index); T result = p->at(index);
--queue->numEntries; --queue->numEntries;
++index; ++index;
debug_printf("FIFOQueue(%s) read cursor popped from page id=%u index=%d count=%d\n", queue->name.c_str(), pageID, index, p->count);
// If this page is out of items, start reading the next one // If this page is out of items, start reading the next one
if(index == p->count) { if(index == p->count) {
@ -246,9 +255,6 @@ public:
// queue recursively if the pager's free list is being stored in this queue. // queue recursively if the pager's free list is being stored in this queue.
queue->pager->freePage(oldPageID); queue->pager->freePage(oldPageID);
} }
else {
debug_printf("FIFOQueue(%s) index and count are not the same %d %u\n", queue->name.c_str(), index, p->count);
}
return Optional<T>(result); return Optional<T>(result);
} }
@ -322,8 +328,13 @@ public:
} }
} }
// Wait for tail to be ready to write to a page // Wait for the head cursor to be done loading because it might free a page, which would add to the
wait(self->tail.ready()); // free list queue, which might be this queue.
wait(self->head.notLoading());
// Wait for the final write to the queue to be finished, it may be waiting for a new pageID after
// filling a page to capacity.
wait(self->tail.notLoading());
// If tail page is not empty, link it to a new unwritten/empty page // If tail page is not empty, link it to a new unwritten/empty page
if(!self->tail.empty()) { if(!self->tail.empty()) {