From b65ad3563ceaf3fabad2ca5ee0211422befe4d50 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Fri, 9 Jun 2017 14:56:41 -0700 Subject: [PATCH] Merge branch 'master' into feature-redwood # Conflicts: # fdbserver/fdbserver.vcxproj # fdbserver/fdbserver.vcxproj.filters --- fdbserver/IPager.h | 113 ++++ fdbserver/IVersionedStore.h | 76 +++ fdbserver/IndirectShadowPager.actor.cpp | 837 ++++++++++++++++++++++++ fdbserver/IndirectShadowPager.h | 192 ++++++ fdbserver/Knobs.cpp | 8 + fdbserver/Knobs.h | 8 + fdbserver/MemoryPager.actor.cpp | 343 ++++++++++ fdbserver/MemoryPager.h | 119 ++++ fdbserver/VersionedBTree.actor.cpp | 673 +++++++++++++++++++ fdbserver/fdbserver.vcxproj | 7 + fdbserver/fdbserver.vcxproj.filters | 7 + flow/ActorCollection.h | 40 ++ tests/RedwoodUnitTests.txt | 6 + 13 files changed, 2429 insertions(+) create mode 100644 fdbserver/IPager.h create mode 100644 fdbserver/IVersionedStore.h create mode 100644 fdbserver/IndirectShadowPager.actor.cpp create mode 100644 fdbserver/IndirectShadowPager.h create mode 100644 fdbserver/MemoryPager.actor.cpp create mode 100644 fdbserver/MemoryPager.h create mode 100755 fdbserver/VersionedBTree.actor.cpp create mode 100644 tests/RedwoodUnitTests.txt diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h new file mode 100644 index 0000000000..28df064bdc --- /dev/null +++ b/fdbserver/IPager.h @@ -0,0 +1,113 @@ +/* + * IPager.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_IPAGER_H +#define FDBSERVER_IPAGER_H +#pragma once + +#include "IKeyValueStore.h" + +#include "flow/flow.h" +#include "fdbclient/FDBTypes.h" + +typedef uint32_t LogicalPageID; // uint64_t? + +class IPage { +public: + virtual uint8_t const* begin() const = 0; + virtual uint8_t* mutate() = 0; + + // Must return the same size for all pages created by the same pager instance + virtual int size() const = 0; + + virtual ~IPage() {} + + virtual void addref() const = 0; + virtual void delref() const = 0; +}; + +class IPagerSnapshot { +public: + virtual Future> getPhysicalPage(LogicalPageID pageID) = 0; + virtual void invalidateReturnedPages() = 0; + + virtual ~IPagerSnapshot() {} + + virtual void addref() = 0; + virtual void delref() = 0; +}; + +class IPager : public IClosable { +public: + // Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed. + virtual Reference newPageBuffer() = 0; + + // Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead). + // For a given pager instance, separate calls to this function must return the same value. + virtual int getUsablePageSize() = 0; + + // Permitted to fail (ASSERT) during recovery. + virtual Reference getReadSnapshot(Version version) = 0; + + // Returns an unused LogicalPageID. + // LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) do not need to be allocated. + // Permitted to fail (ASSERT) during recovery. 
+ virtual LogicalPageID allocateLogicalPage() = 0; + + // Signals that the page will no longer be used as of the specified version. Versions prior to the specified version must be kept. + // Permitted to fail (ASSERT) during recovery. + virtual void freeLogicalPage(LogicalPageID pageID, Version version) = 0; + + // Writes a page with the given LogicalPageID at the specified version. LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) + // can be written without being allocated. All other LogicalPageIDs must be allocated using allocateLogicalPage before writing them. + // + // If updateVersion is 0, we are signalling to the pager that we are reusing the LogicalPageID entry at the current latest version of pageID. + // + // Otherwise, we will add a new entry for LogicalPageID at the specified version. In that case, updateVersion must be larger than any version + // written to this page previously, and it must be larger than any version committed + // + // Permitted to fail (ASSERT) during recovery. + virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion) = 0; + + // Signals to the pager that no more reads will be performed in the range [begin, end). + // Permitted to fail (ASSERT) during recovery. + virtual void forgetVersions(Version begin, Version end) = 0; + + // Makes durable all writes and any data structures used for recovery. + // Permitted to fail (ASSERT) during recovery. + virtual Future commit() = 0; + + // Returns the latest version of the pager. Permitted to block until recovery is complete, at which point it should always be set immediately. + // Some functions in the IPager interface are permitted to fail (ASSERT) during recovery, so users should wait for getLatestVersion to complete + // before doing anything else. + virtual Future getLatestVersion() = 0; + + // Sets the latest version of the pager. Must be monotonically increasing. + // + // Must be called prior to reading the specified version. 
SOMEDAY: It may be desirable in the future to relax this constraint for performance reasons. + // + // Permitted to fail (ASSERT) during recovery. + virtual void setLatestVersion(Version version) = 0; + +protected: + ~IPager() {} // Destruction should be done using close()/dispose() from the IClosable interface +}; + +#endif \ No newline at end of file diff --git a/fdbserver/IVersionedStore.h b/fdbserver/IVersionedStore.h new file mode 100644 index 0000000000..ff0ff38a36 --- /dev/null +++ b/fdbserver/IVersionedStore.h @@ -0,0 +1,76 @@ +/* + * IVersionedStore.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FDBSERVER_IVERSIONEDSTORE_H +#define FDBSERVER_IVERSIONEDSTORE_H +#pragma once + +#include "IKeyValueStore.h" + +#include "flow/flow.h" +#include "fdbclient/FDBTypes.h" + +class IStoreCursor { +public: + virtual Future findEqual(KeyRef key) = 0; + virtual Future findFirstGreaterOrEqual(KeyRef key, int prefetchNextBytes) = 0; + virtual Future findLastLessOrEqual(KeyRef key, int prefetchPriorBytes) = 0; + virtual Future next(bool needValue) = 0; + virtual Future prev(bool needValue) = 0; + + virtual bool isValid() = 0; + virtual KeyRef getKey() = 0; + //virtual StringRef getCompressedKey() = 0; + virtual ValueRef getValue() = 0; + + virtual void invalidateReturnedStrings() = 0; + + virtual void addref() = 0; + virtual void delref() = 0; +}; + +class IVersionedStore : public IClosable { +public: + virtual KeyValueStoreType getType() = 0; + virtual bool supportsMutation(int op) = 0; // If this returns true, then mutate(op, ...) may be called + virtual StorageBytes getStorageBytes() = 0; + + // Writes are provided in an ordered stream. + // A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion() + // A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns + virtual void set(KeyValueRef keyValue) = 0; + virtual void clear(KeyRangeRef range) = 0; + virtual void mutate(int op, StringRef param1, StringRef param2) = 0; + virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing + virtual void forgetVersions(Version begin, Version end) = 0; // Versions [begin, end) no longer readable + virtual Future commit() = 0; + + virtual Future getLatestVersion() = 0; + + // readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed + // to forgetVersion. 
The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. + // The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less + // than or equal to the given version. + // If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same + // write version, OR it may represent a snapshot as of the call to readAtVersion(). + virtual Reference readAtVersion(Version) = 0; +}; + +#endif diff --git a/fdbserver/IndirectShadowPager.actor.cpp b/fdbserver/IndirectShadowPager.actor.cpp new file mode 100644 index 0000000000..598da8867f --- /dev/null +++ b/fdbserver/IndirectShadowPager.actor.cpp @@ -0,0 +1,837 @@ +/* + * IndirectShadowPager.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "IndirectShadowPager.h" +#include "Knobs.h" + +#include "flow/UnitTest.h" + +IndirectShadowPage::IndirectShadowPage() : allocated(true) { + data = (uint8_t*)FastAllocator<4096>::allocate(); +} + +IndirectShadowPage::IndirectShadowPage(uint8_t *data) : data(data), allocated(false) {} + +IndirectShadowPage::~IndirectShadowPage() { + if(allocated) { + FastAllocator<4096>::release(data); + } +} + +uint8_t const* IndirectShadowPage::begin() const { + return data; +} + +uint8_t* IndirectShadowPage::mutate() { + return data; +} + +int IndirectShadowPage::size() const { + return PAGE_BYTES - PAGE_OVERHEAD_BYTES; +} + +const int IndirectShadowPage::PAGE_BYTES = 4096; +const int IndirectShadowPage::PAGE_OVERHEAD_BYTES = 0; + +Future> IndirectShadowPagerSnapshot::getPhysicalPage(LogicalPageID pageID) { + return pager->getPage(Reference::addRef(this), pageID, version); +} + +void IndirectShadowPagerSnapshot::invalidateReturnedPages() { + arena = Arena(); +} + +template +T bigEndian(T val) { + static_assert(sizeof(T) <= 8, "Can't compute bigEndian on integers larger than 8 bytes"); + uint64_t b = bigEndian64(val); + return *(T*)((uint8_t*)&b+8-sizeof(T)); +} + +ACTOR Future recover(IndirectShadowPager *pager) { + try { + TraceEvent("PagerRecovering").detail("Basename", pager->basename); + pager->pageTableLog = openKVStore(KeyValueStoreType::MEMORY, pager->basename, UID(), 1e9); + + // TODO: this can be done synchronously with the log recovery + int64_t flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK; + if(!fileExists(pager->basename + ".redwood")) { + flags |= IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE; + } + + Reference dataFile = wait(IAsyncFileSystem::filesystem()->open(pager->basename + ".redwood", flags, 0600)); + pager->dataFile = dataFile; + + TraceEvent("PagerOpenedDataFile").detail("Filename", pager->basename + ".redwood"); + + Void _ = wait(pager->dataFile->sync()); + + state int64_t fileSize = 
wait(pager->dataFile->size()); + TraceEvent("PagerGotFileSize").detail("Size", fileSize); + + if(fileSize > 0) { + TraceEvent("PagerRecoveringFromLogs"); + Optional pagesAllocatedValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::PAGES_ALLOCATED_KEY)); + if(pagesAllocatedValue.present()) { + BinaryReader pr(pagesAllocatedValue.get(), Unversioned()); + uint32_t pagesAllocated; + pr >> pagesAllocated; + pager->pagerFile.init(fileSize, pagesAllocated); + + fprintf(stderr, "Recovered pages allocated: %d\n", pager->pagerFile.pagesAllocated); + ASSERT(pager->pagerFile.pagesAllocated != PagerFile::INVALID_PAGE); + + Optional latestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::LATEST_VERSION_KEY)); + ASSERT(latestVersionValue.present()); + + BinaryReader vr(latestVersionValue.get(), Unversioned()); + vr >> pager->latestVersion; + + Optional oldestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::OLDEST_VERSION_KEY)); + + if(oldestVersionValue.present()) { + BinaryReader vr(oldestVersionValue.get(), Unversioned()); + vr >> pager->oldestVersion; + } + + fprintf(stderr, "Recovered version info: %d - %d\n", pager->oldestVersion, pager->latestVersion); + pager->committedVersion = pager->latestVersion; + + Standalone> tableEntries = wait(pager->pageTableLog->readRange(KeyRangeRef(IndirectShadowPager::TABLE_ENTRY_PREFIX, strinc(IndirectShadowPager::TABLE_ENTRY_PREFIX)))); + + if(tableEntries.size() > 0) { + BinaryReader kr(tableEntries.back().key, Unversioned()); + + uint8_t prefix; + LogicalPageID logicalPageID; + + kr >> prefix; + ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]); + + kr >> logicalPageID; + logicalPageID = bigEndian(logicalPageID); + + LogicalPageID pageTableSize = std::max(logicalPageID+1, SERVER_KNOBS->PAGER_RESERVED_PAGES); + pager->pageTable.resize(pageTableSize); + fprintf(stderr, "Recovered page table size: %d\n", pageTableSize); + } + else { + fprintf(stderr, "Recovered no 
page table entries\n"); + } + + LogicalPageID nextPageID = SERVER_KNOBS->PAGER_RESERVED_PAGES; + std::set allocatedPhysicalPages; + for(auto entry : tableEntries) { + BinaryReader kr(entry.key, Unversioned()); + BinaryReader vr(entry.value, Unversioned()); + + uint8_t prefix; + LogicalPageID logicalPageID; + Version version; + PhysicalPageID physicalPageID; + + kr >> prefix; + ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]); + + kr >> logicalPageID; + logicalPageID = bigEndian(logicalPageID); + + kr >> version; + version = bigEndian(version); + + vr >> physicalPageID; + + pager->pageTable[logicalPageID].push_back(std::make_pair(version, physicalPageID)); + + if(physicalPageID != PagerFile::INVALID_PAGE) { + allocatedPhysicalPages.insert(physicalPageID); + pager->pagerFile.markPageAllocated(logicalPageID, version, physicalPageID); + } + + while(nextPageID < logicalPageID) { + pager->logicalFreeList.push_back(nextPageID++); + } + if(logicalPageID == nextPageID) { + ++nextPageID; + } + + fprintf(stderr, "Recovered page table entry %d -> (%d, %d)\n", logicalPageID, version, physicalPageID); + } + + fprintf(stderr, "Building physical free list\n"); + // TODO: can we do this better? does it require storing extra info in the log? 
+ PhysicalPageID nextPhysicalPageID = 0; + for(auto itr = allocatedPhysicalPages.begin(); itr != allocatedPhysicalPages.end(); ++itr) { + while(nextPhysicalPageID < *itr) { + pager->pagerFile.freePage(nextPhysicalPageID++); + } + ++nextPhysicalPageID; + } + + while(nextPhysicalPageID < pager->pagerFile.pagesAllocated) { + pager->pagerFile.freePage(nextPhysicalPageID++); + } + } + } + + if(pager->pageTable.size() < SERVER_KNOBS->PAGER_RESERVED_PAGES) { + pager->pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES); + } + + pager->pagerFile.finishedMarkingPages(); + pager->pagerFile.startVacuuming(); + + fprintf(stderr, "Finished recovery\n", pager->oldestVersion); + TraceEvent("PagerFinishedRecovery"); + } + catch(Error &e) { + TraceEvent(SevError, "PagerRecoveryFailed").error(e, true); + throw e; + } + + return Void(); +} + +ACTOR Future housekeeper(IndirectShadowPager *pager) { + Void _ = wait(pager->recovery); + + loop { + state LogicalPageID pageID = 0; + for(; pageID < pager->pageTable.size(); ++pageID) { + // TODO: pick an appropriate rate for this loop and determine the right way to implement it + // Right now, this delays 10ms every 400K pages, which means we have 1s of delay for every + // 40M pages. In total, we introduce 100s delay for a max size 4B page file. 
+ if(pageID % 400000 == 0) { + Void _ = wait(delay(0.01)); + } + else { + Void _ = wait(yield()); + } + + auto& pageVersionMap = pager->pageTable[pageID]; + + if(pageVersionMap.size() > 0) { + auto itr = pageVersionMap.begin(); + for(auto prev = itr; prev != pageVersionMap.end() && prev->first < pager->oldestVersion; prev=itr) { + pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second); + ++itr; + if(prev->second != PagerFile::INVALID_PAGE && (itr == pageVersionMap.end() || itr->first <= pager->oldestVersion)) { + pager->freePhysicalPageID(prev->second); + } + if(itr == pageVersionMap.end() || itr->first >= pager->oldestVersion) { + //fprintf(stderr, "Updating oldest version for logical page %d: %d\n", pageID, pager->oldestVersion); + pager->logPageTableClear(pageID, 0, pager->oldestVersion); + + if(itr != pageVersionMap.end() && itr->first > pager->oldestVersion) { + //fprintf(stderr, "Erasing pages to prev from pageVersionMap for %d (itr=%d, prev=%d)\n", pageID, itr->first, prev->first); + prev->first = pager->oldestVersion; + pager->logPageTableUpdate(pageID, pager->oldestVersion, prev->second); + itr = pageVersionMap.erase(pageVersionMap.begin(), prev); + } + else { + //fprintf(stderr, "Erasing pages to itr from pageVersionMap for %d (%d) (itr=%d, prev=%d)\n", pageID, itr == pageVersionMap.end(), itr==pageVersionMap.end() ? 
-1 : itr->first, prev->first); + itr = pageVersionMap.erase(pageVersionMap.begin(), itr); + } + } + } + + for(; itr != pageVersionMap.end(); ++itr) { + pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second); + } + + if(pageVersionMap.size() == 0) { + pager->freeLogicalPageID(pageID); + } + } + } + + pager->pagerFile.finishedMarkingPages(); + } +} + +ACTOR Future forwardError(Future f, Promise target) { + try { + Void _ = wait(f); + } + catch(Error &e) { + if(target.canBeSet()) { + target.sendError(e); + } + + throw e; + } + + return Void(); +} + +IndirectShadowPager::IndirectShadowPager(std::string basename) + : basename(basename), latestVersion(0), committedVersion(0), committing(Void()), oldestVersion(0), pagerFile(this) +{ + recovery = forwardError(recover(this), errorPromise); + housekeeping = forwardError(housekeeper(this), errorPromise); +} + +Reference IndirectShadowPager::newPageBuffer() { + return Reference(new IndirectShadowPage()); +} + +int IndirectShadowPager::getUsablePageSize() { + return IndirectShadowPage::PAGE_BYTES - IndirectShadowPage::PAGE_OVERHEAD_BYTES; +} + +Reference IndirectShadowPager::getReadSnapshot(Version version) { + ASSERT(recovery.isReady()); + ASSERT(version <= latestVersion); + ASSERT(version >= oldestVersion); + + return Reference(new IndirectShadowPagerSnapshot(this, version)); +} + +LogicalPageID IndirectShadowPager::allocateLogicalPage() { + ASSERT(recovery.isReady()); + + LogicalPageID allocatedPage; + if(logicalFreeList.size() > 0) { + allocatedPage = logicalFreeList.front(); + logicalFreeList.pop_front(); + } + else { + ASSERT(pageTable.size() < std::numeric_limits::max()); // TODO: different error? 
+ allocatedPage = pageTable.size(); + pageTable.push_back(PageVersionMap()); + } + + ASSERT(allocatedPage >= SERVER_KNOBS->PAGER_RESERVED_PAGES); + return allocatedPage; +} + +void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version) { + ASSERT(recovery.isReady()); + ASSERT(committing.isReady()); + + ASSERT(pageID < pageTable.size()); + + PageVersionMap &pageVersionMap = pageTable[pageID]; + ASSERT(!pageVersionMap.empty()); + + auto itr = pageVersionMapLowerBound(pageVersionMap, version); + for(auto i = itr; i != pageVersionMap.end(); ++i) { + freePhysicalPageID(i->second); + } + + if(itr != pageVersionMap.end()) { + //fprintf(stderr, "Clearing newest versions for logical page %d: %d\n", pageID, version); + logPageTableClearToEnd(pageID, version); + pageVersionMap.erase(itr, pageVersionMap.end()); + } + + if(pageVersionMap.size() == 0) { + fprintf(stderr, "Freeing logical page %d (freeLogicalPage)\n", pageID); + logicalFreeList.push_back(pageID); + } + else if(pageVersionMap.back().second != PagerFile::INVALID_PAGE) { + pageVersionMap.push_back(std::make_pair(version, PagerFile::INVALID_PAGE)); + logPageTableUpdate(pageID, version, PagerFile::INVALID_PAGE); + } +} + +ACTOR Future waitAndFreePhysicalPageID(IndirectShadowPager *pager, PhysicalPageID pageID, Future canFree) { + Void _ = wait(canFree); + pager->pagerFile.freePage(pageID); + return Void(); +} + +// TODO: we need to be more careful about making sure the action that caused the page to be freed is durable before freeing the page +// Otherwise, the page could be rewritten prematurely. 
+void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) { + fprintf(stderr, "Freeing physical page: %u\n", pageID); + auto itr = readCounts.find(pageID); + if(itr != readCounts.end()) { + operations.add(waitAndFreePhysicalPageID(this, pageID, itr->second.second.getFuture())); + } + else { + pagerFile.freePage(pageID); + } +} + +void IndirectShadowPager::writePage(LogicalPageID pageID, Reference contents, Version updateVersion) { + ASSERT(recovery.isReady()); + ASSERT(committing.isReady()); + + ASSERT(updateVersion > latestVersion || updateVersion == 0); + ASSERT(pageID < pageTable.size()); + + PageVersionMap &pageVersionMap = pageTable[pageID]; + + ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != PagerFile::INVALID_PAGE); + + // TODO: should this be conditional on the write succeeding? + bool updateExisting = updateVersion == 0; + if(updateExisting) { + ASSERT(pageVersionMap.size()); + updateVersion = pageVersionMap.back().first; + } + + PhysicalPageID physicalPageID = pagerFile.allocatePage(pageID, updateVersion); + + if(updateExisting) { + freePhysicalPageID(pageVersionMap.back().second); + pageVersionMap.back().second = physicalPageID; + } + else { + ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion); + pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID)); + } + + logPageTableUpdate(pageID, updateVersion, physicalPageID); + Future write = dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES); + + if(write.isError()) { + if(errorPromise.canBeSet()) { + errorPromise.sendError(write.getError()); + } + throw write.getError(); + } + writeActors.add(forwardError(write, errorPromise)); +} + +void IndirectShadowPager::forgetVersions(Version begin, Version end) { + ASSERT(recovery.isReady()); + ASSERT(begin <= end); + ASSERT(end <= latestVersion); + + // TODO: support forgetting arbitrary ranges + if(begin <= oldestVersion) { + 
oldestVersion = std::max(end, oldestVersion); + logVersion(OLDEST_VERSION_KEY, oldestVersion); + } +} + +ACTOR Future commitImpl(IndirectShadowPager *pager, Future previousCommit) { + state Future outstandingWrites = pager->writeActors.signalAndCollapse(); + state Version commitVersion = pager->latestVersion; + + Void _ = wait(previousCommit); + + pager->logVersion(IndirectShadowPager::LATEST_VERSION_KEY, commitVersion); + + // TODO: we need to prevent writes that happen now from being committed in the subsequent log commit + // This is probably best done once we have better control of the log, where we can write a commit entry + // here without syncing the file. + + Void _ = wait(outstandingWrites); + + Void _ = wait(pager->dataFile->sync()); + Void _ = wait(pager->pageTableLog->commit()); + + pager->committedVersion = std::max(pager->committedVersion, commitVersion); + + return Void(); +} + +Future IndirectShadowPager::commit() { + ASSERT(recovery.isReady()); + Future f = commitImpl(this, committing); + committing = f; + return committing; +} + +void IndirectShadowPager::setLatestVersion(Version version) { + ASSERT(recovery.isReady()); + latestVersion = version; +} + +ACTOR Future getLatestVersionImpl(IndirectShadowPager *pager) { + Void _ = wait(pager->recovery); + return pager->latestVersion; +} + +Future IndirectShadowPager::getLatestVersion() { + return getLatestVersionImpl(this); +} + +Future IndirectShadowPager::getError() { + return errorPromise.getFuture(); +} + +Future IndirectShadowPager::onClosed() { + return closed.getFuture(); +} + +ACTOR void shutdown(IndirectShadowPager *pager, bool dispose) { + pager->recovery = Never(); // TODO: this is a hacky way to prevent users from performing operations after calling shutdown. 
Implement a better mechanism + + Void _ = wait(pager->writeActors.signal()); + Void _ = wait(pager->operations.signal()); + Void _ = wait(pager->committing); + + pager->housekeeping.cancel(); + pager->pagerFile.shutdown(); + + state Future pageTableClosed = pager->pageTableLog->onClosed(); + if(dispose) { + Void _ = wait(IAsyncFileSystem::filesystem()->deleteFile(pager->basename + ".redwood", true)); + pager->pageTableLog->dispose(); + } + else { + pager->pageTableLog->close(); + } + + Void _ = wait(pageTableClosed); + + pager->closed.send(Void()); + delete pager; +} + +void IndirectShadowPager::dispose() { + shutdown(this, true); +} + +void IndirectShadowPager::close() { + shutdown(this, false); +} + +ACTOR Future> getPageImpl(IndirectShadowPager *pager, Reference snapshot, LogicalPageID pageID, Version version) { + ASSERT(pageID < pager->pageTable.size()); + PageVersionMap &pageVersionMap = pager->pageTable[pageID]; + + auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version); + if(itr == pageVersionMap.begin()) { + ASSERT(false); + } + + --itr; + + state PhysicalPageID physicalPageID = itr->second; + fprintf(stderr, "Reading %d at v%d from %d\n", pageID, version, physicalPageID); + + ASSERT(physicalPageID != PagerFile::INVALID_PAGE); + ++pager->readCounts[physicalPageID].first; + + // We are relying on the use of AsyncFileCached for performance here. We expect that all write actors will complete immediately (with a possible yield()), + // so this wait should either be nonexistent or just a yield. 
+ Void _ = wait(pager->writeActors.signalAndCollapse()); + + state uint8_t *buf = new (snapshot->arena) uint8_t[IndirectShadowPage::PAGE_BYTES]; + int read = wait(pager->dataFile->read(buf, IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES)); + ASSERT(read == IndirectShadowPage::PAGE_BYTES); + + auto readCountItr = pager->readCounts.find(physicalPageID); + ASSERT(readCountItr != pager->readCounts.end()); + if(readCountItr->second.first == 1) { + readCountItr->second.second.send(Void()); + pager->readCounts.erase(readCountItr); + } + else { + --readCountItr->second.first; + } + + return Reference(new IndirectShadowPage(buf)); +} + +Future> IndirectShadowPager::getPage(Reference snapshot, LogicalPageID pageID, Version version) { + ASSERT(recovery.isReady()); + + Future> f = getPageImpl(this, snapshot, pageID, version); + operations.add(success(f)); + return f; +} + +PageVersionMap::iterator IndirectShadowPager::pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version version) { + return std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair p, Version v) { + return p.first < v; + }); +} + +PageVersionMap::iterator IndirectShadowPager::pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version version) { + return std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair p) { + return v < p.first; + }); +} + +void IndirectShadowPager::freeLogicalPageID(LogicalPageID pageID) { + if(pageID >= SERVER_KNOBS->PAGER_RESERVED_PAGES) { + fprintf(stderr, "Freeing logical page: %u\n", pageID); + logicalFreeList.push_back(pageID); + } +} + +void IndirectShadowPager::logVersion(StringRef versionKey, Version version) { + BinaryWriter v(Unversioned()); + v << version; + + pageTableLog->set(KeyValueRef(versionKey, v.toStringRef())); +} + +void IndirectShadowPager::logPagesAllocated() { + BinaryWriter v(Unversioned()); + v << pagerFile.getPagesAllocated(); + + 
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toStringRef()));
+}
+
+void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) { // Records (logicalPageID, version) -> physicalPageID in pageTableLog
+	BinaryWriter k(Unversioned());
+	k << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(version); // key = prefix byte, page ID, version; the range clears below are built from the same layout
+
+	BinaryWriter v(Unversioned());
+	v << physicalPageID;
+
+	pageTableLog->set(KeyValueRef(k.toStringRef(), v.toStringRef()));
+}
+
+void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) { // Clears the logged entries of logicalPageID from version `start` through the end of that page's key range
+	BinaryWriter b(Unversioned());
+	b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
+
+	BinaryWriter e(Unversioned());
+	e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID);
+
+	pageTableLog->clear(KeyRangeRef(b.toStringRef(), strinc(e.toStringRef()))); // strinc() of the page's key prefix is the exclusive end of the range
+}
+
+void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) { // Clears the logged entries of logicalPageID for versions in [start, end)
+	BinaryWriter b(Unversioned());
+	b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
+
+	BinaryWriter e(Unversioned());
+	e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end);
+
+	pageTableLog->clear(KeyRangeRef(b.toStringRef(), e.toStringRef()));
+}
+
+const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion");
+const StringRef IndirectShadowPager::OLDEST_VERSION_KEY = LiteralStringRef("\xff/OldestVersion");
+const StringRef IndirectShadowPager::PAGES_ALLOCATED_KEY = LiteralStringRef("\xff/PagesAllocated");
+const StringRef IndirectShadowPager::TABLE_ENTRY_PREFIX = LiteralStringRef("\x00");
+
+ACTOR Future copyPage(IndirectShadowPager *pager, Reference page, PhysicalPageID from, PhysicalPageID to) { // Copies physical page `from` to `to` in the data file; on failure frees `to` and rethrows. NOTE(review): template arguments (on Future, Reference, std::pair, numeric_limits) appear to be stripped in this copy of the patch -- restore from the upstream commit
+	state bool zeroCopied = true;
+	state int bytes = IndirectShadowPage::PAGE_BYTES;
+	state void *data = nullptr;
+
+	try {
+		try {
+			Void _ = wait(pager->dataFile->readZeroCopy(&data, &bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES)); // widen before multiplying: uint32_t * int wraps once the offset reaches 4 GiB
+		}
+		catch(Error &e) {
+			zeroCopied = false; // fall back to a buffered read into the caller-supplied scratch page
+			data = page->mutate();
+			int _bytes = wait(pager->dataFile->read(data, page->size(), (int64_t)from * IndirectShadowPage::PAGE_BYTES));
+			bytes = _bytes;
+		}
+
+		ASSERT(bytes == IndirectShadowPage::PAGE_BYTES);
+		Void _ = wait(pager->dataFile->write(data, bytes, (int64_t)to * IndirectShadowPage::PAGE_BYTES));
+		if(zeroCopied) {
+			pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
+		}
+	}
+	catch(Error &e) {
+		if(zeroCopied) {
+			pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
+		}
+		pager->pagerFile.freePage(to); // the destination page was never populated, so hand it back
+		throw e;
+	}
+
+	return Void();
+}
+
+ACTOR Future vacuumer(IndirectShadowPager *pager, PagerFile *pagerFile) { // Runs forever: moves the highest used page into a lower free page, then truncates trailing free pages from the data file
+	state Reference page(new IndirectShadowPage()); // scratch buffer reused by every copyPage() call
+
+	loop {
+		state double start = now();
+		while(!pagerFile->canVacuum()) {
+			Void _ = wait(delay(1.0));
+		}
+
+		ASSERT(!pagerFile->freePages.empty());
+
+		if(!pagerFile->vacuumQueue.empty()) {
+			state PhysicalPageID lastUsedPage = pagerFile->vacuumQueue.rbegin()->first;
+			PhysicalPageID lastFreePage = *pagerFile->freePages.rbegin();
+			//fprintf(stderr, "Vacuuming: evaluating (free list size=%u, lastFreePage=%u, lastUsedPage=%u, pagesAllocated=%u)\n", pagerFile->freePages.size(), lastFreePage, lastUsedPage, pagerFile->pagesAllocated);
+			ASSERT(lastFreePage < pagerFile->pagesAllocated);
+			ASSERT(lastUsedPage < pagerFile->pagesAllocated);
+			ASSERT(lastFreePage != lastUsedPage);
+
+			if(lastFreePage < lastUsedPage) {
+				state std::pair logicalPageInfo = pagerFile->vacuumQueue[lastUsedPage];
+				state PhysicalPageID newPage = pagerFile->allocatePage(logicalPageInfo.first, logicalPageInfo.second); // allocatePage() hands out the lowest free page
+
+				fprintf(stderr, "Vacuuming: copying page %u to %u\n", lastUsedPage, newPage);
+				Void _ = wait(copyPage(pager, page, lastUsedPage, newPage));
+
+				auto &pageVersionMap = pager->pageTable[logicalPageInfo.first]; // re-read after the wait: the table may have changed while copying
+				auto itr = IndirectShadowPager::pageVersionMapLowerBound(pageVersionMap, logicalPageInfo.second);
+				if(itr != pageVersionMap.end() && itr->second == lastUsedPage) {
+					itr->second = newPage;
+					pager->logPageTableUpdate(logicalPageInfo.first, itr->first, newPage);
+					pagerFile->freePage(lastUsedPage);
+				}
+				else {
+					TEST(true); // page was freed while vacuuming
+					pagerFile->freePage(newPage);
+				}
+			}
+		}
+
+		PhysicalPageID firstFreePage = pagerFile->vacuumQueue.empty() ? pagerFile->minVacuumQueuePage : (pagerFile->vacuumQueue.rbegin()->first + 1);
+		ASSERT(pagerFile->pagesAllocated >= firstFreePage);
+
+		uint64_t pagesToErase = 0;
+		if(pagerFile->freePages.size() >= SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD) {
+			pagesToErase = std::min(pagerFile->freePages.size() - SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD + 1, pagerFile->pagesAllocated - firstFreePage);
+		}
+
+		//fprintf(stderr, "Vacuuming: got %u pages to erase (freePages=%u, pagesAllocated=%u, vacuumQueueEmpty=%u, minVacuumQueuePage=%u, firstFreePage=%u)\n", pagesToErase, pagerFile->freePages.size(), pagerFile->pagesAllocated, pagerFile->vacuumQueue.empty(), pagerFile->minVacuumQueuePage, firstFreePage);
+
+		if(pagesToErase > 0) {
+			PhysicalPageID eraseStartPage = pagerFile->pagesAllocated - pagesToErase;
+			fprintf(stderr, "Vacuuming: truncating last %llu pages starting at %u\n", (unsigned long long)pagesToErase, eraseStartPage); // pagesToErase is uint64_t, so %u was a mismatched specifier
+
+			ASSERT(pagesToErase <= pagerFile->pagesAllocated);
+
+			pagerFile->pagesAllocated = eraseStartPage;
+			pager->logPagesAllocated();
+
+			auto freePageItr = pagerFile->freePages.find(eraseStartPage);
+			ASSERT(freePageItr != pagerFile->freePages.end());
+
+			pagerFile->freePages.erase(freePageItr, pagerFile->freePages.end());
+			ASSERT(pagerFile->vacuumQueue.empty() || pagerFile->vacuumQueue.rbegin()->first < eraseStartPage);
+
+			Void _ = wait(pager->dataFile->truncate((int64_t)pagerFile->pagesAllocated * IndirectShadowPage::PAGE_BYTES)); // 64-bit size: the 32-bit product wraps for files of 4 GiB or more
+		}
+
+		Void _ = wait(delayUntil(start + (double)IndirectShadowPage::PAGE_BYTES / SERVER_KNOBS->VACUUM_BYTES_PER_SECOND)); // TODO: figure out the correct mechanism here
+	}
+}
+
+PagerFile::PagerFile(IndirectShadowPager *pager) : pager(pager), pagesAllocated(0), fileSize(0), minVacuumQueuePage(0), vacuumQueueReady(false) {} // initializers listed in member declaration order
+
+PhysicalPageID PagerFile::allocatePage(LogicalPageID logicalPageID, Version version) { // Returns a physical page for (logicalPageID, version): the lowest free page if any, otherwise a new page at the end
+	ASSERT((uint64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES <= fileSize); // compare in 64 bits; fileSize is uint64_t but the product was 32-bit
+	ASSERT(fileSize % IndirectShadowPage::PAGE_BYTES == 0);
+
+	PhysicalPageID allocatedPage;
+	if(!freePages.empty()) {
+		allocatedPage = *freePages.begin();
+		freePages.erase(freePages.begin());
+	}
+	else {
+		if((uint64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES == fileSize) {
+			fileSize += (1 << 24);
+			// TODO: extend the file before writing beyond the end.
+		}
+
+		ASSERT(pagesAllocated < INVALID_PAGE); // TODO: we should throw a better error here
+		allocatedPage = pagesAllocated++;
+		pager->logPagesAllocated();
+	}
+
+	markPageAllocated(logicalPageID, version, allocatedPage);
+
+	fprintf(stderr, "Allocated physical page %u\n", allocatedPage);
+	return allocatedPage;
+}
+
+void PagerFile::freePage(PhysicalPageID pageID) { // Adds pageID to the free set and drops it from the vacuum queue if it is tracked there
+	freePages.insert(pageID);
+
+	if(pageID >= minVacuumQueuePage) {
+		vacuumQueue.erase(pageID);
+	}
+}
+
+void PagerFile::markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) { // Records the owner of physicalPageID in the vacuum queue when the page is in the tracked range
+	if(physicalPageID != INVALID_PAGE && physicalPageID >= minVacuumQueuePage) {
+		vacuumQueue[physicalPageID] = std::make_pair(logicalPageID, version);
+	}
+}
+
+void PagerFile::finishedMarkingPages() { // Either resets the tracked window to the last VACUUM_QUEUE_SIZE pages, or marks the queue ready and trims entries below the window
+	if(minVacuumQueuePage >= pagesAllocated) {
+		minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
+		vacuumQueueReady = false;
+	}
+	else {
+		if(!vacuumQueueReady) {
+			vacuumQueueReady = true;
+		}
+		if(pagesAllocated > SERVER_KNOBS->VACUUM_QUEUE_SIZE && minVacuumQueuePage < pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE) {
+			minVacuumQueuePage = pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE;
+			auto itr = vacuumQueue.lower_bound(minVacuumQueuePage);
+			vacuumQueue.erase(vacuumQueue.begin(), itr);
+		}
+	}
+}
+
+uint64_t PagerFile::size() {
+	return fileSize;
+}
+
+uint32_t PagerFile::getPagesAllocated() {
+	return pagesAllocated;
+}
+
+void PagerFile::init(uint64_t fileSize, uint32_t pagesAllocated) { // Sets the file size and page count and positions the vacuum window on the last VACUUM_QUEUE_SIZE pages
+	this->fileSize = fileSize;
+	this->pagesAllocated = pagesAllocated;
+	this->minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
+}
+
+void PagerFile::startVacuuming() {
+	vacuuming = vacuumer(pager, this);
+}
+
+void PagerFile::shutdown() {
+	vacuuming.cancel();
+}
+
+bool PagerFile::canVacuum() { // True only when enough pages are free, the vacuum queue is populated, and tracked pages remain
+	if(freePages.size() < SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD // Not enough free pages
+		|| minVacuumQueuePage >= pagesAllocated // We finished processing all pages in the vacuum queue
+		|| !vacuumQueueReady) // Populating vacuum queue
+	{
+		//fprintf(stderr, "Vacuuming: waiting for vacuumable pages (free list size=%u, minVacuumQueuePage=%u, pages allocated=%u, vacuumQueueReady=%d)\n", freePages.size(), minVacuumQueuePage, pagesAllocated, vacuumQueueReady);
+		return false;
+	}
+
+	return true;
+}
+
+const PhysicalPageID PagerFile::INVALID_PAGE = std::numeric_limits::max();
+
+extern Future simplePagerTest(IPager* const& pager);
+
+TEST_CASE("fdbserver/indirectshadowpager/simple") {
+	state IPager *pager = new IndirectShadowPager("data/test");
+
+	Void _ = wait(simplePagerTest(pager));
+
+	Future closedFuture = pager->onClosed(); // taken before close(): the test does not touch `pager` again afterwards
+	pager->close();
+	Void _ = wait(closedFuture);
+
+	return Void();
+}
diff --git
a/fdbserver/IndirectShadowPager.h b/fdbserver/IndirectShadowPager.h
new file mode 100644
index 0000000000..69e917f644
--- /dev/null
+++ b/fdbserver/IndirectShadowPager.h
@@ -0,0 +1,192 @@
+/*
+ * IndirectShadowPager.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBSERVER_INDIRECTSHADOWPAGER_H
+#define FDBSERVER_INDIRECTSHADOWPAGER_H
+#pragma once
+
+#include "IKeyValueStore.h"
+#include "IPager.h"
+
+#include "flow/ActorCollection.h"
+#include "flow/Notified.h"
+
+#include "fdbrpc/IAsyncFile.h"
+
+typedef uint32_t PhysicalPageID; // page index in the data file; byte offset is pageID * IndirectShadowPage::PAGE_BYTES
+typedef std::vector> PageVersionMap; // NOTE(review): template arguments appear to be stripped in this copy of the patch (here and on ReferenceCounted/Future/Reference/Promise/std::set/std::map below) -- restore from the upstream commit
+typedef std::vector LogicalPageTable; // indexed by LogicalPageID (see pager->pageTable[...] in vacuumer)
+
+class IndirectShadowPager;
+
+class IndirectShadowPage : public IPage, ReferenceCounted { // IPage implementation used by IndirectShadowPager; addref/delref forward to ReferenceCounted
+public:
+	IndirectShadowPage();
+	IndirectShadowPage(uint8_t *data);
+	virtual ~IndirectShadowPage();
+
+	virtual void addref() const {
+		ReferenceCounted::addref();
+	}
+
+	virtual void delref() const {
+		ReferenceCounted::delref();
+	}
+
+	virtual int size() const;
+	virtual uint8_t const* begin() const;
+	virtual uint8_t* mutate();
+
+//private:
+	static const int PAGE_BYTES; // size of one physical page in the data file
+	static const int PAGE_OVERHEAD_BYTES;
+
+private:
+	uint8_t *data;
+	bool allocated; // presumably true when this object owns `data` (as in MemoryPage) -- TODO confirm in the .cpp
+};
+
+class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted { // Read snapshot bound to one pager and one version
+public:
+	IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version) : pager(pager), version(version) {}
+	virtual Future> getPhysicalPage(LogicalPageID pageID);
+	virtual void invalidateReturnedPages();
+
+	virtual void addref() {
+		ReferenceCounted::addref();
+	}
+
+	virtual void delref() {
+		ReferenceCounted::delref();
+	}
+
+	Arena arena;
+private:
+	IndirectShadowPager *pager;
+	Version version;
+};
+
+class PagerFile { // Physical page bookkeeping for the pager's data file: allocated count, free set and the vacuum queue
+public:
+	PagerFile(IndirectShadowPager *pager);
+
+	PhysicalPageID allocatePage(LogicalPageID logicalPageID, Version version); // reuses the lowest free page, else appends a new one
+	void freePage(PhysicalPageID physicalPageID);
+	void markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
+
+	void finishedMarkingPages();
+
+	uint64_t size();
+	uint32_t getPagesAllocated();
+
+	void init(uint64_t fileSize, uint32_t pagesAllocated);
+	void startVacuuming();
+	void shutdown(); // cancels the vacuumer actor
+
+//private:
+	Future vacuuming; // the running vacuumer actor, set by startVacuuming()
+	IndirectShadowPager *pager;
+
+	uint32_t pagesAllocated; // physical pages handed out so far; new pages are numbered from this counter
+	uint64_t fileSize; // bytes; allocatePage() grows it in 1 << 24 steps
+
+	std::set freePages; // physical pages available for reuse
+
+	PhysicalPageID minVacuumQueuePage; // pages below this ID are not tracked in vacuumQueue
+	bool vacuumQueueReady; // set by finishedMarkingPages(); canVacuum() is false until then
+	std::map> vacuumQueue; // physical page -> (logical page, version) for tracked pages
+
+	bool canVacuum();
+
+	static const PhysicalPageID INVALID_PAGE;
+};
+
+class IndirectShadowPager : public IPager { // IPager implementation backed by a data file (dataFile) and a page-table log (pageTableLog)
+public:
+	IndirectShadowPager(std::string basename);
+
+	virtual Reference newPageBuffer();
+	virtual int getUsablePageSize();
+
+	virtual Reference getReadSnapshot(Version version);
+
+	virtual LogicalPageID allocateLogicalPage();
+	virtual void freeLogicalPage(LogicalPageID pageID, Version version);
+	virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion);
+	virtual void forgetVersions(Version begin, Version end);
+	virtual Future commit();
+
+	virtual void setLatestVersion(Version version);
+	virtual Future getLatestVersion();
+
+	virtual Future getError();
+	virtual Future onClosed();
+	virtual void dispose();
+	virtual void close();
+
+	Future> getPage(Reference snapshot, LogicalPageID pageID, Version version);
+
+//private:
+	std::string basename;
+
+	Version latestVersion;
+	Version committedVersion;
+
+	LogicalPageTable pageTable; // per logical page: its version -> physical page entries
+	IKeyValueStore *pageTableLog; // receives the set/clear calls issued by the log*() methods below
+
+	Reference dataFile; // file holding the physical pages
+	Future recovery;
+
+	Future housekeeping;
+	Future vacuuming;
+	Version oldestVersion;
+
+	std::map>> readCounts;
+	SignalableActorCollection operations;
+	SignalableActorCollection writeActors;
+	Future committing;
+
+	Promise closed;
+	Promise errorPromise;
+
+	std::deque logicalFreeList;
+	PagerFile pagerFile;
+
+	static PageVersionMap::iterator pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version v);
+	static PageVersionMap::iterator pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version v);
+
+	void freeLogicalPageID(LogicalPageID pageID);
+	void freePhysicalPageID(PhysicalPageID pageID);
+
+	void logVersion(StringRef versionKey, Version version);
+	void logPagesAllocated();
+	void logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
+	void logPageTableClearToEnd(LogicalPageID logicalPageID, Version start);
+	void logPageTableClear(LogicalPageID logicalPageID, Version start, Version end); // clears versions in [start, end)
+
+	static const StringRef LATEST_VERSION_KEY;
+	static const StringRef OLDEST_VERSION_KEY;
+	static const StringRef PAGES_ALLOCATED_KEY;
+	static const StringRef TABLE_ENTRY_PREFIX; // first byte of every page-table key written by logPageTableUpdate()
+
+};
+
+#endif
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index f315410d89..42c6741980 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -361,6 +361,14 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( STATUS_MIN_TIME_BETWEEN_REQUESTS, 0.0 );
 	init( CONFIGURATION_ROWS_TO_FETCH, 20000 );
 
+	// IPager
+	init( PAGER_RESERVED_PAGES, 1 ); // logical page IDs below this do not need to be allocated (see IPager.h)
+
+	// IndirectShadowPager
+	init( FREE_PAGE_VACUUM_THRESHOLD, 1 ); // PagerFile::canVacuum() requires at least this many free pages
+	init( VACUUM_QUEUE_SIZE, 100000 ); // how many of the highest-numbered physical pages PagerFile tracks in its vacuum queue
+	init( VACUUM_BYTES_PER_SECOND, 1e6 ); // vacuumer paces each pass to PAGE_BYTES / this many seconds
+
 	if(clientKnobs)
 		clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND);
 }
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index c89b94e7b5..9f097da8e4 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -303,6 +303,14 @@ public:
 	double STATUS_MIN_TIME_BETWEEN_REQUESTS;
 	int CONFIGURATION_ROWS_TO_FETCH;
 
+	// IPager
+	int PAGER_RESERVED_PAGES;
+
+	// IndirectShadowPager
+	int FREE_PAGE_VACUUM_THRESHOLD;
+	int VACUUM_QUEUE_SIZE;
+	int VACUUM_BYTES_PER_SECOND;
+
 	ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
 };
diff --git a/fdbserver/MemoryPager.actor.cpp b/fdbserver/MemoryPager.actor.cpp
new file mode 100644
index 0000000000..e55f9cb021
--- /dev/null
+++ b/fdbserver/MemoryPager.actor.cpp
@@ -0,0 +1,343 @@
+/*
+ * MemoryPager.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MemoryPager.h"
+#include "Knobs.h"
+
+#include "flow/Arena.h"
+#include "flow/UnitTest.h"
+
+MemoryPage::MemoryPage() : allocated(true) { // Owning page: allocates a 4096-byte buffer that the destructor releases
+	data = (uint8_t*)FastAllocator<4096>::allocate();
+}
+
+MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {} // Non-owning view over memory managed elsewhere
+
+MemoryPage::~MemoryPage() {
+	if(allocated) {
+		FastAllocator<4096>::release(data);
+	}
+}
+
+uint8_t const* MemoryPage::begin() const {
+	return data;
+}
+
+uint8_t* MemoryPage::mutate() {
+	return data;
+}
+
+int MemoryPage::size() const {
+	return PAGE_BYTES;
+}
+
+const int MemoryPage::PAGE_BYTES = 4096;
+
+Future> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID) { // NOTE(review): template arguments (on Future, Reference, std::pair, VectorRef) appear to be stripped in this copy of the patch -- restore from the upstream commit
+	return pager->getPage(pageID, version);
+}
+
+MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
+	extendData();
+	pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES); // reserved logical pages exist without being allocated
+}
+
+Reference MemoryPager::newPageBuffer() {
+	return Reference(new MemoryPage());
+}
+
+int MemoryPager::getUsablePageSize() {
+	return MemoryPage::PAGE_BYTES;
+}
+
+Reference MemoryPager::getReadSnapshot(Version version) {
+	ASSERT(version <= latestVersion);
+	return Reference(new MemoryPagerSnapshot(this, version));
+}
+
+LogicalPageID MemoryPager::allocateLogicalPage() { // Appends an empty version map and returns its index as the new logical page ID
+	ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
+	pageTable.push_back(PageVersionMap());
+	return pageTable.size() - 1;
+}
+
+void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) { // Drops all entries at or after `version`; if a live entry remains, appends an INVALID_PAGE marker at `version`
+	ASSERT(pageID < pageTable.size());
+
+	PageVersionMap &pageVersionMap = pageTable[pageID];
+	ASSERT(!pageVersionMap.empty());
+
+	auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair p, Version v) {
+		return p.first < v;
+	});
+
+	pageVersionMap.erase(itr, pageVersionMap.end());
+	if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
+		pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
+	}
+}
+
+void MemoryPager::writePage(LogicalPageID pageID, Reference contents, Version updateVersion) { // Stores `contents` for pageID at updateVersion; updateVersion == 0 replaces the newest existing entry in place
+	ASSERT(updateVersion > latestVersion || updateVersion == 0);
+	ASSERT(pageID < pageTable.size());
+
+	PageVersionMap &pageVersionMap = pageTable[pageID];
+
+	ASSERT(updateVersion >= committedVersion || updateVersion == 0);
+	PhysicalPageID physicalPageID = allocatePage(contents);
+
+	ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
+
+	if(updateVersion == 0) {
+		ASSERT(pageVersionMap.size());
+		updateVersion = pageVersionMap.back().first;
+		pageVersionMap.back().second = physicalPageID;
+		// TODO: what to do with old page?
+	}
+	else {
+		ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
+		pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
+	}
+
+}
+
+void MemoryPager::forgetVersions(Version begin, Version end) {
+	ASSERT(begin <= end);
+	ASSERT(end <= latestVersion);
+	// TODO
+}
+
+Future MemoryPager::commit() { // Marks everything up to latestVersion as committed; completes immediately
+	ASSERT(committedVersion <= latestVersion); // <= rather than <: simplePagerTest commits repeatedly without advancing the version, which must be a no-op here
+	committedVersion = latestVersion;
+	return Void();
+}
+
+void MemoryPager::setLatestVersion(Version version) {
+	ASSERT(version > latestVersion);
+	latestVersion = version;
+}
+
+Future MemoryPager::getLatestVersion() {
+	return latestVersion;
+}
+
+Reference MemoryPager::getPage(LogicalPageID pageID, Version version) { // Returns the page stored at the greatest version <= `version`, or an empty Reference if there is none
+	ASSERT(pageID < pageTable.size());
+	PageVersionMap const& pageVersionMap = pageTable[pageID];
+
+	auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair p) {
+		return v < p.first;
+	});
+
+	if(itr == pageVersionMap.begin()) {
+		return Reference(); // TODO: should this be an error?
+	}
+
+	--itr;
+
+	ASSERT(itr->second != INVALID_PAGE);
+	return Reference(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
+}
+
+Future MemoryPager::getError() {
+	return Void();
+}
+
+Future MemoryPager::onClosed() {
+	return closed.getFuture();
+}
+
+void MemoryPager::dispose() { // Signals onClosed() and destroys the pager; `this` must not be used afterwards
+	closed.send(Void());
+	delete this;
+}
+
+void MemoryPager::close() {
+	dispose();
+}
+
+PhysicalPageID MemoryPager::allocatePage(Reference contents) { // Copies `contents` into pager-owned memory and returns a pointer to the copy
+	if(freeList.size()) {
+		PhysicalPageID pageID = freeList.back();
+		freeList.pop_back();
+
+		memcpy(pageID, contents->begin(), contents->size());
+		return pageID;
+	}
+	else {
+		ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
+		PhysicalPageID pageID = data.back().end();
+
+		data.back().append(data.arena(), contents->begin(), contents->size());
+		if(data.back().size() == data.back().capacity()) {
+			extendData();
+		}
+		else {
+			ASSERT(data.back().size() <= data.back().capacity() - MemoryPage::PAGE_BYTES); // room for at least one more page; named constant instead of a literal 4096
+		}
+
+		return pageID;
+	}
+}
+
+void MemoryPager::extendData() { // Adds another 1 << 22 byte chunk of page storage
+	if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
+		throw io_error();
+	}
+
+	VectorRef d;
+	d.reserve(data.arena(), 1 << 22);
+	data.push_back(data.arena(), d);
+}
+
+// TODO: these tests are not MemoryPager specific, we should make them more general
+
+void fillPage(Reference page, LogicalPageID pageID, Version version) { // Zeroes the page, then stamps pageID followed by version at its start
+	ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
+
+	memset(page->mutate(), 0, page->size());
+	memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
+	memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
+}
+
+bool validatePage(Reference page, LogicalPageID pageID, Version version) { // Checks the stamp written by fillPage(); logs each mismatch to stderr and returns false if any
+	bool valid = true;
+
+	LogicalPageID readPageID; memcpy(&readPageID, page->begin(), sizeof(LogicalPageID)); // memcpy mirrors fillPage() and avoids a type-punned load
+	if(readPageID != pageID) {
+		fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID); // LogicalPageID is uint32_t, so %lld was a mismatched specifier
+		valid = false;
+	}
+
+	Version readVersion; memcpy(&readVersion, page->begin()+sizeof(LogicalPageID), sizeof(Version)); // the field starts at offset sizeof(LogicalPageID), which may be misaligned for Version
+	if(readVersion != version) {
+		fprintf(stderr, "Invalid Version detected on page %u: %lld (expected %lld)\n", pageID, (long long)readVersion, (long long)version);
+		valid = false;
+	}
+
+	return valid;
+}
+
+void writePage(IPager *pager, Reference page, LogicalPageID pageID, Version version, bool updateVersion=true) { // Stamps the page and writes it; updateVersion == false writes at version 0 (in-place update)
+	fillPage(page, pageID, version);
+	pager->writePage(pageID, page, updateVersion ? version : 0);
+}
+
+ACTOR Future commit(IPager *pager) { // Test helper: commits and logs start/finish with a running commit number
+	static int commitNum = 1;
+	state int myCommit = commitNum++;
+
+	fprintf(stderr, "Commit%d\n", myCommit);
+	Void _ = wait(pager->commit());
+	fprintf(stderr, "FinishedCommit%d\n", myCommit);
+	return Void();
+}
+
+ACTOR Future read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) { // Test helper: reads pageID at `version` and asserts its stamp; expectedVersion < 0 means "same as version"
+	static int readNum = 1;
+	state int myRead = readNum++;
+	state Reference readSnapshot = pager->getReadSnapshot(version);
+	fprintf(stderr, "Read%d\n", myRead);
+	Reference readPage = wait(readSnapshot->getPhysicalPage(pageID));
+	fprintf(stderr, "FinishedRead%d\n", myRead);
+	ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
+	return Void();
+}
+
+ACTOR Future simplePagerTest(IPager *pager) { // Pager-agnostic smoke test: versioned writes, in-place update, reads at old versions, frees, forgetVersions and repeated commits
+	state Reference page = pager->newPageBuffer();
+
+	Version latestVersion = wait(pager->getLatestVersion());
+	fprintf(stderr, "Got latest version: %lld\n", (long long)latestVersion); // %d was a mismatched specifier for Version
+
+	state Version version = latestVersion+1;
+	state Version v1 = version;
+
+	state LogicalPageID pageID1 = pager->allocateLogicalPage();
+
+	writePage(pager, page, pageID1, v1);
+	pager->setLatestVersion(v1);
+	Void _ = wait(commit(pager));
+
+	state LogicalPageID pageID2 = pager->allocateLogicalPage();
+
+	state Version v2 = ++version;
+
+	writePage(pager, page, pageID1, v2);
+	writePage(pager, page, pageID2, v2);
+	pager->setLatestVersion(v2);
+	Void _ = wait(commit(pager));
+
+	Void _ = wait(read(pager, pageID1, v2));
+	Void _ = wait(read(pager, pageID1, v1));
+
+	state Version v3 = ++version;
+	writePage(pager, page, pageID1, v3, false); // in-place update: rewrites the v2 entry with a v3 stamp
+	pager->setLatestVersion(v3);
+
+	Void _ = wait(read(pager, pageID1, v2, v3));
+	Void _ = wait(read(pager, pageID1, v3, v3));
+
+	state LogicalPageID pageID3 = pager->allocateLogicalPage();
+
+	state Version v4 = ++version;
+	writePage(pager, page, pageID2, v4);
+	writePage(pager, page, pageID3, v4);
+	pager->setLatestVersion(v4);
+	Void _ = wait(commit(pager));
+
+	Void _ = wait(read(pager, pageID2, v4, v4));
+
+	state Version v5 = ++version;
+	writePage(pager, page, pageID2, v5);
+
+	state LogicalPageID pageID4 = pager->allocateLogicalPage();
+	writePage(pager, page, pageID4, v5);
+
+	state Version v6 = ++version;
+	pager->freeLogicalPage(pageID2, v5);
+	pager->freeLogicalPage(pageID3, v3);
+	pager->setLatestVersion(v6);
+	Void _ = wait(commit(pager));
+
+	pager->forgetVersions(0, v4);
+	Void _ = wait(commit(pager)); // commit with no new version since the previous one
+
+	Void _ = wait(delay(3.0));
+
+	Void _ = wait(commit(pager));
+
+	return Void();
+}
+
+TEST_CASE("fdbserver/memorypager/simple") {
+	state IPager *pager = new MemoryPager();
+
+	Void _ = wait(simplePagerTest(pager));
+
+	Future closedFuture = pager->onClosed(); // taken before dispose(), which deletes the pager
+	pager->dispose();
+
+	Void _ = wait(closedFuture);
+	return Void();
+}
+
+const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;
\ No newline at end of file
diff --git a/fdbserver/MemoryPager.h b/fdbserver/MemoryPager.h
new file mode 100644
index 0000000000..5de86054b2
--- /dev/null
+++ b/fdbserver/MemoryPager.h
@@ -0,0 +1,119 @@
+/*
+ * MemoryPager.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBSERVER_MEMORYPAGER_H
+#define FDBSERVER_MEMORYPAGER_H
+#pragma once
+
+#include "IPager.h"
+
+typedef uint8_t* PhysicalPageID; // here a physical page ID is a pointer to the page's bytes in pager-owned memory
+typedef std::vector> PageVersionMap; // NOTE(review): template arguments appear to be stripped in this copy of the patch (here and on ReferenceCounted/Future/Reference/Promise/Standalone below) -- restore from the upstream commit
+typedef std::vector LogicalPageTable; // indexed by LogicalPageID
+
+class MemoryPager;
+
+class MemoryPage : public IPage, ReferenceCounted { // IPage over a fixed-size buffer; either owns the buffer or views pager-owned memory
+public:
+	MemoryPage(); // allocates and owns a buffer
+	MemoryPage(uint8_t *data); // wraps existing memory without taking ownership
+	virtual ~MemoryPage();
+
+	virtual void addref() const {
+		ReferenceCounted::addref();
+	}
+
+	virtual void delref() const {
+		ReferenceCounted::delref();
+	}
+
+	virtual int size() const;
+	virtual uint8_t const* begin() const;
+	virtual uint8_t* mutate();
+
+private:
+	friend class MemoryPager;
+	uint8_t *data;
+	bool allocated; // true when the destructor must release `data`
+
+	static const int PAGE_BYTES;
+};
+
+class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted { // Read snapshot bound to one MemoryPager and one version
+public:
+	MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
+	virtual Future> getPhysicalPage(LogicalPageID pageID);
+	virtual void invalidateReturnedPages() {}
+
+	virtual void addref() {
+		ReferenceCounted::addref();
+	}
+
+	virtual void delref() {
+		ReferenceCounted::delref();
+	}
+
+private:
+	MemoryPager *pager;
+	Version version;
+};
+
+class MemoryPager : public IPager, ReferenceCounted { // IPager implementation that keeps every page version in memory
+public:
+	MemoryPager();
+
+	virtual Reference newPageBuffer();
+	virtual int getUsablePageSize();
+
+	virtual Reference getReadSnapshot(Version version);
+
+	virtual LogicalPageID allocateLogicalPage();
+	virtual void freeLogicalPage(LogicalPageID pageID, Version version);
+	virtual void writePage(LogicalPageID pageID, Reference contents, Version updateVersion); // updateVersion == 0 replaces the newest entry in place
+	virtual void forgetVersions(Version begin, Version end);
+	virtual Future commit();
+
+	virtual void setLatestVersion(Version version);
+	virtual Future getLatestVersion();
+
+	virtual Future getError();
+	virtual Future onClosed();
+	virtual void dispose(); // signals onClosed() and deletes this
+	virtual void close();
+
+	virtual Reference getPage(LogicalPageID pageID, Version version);
+
+private:
+	Version latestVersion;
+	Version committedVersion;
+	Standalone>> data; // chunks of page storage; extendData() appends one chunk at a time
+	LogicalPageTable pageTable;
+
+	Promise closed;
+
+	std::vector freeList; // TODO: is this good enough for now?
+
+	PhysicalPageID allocatePage(Reference contents);
+	void extendData();
+
+	static const PhysicalPageID INVALID_PAGE; // defined as nullptr in MemoryPager.actor.cpp
+};
+
+#endif
\ No newline at end of file
diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp
new file mode 100755
index 0000000000..ada603a403
--- /dev/null
+++ b/fdbserver/VersionedBTree.actor.cpp
@@ -0,0 +1,673 @@
+/*
+ * VersionedBTree.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "flow/flow.h" +#include "IVersionedStore.h" +#include "IPager.h" +#include "fdbclient/Tuple.h" +#include "flow/serialize.h" +#include "flow/genericactors.actor.h" +#include "flow/UnitTest.h" +#include "MemoryPager.h" +#include +#include + +#define INTERNAL_PAGES_HAVE_TUPLES 1 + +struct SimpleFixedSizeMapRef { + typedef std::vector> KVPairsT; + + SimpleFixedSizeMapRef() : flags(0) {} + + static SimpleFixedSizeMapRef decode(StringRef buf) { + SimpleFixedSizeMapRef result; + BinaryReader r(buf, AssumeVersion(currentProtocolVersion)); + r >> result.flags; + r >> result.entries; + return result; + }; + + // Returns -1 if key is less than first key, otherwise index into entries + int findLastLessOrEqual(StringRef key) { + return std::upper_bound(entries.begin(), entries.end(), + key, + [](StringRef const& a, KVPairsT::value_type const& b) { return a + static Reference emptyPage(uint8_t newFlags, Allocator const &newPageFn) { + Reference page = newPageFn(); + BinaryWriter bw(AssumeVersion(currentProtocolVersion)); + bw << newFlags; + bw << KVPairsT(); + memcpy(page->mutate(), bw.getData(), bw.getLength()); + return page; + } + + template + static vector>> buildMany(const KVPairsT &kvPairs, uint8_t newFlags, Allocator const &newPageFn, int page_size_override = -1) { + vector>> pages; + + Reference page = newPageFn(); + int pageSize = page->size(); + if(page_size_override > 0 && page_size_override < pageSize) + pageSize = page_size_override; + BinaryWriter bw(AssumeVersion(currentProtocolVersion)); + bw << newFlags; + uint32_t i = 0; + uint32_t start = i; + int mapSizeOffset = bw.getLength(); + bw << start; // placeholder for map size + + for(auto const &kv : kvPairs) { + // If page would overflow, output it and start new one + if(bw.getLength() + 8 + kv.first.size() + kv.second.size() > pageSize) { + memcpy(page->mutate(), bw.getData(), bw.getLength()); + *(uint32_t *)(page->mutate() + mapSizeOffset) = i - start; + pages.push_back({start, page}); + bw = 
BinaryWriter(AssumeVersion(currentProtocolVersion)); + bw << newFlags; + page = newPageFn(); + start = i; + int mapSizeOffset = bw.getLength(); + bw << start; // placeholder for map size; + } + + bw << kv; + ++i; + } + + if(bw.getLength() != sizeof(newFlags)) { + memcpy(page->mutate(), bw.getData(), bw.getLength()); + *(uint32_t *)(page->mutate() + mapSizeOffset) = i - start; + pages.push_back({start, page}); + } + + return pages; + } + + std::string toString(); + + KVPairsT entries; + uint8_t flags; +}; + +#define NOT_IMPLEMENTED { UNSTOPPABLE_ASSERT(false); } + +class VersionedBTree : public IVersionedStore { +public: + enum EPageFlags { IS_LEAF = 1}; + + typedef SimpleFixedSizeMapRef FixedSizeMap; + + virtual Future getError() NOT_IMPLEMENTED + virtual Future onClosed() NOT_IMPLEMENTED + virtual void dispose() NOT_IMPLEMENTED + virtual void close() NOT_IMPLEMENTED + + virtual KeyValueStoreType getType() NOT_IMPLEMENTED + virtual bool supportsMutation(int op) NOT_IMPLEMENTED + virtual StorageBytes getStorageBytes() NOT_IMPLEMENTED + + // Writes are provided in an ordered stream. 
+ // A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion() + // A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns + virtual void set(KeyValueRef keyValue) { + ASSERT(m_writeVersion != invalidVersion); + m_buffer[keyValue.key.toString()].push_back({m_writeVersion, keyValue.value.toString()}); + } + virtual void clear(KeyRangeRef range) NOT_IMPLEMENTED + virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED + + // Versions [begin, end) no longer readable + virtual void forgetVersions(Version begin, Version end) NOT_IMPLEMENTED + + virtual Future getLatestVersion() { + if(m_writeVersion != invalidVersion) + return m_writeVersion; + return m_pager->getLatestVersion(); + } + + VersionedBTree(IPager *pager, int page_size_override = -1) : m_pager(pager), m_writeVersion(invalidVersion), m_page_size_override(page_size_override) { + } + + ACTOR static Future init(VersionedBTree *self) { + // TODO: don't just create a new root, load the existing one + self->m_root = self->m_pager->allocateLogicalPage(); + Version latest = wait(self->m_pager->getLatestVersion()); + Version v = latest + 1; + IPager *pager = self->m_pager; + self->writePage(self->m_root, FixedSizeMap::emptyPage(EPageFlags::IS_LEAF, [pager](){ return pager->newPageBuffer(); }), v); + self->m_pager->setLatestVersion(v); + Void _ = wait(self->m_pager->commit()); + return Void(); + } + + Future init() { return init(this); } + + virtual ~VersionedBTree() {} + + // readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed + // to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations. 
+ // The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less + // than or equal to the given version. + // If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same + // write version, OR it may represent a snapshot as of the call to readAtVersion(). + virtual Reference readAtVersion(Version v) { + // TODO: Use the buffer to return uncommitted data + return Reference(new Cursor(v, m_pager, m_root)); + } + + // Must be nondecreasing + virtual void setWriteVersion(Version v) { + ASSERT(v >= m_writeVersion); + m_writeVersion = v; + //m_pager->setLatestVersion(v); + } + + virtual Future commit() { + return commit_impl(this); + } + +private: + void writePage(LogicalPageID id, Reference page, Version ver) { + FixedSizeMap map = FixedSizeMap::decode(StringRef(page->begin(), page->size())); + printf("Writing page: id=%d ver=%lld %s\n", id, ver, map.toString().c_str()); + m_pager->writePage(id, page, ver); + } + + LogicalPageID m_root; + + typedef std::vector>>> VersionedChildrenT; + typedef std::map>> MutationBufferT; + struct KeyVersionValue { + KeyVersionValue(Key k, Version ver, Value val) : key(k), version(ver), value(val) {} + bool operator< (KeyVersionValue const &rhs) const { + int64_t cmp = key.compare(rhs.key); + if(cmp == 0) { + cmp = version - rhs.version; + if(cmp == 0) + return false; + } + return cmp < 0; + } + Key key; + Version version; + Value value; + }; + + ACTOR static Future commitSubtree(VersionedBTree *self, Reference snapshot, LogicalPageID root, std::string lowerBoundKey, MutationBufferT::const_iterator bufBegin, MutationBufferT::const_iterator bufEnd) { + printf("commit subtree from page %u\n", root); + if(bufBegin == bufEnd) { + return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} }); + } + + state FixedSizeMap map; + printf("commitSubtree: Reading page %d\n", root); + Reference rawPage = 
wait(snapshot->getPhysicalPage(root)); + map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size())); + + if(map.flags & EPageFlags::IS_LEAF) { + VersionedChildrenT results; + FixedSizeMap::KVPairsT kvpairs; + + // Fill existing with records from the root page (which is a leaf) + std::vector existing; + for(auto const &kv : map.entries) { + Tuple t = Tuple::unpack(kv.first); + existing.push_back(KeyVersionValue(t.getString(0), t.getInt(1), StringRef(kv.second))); + } + + // Fill mutations with changes being committed, tracking the lowest version among them + std::vector mutations; + Version minVersion = std::numeric_limits::max(); + MutationBufferT::const_iterator iBuf = bufBegin; + while(iBuf != bufEnd) { + Key k = StringRef(iBuf->first); + for(auto const &vv : iBuf->second) { + mutations.push_back(KeyVersionValue(StringRef(k), vv.first, StringRef(vv.second))); + minVersion = std::min(minVersion, vv.first); + } + ++iBuf; + } + + // NOTE(review): std::merge needs both inputs already ordered by KeyVersionValue::operator< (key, then version) - confirm the page entries and buffered mutations always are + std::vector merged; + std::merge(existing.cbegin(), existing.cend(), mutations.cbegin(), mutations.cend(), std::back_inserter(merged)); + + // TODO: Make version and key splits based on contents of merged list + + // Re-encode every merged record as a (key, version) tuple -> value leaf entry + FixedSizeMap::KVPairsT leafEntries; + for(auto const &kvv : merged) { + Tuple t; + t.append(kvv.key); + t.append(kvv.version); + leafEntries.push_back({t.pack().toString(), kvv.value.toString()}); + } + + IPager *pager = self->m_pager; + vector< std::pair> > pages = FixedSizeMap::buildMany( leafEntries, EPageFlags::IS_LEAF, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override); + + if(pages.size() != 1) + results.push_back( {0, {{lowerBoundKey, root}}} ); + + // Verify that no consecutive split keys are equal + StringRef lastSplitKey(LiteralStringRef("\xff\xff\xff")); + for(auto const &p : pages) { + if(p.first != 0) { + StringRef splitKey = merged[p.first].key; + ASSERT(splitKey != lastSplitKey); + lastSplitKey = splitKey; + } + } + + // For each IPage of data, assign a logical pageID. 
+ std::vector logicalPages; + + // Only reuse first page if only one page is being returned or if root is not the btree root. + if(pages.size() == 1 || root != self->m_root) + logicalPages.push_back(root); + + // Allocate enough pageIDs for all of the pages + for(int i=logicalPages.size(); im_pager->allocateLogicalPage() ); + + // Write each page using its assigned page ID + printf("Writing leaf pages, subtreeRoot=%u\n", root); + for(int i=0; iwritePage(logicalPages[i], pages[i].second, minVersion); + + results.push_back({minVersion, {}}); + + for(int i=0; i> m_futureChildren; + + auto childMutBegin = bufBegin; + + for(int i=0; im_buffer.lower_bound( t.getString(0).toString() ); + } + else + childMutEnd = self->m_buffer.lower_bound( map.entries[i+1].first ); + } + + m_futureChildren.push_back(self->commitSubtree(self, snapshot, *(uint32_t*)map.entries[i].second.data(), map.entries[i].first, childMutBegin, childMutEnd)); + childMutBegin = childMutEnd; + } + + Void _ = wait(waitForAll(m_futureChildren)); + + bool unmodified = true; + for( auto &c : m_futureChildren) { + if(c.get().size() != 1 || c.get()[0].second.size() != 1) { + unmodified = false; + break; + } + } + + if(unmodified) + return VersionedChildrenT({{0, {{lowerBoundKey, root}}}}); + + Version version = 0; + VersionedChildrenT result; + + loop { // over version splits of this page + Version nextVersion = std::numeric_limits::max(); + + FixedSizeMap::KVPairsT childEntries; // Logically std::vector> childEntries; + + // For each Future + //printf("LOOP: Version %lld\n", version); + + for( auto& c : m_futureChildren ) { + const VersionedChildrenT &children = c.get(); + /* + printf(" versioned page set size: %d\n", children.size()); + for(auto &versionedPageSet : children) { + printf(" version: %lld\n", versionedPageSet.first); + for(auto &boundaryPage : versionedPageSet.second) { + printf(" %s -> %u\n", boundaryPage.first.c_str(), boundaryPage.second); + } + } + printf(" Current version: %lld\n", version); 
+ */ + + // Find the first version greater than the current version we are writing + auto cv = std::upper_bound( children.begin(), children.end(), version, [](Version a, VersionedChildrenT::value_type const &b) { return a < b.first; } ); + + // If there are no versions before the one we found, just update nextVersion and continue. + if(cv == children.begin()) { + //printf(" First version (%lld) in set is greater than current, setting nextVersion and continuing\n", cv->first); + nextVersion = std::min(nextVersion, cv->first); + //printf(" curr %lld next %lld\n", version, nextVersion); + continue; + } + + // If a version greater than the current version being written was found, update nextVersion + if(cv != children.end()) { + //printf(" First greater version found is %lld\n"); + nextVersion = std::min(nextVersion, cv->first); + //printf(" curr %lld next %lld\n", version, nextVersion); + } + + // Go back one to the last version that was valid prior to or at the current version we are writing + --cv; + + //printf(" Using children for version %lld\n", cv->first); + + // Add the children at this version to the child entries list for the current version being built. + for (auto &childPage : cv->second) { + //printf(" Adding child page '%s'\n", childPage.first.c_str()); + childEntries.push_back( {childPage.first, std::string((char *)&childPage.second, sizeof(uint32_t))}); + } + } + + //printf("Finished pass through futurechildren. childEntries=%d version=%lld nextVersion=%lld\n", childEntries.size(), version, nextVersion); + + // TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and + // cause unnecessary path copying + + IPager *pager = self->m_pager; + vector< std::pair> > pages = FixedSizeMap::buildMany( childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override); + + // For each IPage of data, assign a logical pageID. 
+ std::vector logicalPages; + + // Only reuse first page if only one page is being returned or if root is not the btree root. + if(pages.size() == 1 || root != self->m_root) + logicalPages.push_back(root); + + // Allocate enough pageIDs for all of the pages + for(int i=logicalPages.size(); im_pager->allocateLogicalPage() ); + + // Write each page using its assigned page ID + printf("Writing internal pages, subtreeRoot=%u\n", root); + for(int i=0; iwritePage( logicalPages[i], pages[i].second, version ); + + result.resize(result.size()+1); + result.back().first = version; + + for(int i=0; i 1 && result.back().second == result.end()[-2].second) + result.pop_back(); + + if (nextVersion == std::numeric_limits::max()) + break; + version = nextVersion; + } + + return result; + } + } + + ACTOR static Future commit_impl(VersionedBTree *self) { + Version latestVersion = wait(self->m_pager->getLatestVersion()); + + VersionedChildrenT rootNodes = wait(commitSubtree(self, self->m_pager->getReadSnapshot(latestVersion), self->m_root, std::string(), self->m_buffer.begin(), self->m_buffer.end())); + + for(auto const &versionedPages : rootNodes) { + // If the version of the root page set is 0 and the page set size is 1 then there is nothing to write. 
+ if(versionedPages.first == 0 && versionedPages.second.size() == 1) + continue; + + // Encode each (boundary key, page ID) pair of this version as a child entry for the new root + FixedSizeMap::KVPairsT childEntries; + for (auto &childPage : versionedPages.second) + childEntries.push_back( {childPage.first, std::string((char *)&childPage.second, sizeof(uint32_t))}); + + IPager *pager = self->m_pager; + vector< std::pair> > pages = FixedSizeMap::buildMany( childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override); + // Until we have only one root, write new multi-page top levels of the tree + while(pages.size() != 1) { + printf("Root level would be %d pages\n", (int)pages.size()); + FixedSizeMap::KVPairsT newRootLevelEntries; + printf("Writing new root level at version %lld\n", versionedPages.first); + for(auto const &p : pages) { + LogicalPageID pageID = self->m_pager->allocateLogicalPage(); + self->writePage(pageID, p.second, versionedPages.first); + newRootLevelEntries.push_back( {childEntries[p.first].first, std::string((char *)&pageID, sizeof(LogicalPageID))}); + } + + pages = FixedSizeMap::buildMany( newRootLevelEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override); + childEntries = std::move(newRootLevelEntries); + } + + printf("Writing new root id %d\n", self->m_root); + self->writePage(self->m_root, pages[0].second, versionedPages.first); + } + + self->m_pager->setLatestVersion(self->m_writeVersion); + + // Every buffered mutation has been written to the pager above, so drop the buffer before waiting + self->m_buffer.clear(); + + // Wait for the pager commit, as init() does: commit() must not return before the writes are durable, and a Flow future that is dropped unobserved lets the actor behind it be cancelled + Void _ = wait(self->m_pager->commit()); + return Void(); + } + + IPager *m_pager; + MutationBufferT m_buffer; + Version m_writeVersion; + int m_page_size_override; + + // Read cursor over the pager snapshot taken at a fixed version + class Cursor : public IStoreCursor, public ReferenceCounted { + public: + Cursor(Version version, IPager *pager, LogicalPageID root) + : m_version(version), m_pager(pager->getReadSnapshot(version)), m_root(root) { + } + virtual ~Cursor() {} + + virtual Future findFirstGreaterOrEqual(KeyRef key, int prefetchNextBytes) NOT_IMPLEMENTED + virtual Future findLastLessOrEqual(KeyRef key, int prefetchPriorBytes) 
NOT_IMPLEMENTED + virtual Future next(bool needValue) NOT_IMPLEMENTED + virtual Future prev(bool needValue) NOT_IMPLEMENTED + + // True when the last lookup left a key/value pair in m_kv + virtual bool isValid() { + return m_kv.present(); + } + + virtual KeyRef getKey() { + return m_kv.get().key; + } + //virtual StringRef getCompressedKey() = 0; + virtual ValueRef getValue() { + return m_kv.get().value; + } + + virtual void invalidateReturnedStrings() { + m_pager->invalidateReturnedPages(); + } + + Version m_version; + Reference m_pager; + Optional m_kv; + Arena m_arena; + LogicalPageID m_root; + + void addref() { ReferenceCounted::addref(); } + void delref() { ReferenceCounted::delref(); } + + // Walks from m_root down to a leaf, then sets m_kv to the last entry <= (key, m_version) if that entry's key equals key, otherwise to an empty Optional + ACTOR static Future findEqual_impl(Reference self, KeyRef key) { + state LogicalPageID pageNumber = self->m_root; + + state Tuple t; + t.append(key); + t.append(self->m_version); + state KeyRef tupleKey = t.pack(); + + loop { + printf("findEqual: Reading page %d @%lld\n", pageNumber, self->m_version); + Reference rawPage = wait(self->m_pager->getPhysicalPage(pageNumber)); + FixedSizeMap map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size())); + + // Special case of empty page (which should only happen for root) + if(map.entries.empty()) { + ASSERT(pageNumber == self->m_root); + self->m_kv = Optional(); + return Void(); + } + + // Test the IS_LEAF bit itself (as commitSubtree does) rather than whether flags is nonzero + if(map.flags & EPageFlags::IS_LEAF) { + int i = map.findLastLessOrEqual(tupleKey); + if(i >= 0 && Tuple::unpack(map.entries[i].first).getString(0) == key) { + self->m_kv = Standalone(KeyValueRef(key, map.entries[i].second), self->m_arena); + } + else { + self->m_kv = Optional(); + } + return Void(); + } + else { + int i = map.findLastLessOrEqual(INTERNAL_PAGES_HAVE_TUPLES ? 
tupleKey : key); + i = std::max(i, 0); + pageNumber = *(uint32_t *)map.entries[i].second.data(); + } + } + } + + virtual Future findEqual(KeyRef key) { + return findEqual_impl(Reference::addRef(this), key); + } + }; +}; + +KeyValue randomKV() { + int kLen = g_random->randomInt(1, 10); + int vLen = g_random->randomInt(0, 5); + KeyValue kv; + kv.key = makeString(kLen, kv.arena()); + kv.value = makeString(vLen, kv.arena()); + for(int i = 0; i < kLen; ++i) + mutateString(kv.key)[i] = (uint8_t)g_random->randomInt('a', 'm'); + for(int i = 0; i < vLen; ++i) + mutateString(kv.value)[i] = (uint8_t)g_random->randomInt('n', 'z'); + return kv; +} + +TEST_CASE("/redwood/set") { + state IPager *pager = new MemoryPager(); + state VersionedBTree *btree = new VersionedBTree(pager, g_random->randomInt(50, 200)); + Void _ = wait(btree->init()); + + state std::map, std::string> written; + + Version lastVer = wait(btree->getLatestVersion()); + state Version version = lastVer + 1; + state int commits = g_random->randomInt(1, 20); + printf("Will do %d commits\n", commits); + while(commits--) { + int versions = g_random->randomInt(1, 20); + printf(" Commit will have %d versions\n", versions); + while(versions--) { + btree->setWriteVersion(version); + int changes = g_random->randomInt(0, 20); + printf(" Version will have %d changes\n", changes); + while(changes--) { + KeyValue kv = randomKV(); + printf(" Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), kv.value.toString().c_str(), version); + btree->set(kv); + written[std::make_pair(kv.key.toString(), version)] = kv.value.toString(); + } + ++version; + } + Void _ = wait(btree->commit()); + + // Check that all writes can be read at their written versions + state std::map, std::string>::const_iterator i = written.cbegin(); + state std::map, std::string>::const_iterator iEnd = written.cend(); + state int errors = 0; + + printf("Checking changes committed thus far.\n"); + while(i != iEnd) { + state std::string key = i->first.first; + 
state Version ver = i->first.second; + state std::string val = i->second; + + state Reference cur = btree->readAtVersion(ver); + + Void _ = wait(cur->findEqual(i->first.first)); + + if(!(cur->isValid() && cur->getKey() == key && cur->getValue() == val)) { + ++errors; + if(!cur->isValid()) + printf("Verify failed: key_not_found: '%s' -> '%s' @%lld\n", key.c_str(), val.c_str(), ver); + else if(cur->getKey() != key) + printf("Verify failed: key_incorrect: found '%s' expected '%s' @%lld\n", cur->getKey().toString().c_str(), key.c_str(), ver); + else if(cur->getValue() != val) + printf("Verify failed: value_incorrect: for '%s' found '%s' expected '%s' @%lld\n", cur->getKey().toString().c_str(), cur->getValue().toString().c_str(), val.c_str(), ver); + } + ++i; + } + + printf("%d sets, %d errors\n", (int)written.size(), errors); + + if(errors != 0) + throw internal_error(); + } + + return Void(); +} + +std::string SimpleFixedSizeMapRef::toString() { + std::string result; + result.append(format("flags=0x%x data: ", flags)); + for(auto const &kv : entries) { + result.append(" "); + if(INTERNAL_PAGES_HAVE_TUPLES || flags && VersionedBTree::IS_LEAF) { + Tuple t = Tuple::unpack(kv.first); + result.append("["); + for(int i = 0; i < t.size(); ++i) { + if(i != 0) + result.append(","); + if(t.getType(i) == Tuple::ElementType::BYTES) + result.append(format("%s", t.getString(i).toString().c_str())); + if(t.getType(i) == Tuple::ElementType::INT) + result.append(format("%lld", t.getInt(i))); + } + } + else + result.append(format("'%s'", printable(StringRef(kv.first)).c_str())); + result.append("->"); + if(flags && VersionedBTree::IS_LEAF) + result.append(format("'%s'", printable(StringRef(kv.second)).c_str())); + else + result.append(format("%u", *(const uint32_t *)kv.second.data())); + result.append("]"); + } + + return result; +} diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index 853ca6d69a..dc42e18933 100644 --- a/fdbserver/fdbserver.vcxproj +++ 
b/fdbserver/fdbserver.vcxproj @@ -33,6 +33,7 @@ + @@ -45,6 +46,7 @@ + @@ -53,6 +55,7 @@ + @@ -150,12 +153,16 @@ + + + + diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 4996e5a50b..e73252bc52 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -246,6 +246,9 @@ workloads + + + @@ -330,6 +333,10 @@ + + + + diff --git a/flow/ActorCollection.h b/flow/ActorCollection.h index 596429d425..b5685b5466 100644 --- a/flow/ActorCollection.h +++ b/flow/ActorCollection.h @@ -63,4 +63,44 @@ public: void clear( bool returnWhenEmptied ) { m_out.cancel(); m_out = actorCollection(m_add.getFuture(), NULL, NULL, NULL, NULL, returnWhenEmptied ); } }; +class SignalableActorCollection : NonCopyable { + PromiseStream> m_add; + Promise stopSignal; + Future m_out; + + void init() { + PromiseStream> addStream; + m_out = actorCollection(addStream.getFuture(), NULL, NULL, NULL, NULL, true); + m_add = addStream; + stopSignal = Promise(); + m_add.send(stopSignal.getFuture()); + } + +public: + explicit SignalableActorCollection() { + init(); + } + + Future signal() { + stopSignal.send(Void()); + Future result = holdWhile(m_add, m_out); + return result; + } + + Future signalAndReset() { + Future result = signal(); + clear(); + return result; + } + + Future signalAndCollapse() { + Future result = signalAndReset(); + add(result); + return result; + } + + void add(Future a) { m_add.send(a); } + void clear() { init(); } +}; + #endif \ No newline at end of file diff --git a/tests/RedwoodUnitTests.txt b/tests/RedwoodUnitTests.txt new file mode 100644 index 0000000000..fc6ee1394c --- /dev/null +++ b/tests/RedwoodUnitTests.txt @@ -0,0 +1,6 @@ +testTitle=UnitTests +testName=UnitTests +startDelay=0 +useDB=false +maxTestCases=0 +testsMatching=/redwood