diff --git a/contrib/gen_compile_db.py b/contrib/gen_compile_db.py
old mode 100755
new mode 100644
diff --git a/documentation/sphinx/source/ha-write-path.rst b/documentation/sphinx/source/ha-write-path.rst
index 6af0db134b..ccd18c01ea 100644
--- a/documentation/sphinx/source/ha-write-path.rst
+++ b/documentation/sphinx/source/ha-write-path.rst
@@ -1,6 +1,6 @@
-##############################
+###################################################
FDB HA Write Path: How a mutation travels in FDB HA
-##############################
+###################################################

| Author: Meng Xu
| Reviewer: Alex Miller, Jingyu Zhou, Lukas Joswiak, Trevor Clinkenbeard

@@ -15,15 +15,15 @@ To simplify the description, we assume the HA cluster has the following configur

* Replication factor = 3 for transaction logs (tLogs). It means each mutation is synchronously replicated to 3 primary tLogs and 1 satellite tLog.

* Satellite replication factor = 1 satellite single replication. It means each mutation must be synchronously replicated to 1 satellite tLog before it can be committed.
-
-  * The satellite replication factor can be configured with one or two satellites and single, double or triple replicas as described here. We typically use only 1 satellite single replica config.
+
+  * The satellite replication factor can be configured with one or two satellites and single, double or triple replicas as described here. We typically use only 1 satellite single replica config.

* Only 1 satellite is configured in the primary DC.

We describe the background knowledge -- Sharding and Tag structure -- before we discuss how a mutation travels in an FDB HA cluster.

Sharding: Which shard goes to which servers?
-=================
+============================================

A shard is a continuous key range. FDB divides the entire keyspace into thousands of shards. A mutation’s key decides which shard it belongs to.
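The key-to-shard relationship described above can be sketched as a binary search over sorted shard begin keys. This is purely illustrative: the boundary keys and the helper below are hypothetical, and FDB's real shard map is maintained internally rather than exposed through such a function.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Illustrative sketch (not FDB's actual API): model each shard as a continuous
// key range identified by its begin key. A key belongs to the last shard whose
// begin key is <= the key, which a binary search finds directly.
int shardIndexForKey(const std::vector<std::string>& shardBegins, const std::string& key) {
    // upper_bound returns the first boundary strictly greater than the key;
    // the key's shard is the one immediately before that boundary.
    auto it = std::upper_bound(shardBegins.begin(), shardBegins.end(), key);
    return static_cast<int>(it - shardBegins.begin()) - 1;
}
```

For example, with hypothetical begin keys {"", "b", "m"}, the key "apple" falls in shard 0 and "zebra" in shard 2.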
@@ -35,7 +35,7 @@ Shard-to-tLog mapping is decided by shard-to-SS mapping and tLog’s replication

Tag structure
-=================
+=============

Tag is an overloaded term in FDB. In the early history of FDB, a tag is a number used in SS-to-tag mapping. As FDB evolves, tags are used by different components for different purposes:

@@ -50,15 +50,15 @@ To distinguish the types of tags used for different purposes at different locati

* locality (int8_t): When it is a non-negative value, it decides which DC id the tag is used in. For example, if it is 0, it means the tag is used in the primary DC, and the tag’s id represents a storage server and is used for primary tLogs to index by storage servers. When it is negative, it decides which type of tag the tag belongs to. For example, if it is -2, it is a log router tag, and its id is used to decide which log router the tagged mutation should be sent to. The definitions of all localities are in FDBTypes.h; you can easily find them if you search for tagLocalitySpecial in the file.

* id (uint16_t): Once locality decides which FDB components the tag will be applied to, id decides which process in the component type will be used for the tagged mutation.
-
-  * FDB components in this context means (i) which DC of tLogs, and (ii) which types of tLogs.
+
+  * FDB components in this context mean (i) which DC of tLogs, and (ii) which types of tLogs.

To simplify our discussion in the document, we use “tag.id” to represent a tag’s id, and tag as the Tag structure that has both locality and id. We represent a Tag as (locality, id).

How does a mutation travel in FDB?
-=================
+==================================

To simplify the description, we ignore the batching mechanisms happening in each component in the data path that are used to improve the system’s performance.

@@ -67,12 +67,12 @@ Figure 1 illustrates how a mutation is routed inside FDB. The solid lines are as

..
image:: /images/FDB_ha_write_path.png

At Client
------------------
+---------

When an application creates a transaction and writes mutations, its FDB client sends the set of mutations to a proxy, say proxy 0. Now let’s focus on one of the normal mutations, say m1, whose key is in the normal keyspace.

At Proxy
------------------
+--------

**Sequencing.** *It first asks the master for the commit version of this transaction batch*. The master acts like a sequencer for FDB transactions to determine the order of transactions to commit by assigning each transaction batch a new commit version, together with the last assigned commit version as the previous commit version. The transaction log system will use the [previous commit version, commit version] pair to determine its commit order, i.e., only make this transaction durable after the transaction with the previous commit version is made durable.

@@ -102,21 +102,21 @@ Proxy groups mutations with the same tag as messages. Proxy then synchronously p

At primary tLogs and satellite tLogs
------------------
+------------------------------------

Once a tLog receives mutations pushed by proxies, it builds indexes for each tag’s mutations. Primary tLogs index both log router tags and the primary DC's SS tags. Satellite tLogs only index log router tags.

If tLogs’ mutations cannot be peeked and popped by their consumers (i.e., SSes and log routers) quickly enough, tLogs’ memory usage will increase. When buffered mutations exceed 1.5GB (configurable by knob), their in-memory index will be spilled into a “Tag,version->disk location” B-tree.

tLogs also maintain two properties:
-
+
* It will not make a mutation at version V1 durable until mutations before V1 have been made durable;
* It will not pop (i.e., delete) mutations at version V2, until mutations before V2 have been popped.

At primary SS
------------------
+-------------

**Primary tLog of a SS.** A SS’s tag is identically mapped to one tLog.
The tLog has all mutations for the SS and is the primary tLog for the SS. When the SS peeks data from tLogs, it will prefer to peek data from its primary tLog. If the primary tLog crashes, the SS will contact the rest of the tLogs, ask for mutations with the SS’s tag, and merge them together. This complex merge operation is abstracted in the TagPartitionedLogSystem interface.

@@ -128,7 +128,7 @@ Now let’s look at how the mutation m1 is routed to the remote DC.

At log router
------------------
+-------------

Log routers are consumers of satellite tLogs or primary tLogs, controlled by a knob LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED. By default, the knob is configured for log routers to use satellite tLogs. This relationship is similar to that between primary SSes and primary tLogs.

@@ -138,7 +138,7 @@ Log router buffers its mutations in memory and waits for the remote tLogs to pee

At remote tLogs
------------------
+---------------

Remote tLogs are consumers of log routers. Each remote tLog keeps pulling mutations, which have the remote tLog’s tag, from log routers. Because log router tags are randomly chosen for mutations, a remote tLog’s mutations can spread across all log routers. So each remote tLog must contact all log routers for its data and merge these mutations in increasing order of versions on the remote tLog.

@@ -147,34 +147,34 @@ Once a remote tLog collects and merge mutations from all log routers, it makes t

Now the mutation m1 has arrived at the remote tLog, which is similar to when it arrived at the primary tLog.

-At remote SSes.
------------------
+At remote SSes
+--------------

Similar to how primary SSes pull mutations from primary tLogs, each remote SS keeps pulling mutations, which have its tag, from remote tLogs. Once a remote SS makes mutations up to a version V1 durable, the SS pops its tag to the version V1 from all remote tLogs.
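The Tag layout described earlier, an int8_t locality plus a uint16_t id, written as (locality, id), can be sketched as below. This is a simplified stand-in for the real definition in FDBTypes.h; only the -2 log router locality stated in the text is shown, and the full list of special localities lives near tagLocalitySpecial in that file.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Simplified stand-in for the Tag structure defined in FDBTypes.h.
// A non-negative locality names a DC (0 = primary); negative values mark
// special tag types. The document states -2 is the log router locality.
constexpr int8_t tagLocalityLogRouter = -2;

struct Tag {
    int8_t locality;  // which DC, or (if negative) which special tag type
    uint16_t id;      // which process of that component handles the mutation

    bool isLogRouterTag() const { return locality == tagLocalityLogRouter; }

    // Render as "(locality, id)", the notation used in this document.
    std::string toString() const {
        return "(" + std::to_string(locality) + ", " + std::to_string(id) + ")";
    }
};
```

For instance, (0, 5) is the tag of storage server 5 in the primary DC, while (-2, 3) tags mutations routed through log router 3.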
Implementation
-=================
+==============

* proxy assigns tags to a mutation:
-https://github.com/xumengpanda/foundationdb/blob/063700e4d60cd44c1f32413761e3fe7571fab9c0/fdbserver/MasterProxyServer.actor.cpp#L824
+https://github.com/apple/foundationdb/blob/7eabdf784a21bca102f84e7eaf14bafc54605dff/fdbserver/MasterProxyServer.actor.cpp#L1410

Mutation Serialization (WiP)
-=================
+============================

This section will go into detail on how mutations are serialized as preparation for ingestion into the TagPartitionedLogSystem. This has also been covered at:
-https://drive.google.com/file/d/1OaP5bqH2kst1VxD6RWj8h2cdr9rhhBHy/view.
+https://drive.google.com/file/d/1OaP5bqH2kst1VxD6RWj8h2cdr9rhhBHy/view

The proxy handles splitting transactions into their individual mutations. These mutations are then serialized and synchronously sent to multiple transaction logs.

-The process starts in *commitBatch*. Eventually, *assignMutationsToStorageServers* is called to assign mutations to storage servers and serialize them. This function loops over each mutation in each transaction, determining the set of tags for the mutation (which storage servers it will be sent to), and then calling *LogPushData::writeTypedMessage* on the mutation.
+The process starts in *commitBatch*. Eventually, *assignMutationsToStorageServers* is called to assign mutations to storage servers and serialize them. This function loops over each mutation in each transaction, determining the set of tags for the mutation (which storage servers it will be sent to), and then calling *LogPushData.writeTypedMessage* on the mutation.

The *LogPushData* class is used to hold serialized mutations on a per transaction log basis. Its *messagesWriter* field holds one *BinaryWriter* per transaction log.

-*LogPushData::writeTypedMessage* is the function that serializes each mutation and writes it to the correct binary stream to be sent to the corresponding transaction log.
Each serialized mutation contains additional metadata about the message, with the format: +*LogPushData.writeTypedMessage* is the function that serializes each mutation and writes it to the correct binary stream to be sent to the corresponding transaction log. Each serialized mutation contains additional metadata about the message, with the format: .. image:: /images/serialized_mutation_metadata_format.png diff --git a/documentation/sphinx/source/technical-overview.rst b/documentation/sphinx/source/technical-overview.rst index f66dfb3311..af0021f3f9 100644 --- a/documentation/sphinx/source/technical-overview.rst +++ b/documentation/sphinx/source/technical-overview.rst @@ -30,6 +30,8 @@ These documents explain the engineering design of FoundationDB, with detailed in * :doc:`read-write-path` describes how FDB read and write path works. +* :doc:`ha-write-path` describes how FDB write path works in HA setting. + .. toctree:: :maxdepth: 1 :titlesonly: @@ -48,3 +50,4 @@ These documents explain the engineering design of FoundationDB, with detailed in testing kv-architecture read-write-path + ha-write-path diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index cf68e98de5..22ee4b94ca 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -42,10 +42,10 @@ typedef uint32_t QueueID; // Pager Events enum class PagerEvents { CacheLookup = 0, CacheHit, CacheMiss, PageWrite, MAXEVENTS }; -static const std::string PagerEventsCodes[] = { "Lookup", "Hit", "Miss", "Write" }; +static const std::string PagerEventsStrings[] = { "Lookup", "Hit", "Miss", "Write", "Unknown" }; // Reasons for page level events. 
enum class PagerEventReasons { PointRead = 0, RangeRead, RangePrefetch, Commit, LazyClear, MetaData, MAXEVENTREASONS }; -static const std::string PagerEventReasonsCodes[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta" }; +static const std::string PagerEventReasonsStrings[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta", "Unknown" }; static const int nonBtreeLevel = 0; static const std::pair possibleEventReasonPairs[] = { @@ -167,6 +167,7 @@ public: virtual Future> getPhysicalPage(PagerEventReasons reason, unsigned int level, LogicalPageID pageID, + int priority, bool cacheable, bool nohit) = 0; virtual bool tryEvictPage(LogicalPageID id) = 0; @@ -238,8 +239,9 @@ public: virtual Future> readPage(PagerEventReasons reason, unsigned int level, LogicalPageID pageID, - bool cacheable = true, - bool noHit = false) = 0; + int priority, + bool cacheable, + bool noHit) = 0; virtual Future> readExtent(LogicalPageID pageID) = 0; virtual void releaseExtentReadLock() = 0; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 713f0784f0..d42a65d3c9 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -18,8 +18,12 @@ * limitations under the License. */ +#include "fdbserver/Knobs.h" +#include "flow/IRandom.h" +#include "flow/Knobs.h" #include "flow/flow.h" #include "flow/Histogram.h" +#include #include #include "fdbrpc/ContinuousSample.h" #include "fdbserver/IPager.h" @@ -97,6 +101,156 @@ void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) { } } +#define PRIORITYMULTILOCK_DEBUG 0 + +// A multi user lock with a concurrent holder limit where waiters are granted the lock according to +// an integer priority from 0 to maxPriority, inclusive, where higher integers are given priority. +// +// The interface is similar to FlowMutex except that lock holders can drop the lock to release it. 
+// +// Usage: +// Lock lock = wait(prioritylock.lock(priorityLevel)); +// lock.release(); // Explicit release, or +// // let lock and all copies of lock go out of scope to release +class PriorityMultiLock { + +public: + // Waiting on the lock returns a Lock, which is really just a Promise + // Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release + // the Lock before it goes out of scope. + struct Lock { + void release() { promise.send(Void()); } + + // This is exposed in case the caller wants to use/copy it directly + Promise promise; + }; + +private: + typedef Promise Slot; + typedef Deque Queue; + +#if PRIORITYMULTILOCK_DEBUG +#define prioritylock_printf(...) printf(__VA_ARGS__) +#else +#define prioritylock_printf(...) +#endif + +public: + PriorityMultiLock(int concurrency, int maxPriority) : concurrency(concurrency), available(concurrency), waiting(0) { + waiters.resize(maxPriority + 1); + fRunner = runner(this); + } + + ~PriorityMultiLock() { prioritylock_printf("destruct"); } + + Future lock(int priority = 0) { + prioritylock_printf("lock begin %s\n", toString().c_str()); + + // This shortcut may enable a waiter to jump the line when the releaser loop yields + if (available > 0) { + --available; + Lock p; + addRunner(p); + prioritylock_printf("lock exit immediate %s\n", toString().c_str()); + return p; + } + + Slot s; + waiters[priority].push_back(s); + ++waiting; + prioritylock_printf("lock exit queued %s\n", toString().c_str()); + return s.getFuture(); + } + +private: + void addRunner(Lock& lock) { + runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) { + prioritylock_printf("Lock released\n"); + ++available; + if (waiting > 0 || runners.size() > 100) { + release.trigger(); + } + return Void(); + })); + } + + ACTOR static Future runner(PriorityMultiLock* self) { + state int sinceYield = 0; + state Future error = self->brokenOnDestruct.getFuture(); + state int maxPriority = self->waiters.size() - 1; + 
+ loop { + // Cleanup finished runner futures at the front of the runner queue. + while (!self->runners.empty() && self->runners.front().isReady()) { + self->runners.pop_front(); + } + + // Wait for a runner to release its lock + wait(self->release.onTrigger()); + prioritylock_printf("runner wakeup %s\n", self->toString().c_str()); + + if (++sinceYield == 1000) { + sinceYield = 0; + wait(delay(0)); + } + + // While there are available slots and there are waiters, launch tasks + int priority = maxPriority; + + while (self->available > 0 && self->waiting > 0) { + auto& q = self->waiters[priority]; + prioritylock_printf( + "Checking priority=%d prioritySize=%d %s\n", priority, q.size(), self->toString().c_str()); + + while (!q.empty()) { + Slot s = q.front(); + q.pop_front(); + --self->waiting; + Lock lock; + prioritylock_printf(" Running waiter priority=%d prioritySize=%d\n", priority, q.size()); + s.send(lock); + + // Self may have been destructed during the lock callback + if (error.isReady()) { + throw error.getError(); + } + + // If the lock was not already released, add it to the runners future queue + if (lock.promise.canBeSet()) { + self->addRunner(lock); + + // A slot has been consumed, so stop reading from this queue if there aren't any more + if (--self->available == 0) { + break; + } + } + } + + // Wrap around to highest priority + if (priority == 0) { + priority = maxPriority; + } else { + --priority; + } + } + } + } + + std::string toString() const { + return format( + "{ slots=%d/%d waiting=%d runners=%d }", (concurrency - available), concurrency, waiting, runners.size()); + } + + int concurrency; + int available; + int waiting; + std::vector waiters; + Deque> runners; + Future fRunner; + AsyncTrigger release; + Promise brokenOnDestruct; +}; + // Some convenience functions for debugging to stringify various structures // Classes can add compatibility by either specializing toString or implementing // std::string toString() const; @@ -209,6 +363,10 @@ 
std::string toString(const std::pair& o) { return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str()); } +static constexpr int ioMinPriority = 0; +static constexpr int ioLeafPriority = 1; +static constexpr int ioMaxPriority = 2; + // A FIFO queue of T stored as a linked list of pages. // Main operations are pop(), pushBack(), pushFront(), and flush(). // @@ -469,9 +627,8 @@ public: nextPageID = id; debug_printf( "FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str()); - nextPageReader = - waitOrError(queue->pager->readPage(PagerEventReasons::MetaData, nonBtreeLevel, nextPageID, true), - queue->pagerError); + nextPageReader = queue->pager->readPage( + PagerEventReasons::MetaData, nonBtreeLevel, nextPageID, ioMaxPriority, true, false); } Future loadExtent() { @@ -684,7 +841,7 @@ public: static int sinceYield = 0; if (++sinceYield == 1000) { sinceYield = 0; - wait(yield()); + wait(delay(0)); } lock.release(); @@ -734,7 +891,7 @@ public: static int sinceYield = 0; if (++sinceYield == 1000) { sinceYield = 0; - wait(yield()); + wait(delay(0)); } debug_printf("FIFOQueue::Cursor(%s) waitThenReadNext unlocking mutex\n", self->toString().c_str()); @@ -1318,13 +1475,13 @@ struct RedwoodMetrics { for (const auto& ER : L0PossibleEventReasonPairs) { if (prevEvent != ER.first) { result += "\n"; - result += PagerEventsCodes[(size_t)ER.first]; + result += PagerEventsStrings[(size_t)ER.first]; result += "\n\t"; prevEvent = ER.first; } std::string num = std::to_string(eventReasons[(size_t)ER.first][(size_t)ER.second]); - result += PagerEventReasonsCodes[(size_t)ER.second]; - result.append(16 - PagerEventReasonsCodes[(size_t)ER.second].length(), ' '); + result += PagerEventReasonsStrings[(size_t)ER.second]; + result.append(16 - PagerEventReasonsStrings[(size_t)ER.second].length(), ' '); result.append(8 - num.length(), ' '); result += num; result.append(13, ' '); @@ -1333,13 +1490,13 @@ struct RedwoodMetrics { for (const 
auto& ER : possibleEventReasonPairs) { if (prevEvent != ER.first) { result += "\n"; - result += PagerEventsCodes[(size_t)ER.first]; + result += PagerEventsStrings[(size_t)ER.first]; result += "\n\t"; prevEvent = ER.first; } std::string num = std::to_string(eventReasons[(size_t)ER.first][(size_t)ER.second]); - result += PagerEventReasonsCodes[(size_t)ER.second]; - result.append(16 - PagerEventReasonsCodes[(size_t)ER.second].length(), ' '); + result += PagerEventReasonsStrings[(size_t)ER.second]; + result.append(16 - PagerEventReasonsStrings[(size_t)ER.second].length(), ' '); result.append(8 - num.length(), ' '); result += num; result.append(13, ' '); @@ -1351,19 +1508,19 @@ struct RedwoodMetrics { if (h == 0) { for (const auto& ER : L0PossibleEventReasonPairs) { t->detail( - format( - "L%d%s", - h, - (PagerEventsCodes[(size_t)ER.first] + PagerEventReasonsCodes[(size_t)ER.second]).c_str()), + format("L%d%s", + h, + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]) + .c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } else { for (const auto& ER : possibleEventReasonPairs) { t->detail( - format( - "L%d%s", - h, - (PagerEventsCodes[(size_t)ER.first] + PagerEventReasonsCodes[(size_t)ER.second]).c_str()), + format("L%d%s", + h, + (PagerEventsStrings[(size_t)ER.first] + PagerEventReasonsStrings[(size_t)ER.second]) + .c_str()), eventReasons[(size_t)ER.first][(size_t)ER.second]); } } @@ -1459,9 +1616,9 @@ struct RedwoodMetrics { kvSizeWritten = Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("Written"), Histogram::Unit::bytes); kvSizeReadByGet = - Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("ReadByGet "), Histogram::Unit::bytes); - kvSizeReadByRangeGet = Histogram::getHistogram( - LiteralStringRef("kvSize"), LiteralStringRef("ReadByRangeGet"), Histogram::Unit::bytes); + Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("ReadByGet"), Histogram::Unit::bytes); + 
kvSizeReadByGetRange = Histogram::getHistogram( + LiteralStringRef("kvSize"), LiteralStringRef("ReadByGetRange"), Histogram::Unit::bytes); clear(); } @@ -1476,7 +1633,7 @@ struct RedwoodMetrics { kvSizeWritten->clear(); kvSizeReadByGet->clear(); - kvSizeReadByRangeGet->clear(); + kvSizeReadByGetRange->clear(); startTime = g_network ? now() : 0; } @@ -1485,7 +1642,7 @@ struct RedwoodMetrics { metrics metric; Reference kvSizeWritten; Reference kvSizeReadByGet; - Reference kvSizeReadByRangeGet; + Reference kvSizeReadByGetRange; double startTime; // Return number of pages read or written, from cache or disk @@ -1503,12 +1660,14 @@ struct RedwoodMetrics { return levels[level]; } - void updateMaxRecordCount(int maxRecordCount) { - this->maxRecordCount = maxRecordCount; - for (int i = 0; i < btreeLevels + 1; ++i) { - auto& level = levels[i]; - level.buildItemCountSketch->updateUpperBound(maxRecordCount); - level.modifyItemCountSketch->updateUpperBound(maxRecordCount); + void updateMaxRecordCount(int maxRecords) { + if (maxRecordCount != maxRecords) { + maxRecordCount = maxRecords; + for (int i = 0; i < btreeLevels + 1; ++i) { + auto& level = levels[i]; + level.buildItemCountSketch->updateUpperBound(maxRecordCount); + level.modifyItemCountSketch->updateUpperBound(maxRecordCount); + } } } @@ -1924,7 +2083,8 @@ public: Promise errorPromise = {}) : desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize), filename(filename), pHeader(nullptr), pageCacheBytes(pageCacheSizeBytes), memoryOnly(memoryOnly), remapCleanupWindow(remapCleanupWindow), - concurrentExtentReads(new FlowLock(concurrentExtentReads)), errorPromise(errorPromise) { + concurrentExtentReads(new FlowLock(concurrentExtentReads)), errorPromise(errorPromise), + ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority) { if (!g_redwoodMetricsActor.isValid()) { g_redwoodMetricsActor = redwoodMetricsLogger(); @@ -1935,9 +2095,8 @@ public: } void setPageSize(int size) { - if (g_redwoodMetrics.maxRecordCount != 315 
* size / 4096) { - g_redwoodMetrics.updateMaxRecordCount(315 * size / 4096); - } + g_redwoodMetrics.updateMaxRecordCount(315 * size / 4096); + logicalPageSize = size; // Physical page size is the total size of the smallest number of physical blocks needed to store // logicalPageSize bytes @@ -2334,45 +2493,55 @@ public: Future newExtentPageID(QueueID queueID) override { return newExtentPageID_impl(this, queueID); } + ACTOR static Future writePhysicalPage_impl(DWALPager* self, + PagerEventReasons reason, + unsigned int level, + PhysicalPageID pageID, + Reference page, + bool header = false) { + + debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", + self->filename.c_str(), + (header ? "writePhysicalHeader" : "writePhysical"), + toString(pageID).c_str(), + page->begin()); + + VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); + page->updateChecksum(pageID); + debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n", + self->filename.c_str(), + toString(pageID).c_str(), + page->calculateChecksum(pageID), + page->getChecksum()); + + state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority)); + ++g_redwoodMetrics.metric.pagerDiskWrite; + g_redwoodMetrics.level(level).metrics.eventReasons.addEventReason(PagerEvents::PageWrite, reason); + + if (self->memoryOnly) { + return Void(); + } + + // Note: Not using forwardError here so a write error won't be discovered until commit time. + state int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; + wait(self->pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize)); + + debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", + self->filename.c_str(), + (header ? 
"writePhysicalHeaderComplete" : "writePhysicalComplete"), + toString(pageID).c_str(), + page->begin(), + (pageID * blockSize)); + + return Void(); + } + Future writePhysicalPage(PagerEventReasons reason, unsigned int level, PhysicalPageID pageID, Reference page, bool header = false) { - debug_printf("DWALPager(%s) op=%s %s ptr=%p\n", - filename.c_str(), - (header ? "writePhysicalHeader" : "writePhysical"), - toString(pageID).c_str(), - page->begin()); - - ++g_redwoodMetrics.metric.pagerDiskWrite; - auto& eventReasons = g_redwoodMetrics.level(level).metrics.eventReasons; - eventReasons.addEventReason(PagerEvents::PageWrite, reason); - - VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size()); - page->updateChecksum(pageID); - debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n", - filename.c_str(), - toString(pageID).c_str(), - page->calculateChecksum(pageID), - page->getChecksum()); - - if (memoryOnly) { - return Void(); - } - - // Note: Not using forwardError here so a write error won't be discovered until commit time. - int blockSize = header ? smallestPhysicalBlock : physicalPageSize; - Future f = - holdWhile(page, map(pageFile->write(page->begin(), blockSize, (int64_t)pageID * blockSize), [=](Void) { - debug_printf("DWALPager(%s) op=%s %s ptr=%p file offset=%d\n", - filename.c_str(), - (header ? 
"writePhysicalHeaderComplete" : "writePhysicalComplete"), - toString(pageID).c_str(), - page->begin(), - (pageID * blockSize)); - return Void(); - })); + Future f = writePhysicalPage_impl(this, reason, level, pageID, page, header); operations.add(f); return f; } @@ -2387,7 +2556,7 @@ public: Reference data) override { // Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now // or as a cache miss because there is no benefit to the page already being in cache - // this metaData reason will not be accounted since its not a cache hit or cache miss + // Similarly, this does not count as a point lookup for reason. PageCacheEntry& cacheEntry = pageCache.get(pageID, true); debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), @@ -2555,13 +2724,13 @@ public: // and before the user-chosen sized pages. ACTOR static Future> readPhysicalPage(DWALPager* self, PhysicalPageID pageID, - bool header = false) { + int priority, + bool header) { ASSERT(!self->memoryOnly); - ++g_redwoodMetrics.metric.pagerDiskRead; - if (g_network->getCurrentTask() > TaskPriority::DiskRead) { - wait(delay(0, TaskPriority::DiskRead)); - } + // if (g_network->getCurrentTask() > TaskPriority::DiskRead) { + // wait(delay(0, TaskPriority::DiskRead)); + // } state Reference page = header ? Reference(new ArenaPage(smallestPhysicalBlock, smallestPhysicalBlock)) @@ -2571,8 +2740,11 @@ public: toString(pageID).c_str(), page->begin()); - int blockSize = header ? smallestPhysicalBlock : self->physicalPageSize; + state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority))); + ++g_redwoodMetrics.metric.pagerDiskRead; + // TODO: Could a dispatched read try to write to page after it has been destroyed if this actor is cancelled? + int blockSize = header ? 
smallestPhysicalBlock : self->physicalPageSize;
 	int readBytes = wait(self->pageFile->read(page->mutate(), blockSize, (int64_t)pageID * blockSize));
 	debug_printf("DWALPager(%s) op=readPhysicalComplete %s ptr=%p bytes=%d\n",
 	             self->filename.c_str(),
@@ -2602,7 +2774,7 @@ public:
 	}

 	static Future<Reference<ArenaPage>> readHeaderPage(DWALPager* self, PhysicalPageID pageID) {
-		return readPhysicalPage(self, pageID, true);
+		return readPhysicalPage(self, pageID, ioMaxPriority, true);
 	}

 	bool tryEvictPage(LogicalPageID logicalID, Version v) {
@@ -2615,8 +2787,9 @@ public:
 	Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
 	                                      unsigned int level,
 	                                      LogicalPageID pageID,
+	                                      int priority,
 	                                      bool cacheable,
-	                                      bool noHit = false) override {
+	                                      bool noHit) override {
 		// Use cached page if present, without triggering a cache hit.
 		// Otherwise, read the page and return it but don't add it to the cache
 		auto& eventReasons = g_redwoodMetrics.level(level).metrics.eventReasons;
@@ -2631,7 +2804,7 @@ public:
 			}
 			++g_redwoodMetrics.metric.pagerProbeMiss;
 			debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageID).c_str());
-			return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise);
+			return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise);
 		}

 		PageCacheEntry& cacheEntry = pageCache.get(pageID, noHit);
@@ -2644,7 +2817,8 @@ public:
 		            noHit);
 		if (!cacheEntry.initialized()) {
 			debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str());
-			cacheEntry.readFuture = forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise);
+			cacheEntry.readFuture =
+			    forwardError(readPhysicalPage(this, (PhysicalPageID)pageID, priority, false), errorPromise);
 			cacheEntry.writeFuture = Void();

 			++g_redwoodMetrics.metric.pagerCacheMiss;
@@ -2688,11 +2862,12 @@ public:
 	Future<Reference<ArenaPage>> readPageAtVersion(PagerEventReasons reason,
 	                                               unsigned int level,
 	                                               LogicalPageID logicalID,
+	                                               int priority,
 	                                               Version v,
 	                                               bool cacheable,
 	                                               bool noHit) {
 		PhysicalPageID physicalID = getPhysicalPageID(logicalID, v);
-		return readPage(reason, level, physicalID, cacheable, noHit);
+		return readPage(reason, level, physicalID, priority, cacheable, noHit);
 	}

 	void releaseExtentReadLock() override { concurrentExtentReads->release(); }
@@ -2706,7 +2881,6 @@ public:
 		wait(self->concurrentExtentReads->take());

 		ASSERT(!self->memoryOnly);
-		++g_redwoodMetrics.metric.pagerDiskRead;

 		if (g_network->getCurrentTask() > TaskPriority::DiskRead) {
 			wait(delay(0, TaskPriority::DiskRead));
@@ -2742,6 +2916,7 @@ public:
 		for (i = 0; i < parallelReads; i++) {
 			currentOffset = i * physicalReadSize;
 			debug_printf("DWALPager(%s) current offset %d\n", self->filename.c_str(), currentOffset);
+			++g_redwoodMetrics.metric.pagerDiskRead;
 			reads.push_back(
 			    self->pageFile->read(extent->mutate() + currentOffset, physicalReadSize, startOffset + currentOffset));
 		}
@@ -2754,6 +2929,7 @@ public:
 			            i,
 			            currentOffset,
 			            lastReadSize);
+			++g_redwoodMetrics.metric.pagerDiskRead;
 			reads.push_back(
 			    self->pageFile->read(extent->mutate() + currentOffset, lastReadSize, startOffset + currentOffset));
 		}
@@ -2776,12 +2952,10 @@ public:
 		PageCacheEntry* pCacheEntry = extentCache.getIfExists(pageID);
 		auto& eventReasons = g_redwoodMetrics.level(0).metrics.eventReasons;
 		if (pCacheEntry != nullptr) {
-			++g_redwoodMetrics.metric.pagerProbeHit;
 			eventReasons.addEventReason(PagerEvents::CacheLookup, PagerEventReasons::MetaData);
 			debug_printf("DWALPager(%s) Cache Entry exists for %s\n", filename.c_str(), toString(pageID).c_str());
 			return pCacheEntry->readFuture;
 		}
-		++g_redwoodMetrics.metric.pagerProbeMiss;
 		eventReasons.addEventReason(PagerEvents::CacheLookup, PagerEventReasons::MetaData);

 		LogicalPageID headPageID = pHeader->remapQueue.headPageID;
@@ -2933,8 +3107,8 @@ public:
 			debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());

 			// Read the data from the page that the original was mapped to
-			Reference<ArenaPage> data =
-			    wait(self->readPage(PagerEventReasons::MetaData, nonBtreeLevel, p.newPageID, false, true));
+			Reference<ArenaPage> data = wait(
+			    self->readPage(PagerEventReasons::MetaData, nonBtreeLevel, p.newPageID, ioLeafPriority, false, true));

 			// Write the data to the original page so it can be read using its original pageID
 			self->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, p.originalPageID, data);
@@ -3338,6 +3512,9 @@ private:
 	int physicalExtentSize;
 	int pagesPerExtent;

+private:
+	PriorityMultiLock ioLock;
+
 	int64_t pageCacheBytes;

 	// The header will be written to / read from disk as a smallestPhysicalBlock sized chunk.
@@ -3410,12 +3587,13 @@ public:
 	Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
 	                                                   unsigned int level,
 	                                                   LogicalPageID pageID,
+	                                                   int priority,
 	                                                   bool cacheable,
 	                                                   bool noHit) override {
 		if (expired.isError()) {
 			throw expired.getError();
 		}
-		return map(pager->readPageAtVersion(reason, level, pageID, version, cacheable, noHit),
+		return map(pager->readPageAtVersion(reason, level, pageID, priority, version, cacheable, noHit),
 		           [=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(std::move(p)); });
 	}
@@ -4358,13 +4536,16 @@ public:
 			if (!q.present()) {
 				break;
 			}
+
 			// Start reading the page, without caching
-			entries.push_back(std::make_pair(
-			    q.get(),
-			    self->readPage(
-			        PagerEventReasons::LazyClear, q.get().height, snapshot, q.get().pageID, true, false)));
-
+			entries.push_back(std::make_pair(q.get(),
+			                                 self->readPage(PagerEventReasons::LazyClear,
+			                                                q.get().height,
+			                                                snapshot,
+			                                                q.get().pageID,
+			                                                ioLeafPriority,
+			                                                true,
+			                                                false)));
 			--toPop;
 		}
@@ -4373,12 +4554,13 @@ public:
 			Reference<const ArenaPage> p = wait(entries[i].second);
 			const LazyClearQueueEntry& entry = entries[i].first;
 			const BTreePage& btPage = *(BTreePage*)p->begin();
-			auto& metrics = g_redwoodMetrics.level(btPage.height).metrics;
+			ASSERT(btPage.height == entry.height);
+			auto& metrics = g_redwoodMetrics.level(entry.height).metrics;

 			debug_printf("LazyClear: processing %s\n", toString(entry).c_str());

 			// Level 1 (leaf) nodes should never be in the lazy delete queue
-			ASSERT(btPage.height > 1);
+			ASSERT(entry.height > 1);

 			// Iterate over page entries, skipping key decoding using BTreePage::ValueTree which uses
 			// RedwoodRecordRef::DeltaValueOnly as the delta type type to skip key decoding
@@ -4390,7 +4572,7 @@ public:
 				if (c.get().value.present()) {
 					BTreePageIDRef btChildPageID = c.get().getChildPage();
 					// If this page is height 2, then the children are leaves so free them directly
-					if (btPage.height == 2) {
+					if (entry.height == 2) {
 						debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str());
 						self->freeBTreePage(btChildPageID, v);
 						freedPages += btChildPageID.size();
@@ -4399,7 +4581,8 @@ public:
 					} else {
 						// Otherwise, queue them for lazy delete.
 						debug_printf("LazyClear: queuing child %s\n", toString(btChildPageID).c_str());
-						self->m_lazyClearQueue.pushFront(LazyClearQueueEntry{ btPage.height, v, btChildPageID });
+						self->m_lazyClearQueue.pushFront(
+						    LazyClearQueueEntry{ (uint8_t)(entry.height - 1), v, btChildPageID });
 						metrics.lazyClearRequeue += 1;
 						metrics.lazyClearRequeueExt += (btChildPageID.size() - 1);
 					}
@@ -5037,6 +5220,7 @@ private:
 					break;
 				}
 			}
+
 			// Use the next entry as the upper bound, or upperBound if there are no more entries beyond this page
 			int endIndex = p.endIndex();
 			bool lastPage = endIndex == entries.size();
@@ -5062,7 +5246,6 @@ private:
 		btPage->height = height;
 		btPage->kvBytes = p.kvBytes;
-
 		g_redwoodMetrics.kvSizeWritten->sample(p.kvBytes);

 		debug_printf("Building tree for %s\nlower: %s\nupper: %s\n",
@@ -5174,6 +5357,7 @@ private:
 		// While there are multiple child pages for this version we must write new tree levels.
 		while (records.size() > 1) {
 			self->m_pHeader->height = ++height;
+			ASSERT(height < std::numeric_limits<uint8_t>::max());
 			Standalone<VectorRef<RedwoodRecordRef>> newRecords =
 			    wait(writePages(self, &dbBegin, &dbEnd, records, height, version, BTreePageIDRef()));
 			debug_printf("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n",
@@ -5201,8 +5385,9 @@ private:
 	                                                    unsigned int level,
 	                                                    Reference<IPagerSnapshot> snapshot,
 	                                                    BTreePageIDRef id,
-	                                                    bool forLazyClear = false,
-	                                                    bool cacheable = true) {
+	                                                    int priority,
+	                                                    bool forLazyClear,
+	                                                    bool cacheable) {
 		debug_printf("readPage() op=read%s %s @%" PRId64 "\n",
 		             forLazyClear ? "ForDeferredClear" : "",
@@ -5212,13 +5397,14 @@ private:
 		state Reference<const ArenaPage> page;

 		if (id.size() == 1) {
-			Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(reason, level, id.front(), cacheable, false));
+			Reference<const ArenaPage> p =
+			    wait(snapshot->getPhysicalPage(reason, level, id.front(), priority, cacheable, false));
 			page = std::move(p);
 		} else {
 			ASSERT(!id.empty());
 			std::vector<Future<Reference<const ArenaPage>>> reads;
 			for (auto& pageID : id) {
-				reads.push_back(snapshot->getPhysicalPage(reason, level, pageID, cacheable, false));
+				reads.push_back(snapshot->getPhysicalPage(reason, level, pageID, priority, cacheable, false));
 			}
 			std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
 			// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
@@ -5264,13 +5450,13 @@ private:
 		                  ((BTreePage*)page->begin())->tree());
 	}

-	static void preLoadPage(IPagerSnapshot* snapshot, unsigned int l, BTreePageIDRef id) {
+	static void preLoadPage(IPagerSnapshot* snapshot, unsigned int l, BTreePageIDRef id, int priority) {
 		g_redwoodMetrics.metric.btreeLeafPreload += 1;
 		g_redwoodMetrics.metric.btreeLeafPreloadExt += (id.size() - 1);
 		for (auto pageID : id) {
-			snapshot->getPhysicalPage(
-			    PagerEventReasons::RangePrefetch, nonBtreeLevel, pageID, true, true); // prefetch btree leaf node
+			// Prefetches are always at the Leaf level currently so it isn't part of the per-level metrics set
+			snapshot->getPhysicalPage(PagerEventReasons::RangePrefetch, nonBtreeLevel, pageID, priority, true, true);
 		}
 	}
@@ -5283,7 +5469,7 @@ private:
 	// Write new version of pageID at version v using page as its data.
 	// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
-	// UpdateBtreePage is only called from commitSubTree funciton
+	// updateBTreePage is only called from commitSubTree function so write reason is always btree commit
 	ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self,
 	                                                    BTreePageIDRef oldID,
 	                                                    Arena* arena,
@@ -5304,10 +5490,10 @@ private:
 			               : btPage->toString(true, oldID, writeVersion, cache->lowerBound, cache->upperBound).c_str());
 		}
+		state int height = ((BTreePage*)page->begin())->height;
 		if (oldID.size() == 1) {
-			BTreePage* btPage = (BTreePage*)page->begin();
-			LogicalPageID id = wait(self->m_pager->atomicUpdatePage(
-			    PagerEventReasons::Commit, btPage->height, oldID.front(), page, writeVersion));
+			LogicalPageID id = wait(
+			    self->m_pager->atomicUpdatePage(PagerEventReasons::Commit, height, oldID.front(), page, writeVersion));
 			newID.front() = id;
 		} else {
 			state std::vector<Reference<ArenaPage>> pages;
@@ -5326,9 +5512,8 @@ private:
 			// Write pages, trying to reuse original page IDs
 			state int i = 0;
 			for (; i < pages.size(); ++i) {
-				BTreePage* btPage = (BTreePage*)page->begin();
 				LogicalPageID id = wait(self->m_pager->atomicUpdatePage(
-				    PagerEventReasons::Commit, btPage->height, oldID[i], pages[i], writeVersion));
+				    PagerEventReasons::Commit, height, oldID[i], pages[i], writeVersion));
 				newID[i] = id;
 			}
 		}
@@ -5654,8 +5839,7 @@ private:
 	                                        Reference<IPagerSnapshot> snapshot,
 	                                        MutationBuffer* mutationBuffer,
 	                                        BTreePageIDRef rootID,
-	                                        bool isLeaf,
-	                                        unsigned int l,
+	                                        int height,
 	                                        MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key
 	                                        MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key
 	                                        InternalPageSliceUpdate* update) {
@@ -5682,7 +5866,7 @@ private:
 		}

 		state Reference<const ArenaPage> page =
-		    wait(readPage(PagerEventReasons::Commit, l, snapshot, rootID, false, false));
+		    wait(readPage(PagerEventReasons::Commit, height, snapshot, rootID, height, false, true));
 		state Version writeVersion = self->getLastCommittedVersion() + 1;

 		// If the page exists in the cache, it must be copied before modification.
@@ -5692,9 +5876,8 @@ private:
 		state Reference<const ArenaPage> pageCopy;

 		state BTreePage* btPage = (BTreePage*)page->begin();
-		ASSERT(isLeaf == btPage->isLeaf());
-		ASSERT(btPage != nullptr);
-		++g_redwoodMetrics.level(btPage->height).metrics.pageCommitStart;
+		ASSERT(height == btPage->height);
+		++g_redwoodMetrics.level(height).metrics.pageCommitStart;

 		// TODO: Decide if it is okay to update if the subtree boundaries are expanded. It can result in
 		// records in a DeltaTree being outside its decode boundary range, which isn't actually invalid
@@ -5728,9 +5911,10 @@ private:
 		}

 		// Leaf Page
-		if (isLeaf) {
+		if (btPage->isLeaf()) {
 			bool updating = tryToUpdate;
 			bool changesMade = false;
+			state Standalone<VectorRef<RedwoodRecordRef>> merged;

 			auto switchToLinearMerge = [&]() {
 				// Couldn't make changes in place, so now do a linear merge and build new pages.
@@ -5990,16 +6174,12 @@ private:
 			}

 			// Rebuild new page(s).
-			state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(self,
-			                                                                        &update->subtreeLowerBound,
-			                                                                        &update->subtreeUpperBound,
-			                                                                        merged,
-			                                                                        btPage->height,
-			                                                                        writeVersion,
-			                                                                        rootID));
+			state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(
+			    self, &update->subtreeLowerBound, &update->subtreeUpperBound, merged, height, writeVersion, rootID));

 			// Put new links into update and tell update that pages were rebuilt
 			update->rebuilt(entries);
+
 			debug_printf("%s Merge complete, returning %s\n", context.c_str(), toString(*update).c_str());
 			return Void();
 		} else {
@@ -6136,7 +6316,7 @@ private:
 					while (c != u.cEnd) {
 						RedwoodRecordRef rec = c.get();
 						if (rec.value.present()) {
-							if (btPage->height == 2) {
+							if (height == 2) {
 								debug_printf("%s: freeing child page in cleared subtree range: %s\n",
 								             context.c_str(),
 								             ::toString(rec.getChildPage()).c_str());
@@ -6145,8 +6325,8 @@ private:
 								debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
 								             context.c_str(),
 								             ::toString(rec.getChildPage()).c_str());
-								self->m_lazyClearQueue.pushFront(
-								    LazyClearQueueEntry{ btPage->height, writeVersion, rec.getChildPage() });
+								self->m_lazyClearQueue.pushBack(LazyClearQueueEntry{
+								    (uint8_t)(height - 1), writeVersion, rec.getChildPage() });
 							}
 						}
 						c.moveNext();
@@ -6166,8 +6346,8 @@ private:
 					}

 					// If this page has height of 2 then its children are leaf nodes
-					recursions.push_back(self->commitSubtree(
-					    self, snapshot, mutationBuffer, pageID, btPage->height == 2, btPage->height, mBegin, mEnd, &u));
+					recursions.push_back(
+					    self->commitSubtree(self, snapshot, mutationBuffer, pageID, height - 1, mBegin, mEnd, &u));
 				}

 				debug_printf(
@@ -6227,8 +6407,7 @@ private:
 					// Make sure the modifier cloned the page so we can update the child links in-place below.
 					modifier.cloneForUpdate();
-					ASSERT(btPage != nullptr);
-					++g_redwoodMetrics.level(btPage->height).metrics.forceUpdate;
+					++g_redwoodMetrics.level(height).metrics.forceUpdate;
 				}

 				// If the modifier cloned the page for updating, then update our local pageCopy, btPage, and cursor
@@ -6260,7 +6439,7 @@ private:
 				if (detachChildren) {
 					int detached = 0;
 					cursor.moveFirst();
-					auto& stats = g_redwoodMetrics.level(btPage->height);
+					auto& stats = g_redwoodMetrics.level(height);
 					while (cursor.valid()) {
 						if (cursor.get().value.present()) {
 							for (auto& p : cursor.get().getChildPage()) {
@@ -6310,7 +6489,7 @@ private:
 				             context.c_str());

 				if (detachChildren) {
-					auto& stats = g_redwoodMetrics.level(btPage->height);
+					auto& stats = g_redwoodMetrics.level(height);
 					for (auto& rec : modifier.rebuild) {
 						if (rec.value.present()) {
 							BTreePageIDRef oldPages = rec.getChildPage();
@@ -6341,7 +6520,7 @@ private:
 				                                                                  &update->subtreeLowerBound,
 				                                                                  &update->subtreeUpperBound,
 				                                                                  modifier.rebuild,
-				                                                                  btPage->height,
+				                                                                  height,
 				                                                                  writeVersion,
 				                                                                  rootID));
 				update->rebuilt(newChildEntries);
@@ -6398,11 +6577,11 @@ private:
 		MutationBuffer::const_iterator mBegin = mutations->upper_bound(all.subtreeLowerBound.key);
 		--mBegin;
 		MutationBuffer::const_iterator mEnd = mutations->lower_bound(all.subtreeUpperBound.key);
+
 		wait(commitSubtree(self,
 		                   self->m_pager->getReadSnapshot(latestVersion),
 		                   mutations,
 		                   rootPageID,
-		                   self->m_pHeader->height == 1,
 		                   self->m_pHeader->height,
 		                   mBegin,
 		                   mEnd,
@@ -6480,24 +6659,23 @@ public:
 	};

 private:
+	PagerEventReasons reason;
 	VersionedBTree* btree;
 	Reference<IPagerSnapshot> pager;
 	bool valid;
 	std::vector<PathEntry> path;

 public:
-	BTreeCursor() {}
+	BTreeCursor() : reason(PagerEventReasons::MAXEVENTREASONS) {}

 	bool intialized() const { return pager.isValid(); }
 	bool isValid() const { return valid; }
-	int getHeight() {
-		if (!path.empty() && path.back().btPage() != nullptr)
-			return path.back().btPage()->height;
-		return 0;
-	}

 	std::string toString() const {
-		std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str());
+		std::string r = format("{ptr=%p reason=%s %s ",
+		                       this,
+		                       PagerEventsStrings[(int)reason].c_str(),
+		                       ::toString(pager->getVersion()).c_str());
 		for (int i = 0; i < path.size(); ++i) {
 			std::string id = "";
 #if REDWOOD_DEBUG
@@ -6526,9 +6704,15 @@ public:
 	PathEntry& back() { return path.back(); }
 	void popPath() { path.pop_back(); }

-	Future<Void> pushPage(PagerEventReasons reason, const BTreePage::BinaryTree::Cursor& link) {
+	Future<Void> pushPage(const BTreePage::BinaryTree::Cursor& link) {
 		debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str());
-		return map(readPage(reason, path.back().btPage()->height - 1, pager, link.get().getChildPage()),
+		return map(readPage(reason,
+		                    path.back().btPage()->height - 1,
+		                    pager,
+		                    link.get().getChildPage(),
+		                    ioMaxPriority,
+		                    false,
+		                    true),
 		           [=](Reference<const ArenaPage> p) {
 #if REDWOOD_DEBUG
 			           path.push_back({ p, getCursor(p, link), link.get().getChildPage() });
@@ -6539,26 +6723,31 @@ public:
 		           });
 	}

-	Future<Void> pushPage(PagerEventReasons reason, BTreePageIDRef id) {
+	Future<Void> pushPage(BTreePageIDRef id) {
 		debug_printf("pushPage(root=%s)\n", ::toString(id).c_str());
-		return map(readPage(reason, btree->m_pHeader->height, pager, id), [=](Reference<const ArenaPage> p) {
+		return map(readPage(reason, btree->m_pHeader->height, pager, id, ioMaxPriority, false, true),
+		           [=](Reference<const ArenaPage> p) {
 #if REDWOOD_DEBUG
-			path.push_back({ p, getCursor(p, dbBegin, dbEnd), id });
+			           path.push_back({ p, getCursor(p, dbBegin, dbEnd), id });
 #else
 			           path.push_back({ p, getCursor(p, dbBegin, dbEnd) });
 #endif
-			return Void();
-		});
+			           return Void();
+		           });
 	}

 	// Initialize or reinitialize cursor
-	Future<Void> init(VersionedBTree* btree_in, Reference<IPagerSnapshot> pager_in, BTreePageIDRef root) {
+	Future<Void> init(VersionedBTree* btree_in,
+	                  PagerEventReasons reason,
+	                  Reference<IPagerSnapshot> pager_in,
+	                  BTreePageIDRef root) {
 		btree = btree_in;
+		this->reason = reason;
 		pager = pager_in;
 		path.clear();
 		path.reserve(6);
 		valid = false;
-		return pushPage(PagerEventReasons::PointRead, root);
+		return pushPage(root);
 	}

 	// Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid
@@ -6571,7 +6760,7 @@ public:
 	// If non-zero is returned then the cursor is valid and the return value is logically equivalent
 	// to query.compare(cursor.get())
-	ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
+	ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query) {
 		state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
 		self->path.resize(1);
 		debug_printf("seek(%s) start cursor = %s\n", query.toString().c_str(), self->toString().c_str());
@@ -6595,7 +6784,7 @@ public:
 			if (entry.cursor.seekLessThan(internalPageQuery) && entry.cursor.get().value.present()) {
 				debug_printf(
 				    "seek(%s) loop seek success cursor=%s\n", query.toString().c_str(), self->toString().c_str());
-				Future<Void> f = self->pushPage(reason, entry.cursor);
+				Future<Void> f = self->pushPage(entry.cursor);
 				wait(f);
 			} else {
 				self->valid = false;
@@ -6606,20 +6795,18 @@ public:
 		}
 	}

-	Future<int> seek(RedwoodRecordRef query, PagerEventReasons reason) { return seek_impl(this, query, reason); }
+	Future<int> seek(RedwoodRecordRef query) { return seek_impl(this, query); }

-	ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
+	ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query) {
 		debug_printf("seekGTE(%s) start\n", query.toString().c_str());
-		int cmp = wait(self->seek(query, reason));
+		int cmp = wait(self->seek(query));
 		if (cmp > 0 || (cmp == 0 && !self->isValid())) {
 			wait(self->moveNext());
 		}
 		return Void();
 	}

-	Future<Void> seekGTE(RedwoodRecordRef query, PagerEventReasons reason) {
-		return seekGTE_impl(this, query, reason);
-	}
+	Future<Void> seekGTE(RedwoodRecordRef query) { return seekGTE_impl(this, query); }

 	// Start fetching sibling nodes in the forward or backward direction, stopping after recordLimit or byteLimit
 	void prefetch(KeyRef rangeEnd, bool directionForward, int recordLimit, int byteLimit) {
@@ -6644,6 +6831,8 @@ public:
 		// Cursor for moving through siblings.
 		// Note that only immediate siblings under the same parent are considered for prefetch so far.
 		BTreePage::BinaryTree::Cursor c = path[path.size() - 2].cursor;
+		ASSERT(path[path.size() - 2].btPage()->height == 2);
+		constexpr int level = 1;

 		// The loop conditions are split apart into different if blocks for readability.
 		// While query limits are not exceeded
@@ -6667,7 +6856,8 @@ public:
 			// Prefetch the sibling if the link is not null
 			if (c.get().value.present()) {
 				BTreePageIDRef childPage = c.get().getChildPage();
-				preLoadPage(pager.getPtr(), path[path.size() - 2].btPage()->height - 1, childPage);
+				// Assertion above enforces that level is 1
+				preLoadPage(pager.getPtr(), level, childPage, ioLeafPriority);
 				recordsRead += estRecordsPerPage;
 				// Use sibling node capacity as an estimate of bytes read.
 				bytesRead += childPage.size() * this->btree->m_blockSize;
 			}
 		}
 	}

@@ -6675,18 +6865,16 @@ public:
-	ACTOR Future<Void> seekLT_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
+	ACTOR Future<Void> seekLT_impl(BTreeCursor* self, RedwoodRecordRef query) {
 		debug_printf("seekLT(%s) start\n", query.toString().c_str());
-		int cmp = wait(self->seek(query, reason));
+		int cmp = wait(self->seek(query));
 		if (cmp <= 0) {
 			wait(self->movePrev());
 		}
 		return Void();
 	}

-	Future<Void> seekLT(RedwoodRecordRef query, PagerEventReasons reason) {
-		return seekLT_impl(this, query, reason);
-	}
+	Future<Void> seekLT(RedwoodRecordRef query) { return seekLT_impl(this, query); }

 	ACTOR Future<Void> move_impl(BTreeCursor* self, bool forward) {
 		// Try to the move cursor at the end of the path in the correct direction
@@ -6735,7 +6923,8 @@ public:
 					ASSERT(entry.cursor.movePrev());
 					ASSERT(entry.cursor.get().value.present());
 				}
-				wait(self->pushPage(PagerEventReasons::MetaData, entry.cursor));
+
+				wait(self->pushPage(entry.cursor));
 				auto& newEntry = self->path.back();
 				ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast());
 			}
@@ -6750,7 +6939,7 @@ public:
 		Future<Void> movePrev() { return move_impl(this, false); }
 	};

-	Future<Void> initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion) {
+	Future<Void> initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion, PagerEventReasons reason) {
 		// Only committed versions can be read.
 		ASSERT(snapshotVersion <= m_lastCommittedVersion);
 		Reference<IPagerSnapshot> snapshot = m_pager->getReadSnapshot(snapshotVersion);
@@ -6758,7 +6947,7 @@ public:
 		// This is a ref because snapshot will continue to hold the metakey value memory
 		KeyRef m = snapshot->getMetaKey();

-		return cursor->init(this, snapshot, ((MetaKey*)m.begin())->root.get());
+		return cursor->init(this, reason, snapshot, ((MetaKey*)m.begin())->root.get());
 	}
 };

@@ -6770,7 +6959,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff"))
 class KeyValueStoreRedwoodUnversioned : public IKeyValueStore {
 public:
 	KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID)
-	  : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS),
+	  : m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
 	    prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {

 		int pageSize =
@@ -6865,10 +7054,10 @@ public:
 	                                           int rowLimit,
 	                                           int byteLimit) {
 		state VersionedBTree::BTreeCursor cur;
-		wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
+		wait(
+		    self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::RangeRead));

-		wait(self->m_concurrentReads.take());
-		state FlowLock::Releaser releaser(self->m_concurrentReads);
+		state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock());
 		++g_redwoodMetrics.metric.opGetRange;

 		state RangeResult result;
@@ -6880,11 +7069,12 @@ public:
 		}

 		if (rowLimit > 0) {
-			wait(cur.seekGTE(keys.begin, PagerEventReasons::RangeRead));
+			wait(cur.seekGTE(keys.begin));

 			if (self->prefetch) {
 				cur.prefetch(keys.end, true, rowLimit, byteLimit);
 			}
+
 			while (cur.isValid()) {
 				// Read page contents without using waits
 				BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor;
@@ -6926,7 +7116,7 @@ public:
 				wait(cur.moveNext());
 			}
 		} else {
-			wait(cur.seekLT(keys.end, PagerEventReasons::RangeRead));
+			wait(cur.seekLT(keys.end));

 			if (self->prefetch) {
 				cur.prefetch(keys.begin, false, -rowLimit, byteLimit);
@@ -6979,7 +7169,7 @@ public:
 			ASSERT(result.size() > 0);
 			result.readThrough = result[result.size() - 1].key;
 		}
-		g_redwoodMetrics.kvSizeReadByRangeGet->sample(accumulatedBytes);
+		g_redwoodMetrics.kvSizeReadByGetRange->sample(accumulatedBytes);
 		return result;
 	}

@@ -6987,13 +7177,13 @@ public:
 	                                            Key key,
 	                                            Optional<UID> debugID) {
 		state VersionedBTree::BTreeCursor cur;
-		wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
+		wait(
+		    self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead));

-		wait(self->m_concurrentReads.take());
-		state FlowLock::Releaser releaser(self->m_concurrentReads);
+		state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock());
 		++g_redwoodMetrics.metric.opGet;

-		wait(cur.seekGTE(key, PagerEventReasons::PointRead));
+		wait(cur.seekGTE(key));
 		if (cur.isValid() && cur.get().key == key) {
 			// Return a Value whose arena depends on the source page arena
 			Value v;
@@ -7029,7 +7219,7 @@ private:
 	Future<Void> m_init;
 	Promise<Void> m_closed;
 	Promise<Void> m_error;
-	FlowLock m_concurrentReads;
+	PriorityMultiLock m_concurrentReads;
 	bool prefetch;

 	template
@@ -7100,7 +7290,7 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree,
 	state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iLast;

 	state VersionedBTree::BTreeCursor cur;
-	wait(btree->initBTreeCursor(&cur, v));
+	wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead));
 	debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Start\n", v, start.printable().c_str(), end.printable().c_str());

 	// Randomly use the cursor for something else first.
@@ -7111,13 +7301,13 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree,
 		             start.printable().c_str(),
 		             end.printable().c_str(),
 		             randomKey.toString().c_str());
-		wait(success(cur.seek(randomKey, PagerEventReasons::RangeRead)));
+		wait(success(cur.seek(randomKey)));
 	}

 	debug_printf(
 	    "VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(), end.printable().c_str());
-	wait(cur.seekGTE(start, PagerEventReasons::RangeRead));
+	wait(cur.seekGTE(start));

 	state Standalone<VectorRef<KeyValueRef>> results;

@@ -7213,11 +7403,11 @@ ACTOR Future verifyRangeBTreeCursor(VersionedBTree* btree,
 	// opening new cursors
 	if (v >= btree->getOldestVersion() && deterministicRandom()->coinflip()) {
 		cur = VersionedBTree::BTreeCursor();
-		wait(btree->initBTreeCursor(&cur, v));
+		wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead));
 	}

 	// Now read the range from the tree in reverse order and compare to the saved results
-	wait(cur.seekLT(end, PagerEventReasons::RangeRead));
+	wait(cur.seekLT(end));

 	state std::reverse_iterator<const KeyValueRef*> r = results.rbegin();

@@ -7285,7 +7475,7 @@ ACTOR Future seekAllBTreeCursor(VersionedBTree* btree,
 	state int errors = 0;

 	state VersionedBTree::BTreeCursor cur;
-	wait(btree->initBTreeCursor(&cur, v));
+	wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead));

 	while (i != iEnd) {
 		state std::string key = i->first.first;
@@ -7294,7 +7484,7 @@ ACTOR Future seekAllBTreeCursor(VersionedBTree* btree,
 		state Optional<std::string> val = i->second;
 		debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str());
 		state Arena arena;
-		wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key)), PagerEventReasons::MetaData));
+		wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key))));
 		bool foundKey = cur.isValid() && cur.get().key == key;
 		bool hasValue = foundKey && cur.get().value.present();
@@ -7367,7 +7557,7 @@ ACTOR Future verify(VersionedBTree* btree,
 			// Get a cursor at v so that v doesn't get expired between the possibly serial steps below.
 			state VersionedBTree::BTreeCursor cur;
-			wait(btree->initBTreeCursor(&cur, v));
+			wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead));

 			debug_printf("Verifying entire key range at version %" PRId64 "\n", v);
 			state Future<int> fRangeAll = verifyRangeBTreeCursor(
@@ -7415,11 +7605,11 @@ ACTOR Future randomReader(VersionedBTree* btree) {
 	loop {
 		wait(yield());
 		if (!cur.intialized() || deterministicRandom()->random01() > .01) {
-			wait(btree->initBTreeCursor(&cur, btree->getLastCommittedVersion()));
+			wait(btree->initBTreeCursor(&cur, btree->getLastCommittedVersion(), PagerEventReasons::RangeRead));
 		}

 		state KeyValue kv = randomKV(10, 0);
-		wait(cur.seekGTE(kv.key, PagerEventReasons::PointRead));
+		wait(cur.seekGTE(kv.key));
 		state int c = deterministicRandom()->randomInt(0, 100);
 		state bool direction = deterministicRandom()->coinflip();
 		while (cur.isValid() && c-- > 0) {
@@ -8996,10 +9186,10 @@ ACTOR Future randomSeeks(VersionedBTree* btree, int count, char firstChar,
 	state int c = 0;
 	state double readStart = timer();
 	state VersionedBTree::BTreeCursor cur;
-	wait(btree->initBTreeCursor(&cur, readVer));
+	wait(btree->initBTreeCursor(&cur, readVer, PagerEventReasons::PointRead));

 	while (c < count) {
 		state Key k = randomString(20, firstChar, lastChar);
-		wait(cur.seekGTE(k, PagerEventReasons::PointRead));
+		wait(cur.seekGTE(k));
 		++c;
 	}
 	double elapsed = timer() - readStart;
@@ -9017,12 +9207,12 @@ ACTOR Future randomScans(VersionedBTree* btree,
 	state int c = 0;
 	state double readStart = timer();
 	state VersionedBTree::BTreeCursor cur;
-	wait(btree->initBTreeCursor(&cur, readVer));
+	wait(btree->initBTreeCursor(&cur, readVer, PagerEventReasons::RangeRead));

 	state int totalScanBytes = 0;
 	while (c++ < count) {
 		state Key k = randomString(20, firstChar, lastChar);
-		wait(cur.seekGTE(k, PagerEventReasons::PointRead));
+		wait(cur.seekGTE(k));
 		state int w = width;
 		state bool directionFwd = deterministicRandom()->coinflip();
@@ -9067,7 +9257,8 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
 	pager->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, id, p);
 	pager->setMetaKey(LiteralStringRef("asdfasdf"));
 	wait(pager->commit());
-	Reference<ArenaPage> p2 = wait(pager->readPage(PagerEventReasons::PointRead, nonBtreeLevel, id, true));
+	Reference<ArenaPage> p2 =
+	    wait(pager->readPage(PagerEventReasons::PointRead, nonBtreeLevel, id, ioMinPriority, true, false));
 	printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str());

 	// TODO: Verify reads, do more writes and reads to make this a real pager validator
diff --git a/tests/TestRunner/tmp_multi_cluster.py b/tests/TestRunner/tmp_multi_cluster.py
old mode 100755
new mode 100644