From 859143e388c793aa2b6b824bbce849ddd21fb53f Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 5 Jul 2021 01:33:23 -0700 Subject: [PATCH 01/15] Added PriorityMultiLock, an efficient N-user lock with P priority levels. Replaced FlowLock in Redwood KVS wrapper with a PriorityMultiLock as a test and because it's more efficient. --- fdbserver/VersionedBTree.actor.cpp | 97 ++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 19f53cb65f..e7865570a8 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include "flow/IRandom.h" #include "flow/flow.h" #include "fdbserver/IPager.h" #include "fdbclient/Tuple.h" @@ -94,6 +95,91 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { } } +class PriorityMultiLock { +public: + // Waiting on the lock returns a Lock, which is really just a Promise + // Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release + // the Lock before it goes out of scope. + struct Lock { + void release() { promise.send(Void()); } + + // This is exposed in case the caller wants to use/copy it directly + Promise promise; + }; + +private: + struct Slot { + Promise lock; + }; + + typedef Deque Queue; + +public: + PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0), queues(levels) { + } + + Future lock(int priority = 0) { + debug_printf("lock begin %d/%d\n", outstanding, concurrency); + + if(outstanding < concurrency) { + ++outstanding; + Lock p; + addReleaser(p); + debug_printf("lock exit immediate %d/%d\n", outstanding, concurrency); + return p; + } + + Queue &q = queues[priority]; + q.emplace_back(Slot()); + debug_printf("lock exit queued %d/%d\n", outstanding, concurrency); + return q.back().lock.getFuture(); + } + +private: + void next() { + debug_printf("next %d/%d\n", outstanding, concurrency); + // Clean up any finished releasers at the front of the releasers queue + while(releasers.front().isReady()) { + debug_printf("next cleaned up releaser %d/%d\n", outstanding, concurrency); + releasers.pop_front(); + } + + // Try to start the next task, highest priorities first. + // If successful, the outstanding count does not change + for(int priority = queues.size() - 1; priority >= 0; --priority) { + auto &q = queues[priority]; + if(!q.empty()) { + debug_printf("next found waiter at priority %d, %d/%d\n", priority, outstanding, concurrency); + Slot s = q.front(); + q.pop_front(); + Lock lock; + addReleaser(lock); + debug_printf("next sending %d/%d\n", outstanding, concurrency); + s.lock.send(lock); + debug_printf("next exit sent %d/%d\n", outstanding, concurrency); + return; + } + } + + // There were no tasks to start, so outstanding is reduced. 
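+		// (The freed slot will be claimed by the fast path in lock() on the next request.)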
+ --outstanding; + debug_printf("next exit nowaiters %d/%d\n", outstanding, concurrency); + } + + void addReleaser(Lock &lock) { + releasers.push_back(map(yieldedFuture(ready(lock.promise.getFuture())), [=](Void) { + next(); + return Void(); + })); + } + + int concurrency; + int outstanding; + std::vector queues; + Deque> releasers; + Future error; +}; + // Some convenience functions for debugging to stringify various structures // Classes can add compatibility by either specializing toString or implementing // std::string toString() const; @@ -6538,7 +6624,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff")) class KeyValueStoreRedwoodUnversioned : public IKeyValueStore { public: KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) - : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS), + : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 1), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) { int pageSize = @@ -6635,8 +6721,8 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - wait(self->m_concurrentReads.take()); - state FlowLock::Releaser releaser(self->m_concurrentReads); + ASSERT(!self->m_error.isSet()); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.opGetRange; state RangeResult result; @@ -6757,8 +6843,7 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - wait(self->m_concurrentReads.take()); - state FlowLock::Releaser releaser(self->m_concurrentReads); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.opGet; wait(cur.seekGTE(key)); @@ -6796,7 +6881,7 @@ private: Future m_init; Promise m_closed; Promise m_error; - FlowLock m_concurrentReads; + PriorityMultiLock m_concurrentReads; bool prefetch; template From f4bebb5ce676eebee846dbdb1ccf4c4ee2078571 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 5 Jul 2021 21:50:33 -0700 Subject: [PATCH 02/15] PriorityMultiLock release callbacks yield less often. --- fdbserver/VersionedBTree.actor.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index e7865570a8..975df4eadf 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -166,8 +166,16 @@ private: debug_printf("next exit nowaiters %d/%d\n", outstanding, concurrency); } - void addReleaser(Lock &lock) { - releasers.push_back(map(yieldedFuture(ready(lock.promise.getFuture())), [=](Void) { + void addReleaser(Lock& lock) { + static int sinceYield = 0; + + Future released = ready(lock.promise.getFuture()); + if (++sinceYield == 1000) { + sinceYield = 0; + released = yieldedFuture(released); + } + + releasers.push_back(map(released, [=](Void) { next(); return Void(); })); From 14724f3e5439b50a9fb181ce608e07c780d37235 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Tue, 6 Jul 2021 23:24:57 -0700 Subject: [PATCH 03/15] Applied clang-format. 
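For reference, the Lock handed out by the class above is nothing more than a Promise<Void>: calling release() completes the promise, while destroying the last copy of a Lock breaks it, and the lock manager treats both as a release because it watches ready() of the promise's future rather than the future itself. A minimal standalone sketch of that idea, assuming the flow runtime (watchRelease is illustrative and not part of these patches):

struct Lock {
	// Explicit release; letting all copies of the Lock go out of scope has the
	// same effect, since the watcher below ignores broken_promise via ready().
	void release() { promise.send(Void()); }
	Promise<Void> promise;
};

// Manager side: becomes ready when the holder releases or drops the Lock.
Future<Void> watchRelease(Lock& lock) {
	return ready(lock.promise.getFuture());
}
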
--- fdbserver/VersionedBTree.actor.cpp | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 975df4eadf..bb1fec054e 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -115,13 +115,12 @@ private: typedef Deque Queue; public: - PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0), queues(levels) { - } + PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0), queues(levels) {} Future lock(int priority = 0) { debug_printf("lock begin %d/%d\n", outstanding, concurrency); - if(outstanding < concurrency) { + if (outstanding < concurrency) { ++outstanding; Lock p; addReleaser(p); @@ -129,7 +128,7 @@ public: return p; } - Queue &q = queues[priority]; + Queue& q = queues[priority]; q.emplace_back(Slot()); debug_printf("lock exit queued %d/%d\n", outstanding, concurrency); return q.back().lock.getFuture(); @@ -139,16 +138,16 @@ private: void next() { debug_printf("next %d/%d\n", outstanding, concurrency); // Clean up any finished releasers at the front of the releasers queue - while(releasers.front().isReady()) { + while (releasers.front().isReady()) { debug_printf("next cleaned up releaser %d/%d\n", outstanding, concurrency); releasers.pop_front(); } // Try to start the next task, highest priorities first. // If successful, the outstanding count does not change - for(int priority = queues.size() - 1; priority >= 0; --priority) { - auto &q = queues[priority]; - if(!q.empty()) { + for (int priority = queues.size() - 1; priority >= 0; --priority) { + auto& q = queues[priority]; + if (!q.empty()) { debug_printf("next found waiter at priority %d, %d/%d\n", priority, outstanding, concurrency); Slot s = q.front(); q.pop_front(); @@ -920,11 +919,11 @@ public: page.clear(); debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str()); if (mode == POP) { - if(!queue->usesExtents) { + if (!queue->usesExtents) { // Freeing the old page must happen after advancing the cursor and clearing the page reference // because freePage() could cause a push onto a queue that causes a newPageID() call which could - // pop() from this very same queue. Queue pages are freed at version 0 because they can be reused - // after the next commit. + // pop() from this very same queue. Queue pages are freed at version 0 because they can be + // reused after the next commit. queue->pager->freePage(oldPageID, 0); } else if (extentCurPageID == extentEndPageID) { // Figure out the beginning of the extent @@ -6514,15 +6513,14 @@ public: if (directionForward) { // If there is no right sibling or its lower boundary is greater // or equal to than the range end then stop. - if(!c.moveNext() || c.get().key >= rangeEnd) { + if (!c.moveNext() || c.get().key >= rangeEnd) { break; } - } - else { + } else { // Prefetching left siblings // If the current leaf lower boundary is less than or equal to the range end // or there is no left sibling then stop - if(c.get().key <= rangeEnd || !c.movePrev()) { + if (c.get().key <= rangeEnd || !c.movePrev()) { break; } } From 15a62c417af33bc0cf73f259b634bd559a73995a Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Thu, 8 Jul 2021 02:18:22 -0700 Subject: [PATCH 04/15] Less overhead per waiter execution in PriorityMultiLock. 
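The change below replaces the per-release next() call with a single long-lived runner actor that is woken through an AsyncTrigger whenever a lock is released. In isolation, the wake-up pattern looks roughly like the following sketch, which assumes the flow runtime; only AsyncTrigger, trigger(), and onTrigger() come from the diff, the rest is placeholder:

ACTOR Future<Void> runnerSketch(AsyncTrigger* release) {
	loop {
		// Sleep until a release callback calls release->trigger().
		wait(release->onTrigger());
		// ... hand freed slots to the highest-priority waiters, as runner() does below ...
	}
}

The release callbacks then only have to fire the trigger instead of re-running the scheduling scan inline.
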
--- fdbserver/VersionedBTree.actor.cpp | 112 +++++++++++++++-------------- 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index bb1fec054e..bd5adfd578 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -95,6 +95,8 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { } } +// Strict priority lock with N concurrent holders. When a lock is granted to a waiter, it will always be +// the highest priority waiter. class PriorityMultiLock { public: // Waiting on the lock returns a Lock, which is really just a Promise @@ -108,83 +110,87 @@ public: }; private: - struct Slot { - Promise lock; - }; - + typedef Promise Slot; typedef Deque Queue; public: - PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0), queues(levels) {} + PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0) { + waiters.resize(levels); + fRunner = runner(this); + } Future lock(int priority = 0) { debug_printf("lock begin %d/%d\n", outstanding, concurrency); + // This shortcut may enable a waiter to jump the line when the releaser loop yields if (outstanding < concurrency) { ++outstanding; Lock p; - addReleaser(p); + addRunner(p); debug_printf("lock exit immediate %d/%d\n", outstanding, concurrency); return p; } - Queue& q = queues[priority]; - q.emplace_back(Slot()); + Slot s; + waiters[priority].push_back(s); debug_printf("lock exit queued %d/%d\n", outstanding, concurrency); - return q.back().lock.getFuture(); + return s.getFuture(); } private: - void next() { - debug_printf("next %d/%d\n", outstanding, concurrency); - // Clean up any finished releasers at the front of the releasers queue - while (releasers.front().isReady()) { - debug_printf("next cleaned up releaser %d/%d\n", outstanding, concurrency); - releasers.pop_front(); - } - - // Try to start the next task, highest priorities first. - // If successful, the outstanding count does not change - for (int priority = queues.size() - 1; priority >= 0; --priority) { - auto& q = queues[priority]; - if (!q.empty()) { - debug_printf("next found waiter at priority %d, %d/%d\n", priority, outstanding, concurrency); - Slot s = q.front(); - q.pop_front(); - Lock lock; - addReleaser(lock); - debug_printf("next sending %d/%d\n", outstanding, concurrency); - s.lock.send(lock); - debug_printf("next exit sent %d/%d\n", outstanding, concurrency); - return; - } - } - - // There were no tasks to start, so outstanding is reduced. - --outstanding; - debug_printf("next exit nowaiters %d/%d\n", outstanding, concurrency); - } - - void addReleaser(Lock& lock) { - static int sinceYield = 0; - - Future released = ready(lock.promise.getFuture()); - if (++sinceYield == 1000) { - sinceYield = 0; - released = yieldedFuture(released); - } - - releasers.push_back(map(released, [=](Void) { - next(); + void addRunner(Lock& lock) { + runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) { + --outstanding; + release.trigger(); return Void(); })); } + ACTOR static Future runner(PriorityMultiLock* self) { + state int sinceYield = 0; + + loop { + // Cleanup finished runner futures at the front of the runner queue. 
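+			// Only the contiguous ready prefix is removed; a runner that finishes out of
+			// order stays in the deque until everything ahead of it has finished.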
+ while (!self->runners.empty() && self->runners.front().isReady()) { + self->runners.pop_front(); + } + + // Wait for a runner to release its lock + wait(self->release.onTrigger()); + debug_printf("runner wakeup %d/%d\n", self->outstanding, self->concurrency); + + if (++sinceYield == 100) { + sinceYield = 0; + wait(yield()); + } + + // Start tasks, highest priority first + for (int priority = self->waiters.size() - 1; priority >= 0 && self->outstanding < self->concurrency; + --priority) { + auto& q = self->waiters[priority]; + while (!q.empty() && self->outstanding < self->concurrency) { + ++self->outstanding; + debug_printf("Running next waiter at priority=%d levelSize=%d slots %d/%d\n", + priority, + q.size(), + self->outstanding, + self->concurrency); + Slot s = q.front(); + q.pop_front(); + Lock lock; + self->addRunner(lock); + s.send(lock); + } + } + } + } + int concurrency; int outstanding; - std::vector queues; - Deque> releasers; - Future error; + std::vector waiters; + Deque> runners; + Future fRunner; + AsyncTrigger release; }; // Some convenience functions for debugging to stringify various structures From ac39fd65a7e9223c8819d16d4f63a2ace0f0ad89 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Thu, 8 Jul 2021 03:20:33 -0700 Subject: [PATCH 05/15] Bug fixes in PriorityMultiLock related to destruction during a waiter callback. Removed a waitOrError() because it throws internal_error and shouldn't be needed anyway. --- fdbserver/VersionedBTree.actor.cpp | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index bd5adfd578..7d30878f0e 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -119,6 +119,10 @@ public: fRunner = runner(this); } + ~PriorityMultiLock() { + debug_printf("destruct"); + } + Future lock(int priority = 0) { debug_printf("lock begin %d/%d\n", outstanding, concurrency); @@ -140,6 +144,7 @@ public: private: void addRunner(Lock& lock) { runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) { + debug_printf("Runner callback\n"); --outstanding; release.trigger(); return Void(); @@ -148,6 +153,7 @@ private: ACTOR static Future runner(PriorityMultiLock* self) { state int sinceYield = 0; + state Future error = self->brokenOnDestruct.getFuture(); loop { // Cleanup finished runner futures at the front of the runner queue. 
@@ -159,7 +165,7 @@ private: wait(self->release.onTrigger()); debug_printf("runner wakeup %d/%d\n", self->outstanding, self->concurrency); - if (++sinceYield == 100) { + if (++sinceYield == 1000) { sinceYield = 0; wait(yield()); } @@ -167,6 +173,7 @@ private: // Start tasks, highest priority first for (int priority = self->waiters.size() - 1; priority >= 0 && self->outstanding < self->concurrency; --priority) { + debug_printf("checking priority %d\n", priority); auto& q = self->waiters[priority]; while (!q.empty() && self->outstanding < self->concurrency) { ++self->outstanding; @@ -178,8 +185,13 @@ private: Slot s = q.front(); q.pop_front(); Lock lock; - self->addRunner(lock); s.send(lock); + self->addRunner(lock); + + // Self may have been destructed during the lock callback + if(error.isReady()) { + throw error.getError(); + } } } } @@ -191,6 +203,7 @@ private: Deque> runners; Future fRunner; AsyncTrigger release; + Promise brokenOnDestruct; }; // Some convenience functions for debugging to stringify various structures @@ -565,7 +578,7 @@ public: nextPageID = id; debug_printf( "FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str()); - nextPageReader = waitOrError(queue->pager->readPage(nextPageID, true), queue->pagerError); + nextPageReader = queue->pager->readPage(nextPageID, true); } Future loadExtent() { From f58d091148cfce9b48c7b18c6ef6656b971578c3 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Fri, 9 Jul 2021 02:51:34 -0700 Subject: [PATCH 06/15] Refactored PriorityMultiLock again, fixed bug where slots could be under-utilized, reduced overhead when locks are released without waiting. --- fdbserver/VersionedBTree.actor.cpp | 86 ++++++++++++++++++++---------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 7d30878f0e..5e64914f41 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -95,9 +95,12 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { } } +#define PRIORITYMULTILOCK_DEBUG 0 + // Strict priority lock with N concurrent holders. When a lock is granted to a waiter, it will always be // the highest priority waiter. class PriorityMultiLock { + public: // Waiting on the lock returns a Lock, which is really just a Promise // Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release @@ -113,40 +116,47 @@ private: typedef Promise Slot; typedef Deque Queue; +#if PRIORITYMULTILOCK_DEBUG +#define prioritylock_printf(...) printf(__VA_ARGS__) +#else +#define prioritylock_printf(...) 
+#endif + public: - PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), outstanding(0) { + PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), available(concurrency), waiting(0) { waiters.resize(levels); fRunner = runner(this); } - ~PriorityMultiLock() { - debug_printf("destruct"); - } + ~PriorityMultiLock() { prioritylock_printf("destruct"); } Future lock(int priority = 0) { - debug_printf("lock begin %d/%d\n", outstanding, concurrency); + prioritylock_printf("lock begin %s\n", toString().c_str()); // This shortcut may enable a waiter to jump the line when the releaser loop yields - if (outstanding < concurrency) { - ++outstanding; + if (available > 0) { + --available; Lock p; addRunner(p); - debug_printf("lock exit immediate %d/%d\n", outstanding, concurrency); + prioritylock_printf("lock exit immediate %s\n", toString().c_str()); return p; } Slot s; waiters[priority].push_back(s); - debug_printf("lock exit queued %d/%d\n", outstanding, concurrency); + ++waiting; + prioritylock_printf("lock exit queued %s\n", toString().c_str()); return s.getFuture(); } private: void addRunner(Lock& lock) { runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) { - debug_printf("Runner callback\n"); - --outstanding; - release.trigger(); + prioritylock_printf("Lock released\n"); + ++available; + if (waiting > 0 || runners.size() > 100) { + release.trigger(); + } return Void(); })); } @@ -154,6 +164,7 @@ private: ACTOR static Future runner(PriorityMultiLock* self) { state int sinceYield = 0; state Future error = self->brokenOnDestruct.getFuture(); + state int maxPriority = self->waiters.size() - 1; loop { // Cleanup finished runner futures at the front of the runner queue. @@ -163,42 +174,63 @@ private: // Wait for a runner to release its lock wait(self->release.onTrigger()); - debug_printf("runner wakeup %d/%d\n", self->outstanding, self->concurrency); + prioritylock_printf("runner wakeup %s\n", self->toString().c_str()); if (++sinceYield == 1000) { sinceYield = 0; wait(yield()); } - // Start tasks, highest priority first - for (int priority = self->waiters.size() - 1; priority >= 0 && self->outstanding < self->concurrency; - --priority) { - debug_printf("checking priority %d\n", priority); + // While there are available slots and there are waiters, launch tasks + int priority = maxPriority; + + while (self->available > 0 && self->waiting > 0) { auto& q = self->waiters[priority]; - while (!q.empty() && self->outstanding < self->concurrency) { - ++self->outstanding; - debug_printf("Running next waiter at priority=%d levelSize=%d slots %d/%d\n", - priority, - q.size(), - self->outstanding, - self->concurrency); + prioritylock_printf( + "Checking priority=%d prioritySize=%d %s\n", priority, q.size(), self->toString().c_str()); + + while (!q.empty()) { Slot s = q.front(); q.pop_front(); + --self->waiting; Lock lock; + prioritylock_printf(" Running waiter priority=%d prioritySize=%d\n", priority, q.size()); s.send(lock); - self->addRunner(lock); // Self may have been destructed during the lock callback - if(error.isReady()) { + if (error.isReady()) { throw error.getError(); } + + // If the lock was not already released, add it to the runners future queue + if (lock.promise.canBeSet()) { + self->addRunner(lock); + + // A slot has been consumed, so stop reading from this queue if there aren't any more + if (--self->available == 0) { + break; + } + } + } + + // Wrap around to highest priority + if (priority == 0) { + priority = maxPriority; + } else { + 
--priority; } } } } + std::string toString() const { + return format( + "{ slots=%d/%d waiting=%d runners=%d }", (concurrency - available), concurrency, waiting, runners.size()); + } + int concurrency; - int outstanding; + int available; + int waiting; std::vector waiters; Deque> runners; Future fRunner; From fe575b995a10125ed31898c9edc917eb316f5e28 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Fri, 9 Jul 2021 05:05:08 -0700 Subject: [PATCH 07/15] Added IO Priority to Redwood. --- fdbserver/IPager.h | 7 +- fdbserver/VersionedBTree.actor.cpp | 116 ++++++++++++++++++----------- 2 files changed, 78 insertions(+), 45 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index aea7bbf9d2..a0e05361c6 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -128,7 +128,10 @@ public: class IPagerSnapshot { public: - virtual Future> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0; + virtual Future> getPhysicalPage(LogicalPageID pageID, + int priority, + bool cacheable, + bool nohit) = 0; virtual bool tryEvictPage(LogicalPageID id) = 0; virtual Version getVersion() const = 0; @@ -188,7 +191,7 @@ public: // Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read. // NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are // considered likely to be needed soon. - virtual Future> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0; + virtual Future> readPage(LogicalPageID pageID, int priority, bool cacheable, bool noHit) = 0; virtual Future> readExtent(LogicalPageID pageID) = 0; virtual void releaseExtentReadLock() = 0; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 5e64914f41..273082759f 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -18,7 +18,9 @@ * limitations under the License. */ +#include "fdbserver/Knobs.h" #include "flow/IRandom.h" +#include "flow/Knobs.h" #include "flow/flow.h" #include "fdbserver/IPager.h" #include "fdbclient/Tuple.h" @@ -97,8 +99,15 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { #define PRIORITYMULTILOCK_DEBUG 0 -// Strict priority lock with N concurrent holders. When a lock is granted to a waiter, it will always be -// the highest priority waiter. +// A multi user lock with a concurrent holder limit where waiters are granted the lock according to +// an integer priority from 0 to maxPriority, inclusive, where higher integers are given priority. +// +// The interface is similar to FlowMutex except that lock holders can drop the lock to release it. 
+// +// Usage: +// Lock lock = wait(prioritylock.take(priorityLevel)); +// lock.release(); // Explicit release, or +// // let lock and all copies of lock go out of scope to release class PriorityMultiLock { public: @@ -123,14 +132,14 @@ private: #endif public: - PriorityMultiLock(int concurrency, int levels) : concurrency(concurrency), available(concurrency), waiting(0) { - waiters.resize(levels); + PriorityMultiLock(int concurrency, int maxPriority) : concurrency(concurrency), available(concurrency), waiting(0) { + waiters.resize(maxPriority + 1); fRunner = runner(this); } ~PriorityMultiLock() { prioritylock_printf("destruct"); } - Future lock(int priority = 0) { + Future lock(int priority) { prioritylock_printf("lock begin %s\n", toString().c_str()); // This shortcut may enable a waiter to jump the line when the releaser loop yields @@ -610,7 +619,7 @@ public: nextPageID = id; debug_printf( "FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str()); - nextPageReader = queue->pager->readPage(nextPageID, true); + nextPageReader = queue->pager->readPage(nextPageID, 0, true, false); } Future loadExtent() { @@ -1922,7 +1931,8 @@ public: Promise errorPromise = {}) : desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize), filename(filename), pHeader(nullptr), pageCacheBytes(pageCacheSizeBytes), memoryOnly(memoryOnly), remapCleanupWindow(remapCleanupWindow), - concurrentExtentReads(new FlowLock(concurrentExtentReads)), errorPromise(errorPromise) { + concurrentExtentReads(new FlowLock(concurrentExtentReads)), errorPromise(errorPromise), + ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority) { if (!g_redwoodMetricsActor.isValid()) { g_redwoodMetricsActor = redwoodMetricsLogger(); @@ -2535,13 +2545,16 @@ public: // and before the user-chosen sized pages. ACTOR static Future> readPhysicalPage(DWALPager* self, PhysicalPageID pageID, - bool header = false) { + int priority, + bool header) { ASSERT(!self->memoryOnly); ++g_redwoodMetrics.pagerDiskRead; - if (g_network->getCurrentTask() > TaskPriority::DiskRead) { - wait(delay(0, TaskPriority::DiskRead)); - } + state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); + + // if (g_network->getCurrentTask() > TaskPriority::DiskRead) { + // wait(delay(0, TaskPriority::DiskRead)); + // } state Reference page = header ? Reference(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock)) @@ -2582,7 +2595,7 @@ public: } static Future> readHeaderPage(DWALPager* self, PhysicalPageID pageID) { - return readPhysicalPage(self, pageID, true); + return readPhysicalPage(self, pageID, ioMaxPriority, true); } bool tryEvictPage(LogicalPageID logicalID, Version v) { @@ -2592,7 +2605,7 @@ public: // Reads the most recent version of pageID, either previously committed or written using updatePage() // in the current commit - Future> readPage(LogicalPageID pageID, bool cacheable, bool noHit = false) override { + Future> readPage(LogicalPageID pageID, int priority, bool cacheable, bool noHit) override { // Use cached page if present, without triggering a cache hit. 
// Otherwise, read the page and return it but don't add it to the cache if (!cacheable) { @@ -2605,7 +2618,7 @@ public: } debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageID).c_str()); - return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise); + return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise); } PageCacheEntry& cacheEntry = pageCache.get(pageID, noHit); @@ -2619,7 +2632,8 @@ public: if (!cacheEntry.initialized()) { debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str()); - cacheEntry.readFuture = forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise); + cacheEntry.readFuture = + forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise); cacheEntry.writeFuture = Void(); } @@ -2655,9 +2669,13 @@ public: return (PhysicalPageID)pageID; } - Future> readPageAtVersion(LogicalPageID logicalID, Version v, bool cacheable, bool noHit) { + Future> readPageAtVersion(LogicalPageID logicalID, + int priority, + Version v, + bool cacheable, + bool noHit) { PhysicalPageID physicalID = getPhysicalPageID(logicalID, v); - return readPage(physicalID, cacheable, noHit); + return readPage(physicalID, priority, cacheable, noHit); } void releaseExtentReadLock() override { concurrentExtentReads->release(); } @@ -2885,7 +2903,7 @@ public: debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str()); // Read the data from the page that the original was mapped to - Reference data = wait(self->readPage(p.newPageID, false, true)); + Reference data = wait(self->readPage(p.newPageID, 0, false, true)); // Write the data to the original page so it can be read using its original pageID self->updatePage(p.originalPageID, data); @@ -3289,6 +3307,12 @@ private: int physicalExtentSize; int pagesPerExtent; +public: + static constexpr int ioMaxPriority = 2; + +private: + PriorityMultiLock ioLock; + int64_t pageCacheBytes; // The header will be written to / read from disk as a smallestPhysicalBlock sized chunk. @@ -3358,11 +3382,14 @@ public: : pager(pager), metaKey(meta), version(version), expired(expiredFuture) {} ~DWALPagerSnapshot() override {} - Future> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool noHit) override { + Future> getPhysicalPage(LogicalPageID pageID, + int priority, + bool cacheable, + bool noHit) override { if (expired.isError()) { throw expired.getError(); } - return map(pager->readPageAtVersion(pageID, version, cacheable, noHit), + return map(pager->readPageAtVersion(pageID, priority, version, cacheable, noHit), [=](Reference p) { return Reference(std::move(p)); }); } @@ -4299,7 +4326,7 @@ public: break; } // Start reading the page, without caching - entries.push_back(std::make_pair(q.get(), self->readPage(snapshot, q.get().pageID, true, false))); + entries.push_back(std::make_pair(q.get(), self->readPage(snapshot, q.get().pageID, 1, true, false))); --toPop; } @@ -5132,8 +5159,9 @@ private: ACTOR static Future> readPage(Reference snapshot, BTreePageIDRef id, - bool forLazyClear = false, - bool cacheable = true) { + int priority, + bool forLazyClear, + bool cacheable) { debug_printf("readPage() op=read%s %s @%" PRId64 "\n", forLazyClear ? 
"ForDeferredClear" : "", @@ -5143,13 +5171,13 @@ private: state Reference page; if (id.size() == 1) { - Reference p = wait(snapshot->getPhysicalPage(id.front(), cacheable, false)); + Reference p = wait(snapshot->getPhysicalPage(id.front(), priority, cacheable, false)); page = std::move(p); } else { ASSERT(!id.empty()); std::vector>> reads; for (auto& pageID : id) { - reads.push_back(snapshot->getPhysicalPage(pageID, cacheable, false)); + reads.push_back(snapshot->getPhysicalPage(pageID, priority, cacheable, false)); } std::vector> pages = wait(getAll(reads)); // TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager. @@ -5195,12 +5223,12 @@ private: ((BTreePage*)page->begin())->tree()); } - static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef id) { + static void preLoadPage(IPagerSnapshot* snapshot, BTreePageIDRef id, int priority) { g_redwoodMetrics.btreeLeafPreload += 1; g_redwoodMetrics.btreeLeafPreloadExt += (id.size() - 1); for (auto pageID : id) { - snapshot->getPhysicalPage(pageID, true, true); + snapshot->getPhysicalPage(pageID, priority, true, true); } } @@ -5576,7 +5604,7 @@ private: Reference snapshot, MutationBuffer* mutationBuffer, BTreePageIDRef rootID, - bool isLeaf, + int height, MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key InternalPageSliceUpdate* update) { @@ -5602,7 +5630,7 @@ private: debug_printf("%s -------------------------------------\n", context.c_str()); } - state Reference page = wait(readPage(snapshot, rootID, false, false)); + state Reference page = wait(readPage(snapshot, rootID, height - 1, false, false)); state Version writeVersion = self->getLastCommittedVersion() + 1; // If the page exists in the cache, it must be copied before modification. @@ -5612,7 +5640,7 @@ private: state Reference pageCopy; state BTreePage* btPage = (BTreePage*)page->begin(); - ASSERT(isLeaf == btPage->isLeaf()); + ASSERT(height == btPage->height); g_redwoodMetrics.level(btPage->height).pageCommitStart += 1; // TODO: Decide if it is okay to update if the subtree boundaries are expanded. 
It can result in @@ -5647,7 +5675,7 @@ private: } // Leaf Page - if (isLeaf) { + if (btPage->isLeaf()) { bool updating = tryToUpdate; bool changesMade = false; @@ -6088,7 +6116,7 @@ private: // If this page has height of 2 then its children are leaf nodes recursions.push_back( - self->commitSubtree(self, snapshot, mutationBuffer, pageID, btPage->height == 2, mBegin, mEnd, &u)); + self->commitSubtree(self, snapshot, mutationBuffer, pageID, btPage->height - 1, mBegin, mEnd, &u)); } debug_printf( @@ -6324,7 +6352,7 @@ private: self->m_pager->getReadSnapshot(latestVersion), mutations, rootPageID, - self->m_pHeader->height == 1, + self->m_pHeader->height, mBegin, mEnd, &all)); @@ -6444,19 +6472,21 @@ public: Future pushPage(const BTreePage::BinaryTree::Cursor& link) { debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str()); - return map(readPage(pager, link.get().getChildPage()), [=](Reference p) { + return map(readPage(pager, link.get().getChildPage(), DWALPager::ioMaxPriority, false, true), + [=](Reference p) { #if REDWOOD_DEBUG - path.push_back({ p, getCursor(p, link), link.get().getChildPage() }); + path.push_back({ p, getCursor(p, link), link.get().getChildPage() }); #else path.push_back({ p, getCursor(p, link) }); #endif - return Void(); - }); + return Void(); + }); } Future pushPage(BTreePageIDRef id) { debug_printf("pushPage(root=%s)\n", ::toString(id).c_str()); - return map(readPage(pager, id), [=](Reference p) { + + return map(readPage(pager, id, DWALPager::ioMaxPriority, false, true), [=](Reference p) { #if REDWOOD_DEBUG path.push_back({ p, getCursor(p, dbBegin, dbEnd), id }); #else @@ -6579,7 +6609,7 @@ public: // Prefetch the sibling if the link is not null if (c.get().value.present()) { BTreePageIDRef childPage = c.get().getChildPage(); - preLoadPage(pager.getPtr(), childPage); + preLoadPage(pager.getPtr(), childPage, 0); recordsRead += estRecordsPerPage; // Use sibling node capacity as an estimate of bytes read. 
bytesRead += childPage.size() * this->btree->m_blockSize; @@ -6681,7 +6711,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff")) class KeyValueStoreRedwoodUnversioned : public IKeyValueStore { public: KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) - : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 1), + : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) { int pageSize = @@ -6779,7 +6809,7 @@ public: wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); ASSERT(!self->m_error.isSet()); - state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock(0)); ++g_redwoodMetrics.opGetRange; state RangeResult result; @@ -6900,7 +6930,7 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock(0)); ++g_redwoodMetrics.opGet; wait(cur.seekGTE(key)); @@ -8975,7 +9005,7 @@ TEST_CASE(":/redwood/correctness/pager/cow") { pager->updatePage(id, p); pager->setMetaKey(LiteralStringRef("asdfasdf")); wait(pager->commit()); - Reference p2 = wait(pager->readPage(id, true)); + Reference p2 = wait(pager->readPage(id, 0, true, false)); printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str()); // TODO: Verify reads, do more writes and reads to make this a real pager validator From 8a5caeb756cb6088ae3d3e8fdbc4a7e6a1e9080f Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Wed, 14 Jul 2021 01:42:30 -0700 Subject: [PATCH 08/15] IO priority tweaks. Writes are lowest, queue reads are highest, reads for prefetch, lazy clear, or remap removal are at leaf priority. Count pager disk reads and writes when they start, after the lock is obtained. --- fdbserver/VersionedBTree.actor.cpp | 83 +++++++++++++++++------------- 1 file changed, 47 insertions(+), 36 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 273082759f..a3cc417a1e 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -139,7 +139,7 @@ public: ~PriorityMultiLock() { prioritylock_printf("destruct"); } - Future lock(int priority) { + Future lock(int priority = 0) { prioritylock_printf("lock begin %s\n", toString().c_str()); // This shortcut may enable a waiter to jump the line when the releaser loop yields @@ -359,6 +359,10 @@ std::string toString(const std::pair& o) { return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str()); } +static constexpr int ioMinPriority = 0; +static constexpr int ioLeafPriority = 1; +static constexpr int ioMaxPriority = 2; + // A FIFO queue of T stored as a linked list of pages. // Main operations are pop(), pushBack(), pushFront(), and flush(). 
// @@ -619,7 +623,7 @@ public: nextPageID = id; debug_printf( "FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str()); - nextPageReader = queue->pager->readPage(nextPageID, 0, true, false); + nextPageReader = queue->pager->readPage(nextPageID, ioMaxPriority, true, false); } Future loadExtent() { @@ -2339,44 +2343,53 @@ public: Future newExtentPageID(QueueID queueID) override { return newExtentPageID_impl(this, queueID); } - Future writePhysicalPage(PhysicalPageID pageID, Reference page, bool header = false) { + ACTOR static Future writePhysicalPage_impl(DWALPager* self, + PhysicalPageID pageID, + Reference page, + bool header = false) { debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", - filename.c_str(), + self->filename.c_str(), (header ? "writePhysicalHeader" : "writePhysical"), toString(pageID).c_str(), page->begin()); - ++g_redwoodMetrics.pagerDiskWrite; VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); page->updateChecksum(pageID); debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n", - filename.c_str(), + self->filename.c_str(), toString(pageID).c_str(), page->calculateChecksum(pageID), page->getChecksum()); - if (memoryOnly) { + if (self->memoryOnly) { return Void(); } + state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(ioMinPriority)); + ++g_redwoodMetrics.pagerDiskWrite; + // Note: Not using forwardError here so a write error won't be discovered until commit time. - int blockSize = header ? smallestPhysicalBlock : physicalPageSize; - Future f = - holdWhile(page, map(pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize), [=](Void) { - debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", - filename.c_str(), - (header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"), - toString(pageID).c_str(), - page->begin(), - (pageID * blockSize)); - return Void(); - })); + state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; + wait(self->pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize)); + + debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", + self->filename.c_str(), + (header ? "writePhysicalHeaderComplete" : "writePhysicalComplete"), + toString(pageID).c_str(), + page->begin(), + (pageID * blockSize)); + + return Void(); + } + + Future writePhysicalPage(PhysicalPageID pageID, Reference page, bool header = false) { + Future f = writePhysicalPage_impl(this, pageID, page, header); operations.add(f); return f; } Future writeHeaderPage(PhysicalPageID pageID, Reference page) { - return writePhysicalPage(pageID, page, true); + return writePhysicalPage_impl(this, pageID, page, true); } void updatePage(LogicalPageID pageID, Reference data) override { @@ -2548,9 +2561,6 @@ public: int priority, bool header) { ASSERT(!self->memoryOnly); - ++g_redwoodMetrics.pagerDiskRead; - - state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); // if (g_network->getCurrentTask() > TaskPriority::DiskRead) { // wait(delay(0, TaskPriority::DiskRead)); @@ -2564,8 +2574,11 @@ public: toString(pageID).c_str(), page->begin()); - int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; + state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); + ++g_redwoodMetrics.pagerDiskRead; + // TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled? + int blockSize = header ? 
smallestPhysicalBlock : self->physicalPageSize; int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize)); debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n", self->filename.c_str(), @@ -2689,8 +2702,6 @@ public: wait(self->concurrentExtentReads->take()); ASSERT(!self->memoryOnly); - ++g_redwoodMetrics.pagerDiskRead; - if (g_network->getCurrentTask() > TaskPriority::DiskRead) { wait(delay(0, TaskPriority::DiskRead)); } @@ -2725,6 +2736,7 @@ public: for (i = 0; i < parallelReads; i++) { currentOffset = i * physicalReadSize; debug_printf("DWALPager(%s) current offset %d\n", self->filename.c_str(), currentOffset); + ++g_redwoodMetrics.pagerDiskRead; reads.push_back( self->pageFile->read(extent->mutate() + currentOffset, physicalReadSize, startOffset + currentOffset)); } @@ -2737,6 +2749,7 @@ public: i, currentOffset, lastReadSize); + ++g_redwoodMetrics.pagerDiskRead; reads.push_back( self->pageFile->read(extent->mutate() + currentOffset, lastReadSize, startOffset + currentOffset)); } @@ -2903,7 +2916,7 @@ public: debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str()); // Read the data from the page that the original was mapped to - Reference data = wait(self->readPage(p.newPageID, 0, false, true)); + Reference data = wait(self->readPage(p.newPageID, ioLeafPriority, false, true)); // Write the data to the original page so it can be read using its original pageID self->updatePage(p.originalPageID, data); @@ -3307,9 +3320,6 @@ private: int physicalExtentSize; int pagesPerExtent; -public: - static constexpr int ioMaxPriority = 2; - private: PriorityMultiLock ioLock; @@ -4326,7 +4336,8 @@ public: break; } // Start reading the page, without caching - entries.push_back(std::make_pair(q.get(), self->readPage(snapshot, q.get().pageID, 1, true, false))); + entries.push_back( + std::make_pair(q.get(), self->readPage(snapshot, q.get().pageID, ioLeafPriority, true, false))); --toPop; } @@ -6472,7 +6483,7 @@ public: Future pushPage(const BTreePage::BinaryTree::Cursor& link) { debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str()); - return map(readPage(pager, link.get().getChildPage(), DWALPager::ioMaxPriority, false, true), + return map(readPage(pager, link.get().getChildPage(), ioMaxPriority, false, true), [=](Reference p) { #if REDWOOD_DEBUG path.push_back({ p, getCursor(p, link), link.get().getChildPage() }); @@ -6486,7 +6497,7 @@ public: Future pushPage(BTreePageIDRef id) { debug_printf("pushPage(root=%s)\n", ::toString(id).c_str()); - return map(readPage(pager, id, DWALPager::ioMaxPriority, false, true), [=](Reference p) { + return map(readPage(pager, id, ioMaxPriority, false, true), [=](Reference p) { #if REDWOOD_DEBUG path.push_back({ p, getCursor(p, dbBegin, dbEnd), id }); #else @@ -6609,7 +6620,7 @@ public: // Prefetch the sibling if the link is not null if (c.get().value.present()) { BTreePageIDRef childPage = c.get().getChildPage(); - preLoadPage(pager.getPtr(), childPage, 0); + preLoadPage(pager.getPtr(), childPage, ioLeafPriority); recordsRead += estRecordsPerPage; // Use sibling node capacity as an estimate of bytes read. 
bytesRead += childPage.size() * this->btree->m_blockSize; @@ -6809,7 +6820,7 @@ public: wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); ASSERT(!self->m_error.isSet()); - state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock(0)); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.opGetRange; state RangeResult result; @@ -6930,7 +6941,7 @@ public: state VersionedBTree::BTreeCursor cur; wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); - state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock(0)); + state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.opGet; wait(cur.seekGTE(key)); @@ -9005,7 +9016,7 @@ TEST_CASE(":/redwood/correctness/pager/cow") { pager->updatePage(id, p); pager->setMetaKey(LiteralStringRef("asdfasdf")); wait(pager->commit()); - Reference p2 = wait(pager->readPage(id, 0, true, false)); + Reference p2 = wait(pager->readPage(id, ioMinPriority, true, false)); printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str()); // TODO: Verify reads, do more writes and reads to make this a real pager validator From 8b0f03f6e8d162ea5b36e5d6e1379823b117226c Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Thu, 15 Jul 2021 09:05:36 -0700 Subject: [PATCH 09/15] CommitSubtree was still doing non-caching reads, this was unintentional. --- fdbserver/VersionedBTree.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index a3cc417a1e..5474a05a8f 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -5641,7 +5641,7 @@ private: debug_printf("%s -------------------------------------\n", context.c_str()); } - state Reference page = wait(readPage(snapshot, rootID, height - 1, false, false)); + state Reference page = wait(readPage(snapshot, rootID, height - 1, false, true)); state Version writeVersion = self->getLastCommittedVersion() + 1; // If the page exists in the cache, it must be copied before modification. From 39eb71f54a8e9aaacd5c9ca2ad5f8b4e2ec26f74 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Sun, 18 Jul 2021 03:02:10 -0700 Subject: [PATCH 10/15] Corrections in BTreeCursor event reasons, fixed by BTreeCursor now storing reason as a member because it's much simpler. Some other clang-format changes and a couple of renames. --- fdbserver/IPager.h | 4 +- fdbserver/VersionedBTree.actor.cpp | 119 +++++++++++++++-------------- 2 files changed, 62 insertions(+), 61 deletions(-) diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index b04ff2a987..22ee4b94ca 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -42,10 +42,10 @@ typedef uint32_t QueueID; // Pager Events enum class PagerEvents { CacheLookup = 0, CacheHit, CacheMiss, PageWrite, MAXEVENTS }; -static const std::string PagerEventsCodes[] = { "Lookup", "Hit", "Miss", "Write" }; +static const std::string PagerEventsStrings[] = { "Lookup", "Hit", "Miss", "Write", "Unknown" }; // Reasons for page level events. 
enum class PagerEventReasons { PointRead = 0, RangeRead, RangePrefetch, Commit, LazyClear, MetaData, MAXEVENTREASONS }; -static const std::string PagerEventReasonsCodes[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta" }; +static const std::string PagerEventReasonsStrings[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta", "Unknown" }; static const int nonBtreeLevel = 0; static const std::pair possibleEventReasonPairs[] = { diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index cc3879191f..710ff6d725 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -1475,13 +1475,13 @@ struct RedwoodMetrics { for (const auto& ER : L0PossibleEventReasonPairs) { if (prevEvent != ER.first) { result += "\n"; - result += PagerEventsCodes[(size_t)ER.first]; + result += PagerEventsStrings[(size_t)ER.first]; result += "\n\t"; prevEvent = ER.first; } std::string num = std::to_string(eventReasons[(size_t)ER.first][(size_t)ER.second]); - result += PagerEventReasonsCodes[(size_t)ER.second]; - result.append(16 - PagerEventReasonsCodes[(size_t)ER.second].length(), ' '); + result += PagerEventReasonsStrings[(size_t)ER.second]; + result.append(16 - PagerEventReasonsStrings[(size_t)ER.second].length(), ' '); result.append(8 - num.length(), ' '); result += num; result.append(13, ' '); @@ -1490,13 +1490,13 @@ struct RedwoodMetrics { for (const auto& ER : possibleEventReasonPairs) { if (prevEvent != ER.first) { result += "\n"; - result += PagerEventsCodes[(size_t)ER.first]; + result += PagerEventsStrings[(size_t)ER.first]; result += "\n\t"; prevEvent = ER.first; } std::string num = std::to_string(eventReasons[(size_t)ER.first][(size_t)ER.second]); - result += PagerEventReasonsCodes[(size_t)ER.second]; - result.append(16 - PagerEventReasonsCodes[(size_t)ER.second].length(), ' '); + result += PagerEventReasonsStrings[(size_t)ER.second]; + result.append(16 - PagerEventReasonsStrings[(size_t)ER.second].length(), ' '); result.append(8 - num.length(), ' '); result += num; result.append(13, ' '); @@ -1511,7 +1511,7 @@ struct RedwoodMetrics { format( "L%d%s", h, - (PagerEventsCodes[(size_t)ER.first] + PagerEventReasonsCodes[(size_t)ER.second]).c_str()), + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]).c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } else { @@ -1520,7 +1520,7 @@ struct RedwoodMetrics { format( "L%d%s", h, - (PagerEventsCodes[(size_t)ER.first] + PagerEventReasonsCodes[(size_t)ER.second]).c_str()), + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]).c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } @@ -1664,7 +1664,7 @@ struct RedwoodMetrics { } void updateMaxRecordCount(int maxRecords) { - if(maxRecordCount != maxRecords) { + if (maxRecordCount != maxRecords) { maxRecordCount = maxRecordCount; for (int i = 0; i < btreeLevels + 1; ++i) { auto& level = levels[i]; @@ -5458,12 +5458,7 @@ private: for (auto pageID : id) { // Prefetches are always at the Leaf level currently so it isn't part of the per-level metrics set - snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, - nonBtreeLevel, - pageID, - priority, - true, - true); + snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageID, priority, true, true); } } @@ -5499,8 +5494,8 @@ private: state int height = ((BTreePage*)page->begin())->height; if (oldID.size() == 1) { - LogicalPageID id = wait(self->m_pager->atomicUpdatePage( - 
PagerEventReasons::Commit, height, oldID.front(), page, writeVersion)); + LogicalPageID id = wait( + self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion)); newID.front() = id; } else { state std::vector> pages; @@ -6666,19 +6661,23 @@ public: }; private: + PagerEventReasons reason; VersionedBTree* btree; Reference pager; bool valid; std::vector path; public: - BTreeCursor() {} + BTreeCursor() : reason(PagerEventReasons::MAXEVENTREASONS) {} bool intialized() const { return pager.isValid(); } bool isValid() const { return valid; } std::string toString() const { - std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str()); + std::string r = format("{ptr=%p reason=%s %s ", + this, + PagerEventsStrings[(int)reason].c_str(), + ::toString(pager->getVersion()).c_str()); for (int i = 0; i < path.size(); ++i) { std::string id = ""; #if REDWOOD_DEBUG @@ -6707,7 +6706,7 @@ public: PathEntry& back() { return path.back(); } void popPath() { path.pop_back(); } - Future pushPage(PagerEventReasons reason, const BTreePage::BinaryTree::Cursor& link) { + Future pushPage(const BTreePage::BinaryTree::Cursor& link) { debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str()); return map(readPage(reason, path.back().btPage()->height - 1, @@ -6726,7 +6725,7 @@ public: }); } - Future pushPage(PagerEventReasons reason, BTreePageIDRef id) { + Future pushPage(BTreePageIDRef id) { debug_printf("pushPage(root=%s)\n", ::toString(id).c_str()); return map(readPage(reason, btree->m_pHeader->height, pager, id, ioMaxPriority, false, true), [=](Reference p) { @@ -6740,13 +6739,17 @@ public: } // Initialize or reinitialize cursor - Future init(VersionedBTree* btree_in, Reference pager_in, BTreePageIDRef root) { + Future init(VersionedBTree* btree_in, + PagerEventReasons reason, + Reference pager_in, + BTreePageIDRef root) { btree = btree_in; + reason = reason; pager = pager_in; path.clear(); path.reserve(6); valid = false; - return pushPage(PagerEventReasons::PointRead, root); + return pushPage(root); } // Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid @@ -6759,7 +6762,7 @@ public: // If non-zero is returned then the cursor is valid and the return value is logically equivalent // to query.compare(cursor.get()) - ACTOR Future seek_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) { + ACTOR Future seek_impl(BTreeCursor* self, RedwoodRecordRef query) { state RedwoodRecordRef internalPageQuery = query.withMaxPageID(); self->path.resize(1); debug_printf("seek(%s) start cursor = %s\n", query.toString().c_str(), self->toString().c_str()); @@ -6783,7 +6786,7 @@ public: if (entry.cursor.seekLessThan(internalPageQuery) && entry.cursor.get().value.present()) { debug_printf( "seek(%s) loop seek success cursor=%s\n", query.toString().c_str(), self->toString().c_str()); - Future f = self->pushPage(reason, entry.cursor); + Future f = self->pushPage(entry.cursor); wait(f); } else { self->valid = false; @@ -6794,20 +6797,18 @@ public: } } - Future seek(RedwoodRecordRef query, PagerEventReasons reason) { return seek_impl(this, query, reason); } + Future seek(RedwoodRecordRef query) { return seek_impl(this, query); } - ACTOR Future seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) { + ACTOR Future seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query) { debug_printf("seekGTE(%s) start\n", query.toString().c_str()); - int cmp = wait(self->seek(query, 
reason)); + int cmp = wait(self->seek(query)); if (cmp > 0 || (cmp == 0 && !self->isValid())) { wait(self->moveNext()); } return Void(); } - Future seekGTE(RedwoodRecordRef query, PagerEventReasons reason) { - return seekGTE_impl(this, query, reason); - } + Future seekGTE(RedwoodRecordRef query) { return seekGTE_impl(this, query); } // Start fetching sibling nodes in the forward or backward direction, stopping after recordLimit or byteLimit void prefetch(KeyRef rangeEnd, bool directionForward, int recordLimit, int byteLimit) { @@ -6866,18 +6867,16 @@ public: } } - ACTOR Future seekLT_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) { + ACTOR Future seekLT_impl(BTreeCursor* self, RedwoodRecordRef query) { debug_printf("seekLT(%s) start\n", query.toString().c_str()); - int cmp = wait(self->seek(query, reason)); + int cmp = wait(self->seek(query)); if (cmp <= 0) { wait(self->movePrev()); } return Void(); } - Future seekLT(RedwoodRecordRef query, PagerEventReasons reason) { - return seekLT_impl(this, query, reason); - } + Future seekLT(RedwoodRecordRef query) { return seekLT_impl(this, query); } ACTOR Future move_impl(BTreeCursor* self, bool forward) { // Try to the move cursor at the end of the path in the correct direction @@ -6927,7 +6926,7 @@ public: ASSERT(entry.cursor.get().value.present()); } - wait(self->pushPage(PagerEventReasons::MetaData, entry.cursor)); + wait(self->pushPage(entry.cursor)); auto& newEntry = self->path.back(); ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast()); } @@ -6942,7 +6941,7 @@ public: Future movePrev() { return move_impl(this, false); } }; - Future initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion) { + Future initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion, PagerEventReasons reason) { // Only committed versions can be read. 
ASSERT(snapshotVersion <= m_lastCommittedVersion); Reference snapshot = m_pager->getReadSnapshot(snapshotVersion); @@ -6950,7 +6949,7 @@ public: // This is a ref because snapshot will continue to hold the metakey value memory KeyRef m = snapshot->getMetaKey(); - return cursor->init(this, snapshot, ((MetaKey*)m.begin())->root.get()); + return cursor->init(this, reason, snapshot, ((MetaKey*)m.begin())->root.get()); } }; @@ -7057,7 +7056,8 @@ public: int rowLimit, int byteLimit) { state VersionedBTree::BTreeCursor cur; - wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); + wait( + self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::RangeRead)); state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.metric.opGetRange; @@ -7071,12 +7071,12 @@ public: } if (rowLimit > 0) { - wait(cur.seekGTE(keys.begin, PagerEventReasons::RangeRead)); + wait(cur.seekGTE(keys.begin)); if (self->prefetch) { cur.prefetch(keys.end, true, rowLimit, byteLimit); } - + while (cur.isValid()) { // Read page contents without using waits BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor; @@ -7118,7 +7118,7 @@ public: wait(cur.moveNext()); } } else { - wait(cur.seekLT(keys.end, PagerEventReasons::RangeRead)); + wait(cur.seekLT(keys.end)); if (self->prefetch) { cur.prefetch(keys.begin, false, -rowLimit, byteLimit); @@ -7179,12 +7179,13 @@ public: Key key, Optional debugID) { state VersionedBTree::BTreeCursor cur; - wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion())); + wait( + self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead)); state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock()); ++g_redwoodMetrics.metric.opGet; - wait(cur.seekGTE(key, PagerEventReasons::PointRead)); + wait(cur.seekGTE(key)); if (cur.isValid() && cur.get().key == key) { // Return a Value whose arena depends on the source page arena Value v; @@ -7291,7 +7292,7 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree, state std::map, Optional>::const_iterator iLast; state VersionedBTree::BTreeCursor cur; - wait(btree->initBTreeCursor(&cur, v)); + wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead)); debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Start\n", v, start.printable().c_str(), end.printable().c_str()); // Randomly use the cursor for something else first. 
@@ -7302,13 +7303,13 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree, start.printable().c_str(), end.printable().c_str(), randomKey.toString().c_str()); - wait(success(cur.seek(randomKey, PagerEventReasons::RangeRead))); + wait(success(cur.seek(randomKey))); } debug_printf( "VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(), end.printable().c_str()); - wait(cur.seekGTE(start, PagerEventReasons::RangeRead)); + wait(cur.seekGTE(start)); state Standalone> results; @@ -7404,11 +7405,11 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree, // opening new cursors if (v >= btree->getOldestVersion() && deterministicRandom()->coinflip()) { cur = VersionedBTree::BTreeCursor(); - wait(btree->initBTreeCursor(&cur, v)); + wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead)); } // Now read the range from the tree in reverse order and compare to the saved results - wait(cur.seekLT(end, PagerEventReasons::RangeRead)); + wait(cur.seekLT(end)); state std::reverse_iterator r = results.rbegin(); @@ -7476,7 +7477,7 @@ ACTOR Future seekAllBTreeCursor(VersionedBTree* btree, state int errors = 0; state VersionedBTree::BTreeCursor cur; - wait(btree->initBTreeCursor(&cur, v)); + wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead)); while (i != iEnd) { state std::string key = i->first.first; @@ -7485,7 +7486,7 @@ ACTOR Future seekAllBTreeCursor(VersionedBTree* btree, state Optional val = i->second; debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str()); state Arena arena; - wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key)), PagerEventReasons::RangeRead)); + wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key)))); bool foundKey = cur.isValid() && cur.get().key == key; bool hasValue = foundKey && cur.get().value.present(); @@ -7558,7 +7559,7 @@ ACTOR Future verify(VersionedBTree* btree, // Get a cursor at v so that v doesn't get expired between the possibly serial steps below. 
state VersionedBTree::BTreeCursor cur; - wait(btree->initBTreeCursor(&cur, v)); + wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead)); debug_printf("Verifying entire key range at version %" PRId64 "\n", v); state Future fRangeAll = verifyRangeBTreeCursor( @@ -7606,11 +7607,11 @@ ACTOR Future randomReader(VersionedBTree* btree) { loop { wait(yield()); if (!cur.intialized() || deterministicRandom()->random01() > .01) { - wait(btree->initBTreeCursor(&cur, btree->getLastCommittedVersion())); + wait(btree->initBTreeCursor(&cur, btree->getLastCommittedVersion(), PagerEventReasons::RangeRead)); } state KeyValue kv = randomKV(10, 0); - wait(cur.seekGTE(kv.key, PagerEventReasons::RangeRead)); + wait(cur.seekGTE(kv.key)); state int c = deterministicRandom()->randomInt(0, 100); state bool direction = deterministicRandom()->coinflip(); while (cur.isValid() && c-- > 0) { @@ -9187,10 +9188,10 @@ ACTOR Future randomSeeks(VersionedBTree* btree, int count, char firstChar, state int c = 0; state double readStart = timer(); state VersionedBTree::BTreeCursor cur; - wait(btree->initBTreeCursor(&cur, readVer)); + wait(btree->initBTreeCursor(&cur, readVer, PagerEventReasons::PointRead)); while (c < count) { state Key k = randomString(20, firstChar, lastChar); - wait(cur.seekGTE(k, PagerEventReasons::PointRead)); + wait(cur.seekGTE(k)); ++c; } double elapsed = timer() - readStart; @@ -9208,12 +9209,12 @@ ACTOR Future randomScans(VersionedBTree* btree, state int c = 0; state double readStart = timer(); state VersionedBTree::BTreeCursor cur; - wait(btree->initBTreeCursor(&cur, readVer)); + wait(btree->initBTreeCursor(&cur, readVer, PagerEventReasons::RangeRead)); state int totalScanBytes = 0; while (c++ < count) { state Key k = randomString(20, firstChar, lastChar); - wait(cur.seekGTE(k, PagerEventReasons::PointRead)); + wait(cur.seekGTE(k)); state int w = width; state bool directionFwd = deterministicRandom()->coinflip(); From 13db52f1f24605961ea6c1816ac19dfc977aaeee Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Sun, 18 Jul 2021 04:38:42 -0700 Subject: [PATCH 11/15] Changed yield to delay(0) in places that use static counters to avoid trying to yield too often. Bug fixes with lazy delete queue entry heights. 
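
The pattern these hunks adjust is a hot loop that only gives up control every N iterations.
A minimal sketch of the idea (illustrative only; hotLoop() and doSomeWork() are placeholders,
not Redwood code, and the sketch assumes a Flow actor-compiled .actor.cpp with flow/flow.h):

    void doSomeWork(); // placeholder for the per-iteration work (hypothetical)

    ACTOR Future<Void> hotLoop() {
        loop {
            doSomeWork();
            // The function-local static counter bounds how often the loop even attempts to
            // reschedule itself; when it does trigger, delay(0) unconditionally requeues the
            // actor rather than asking yield() whether the time slice has been exceeded.
            static int sinceYield = 0;
            if (++sinceYield == 1000) {
                sinceYield = 0;
                wait(delay(0));
            }
        }
    }
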
--- fdbserver/VersionedBTree.actor.cpp | 35 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 710ff6d725..da6e6f1808 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -191,7 +191,7 @@ private: if (++sinceYield == 1000) { sinceYield = 0; - wait(yield()); + wait(delay(0)); } // While there are available slots and there are waiters, launch tasks @@ -841,7 +841,7 @@ public: static int sinceYield = 0; if (++sinceYield == 1000) { sinceYield = 0; - wait(yield()); + wait(delay(0)); } lock.release(); @@ -891,7 +891,7 @@ public: static int sinceYield = 0; if (++sinceYield == 1000) { sinceYield = 0; - wait(yield()); + wait(delay(0)); } debug_printf("FIFOQueue::Cursor(%s) waitThenReadNext unlocking mutex\n", self->toString().c_str()); @@ -1508,19 +1508,19 @@ struct RedwoodMetrics { if (h == 0) { for (const auto& ER : L0PossibleEventReasonPairs) { t->detail( - format( - "L%d%s", - h, - (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]).c_str()), + format("L%d%s", + h, + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]) + .c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } else { for (const auto& ER : possibleEventReasonPairs) { t->detail( - format( - "L%d%s", - h, - (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]).c_str()), + format("L%d%s", + h, + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]) + .c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } @@ -4558,12 +4558,12 @@ public: const LazyClearQueueEntry& entry = entries[i].first; const BTreePage& btPage = *(BTreePage*)p->begin(); ASSERT(btPage.height == entry.height); - auto& metrics = g_redwoodMetrics.level(btPage.height).metrics; + auto& metrics = g_redwoodMetrics.level(entry.height).metrics; debug_printf("LazyClear: processing %s\n", toString(entry).c_str()); // Level 1 (leaf) nodes should never be in the lazy delete queue - ASSERT(btPage.height > 1); + ASSERT(entry.height > 1); // Iterate over page entries, skipping key decoding using BTreePage::ValueTree which uses // RedwoodRecordRef::DeltaValueOnly as the delta type type to skip key decoding @@ -4575,7 +4575,7 @@ public: if (c.get().value.present()) { BTreePageIDRef btChildPageID = c.get().getChildPage(); // If this page is height 2, then the children are leaves so free them directly - if (btPage.height == 2) { + if (entry.height == 2) { debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str()); self->freeBTreePage(btChildPageID, v); freedPages += btChildPageID.size(); @@ -4584,7 +4584,8 @@ public: } else { // Otherwise, queue them for lazy delete. 
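                        // A LazyClearQueueEntry's height field describes the queued page itself,
                        // so children of this entry are queued at entry.height - 1.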
debug_printf("LazyClear: queuing child %s\n", toString(btChildPageID).c_str()); - self->m_lazyClearQueue.pushFront(LazyClearQueueEntry{ btPage.height, v, btChildPageID }); + self->m_lazyClearQueue.pushFront( + LazyClearQueueEntry{ (uint8_t)(entry.height - 1), v, btChildPageID }); metrics.lazyClearRequeue += 1; metrics.lazyClearRequeueExt += (btChildPageID.size() - 1); } @@ -6327,8 +6328,8 @@ private: debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n", context.c_str(), ::toString(rec.getChildPage()).c_str()); - self->m_lazyClearQueue.pushFront( - LazyClearQueueEntry{ (uint8_t)height, writeVersion, rec.getChildPage() }); + self->m_lazyClearQueue.pushBack(LazyClearQueueEntry{ + (uint8_t)(height - 1), writeVersion, rec.getChildPage() }); } } c.moveNext(); From b3cc2beaabc7630556d46fb39045831683388397 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 19 Jul 2021 15:10:01 -0700 Subject: [PATCH 12/15] Documentation fix. --- fdbserver/VersionedBTree.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index da6e6f1808..6e8fdd5188 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -109,7 +109,7 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { // The interface is similar to FlowMutex except that lock holders can drop the lock to release it. // // Usage: -// Lock lock = wait(prioritylock.take(priorityLevel)); +// Lock lock = wait(prioritylock.lock(priorityLevel)); // lock.release(); // Explicit release, or // // let lock and all copies of lock go out of scope to release class PriorityMultiLock { From 275089a9a942f1c619fb757850295ebb9cc249ce Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 19 Jul 2021 15:16:43 -0700 Subject: [PATCH 13/15] Bug fix, setting var to itself. --- fdbserver/VersionedBTree.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 6e8fdd5188..0c04c7c888 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -1665,7 +1665,7 @@ struct RedwoodMetrics { void updateMaxRecordCount(int maxRecords) { if (maxRecordCount != maxRecords) { - maxRecordCount = maxRecordCount; + maxRecordCount = maxRecords; for (int i = 0; i < btreeLevels + 1; ++i) { auto& level = levels[i]; level.buildItemCountSketch->updateUpperBound(maxRecordCount); From 80ac791dcf8b2fbe168e8eb707d4f761b59e7e62 Mon Sep 17 00:00:00 2001 From: Steve Atherton Date: Mon, 19 Jul 2021 15:58:38 -0700 Subject: [PATCH 14/15] Priority for commit reads should be == height. --- fdbserver/VersionedBTree.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 0c04c7c888..d0fdd4e283 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -5869,7 +5869,7 @@ private: } state Reference page = - wait(readPage(PagerEventReasons::Commit, height, snapshot, rootID, height - 1, false, true)); + wait(readPage(PagerEventReasons::Commit, height, snapshot, rootID, height, false, true)); state Version writeVersion = self->getLastCommittedVersion() + 1; // If the page exists in the cache, it must be copied before modification. 
From 45c0d4a57b359dec0344a8ef3b5b9c82db8ac2b2 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Tue, 20 Jul 2021 17:26:37 -0500 Subject: [PATCH 15/15] include FDB HA write path doc in toctree --- documentation/sphinx/source/ha-write-path.rst | 48 +++++++++---------- .../sphinx/source/technical-overview.rst | 3 ++ 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/documentation/sphinx/source/ha-write-path.rst b/documentation/sphinx/source/ha-write-path.rst index 6af0db134b..ccd18c01ea 100644 --- a/documentation/sphinx/source/ha-write-path.rst +++ b/documentation/sphinx/source/ha-write-path.rst @@ -1,6 +1,6 @@ -############################## +################################################### FDB HA Write Path: How a mutation travels in FDB HA -############################## +################################################### | Author: Meng Xu | Reviewer: Alex Miller, Jingyu Zhou, Lukas Joswiak, Trevor Clinkenbeard @@ -15,15 +15,15 @@ To simplify the description, we assume the HA cluster has the following configur * Replication factor = 3 for transaction logs (tLogs). It means each mutation is synchronously replicated to 3 primary tLogs and 1 satellite tLog. * Satellite replication factor = 1 satellite single replication. It means each mutation must be synchronously replicated to 1 satellite tLog before it can be committed. - - * The satellite replication factor can be configured with one or two satellites and single, double or triple replicas as described here. We typically use only 1 satellite single replica config. + + * The satellite replication factor can be configured with one or two satellites and single, double or triple replicas as described here. We typically use only 1 satellite single replica config. * Only 1 satellite is configured in the primary DC. We describe the background knowledge -- Sharding and Tag structure -- before we discuss how a mutation travels in a FDB HA cluster. Sharding: Which shard goes to which servers? -================= +============================================ A shard is a continuous key range. FDB divides the entire keyspace to thousands of shards. A mutation’s key decides which shard it belongs to. @@ -35,7 +35,7 @@ Shard-to-tLog mapping is decided by shard-to-SS mapping and tLog’s replication Tag structure -================= +============= Tag is an overloaded term in FDB. In the early history of FDB, a tag is a number used in SS-to-tag mapping. As FDB evolves, tags are used by different components for different purposes: @@ -50,15 +50,15 @@ To distinguish the types of tags used for different purposes at different locati * locality (int8_t): When it is non-negative value, it decides which DC id the tag is used in. For example, if it is 0, it means the tag is used in primary DC and the tag’s id represents a storage server and is used for primary tLogs to index by storage servers. When it is negative, it decides which types of tags the tag belongs to. For example, if it is -2, it is a log router tag, and its id is used to decide which log router the tagged mutation should be sent to. The definition of all localities are in FDBTypes.h and you can easily find it if you search tagLocalitySpecial in the file. * id (uint16_t): Once locality decides which FDB components will the tag be applied to, id decides which process in the component type will be used for the tagged mutation. - - * FDB components in this context means (i) which DC of tLogs, and (ii) which types of tLogs. 
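
As a rough illustration only (a simplified sketch of the idea, not the exact definition in
FDBTypes.h), a Tag can be pictured as::

  struct Tag {
      // locality >= 0 selects a DC (e.g. 0 = primary, where id then indexes a storage server);
      // negative values select a special tag class, e.g. -2 marks a log router tag, whose id
      // picks which log router the tagged mutation is sent to.
      int8_t locality;
      uint16_t id;
  };
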
+ + * FDB components in this context means (i) which DC of tLogs, and (ii) which types of tLogs. To simplify our discussion in the document, we use “tag.id” to represent a tag’s id, and tag as the Tag structure that has both locality and id. We represent a Tag as (locality, id). How does a mutation travel in FDB? -================= +================================== To simplify the description, we ignore the batching mechanisms happening in each component in the data path that are used to improve the system’s performance. @@ -67,12 +67,12 @@ Figure 1 illustrates how a mutation is routed inside FDB. The solid lines are as .. image:: /images/FDB_ha_write_path.png At Client ------------------ +--------- When an application creates a transaction and writes mutations, its FDB client sends the set of mutations to a proxy, say proxy 0. Now let’s focus on one of the normal mutations, say m1, whose key is in the normal keyspace. At Proxy ------------------ +-------- **Sequencing.** *It first asks the master for the commit version of this transaction batch*. The master acts like a sequencer for FDB transactions to determine the order of transactions to commit by assigning a new commit version and the last assigned commit version as the previous commit version. The transaction log system will use the [previous commit version, commit version] pair to determine its commit order, i.e., only make this transaction durable after the transaction with the previous commit version is made durable. @@ -102,21 +102,21 @@ Proxy groups mutations with the same tag as messages. Proxy then synchronously p At primary tLogs and satellite tLogs ------------------ +------------------------------------ Once it receives mutations pushed by proxies, it builds indexes for each tag’s mutations. Primary TLogs index both log router tags and the primary DC's SS tags. Satellite tLogs only index log router tags. If tLogs’ mutations cannot be peeked and popped by its consumers (i.e., SSes and log routers) quickly enough, tLogs’ memory usage will increase. When buffered mutations exceed 1.5GB (configurable by knob), their in-memory index will be spilled into a “Tag,version->disk location” B-tree. tLogs also maintain two properties: - + * It will not make a mutation at version V1 durable until mutations before V1 has been made durable; * It will not pop (i.e., delete) mutations at version V2, until mutations before V2 have been popped. At primary SS ------------------ +------------- **Primary tLog of a SS.** Since a SS’s tag is identically mapped to one tLog. The tLog has all mutations for the SS and is the primary tLog for the SS. When the SS peeks data from tLogs, it will prefer to peek data from its primary tLog. If the primary tLog crashes, it will contact the rest of tLogs, ask for mutations with the SS’s tag, and merge them together. This complex merge operation is abstracted in the TagPartitionedLogSystem interface. @@ -128,7 +128,7 @@ Now let’s look at how the mutation m1 is routed to the remote DC. At log router ------------------ +------------- Log routers are consumers of satellite tLogs or primary tLogs, controlled by a knob LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED. By default, the knob is configured for log routers to use satellite tLogs. This relationship is similar to primary SSes to primary tLogs. @@ -138,7 +138,7 @@ Log router buffers its mutations in memory and waits for the remote tLogs to pee At remote tLogs ------------------ +--------------- Remote tLogs are consumers of log routers. 
Each remote tLog keeps pulling mutations, which have the remote tLog’s tag, from log routers. Because log router tags are randomly chosen for mutations, a remote tLog’s mutations can spread across all log routers. So each remote tLog must contact all log routers for its data and merge these mutations in increasing order of versions on the remote tLog. @@ -147,34 +147,34 @@ Once a remote tLog collects and merge mutations from all log routers, it makes t Now the mutation m1 has arrived at the remote tLog, which is similar as when it arrives at the primary tLog. -At remote SSes. ------------------ +At remote SSes +-------------- Similar to how primary SSes pull mutations from primary tLogs, each remote SS keeps pulling mutations, which have its tag, from remote tLogs. Once a remote SS makes mutations up to a version V1 durable, the SS pops its tag to the version V1 from all remote tLogs. Implementation -================= +============== * proxy assigns tags to a mutation: -https://github.com/xumengpanda/foundationdb/blob/063700e4d60cd44c1f32413761e3fe7571fab9c0/fdbserver/MasterProxyServer.actor.cpp#L824 +https://github.com/apple/foundationdb/blob/7eabdf784a21bca102f84e7eaf14bafc54605dff/fdbserver/MasterProxyServer.actor.cpp#L1410 Mutation Serialization (WiP) -================= +============================ This section will go into detail on how mutations are serialized as preparation for ingestion into the TagPartitionedLogSystem. This has also been covered at: -https://drive.google.com/file/d/1OaP5bqH2kst1VxD6RWj8h2cdr9rhhBHy/view. +https://drive.google.com/file/d/1OaP5bqH2kst1VxD6RWj8h2cdr9rhhBHy/view The proxy handles splitting transactions into their individual mutations. These mutations are then serialized and synchronously sent to multiple transaction logs. -The process starts in *commitBatch*. Eventually, *assignMutationsToStorageServers* is called to assign mutations to storage servers and serialize them. This function loops over each mutation in each transaction, determining the set of tags for the mutation (which storage servers it will be sent to), and then calling *LogPushData::writeTypedMessage* on the mutation. +The process starts in *commitBatch*. Eventually, *assignMutationsToStorageServers* is called to assign mutations to storage servers and serialize them. This function loops over each mutation in each transaction, determining the set of tags for the mutation (which storage servers it will be sent to), and then calling *LogPushData.writeTypedMessage* on the mutation. The *LogPushData* class is used to hold serialized mutations on a per transaction log basis. It’s *messagesWriter* field holds one *BinaryWriter* per transaction log. -*LogPushData::writeTypedMessage* is the function that serializes each mutation and writes it to the correct binary stream to be sent to the corresponding transaction log. Each serialized mutation contains additional metadata about the message, with the format: +*LogPushData.writeTypedMessage* is the function that serializes each mutation and writes it to the correct binary stream to be sent to the corresponding transaction log. Each serialized mutation contains additional metadata about the message, with the format: .. 
image:: /images/serialized_mutation_metadata_format.png diff --git a/documentation/sphinx/source/technical-overview.rst b/documentation/sphinx/source/technical-overview.rst index f66dfb3311..af0021f3f9 100644 --- a/documentation/sphinx/source/technical-overview.rst +++ b/documentation/sphinx/source/technical-overview.rst @@ -30,6 +30,8 @@ These documents explain the engineering design of FoundationDB, with detailed in * :doc:`read-write-path` describes how FDB read and write path works. +* :doc:`ha-write-path` describes how FDB write path works in HA setting. + .. toctree:: :maxdepth: 1 :titlesonly: @@ -48,3 +50,4 @@ These documents explain the engineering design of FoundationDB, with detailed in testing kv-architecture read-write-path + ha-write-path