Merge pull request #4760 from sfc-gh-satherton/redwood-queue-shutdown-crash
Refactor FIFOQueue::Cursor read/write operations
This commit is contained in:
commit
3861e06077
|
@ -235,24 +235,36 @@ public:
|
|||
#pragma pack(pop)
|
||||
|
||||
struct Cursor {
|
||||
// Queue mode
|
||||
enum Mode { NONE, POP, READONLY, WRITE };
|
||||
Mode mode;
|
||||
|
||||
// The current page being read or written to
|
||||
// Queue this cursor is accessing
|
||||
FIFOQueue* queue;
|
||||
|
||||
// The current page and pageID being read or written to
|
||||
LogicalPageID pageID;
|
||||
Reference<IPage> page;
|
||||
|
||||
// The first page ID to be written to the pager, if this cursor has written anything
|
||||
LogicalPageID firstPageIDWritten;
|
||||
|
||||
// Offset after RawPage header to next read from or write to
|
||||
// Offset after RawPage header in page to next read from or write to
|
||||
int offset;
|
||||
|
||||
// A read cursor will not read this page (or beyond)
|
||||
LogicalPageID endPageID;
|
||||
|
||||
Reference<IPage> page;
|
||||
FIFOQueue* queue;
|
||||
Future<Void> operation;
|
||||
Mode mode;
|
||||
// Page future and corresponding page ID for the expected next page to be used. It may not
|
||||
// match the current page's next page link because queues can be prepended with new front pages.
|
||||
Future<Reference<IPage>> nextPageReader;
|
||||
LogicalPageID nextPageID;
|
||||
|
||||
// Future that represents all outstanding write operations previously issued
|
||||
// This exists because writing the queue returns void, not a future
|
||||
Future<Void> writeOperations;
|
||||
|
||||
FlowLock mutex;
|
||||
|
||||
Cursor() : mode(NONE) {}
|
||||
|
||||
|
@ -262,26 +274,27 @@ public:
|
|||
LogicalPageID initialPageID = invalidLogicalPageID,
|
||||
int readOffset = 0,
|
||||
LogicalPageID endPage = invalidLogicalPageID) {
|
||||
if (operation.isValid()) {
|
||||
operation.cancel();
|
||||
}
|
||||
queue = q;
|
||||
mode = m;
|
||||
firstPageIDWritten = invalidLogicalPageID;
|
||||
offset = readOffset;
|
||||
endPageID = endPage;
|
||||
page.clear();
|
||||
writeOperations = Void();
|
||||
|
||||
if (mode == POP || mode == READONLY) {
|
||||
// If cursor is not pointed at the end page then start loading it.
|
||||
// The end page will not have been written to disk yet.
|
||||
pageID = initialPageID;
|
||||
operation = (pageID == endPageID) ? Void() : loadPage();
|
||||
if (pageID != endPageID) {
|
||||
startNextPageLoad(pageID);
|
||||
} else {
|
||||
nextPageID = invalidLogicalPageID;
|
||||
}
|
||||
} else {
|
||||
pageID = invalidLogicalPageID;
|
||||
ASSERT(mode == WRITE ||
|
||||
(initialPageID == invalidLogicalPageID && readOffset == 0 && endPage == invalidLogicalPageID));
|
||||
operation = Void();
|
||||
}
|
||||
|
||||
debug_printf("FIFOQueue::Cursor(%s) initialized\n", toString().c_str());
|
||||
|
@ -294,17 +307,17 @@ public:
|
|||
// Since cursors can have async operations pending which modify their state they can't be copied cleanly
|
||||
Cursor(const Cursor& other) = delete;
|
||||
|
||||
~Cursor() { writeOperations.cancel(); }
|
||||
|
||||
// A read cursor can be initialized from a pop cursor
|
||||
void initReadOnly(const Cursor& c) {
|
||||
ASSERT(c.mode == READONLY || c.mode == POP);
|
||||
init(c.queue, READONLY, c.pageID, c.offset, c.endPageID);
|
||||
}
|
||||
|
||||
~Cursor() { operation.cancel(); }
|
||||
|
||||
std::string toString() const {
|
||||
if (mode == WRITE) {
|
||||
return format("{WriteCursor %s:%p pos=%s:%d endOffset=%d}",
|
||||
return format("{WriteCursor %s:%p pos=%s:%d rawEndOffset=%d}",
|
||||
queue->name.c_str(),
|
||||
this,
|
||||
::toString(pageID).c_str(),
|
||||
|
@ -312,13 +325,14 @@ public:
|
|||
page ? raw()->endOffset : -1);
|
||||
}
|
||||
if (mode == POP || mode == READONLY) {
|
||||
return format("{ReadCursor %s:%p pos=%s:%d endOffset=%d endPage=%s}",
|
||||
return format("{ReadCursor %s:%p pos=%s:%d rawEndOffset=%d endPage=%s nextPage=%s}",
|
||||
queue->name.c_str(),
|
||||
this,
|
||||
::toString(pageID).c_str(),
|
||||
offset,
|
||||
page ? raw()->endOffset : -1,
|
||||
::toString(endPageID).c_str());
|
||||
::toString(endPageID).c_str(),
|
||||
::toString(nextPageID).c_str());
|
||||
}
|
||||
ASSERT(mode == NONE);
|
||||
return format("{NullCursor=%p}", this);
|
||||
|
@ -326,17 +340,33 @@ public:
|
|||
|
||||
#pragma pack(push, 1)
|
||||
struct RawPage {
|
||||
// The next page of the queue after this one
|
||||
LogicalPageID nextPageID;
|
||||
// The start offset of the next page
|
||||
uint16_t nextOffset;
|
||||
// The end offset of the current page
|
||||
uint16_t endOffset;
|
||||
// Get pointer to data after page header
|
||||
uint8_t* begin() { return (uint8_t*)(this + 1); }
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
Future<Void> notBusy() { return operation; }
|
||||
// Returns true if the mutex cannot be immediately taken.
|
||||
bool isBusy() { return mutex.activePermits() != 0; }
|
||||
|
||||
// Wait for all operations started before now to be ready, which is done by
|
||||
// obtaining and releasing the mutex.
|
||||
Future<Void> notBusy() {
|
||||
return isBusy() ? map(mutex.take(),
|
||||
[&](Void) {
|
||||
mutex.release();
|
||||
return Void();
|
||||
})
|
||||
: Void();
|
||||
}
|
||||
|
||||
// Returns true if any items have been written to the last page
|
||||
bool pendingWrites() const { return mode == WRITE && offset != 0; }
|
||||
bool pendingTailWrites() const { return mode == WRITE && offset != 0; }
|
||||
|
||||
RawPage* raw() const { return ((RawPage*)(page->begin())); }
|
||||
|
||||
|
@ -347,14 +377,11 @@ public:
|
|||
p->nextOffset = offset;
|
||||
}
|
||||
|
||||
Future<Void> loadPage() {
|
||||
ASSERT(mode == POP | mode == READONLY);
|
||||
debug_printf("FIFOQueue::Cursor(%s) loadPage\n", toString().c_str());
|
||||
return map(queue->pager->readPage(pageID, true), [=](Reference<IPage> p) {
|
||||
page = p;
|
||||
debug_printf("FIFOQueue::Cursor(%s) loadPage done\n", toString().c_str());
|
||||
return Void();
|
||||
});
|
||||
void startNextPageLoad(LogicalPageID id) {
|
||||
nextPageID = id;
|
||||
debug_printf(
|
||||
"FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str());
|
||||
nextPageReader = waitOrError(queue->pager->readPage(nextPageID, true), queue->pagerError);
|
||||
}
|
||||
|
||||
void writePage() {
|
||||
|
@ -369,6 +396,8 @@ public:
|
|||
}
|
||||
|
||||
// Link the current page to newPageID:newOffset and then write it to the pager.
|
||||
// The link destination could be a new page at the end of the queue, or the beginning of
|
||||
// an existing chain of queue pages.
|
||||
// If initializeNewPage is true a page buffer will be allocated for the new page and it will be initialized
|
||||
// as a new tail page.
|
||||
void addNewPage(LogicalPageID newPageID, int newOffset, bool initializeNewPage) {
|
||||
|
@ -382,13 +411,25 @@ public:
|
|||
// Update existing page and write, if it exists
|
||||
if (page) {
|
||||
setNext(newPageID, newOffset);
|
||||
debug_printf("FIFOQueue::Cursor(%s) Linked new page\n", toString().c_str());
|
||||
debug_printf("FIFOQueue::Cursor(%s) Linked new page %s:%d\n",
|
||||
toString().c_str(),
|
||||
::toString(newPageID).c_str(),
|
||||
newOffset);
|
||||
writePage();
|
||||
}
|
||||
|
||||
pageID = newPageID;
|
||||
offset = newOffset;
|
||||
|
||||
if (BUGGIFY) {
|
||||
// Randomly change the byte limit for queue pages. The min here must be large enough for at least one
|
||||
// queue item of any type. This change will suddenly make some pages being written to seem overfilled
|
||||
// but this won't break anything, the next write will just be detected as not fitting and the page will
|
||||
// end.
|
||||
queue->dataBytesPerPage = deterministicRandom()->randomInt(
|
||||
50, queue->pager->getUsablePageSize() - sizeof(typename Cursor::RawPage));
|
||||
}
|
||||
|
||||
if (initializeNewPage) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) Initializing new page\n", toString().c_str());
|
||||
page = queue->pager->newPageBuffer();
|
||||
|
@ -402,123 +443,197 @@ public:
|
|||
}
|
||||
|
||||
// Write item to the next position in the current page or, if it won't fit, add a new page and write it there.
|
||||
ACTOR static Future<Void> write_impl(Cursor* self, T item, Future<Void> start) {
|
||||
ACTOR static Future<Void> write_impl(Cursor* self, T item) {
|
||||
ASSERT(self->mode == WRITE);
|
||||
|
||||
// Wait for the previous operation to finish
|
||||
state Future<Void> previous = self->operation;
|
||||
wait(start);
|
||||
wait(previous);
|
||||
|
||||
state bool mustWait = self->isBusy();
|
||||
state int bytesNeeded = Codec::bytesNeeded(item);
|
||||
if (self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->queue->dataBytesPerPage) {
|
||||
state bool needNewPage =
|
||||
self->pageID == invalidLogicalPageID || self->offset + bytesNeeded > self->queue->dataBytesPerPage;
|
||||
|
||||
debug_printf("FIFOQueue::Cursor(%s) write(%s) mustWait=%d needNewPage=%d\n",
|
||||
self->toString().c_str(),
|
||||
::toString(item).c_str(),
|
||||
mustWait,
|
||||
needNewPage);
|
||||
|
||||
// If we have to wait for the mutex because it's busy, or we need a new page, then wait for the mutex.
|
||||
if (mustWait || needNewPage) {
|
||||
wait(self->mutex.take());
|
||||
|
||||
// If we had to wait because the mutex was busy, then update needNewPage as another writer
|
||||
// would have changed the cursor state
|
||||
// Otherwise, taking the mutex would be immediate so no other writer could have run
|
||||
if (mustWait) {
|
||||
needNewPage = self->pageID == invalidLogicalPageID ||
|
||||
self->offset + bytesNeeded > self->queue->dataBytesPerPage;
|
||||
}
|
||||
}
|
||||
|
||||
// If we need a new page, add one.
|
||||
if (needNewPage) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) write(%s) page is full, adding new page\n",
|
||||
self->toString().c_str(),
|
||||
::toString(item).c_str());
|
||||
LogicalPageID newPageID = wait(self->queue->pager->newPageID());
|
||||
self->addNewPage(newPageID, 0, true);
|
||||
++self->queue->numPages;
|
||||
wait(yield());
|
||||
}
|
||||
|
||||
debug_printf(
|
||||
"FIFOQueue::Cursor(%s) before write(%s)\n", self->toString().c_str(), ::toString(item).c_str());
|
||||
"FIFOQueue::Cursor(%s) write(%s) writing\n", self->toString().c_str(), ::toString(item).c_str());
|
||||
auto p = self->raw();
|
||||
Codec::writeToBytes(p->begin() + self->offset, item);
|
||||
self->offset += bytesNeeded;
|
||||
p->endOffset = self->offset;
|
||||
++self->queue->numEntries;
|
||||
|
||||
if (mustWait || needNewPage) {
|
||||
self->mutex.release();
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
void write(const T& item) {
|
||||
Promise<Void> p;
|
||||
operation = write_impl(this, item, p.getFuture());
|
||||
p.send(Void());
|
||||
// Start the write. It may complete immediately if no IO was being waited on
|
||||
Future<Void> w = write_impl(this, item);
|
||||
// If it didn't complete immediately, then add the future to writeOperations
|
||||
if (!w.isReady()) {
|
||||
writeOperations = writeOperations && w;
|
||||
}
|
||||
}
|
||||
|
||||
// Read the next item at the cursor (if <= upperBound), moving to a new page first if the current page is
|
||||
// exhausted
|
||||
ACTOR static Future<Optional<T>> readNext_impl(Cursor* self, Optional<T> upperBound, Future<Void> start) {
|
||||
ASSERT(self->mode == POP || self->mode == READONLY);
|
||||
// If readNext() cannot complete immediately, it will route to here
|
||||
// The mutex will be taken if locked is false
|
||||
// The next page will be waited for if load is true
|
||||
// Only mutex holders will wait on the page read.
|
||||
ACTOR static Future<Optional<T>> waitThenReadNext(Cursor* self,
|
||||
Optional<T> upperBound,
|
||||
bool locked,
|
||||
bool load) {
|
||||
// Lock the mutex if it wasn't already
|
||||
if (!locked) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) waitThenReadNext locking mutex\n", self->toString().c_str());
|
||||
wait(self->mutex.take());
|
||||
}
|
||||
|
||||
// Wait for the previous operation to finish
|
||||
state Future<Void> previous = self->operation;
|
||||
wait(start);
|
||||
wait(previous);
|
||||
if (load) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) waitThenReadNext waiting for page load\n",
|
||||
self->toString().c_str());
|
||||
wait(success(self->nextPageReader));
|
||||
}
|
||||
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext begin\n", self->toString().c_str());
|
||||
if (self->pageID == invalidLogicalPageID || self->pageID == self->endPageID) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext returning nothing\n", self->toString().c_str());
|
||||
Optional<T> result = wait(self->readNext(upperBound, true));
|
||||
|
||||
// If this actor instance locked the mutex, then unlock it.
|
||||
if (!locked) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) waitThenReadNext unlocking mutex\n", self->toString().c_str());
|
||||
self->mutex.release();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Read the next item at the cursor (if <= upperBound), moving to a new page first if the current page is
|
||||
// exhausted. If locked is true, this call owns the mutex, which would have been locked by readNext() before a
|
||||
// recursive call
|
||||
Future<Optional<T>> readNext(const Optional<T>& upperBound = {}, bool locked = false) {
|
||||
if ((mode != POP && mode != READONLY) || pageID == invalidLogicalPageID || pageID == endPageID) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext returning nothing\n", toString().c_str());
|
||||
return Optional<T>();
|
||||
}
|
||||
|
||||
// We now know we are pointing to PageID and it should be read and used, but it may not be loaded yet.
|
||||
if (!self->page) {
|
||||
wait(self->loadPage());
|
||||
wait(yield());
|
||||
// If we don't own the mutex and it's not available then acquire it
|
||||
if (!locked && isBusy()) {
|
||||
return waitThenReadNext(this, upperBound, false, false);
|
||||
}
|
||||
|
||||
auto p = self->raw();
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext reading at current position\n", self->toString().c_str());
|
||||
ASSERT(self->offset < p->endOffset);
|
||||
// We now know pageID is valid and should be used, but page might not point to it yet
|
||||
if (!page) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) loading\n", toString().c_str());
|
||||
|
||||
// If the next pageID loading or loaded is not the page we should be reading then restart the load
|
||||
// nextPageID could be different because it could be invalid or it could be no longer relevant
|
||||
// if the previous commit added new pages to the front of the queue.
|
||||
if (pageID != nextPageID) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) reloading\n", toString().c_str());
|
||||
startNextPageLoad(pageID);
|
||||
}
|
||||
|
||||
if (!nextPageReader.isReady()) {
|
||||
return waitThenReadNext(this, upperBound, locked, true);
|
||||
}
|
||||
|
||||
page = nextPageReader.get();
|
||||
|
||||
// Start loading the next page if it's not the end page
|
||||
auto p = raw();
|
||||
if (p->nextPageID != endPageID) {
|
||||
startNextPageLoad(p->nextPageID);
|
||||
} else {
|
||||
// Prevent a future next page read from reusing the same result as page would have to be updated
|
||||
// before the queue would read it again
|
||||
nextPageID = invalidLogicalPageID;
|
||||
}
|
||||
}
|
||||
|
||||
auto p = raw();
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext reading at current position\n", toString().c_str());
|
||||
ASSERT(offset < p->endOffset);
|
||||
int bytesRead;
|
||||
T result = Codec::readFromBytes(p->begin() + self->offset, bytesRead);
|
||||
const T result = Codec::readFromBytes(p->begin() + offset, bytesRead);
|
||||
|
||||
if (upperBound.present() && upperBound.get() < result) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) not popping %s, exceeds upper bound %s\n",
|
||||
self->toString().c_str(),
|
||||
toString().c_str(),
|
||||
::toString(result).c_str(),
|
||||
::toString(upperBound.get()).c_str());
|
||||
|
||||
return Optional<T>();
|
||||
}
|
||||
|
||||
self->offset += bytesRead;
|
||||
if (self->mode == POP) {
|
||||
--self->queue->numEntries;
|
||||
offset += bytesRead;
|
||||
if (mode == POP) {
|
||||
--queue->numEntries;
|
||||
}
|
||||
debug_printf(
|
||||
"FIFOQueue::Cursor(%s) after read of %s\n", self->toString().c_str(), ::toString(result).c_str());
|
||||
ASSERT(self->offset <= p->endOffset);
|
||||
debug_printf("FIFOQueue::Cursor(%s) after read of %s\n", toString().c_str(), ::toString(result).c_str());
|
||||
ASSERT(offset <= p->endOffset);
|
||||
|
||||
if (self->offset == p->endOffset) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) Page exhausted\n", self->toString().c_str());
|
||||
LogicalPageID oldPageID = self->pageID;
|
||||
self->pageID = p->nextPageID;
|
||||
self->offset = p->nextOffset;
|
||||
if (self->mode == POP) {
|
||||
--self->queue->numPages;
|
||||
// If this page is exhausted, start reading the next page for the next readNext() to use, unless it's the
|
||||
// tail page
|
||||
if (offset == p->endOffset) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) Page exhausted\n", toString().c_str());
|
||||
LogicalPageID oldPageID = pageID;
|
||||
pageID = p->nextPageID;
|
||||
offset = p->nextOffset;
|
||||
|
||||
// If pageID isn't the tail page and nextPageID isn't pageID then start loading the next page
|
||||
if (pageID != endPageID && nextPageID != pageID) {
|
||||
startNextPageLoad(pageID);
|
||||
}
|
||||
self->page.clear();
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n",
|
||||
self->toString().c_str());
|
||||
|
||||
if (self->mode == POP) {
|
||||
if (mode == POP) {
|
||||
--queue->numPages;
|
||||
}
|
||||
page.clear();
|
||||
debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str());
|
||||
|
||||
if (mode == POP) {
|
||||
// Freeing the old page must happen after advancing the cursor and clearing the page reference
|
||||
// because freePage() could cause a push onto a queue that causes a newPageID() call which could
|
||||
// pop() from this very same queue. Queue pages are freed at page 0 because they can be reused after
|
||||
// the next commit.
|
||||
self->queue->pager->freePage(oldPageID, 0);
|
||||
// pop() from this very same queue. Queue pages are freed at version 0 because they can be reused
|
||||
// after the next commit.
|
||||
queue->pager->freePage(oldPageID, 0);
|
||||
}
|
||||
}
|
||||
|
||||
debug_printf("FIFOQueue(%s) %s(upperBound=%s) -> %s\n",
|
||||
self->queue->name.c_str(),
|
||||
(self->mode == POP ? "pop" : "peek"),
|
||||
queue->name.c_str(),
|
||||
(mode == POP ? "pop" : "peek"),
|
||||
::toString(upperBound).c_str(),
|
||||
::toString(result).c_str());
|
||||
return result;
|
||||
}
|
||||
|
||||
// Read and move past the next item if it is <= upperBound or if upperBound is not present
|
||||
Future<Optional<T>> readNext(const Optional<T>& upperBound = {}) {
|
||||
if (mode == NONE) {
|
||||
return Optional<T>();
|
||||
}
|
||||
Promise<Void> p;
|
||||
Future<Optional<T>> read = readNext_impl(this, upperBound, p.getFuture());
|
||||
operation = success(read);
|
||||
p.send(Void());
|
||||
return read;
|
||||
return Optional<T>(result);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -534,6 +649,7 @@ public:
|
|||
void create(IPager2* p, LogicalPageID newPageID, std::string queueName) {
|
||||
debug_printf("FIFOQueue(%s) create from page %s\n", queueName.c_str(), toString(newPageID).c_str());
|
||||
pager = p;
|
||||
pagerError = pager->getError();
|
||||
name = queueName;
|
||||
numPages = 1;
|
||||
numEntries = 0;
|
||||
|
@ -549,6 +665,7 @@ public:
|
|||
void recover(IPager2* p, const QueueState& qs, std::string queueName) {
|
||||
debug_printf("FIFOQueue(%s) recover from queue state %s\n", queueName.c_str(), qs.toString().c_str());
|
||||
pager = p;
|
||||
pagerError = pager->getError();
|
||||
name = queueName;
|
||||
numPages = qs.numPages;
|
||||
numEntries = qs.numEntries;
|
||||
|
@ -614,15 +731,15 @@ public:
|
|||
headWriter.write(item);
|
||||
}
|
||||
|
||||
// Wait until the most recently started operations on each cursor as of now are ready
|
||||
Future<Void> notBusy() {
|
||||
return headWriter.notBusy() && headReader.notBusy() && tailWriter.notBusy() && ready(newTailPage);
|
||||
bool isBusy() {
|
||||
return headWriter.isBusy() || headReader.isBusy() || tailWriter.isBusy() || !newTailPage.isReady();
|
||||
}
|
||||
|
||||
// Returns true if any most recently started operations on any cursors are not ready
|
||||
bool busy() {
|
||||
return !headWriter.notBusy().isReady() || !headReader.notBusy().isReady() || !tailWriter.notBusy().isReady() ||
|
||||
!newTailPage.isReady();
|
||||
// Wait until all previously started operations on each cursor are done and the new tail page is ready
|
||||
Future<Void> notBusy() {
|
||||
auto f = headWriter.notBusy() && headReader.notBusy() && tailWriter.notBusy() && ready(newTailPage);
|
||||
debug_printf("FIFOQueue(%s) notBusy future ready=%d\n", name.c_str(), f.isReady());
|
||||
return f;
|
||||
}
|
||||
|
||||
// preFlush() prepares this queue to be flushed to disk, but doesn't actually do it so the queue can still
|
||||
|
@ -631,7 +748,7 @@ public:
|
|||
//
|
||||
// If one or more queues are used by their pager in newPageID() or freePage() operations, then preFlush()
|
||||
// must be called on each of them inside a loop that runs until each of the preFlush() calls have returned
|
||||
// false.
|
||||
// false twice in a row.
|
||||
//
|
||||
// The reason for all this is that:
|
||||
// - queue pop() can call pager->freePage() which can call push() on the same or another queue
|
||||
|
@ -644,7 +761,7 @@ public:
|
|||
|
||||
// Completion of the pending operations as of the start of notBusy() could have begun new operations,
|
||||
// so see if any work is pending now.
|
||||
bool workPending = self->busy();
|
||||
bool workPending = self->isBusy();
|
||||
|
||||
if (!workPending) {
|
||||
// A newly created or flushed queue starts out in a state where its tail page to be written to is empty.
|
||||
|
@ -653,8 +770,12 @@ public:
|
|||
// the next flush. (This is explained more at the top of FIFOQueue but it is because queue pages can only
|
||||
// be written once because once they contain durable data a second write to link to a new page could corrupt
|
||||
// the existing data if the subsequent commit never succeeds.)
|
||||
//
|
||||
// If the newTailPage future is ready but it's an invalid page and the tail page we are currently pointed to
|
||||
// has had items added to it, then get a new tail page ID.
|
||||
if (self->newTailPage.isReady() && self->newTailPage.get() == invalidLogicalPageID &&
|
||||
self->tailWriter.pendingWrites()) {
|
||||
self->tailWriter.pendingTailWrites()) {
|
||||
debug_printf("FIFOQueue(%s) preFlush starting to get new page ID\n", self->name.c_str());
|
||||
self->newTailPage = self->pager->newPageID();
|
||||
workPending = true;
|
||||
}
|
||||
|
@ -668,7 +789,7 @@ public:
|
|||
|
||||
void finishFlush() {
|
||||
debug_printf("FIFOQueue(%s) finishFlush start\n", name.c_str());
|
||||
ASSERT(!busy());
|
||||
ASSERT(!isBusy());
|
||||
|
||||
// If a new tail page was allocated, link the last page of the tail writer to it.
|
||||
if (newTailPage.get() != invalidLogicalPageID) {
|
||||
|
@ -677,14 +798,14 @@ public:
|
|||
++numPages;
|
||||
|
||||
// newPage() should be ready immediately since a pageID is being explicitly passed.
|
||||
ASSERT(tailWriter.notBusy().isReady());
|
||||
ASSERT(!tailWriter.isBusy());
|
||||
|
||||
newTailPage = invalidLogicalPageID;
|
||||
}
|
||||
|
||||
// If the headWriter wrote anything, link its tail page to the headReader position and point the headReader
|
||||
// to the start of the headWriter
|
||||
if (headWriter.pendingWrites()) {
|
||||
if (headWriter.pendingTailWrites()) {
|
||||
headWriter.addNewPage(headReader.pageID, headReader.offset, false);
|
||||
headReader.pageID = headWriter.firstPageIDWritten;
|
||||
headReader.offset = 0;
|
||||
|
@ -715,6 +836,8 @@ public:
|
|||
Future<Void> flush() { return flush_impl(this); }
|
||||
|
||||
IPager2* pager;
|
||||
Future<Void> pagerError;
|
||||
|
||||
int64_t numPages;
|
||||
int64_t numEntries;
|
||||
int dataBytesPerPage;
|
||||
|
@ -1220,7 +1343,7 @@ public:
|
|||
|
||||
Type getType() const { return getTypeOf(newPageID); }
|
||||
|
||||
bool operator<(const RemappedPage& rhs) { return version < rhs.version; }
|
||||
bool operator<(const RemappedPage& rhs) const { return version < rhs.version; }
|
||||
|
||||
std::string toString() const {
|
||||
return format("RemappedPage(%c: %s -> %s %s}",
|
||||
|
@ -1405,8 +1528,8 @@ public:
|
|||
|
||||
// Create queues
|
||||
self->freeList.create(self, self->newLastPageID(), "FreeList");
|
||||
self->delayedFreeList.create(self, self->newLastPageID(), "delayedFreeList");
|
||||
self->remapQueue.create(self, self->newLastPageID(), "remapQueue");
|
||||
self->delayedFreeList.create(self, self->newLastPageID(), "DelayedFreeList");
|
||||
self->remapQueue.create(self, self->newLastPageID(), "RemapQueue");
|
||||
|
||||
// The first commit() below will flush the queues and update the queue states in the header,
|
||||
// but since the queues will not be used between now and then their states will not change.
|
||||
|
@ -1696,6 +1819,7 @@ public:
|
|||
.detail("CalculatedChecksum", p->calculateChecksum(pageID))
|
||||
.detail("ChecksumInPage", p->getChecksum())
|
||||
.error(e);
|
||||
ASSERT(false);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -2003,15 +2127,26 @@ public:
|
|||
wait(self->remapQueue.flush());
|
||||
|
||||
// Flush the free list and delayed free list queues together as they are used by freePage() and newPageID()
|
||||
// Since each queue's preFlush can create work for the other, we must see preflush return false for both
|
||||
// twice in a row.
|
||||
state int clear = 0;
|
||||
loop {
|
||||
state bool freeBusy = wait(self->freeList.preFlush());
|
||||
state bool delayedFreeBusy = wait(self->delayedFreeList.preFlush());
|
||||
debug_printf("DWALPager(%s) flushQueues freeBusy=%d delayedFreeBusy=%d\n",
|
||||
self->filename.c_str(),
|
||||
freeBusy,
|
||||
delayedFreeBusy);
|
||||
|
||||
// Once preFlush() returns false for both queues then there are no more operations pending
|
||||
// on either queue. If preFlush() returns true for either queue in one loop execution then
|
||||
// it could have generated new work for itself or the other queue.
|
||||
if (!freeBusy && !delayedFreeBusy) {
|
||||
break;
|
||||
if (++clear == 2) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
clear = 0;
|
||||
}
|
||||
}
|
||||
self->freeList.finishFlush();
|
||||
|
@ -7846,8 +7981,8 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
state std::string fileName = params.get("fileName").orDefault("unittest_pageFile.redwood");
|
||||
IPager2* pager;
|
||||
|
||||
state bool serialTest = params.getInt("serialTest").orDefault(deterministicRandom()->coinflip());
|
||||
state bool shortTest = params.getInt("shortTest").orDefault(deterministicRandom()->coinflip());
|
||||
state bool serialTest = params.getInt("serialTest").orDefault(deterministicRandom()->random01() < 0.25);
|
||||
state bool shortTest = params.getInt("shortTest").orDefault(deterministicRandom()->random01() < 0.25);
|
||||
|
||||
state int pageSize =
|
||||
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
|
||||
|
@ -7866,8 +8001,8 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
params.getDouble("clearSingleKeyProbability").orDefault(deterministicRandom()->random01());
|
||||
state double clearPostSetProbability =
|
||||
params.getDouble("clearPostSetProbability").orDefault(deterministicRandom()->random01() * .1);
|
||||
state double coldStartProbability = params.getDouble("coldStartProbability")
|
||||
.orDefault(pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3));
|
||||
state double coldStartProbability =
|
||||
params.getDouble("coldStartProbability").orDefault(pagerMemoryOnly ? 0 : (deterministicRandom()->random01()));
|
||||
state double advanceOldVersionProbability =
|
||||
params.getDouble("advanceOldVersionProbability").orDefault(deterministicRandom()->random01());
|
||||
state int64_t cacheSizeBytes =
|
||||
|
@ -8043,8 +8178,9 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
|
||||
// Wait for previous commit to finish
|
||||
wait(commit);
|
||||
printf(
|
||||
"Committed. Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
|
||||
printf("Last commit complete. Next commit %d bytes, %" PRId64 " bytes committed so far.",
|
||||
mutationBytesThisCommit,
|
||||
mutationBytes.get() - mutationBytesThisCommit);
|
||||
printf(" Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
|
||||
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6,
|
||||
keyBytesCleared.rate() / 1e6,
|
||||
|
@ -8063,7 +8199,8 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
commit = map(btree->commit(), [=, &ops = totalPageOps](Void) {
|
||||
// Update pager ops before clearing metrics
|
||||
ops += g_redwoodMetrics.pageOps();
|
||||
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%) VerificationMapEntries %d/%d (%.2f%%)\n",
|
||||
printf("Committed %s PageOps %" PRId64 "/%" PRId64 " (%.2f%%) VerificationMapEntries %d/%d (%.2f%%)\n",
|
||||
toString(v).c_str(),
|
||||
ops,
|
||||
targetPageOps,
|
||||
ops * 100.0 / targetPageOps,
|
||||
|
@ -8091,7 +8228,7 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
mutationBytesTargetThisCommit = randomSize(maxCommitSize);
|
||||
|
||||
// Recover from disk at random
|
||||
if (!serialTest && deterministicRandom()->random01() < coldStartProbability) {
|
||||
if (!pagerMemoryOnly && deterministicRandom()->random01() < coldStartProbability) {
|
||||
printf("Recovering from disk after next commit.\n");
|
||||
|
||||
// Wait for outstanding commit
|
||||
|
@ -8114,8 +8251,11 @@ TEST_CASE("/redwood/correctness/btree") {
|
|||
wait(btree->init());
|
||||
|
||||
Version v = btree->getLatestVersion();
|
||||
printf("Recovered from disk. Latest recovered version %" PRId64 " highest written version %" PRId64
|
||||
"\n",
|
||||
v,
|
||||
version);
|
||||
ASSERT(v == version);
|
||||
printf("Recovered from disk. Latest version %" PRId64 "\n", v);
|
||||
|
||||
// Create new promise stream and start the verifier again
|
||||
committedVersions = PromiseStream<Version>();
|
||||
|
|
Loading…
Reference in New Issue