diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 9301a3245b..d4c5e5f226 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -24,8 +24,6 @@ set(FDBSERVER_SRCS IKeyValueStore.h IPager.h IVersionedStore.h - IndirectShadowPager.actor.cpp - IndirectShadowPager.h KeyValueStoreCompressTestData.actor.cpp KeyValueStoreMemory.actor.cpp KeyValueStoreSQLite.actor.cpp @@ -45,8 +43,6 @@ set(FDBSERVER_SRCS MasterInterface.h MasterProxyServer.actor.cpp masterserver.actor.cpp - MemoryPager.actor.cpp - MemoryPager.h MoveKeys.actor.cpp MoveKeys.actor.h networktest.actor.cpp diff --git a/fdbserver/DeltaTree.h b/fdbserver/DeltaTree.h index ce584f76f2..b1eb53dfff 100644 --- a/fdbserver/DeltaTree.h +++ b/fdbserver/DeltaTree.h @@ -20,13 +20,89 @@ #pragma once -#include "fdbserver/PrefixTree.h" #include "flow/flow.h" #include "flow/Arena.h" #include "fdbclient/FDBTypes.h" #include "fdbserver/Knobs.h" #include +typedef uint64_t Word; +static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) { + int i = 0; + const int wordEnd = cl - sizeof(Word) + 1; + + for(; i < wordEnd; i += sizeof(Word)) { + Word a = *(Word *)ap; + Word b = *(Word *)bp; + if(a != b) { + return i + ctzll(a ^ b) / 8; + } + ap += sizeof(Word); + bp += sizeof(Word); + } + + for (; i < cl; i++) { + if (*ap != *bp) { + return i; + } + ++ap; + ++bp; + } + return cl; +} + +static int commonPrefixLength(StringRef a, StringRef b) { + return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size())); +} + +// This appears to be the fastest version +static int lessOrEqualPowerOfTwo(int n) { + int p; + for (p = 1; p+p <= n; p+=p); + return p; +} + +/* +static int _lessOrEqualPowerOfTwo(uint32_t n) { + if(n == 0) + return n; + int trailing = __builtin_ctz(n); + int leading = __builtin_clz(n); + if(trailing + leading == ((sizeof(n) * 8) - 1)) + return n; + return 1 << ( (sizeof(n) * 8) - leading - 1); +} + +static int __lessOrEqualPowerOfTwo(unsigned int n) 
{ + int p = 1; + for(; p <= n; p <<= 1); + return p >> 1; +} +*/ + +static int perfectSubtreeSplitPoint(int subtree_size) { + // return the inorder index of the root node in a subtree of the given size + // consistent with the resulting binary search tree being "perfect" (having minimal height + // and all missing nodes as far right as possible). + // There has to be a simpler way to do this. + int s = lessOrEqualPowerOfTwo((subtree_size - 1) / 2 + 1) - 1; + return std::min(s * 2 + 1, subtree_size - s - 1); +} + +static int perfectSubtreeSplitPointCached(int subtree_size) { + static uint16_t *points = nullptr; + static const int max = 500; + if(points == nullptr) { + points = new uint16_t[max]; + for(int i = 0; i < max; ++i) + points[i] = perfectSubtreeSplitPoint(i); + } + + if(subtree_size < max) + return points[subtree_size]; + return perfectSubtreeSplitPoint(subtree_size); +} + // Delta Tree is a memory mappable binary tree of T objects such that each node's item is // stored as a Delta which can reproduce the node's T item given the node's greatest // lesser ancestor and the node's least greater ancestor. diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h index dc58461e47..8f79d9c57f 100644 --- a/fdbserver/IPager.h +++ b/fdbserver/IPager.h @@ -53,8 +53,9 @@ #define VALGRIND_MAKE_MEM_DEFINED(x, y) #endif -typedef uint32_t LogicalPageID; // uint64_t? 
-static const LogicalPageID invalidLogicalPageID = std::numeric_limits::max(); +typedef uint32_t LogicalPageID; +typedef uint32_t PhysicalPageID; +#define invalidLogicalPageID std::numeric_limits::max() class IPage { public: @@ -85,12 +86,10 @@ public: class IPagerSnapshot { public: - virtual Future> getPhysicalPage(LogicalPageID pageID, bool cacheable) = 0; + virtual Future> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0; virtual Version getVersion() const = 0; - virtual Key getMetaKey() const { - return Key(); - } + virtual Key getMetaKey() const = 0; virtual ~IPagerSnapshot() {} @@ -98,65 +97,7 @@ public: virtual void delref() = 0; }; -class IPager : public IClosable { -public: - // Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed. - virtual Reference newPageBuffer() = 0; - - // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). - // For a given pager instance, separate calls to this function must return the same value. - virtual int getUsablePageSize() = 0; - - virtual StorageBytes getStorageBytes() = 0; - - // Permitted to fail (ASSERT) during recovery. - virtual Reference getReadSnapshot(Version version) = 0; - - // Returns an unused LogicalPageID. - // LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) do not need to be allocated. - // Permitted to fail (ASSERT) during recovery. - virtual LogicalPageID allocateLogicalPage() = 0; - - // Signals that the page will no longer be used as of the specified version. Versions prior to the specified version must be kept. - // Permitted to fail (ASSERT) during recovery. - virtual void freeLogicalPage(LogicalPageID pageID, Version version) = 0; - - // Writes a page with the given LogicalPageID at the specified version. LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) - // can be written without being allocated. 
All other LogicalPageIDs must be allocated using allocateLogicalPage before writing them. - // - // If updateVersion is 0, we are signalling to the pager that we are reusing the LogicalPageID entry at the current latest version of pageID. - // - // Otherwise, we will add a new entry for LogicalPageID at the specified version. In that case, updateVersion must be larger than any version - // written to this page previously, and it must be larger than any version committed. If referencePageID is given, the latest version of that - // page will be used for the write, which *can* be less than the latest committed version. - // - // Permitted to fail (ASSERT) during recovery. - virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion, LogicalPageID referencePageID = invalidLogicalPageID) = 0; - - // Signals to the pager that no more reads will be performed in the range [begin, end). - // Permitted to fail (ASSERT) during recovery. - virtual void forgetVersions(Version begin, Version end) = 0; - - // Makes durable all writes and any data structures used for recovery. - // Permitted to fail (ASSERT) during recovery. - virtual Future commit() = 0; - - // Returns the latest version of the pager. Permitted to block until recovery is complete, at which point it should always be set immediately. - // Some functions in the IPager interface are permitted to fail (ASSERT) during recovery, so users should wait for getLatestVersion to complete - // before doing anything else. - virtual Future getLatestVersion() = 0; - - // Sets the latest version of the pager. Must be monotonically increasing. - // - // Must be called prior to reading the specified version. SOMEDAY: It may be desirable in the future to relax this constraint for performance reasons. - // - // Permitted to fail (ASSERT) during recovery. 
- virtual void setLatestVersion(Version version) = 0; - -protected: - ~IPager() {} // Destruction should be done using close()/dispose() from the IClosable interface -}; - +// This API is probably customized to the behavior of DWALPager and probably needs some changes to be more generic. class IPager2 : public IClosable { public: // Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed. @@ -189,7 +130,10 @@ public: // The data returned will be the later of // - the most recent committed atomic // - the most recent non-atomic write - virtual Future> readPage(LogicalPageID pageID, bool cacheable) = 0; + // Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read. + // NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are + // considered likely to be needed soon. + virtual Future> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0; // Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion() // Note that snapshots at any version may still see the results of updatePage() calls. 
diff --git a/fdbserver/IVersionedStore.h b/fdbserver/IVersionedStore.h index de4cfd2084..9baf5c4469 100644 --- a/fdbserver/IVersionedStore.h +++ b/fdbserver/IVersionedStore.h @@ -30,10 +30,10 @@ class IStoreCursor { public: virtual Future findEqual(KeyRef key) = 0; - virtual Future findFirstEqualOrGreater(KeyRef key, bool needValue, int prefetchNextBytes) = 0; - virtual Future findLastLessOrEqual(KeyRef key, bool needValue, int prefetchPriorBytes) = 0; - virtual Future next(bool needValue) = 0; - virtual Future prev(bool needValue) = 0; + virtual Future findFirstEqualOrGreater(KeyRef key, int prefetchBytes = 0) = 0; + virtual Future findLastLessOrEqual(KeyRef key, int prefetchBytes = 0) = 0; + virtual Future next() = 0; + virtual Future prev() = 0; virtual bool isValid() = 0; virtual KeyRef getKey() = 0; @@ -41,8 +41,6 @@ public: virtual void addref() = 0; virtual void delref() = 0; - - virtual std::string toString() const = 0; }; class IVersionedStore : public IClosable { diff --git a/fdbserver/IndirectShadowPager.actor.cpp b/fdbserver/IndirectShadowPager.actor.cpp deleted file mode 100644 index 5a525b17af..0000000000 --- a/fdbserver/IndirectShadowPager.actor.cpp +++ /dev/null @@ -1,960 +0,0 @@ -/* - * IndirectShadowPager.actor.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "fdbserver/IndirectShadowPager.h" -#include "fdbserver/Knobs.h" - -#include "flow/UnitTest.h" -#include "flow/actorcompiler.h" -#include "fdbrpc/crc32c.h" - -struct SumType { - bool operator==(const SumType &rhs) const { return crc == rhs.crc; } - uint32_t crc; - std::string toString() { return format("0x%08x", crc); } -}; - -bool checksum(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical, bool write) { - // Calculates and then stores or verifies the checksum at the end of the page. - // If write is true then the checksum is written into the page - // If write is false then the checksum is compared to the in-page sum and - // and error will be thrown if they do not match. - ASSERT(sizeof(SumType) == IndirectShadowPage::PAGE_OVERHEAD_BYTES); - // Adjust pageSize to refer to only usable storage bytes - pageSize -= IndirectShadowPage::PAGE_OVERHEAD_BYTES; - SumType sum; - SumType *pSumInPage = (SumType *)(page + pageSize); - // Write sum directly to page or to sum variable based on mode - SumType *sumOut = write ? pSumInPage : ∑ - sumOut->crc = crc32c_append(logical, page, pageSize); - VALGRIND_MAKE_MEM_DEFINED(sumOut, sizeof(SumType)); - - debug_printf("checksum %s%s logical %d physical %d size %d checksums page %s calculated %s data at %p %s\n", - write ? "write" : "read", - (!write && sum != *pSumInPage) ? " MISMATCH" : "", - logical, physical, pageSize, - write ? 
"NA" : pSumInPage->toString().c_str(), - sumOut->toString().c_str(), page, ""); - - // Verify if not in write mode - if(!write && sum != *pSumInPage) { - TraceEvent (SevError, "IndirectShadowPagerPageChecksumFailure") - .detail("UserPageSize", pageSize) - .detail("Filename", file->getFilename()) - .detail("LogicalPage", logical) - .detail("PhysicalPage", physical) - .detail("ChecksumInPage", pSumInPage->toString()) - .detail("ChecksumCalculated", sum.toString()); - return false; - } - return true; -} - -inline bool checksumRead(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) { - return checksum(file, page, pageSize, logical, physical, false); -} - -inline void checksumWrite(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) { - checksum(file, page, pageSize, logical, physical, true); -} - -IndirectShadowPage::IndirectShadowPage() : fastAllocated(true) { - data = (uint8_t*)FastAllocator<4096>::allocate(); -} - -IndirectShadowPage::~IndirectShadowPage() { - if(fastAllocated) { - FastAllocator<4096>::release(data); - } - else if(file) { - file->releaseZeroCopy(data, PAGE_BYTES, (int64_t) physicalPageID * PAGE_BYTES); - } -} - -uint8_t const* IndirectShadowPage::begin() const { - return data; -} - -uint8_t* IndirectShadowPage::mutate() { - return data; -} - -int IndirectShadowPage::size() const { - return PAGE_BYTES - PAGE_OVERHEAD_BYTES; -} - -const int IndirectShadowPage::PAGE_BYTES = 4096; -const int IndirectShadowPage::PAGE_OVERHEAD_BYTES = sizeof(SumType); - -IndirectShadowPagerSnapshot::IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version) - : pager(pager), version(version), pagerError(pager->getError()) -{ -} - -Future> IndirectShadowPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) { - if(pagerError.isReady()) - pagerError.get(); - return pager->getPage(Reference::addRef(this), pageID, version); -} - -template -T bigEndian(T val) { - 
static_assert(sizeof(T) <= 8, "Can't compute bigEndian on integers larger than 8 bytes"); - uint64_t b = bigEndian64(val); - return *(T*)((uint8_t*)&b+8-sizeof(T)); -} - -ACTOR Future recover(IndirectShadowPager *pager) { - try { - TraceEvent("PagerRecovering").detail("Filename", pager->pageFileName); - pager->pageTableLog = keyValueStoreMemory(pager->basename, UID(), 1e9, "pagerlog"); - - // TODO: this can be done synchronously with the log recovery - int64_t flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK; - state bool exists = fileExists(pager->pageFileName); - if(!exists) { - flags |= IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE; - } - - Reference dataFile = wait(IAsyncFileSystem::filesystem()->open(pager->pageFileName, flags, 0600)); - pager->dataFile = dataFile; - - TraceEvent("PagerOpenedDataFile").detail("Filename", pager->pageFileName); - - if(!exists) { - wait(pager->dataFile->sync()); - } - TraceEvent("PagerSyncdDataFile").detail("Filename", pager->pageFileName); - - state int64_t fileSize = wait(pager->dataFile->size()); - TraceEvent("PagerGotFileSize").detail("Size", fileSize).detail("Filename", pager->pageFileName); - - if(fileSize > 0) { - TraceEvent("PagerRecoveringFromLogs").detail("Filename", pager->pageFileName); - Optional pagesAllocatedValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::PAGES_ALLOCATED_KEY)); - if(pagesAllocatedValue.present()) { - BinaryReader pr(pagesAllocatedValue.get(), Unversioned()); - uint32_t pagesAllocated; - pr >> pagesAllocated; - pager->pagerFile.init(fileSize, pagesAllocated); - - debug_printf("%s: Recovered pages allocated: %d\n", pager->pageFileName.c_str(), pager->pagerFile.pagesAllocated); - ASSERT(pager->pagerFile.pagesAllocated != PagerFile::INVALID_PAGE); - - Optional latestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::LATEST_VERSION_KEY)); - ASSERT(latestVersionValue.present()); - - BinaryReader vr(latestVersionValue.get(), 
Unversioned()); - vr >> pager->latestVersion; - - Optional oldestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::OLDEST_VERSION_KEY)); - - if(oldestVersionValue.present()) { - BinaryReader vr(oldestVersionValue.get(), Unversioned()); - vr >> pager->oldestVersion; - } - - debug_printf("%s: Recovered version info: earliest v%lld latest v%lld\n", pager->pageFileName.c_str(), pager->oldestVersion, pager->latestVersion); - pager->committedVersion = pager->latestVersion; - - Standalone> tableEntries = wait(pager->pageTableLog->readRange(KeyRangeRef(IndirectShadowPager::TABLE_ENTRY_PREFIX, strinc(IndirectShadowPager::TABLE_ENTRY_PREFIX)))); - - if(tableEntries.size() > 0) { - BinaryReader kr(tableEntries.back().key, Unversioned()); - - uint8_t prefix; - LogicalPageID logicalPageID; - - kr >> prefix; - ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]); - - kr >> logicalPageID; - logicalPageID = bigEndian(logicalPageID); - - LogicalPageID pageTableSize = std::max(logicalPageID+1, SERVER_KNOBS->PAGER_RESERVED_PAGES); - pager->pageTable.resize(pageTableSize); - debug_printf("%s: Recovered page table size: %d\n", pager->pageFileName.c_str(), pageTableSize); - } - else { - debug_printf("%s: Recovered no page table entries\n", pager->pageFileName.c_str()); - } - - LogicalPageID nextPageID = SERVER_KNOBS->PAGER_RESERVED_PAGES; - std::set allocatedPhysicalPages; - for(auto entry : tableEntries) { - BinaryReader kr(entry.key, Unversioned()); - BinaryReader vr(entry.value, Unversioned()); - - uint8_t prefix; - LogicalPageID logicalPageID; - Version version; - PhysicalPageID physicalPageID; - - kr >> prefix; - ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]); - - kr >> logicalPageID; - logicalPageID = bigEndian(logicalPageID); - - kr >> version; - version = bigEndian(version); - vr >> physicalPageID; - - ASSERT(version <= pager->latestVersion); - - pager->pageTable[logicalPageID].push_back(std::make_pair(version, 
physicalPageID)); - - if(physicalPageID != PagerFile::INVALID_PAGE) { - allocatedPhysicalPages.insert(physicalPageID); - pager->pagerFile.markPageAllocated(logicalPageID, version, physicalPageID); - } - - while(nextPageID < logicalPageID) { - pager->logicalFreeList.push_back(nextPageID++); - } - if(logicalPageID == nextPageID) { - ++nextPageID; - } - - debug_printf("%s: Recovered page table entry logical %d -> (v%lld, physical %d)\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID); - } - - debug_printf("%s: Building physical free list\n", pager->pageFileName.c_str()); - // TODO: can we do this better? does it require storing extra info in the log? - PhysicalPageID nextPhysicalPageID = 0; - for(auto itr = allocatedPhysicalPages.begin(); itr != allocatedPhysicalPages.end(); ++itr) { - while(nextPhysicalPageID < *itr) { - pager->pagerFile.freePage(nextPhysicalPageID++); - } - ++nextPhysicalPageID; - } - - while(nextPhysicalPageID < pager->pagerFile.pagesAllocated) { - pager->pagerFile.freePage(nextPhysicalPageID++); - } - } - } - - if(pager->pageTable.size() < SERVER_KNOBS->PAGER_RESERVED_PAGES) { - pager->pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES); - } - - pager->pagerFile.finishedMarkingPages(); - pager->pagerFile.startVacuuming(); - - debug_printf("%s: Finished recovery at v%lld\n", pager->pageFileName.c_str(), pager->latestVersion); - TraceEvent("PagerFinishedRecovery").detail("LatestVersion", pager->latestVersion).detail("OldestVersion", pager->oldestVersion).detail("Filename", pager->pageFileName); - } - catch(Error &e) { - if(e.code() != error_code_actor_cancelled) { - TraceEvent(SevError, "PagerRecoveryFailed").error(e, true).detail("Filename", pager->pageFileName); - } - throw; - } - - return Void(); -} - -ACTOR Future housekeeper(IndirectShadowPager *pager) { - wait(pager->recovery); - wait(Never()); - loop { - state LogicalPageID pageID = 0; - for(; pageID < pager->pageTable.size(); ++pageID) { - // TODO: pick an appropriate 
rate for this loop and determine the right way to implement it - // Right now, this delays 10ms every 400K pages, which means we have 1s of delay for every - // 40M pages. In total, we introduce 100s delay for a max size 4B page file. - if(pageID % 400000 == 0) { - wait(delay(0.01)); - } - else { - wait(yield()); - } - - auto& pageVersionMap = pager->pageTable[pageID]; - - if(pageVersionMap.size() > 0) { - auto itr = pageVersionMap.begin(); - for(auto prev = itr; prev != pageVersionMap.end() && prev->first < pager->oldestVersion; prev=itr) { - pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second); - ++itr; - if(prev->second != PagerFile::INVALID_PAGE && (itr == pageVersionMap.end() || itr->first <= pager->oldestVersion)) { - pager->freePhysicalPageID(prev->second); - } - if(itr == pageVersionMap.end() || itr->first >= pager->oldestVersion) { - debug_printf("%s: Updating oldest version for logical %u: v%lld\n", pager->pageFileName.c_str(), pageID, pager->oldestVersion); - pager->logPageTableClear(pageID, 0, pager->oldestVersion); - - if(itr != pageVersionMap.end() && itr->first > pager->oldestVersion) { - debug_printf("%s: Erasing pages to prev from pageVersionMap for %d (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr->first, prev->first); - prev->first = pager->oldestVersion; - pager->logPageTableUpdate(pageID, pager->oldestVersion, prev->second); - itr = pageVersionMap.erase(pageVersionMap.begin(), prev); - } - else { - debug_printf("%s: Erasing pages to itr from pageVersionMap for %d (%d) (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr == pageVersionMap.end(), itr==pageVersionMap.end() ? 
-1 : itr->first, prev->first); - itr = pageVersionMap.erase(pageVersionMap.begin(), itr); - } - } - } - - for(; itr != pageVersionMap.end(); ++itr) { - pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second); - } - - if(pageVersionMap.size() == 0) { - pager->freeLogicalPageID(pageID); - } - } - } - - pager->pagerFile.finishedMarkingPages(); - } -} - -ACTOR Future forwardError(Future f, Promise target) { - try { - wait(f); - } - catch(Error &e) { - if(e.code() != error_code_actor_cancelled && target.canBeSet()) { - target.sendError(e); - } - - throw e; - } - - return Void(); -} - -IndirectShadowPager::IndirectShadowPager(std::string basename) - : basename(basename), latestVersion(0), committedVersion(0), committing(Void()), oldestVersion(0), pagerFile(this) -{ - pageFileName = basename; - recovery = forwardError(recover(this), errorPromise); - housekeeping = forwardError(housekeeper(this), errorPromise); -} - -StorageBytes IndirectShadowPager::getStorageBytes() { - int64_t free; - int64_t total; - g_network->getDiskBytes(parentDirectory(basename), free, total); - return StorageBytes(free, total, pagerFile.size(), free + IndirectShadowPage::PAGE_BYTES * pagerFile.getFreePages()); -} - -Reference IndirectShadowPager::newPageBuffer() { - return Reference(new IndirectShadowPage()); -} - -int IndirectShadowPager::getUsablePageSize() { - return IndirectShadowPage::PAGE_BYTES - IndirectShadowPage::PAGE_OVERHEAD_BYTES; -} - -Reference IndirectShadowPager::getReadSnapshot(Version version) { - debug_printf("%s: Getting read snapshot v%lld latest v%lld oldest v%lld\n", pageFileName.c_str(), version, latestVersion, oldestVersion); - ASSERT(recovery.isReady()); - ASSERT(version <= latestVersion); - ASSERT(version >= oldestVersion); - - return Reference(new IndirectShadowPagerSnapshot(this, version)); -} - -LogicalPageID IndirectShadowPager::allocateLogicalPage() { - ASSERT(recovery.isReady()); - - LogicalPageID allocatedPage; - if(logicalFreeList.size() > 0) { - 
allocatedPage = logicalFreeList.front(); - logicalFreeList.pop_front(); - } - else { - ASSERT(pageTable.size() < std::numeric_limits::max()); // TODO: different error? - allocatedPage = pageTable.size(); - pageTable.push_back(PageVersionMap()); - } - - ASSERT(allocatedPage >= SERVER_KNOBS->PAGER_RESERVED_PAGES); - debug_printf("%s: op=allocate id=%u\n", pageFileName.c_str(), allocatedPage); - return allocatedPage; -} - -void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version) { - ASSERT(recovery.isReady()); - ASSERT(committing.isReady()); - - ASSERT(pageID < pageTable.size()); - - PageVersionMap &pageVersionMap = pageTable[pageID]; - ASSERT(!pageVersionMap.empty()); - - // 0 will mean delete as of latest version, similar to write at latest version - if(version == 0) { - version = pageVersionMap.back().first; - } - - auto itr = pageVersionMapLowerBound(pageVersionMap, version); - // TODO: Is this correct, that versions from the past *forward* can be deleted? - for(auto i = itr; i != pageVersionMap.end(); ++i) { - freePhysicalPageID(i->second); - } - - if(itr != pageVersionMap.end()) { - debug_printf("%s: Clearing newest versions for logical %u: v%lld\n", pageFileName.c_str(), pageID, version); - logPageTableClearToEnd(pageID, version); - pageVersionMap.erase(itr, pageVersionMap.end()); - } - - if(pageVersionMap.size() == 0) { - debug_printf("%s: Freeing logical %u (freeLogicalPage)\n", pageFileName.c_str(), pageID); - logicalFreeList.push_back(pageID); - } - else if(pageVersionMap.back().second != PagerFile::INVALID_PAGE) { - pageVersionMap.push_back(std::make_pair(version, PagerFile::INVALID_PAGE)); - logPageTableUpdate(pageID, version, PagerFile::INVALID_PAGE); - } -} - -ACTOR Future waitAndFreePhysicalPageID(IndirectShadowPager *pager, PhysicalPageID pageID, Future canFree) { - wait(canFree); - pager->pagerFile.freePage(pageID); - return Void(); -} - -// TODO: Freeing physical pages must be done *after* committing the page map changes that 
cause the physical page to no longer be used. -// Otherwise, the physical page could be reused by a write followed by a power loss in which case the mapping change would not -// have been committed and so the physical page should still contain its previous data but it's been overwritten. -void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) { - debug_printf("%s: Freeing physical %u\n", pageFileName.c_str(), pageID); - pagerFile.freePage(pageID); -} - -void IndirectShadowPager::writePage(LogicalPageID pageID, Reference contents, Version updateVersion, LogicalPageID referencePageID) { - ASSERT(recovery.isReady()); - ASSERT(committing.isReady()); - - ASSERT(updateVersion > latestVersion || updateVersion == 0); - ASSERT(pageID < pageTable.size()); - - PageVersionMap &pageVersionMap = pageTable[pageID]; - - ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != PagerFile::INVALID_PAGE); - - // TODO: should this be conditional on the write succeeding? - bool updateExisting = updateVersion == 0; - if(updateExisting) { - // If there is no existing latest version to update then there must be a referencePageID from which to get a latest version - // so get that version and change this to a normal update - if(pageVersionMap.empty()) { - ASSERT(referencePageID != invalidLogicalPageID); - PageVersionMap &rpv = pageTable[referencePageID]; - ASSERT(!rpv.empty()); - updateVersion = rpv.back().first; - updateExisting = false; - } - else { - ASSERT(pageVersionMap.size()); - updateVersion = pageVersionMap.back().first; - } - } - - PhysicalPageID physicalPageID = pagerFile.allocatePage(pageID, updateVersion); - - debug_printf("%s: Writing logical %d v%lld physical %d\n", pageFileName.c_str(), pageID, updateVersion, physicalPageID); - - if(updateExisting) { - // TODO: Physical page cannot be freed now, it must be done after the page mapping change above is committed - //freePhysicalPageID(pageVersionMap.back().second); - pageVersionMap.back().second = 
physicalPageID; - } - else { - ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion); - pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID)); - } - - logPageTableUpdate(pageID, updateVersion, physicalPageID); - - checksumWrite(dataFile.getPtr(), contents->mutate(), IndirectShadowPage::PAGE_BYTES, pageID, physicalPageID); - - Future write = holdWhile(contents, dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES)); - - if(write.isError()) { - if(errorPromise.canBeSet()) { - errorPromise.sendError(write.getError()); - } - throw write.getError(); - } - writeActors.add(forwardError(write, errorPromise)); -} - -void IndirectShadowPager::forgetVersions(Version begin, Version end) { - ASSERT(recovery.isReady()); - ASSERT(begin <= end); - ASSERT(end <= latestVersion); - - // TODO: support forgetting arbitrary ranges - if(begin <= oldestVersion) { - oldestVersion = std::max(end, oldestVersion); - logVersion(OLDEST_VERSION_KEY, oldestVersion); - } -} - -ACTOR Future commitImpl(IndirectShadowPager *pager, Future previousCommit) { - state Future outstandingWrites = pager->writeActors.signalAndCollapse(); - state Version commitVersion = pager->latestVersion; - - wait(previousCommit); - - pager->logVersion(IndirectShadowPager::LATEST_VERSION_KEY, commitVersion); - - // TODO: we need to prevent writes that happen now from being committed in the subsequent log commit - // This is probably best done once we have better control of the log, where we can write a commit entry - // here without syncing the file. 
- - wait(outstandingWrites); - - wait(pager->dataFile->sync()); - wait(pager->pageTableLog->commit()); - - pager->committedVersion = std::max(pager->committedVersion, commitVersion); - - return Void(); -} - -Future IndirectShadowPager::commit() { - ASSERT(recovery.isReady()); - Future f = commitImpl(this, committing); - committing = f; - return committing; -} - -void IndirectShadowPager::setLatestVersion(Version version) { - ASSERT(recovery.isReady()); - latestVersion = version; -} - -ACTOR Future getLatestVersionImpl(IndirectShadowPager *pager) { - wait(pager->recovery); - return pager->latestVersion; -} - -Future IndirectShadowPager::getLatestVersion() { - return getLatestVersionImpl(this); -} - -Future IndirectShadowPager::getError() { - return errorPromise.getFuture(); -} - -Future IndirectShadowPager::onClosed() { - return closed.getFuture(); -} - -ACTOR void shutdown(IndirectShadowPager *pager, bool dispose) { - if(pager->errorPromise.canBeSet()) - pager->errorPromise.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress - - // Cancel all outstanding reads - auto i = pager->busyPages.begin(); - auto iEnd = pager->busyPages.end(); - - while(i != iEnd) { - // Advance before calling cancel as the rawRead cancel will destroy the map entry it lives in - (i++)->second.read.cancel(); - } - ASSERT(pager->busyPages.empty()); - - wait(ready(pager->writeActors.signal())); - wait(ready(pager->operations.signal())); - wait(ready(pager->committing)); - - pager->housekeeping.cancel(); - pager->pagerFile.shutdown(); - - state Future pageTableClosed = pager->pageTableLog->onClosed(); - if(dispose) { - wait(ready(IAsyncFileSystem::filesystem()->deleteFile(pager->pageFileName, true))); - pager->pageTableLog->dispose(); - } - else { - pager->pageTableLog->close(); - } - - wait(ready(pageTableClosed)); - - pager->closed.send(Void()); - delete pager; -} - -void IndirectShadowPager::dispose() { - shutdown(this, true); -} - -void IndirectShadowPager::close() { 
- shutdown(this, false); -} - -ACTOR Future> rawRead(IndirectShadowPager *pager, LogicalPageID logicalPageID, PhysicalPageID physicalPageID) { - state void *data; - state int len = IndirectShadowPage::PAGE_BYTES; - state bool readSuccess = false; - - try { - wait(pager->dataFile->readZeroCopy(&data, &len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES)); - readSuccess = true; - - if(!checksumRead(pager->dataFile.getPtr(), (uint8_t *)data, len, logicalPageID, physicalPageID)) { - throw checksum_failed(); - } - - pager->busyPages.erase(physicalPageID); - return Reference(new IndirectShadowPage((uint8_t *)data, pager->dataFile, physicalPageID)); - } - catch(Error &e) { - pager->busyPages.erase(physicalPageID); - if(readSuccess || e.code() == error_code_actor_cancelled) { - pager->dataFile->releaseZeroCopy(data, len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES); - } - throw; - } -} - -Future> getPageImpl(IndirectShadowPager *pager, Reference snapshot, LogicalPageID logicalPageID, Version version) { - ASSERT(logicalPageID < pager->pageTable.size()); - PageVersionMap &pageVersionMap = pager->pageTable[logicalPageID]; - - auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version); - if(itr == pageVersionMap.begin()) { - debug_printf("%s: Page version map empty! 
op=error id=%u @%lld\n", pager->pageFileName.c_str(), logicalPageID, version); - ASSERT(false); - } - --itr; - PhysicalPageID physicalPageID = itr->second; - ASSERT(physicalPageID != PagerFile::INVALID_PAGE); - - debug_printf("%s: Reading logical %d v%lld physical %d mapSize %lu\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID, pageVersionMap.size()); - - IndirectShadowPager::BusyPage &bp = pager->busyPages[physicalPageID]; - if(!bp.read.isValid()) { - Future> get = rawRead(pager, logicalPageID, physicalPageID); - if(!get.isReady()) { - bp.read = get; - } - return get; - } - return bp.read; -} - -Future> IndirectShadowPager::getPage(Reference snapshot, LogicalPageID pageID, Version version) { - if(!recovery.isReady()) { - debug_printf("%s: getPage failure, recovery not ready - op=error id=%u @%lld\n", pageFileName.c_str(), pageID, version); - ASSERT(false); - } - - Future> f = getPageImpl(this, snapshot, pageID, version); - operations.add(forwardError(ready(f), errorPromise)); // For some reason if success is ready() then shutdown hangs when waiting on operations - return f; -} - -PageVersionMap::iterator IndirectShadowPager::pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version version) { - return std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair p, Version v) { - return p.first < v; - }); -} - -PageVersionMap::iterator IndirectShadowPager::pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version version) { - return std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair p) { - return v < p.first; - }); -} - -void IndirectShadowPager::freeLogicalPageID(LogicalPageID pageID) { - if(pageID >= SERVER_KNOBS->PAGER_RESERVED_PAGES) { - debug_printf("%s: Freeing logical %u\n", pageFileName.c_str(), pageID); - logicalFreeList.push_back(pageID); - } -} - -void IndirectShadowPager::logVersion(StringRef versionKey, Version version) { - BinaryWriter 
v(Unversioned()); - v << version; - - pageTableLog->set(KeyValueRef(versionKey, v.toValue())); -} - -void IndirectShadowPager::logPagesAllocated() { - BinaryWriter v(Unversioned()); - v << pagerFile.getPagesAllocated(); - - pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toValue())); -} - -void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) { - BinaryWriter k(Unversioned()); - k << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(version); - - BinaryWriter v(Unversioned()); - v << physicalPageID; - - pageTableLog->set(KeyValueRef(k.toValue(), v.toValue())); -} - -void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) { - BinaryWriter b(Unversioned()); - b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start); - - BinaryWriter e(Unversioned()); - e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID); - - pageTableLog->clear(KeyRangeRef(b.toValue(), strinc(e.toValue()))); -} - -void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) { - BinaryWriter b(Unversioned()); - b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start); - - BinaryWriter e(Unversioned()); - e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end); - - pageTableLog->clear(KeyRangeRef(b.toValue(), e.toValue())); -} - -const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion"); -const StringRef IndirectShadowPager::OLDEST_VERSION_KEY = LiteralStringRef("\xff/OldestVersion"); -const StringRef IndirectShadowPager::PAGES_ALLOCATED_KEY = LiteralStringRef("\xff/PagesAllocated"); -const StringRef IndirectShadowPager::TABLE_ENTRY_PREFIX = LiteralStringRef("\x00"); - -ACTOR Future copyPage(IndirectShadowPager *pager, Reference page, LogicalPageID logical, PhysicalPageID from, PhysicalPageID to) { - state 
bool zeroCopied = true; - state int bytes = IndirectShadowPage::PAGE_BYTES; - state void *data = nullptr; - - try { - try { - wait(pager->dataFile->readZeroCopy(&data, &bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES)); - } - catch(Error &e) { - zeroCopied = false; - data = page->mutate(); - int _bytes = wait(pager->dataFile->read(data, page->size(), (int64_t)from * IndirectShadowPage::PAGE_BYTES)); - bytes = _bytes; - } - - ASSERT(bytes == IndirectShadowPage::PAGE_BYTES); - checksumWrite(pager->dataFile.getPtr(), page->mutate(), bytes, logical, to); - wait(pager->dataFile->write(data, bytes, (int64_t)to * IndirectShadowPage::PAGE_BYTES)); - if(zeroCopied) { - pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES); - } - } - catch(Error &e) { - if(zeroCopied) { - pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES); - } - pager->pagerFile.freePage(to); - throw e; - } - - return Void(); -} - -ACTOR Future vacuumer(IndirectShadowPager *pager, PagerFile *pagerFile) { - state Reference page(new IndirectShadowPage()); - - loop { - state double start = now(); - while(!pagerFile->canVacuum()) { - wait(delay(1.0)); - } - - ASSERT(!pagerFile->freePages.empty()); - - if(!pagerFile->vacuumQueue.empty()) { - state PhysicalPageID lastUsedPage = pagerFile->vacuumQueue.rbegin()->first; - PhysicalPageID lastFreePage = *pagerFile->freePages.rbegin(); - debug_printf("%s: Vacuuming: evaluating (free list size=%lu, lastFreePage=%u, lastUsedPage=%u, pagesAllocated=%u)\n", pager->pageFileName.c_str(), pagerFile->freePages.size(), lastFreePage, lastUsedPage, pagerFile->pagesAllocated); - ASSERT(lastFreePage < pagerFile->pagesAllocated); - ASSERT(lastUsedPage < pagerFile->pagesAllocated); - ASSERT(lastFreePage != lastUsedPage); - - if(lastFreePage < lastUsedPage) { - state std::pair logicalPageInfo = pagerFile->vacuumQueue[lastUsedPage]; - state PhysicalPageID newPage = 
pagerFile->allocatePage(logicalPageInfo.first, logicalPageInfo.second); - - debug_printf("%s: Vacuuming: copying page %u to %u\n", pager->pageFileName.c_str(), lastUsedPage, newPage); - wait(copyPage(pager, page, logicalPageInfo.first, lastUsedPage, newPage)); - - auto &pageVersionMap = pager->pageTable[logicalPageInfo.first]; - auto itr = IndirectShadowPager::pageVersionMapLowerBound(pageVersionMap, logicalPageInfo.second); - if(itr != pageVersionMap.end() && itr->second == lastUsedPage) { - itr->second = newPage; - pager->logPageTableUpdate(logicalPageInfo.first, itr->first, newPage); - pagerFile->freePage(lastUsedPage); - } - else { - TEST(true); // page was freed while vacuuming - pagerFile->freePage(newPage); - } - } - } - - PhysicalPageID firstFreePage = pagerFile->vacuumQueue.empty() ? pagerFile->minVacuumQueuePage : (pagerFile->vacuumQueue.rbegin()->first + 1); - ASSERT(pagerFile->pagesAllocated >= firstFreePage); - - uint64_t pagesToErase = 0; - if(pagerFile->freePages.size() >= SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD) { - pagesToErase = std::min(pagerFile->freePages.size() - SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD + 1, pagerFile->pagesAllocated - firstFreePage); - } - - debug_printf("%s: Vacuuming: got %llu pages to erase (freePages=%lu, pagesAllocated=%u, vacuumQueueEmpty=%u, minVacuumQueuePage=%u, firstFreePage=%u)\n", pager->pageFileName.c_str(), pagesToErase, pagerFile->freePages.size(), pagerFile->pagesAllocated, pagerFile->vacuumQueue.empty(), pagerFile->minVacuumQueuePage, firstFreePage); - - if(pagesToErase > 0) { - PhysicalPageID eraseStartPage = pagerFile->pagesAllocated - pagesToErase; - debug_printf("%s: Vacuuming: truncating last %llu pages starting at %u\n", pager->pageFileName.c_str(), pagesToErase, eraseStartPage); - - ASSERT(pagesToErase <= pagerFile->pagesAllocated); - - pagerFile->pagesAllocated = eraseStartPage; - pager->logPagesAllocated(); - - auto freePageItr = pagerFile->freePages.find(eraseStartPage); - ASSERT(freePageItr != 
pagerFile->freePages.end()); - - pagerFile->freePages.erase(freePageItr, pagerFile->freePages.end()); - ASSERT(pagerFile->vacuumQueue.empty() || pagerFile->vacuumQueue.rbegin()->first < eraseStartPage); - - wait(pager->dataFile->truncate((int64_t)pagerFile->pagesAllocated * IndirectShadowPage::PAGE_BYTES)); - } - - wait(delayUntil(start + (double)IndirectShadowPage::PAGE_BYTES / SERVER_KNOBS->VACUUM_BYTES_PER_SECOND)); // TODO: figure out the correct mechanism here - } -} - -PagerFile::PagerFile(IndirectShadowPager *pager) : fileSize(0), pagesAllocated(0), pager(pager), vacuumQueueReady(false), minVacuumQueuePage(0) {} - -PhysicalPageID PagerFile::allocatePage(LogicalPageID logicalPageID, Version version) { - ASSERT((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES <= fileSize); - ASSERT(fileSize % IndirectShadowPage::PAGE_BYTES == 0); - - PhysicalPageID allocatedPage; - if(!freePages.empty()) { - allocatedPage = *freePages.begin(); - freePages.erase(freePages.begin()); - } - else { - if((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES == fileSize) { - fileSize += (1 << 24); - // TODO: extend the file before writing beyond the end. 
- } - - ASSERT(pagesAllocated < INVALID_PAGE); // TODO: we should throw a better error here - allocatedPage = pagesAllocated++; - pager->logPagesAllocated(); - } - - markPageAllocated(logicalPageID, version, allocatedPage); - - debug_printf("%s: Allocated physical %u\n", pager->pageFileName.c_str(), allocatedPage); - return allocatedPage; -} - -void PagerFile::freePage(PhysicalPageID pageID) { - freePages.insert(pageID); - - if(pageID >= minVacuumQueuePage) { - vacuumQueue.erase(pageID); - } -} - -void PagerFile::markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) { - if(physicalPageID != INVALID_PAGE && physicalPageID >= minVacuumQueuePage) { - vacuumQueue[physicalPageID] = std::make_pair(logicalPageID, version); - } -} - -void PagerFile::finishedMarkingPages() { - if(minVacuumQueuePage >= pagesAllocated) { - minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0; - vacuumQueueReady = false; - } - else { - if(!vacuumQueueReady) { - vacuumQueueReady = true; - } - if(pagesAllocated > SERVER_KNOBS->VACUUM_QUEUE_SIZE && minVacuumQueuePage < pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE) { - minVacuumQueuePage = pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE; - auto itr = vacuumQueue.lower_bound(minVacuumQueuePage); - vacuumQueue.erase(vacuumQueue.begin(), itr); - } - } -} - -uint64_t PagerFile::size() { - return fileSize; -} - -uint32_t PagerFile::getPagesAllocated() { - return pagesAllocated; -} - -uint32_t PagerFile::getFreePages() { - return freePages.size(); -} - -void PagerFile::init(uint64_t fileSize, uint32_t pagesAllocated) { - this->fileSize = fileSize; - this->pagesAllocated = pagesAllocated; - this->minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? 
pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0; -} - -void PagerFile::startVacuuming() { - vacuuming = Never(); //vacuumer(pager, this); -} - -void PagerFile::shutdown() { - vacuuming.cancel(); -} - -bool PagerFile::canVacuum() { - if(freePages.size() < SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD // Not enough free pages - || minVacuumQueuePage >= pagesAllocated // We finished processing all pages in the vacuum queue - || !vacuumQueueReady) // Populating vacuum queue - { - debug_printf("%s: Vacuuming: waiting for vacuumable pages (free list size=%lu, minVacuumQueuePage=%u, pages allocated=%u, vacuumQueueReady=%d)\n", pager->pageFileName.c_str(), freePages.size(), minVacuumQueuePage, pagesAllocated, vacuumQueueReady); - return false; - } - - return true; -} - -const PhysicalPageID PagerFile::INVALID_PAGE = std::numeric_limits::max(); - -extern Future simplePagerTest(IPager* const& pager); - -TEST_CASE("/fdbserver/indirectshadowpager/simple") { - state IPager *pager = new IndirectShadowPager("unittest_pageFile"); - - wait(simplePagerTest(pager)); - - Future closedFuture = pager->onClosed(); - pager->close(); - wait(closedFuture); - - return Void(); -} diff --git a/fdbserver/IndirectShadowPager.h b/fdbserver/IndirectShadowPager.h deleted file mode 100644 index 1b097df639..0000000000 --- a/fdbserver/IndirectShadowPager.h +++ /dev/null @@ -1,215 +0,0 @@ -/* - * IndirectShadowPager.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FDBSERVER_INDIRECTSHADOWPAGER_H -#define FDBSERVER_INDIRECTSHADOWPAGER_H -#pragma once - -#include "fdbserver/IKeyValueStore.h" -#include "fdbserver/IPager.h" - -#include "flow/ActorCollection.h" -#include "fdbclient/Notified.h" - -#include "fdbrpc/IAsyncFile.h" - -typedef uint32_t PhysicalPageID; -typedef std::vector> PageVersionMap; -typedef std::vector LogicalPageTable; - -class IndirectShadowPager; - -class IndirectShadowPage : public IPage, ReferenceCounted { -public: - IndirectShadowPage(); - IndirectShadowPage(uint8_t *data, Reference file, PhysicalPageID pageID) - : file(file), physicalPageID(pageID), fastAllocated(false), data(data) {} - virtual ~IndirectShadowPage(); - - virtual void addref() const { - ReferenceCounted::addref(); - } - - virtual void delref() const { - ReferenceCounted::delref(); - } - - virtual int size() const; - virtual uint8_t const* begin() const; - virtual uint8_t* mutate(); - -//private: - static const int PAGE_BYTES; - static const int PAGE_OVERHEAD_BYTES; - -private: - Reference file; - PhysicalPageID physicalPageID; - bool fastAllocated; - uint8_t *data; -}; - -class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted { -public: - IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version); - - virtual Future> getPhysicalPage(LogicalPageID pageID, bool cacheable); - - virtual Version getVersion() const { - return version; - } - - virtual ~IndirectShadowPagerSnapshot() { - } - - virtual void addref() { - ReferenceCounted::addref(); - } - - virtual void delref() { - 
ReferenceCounted::delref(); - } - -private: - IndirectShadowPager *pager; - Version version; - Future pagerError; -}; - -class PagerFile { -public: - PagerFile(IndirectShadowPager *pager); - - PhysicalPageID allocatePage(LogicalPageID logicalPageID, Version version); - void freePage(PhysicalPageID physicalPageID); - void markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID); - - void finishedMarkingPages(); - - uint64_t size(); - uint32_t getPagesAllocated(); - uint32_t getFreePages(); - - void init(uint64_t fileSize, uint32_t pagesAllocated); - void startVacuuming(); - void shutdown(); - -//private: - Future vacuuming; - IndirectShadowPager *pager; - - uint32_t pagesAllocated; - uint64_t fileSize; - - std::set freePages; - - PhysicalPageID minVacuumQueuePage; - bool vacuumQueueReady; - std::map> vacuumQueue; - - bool canVacuum(); - - static const PhysicalPageID INVALID_PAGE; -}; - -class IndirectShadowPager : public IPager { -public: - IndirectShadowPager(std::string basename); - virtual ~IndirectShadowPager() { - } - - virtual Reference newPageBuffer(); - virtual int getUsablePageSize(); - - virtual Reference getReadSnapshot(Version version); - - virtual LogicalPageID allocateLogicalPage(); - virtual void freeLogicalPage(LogicalPageID pageID, Version version); - virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion, LogicalPageID referencePageID); - virtual void forgetVersions(Version begin, Version end); - virtual Future commit(); - - virtual void setLatestVersion(Version version); - virtual Future getLatestVersion(); - - virtual StorageBytes getStorageBytes(); - - virtual Future getError(); - virtual Future onClosed(); - virtual void dispose(); - virtual void close(); - - Future> getPage(Reference snapshot, LogicalPageID pageID, Version version); - -//private: - std::string basename; - std::string pageFileName; - - Version latestVersion; - Version committedVersion; - - LogicalPageTable 
pageTable; - IKeyValueStore *pageTableLog; - - Reference dataFile; - Future recovery; - - Future housekeeping; - Future vacuuming; - Version oldestVersion; - - // TODO: This structure maybe isn't needed - struct BusyPage { - Future> read; - }; - - typedef std::map BusyPageMapT; - BusyPageMapT busyPages; - - SignalableActorCollection operations; - SignalableActorCollection writeActors; - Future committing; - - Promise closed; - Promise errorPromise; - - std::deque logicalFreeList; - PagerFile pagerFile; - - static PageVersionMap::iterator pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version v); - static PageVersionMap::iterator pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version v); - - void freeLogicalPageID(LogicalPageID pageID); - void freePhysicalPageID(PhysicalPageID pageID); - - void logVersion(StringRef versionKey, Version version); - void logPagesAllocated(); - void logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID); - void logPageTableClearToEnd(LogicalPageID logicalPageID, Version start); - void logPageTableClear(LogicalPageID logicalPageID, Version start, Version end); - - static const StringRef LATEST_VERSION_KEY; - static const StringRef OLDEST_VERSION_KEY; - static const StringRef PAGES_ALLOCATED_KEY; - static const StringRef TABLE_ENTRY_PREFIX; - -}; - -#endif diff --git a/fdbserver/MemoryPager.actor.cpp b/fdbserver/MemoryPager.actor.cpp deleted file mode 100644 index 9e6474dd01..0000000000 --- a/fdbserver/MemoryPager.actor.cpp +++ /dev/null @@ -1,456 +0,0 @@ -/* - * MemoryPager.actor.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "fdbserver/MemoryPager.h" -#include "fdbserver/Knobs.h" - -#include "flow/Arena.h" -#include "flow/UnitTest.h" -#include "flow/actorcompiler.h" - -typedef uint8_t* PhysicalPageID; -typedef std::vector> PageVersionMap; -typedef std::vector LogicalPageTable; - -class MemoryPager; - -class MemoryPage : public IPage, ReferenceCounted { -public: - MemoryPage(); - MemoryPage(uint8_t *data); - virtual ~MemoryPage(); - - virtual void addref() const { - ReferenceCounted::addref(); - } - - virtual void delref() const { - ReferenceCounted::delref(); - } - - virtual int size() const; - virtual uint8_t const* begin() const; - virtual uint8_t* mutate(); - -private: - friend class MemoryPager; - uint8_t *data; - bool allocated; - - static const int PAGE_BYTES; -}; - -class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted { -public: - MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {} - virtual Future> getPhysicalPage(LogicalPageID pageID, bool cacheable); - virtual Version getVersion() const { - return version; - } - - virtual void addref() { - ReferenceCounted::addref(); - } - - virtual void delref() { - ReferenceCounted::delref(); - } - -private: - MemoryPager *pager; - Version version; -}; - -class MemoryPager : public IPager, ReferenceCounted { -public: - MemoryPager(); - - virtual Reference newPageBuffer(); - virtual int getUsablePageSize(); - - virtual Reference getReadSnapshot(Version version); - - virtual LogicalPageID allocateLogicalPage(); - virtual void 
freeLogicalPage(LogicalPageID pageID, Version version); - virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion, LogicalPageID referencePageID); - virtual void forgetVersions(Version begin, Version end); - virtual Future commit(); - - virtual StorageBytes getStorageBytes() { - // TODO: Get actual values for used and free memory - return StorageBytes(); - } - - virtual void setLatestVersion(Version version); - virtual Future getLatestVersion(); - - virtual Future getError(); - virtual Future onClosed(); - virtual void dispose(); - virtual void close(); - - virtual Reference getPage(LogicalPageID pageID, Version version); - -private: - Version latestVersion; - Version committedVersion; - Standalone>> data; - LogicalPageTable pageTable; - - Promise closed; - - std::vector freeList; // TODO: is this good enough for now? - - PhysicalPageID allocatePage(Reference contents); - void extendData(); - - static const PhysicalPageID INVALID_PAGE; -}; - -IPager * createMemoryPager() { - return new MemoryPager(); -} - -MemoryPage::MemoryPage() : allocated(true) { - data = (uint8_t*)FastAllocator<4096>::allocate(); -} - -MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {} - -MemoryPage::~MemoryPage() { - if(allocated) { - FastAllocator<4096>::release(data); - } -} - -uint8_t const* MemoryPage::begin() const { - return data; -} - -uint8_t* MemoryPage::mutate() { - return data; -} - -int MemoryPage::size() const { - return PAGE_BYTES; -} - -const int MemoryPage::PAGE_BYTES = 4096; - -Future> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) { - return pager->getPage(pageID, version); -} - -MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) { - extendData(); - pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES); -} - -Reference MemoryPager::newPageBuffer() { - return Reference(new MemoryPage()); -} - -int MemoryPager::getUsablePageSize() { - return MemoryPage::PAGE_BYTES; -} - -Reference 
MemoryPager::getReadSnapshot(Version version) { - ASSERT(version <= latestVersion); - return Reference(new MemoryPagerSnapshot(this, version)); -} - -LogicalPageID MemoryPager::allocateLogicalPage() { - ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES); - pageTable.push_back(PageVersionMap()); - return pageTable.size() - 1; -} - -void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) { - ASSERT(pageID < pageTable.size()); - - PageVersionMap &pageVersionMap = pageTable[pageID]; - ASSERT(!pageVersionMap.empty()); - - auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair p, Version v) { - return p.first < v; - }); - - pageVersionMap.erase(itr, pageVersionMap.end()); - if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) { - pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE)); - } -} - -void MemoryPager::writePage(LogicalPageID pageID, Reference contents, Version updateVersion, LogicalPageID referencePageID) { - ASSERT(updateVersion > latestVersion || updateVersion == 0); - ASSERT(pageID < pageTable.size()); - - if(referencePageID != invalidLogicalPageID) { - PageVersionMap &rpv = pageTable[referencePageID]; - ASSERT(!rpv.empty()); - updateVersion = rpv.back().first; - } - - PageVersionMap &pageVersionMap = pageTable[pageID]; - - ASSERT(updateVersion >= committedVersion || updateVersion == 0); - PhysicalPageID physicalPageID = allocatePage(contents); - - ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE); - - if(updateVersion == 0) { - ASSERT(pageVersionMap.size()); - updateVersion = pageVersionMap.back().first; - pageVersionMap.back().second = physicalPageID; - // TODO: what to do with old page? 
- } - else { - ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion); - pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID)); - } - -} - -void MemoryPager::forgetVersions(Version begin, Version end) { - ASSERT(begin <= end); - ASSERT(end <= latestVersion); - // TODO -} - -Future MemoryPager::commit() { - ASSERT(committedVersion < latestVersion); - committedVersion = latestVersion; - return Void(); -} - -void MemoryPager::setLatestVersion(Version version) { - ASSERT(version > latestVersion); - latestVersion = version; -} - -Future MemoryPager::getLatestVersion() { - return latestVersion; -} - -Reference MemoryPager::getPage(LogicalPageID pageID, Version version) { - ASSERT(pageID < pageTable.size()); - PageVersionMap const& pageVersionMap = pageTable[pageID]; - - auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair p) { - return v < p.first; - }); - - if(itr == pageVersionMap.begin()) { - return Reference(); // TODO: should this be an error? - } - - --itr; - - ASSERT(itr->second != INVALID_PAGE); - return Reference(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this? 
-} - -Future MemoryPager::getError() { - return Void(); -} - -Future MemoryPager::onClosed() { - return closed.getFuture(); -} - -void MemoryPager::dispose() { - closed.send(Void()); - delete this; -} - -void MemoryPager::close() { - dispose(); -} - -PhysicalPageID MemoryPager::allocatePage(Reference contents) { - if(freeList.size()) { - PhysicalPageID pageID = freeList.back(); - freeList.pop_back(); - - memcpy(pageID, contents->begin(), contents->size()); - return pageID; - } - else { - ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size()); - PhysicalPageID pageID = data.back().end(); - - data.back().append(data.arena(), contents->begin(), contents->size()); - if(data.back().size() == data.back().capacity()) { - extendData(); - } - else { - ASSERT(data.back().size() <= data.back().capacity() - 4096); - } - - return pageID; - } -} - -void MemoryPager::extendData() { - if(data.size() > 1000) { // TODO: is this an ok way to handle large data size? - throw io_error(); - } - - VectorRef d; - d.reserve(data.arena(), 1 << 22); - data.push_back(data.arena(), d); -} - -// TODO: these tests are not MemoryPager specific, we should make them more general - -void fillPage(Reference page, LogicalPageID pageID, Version version) { - ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version)); - - memset(page->mutate(), 0, page->size()); - memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID)); - memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version)); -} - -bool validatePage(Reference page, LogicalPageID pageID, Version version) { - bool valid = true; - - LogicalPageID readPageID = *(LogicalPageID*)page->begin(); - if(readPageID != pageID) { - fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID); - valid = false; - } - - Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID)); - if(readVersion != version) { - fprintf(stderr, "Invalid Version detected on page %u: %" 
PRId64 "(expected %" PRId64 ")\n", pageID, readVersion, version); - valid = false; - } - - return valid; -} - -void writePage(IPager *pager, Reference page, LogicalPageID pageID, Version version, bool updateVersion=true) { - fillPage(page, pageID, version); - pager->writePage(pageID, page, updateVersion ? version : 0); -} - -ACTOR Future commit(IPager *pager) { - static int commitNum = 1; - state int myCommit = commitNum++; - - debug_printf("Commit%d\n", myCommit); - wait(pager->commit()); - debug_printf("FinishedCommit%d\n", myCommit); - return Void(); -} - -ACTOR Future read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) { - static int readNum = 1; - state int myRead = readNum++; - state Reference readSnapshot = pager->getReadSnapshot(version); - debug_printf("Read%d\n", myRead); - Reference readPage = wait(readSnapshot->getPhysicalPage(pageID, true)); - debug_printf("FinishedRead%d\n", myRead); - ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? 
expectedVersion : version)); - return Void(); -} - -ACTOR Future simplePagerTest(IPager *pager) { - state Reference page = pager->newPageBuffer(); - - Version latestVersion = wait(pager->getLatestVersion()); - debug_printf("Got latest version: %lld\n", latestVersion); - - state Version version = latestVersion+1; - state Version v1 = version; - - state LogicalPageID pageID1 = pager->allocateLogicalPage(); - - writePage(pager, page, pageID1, v1); - pager->setLatestVersion(v1); - wait(commit(pager)); - - state LogicalPageID pageID2 = pager->allocateLogicalPage(); - - state Version v2 = ++version; - - writePage(pager, page, pageID1, v2); - writePage(pager, page, pageID2, v2); - pager->setLatestVersion(v2); - wait(commit(pager)); - - wait(read(pager, pageID1, v2)); - wait(read(pager, pageID1, v1)); - - state Version v3 = ++version; - writePage(pager, page, pageID1, v3, false); - pager->setLatestVersion(v3); - - wait(read(pager, pageID1, v2, v3)); - wait(read(pager, pageID1, v3, v3)); - - state LogicalPageID pageID3 = pager->allocateLogicalPage(); - - state Version v4 = ++version; - writePage(pager, page, pageID2, v4); - writePage(pager, page, pageID3, v4); - pager->setLatestVersion(v4); - wait(commit(pager)); - - wait(read(pager, pageID2, v4, v4)); - - state Version v5 = ++version; - writePage(pager, page, pageID2, v5); - - state LogicalPageID pageID4 = pager->allocateLogicalPage(); - writePage(pager, page, pageID4, v5); - - state Version v6 = ++version; - pager->freeLogicalPage(pageID2, v5); - pager->freeLogicalPage(pageID3, v3); - pager->setLatestVersion(v6); - wait(commit(pager)); - - pager->forgetVersions(0, v4); - wait(commit(pager)); - - wait(delay(3.0)); - - wait(commit(pager)); - - return Void(); -} - -/* -TEST_CASE("/fdbserver/memorypager/simple") { - state IPager *pager = new MemoryPager(); - - wait(simplePagerTest(pager)); - - Future closedFuture = pager->onClosed(); - pager->dispose(); - - wait(closedFuture); - return Void(); -} -*/ - -const PhysicalPageID 
MemoryPager::INVALID_PAGE = nullptr; diff --git a/fdbserver/MemoryPager.h b/fdbserver/MemoryPager.h deleted file mode 100644 index 359c443de7..0000000000 --- a/fdbserver/MemoryPager.h +++ /dev/null @@ -1,29 +0,0 @@ -/* - * MemoryPager.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FDBSERVER_MEMORYPAGER_H -#define FDBSERVER_MEMORYPAGER_H -#pragma once - -#include "fdbserver/IPager.h" - -IPager * createMemoryPager(); - -#endif \ No newline at end of file diff --git a/fdbserver/PrefixTree.h b/fdbserver/PrefixTree.h deleted file mode 100644 index 2f67c20ccd..0000000000 --- a/fdbserver/PrefixTree.h +++ /dev/null @@ -1,1049 +0,0 @@ -/* - * PrefixTree.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "flow/flow.h" -#include "flow/Arena.h" -#include "fdbclient/FDBTypes.h" -#include "fdbserver/Knobs.h" -#include - -typedef uint64_t Word; -static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) { - int i = 0; - const int wordEnd = cl - sizeof(Word) + 1; - - for(; i < wordEnd; i += sizeof(Word)) { - Word a = *(Word *)ap; - Word b = *(Word *)bp; - if(a != b) { - return i + ctzll(a ^ b) / 8; - } - ap += sizeof(Word); - bp += sizeof(Word); - } - - for (; i < cl; i++) { - if (*ap != *bp) { - return i; - } - ++ap; - ++bp; - } - return cl; -} - -static int commonPrefixLength(StringRef a, StringRef b) { - return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size())); -} - -// This appears to be the fastest version -static int lessOrEqualPowerOfTwo(int n) { - int p; - for (p = 1; p+p <= n; p+=p); - return p; -} - -/* -static int _lessOrEqualPowerOfTwo(uint32_t n) { - if(n == 0) - return n; - int trailing = __builtin_ctz(n); - int leading = __builtin_clz(n); - if(trailing + leading == ((sizeof(n) * 8) - 1)) - return n; - return 1 << ( (sizeof(n) * 8) - leading - 1); -} - -static int __lessOrEqualPowerOfTwo(unsigned int n) { - int p = 1; - for(; p <= n; p <<= 1); - return p >> 1; -} -*/ - -static int perfectSubtreeSplitPoint(int subtree_size) { - // return the inorder index of the root node in a subtree of the given size - // consistent with the resulting binary search tree being "perfect" (having minimal height - // and all missing nodes as far right as possible). - // There has to be a simpler way to do this. 
- int s = lessOrEqualPowerOfTwo((subtree_size - 1) / 2 + 1) - 1; - return std::min(s * 2 + 1, subtree_size - s - 1); -} - -static int perfectSubtreeSplitPointCached(int subtree_size) { - static uint16_t *points = nullptr; - static const int max = 500; - if(points == nullptr) { - points = new uint16_t[max]; - for(int i = 0; i < max; ++i) - points[i] = perfectSubtreeSplitPoint(i); - } - - if(subtree_size < max) - return points[subtree_size]; - return perfectSubtreeSplitPoint(subtree_size); -} - -struct PrefixTree { - // TODO: Make PrefixTree use a more complex record type with a multi column key - typedef KeyValueRef EntryRef; - typedef Standalone Entry; - - static int MaximumTreeSize() { - return std::numeric_limits::max(); - }; - - struct Node { - uint8_t flags; - -/* - * Node fields - * - * Logically, a node has the following things - * - Flags describing what is in the node - * - Optional left child - * - Optional right child - * - Prefix string, described by a length and a source (which is the most recent left or right ancestor) - * - Optional split string, which contains any bytes after prefix which are needed to make a branching decision - * - Optional suffix string, containing any remaining key bytes after the split string - * - Optional value string - * - * The physical layout places the left child subtree immediately after the split string so that it is likely - * that the bytes read to make a branching decision and then choosing left (as should happen half of the time) - * will have a high cache hit rate. - * - * If necessary, the flags byte could be an enumeration into a set of possible options, since not all options - * combinations are needed. For example, - * - * - The tree is balanced and filled from the left at the last level, so a node cannot have only a right child. - * - If there are no children, there is no point in splitting any key bytes after the prefix into separate strings. 
- * - If there is exactly one child (left) then the key bytes after the prefix can all go in the split string. The - * traversal decision is to either stop or go left and one of those options (stop) will still have good memory - * locality. - * - * 8 valid/necessary option combinations for presense of (Left, Right, Split, Suffix) out of 16 possibilities - * - * L R Split Suffix - * - * N N N N # No children, key has no bytes after prefix - * N N Y N # No children, key has bytes after prefix - * Y N N N # One child, key has no bytes after prefix - * Y N Y N # One child, key has bytes after prefix - * Y Y N N # Two children, key has no bytes after prefix - * Y Y N Y # Two children, branch decision can be made using only prefix bytes but there are more key bytes after - * Y Y Y N # Two children, branch decision requires all key bytes after prefix - * Y Y Y Y # Two children, branch decision requires some but not all bytes after prefix - * - * This can be represent with just 3 bits, if necessary, but for now there is space in the flags byte for all 4. 
- * - * Flag Bits - * - * prefix borrow from next - * true - borrow from the closest ancestor greater than this node - * false - borrow from the closest ancestor less than this node - * large lengths = use 2 byte ints instead of 1 byte for prefix, split, suffix, and value lengths - * (TODO: It might be better to just not use a suffix at all when large is lengths is set) - * left child present - * right child present - * split string present - * suffix string present - * value string present - * - * Serialized format: - * All lengths are in the header, which has variable size - * - * flags 1 byte - * prefix length 1-2 bytes based on large lengths flag - * split length 0-2 bytes based on split string present flag - * suffix length 0-2 bytes based on suffix string present and large lengths flags - * value length 0-1 bytes based on value string present and large lengths flag - * left length 0 or 2 bytes depending on left child present - * split 0+ bytes - * left child 0+ bytes - * suffix 0+ bytes - * value 0+ bytes - * right child 0+ bytes - * - */ - enum EFlags { - USE_LARGE_LENGTHS = 1 << 0, - PREFIX_SOURCE_NEXT = 1 << 1, - HAS_LEFT_CHILD = 1 << 2, - HAS_RIGHT_CHILD = 1 << 3, - HAS_SPLIT = 1 << 4, - HAS_SUFFIX = 1 << 5, - HAS_VALUE = 1 << 6 - }; - - // Stores decoded offsets (from beginning) of Node components - struct Parser { - Parser() {} - Parser(const Node *n) { - init(n); - } - - const Node *node; - - typedef uint16_t OffsetT; - OffsetT headerLen; - OffsetT prefixLen; - OffsetT leftPos; - OffsetT suffixPos; - OffsetT valuePos; - OffsetT rightPos; - - StringRef splitString() const { - return StringRef((const uint8_t *)node + headerLen, leftPos); - } - StringRef suffixString() const { - return StringRef((const uint8_t *)node + headerLen + suffixPos, valuePos - suffixPos); - } - StringRef valueString() const { - return StringRef((const uint8_t *)node + headerLen + valuePos, rightPos - valuePos); - } - const Node *leftChild() const { - if(node->flags & 
HAS_LEFT_CHILD) - return (const Node *)((const uint8_t *)node + headerLen + leftPos); - return nullptr; - } - const Node *rightChild() const { - if(node->flags & HAS_RIGHT_CHILD) - return (const Node *)((const uint8_t *)node + headerLen + rightPos); - return nullptr; - } - int keyLen() const { - int len = prefixLen + leftPos + (valuePos - suffixPos); - ASSERT(len >= 0); - return len; - } - - void init(const Node *n) { - node = n; - union { - const uint8_t *p8; - const uint16_t *p16; - }; - p8 = (const uint8_t *)&n->flags + 1; - - int flags = n->flags; - bool large = flags & USE_LARGE_LENGTHS; - - prefixLen = large ? *p16++ : *p8++; - - if(flags & HAS_SPLIT) - leftPos = large ? *p16++ : *p8++; - else - leftPos = 0; - suffixPos = leftPos; - if(flags & HAS_LEFT_CHILD) - suffixPos += *p16++; - - valuePos = suffixPos; - if(flags & HAS_SUFFIX) - valuePos += (large ? *p16++ : *p8++); - - rightPos = valuePos; - if(flags & HAS_VALUE) - rightPos += (large ? *p16++ : *p8++); - - int header = 2; // flags byte, first prefix len byte - if(large) - ++header; // second prefix len byte - if(flags & HAS_SPLIT) - header += large ? 2 : 1; - if(flags & HAS_LEFT_CHILD) - header += 2; - if(flags & HAS_SUFFIX) - header += large ? 2 : 1; - if(flags & HAS_VALUE) - header += large ? 2 : 1; - headerLen = header; - } - }; - - static inline int getMaxOverhead(int index, int keySize, int valueSize) { - bool large = keySize > 255 || valueSize > 255; - int overhead = 1 + (large ? 2 : 1); // flags and prefix len - // Value length size if present - if(valueSize > 0) - overhead += large ? 2 : 1; - overhead += large ? 6 : 3; // Worst case scenario for value, split and suffix lengths - if((index & 0x01) != 0) - overhead += 2; // Left child length, one less than half of nodes will have one. 
- return overhead; - } - - public: - - // Methods for decoding specific Node members on-demand - inline int getPrefixLen() const { - return Parser(this).prefixLen; - } - - inline StringRef getSplitString() const { - return Parser(this).splitString(); - } - - inline StringRef getSuffixString() const { - return Parser(this).suffixString(); - } - - inline StringRef getValueString() const { - return Parser(this).valueString(); - } - - inline const Node * getLeftChild() const { - return Parser(this).leftChild(); - } - - inline const Node * getRightChild() const { - return Parser(this).rightChild(); - } - - inline int getKeySize() const { - return Parser(this).keyLen(); - } - }; - -#pragma pack(push,1) - uint16_t size; // size in bytes - Node root; -#pragma pack(pop) - - static inline int GetHeaderSize() { - return sizeof(PrefixTree) - sizeof(root); - } - -private: - struct PathEntry { - const Node *node; - Node::Parser parser; - - // Key may or may not point to the space within keyBuffer. - // Key will always contain at least the prefix bytes borrowed by node - // KeyBuffer will always be large enough to hold the entire reconstituted key for node - // - // These are mutable because getting key bytes from this PathEntry can change these - // but they're really just a read cache for reconstituted key bytes. - mutable StringRef key; - mutable Standalone> keyBuffer; - - // Path entry was reached by going left from the previous node - bool nodeIsLeftChild; - // number of consecutive moves in same direction - int moves; - - PathEntry() : node(nullptr) { - } - PathEntry(const PathEntry &rhs) { - *this = rhs; - } - - // Initialize the key byte buffer to hold bytes of a new node. Use a new arena - // if the old arena is being held by any users. 
- void initKeyBufferSpace() { - if(node != nullptr) { - int size = parser.keyLen(); - if(keyBuffer.arena().impl && !keyBuffer.arena().impl->isSoleOwnerUnsafe()) { - keyBuffer = Standalone>(); - } - keyBuffer.reserve(keyBuffer.arena(), size); - } - } - - PathEntry & operator= (const PathEntry &rhs) { - node = rhs.node; - parser = rhs.parser; - nodeIsLeftChild = rhs.nodeIsLeftChild; - moves = rhs.moves; - // New key buffer must be able to hold full reconstituted key, not just the - // part of it referenced by rhs.key (which may not be the whole thing) - initKeyBufferSpace(); - if(node != nullptr && rhs.key.size() > 0) { - // Copy rhs.key into keyBuffer and set key to the destination bytes - memcpy(keyBuffer.begin(), rhs.key.begin(), rhs.key.size()); - key = StringRef(keyBuffer.begin(), rhs.key.size()); - } - else { - key = rhs.key; - } - return *this; - } - - void init(StringRef s) { - node = nullptr; - key = s; - } - - void init(const Node *_node, const PathEntry *prefixSource, bool isLeft, int numMoves) { - node = _node; - parser.init(node); - nodeIsLeftChild = isLeft; - moves = numMoves; - - // keyBuffer will be large enough to hold the full reconstituted key but initially - // key will be a reference returned from prefixSource->getKeyRef() - // See comments near keyBuffer and key for more info. - initKeyBufferSpace(); - key = prefixSource->getKeyRef(parser.prefixLen); - } - - inline bool valid() const { - return node != nullptr; - } - - int compareToKey(StringRef s) const { - // Key has at least this node's borrowed prefix bytes in it. 
- // If s is shorter than key, we only need to compare it to key - if(s.size() < key.size()) - return s.compare(key); - - int cmp = s.substr(0, key.size()).compare(key); - if(cmp != 0) - return cmp; - - // The borrowed prefix bytes and possibly more have already been compared and were equal - int comparedLen = key.size(); - s = s.substr(comparedLen); - StringRef split = parser.splitString(); - int splitSizeOriginal = split.size(); - int splitStart = comparedLen - parser.prefixLen; - if(splitStart < split.size()) { - split = split.substr(splitStart); - if(s.size() < split.size()) - return s.compare(split); - cmp = s.substr(0, split.size()).compare(split); - if(cmp != 0) - return cmp; - s = s.substr(split.size()); - comparedLen += split.size(); - } - - int suffixStart = comparedLen - (parser.prefixLen + splitSizeOriginal); - StringRef suffix = parser.suffixString(); - ASSERT(suffixStart >= 0 && suffixStart <= suffix.size()); - return s.compare(suffix.substr(suffixStart)); - } - - // Make sure that key refers to bytes in keyBuffer, copying if necessary - void ensureKeyInBuffer() const { - if(key.begin() != keyBuffer.begin()) { - memcpy(keyBuffer.begin(), key.begin(), key.size()); - key = StringRef(keyBuffer.begin(), key.size()); - } - } - - // Get the borrowed prefix string. Key must contain all of those bytes but it could contain more. - StringRef getPrefix() const { - if(node == nullptr) - return key; - return key.substr(0, parser.prefixLen); - } - - // Return a reference to the first size bytes of the key. - // - // If size <= key's size then a substring of key will be returned, but if alwaysUseKeyBuffer - // is true then before returning the existing value of key (not just the first size bytes) - // will be copied into keyBuffer and key will be updated to point there. 
- // - // If size is greater than key's size, then key will be moved into keyBuffer if it is not already there - // and the remaining needed bytes will be copied into keyBuffer from the split and suffix strings. - KeyRef getKeyRef(int size = -1, bool alwaysUseKeyBuffer = false) const { - if(size < 0) - size = parser.keyLen(); - - // If size is less than key then return a substring of it, possibly after moving it to the keyBuffer. - if(size <= key.size()) { - if(alwaysUseKeyBuffer) - ensureKeyInBuffer(); - return key.substr(0, size); - } - - ASSERT(node != nullptr); - ensureKeyInBuffer(); - - // The borrowed prefix bytes and possibly more must already be in key - int writtenLen = key.size(); - StringRef split = parser.splitString(); - StringRef suffix = parser.suffixString(); - int splitStart = writtenLen - parser.prefixLen; - if(splitStart < split.size()) { - int splitLen = std::min(split.size() - splitStart, size - writtenLen); - memcpy(mutateString(key) + writtenLen, split.begin() + splitStart, splitLen); - writtenLen += splitLen; - } - int suffixStart = writtenLen - parser.prefixLen - split.size(); - if(suffixStart < suffix.size()) { - int suffixLen = std::min(suffix.size() - suffixStart, size - writtenLen); - memcpy(mutateString(key) + writtenLen, suffix.begin() + suffixStart, suffixLen); - writtenLen += suffixLen; - } - ASSERT(writtenLen == size); - key = StringRef(key.begin(), size); - return key; - } - - // Return keyRef(size) and the arena that keyBuffer resides in. - Key getKey(int size = -1) const { - StringRef k = getKeyRef(size, true); - return Key(k, keyBuffer.arena()); - } - }; - -public: - // Cursor provides a way to seek into a PrefixTree and iterate over its content - // Seek and move methods can return false can return false if they fail to achieve the desired effect - // but a cursor will remain 'valid' as long as the tree is not empty. 
- // - // It coalesces prefix bytes into a contiguous buffer for each node along the traversal - // path to make iteration faster. - struct Cursor { - Cursor() : pathLen(0) { - } - - Cursor(const Node *root, StringRef prevAncestor, StringRef nextAncestor) { - init(root, prevAncestor, nextAncestor); - } - - static const int initialPathLen = 3; - static const int initialPathCapacity = 20; - // This is a separate function so that Cursors can be reused to search different PrefixTrees - // which avoids cursor destruction and creation which involves unnecessary memory churn. - // The root node is arbitrarily assumed to be a right child of prevAncestor which itself is a left child of nextAncestor - void init(const Node *root, StringRef prevAncestor, StringRef nextAncestor) { - if(path.size() < initialPathCapacity) - path.resize(initialPathCapacity); - pathLen = initialPathLen; - path[0].init(nextAncestor); - path[1].init(prevAncestor); - path[2].init(root, &path[root->flags & Node::PREFIX_SOURCE_NEXT ? 0 : 1], false, 1); - } - - bool operator == (const Cursor &rhs) const { - return pathBack().node == rhs.pathBack().node; - } - - StringRef leftParentBoundary; - StringRef rightParentBoundary; - std::vector path; - // pathLen is the number of elements in path which are in use. This is to prevent constantly destroying - // and constructing PathEntry objects which would unnecessarily churn through memory in Arena for storing - // coalesced prefixes. - int pathLen; - - bool valid() const { - return pathLen != 0 && pathBack().valid(); - } - - // Get a reference to the current key which is valid until the Cursor is moved. - KeyRef getKeyRef() const { - return pathBack().getKeyRef(); - } - - // Get a Standalone for the current key which will still be valid after the Cursor is moved. - Key getKey() const { - return pathBack().getKey(); - } - - // Get a reference to the current value which is valid as long as the Cursor's page memory exists. 
- ValueRef getValueRef() const { - return pathBack().parser.valueString(); - } - - // Get a key/value reference that is valid until the Cursor is moved. - EntryRef getKVRef() const { - return EntryRef(getKeyRef(), getValueRef()); - } - - // Returns a standalone EntryRef where both key and value exist in the standalone's arena, - // unless copyValue is false in which case the value will be a reference into tree memory. - Entry getKV(bool copyValue = true) const { - Key k = getKey(); - ValueRef v = getValueRef(); - if(copyValue) - v = ValueRef(k.arena(), getValueRef()); - return Entry(EntryRef(k, v), k.arena()); - } - - // Moves the cursor to the node with the greatest key less than or equal to s. If successful, - // returns true, otherwise returns false and the cursor will be at the node with the next key - // greater than s. - bool seekLessThanOrEqual(StringRef s) { - if(pathLen == 0) - return false; - - pathLen = initialPathLen; - - // TODO: Track position of difference and use prefix reuse bytes and prefix sources - // to skip comparison of some prefix bytes when possible - while(1) { - const PathEntry &p = pathBack(); - const Node *right = p.parser.rightChild(); - _mm_prefetch((const char*)right, _MM_HINT_T0); - - int cmp = p.compareToKey(s); - if(cmp == 0) - return true; - - if(cmp < 0) { - // Try to traverse left - const Node *left = p.parser.leftChild(); - if(left == nullptr) { - // If we're at the root, cursor should now be before the first element - if(pathLen == initialPathLen) { - return false; - } - - if(p.nodeIsLeftChild) { - // If we only went left, cursor should now be before the first element - if((p.moves + initialPathLen) == pathLen) { - return false; - } - - // Otherwise, go to the parent of the last right child traversed, - // which is the last node from which we went right - popPath(p.moves + 1); - return true; - } - - // p.directionLeft is false, so p.node is a right child, so go to its parent. 
- popPath(1); - return true; - } - - int newMoves = p.nodeIsLeftChild ? p.moves + 1 : 1; - const PathEntry *borrowSource = (left->flags & Node::PREFIX_SOURCE_NEXT) ? &p : &p - newMoves; - pushPath(left, borrowSource, true, newMoves); - } - else { - // Try to traverse right - if(right == nullptr) { - return true; - } - - int newMoves = p.nodeIsLeftChild ? 1 : p.moves + 1; - const PathEntry *borrowSource = (right->flags & Node::PREFIX_SOURCE_NEXT) ? &p - newMoves : &p; - pushPath(right, borrowSource, false, newMoves); - } - } - } - - inline const PathEntry &pathBack() const { - return path[pathLen - 1]; - } - - inline PathEntry &pathBack() { - return path[pathLen - 1]; - } - - inline void pushPath(const Node *node, const PathEntry *borrowSource, bool left, int moves) { - ++pathLen; - if(path.size() < pathLen) { - path.resize(pathLen); - } - pathBack().init(node, borrowSource, left, moves); - } - - inline void popPath(int n) { - pathLen -= n; - } - - std::string pathToString() const { - std::string s; - for(int i = 0; i < pathLen; ++i) { - s += format("(%d: ", i); - const Node *node = path[i].node; - if(node != nullptr) { - s += "childDir="; - s += (path[i].nodeIsLeftChild ? "left " : "right "); - } - s += format("prefix='%s'", path[i].getPrefix().toHexString(20).c_str()); - if(node != nullptr) { - s += format(" split='%s' suffix='%s' value='%s'", node->getSplitString().toHexString(20).c_str(), node->getSuffixString().toHexString(20).c_str(), node->getValueString().toHexString(20).c_str()); - } - else - s += ") "; - } - return s; - } - - bool moveFirst() { - if(pathLen == 0) - return false; - - pathLen = initialPathLen; - - while(1) { - const PathEntry &p = pathBack(); - const Node *left = p.parser.leftChild(); - - if(left == nullptr) - break; - - // TODO: This can be simpler since it only goes left - int newMoves = p.nodeIsLeftChild ? p.moves + 1 : 1; - const PathEntry *borrowSource = (left->flags & Node::PREFIX_SOURCE_NEXT) ? 
&p : &p - newMoves; - pushPath(left, borrowSource, true, newMoves); - } - - return true; - } - - bool moveLast() { - if(pathLen == 0) - return false; - - pathLen = initialPathLen; - - while(1) { - const PathEntry &p = pathBack(); - const Node *right = p.parser.rightChild(); - - if(right == nullptr) - break; - - // TODO: This can be simpler since it only goes right - int newMoves = p.nodeIsLeftChild ? 1 : p.moves + 1; - const PathEntry *borrowSource = (right->flags & Node::PREFIX_SOURCE_NEXT) ? &p - newMoves : &p; - pushPath(right, borrowSource, false, newMoves); - } - - return true; - } - - bool moveNext() { - const PathEntry &p = pathBack(); - - // If p isn't valid - if(!p.valid()) { - return false; - } - - const Node *right = p.parser.rightChild(); - - // If we can't go right, then go upward to the parent of the last left child - if(right == nullptr) { - // If current node was a left child then pop one node and we're done - if(p.nodeIsLeftChild) { - popPath(1); - return true; - } - - // Current node is a right child. - // If we are at the rightmost tree node return false and don't move. - if(p.moves + initialPathLen - 1 == pathLen) { - return false; - } - - // Truncate path to the parent of the last left child - popPath(p.moves + 1); - return true; - } - - // Go right - int newMoves = p.nodeIsLeftChild ? 1 : p.moves + 1; - const PathEntry *borrowSource = (right->flags & Node::PREFIX_SOURCE_NEXT) ? &p - newMoves : &p; - pushPath(right, borrowSource, false, newMoves); - - // Go left as far as possible - while(1) { - const PathEntry &p = pathBack(); - const Node *left = p.parser.leftChild(); - if(left == nullptr) { - return true; - } - - int newMoves = p.nodeIsLeftChild ? p.moves + 1 : 1; - const PathEntry *borrowSource = (left->flags & Node::PREFIX_SOURCE_NEXT) ? 
&p : &p - newMoves; - pushPath(left, borrowSource, true, newMoves); - } - } - - bool movePrev() { - const PathEntry &p = pathBack(); - - // If p isn't valid - if(!p.valid()) { - return false; - } - - const Node *left = p.parser.leftChild(); - - // If we can't go left, then go upward to the parent of the last right child - if(left == nullptr) { - // If current node was a right child - if(!p.nodeIsLeftChild) { - // If we are at the root then don't move and return false. - if(pathLen == initialPathLen) - return false; - - // Otherwise, pop one node from the path and return true. - popPath(1); - return true; - } - - // Current node is a left child. - // If we are at the leftmost tree node then return false and don't move. - if(p.moves + 3 == pathLen) { - return false; - } - - // Truncate path to the parent of the last right child - popPath(p.moves + 1); - return true; - } - - // Go left - int newMoves = p.nodeIsLeftChild ? p.moves + 1 : 1; - const PathEntry *borrowSource = (left->flags & Node::PREFIX_SOURCE_NEXT) ? &p : &p - newMoves; - pushPath(left, borrowSource, true, newMoves); - - // Go right as far as possible - while(1) { - const PathEntry &p = pathBack(); - const Node *right = p.parser.rightChild(); - if(right == nullptr) { - return true; - } - - int newMoves = p.nodeIsLeftChild ? 1 : p.moves + 1; - const PathEntry *borrowSource = (right->flags & Node::PREFIX_SOURCE_NEXT) ? &p - newMoves : &p; - pushPath(right, borrowSource, false, newMoves); - } - } - - }; - - Cursor getCursor(StringRef prevAncestor, StringRef nextAncestor) const { - return (size != 0) ? 
Cursor(&root, prevAncestor, nextAncestor) : Cursor(); - } - - static std::string escapeForDOT(StringRef s) { - std::string r = "\""; - for(char c : s) { - if(c == '\n') - r += "\\n"; - else if(isprint(c) && c != '"') - r += c; - else - r += format("{%02X}", c); - } - return r + '"'; - } - - std::string toDOT(StringRef prevAncestor, StringRef nextAncestor) const { - auto c = getCursor(prevAncestor, nextAncestor); - c.moveFirst(); - - std::string r; - r += format("digraph PrefixTree%p {\n", this); - - do { - const PathEntry &p = c.pathBack(); - const Node *n = p.node; - const Node *left = p.parser.leftChild(); - const Node *right = p.parser.rightChild(); - - std::string label = escapeForDOT(format("PrefixSource: %s\nPrefix: [%s]\nSplit: %s\nSuffix: %s", - n->flags & Node::PREFIX_SOURCE_NEXT ? "Left" : "Right", - p.getPrefix().toString().c_str(), - p.parser.splitString().toString().c_str(), - p.parser.suffixString().toString().c_str() - )); - - r += format("node%p [ label = %s ];\nnode%p -> { %s %s };\n", n, label.c_str(), n, - left ? format("node%p", left).c_str() : "", - right ? 
format("node%p", right).c_str() : "" - ); - - } while(c.moveNext()); - - r += "}\n"; - - return r; - } - - // Returns number of bytes written - int build(const EntryRef *begin, const EntryRef *end, StringRef prevAncestor, StringRef nextAncestor) { - // The boundary leading to the new page acts as the last time we branched right - if(begin == end) { - size = 0; - } - else { - size = sizeof(size) + build(root, begin, end, nextAncestor, prevAncestor); - } - ASSERT(size <= MaximumTreeSize()); - return size; - } - -private: - static uint16_t build(Node &root, const EntryRef *begin, const EntryRef *end, const StringRef &nextAncestor, const StringRef &prevAncestor) { - ASSERT(end != begin); - - int count = end - begin; - - // Find key to be stored in root - int mid = perfectSubtreeSplitPointCached(count); - const StringRef &key = begin[mid].key; - const StringRef &val = begin[mid].value; - - // Since key must be between lastLeft and lastRight, any common prefix they share must be shared by key - // so rather than comparing all of key to each one separately we can just compare lastLeft and lastRight - // to each other and then skip over the resulting length in key - int nextPrevCommon = commonPrefixLength(nextAncestor.begin(), prevAncestor.begin(), std::min(nextAncestor.size(), prevAncestor.size())); - - // Pointer to remainder of key after the left/right common bytes - const uint8_t *keyExt = key.begin() + nextPrevCommon; - - // Find out how many bytes beyond leftRightCommon key has with each last left/right string separately - int extNext = commonPrefixLength(keyExt, nextAncestor.begin() + nextPrevCommon, std::min(key.size(), nextAncestor.size()) - nextPrevCommon); - int extPrev = commonPrefixLength(keyExt, prevAncestor.begin() + nextPrevCommon, std::min(key.size(), prevAncestor.size()) - nextPrevCommon); - - // Use the longer result - bool prefixSourceNext = extNext > extPrev; - - int prefixLen = nextPrevCommon + (prefixSourceNext ? 
extNext : extPrev); - - int splitLen; // Bytes after prefix required to make traversal decision - int suffixLen; // Remainder of key bytes after split key portion - - //printf("build: '%s'\n prefixLen %d prefixSourceNext %d\n", key.toHexString(20).c_str(), prefixLen, prefixSourceNext); - - // 2 entries or less means no right child, so just put all remaining key bytes into split string. - if(count < 3) { - splitLen = key.size() - prefixLen; - suffixLen = 0; - } - else { - // There are 2 children - // Avoid using the suffix at all if the remainder is small enough. - splitLen = key.size() - prefixLen; - if(splitLen < SERVER_KNOBS->PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT) { - suffixLen = 0; - } - else { - // Remainder of the key was not small enough to put entirely before the left child, so find the actual required to make the branch decision - const StringRef &prevKey = begin[mid - 1].key; - splitLen = commonPrefixLength(key.begin(), prevKey.begin(), std::min(key.size(), prevKey.size())) + 1 - prefixLen; - - // Put at least the minimum immediate byte count in the split key (before the left child) - if(splitLen < SERVER_KNOBS->PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN) - splitLen = std::min(key.size() - prefixLen, SERVER_KNOBS->PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN); - - suffixLen = key.size() - splitLen - prefixLen; - } - } - - // We now know enough about the fields present and their lengths to set the flag bits and write a header - // If any int is more than 8 bits then use large ints - bool large = prefixLen > 255 || splitLen > 255 || suffixLen > 255 || val.size() > 255; - root.flags = large ? 
Node::USE_LARGE_LENGTHS : 0; - - if(prefixSourceNext) - root.flags |= Node::PREFIX_SOURCE_NEXT; - - union { - uint8_t *p8; - uint16_t *p16; - }; - p8 = &root.flags + 1; - - if(large) - *p16++ = prefixLen; - else - *p8++ = prefixLen; - - if(splitLen > 0) { - root.flags |= Node::HAS_SPLIT; - if(large) - *p16++ = splitLen; - else - *p8++ = splitLen; - } - - uint16_t *pLeftLen = p16; - if(count > 1) { - ++p16; - } - - if(suffixLen > 0) { - root.flags |= Node::HAS_SUFFIX; - if(large) - *p16++ = suffixLen; - else - *p8++ = suffixLen; - } - - if(val.size() > 0) { - root.flags |= Node::HAS_VALUE; - if(large) - *p16++ = val.size(); - else - *p8++ = val.size(); - } - - // Header is written, now write strings and children in order. - const uint8_t *keyPtr = key.begin() + prefixLen; - - // Serialize split bytes - if(splitLen > 0) { - memcpy(p8, keyPtr, splitLen); - p8 += splitLen; - keyPtr += splitLen; - } - - // Serialize left child - if(count > 1) { - root.flags |= Node::HAS_LEFT_CHILD; - int leftLen = build(*(Node *)(p8), begin, begin + mid, key, prevAncestor); - *pLeftLen = leftLen; - p8 += leftLen; - } - - // Serialize suffix bytes - if(suffixLen > 0) { - memcpy(p8, keyPtr, suffixLen); - p8 += suffixLen; - } - - // Serialize value bytes - if(val.size() > 0) { - memcpy(p8, val.begin(), val.size()); - p8 += val.size(); - } - - // Serialize right child - if(count > 2) { - root.flags |= Node::HAS_RIGHT_CHILD; - int rightLen = build(*(Node *)(p8), begin + mid + 1, end, nextAncestor, key); - p8 += rightLen; - } - -/* -printf("\nBuilt: key '%s' c %d p %d spl %d suf %d\nRaw: %s\n", key.toString().c_str(), count, prefixLen, splitLen, suffixLen, StringRef(&root.flags, p8 - &root.flags).toHexString(20).c_str()); -Node::Parser p(&root); -printf("parser: headerLen %d prefixLen %d leftPos %d rightPos %d split %s suffix %s val %s\n", - p.headerLen, p.prefixLen, p.leftPos, p.rightPos, p.splitString().toString().c_str(), p.suffixString().toString().c_str(), 
p.valueString().toString().c_str()); -*/ - return p8 - (uint8_t *)&root; - } -}; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 22ca40784e..b4facd88f2 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -29,8 +29,6 @@ #include "fdbrpc/IAsyncFile.h" #include "fdbrpc/crc32c.h" #include "flow/ActorCollection.h" -#include "fdbserver/MemoryPager.h" -#include "fdbserver/IndirectShadowPager.h" #include #include #include "fdbclient/CommitTransaction.h" @@ -271,8 +269,6 @@ public: #pragma pack(push, 1) struct RawPage { - static constexpr int FORMAT_VERSION = 1; - uint16_t formatVersion; LogicalPageID nextPageID; uint16_t nextOffset; uint16_t endOffset; @@ -307,7 +303,6 @@ public: debug_printf("FIFOQueue::Cursor(%s) loadPage\n", toString().c_str()); return map(queue->pager->readPage(pageID, true), [=](Reference p) { page = p; - ASSERT(raw()->formatVersion == RawPage::FORMAT_VERSION); debug_printf("FIFOQueue::Cursor(%s) loadPage done\n", toString().c_str()); return Void(); }); @@ -347,7 +342,6 @@ public: page = queue->pager->newPageBuffer(); setNext(0, 0); auto p = raw(); - p->formatVersion = RawPage::FORMAT_VERSION; ASSERT(newOffset == 0); p->endOffset = 0; } @@ -738,15 +732,22 @@ private: // Future onEvictable() const; // ready when entry can be evicted // indicating if it is safe to evict. template -class ObjectCache { +class ObjectCache : NonCopyable { struct Entry : public boost::intrusive::list_base_hook<> { + Entry() : hits(0) { + } IndexType index; ObjectType item; + int hits; }; public: - ObjectCache(int sizeLimit = 0) : sizeLimit(sizeLimit) { + ObjectCache(int sizeLimit = 0) : sizeLimit(sizeLimit), cacheHits(0), cacheMisses(0), noHitEvictions(0) { + } + + void setSizeLimit(int n) { + sizeLimit = n; } // Get the object for i if it exists, else return nullptr. 
@@ -754,6 +755,7 @@ public: ObjectType * getIfExists(const IndexType &index) { auto i = cache.find(index); if(i != cache.end()) { + ++i->second.hits; return &i->second.item; } return nullptr; @@ -761,26 +763,36 @@ public: // Get the object for i or create a new one. // After a get(), the object for i is the last in evictionOrder. - ObjectType & get(const IndexType &index) { + ObjectType & get(const IndexType &index, bool noHit = false) { Entry &entry = cache[index]; // If entry is linked into evictionOrder then move it to the back of the order if(entry.is_linked()) { + if(!noHit) { + ++entry.hits; + ++cacheHits; + } // Move the entry to the back of the eviction order evictionOrder.erase(evictionOrder.iterator_to(entry)); evictionOrder.push_back(entry); } else { + ++cacheMisses; // Finish initializing entry entry.index = index; + entry.hits = noHit ? 0 : 1; // Insert the newly created Entry at the back of the eviction order evictionOrder.push_back(entry); // If the cache is too big, try to evict the first Entry in the eviction order if(cache.size() > sizeLimit) { Entry &toEvict = evictionOrder.front(); + debug_printf("Trying to evict %s to make room for %s\n", toString(toEvict.index).c_str(), toString(index).c_str()); // Don't evict the entry that was just added as then we can't return a reference to it. 
if(toEvict.index != index && toEvict.item.evictable()) { + if(toEvict.hits == 0) { + ++noHitEvictions; + } debug_printf("Evicting %s to make room for %s\n", toString(toEvict.index).c_str(), toString(index).c_str()); evictionOrder.pop_front(); cache.erase(toEvict.index); @@ -827,12 +839,14 @@ public: } private: - int sizeLimit; + int64_t sizeLimit; + int64_t cacheHits; + int64_t cacheMisses; + int64_t noHitEvictions; // TODO: Use boost intrusive unordered set instead, with a comparator that only considers entry.index std::unordered_map cache; boost::intrusive::list evictionOrder; - }; ACTOR template Future forwardError(Future f, Promise target) { @@ -900,7 +914,7 @@ public: // If the file already exists, pageSize might be different than desiredPageSize // Use pageCacheSizeBytes == 0 for default - DWALPager(int desiredPageSize, std::string filename, int pageCacheSizeBytes) + DWALPager(int desiredPageSize, std::string filename, int64_t pageCacheSizeBytes) : desiredPageSize(desiredPageSize), filename(filename), pHeader(nullptr), pageCacheBytes(pageCacheSizeBytes) { if(pageCacheBytes == 0) { @@ -919,8 +933,7 @@ public: if(pHeader != nullptr) { pHeader->pageSize = logicalPageSize; } - ASSERT(pageCache.count() == 0); - pageCache = PageCacheT(pageCacheBytes / physicalPageSize); + pageCache.setSizeLimit(pageCacheBytes / physicalPageSize); } void updateCommittedHeader() { @@ -985,8 +998,18 @@ public: } self->pHeader = (Header *)self->headerPage->begin(); - self->setPageSize(self->pHeader->pageSize); + if(self->pHeader->formatVersion != Header::FORMAT_VERSION) { + Error e = internal_error(); // TODO: Something better? 
+ TraceEvent(SevError, "DWALPagerRecoveryFailedWrongVersion") + .detail("Filename", self->filename) + .detail("Version", self->pHeader->formatVersion) + .detail("ExpectedVersion", Header::FORMAT_VERSION) + .error(e); + throw e; + } + + self->setPageSize(self->pHeader->pageSize); if(self->logicalPageSize != self->desiredPageSize) { TraceEvent(SevWarn, "DWALPagerPageSizeNotDesired") .detail("Filename", self->filename) @@ -1139,8 +1162,8 @@ public: } void updatePage(LogicalPageID pageID, Reference data) override { - // Get the cache entry for this page - PageCacheEntry &cacheEntry = pageCache.get(pageID); + // Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now + PageCacheEntry &cacheEntry = pageCache.get(pageID, true); debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing()); // If the page is still being read then it's not also being written because a write places @@ -1253,7 +1276,7 @@ public: } // Reads the most recent version of pageID either committed or written using updatePage() - Future> readPage(LogicalPageID pageID, bool cacheable) override { + Future> readPage(LogicalPageID pageID, bool cacheable, bool noHit = false) override { // Use cached page if present, without triggering a cache hit. 
// Otherwise, read the page and return it but don't add it to the cache if(!cacheable) { @@ -1268,8 +1291,8 @@ public: return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise); } - PageCacheEntry &cacheEntry = pageCache.get(pageID); - debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing()); + PageCacheEntry &cacheEntry = pageCache.get(pageID, noHit); + debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d noHit=%d\n", filename.c_str(), toString(pageID).c_str(), cacheEntry.initialized(), cacheEntry.initialized() && cacheEntry.reading(), cacheEntry.initialized() && cacheEntry.writing(), noHit); if(!cacheEntry.initialized()) { debug_printf("DWALPager(%s) issuing actual read of %s\n", filename.c_str(), toString(pageID).c_str()); @@ -1281,7 +1304,7 @@ public: return cacheEntry.readFuture; } - Future> readPageAtVersion(LogicalPageID pageID, Version v, bool cacheable) { + Future> readPageAtVersion(LogicalPageID pageID, Version v, bool cacheable, bool noHit) { auto i = remappedPages.find(pageID); if(i != remappedPages.end()) { @@ -1296,7 +1319,7 @@ public: debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(), toString(pageID).c_str(), v); } - return readPage(pageID, cacheable); + return readPage(pageID, cacheable, noHit); } // Get snapshot as of the most recent committed version of the pager @@ -1451,7 +1474,6 @@ public: } Key getMetaKey() const override { - ASSERT(recoverFuture.isReady()); return pHeader->getMetaKey(); } @@ -1563,7 +1585,7 @@ private: #pragma pack(push, 1) // Header is the format of page 0 of the database struct Header { - static constexpr int FORMAT_VERSION = 1; + static constexpr int FORMAT_VERSION = 2; uint16_t formatVersion; uint32_t pageSize; int64_t pageCount; @@ -1582,7 +1604,6 @@ private: 
ASSERT(key.size() < (smallestPhysicalBlock - sizeof(Header))); metaKeySize = key.size(); memcpy(this + 1, key.begin(), key.size()); - ASSERT(formatVersion == FORMAT_VERSION); } int size() const { @@ -1691,11 +1712,11 @@ public: virtual ~DWALPagerSnapshot() { } - Future> getPhysicalPage(LogicalPageID pageID, bool cacheable) override { + Future> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool noHit) override { if(expired.isError()) { throw expired.getError(); } - return map(pager->readPageAtVersion(pageID, version, cacheable), [=](Reference p) { + return map(pager->readPageAtVersion(pageID, version, cacheable, noHit), [=](Reference p) { return Reference(p); }); } @@ -2448,17 +2469,11 @@ struct RedwoodRecordRef { }; struct BTreePage { - - enum EPageFlags { IS_LEAF = 1}; - typedef DeltaTree BinaryTree; typedef DeltaTree ValueTree; - static constexpr int FORMAT_VERSION = 1; #pragma pack(push,1) struct { - uint16_t formatVersion; - uint8_t flags; uint8_t height; uint16_t itemCount; uint32_t kvBytes; @@ -2471,7 +2486,7 @@ struct BTreePage { } bool isLeaf() const { - return flags & IS_LEAF; + return height == 1; } BinaryTree & tree() { @@ -2488,8 +2503,8 @@ struct BTreePage { std::string toString(bool write, BTreePageID id, Version ver, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) const { std::string r; - r += format("BTreePage op=%s %s @%" PRId64 " ptr=%p flags=0x%X count=%d kvBytes=%d\n lowerBound: %s\n upperBound: %s\n", - write ? "write" : "read", ::toString(id).c_str(), ver, this, (int)flags, (int)itemCount, (int)kvBytes, + r += format("BTreePage op=%s %s @%" PRId64 " ptr=%p height=%d count=%d kvBytes=%d\n lowerBound: %s\n upperBound: %s\n", + write ? 
"write" : "read", ::toString(id).c_str(), ver, this, height, (int)itemCount, (int)kvBytes, lowerBound->toString().c_str(), upperBound->toString().c_str()); try { if(itemCount > 0) { @@ -2533,8 +2548,6 @@ struct BTreePage { static void makeEmptyRoot(Reference page) { BTreePage *btpage = (BTreePage *)page->begin(); - btpage->formatVersion = BTreePage::FORMAT_VERSION; - btpage->flags = BTreePage::IS_LEAF; btpage->height = 1; btpage->kvBytes = 0; btpage->itemCount = 0; @@ -2638,7 +2651,8 @@ public: #pragma pack(push, 1) struct MetaKey { - static constexpr int FORMAT_VERSION = 1; + static constexpr int FORMAT_VERSION = 2; + // This serves as the format version for the entire tree, individual pages will not be versioned uint16_t formatVersion; uint8_t height; LazyDeleteQueueT::QueueState lazyDeleteQueue; @@ -2663,6 +2677,7 @@ public: struct Counts { Counts() { memset(this, 0, sizeof(Counts)); + startTime = g_network ? now() : 0; } void clear() { @@ -2671,6 +2686,8 @@ public: int64_t pageReads; int64_t extPageReads; + int64_t pagePreloads; + int64_t extPagePreloads; int64_t setBytes; int64_t pageWrites; int64_t extPageWrites; @@ -2681,13 +2698,22 @@ public: int64_t getRanges; int64_t commitToPage; int64_t commitToPageStart; + double startTime; std::string toString(bool clearAfter = false) { - std::string s = format("set=%" PRId64 " clear=%" PRId64 " get=%" PRId64 " getRange=%" PRId64 " commit=%" PRId64 " pageRead=%" PRId64 " extPageRead=%" PRId64 " pageWrite=%" PRId64 " extPageWrite=%" PRId64 " commitPage=%" PRId64 " commitPageStart=%" PRId64 "", - sets, clears, gets, getRanges, commits, pageReads, extPageReads, pageWrites, extPageWrites, commitToPage, commitToPageStart); + const char *labels[] = {"set", "clear", "get", "getRange", "commit", "pageReads", "extPageRead", "pagePreloads", "extPagePreloads", "pageWrite", "extPageWrite", "commitPage", "commitPageStart"}; + const int64_t values[] = {sets, clears, gets, getRanges, commits, pageReads, extPageReads, pagePreloads, 
extPagePreloads, pageWrites, extPageWrites, commitToPage, commitToPageStart}; + + double elapsed = now() - startTime; + std::string s; + for(int i = 0; i < sizeof(values) / sizeof(int64_t); ++i) { + s += format("%s=%" PRId64 " (%d/s) ", labels[i], values[i], int(values[i] / elapsed)); + } + if(clearAfter) { clear(); } + return s; } }; @@ -2697,11 +2723,11 @@ public: // All async opts on the btree are based on pager reads, writes, and commits, so // we can mostly forward these next few functions to the pager - virtual Future getError() { + Future getError() { return m_pager->getError(); } - virtual Future onClosed() { + Future onClosed() { return m_pager->onClosed(); } @@ -2714,24 +2740,24 @@ public: pager->close(); } - virtual void dispose() { + void dispose() { return close_impl(true); } - virtual void close() { + void close() { return close_impl(false); } - virtual KeyValueStoreType getType() NOT_IMPLEMENTED - virtual bool supportsMutation(int op) NOT_IMPLEMENTED - virtual StorageBytes getStorageBytes() { + KeyValueStoreType getType() NOT_IMPLEMENTED + bool supportsMutation(int op) NOT_IMPLEMENTED + StorageBytes getStorageBytes() { return m_pager->getStorageBytes(); } // Writes are provided in an ordered stream. 
// A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion() // A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns - virtual void set(KeyValueRef keyValue) { + void set(KeyValueRef keyValue) { ++counts.sets; SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations; @@ -2750,7 +2776,7 @@ public: } } } - virtual void clear(KeyRangeRef range) { + void clear(KeyRangeRef range) { ++counts.clears; MutationBufferT::iterator iBegin = insertMutationBoundary(range.begin); MutationBufferT::iterator iEnd = insertMutationBoundary(range.end); @@ -2782,17 +2808,17 @@ public: } } - virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED + void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED - virtual void setOldestVersion(Version v) { + void setOldestVersion(Version v) { m_newOldestVersion = v; } - virtual Version getOldestVersion() { + Version getOldestVersion() { return m_pager->getOldestVersion(); } - virtual Version getLatestVersion() { + Version getLatestVersion() { if(m_writeVersion != invalidVersion) return m_writeVersion; return m_pager->getLatestVersion(); @@ -2931,12 +2957,7 @@ public: m_latestCommit.cancel(); } - // readAtVersion() may only be called on a committed v which has previously been passed to setWriteVersion() and never previously passed - // to setOldestVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. - // The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less - // than or equal to the given version. - // v must be a committed version. 
- virtual Reference readAtVersion(Version v) { + Reference readAtVersion(Version v) { // Only committed versions can be read. Version recordVersion = singleVersion ? 0 : v; ASSERT(v <= m_lastCommittedVersion); @@ -2944,13 +2965,15 @@ public: ASSERT(v == m_lastCommittedVersion); } Reference snapshot = m_pager->getReadSnapshot(v); - Key m = snapshot->getMetaKey(); + + // Snapshot will continue to hold the metakey value memory + KeyRef m = snapshot->getMetaKey(); return Reference(new Cursor(snapshot, ((MetaKey *)m.begin())->root.get(), recordVersion)); } // Must be nondecreasing - virtual void setWriteVersion(Version v) { + void setWriteVersion(Version v) { ASSERT(v > m_lastCommittedVersion); // If there was no current mutation buffer, create one in the buffer map and update m_pBuffer if(m_pBuffer == nullptr) { @@ -2972,7 +2995,7 @@ public: m_writeVersion = v; } - virtual Future commit() { + Future commit() { if(m_pBuffer == nullptr) return m_latestCommit; return commit_impl(this); @@ -3334,7 +3357,7 @@ private: } // Writes entries to 1 or more pages and return a vector of boundary keys with their IPage(s) - ACTOR static Future>> writePages(VersionedBTree *self, bool minimalBoundaries, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, VectorRef entries, uint8_t newFlags, int height, Version v, BTreePageID previousID) { + ACTOR static Future>> writePages(VersionedBTree *self, bool minimalBoundaries, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, VectorRef entries, int height, Version v, BTreePageID previousID) { ASSERT(entries.size() > 0); state Standalone> records; @@ -3450,8 +3473,6 @@ private: btPage = (BTreePage *)new uint8_t[size]; } - btPage->formatVersion = BTreePage::FORMAT_VERSION; - btPage->flags = newFlags; btPage->height = height; btPage->kvBytes = kvBytes; btPage->itemCount = i - start; @@ -3544,7 +3565,7 @@ private: // While there are multiple child pages for this version we must write new tree levels. 
while(records.size() > 1) { self->m_header.height = ++height; - Standalone> newRecords = wait(writePages(self, false, &dbBegin, &dbEnd, records, 0, height, version, BTreePageID())); + Standalone> newRecords = wait(writePages(self, false, &dbBegin, &dbEnd, records, height, version, BTreePageID())); debug_printf("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n", version, height, newRecords.size()); records = newRecords; } @@ -3552,7 +3573,7 @@ private: return records; } - class SuperPage : public IPage, ReferenceCounted { + class SuperPage : public IPage, ReferenceCounted, public FastAllocated{ public: SuperPage(std::vector> pages) { int blockSize = pages.front()->size(); @@ -3570,23 +3591,23 @@ private: delete [] m_data; } - virtual void addref() const { + void addref() const { ReferenceCounted::addref(); } - virtual void delref() const { + void delref() const { ReferenceCounted::delref(); } - virtual int size() const { + int size() const { return m_size; } - virtual uint8_t const* begin() const { + uint8_t const* begin() const { return m_data; } - virtual uint8_t* mutate() { + uint8_t* mutate() { return m_data; } @@ -3609,14 +3630,15 @@ private: ++counts.pageReads; if(id.size() == 1) { - wait(store(page, snapshot->getPhysicalPage(id.front(), !forLazyDelete))); + Reference p = wait(snapshot->getPhysicalPage(id.front(), !forLazyDelete, false)); + page = p; } else { ASSERT(!id.empty()); counts.extPageReads += (id.size() - 1); std::vector>> reads; for(auto &pageID : id) { - reads.push_back(snapshot->getPhysicalPage(pageID, !forLazyDelete)); + reads.push_back(snapshot->getPhysicalPage(pageID, !forLazyDelete, false)); } std::vector> pages = wait(getAll(reads)); // TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager. 
@@ -3625,7 +3647,6 @@ private: debug_printf("readPage() op=readComplete %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion()); const BTreePage *pTreePage = (const BTreePage *)page->begin(); - ASSERT(pTreePage->formatVersion == BTreePage::FORMAT_VERSION); if(!forLazyDelete && page->userData == nullptr) { debug_printf("readPage() Creating Reader for %s @%" PRId64 " lower=%s upper=%s\n", toString(id).c_str(), snapshot->getVersion(), lowerBound->toString().c_str(), upperBound->toString().c_str()); @@ -3640,6 +3661,15 @@ private: return page; } + static void preLoadPage(IPagerSnapshot *snapshot, BTreePageID id) { + ++counts.pagePreloads; + counts.extPagePreloads += (id.size() - 1); + + for(auto pageID : id) { + snapshot->getPhysicalPage(pageID, true, true); + } + } + void freeBtreePage(BTreePageID btPageID, Version v) { // Free individual pages at v for(LogicalPageID id : btPageID) { @@ -3778,6 +3808,7 @@ private: self->counts.commitToPage++; state Reference rawPage = wait(readPage(snapshot, rootID, decodeLowerBound, decodeUpperBound)); state BTreePage *page = (BTreePage *) rawPage->begin(); + ASSERT(isLeaf == page->isLeaf()); debug_printf("%s commitSubtree(): %s\n", context.c_str(), page->toString(false, rootID, snapshot->getVersion(), decodeLowerBound, decodeUpperBound).c_str()); state BTreePage::BinaryTree::Cursor cursor = getReader(rawPage)->getCursor(); @@ -3786,8 +3817,7 @@ private: state Version writeVersion; // Leaf Page - if(page->flags & BTreePage::IS_LEAF) { - ASSERT(isLeaf); + if(isLeaf) { state Standalone> merged; debug_printf("%s Leaf page, merging changes.\n", context.c_str()); @@ -3958,7 +3988,7 @@ private: return results; } - state Standalone> entries = wait(writePages(self, true, lowerBound, upperBound, merged, BTreePage::IS_LEAF, page->height, writeVersion, rootID)); + state Standalone> entries = wait(writePages(self, true, lowerBound, upperBound, merged, page->height, writeVersion, rootID)); results.arena().dependsOn(entries.arena()); 
results.push_back(results.arena(), VersionAndChildrenRef(writeVersion, entries, *upperBound)); debug_printf("%s Merge complete, returning %s\n", context.c_str(), toString(results).c_str()); @@ -4084,7 +4114,7 @@ private: ASSERT(pageBuilder.lastUpperBound == *upperBound); - Standalone> childEntries = wait(holdWhile(pageBuilder.entries, writePages(self, false, lowerBound, upperBound, pageBuilder.entries, 0, page->height, writeVersion, rootID))); + Standalone> childEntries = wait(holdWhile(pageBuilder.entries, writePages(self, false, lowerBound, upperBound, pageBuilder.entries, page->height, writeVersion, rootID))); results.arena().dependsOn(childEntries.arena()); results.push_back(results.arena(), VersionAndChildrenRef(0, childEntries, *upperBound)); @@ -4218,23 +4248,39 @@ private: return Reference(new PageCursor(*this)); } + const BTreePage * btPage() const { + return (const BTreePage *)page->begin(); + } + // Multiple InternalCursors can share a Page BTreePage::BinaryTree::Reader & getReader() const { return *(BTreePage::BinaryTree::Reader *)page->userData; } bool isLeaf() const { - const BTreePage *p = ((const BTreePage *)page->begin()); - return p->isLeaf(); + return btPage()->isLeaf(); } - Future> getChild(Reference pager) { + Future> getChild(Reference pager, int readAheadBytes = 0) { ASSERT(!isLeaf()); BTreePage::BinaryTree::Cursor next = cursor; next.moveNext(); const RedwoodRecordRef &rec = cursor.get(); BTreePageID id = rec.getChildPage(); Future> child = readPage(pager, id, &rec, &next.getOrUpperBound()); + + // Read ahead siblings at level 2 + if(readAheadBytes > 0 && btPage()->height == 2 && next.valid()) { + do { + debug_printf("preloading %s %d bytes left\n", ::toString(next.get().getChildPage()).c_str(), readAheadBytes); + // If any part of the page was already loaded then stop + if(next.get().value.present()) { + preLoadPage(pager.getPtr(), next.get().getChildPage()); + readAheadBytes -= page->size(); + } + } while(readAheadBytes > 0 && 
next.moveNext()); + } + return map(child, [=](Reference page) { return Reference(new PageCursor(id, page, Reference::addRef(this))); }); @@ -4324,7 +4370,7 @@ private: }); } - ACTOR Future seekLessThanOrEqual_impl(InternalCursor *self, RedwoodRecordRef query) { + ACTOR Future seekLessThanOrEqual_impl(InternalCursor *self, RedwoodRecordRef query, int prefetchBytes) { Future f = self->moveToRoot(); // f will almost always be ready @@ -4351,7 +4397,7 @@ private: return true; } - Reference child = wait(self->pageCursor->getChild(self->pager)); + Reference child = wait(self->pageCursor->getChild(self->pager, prefetchBytes)); self->pageCursor = child; } else { @@ -4362,8 +4408,8 @@ private: } } - Future seekLTE(RedwoodRecordRef query) { - return seekLessThanOrEqual_impl(this, query); + Future seekLTE(RedwoodRecordRef query, int prefetchBytes) { + return seekLessThanOrEqual_impl(this, query, prefetchBytes); } ACTOR Future move_impl(InternalCursor *self, bool forward) { @@ -4416,13 +4462,6 @@ private: return move_impl(this, forward); } - Future moveNext() { - return move_impl(this, true); - } - Future movePrev() { - return move_impl(this, false); - } - // Move to the first or last record of the database. 
ACTOR Future move_end(InternalCursor *self, bool begin) { Future f = self->moveToRoot(); @@ -4500,36 +4539,56 @@ private: Optional m_kv; public: - virtual Future findEqual(KeyRef key) { return find_impl(this, key, true, 0); } - virtual Future findFirstEqualOrGreater(KeyRef key, bool needValue, int prefetchNextBytes) { return find_impl(this, key, needValue, 1); } - virtual Future findLastLessOrEqual(KeyRef key, bool needValue, int prefetchPriorBytes) { return find_impl(this, key, needValue, -1); } + Future findEqual(KeyRef key) override { + return find_impl(this, key, 0); + } + Future findFirstEqualOrGreater(KeyRef key, int prefetchBytes) override { + return find_impl(this, key, 1, prefetchBytes); + } + Future findLastLessOrEqual(KeyRef key, int prefetchBytes) override { + return find_impl(this, key, -1, prefetchBytes); + } - virtual Future next(bool needValue) { return move(this, true, needValue); } - virtual Future prev(bool needValue) { return move(this, false, needValue); } + Future next() override { + return move(this, true); + } + Future prev() override { + return move(this, false); + } - virtual bool isValid() { + bool isValid() override { return m_kv.present(); } - virtual KeyRef getKey() { + KeyRef getKey() override { return m_kv.get().key; } - virtual ValueRef getValue() { + ValueRef getValue() override { return m_kv.get().value; } - std::string toString() const { + std::string toString(bool includePaths = false) const { std::string r; r += format("Cursor(%p) ver: %" PRId64 " ", this, m_version); if(m_kv.present()) { - r += format(" KV: '%s' -> '%s'\n", m_kv.get().key.printable().c_str(), m_kv.get().value.printable().c_str()); + r += format(" KV: '%s' -> '%s'", m_kv.get().key.printable().c_str(), m_kv.get().value.printable().c_str()); } else { - r += " KV: \n"; + r += " KV: "; + } + if(includePaths) { + r += format("\n Cur1: %s", m_cur1.toString().c_str()); + r += format("\n Cur2: %s", m_cur2.toString().c_str()); + } + else { + if(m_cur1.valid()) { + r += 
format("\n Cur1: %s", m_cur1.get().toString().c_str()); + } + if(m_cur2.valid()) { + r += format("\n Cur2: %s", m_cur2.get().toString().c_str()); + } } - r += format(" Cur1: %s\n", m_cur1.toString().c_str()); - r += format(" Cur2: %s\n", m_cur2.toString().c_str()); return r; } @@ -4539,12 +4598,12 @@ private: // for less than or equal use cmp < 0 // for greater than or equal use cmp > 0 // for equal use cmp == 0 - ACTOR static Future find_impl(Cursor *self, KeyRef key, bool needValue, int cmp) { + ACTOR static Future find_impl(Cursor *self, KeyRef key, int cmp, int prefetchBytes = 0) { // Search for the last key at or before (key, version, \xff) state RedwoodRecordRef query(key, self->m_version, {}, 0, std::numeric_limits::max()); self->m_kv.reset(); - wait(success(self->m_cur1.seekLTE(query))); + wait(success(self->m_cur1.seekLTE(query, prefetchBytes))); debug_printf("find%sE(%s): %s\n", cmp > 0 ? "GT" : (cmp == 0 ? "" : "LT"), query.toString().c_str(), self->toString().c_str()); // If we found the target key with a present value then return it as it is valid for any cmp type @@ -4587,7 +4646,7 @@ private: } // Get the next present key at the target version. Handles invalid cursor too. 
- wait(self->next(needValue)); + wait(self->next()); } else if(cmp < 0) { // Mode is <=, which is the same as the seekLTE(query) @@ -4597,15 +4656,14 @@ private: } // Move to previous present kv pair at the target version - wait(self->prev(needValue)); + wait(self->prev()); } return Void(); } - // TODO: use needValue - ACTOR static Future move(Cursor *self, bool fwd, bool needValue) { - debug_printf("Cursor::move(%d): Cursor = %s\n", fwd, self->toString().c_str()); + ACTOR static Future move(Cursor *self, bool fwd) { + debug_printf("Cursor::move(%d): Start %s\n", fwd, self->toString().c_str()); ASSERT(self->m_cur1.valid()); // If kv is present then the key/version at cur1 was already returned so move to a new key @@ -4614,6 +4672,7 @@ private: ASSERT(self->m_cur1.valid()); loop { self->m_cur2 = self->m_cur1; + debug_printf("Cursor::move(%d): Advancing cur1 %s\n", fwd, self->toString().c_str()); bool valid = wait(self->m_cur1.move(fwd)); if(!valid || self->m_cur1.get().key != self->m_cur2.get().key) { break; @@ -4632,6 +4691,7 @@ private: // TODO: This may already be the case, store state to track this condition and avoid the reset here if(self->m_cur1.valid()) { self->m_cur2 = self->m_cur1; + debug_printf("Cursor::move(%d): Advancing cur2 %s\n", fwd, self->toString().c_str()); wait(success(self->m_cur2.move(true))); } @@ -4648,13 +4708,13 @@ private: if(fwd) { // Moving forward, move cur2 forward and keep cur1 pointing to the prior (predecessor) record - debug_printf("Cursor::move(%d): Moving forward, Cursor = %s\n", fwd, self->toString().c_str()); + debug_printf("Cursor::move(%d): Moving forward %s\n", fwd, self->toString().c_str()); self->m_cur1 = self->m_cur2; wait(success(self->m_cur2.move(true))); } else { // Moving backward, move cur1 backward and keep cur2 pointing to the prior (successor) record - debug_printf("Cursor::move(%d): Moving backward, Cursor = %s\n", fwd, self->toString().c_str()); + debug_printf("Cursor::move(%d): Moving backward %s\n", fwd, 
self->toString().c_str()); self->m_cur2 = self->m_cur1; wait(success(self->m_cur1.move(false))); } @@ -4726,7 +4786,7 @@ public: m_init = catchError(init_impl(this)); } - virtual Future init() { + Future init() { return m_init; } @@ -4756,15 +4816,15 @@ public: delete self; } - virtual void close() { + void close() { shutdown(this, false); } - virtual void dispose() { + void dispose() { shutdown(this, true); } - virtual Future< Void > onClosed() { + Future< Void > onClosed() { return m_closed.getFuture(); } @@ -4775,15 +4835,15 @@ public: return catchError(c); } - virtual KeyValueStoreType getType() { + KeyValueStoreType getType() { return KeyValueStoreType::SSD_REDWOOD_V1; } - virtual StorageBytes getStorageBytes() { + StorageBytes getStorageBytes() { return m_tree->getStorageBytes(); } - virtual Future< Void > getError() { + Future< Void > getError() { return delayed(m_error.getFuture()); }; @@ -4792,12 +4852,12 @@ public: m_tree->clear(range); } - virtual void set( KeyValueRef keyValue, const Arena* arena = NULL ) { + void set( KeyValueRef keyValue, const Arena* arena = NULL ) { debug_printf("SET %s\n", keyValue.key.printable().c_str()); m_tree->set(keyValue); } - virtual Future< Standalone< VectorRef< KeyValueRef > > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) { + Future< Standalone< VectorRef< KeyValueRef > > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) { debug_printf("READRANGE %s\n", printable(keys).c_str()); return catchError(readRange_impl(this, keys, rowLimit, byteLimit)); } @@ -4809,9 +4869,11 @@ public: ASSERT( byteLimit > 0 ); state Reference cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion()); + // Prefetch is currently only done in the forward direction + state int prefetchBytes = rowLimit > 1 ? 
byteLimit : 0; if(rowLimit >= 0) { - wait(cur->findFirstEqualOrGreater(keys.begin, true, 0)); + wait(cur->findFirstEqualOrGreater(keys.begin, prefetchBytes)); while(cur->isValid() && cur->getKey() < keys.end) { KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue())); accumulatedBytes += kv.expectedSize(); @@ -4819,12 +4881,12 @@ public: if(--rowLimit == 0 || accumulatedBytes >= byteLimit) { break; } - wait(cur->next(true)); + wait(cur->next()); } } else { - wait(cur->findLastLessOrEqual(keys.end, true, 0)); + wait(cur->findLastLessOrEqual(keys.end)); if(cur->isValid() && cur->getKey() == keys.end) - wait(cur->prev(true)); + wait(cur->prev()); while(cur->isValid() && cur->getKey() >= keys.begin) { KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue())); @@ -4833,7 +4895,7 @@ public: if(++rowLimit == 0 || accumulatedBytes >= byteLimit) { break; } - wait(cur->prev(true)); + wait(cur->prev()); } } return result; @@ -4850,7 +4912,7 @@ public: return Optional(); } - virtual Future< Optional< Value > > readValue(KeyRef key, Optional< UID > debugID = Optional()) { + Future< Optional< Value > > readValue(KeyRef key, Optional< UID > debugID = Optional()) { return catchError(readValue_impl(this, key, debugID)); } @@ -4867,7 +4929,7 @@ public: return Optional(); } - virtual Future< Optional< Value > > readValuePrefix(KeyRef key, int maxLength, Optional< UID > debugID = Optional()) { + Future< Optional< Value > > readValuePrefix(KeyRef key, int maxLength, Optional< UID > debugID = Optional()) { return catchError(readValuePrefix_impl(this, key, maxLength, debugID)); } @@ -4945,11 +5007,11 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version if(deterministicRandom()->coinflip()) { state Key randomKey = randomKV().key; debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Dummy seek to '%s'\n", v, start.toString().c_str(), end.toString().c_str(), randomKey.toString().c_str()); - 
wait(deterministicRandom()->coinflip() ? cur->findFirstEqualOrGreater(randomKey, true, 0) : cur->findLastLessOrEqual(randomKey, true, 0)); + wait(deterministicRandom()->coinflip() ? cur->findFirstEqualOrGreater(randomKey) : cur->findLastLessOrEqual(randomKey)); } debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.toString().c_str(), end.toString().c_str()); - wait(cur->findFirstEqualOrGreater(start, true, 0)); + wait(cur->findFirstEqualOrGreater(start)); state std::vector results; @@ -4997,7 +5059,7 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version ASSERT(errors == 0); results.push_back(KeyValue(KeyValueRef(cur->getKey(), cur->getValue()))); - wait(cur->next(true)); + wait(cur->next()); } // Make sure there are no further written kv pairs that would be present at this version. @@ -5031,9 +5093,9 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version } // Now read the range from the tree in reverse order and compare to the saved results - wait(cur->findLastLessOrEqual(end, true, 0)); + wait(cur->findLastLessOrEqual(end)); if(cur->isValid() && cur->getKey() == end) - wait(cur->prev(true)); + wait(cur->prev()); state std::vector::const_reverse_iterator r = results.rbegin(); @@ -5059,7 +5121,7 @@ ACTOR Future verifyRange(VersionedBTree *btree, Key start, Key end, Version } ++r; - wait(cur->prev(true)); + wait(cur->prev()); } if(r != results.rend()) { @@ -5174,10 +5236,10 @@ ACTOR Future randomReader(VersionedBTree *btree) { } state KeyValue kv = randomKV(10, 0); - wait(cur->findFirstEqualOrGreater(kv.key, true, 0)); + wait(cur->findFirstEqualOrGreater(kv.key)); state int c = deterministicRandom()->randomInt(0, 100); while(cur->isValid() && c-- > 0) { - wait(success(cur->next(true))); + wait(success(cur->next())); wait(yield()); } } @@ -5705,7 +5767,7 @@ TEST_CASE("!/redwood/correctness/btree") { state double clearPostSetProbability = deterministicRandom()->random01() * .1; state double 
coldStartProbability = deterministicRandom()->random01(); state double advanceOldVersionProbability = deterministicRandom()->random01(); - state double maxWallClockDuration = 60; + state double maxDuration = 60; printf("\n"); printf("serialTest: %d\n", serialTest); @@ -5726,7 +5788,7 @@ TEST_CASE("!/redwood/correctness/btree") { deleteFile(pagerFile); printf("Initializing...\n"); - state double startTime = timer(); + state double startTime = now(); pager = new DWALPager(pageSize, pagerFile, 0); state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion); wait(btree->init()); @@ -5756,7 +5818,7 @@ TEST_CASE("!/redwood/correctness/btree") { state Future commit = Void(); - while(mutationBytes.get() < mutationBytesTarget && (timer() - startTime) < maxWallClockDuration) { + while(mutationBytes.get() < mutationBytesTarget && (now() - startTime) < maxDuration) { if(now() - startTime > 600) { mutationBytesTarget = mutationBytes.get(); } @@ -5972,9 +6034,8 @@ ACTOR Future randomSeeks(VersionedBTree *btree, int count, char firstChar, printf("Executing %d random seeks\n", count); state Reference cur = btree->readAtVersion(readVer); while(c < count) { - wait(yield()); state Key k = randomString(20, firstChar, lastChar); - wait(success(cur->findFirstEqualOrGreater(k, false, 0))); + wait(success(cur->findFirstEqualOrGreater(k))); ++c; } double elapsed = timer() - readStart; @@ -5982,6 +6043,33 @@ ACTOR Future randomSeeks(VersionedBTree *btree, int count, char firstChar, return Void(); } +ACTOR Future randomScans(VersionedBTree *btree, int count, int width, int readAhead, char firstChar, char lastChar) { + state Version readVer = btree->getLatestVersion(); + state int c = 0; + state double readStart = timer(); + printf("Executing %d random scans\n", count); + state Reference cur = btree->readAtVersion(readVer); + state bool adaptive = readAhead < 0; + state int totalScanBytes = 0; + while(c++ < count) { + state Key k = randomString(20, firstChar, lastChar); 
+ wait(success(cur->findFirstEqualOrGreater(k, readAhead))); + if(adaptive) { + readAhead = totalScanBytes / c; + } + state int w = width; + while(w > 0 && cur->isValid()) { + totalScanBytes += cur->getKey().size(); + totalScanBytes += cur->getValue().size(); + wait(cur->next()); + --w; + } + } + double elapsed = timer() - readStart; + printf("Completed %d scans: readAhead=%d width=%d bytesRead=%d scansRate=%d/s\n", count, readAhead, width, totalScanBytes, int(count / elapsed)); + return Void(); +} + TEST_CASE("!/redwood/correctness/pager/cow") { state std::string pagerFile = "unittest_pageFile.redwood"; printf("Deleting old test data\n"); @@ -6010,26 +6098,50 @@ TEST_CASE("!/redwood/correctness/pager/cow") { } TEST_CASE("!/redwood/performance/set") { - state std::string pagerFile = "unittest_pageFile.redwood"; - printf("Deleting old test data\n"); - deleteFile(pagerFile); + state SignalableActorCollection actors; + VersionedBTree::counts.clear(); - int pageSize = 4096; - IPager2 *pager = new DWALPager(pageSize, pagerFile, FLOW_KNOBS->PAGE_CACHE_4K / pageSize); + // If a test file is passed in by environment then don't write new data to it. + state bool reload = getenv("TESTFILE") == nullptr; + state std::string pagerFile = reload ? 
"unittest.redwood" : getenv("TESTFILE"); + + if(reload) { + printf("Deleting old test data\n"); + deleteFile(pagerFile); + } + + state int pageSize = 4096; + state int64_t pageCacheBytes = FLOW_KNOBS->PAGE_CACHE_4K; + DWALPager *pager = new DWALPager(pageSize, pagerFile, pageCacheBytes); state bool singleVersion = true; state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion); wait(btree->init()); state int nodeCount = 1e9; state int maxChangesPerVersion = 5000; - state int64_t kvBytesTarget = 4000e6; + state int64_t kvBytesTarget = 4e9; state int commitTarget = 20e6; - state int maxKeyPrefixSize = 25; + state int minKeyPrefixBytes = 0; + state int maxKeyPrefixBytes = 25; + state int minValueSize = 0; state int maxValueSize = 500; state int maxConsecutiveRun = 10; - state int minValueSize = 0; state char firstKeyChar = 'a'; state char lastKeyChar = 'b'; + + printf("pageSize: %d\n", pageSize); + printf("pageCacheBytes: %" PRId64 "\n", pageCacheBytes); + printf("trailingIntegerIndexRange: %d\n", nodeCount); + printf("maxChangesPerVersion: %d\n", maxChangesPerVersion); + printf("minKeyPrefixBytes: %d\n", minKeyPrefixBytes); + printf("maxKeyPrefixBytes: %d\n", maxKeyPrefixBytes); + printf("maxConsecutiveRun: %d\n", maxConsecutiveRun); + printf("minValueSize: %d\n", minValueSize); + printf("maxValueSize: %d\n", maxValueSize); + printf("commitTarget: %d\n", commitTarget); + printf("kvBytesTarget: %" PRId64 "\n", kvBytesTarget); + printf("KeyLexicon '%c' to '%c'\n", firstKeyChar, lastKeyChar); + state int64_t kvBytes = 0; state int64_t kvBytesTotal = 0; state int records = 0; @@ -6040,65 +6152,110 @@ TEST_CASE("!/redwood/performance/set") { state double intervalStart = timer(); state double start = intervalStart; - while(kvBytesTotal < kvBytesTarget) { - wait(yield()); + if(reload) { + while(kvBytesTotal < kvBytesTarget) { + wait(yield()); - Version lastVer = btree->getLatestVersion(); - state Version version = lastVer + 1; - 
btree->setWriteVersion(version); - int changes = deterministicRandom()->randomInt(0, maxChangesPerVersion); + Version lastVer = btree->getLatestVersion(); + state Version version = lastVer + 1; + btree->setWriteVersion(version); + int changes = deterministicRandom()->randomInt(0, maxChangesPerVersion); - while(changes > 0 && kvBytes < commitTarget) { - KeyValue kv; - kv.key = randomString(kv.arena(), deterministicRandom()->randomInt(sizeof(uint32_t), maxKeyPrefixSize + sizeof(uint32_t) + 1), firstKeyChar, lastKeyChar); - int32_t index = deterministicRandom()->randomInt(0, nodeCount); - int runLength = deterministicRandom()->randomInt(1, maxConsecutiveRun + 1); + while(changes > 0 && kvBytes < commitTarget) { + KeyValue kv; + kv.key = randomString(kv.arena(), deterministicRandom()->randomInt(minKeyPrefixBytes + sizeof(uint32_t), maxKeyPrefixBytes + sizeof(uint32_t) + 1), firstKeyChar, lastKeyChar); + int32_t index = deterministicRandom()->randomInt(0, nodeCount); + int runLength = deterministicRandom()->randomInt(1, maxConsecutiveRun + 1); - while(runLength > 0 && changes > 0) { - *(uint32_t *)(kv.key.end() - sizeof(uint32_t)) = bigEndian32(index++); - kv.value = StringRef((uint8_t *)value.data(), deterministicRandom()->randomInt(minValueSize, maxValueSize + 1)); + while(runLength > 0 && changes > 0) { + *(uint32_t *)(kv.key.end() - sizeof(uint32_t)) = bigEndian32(index++); + kv.value = StringRef((uint8_t *)value.data(), deterministicRandom()->randomInt(minValueSize, maxValueSize + 1)); - btree->set(kv); + btree->set(kv); - --runLength; - --changes; - kvBytes += kv.key.size() + kv.value.size(); - ++records; + --runLength; + --changes; + kvBytes += kv.key.size() + kv.value.size(); + ++records; + } + } + + if(kvBytes >= commitTarget) { + btree->setOldestVersion(btree->getLastCommittedVersion()); + wait(commit); + printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); + + // Avoid capturing via 
this to freeze counter values + int recs = records; + int kvb = kvBytes; + + // Capturing intervalStart via this->intervalStart makes IDEs unhappy as they do not know about the actor state object + double *pIntervalStart = &intervalStart; + + commit = map(btree->commit(), [=](Void result) { + printf("Committed: %s\n", VersionedBTree::counts.toString(true).c_str()); + double elapsed = timer() - *pIntervalStart; + printf("Committed %d kvBytes in %d records in %f seconds, %.2f MB/s\n", kvb, recs, elapsed, kvb / elapsed / 1e6); + *pIntervalStart = timer(); + return Void(); + }); + + kvBytesTotal += kvBytes; + kvBytes = 0; + records = 0; } } - if(kvBytes >= commitTarget) { - btree->setOldestVersion(btree->getLastCommittedVersion()); - wait(commit); - printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); - - // Avoid capturing via this to freeze counter values - int recs = records; - int kvb = kvBytes; - - // Capturing invervalStart via this->intervalStart makes IDE's unhappy as they do not know about the actor state object - double *pIntervalStart = &intervalStart; - - commit = map(btree->commit(), [=](Void result) { - printf("Committed: %s\n", VersionedBTree::counts.toString(true).c_str()); - double elapsed = timer() - *pIntervalStart; - printf("Committed %d kvBytes in %d records in %f seconds, %.2f MB/s\n", kvb, recs, elapsed, kvb / elapsed / 1e6); - *pIntervalStart = timer(); - return Void(); - }); - - kvBytesTotal += kvBytes; - kvBytes = 0; - records = 0; - } + wait(commit); + printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); } - wait(commit); - printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n", kvBytesTotal / 1e6, kvBytesTotal / (timer() - start) / 1e6); + int seeks = 1e6; + printf("Warming cache with seeks\n"); + actors.add(randomSeeks(btree, seeks/3, firstKeyChar, lastKeyChar)); + 
actors.add(randomSeeks(btree, seeks/3, firstKeyChar, lastKeyChar)); + actors.add(randomSeeks(btree, seeks/3, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); - printf("Starting random seeks\n"); - state int reads = 30000; - wait(randomSeeks(btree, reads, firstKeyChar, lastKeyChar) && randomSeeks(btree, reads, firstKeyChar, lastKeyChar) && randomSeeks(btree, reads, firstKeyChar, lastKeyChar)); + state int ops = 10000; + + printf("Serial scans with adaptive readAhead...\n"); + actors.add(randomScans(btree, ops, 50, -1, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Serial scans with readAhead 3 pages...\n"); + actors.add(randomScans(btree, ops, 50, 12000, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Serial scans with readAhead 2 pages...\n"); + actors.add(randomScans(btree, ops, 50, 8000, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Serial scans with readAhead 1 page...\n"); + actors.add(randomScans(btree, ops, 50, 4000, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Serial scans...\n"); + actors.add(randomScans(btree, ops, 50, 0, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Serial seeks...\n"); + actors.add(randomSeeks(btree, ops, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); + + printf("Parallel seeks...\n"); + actors.add(randomSeeks(btree, ops, firstKeyChar, lastKeyChar)); + 
actors.add(randomSeeks(btree, ops, firstKeyChar, lastKeyChar)); + actors.add(randomSeeks(btree, ops, firstKeyChar, lastKeyChar)); + wait(actors.signalAndReset()); + printf("Stats: %s\n", VersionedBTree::counts.toString(true).c_str()); Future closedFuture = btree->onClosed(); btree->close(); diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index 01c49e6c35..1144f4c8ec 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -46,7 +46,6 @@ - @@ -57,7 +56,6 @@ - @@ -179,7 +177,6 @@ - @@ -189,7 +186,6 @@ - false false diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 5e9360f8c0..c01c9e458a 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -274,8 +274,6 @@ workloads - - @@ -385,8 +383,6 @@ - - diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9e4d17aa29..c16b36a1f1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -139,6 +139,8 @@ add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.txt) add_fdb_test(TEST_FILES rare/RYWDisable.txt) add_fdb_test(TEST_FILES rare/RandomReadWriteTest.txt) add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.txt) +add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.txt) + add_fdb_test( TEST_FILES restarting/ConfigureTestRestart-1.txt restarting/ConfigureTestRestart-2.txt) diff --git a/tests/rare/RedwoodCorrectnessBTree.txt b/tests/rare/RedwoodCorrectnessBTree.txt new file mode 100644 index 0000000000..3bde204032 --- /dev/null +++ b/tests/rare/RedwoodCorrectnessBTree.txt @@ -0,0 +1,6 @@ +testTitle=UnitTests +testName=UnitTests +startDelay=0 +useDB=false +maxTestCases=0 +testsMatching=!/redwood/correctness/btree