Merge pull request #2346 from satherton/feature-redwood

Update Redwood
This commit is contained in:
Steve Atherton 2019-11-12 16:25:10 -08:00 committed by GitHub
commit 17059596e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 457 additions and 2995 deletions

View File

@ -24,8 +24,6 @@ set(FDBSERVER_SRCS
IKeyValueStore.h
IPager.h
IVersionedStore.h
IndirectShadowPager.actor.cpp
IndirectShadowPager.h
KeyValueStoreCompressTestData.actor.cpp
KeyValueStoreMemory.actor.cpp
KeyValueStoreSQLite.actor.cpp
@ -45,8 +43,6 @@ set(FDBSERVER_SRCS
MasterInterface.h
MasterProxyServer.actor.cpp
masterserver.actor.cpp
MemoryPager.actor.cpp
MemoryPager.h
MoveKeys.actor.cpp
MoveKeys.actor.h
networktest.actor.cpp

View File

@ -20,13 +20,89 @@
#pragma once
#include "fdbserver/PrefixTree.h"
#include "flow/flow.h"
#include "flow/Arena.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/Knobs.h"
#include <string.h>
typedef uint64_t Word;
static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) {
int i = 0;
const int wordEnd = cl - sizeof(Word) + 1;
for(; i < wordEnd; i += sizeof(Word)) {
Word a = *(Word *)ap;
Word b = *(Word *)bp;
if(a != b) {
return i + ctzll(a ^ b) / 8;
}
ap += sizeof(Word);
bp += sizeof(Word);
}
for (; i < cl; i++) {
if (*ap != *bp) {
return i;
}
++ap;
++bp;
}
return cl;
}
static int commonPrefixLength(StringRef a, StringRef b) {
return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size()));
}
// This appears to be the fastest version
static int lessOrEqualPowerOfTwo(int n) {
int p;
for (p = 1; p+p <= n; p+=p);
return p;
}
/*
static int _lessOrEqualPowerOfTwo(uint32_t n) {
if(n == 0)
return n;
int trailing = __builtin_ctz(n);
int leading = __builtin_clz(n);
if(trailing + leading == ((sizeof(n) * 8) - 1))
return n;
return 1 << ( (sizeof(n) * 8) - leading - 1);
}
static int __lessOrEqualPowerOfTwo(unsigned int n) {
int p = 1;
for(; p <= n; p <<= 1);
return p >> 1;
}
*/
static int perfectSubtreeSplitPoint(int subtree_size) {
// return the inorder index of the root node in a subtree of the given size
// consistent with the resulting binary search tree being "perfect" (having minimal height
// and all missing nodes as far right as possible).
// There has to be a simpler way to do this.
int s = lessOrEqualPowerOfTwo((subtree_size - 1) / 2 + 1) - 1;
return std::min(s * 2 + 1, subtree_size - s - 1);
}
static int perfectSubtreeSplitPointCached(int subtree_size) {
static uint16_t *points = nullptr;
static const int max = 500;
if(points == nullptr) {
points = new uint16_t[max];
for(int i = 0; i < max; ++i)
points[i] = perfectSubtreeSplitPoint(i);
}
if(subtree_size < max)
return points[subtree_size];
return perfectSubtreeSplitPoint(subtree_size);
}
// Delta Tree is a memory mappable binary tree of T objects such that each node's item is
// stored as a Delta which can reproduce the node's T item given the node's greatest
// lesser ancestor and the node's least greater ancestor.

View File

@ -53,8 +53,9 @@
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
#endif
typedef uint32_t LogicalPageID; // uint64_t?
static const LogicalPageID invalidLogicalPageID = std::numeric_limits<LogicalPageID>::max();
typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
class IPage {
public:
@ -85,12 +86,10 @@ public:
class IPagerSnapshot {
public:
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable) = 0;
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
virtual Version getVersion() const = 0;
virtual Key getMetaKey() const {
return Key();
}
virtual Key getMetaKey() const = 0;
virtual ~IPagerSnapshot() {}
@ -98,65 +97,7 @@ public:
virtual void delref() = 0;
};
class IPager : public IClosable {
public:
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
virtual Reference<IPage> newPageBuffer() = 0;
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
// For a given pager instance, separate calls to this function must return the same value.
virtual int getUsablePageSize() = 0;
virtual StorageBytes getStorageBytes() = 0;
// Permitted to fail (ASSERT) during recovery.
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version) = 0;
// Returns an unused LogicalPageID.
// LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) do not need to be allocated.
// Permitted to fail (ASSERT) during recovery.
virtual LogicalPageID allocateLogicalPage() = 0;
// Signals that the page will no longer be used as of the specified version. Versions prior to the specified version must be kept.
// Permitted to fail (ASSERT) during recovery.
virtual void freeLogicalPage(LogicalPageID pageID, Version version) = 0;
// Writes a page with the given LogicalPageID at the specified version. LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES)
// can be written without being allocated. All other LogicalPageIDs must be allocated using allocateLogicalPage before writing them.
//
// If updateVersion is 0, we are signalling to the pager that we are reusing the LogicalPageID entry at the current latest version of pageID.
//
// Otherwise, we will add a new entry for LogicalPageID at the specified version. In that case, updateVersion must be larger than any version
// written to this page previously, and it must be larger than any version committed. If referencePageID is given, the latest version of that
// page will be used for the write, which *can* be less than the latest committed version.
//
// Permitted to fail (ASSERT) during recovery.
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID = invalidLogicalPageID) = 0;
// Signals to the pager that no more reads will be performed in the range [begin, end).
// Permitted to fail (ASSERT) during recovery.
virtual void forgetVersions(Version begin, Version end) = 0;
// Makes durable all writes and any data structures used for recovery.
// Permitted to fail (ASSERT) during recovery.
virtual Future<Void> commit() = 0;
// Returns the latest version of the pager. Permitted to block until recovery is complete, at which point it should always be set immediately.
// Some functions in the IPager interface are permitted to fail (ASSERT) during recovery, so users should wait for getLatestVersion to complete
// before doing anything else.
virtual Future<Version> getLatestVersion() = 0;
// Sets the latest version of the pager. Must be monotonically increasing.
//
// Must be called prior to reading the specified version. SOMEDAY: It may be desirable in the future to relax this constraint for performance reasons.
//
// Permitted to fail (ASSERT) during recovery.
virtual void setLatestVersion(Version version) = 0;
protected:
~IPager() {} // Destruction should be done using close()/dispose() from the IClosable interface
};
// This API is probably customized to the behavior of DWALPager and probably needs some changes to be more generic.
class IPager2 : public IClosable {
public:
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
@ -189,7 +130,10 @@ public:
// The data returned will be the later of
// - the most recent committed atomic
// - the most recent non-atomic write
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable) = 0;
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
// considered likely to be needed soon.
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
// Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion()
// Note that snapshots at any version may still see the results of updatePage() calls.

View File

@ -30,10 +30,10 @@
class IStoreCursor {
public:
virtual Future<Void> findEqual(KeyRef key) = 0;
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, bool needValue, int prefetchNextBytes) = 0;
virtual Future<Void> findLastLessOrEqual(KeyRef key, bool needValue, int prefetchPriorBytes) = 0;
virtual Future<Void> next(bool needValue) = 0;
virtual Future<Void> prev(bool needValue) = 0;
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, int prefetchBytes = 0) = 0;
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchBytes = 0) = 0;
virtual Future<Void> next() = 0;
virtual Future<Void> prev() = 0;
virtual bool isValid() = 0;
virtual KeyRef getKey() = 0;
@ -41,8 +41,6 @@ public:
virtual void addref() = 0;
virtual void delref() = 0;
virtual std::string toString() const = 0;
};
class IVersionedStore : public IClosable {

View File

@ -1,960 +0,0 @@
/*
* IndirectShadowPager.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/IndirectShadowPager.h"
#include "fdbserver/Knobs.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h"
#include "fdbrpc/crc32c.h"
struct SumType {
bool operator==(const SumType &rhs) const { return crc == rhs.crc; }
uint32_t crc;
std::string toString() { return format("0x%08x", crc); }
};
bool checksum(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical, bool write) {
// Calculates and then stores or verifies the checksum at the end of the page.
// If write is true then the checksum is written into the page
// If write is false then the checksum is compared to the in-page sum and
// and error will be thrown if they do not match.
ASSERT(sizeof(SumType) == IndirectShadowPage::PAGE_OVERHEAD_BYTES);
// Adjust pageSize to refer to only usable storage bytes
pageSize -= IndirectShadowPage::PAGE_OVERHEAD_BYTES;
SumType sum;
SumType *pSumInPage = (SumType *)(page + pageSize);
// Write sum directly to page or to sum variable based on mode
SumType *sumOut = write ? pSumInPage : &sum;
sumOut->crc = crc32c_append(logical, page, pageSize);
VALGRIND_MAKE_MEM_DEFINED(sumOut, sizeof(SumType));
debug_printf("checksum %s%s logical %d physical %d size %d checksums page %s calculated %s data at %p %s\n",
write ? "write" : "read",
(!write && sum != *pSumInPage) ? " MISMATCH" : "",
logical, physical, pageSize,
write ? "NA" : pSumInPage->toString().c_str(),
sumOut->toString().c_str(), page, "");
// Verify if not in write mode
if(!write && sum != *pSumInPage) {
TraceEvent (SevError, "IndirectShadowPagerPageChecksumFailure")
.detail("UserPageSize", pageSize)
.detail("Filename", file->getFilename())
.detail("LogicalPage", logical)
.detail("PhysicalPage", physical)
.detail("ChecksumInPage", pSumInPage->toString())
.detail("ChecksumCalculated", sum.toString());
return false;
}
return true;
}
inline bool checksumRead(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) {
return checksum(file, page, pageSize, logical, physical, false);
}
inline void checksumWrite(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) {
checksum(file, page, pageSize, logical, physical, true);
}
IndirectShadowPage::IndirectShadowPage() : fastAllocated(true) {
data = (uint8_t*)FastAllocator<4096>::allocate();
}
IndirectShadowPage::~IndirectShadowPage() {
if(fastAllocated) {
FastAllocator<4096>::release(data);
}
else if(file) {
file->releaseZeroCopy(data, PAGE_BYTES, (int64_t) physicalPageID * PAGE_BYTES);
}
}
uint8_t const* IndirectShadowPage::begin() const {
return data;
}
uint8_t* IndirectShadowPage::mutate() {
return data;
}
int IndirectShadowPage::size() const {
return PAGE_BYTES - PAGE_OVERHEAD_BYTES;
}
const int IndirectShadowPage::PAGE_BYTES = 4096;
const int IndirectShadowPage::PAGE_OVERHEAD_BYTES = sizeof(SumType);
IndirectShadowPagerSnapshot::IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version)
: pager(pager), version(version), pagerError(pager->getError())
{
}
Future<Reference<const IPage>> IndirectShadowPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) {
if(pagerError.isReady())
pagerError.get();
return pager->getPage(Reference<IndirectShadowPagerSnapshot>::addRef(this), pageID, version);
}
template <class T>
T bigEndian(T val) {
static_assert(sizeof(T) <= 8, "Can't compute bigEndian on integers larger than 8 bytes");
uint64_t b = bigEndian64(val);
return *(T*)((uint8_t*)&b+8-sizeof(T));
}
ACTOR Future<Void> recover(IndirectShadowPager *pager) {
try {
TraceEvent("PagerRecovering").detail("Filename", pager->pageFileName);
pager->pageTableLog = keyValueStoreMemory(pager->basename, UID(), 1e9, "pagerlog");
// TODO: this can be done synchronously with the log recovery
int64_t flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK;
state bool exists = fileExists(pager->pageFileName);
if(!exists) {
flags |= IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE;
}
Reference<IAsyncFile> dataFile = wait(IAsyncFileSystem::filesystem()->open(pager->pageFileName, flags, 0600));
pager->dataFile = dataFile;
TraceEvent("PagerOpenedDataFile").detail("Filename", pager->pageFileName);
if(!exists) {
wait(pager->dataFile->sync());
}
TraceEvent("PagerSyncdDataFile").detail("Filename", pager->pageFileName);
state int64_t fileSize = wait(pager->dataFile->size());
TraceEvent("PagerGotFileSize").detail("Size", fileSize).detail("Filename", pager->pageFileName);
if(fileSize > 0) {
TraceEvent("PagerRecoveringFromLogs").detail("Filename", pager->pageFileName);
Optional<Value> pagesAllocatedValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::PAGES_ALLOCATED_KEY));
if(pagesAllocatedValue.present()) {
BinaryReader pr(pagesAllocatedValue.get(), Unversioned());
uint32_t pagesAllocated;
pr >> pagesAllocated;
pager->pagerFile.init(fileSize, pagesAllocated);
debug_printf("%s: Recovered pages allocated: %d\n", pager->pageFileName.c_str(), pager->pagerFile.pagesAllocated);
ASSERT(pager->pagerFile.pagesAllocated != PagerFile::INVALID_PAGE);
Optional<Value> latestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::LATEST_VERSION_KEY));
ASSERT(latestVersionValue.present());
BinaryReader vr(latestVersionValue.get(), Unversioned());
vr >> pager->latestVersion;
Optional<Value> oldestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::OLDEST_VERSION_KEY));
if(oldestVersionValue.present()) {
BinaryReader vr(oldestVersionValue.get(), Unversioned());
vr >> pager->oldestVersion;
}
debug_printf("%s: Recovered version info: earliest v%lld latest v%lld\n", pager->pageFileName.c_str(), pager->oldestVersion, pager->latestVersion);
pager->committedVersion = pager->latestVersion;
Standalone<VectorRef<KeyValueRef>> tableEntries = wait(pager->pageTableLog->readRange(KeyRangeRef(IndirectShadowPager::TABLE_ENTRY_PREFIX, strinc(IndirectShadowPager::TABLE_ENTRY_PREFIX))));
if(tableEntries.size() > 0) {
BinaryReader kr(tableEntries.back().key, Unversioned());
uint8_t prefix;
LogicalPageID logicalPageID;
kr >> prefix;
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
kr >> logicalPageID;
logicalPageID = bigEndian(logicalPageID);
LogicalPageID pageTableSize = std::max<LogicalPageID>(logicalPageID+1, SERVER_KNOBS->PAGER_RESERVED_PAGES);
pager->pageTable.resize(pageTableSize);
debug_printf("%s: Recovered page table size: %d\n", pager->pageFileName.c_str(), pageTableSize);
}
else {
debug_printf("%s: Recovered no page table entries\n", pager->pageFileName.c_str());
}
LogicalPageID nextPageID = SERVER_KNOBS->PAGER_RESERVED_PAGES;
std::set<PhysicalPageID> allocatedPhysicalPages;
for(auto entry : tableEntries) {
BinaryReader kr(entry.key, Unversioned());
BinaryReader vr(entry.value, Unversioned());
uint8_t prefix;
LogicalPageID logicalPageID;
Version version;
PhysicalPageID physicalPageID;
kr >> prefix;
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
kr >> logicalPageID;
logicalPageID = bigEndian(logicalPageID);
kr >> version;
version = bigEndian(version);
vr >> physicalPageID;
ASSERT(version <= pager->latestVersion);
pager->pageTable[logicalPageID].push_back(std::make_pair(version, physicalPageID));
if(physicalPageID != PagerFile::INVALID_PAGE) {
allocatedPhysicalPages.insert(physicalPageID);
pager->pagerFile.markPageAllocated(logicalPageID, version, physicalPageID);
}
while(nextPageID < logicalPageID) {
pager->logicalFreeList.push_back(nextPageID++);
}
if(logicalPageID == nextPageID) {
++nextPageID;
}
debug_printf("%s: Recovered page table entry logical %d -> (v%lld, physical %d)\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID);
}
debug_printf("%s: Building physical free list\n", pager->pageFileName.c_str());
// TODO: can we do this better? does it require storing extra info in the log?
PhysicalPageID nextPhysicalPageID = 0;
for(auto itr = allocatedPhysicalPages.begin(); itr != allocatedPhysicalPages.end(); ++itr) {
while(nextPhysicalPageID < *itr) {
pager->pagerFile.freePage(nextPhysicalPageID++);
}
++nextPhysicalPageID;
}
while(nextPhysicalPageID < pager->pagerFile.pagesAllocated) {
pager->pagerFile.freePage(nextPhysicalPageID++);
}
}
}
if(pager->pageTable.size() < SERVER_KNOBS->PAGER_RESERVED_PAGES) {
pager->pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
}
pager->pagerFile.finishedMarkingPages();
pager->pagerFile.startVacuuming();
debug_printf("%s: Finished recovery at v%lld\n", pager->pageFileName.c_str(), pager->latestVersion);
TraceEvent("PagerFinishedRecovery").detail("LatestVersion", pager->latestVersion).detail("OldestVersion", pager->oldestVersion).detail("Filename", pager->pageFileName);
}
catch(Error &e) {
if(e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "PagerRecoveryFailed").error(e, true).detail("Filename", pager->pageFileName);
}
throw;
}
return Void();
}
ACTOR Future<Void> housekeeper(IndirectShadowPager *pager) {
wait(pager->recovery);
wait(Never());
loop {
state LogicalPageID pageID = 0;
for(; pageID < pager->pageTable.size(); ++pageID) {
// TODO: pick an appropriate rate for this loop and determine the right way to implement it
// Right now, this delays 10ms every 400K pages, which means we have 1s of delay for every
// 40M pages. In total, we introduce 100s delay for a max size 4B page file.
if(pageID % 400000 == 0) {
wait(delay(0.01));
}
else {
wait(yield());
}
auto& pageVersionMap = pager->pageTable[pageID];
if(pageVersionMap.size() > 0) {
auto itr = pageVersionMap.begin();
for(auto prev = itr; prev != pageVersionMap.end() && prev->first < pager->oldestVersion; prev=itr) {
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
++itr;
if(prev->second != PagerFile::INVALID_PAGE && (itr == pageVersionMap.end() || itr->first <= pager->oldestVersion)) {
pager->freePhysicalPageID(prev->second);
}
if(itr == pageVersionMap.end() || itr->first >= pager->oldestVersion) {
debug_printf("%s: Updating oldest version for logical %u: v%lld\n", pager->pageFileName.c_str(), pageID, pager->oldestVersion);
pager->logPageTableClear(pageID, 0, pager->oldestVersion);
if(itr != pageVersionMap.end() && itr->first > pager->oldestVersion) {
debug_printf("%s: Erasing pages to prev from pageVersionMap for %d (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr->first, prev->first);
prev->first = pager->oldestVersion;
pager->logPageTableUpdate(pageID, pager->oldestVersion, prev->second);
itr = pageVersionMap.erase(pageVersionMap.begin(), prev);
}
else {
debug_printf("%s: Erasing pages to itr from pageVersionMap for %d (%d) (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr == pageVersionMap.end(), itr==pageVersionMap.end() ? -1 : itr->first, prev->first);
itr = pageVersionMap.erase(pageVersionMap.begin(), itr);
}
}
}
for(; itr != pageVersionMap.end(); ++itr) {
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
}
if(pageVersionMap.size() == 0) {
pager->freeLogicalPageID(pageID);
}
}
}
pager->pagerFile.finishedMarkingPages();
}
}
ACTOR Future<Void> forwardError(Future<Void> f, Promise<Void> target) {
try {
wait(f);
}
catch(Error &e) {
if(e.code() != error_code_actor_cancelled && target.canBeSet()) {
target.sendError(e);
}
throw e;
}
return Void();
}
IndirectShadowPager::IndirectShadowPager(std::string basename)
: basename(basename), latestVersion(0), committedVersion(0), committing(Void()), oldestVersion(0), pagerFile(this)
{
pageFileName = basename;
recovery = forwardError(recover(this), errorPromise);
housekeeping = forwardError(housekeeper(this), errorPromise);
}
StorageBytes IndirectShadowPager::getStorageBytes() {
int64_t free;
int64_t total;
g_network->getDiskBytes(parentDirectory(basename), free, total);
return StorageBytes(free, total, pagerFile.size(), free + IndirectShadowPage::PAGE_BYTES * pagerFile.getFreePages());
}
Reference<IPage> IndirectShadowPager::newPageBuffer() {
return Reference<IPage>(new IndirectShadowPage());
}
int IndirectShadowPager::getUsablePageSize() {
return IndirectShadowPage::PAGE_BYTES - IndirectShadowPage::PAGE_OVERHEAD_BYTES;
}
Reference<IPagerSnapshot> IndirectShadowPager::getReadSnapshot(Version version) {
debug_printf("%s: Getting read snapshot v%lld latest v%lld oldest v%lld\n", pageFileName.c_str(), version, latestVersion, oldestVersion);
ASSERT(recovery.isReady());
ASSERT(version <= latestVersion);
ASSERT(version >= oldestVersion);
return Reference<IPagerSnapshot>(new IndirectShadowPagerSnapshot(this, version));
}
LogicalPageID IndirectShadowPager::allocateLogicalPage() {
ASSERT(recovery.isReady());
LogicalPageID allocatedPage;
if(logicalFreeList.size() > 0) {
allocatedPage = logicalFreeList.front();
logicalFreeList.pop_front();
}
else {
ASSERT(pageTable.size() < std::numeric_limits<LogicalPageID>::max()); // TODO: different error?
allocatedPage = pageTable.size();
pageTable.push_back(PageVersionMap());
}
ASSERT(allocatedPage >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
debug_printf("%s: op=allocate id=%u\n", pageFileName.c_str(), allocatedPage);
return allocatedPage;
}
void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version) {
ASSERT(recovery.isReady());
ASSERT(committing.isReady());
ASSERT(pageID < pageTable.size());
PageVersionMap &pageVersionMap = pageTable[pageID];
ASSERT(!pageVersionMap.empty());
// 0 will mean delete as of latest version, similar to write at latest version
if(version == 0) {
version = pageVersionMap.back().first;
}
auto itr = pageVersionMapLowerBound(pageVersionMap, version);
// TODO: Is this correct, that versions from the past *forward* can be deleted?
for(auto i = itr; i != pageVersionMap.end(); ++i) {
freePhysicalPageID(i->second);
}
if(itr != pageVersionMap.end()) {
debug_printf("%s: Clearing newest versions for logical %u: v%lld\n", pageFileName.c_str(), pageID, version);
logPageTableClearToEnd(pageID, version);
pageVersionMap.erase(itr, pageVersionMap.end());
}
if(pageVersionMap.size() == 0) {
debug_printf("%s: Freeing logical %u (freeLogicalPage)\n", pageFileName.c_str(), pageID);
logicalFreeList.push_back(pageID);
}
else if(pageVersionMap.back().second != PagerFile::INVALID_PAGE) {
pageVersionMap.push_back(std::make_pair(version, PagerFile::INVALID_PAGE));
logPageTableUpdate(pageID, version, PagerFile::INVALID_PAGE);
}
}
ACTOR Future<Void> waitAndFreePhysicalPageID(IndirectShadowPager *pager, PhysicalPageID pageID, Future<Void> canFree) {
wait(canFree);
pager->pagerFile.freePage(pageID);
return Void();
}
// TODO: Freeing physical pages must be done *after* committing the page map changes that cause the physical page to no longer be used.
// Otherwise, the physical page could be reused by a write followed by a power loss in which case the mapping change would not
// have been committed and so the physical page should still contain its previous data but it's been overwritten.
void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) {
debug_printf("%s: Freeing physical %u\n", pageFileName.c_str(), pageID);
pagerFile.freePage(pageID);
}
void IndirectShadowPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
ASSERT(recovery.isReady());
ASSERT(committing.isReady());
ASSERT(updateVersion > latestVersion || updateVersion == 0);
ASSERT(pageID < pageTable.size());
PageVersionMap &pageVersionMap = pageTable[pageID];
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != PagerFile::INVALID_PAGE);
// TODO: should this be conditional on the write succeeding?
bool updateExisting = updateVersion == 0;
if(updateExisting) {
// If there is no existing latest version to update then there must be a referencePageID from which to get a latest version
// so get that version and change this to a normal update
if(pageVersionMap.empty()) {
ASSERT(referencePageID != invalidLogicalPageID);
PageVersionMap &rpv = pageTable[referencePageID];
ASSERT(!rpv.empty());
updateVersion = rpv.back().first;
updateExisting = false;
}
else {
ASSERT(pageVersionMap.size());
updateVersion = pageVersionMap.back().first;
}
}
PhysicalPageID physicalPageID = pagerFile.allocatePage(pageID, updateVersion);
debug_printf("%s: Writing logical %d v%lld physical %d\n", pageFileName.c_str(), pageID, updateVersion, physicalPageID);
if(updateExisting) {
// TODO: Physical page cannot be freed now, it must be done after the page mapping change above is committed
//freePhysicalPageID(pageVersionMap.back().second);
pageVersionMap.back().second = physicalPageID;
}
else {
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
}
logPageTableUpdate(pageID, updateVersion, physicalPageID);
checksumWrite(dataFile.getPtr(), contents->mutate(), IndirectShadowPage::PAGE_BYTES, pageID, physicalPageID);
Future<Void> write = holdWhile(contents, dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES));
if(write.isError()) {
if(errorPromise.canBeSet()) {
errorPromise.sendError(write.getError());
}
throw write.getError();
}
writeActors.add(forwardError(write, errorPromise));
}
void IndirectShadowPager::forgetVersions(Version begin, Version end) {
ASSERT(recovery.isReady());
ASSERT(begin <= end);
ASSERT(end <= latestVersion);
// TODO: support forgetting arbitrary ranges
if(begin <= oldestVersion) {
oldestVersion = std::max(end, oldestVersion);
logVersion(OLDEST_VERSION_KEY, oldestVersion);
}
}
ACTOR Future<Void> commitImpl(IndirectShadowPager *pager, Future<Void> previousCommit) {
state Future<Void> outstandingWrites = pager->writeActors.signalAndCollapse();
state Version commitVersion = pager->latestVersion;
wait(previousCommit);
pager->logVersion(IndirectShadowPager::LATEST_VERSION_KEY, commitVersion);
// TODO: we need to prevent writes that happen now from being committed in the subsequent log commit
// This is probably best done once we have better control of the log, where we can write a commit entry
// here without syncing the file.
wait(outstandingWrites);
wait(pager->dataFile->sync());
wait(pager->pageTableLog->commit());
pager->committedVersion = std::max(pager->committedVersion, commitVersion);
return Void();
}
Future<Void> IndirectShadowPager::commit() {
ASSERT(recovery.isReady());
Future<Void> f = commitImpl(this, committing);
committing = f;
return committing;
}
void IndirectShadowPager::setLatestVersion(Version version) {
ASSERT(recovery.isReady());
latestVersion = version;
}
ACTOR Future<Version> getLatestVersionImpl(IndirectShadowPager *pager) {
wait(pager->recovery);
return pager->latestVersion;
}
Future<Version> IndirectShadowPager::getLatestVersion() {
return getLatestVersionImpl(this);
}
Future<Void> IndirectShadowPager::getError() {
return errorPromise.getFuture();
}
Future<Void> IndirectShadowPager::onClosed() {
return closed.getFuture();
}
ACTOR void shutdown(IndirectShadowPager *pager, bool dispose) {
if(pager->errorPromise.canBeSet())
pager->errorPromise.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress
// Cancel all outstanding reads
auto i = pager->busyPages.begin();
auto iEnd = pager->busyPages.end();
while(i != iEnd) {
// Advance before calling cancel as the rawRead cancel will destroy the map entry it lives in
(i++)->second.read.cancel();
}
ASSERT(pager->busyPages.empty());
wait(ready(pager->writeActors.signal()));
wait(ready(pager->operations.signal()));
wait(ready(pager->committing));
pager->housekeeping.cancel();
pager->pagerFile.shutdown();
state Future<Void> pageTableClosed = pager->pageTableLog->onClosed();
if(dispose) {
wait(ready(IAsyncFileSystem::filesystem()->deleteFile(pager->pageFileName, true)));
pager->pageTableLog->dispose();
}
else {
pager->pageTableLog->close();
}
wait(ready(pageTableClosed));
pager->closed.send(Void());
delete pager;
}
void IndirectShadowPager::dispose() {
shutdown(this, true);
}
void IndirectShadowPager::close() {
shutdown(this, false);
}
ACTOR Future<Reference<const IPage>> rawRead(IndirectShadowPager *pager, LogicalPageID logicalPageID, PhysicalPageID physicalPageID) {
state void *data;
state int len = IndirectShadowPage::PAGE_BYTES;
state bool readSuccess = false;
try {
wait(pager->dataFile->readZeroCopy(&data, &len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES));
readSuccess = true;
if(!checksumRead(pager->dataFile.getPtr(), (uint8_t *)data, len, logicalPageID, physicalPageID)) {
throw checksum_failed();
}
pager->busyPages.erase(physicalPageID);
return Reference<const IPage>(new IndirectShadowPage((uint8_t *)data, pager->dataFile, physicalPageID));
}
catch(Error &e) {
pager->busyPages.erase(physicalPageID);
if(readSuccess || e.code() == error_code_actor_cancelled) {
pager->dataFile->releaseZeroCopy(data, len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES);
}
throw;
}
}
Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID logicalPageID, Version version) {
ASSERT(logicalPageID < pager->pageTable.size());
PageVersionMap &pageVersionMap = pager->pageTable[logicalPageID];
auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version);
if(itr == pageVersionMap.begin()) {
debug_printf("%s: Page version map empty! op=error id=%u @%lld\n", pager->pageFileName.c_str(), logicalPageID, version);
ASSERT(false);
}
--itr;
PhysicalPageID physicalPageID = itr->second;
ASSERT(physicalPageID != PagerFile::INVALID_PAGE);
debug_printf("%s: Reading logical %d v%lld physical %d mapSize %lu\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID, pageVersionMap.size());
IndirectShadowPager::BusyPage &bp = pager->busyPages[physicalPageID];
if(!bp.read.isValid()) {
Future<Reference<const IPage>> get = rawRead(pager, logicalPageID, physicalPageID);
if(!get.isReady()) {
bp.read = get;
}
return get;
}
return bp.read;
}
Future<Reference<const IPage>> IndirectShadowPager::getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
if(!recovery.isReady()) {
debug_printf("%s: getPage failure, recovery not ready - op=error id=%u @%lld\n", pageFileName.c_str(), pageID, version);
ASSERT(false);
}
Future<Reference<const IPage>> f = getPageImpl(this, snapshot, pageID, version);
operations.add(forwardError(ready(f), errorPromise)); // For some reason if success is ready() then shutdown hangs when waiting on operations
return f;
}
PageVersionMap::iterator IndirectShadowPager::pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version version) {
return std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
return p.first < v;
});
}
PageVersionMap::iterator IndirectShadowPager::pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version version) {
return std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
return v < p.first;
});
}
void IndirectShadowPager::freeLogicalPageID(LogicalPageID pageID) {
if(pageID >= SERVER_KNOBS->PAGER_RESERVED_PAGES) {
debug_printf("%s: Freeing logical %u\n", pageFileName.c_str(), pageID);
logicalFreeList.push_back(pageID);
}
}
void IndirectShadowPager::logVersion(StringRef versionKey, Version version) {
BinaryWriter v(Unversioned());
v << version;
pageTableLog->set(KeyValueRef(versionKey, v.toValue()));
}
void IndirectShadowPager::logPagesAllocated() {
BinaryWriter v(Unversioned());
v << pagerFile.getPagesAllocated();
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toValue()));
}
void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
BinaryWriter k(Unversioned());
k << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(version);
BinaryWriter v(Unversioned());
v << physicalPageID;
pageTableLog->set(KeyValueRef(k.toValue(), v.toValue()));
}
void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) {
BinaryWriter b(Unversioned());
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
BinaryWriter e(Unversioned());
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID);
pageTableLog->clear(KeyRangeRef(b.toValue(), strinc(e.toValue())));
}
void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) {
BinaryWriter b(Unversioned());
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
BinaryWriter e(Unversioned());
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end);
pageTableLog->clear(KeyRangeRef(b.toValue(), e.toValue()));
}
const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion");
const StringRef IndirectShadowPager::OLDEST_VERSION_KEY = LiteralStringRef("\xff/OldestVersion");
const StringRef IndirectShadowPager::PAGES_ALLOCATED_KEY = LiteralStringRef("\xff/PagesAllocated");
const StringRef IndirectShadowPager::TABLE_ENTRY_PREFIX = LiteralStringRef("\x00");
ACTOR Future<Void> copyPage(IndirectShadowPager *pager, Reference<IPage> page, LogicalPageID logical, PhysicalPageID from, PhysicalPageID to) {
state bool zeroCopied = true;
state int bytes = IndirectShadowPage::PAGE_BYTES;
state void *data = nullptr;
try {
try {
wait(pager->dataFile->readZeroCopy(&data, &bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES));
}
catch(Error &e) {
zeroCopied = false;
data = page->mutate();
int _bytes = wait(pager->dataFile->read(data, page->size(), (int64_t)from * IndirectShadowPage::PAGE_BYTES));
bytes = _bytes;
}
ASSERT(bytes == IndirectShadowPage::PAGE_BYTES);
checksumWrite(pager->dataFile.getPtr(), page->mutate(), bytes, logical, to);
wait(pager->dataFile->write(data, bytes, (int64_t)to * IndirectShadowPage::PAGE_BYTES));
if(zeroCopied) {
pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
}
}
catch(Error &e) {
if(zeroCopied) {
pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
}
pager->pagerFile.freePage(to);
throw e;
}
return Void();
}
ACTOR Future<Void> vacuumer(IndirectShadowPager *pager, PagerFile *pagerFile) {
state Reference<IPage> page(new IndirectShadowPage());
loop {
state double start = now();
while(!pagerFile->canVacuum()) {
wait(delay(1.0));
}
ASSERT(!pagerFile->freePages.empty());
if(!pagerFile->vacuumQueue.empty()) {
state PhysicalPageID lastUsedPage = pagerFile->vacuumQueue.rbegin()->first;
PhysicalPageID lastFreePage = *pagerFile->freePages.rbegin();
debug_printf("%s: Vacuuming: evaluating (free list size=%lu, lastFreePage=%u, lastUsedPage=%u, pagesAllocated=%u)\n", pager->pageFileName.c_str(), pagerFile->freePages.size(), lastFreePage, lastUsedPage, pagerFile->pagesAllocated);
ASSERT(lastFreePage < pagerFile->pagesAllocated);
ASSERT(lastUsedPage < pagerFile->pagesAllocated);
ASSERT(lastFreePage != lastUsedPage);
if(lastFreePage < lastUsedPage) {
state std::pair<LogicalPageID, Version> logicalPageInfo = pagerFile->vacuumQueue[lastUsedPage];
state PhysicalPageID newPage = pagerFile->allocatePage(logicalPageInfo.first, logicalPageInfo.second);
debug_printf("%s: Vacuuming: copying page %u to %u\n", pager->pageFileName.c_str(), lastUsedPage, newPage);
wait(copyPage(pager, page, logicalPageInfo.first, lastUsedPage, newPage));
auto &pageVersionMap = pager->pageTable[logicalPageInfo.first];
auto itr = IndirectShadowPager::pageVersionMapLowerBound(pageVersionMap, logicalPageInfo.second);
if(itr != pageVersionMap.end() && itr->second == lastUsedPage) {
itr->second = newPage;
pager->logPageTableUpdate(logicalPageInfo.first, itr->first, newPage);
pagerFile->freePage(lastUsedPage);
}
else {
TEST(true); // page was freed while vacuuming
pagerFile->freePage(newPage);
}
}
}
PhysicalPageID firstFreePage = pagerFile->vacuumQueue.empty() ? pagerFile->minVacuumQueuePage : (pagerFile->vacuumQueue.rbegin()->first + 1);
ASSERT(pagerFile->pagesAllocated >= firstFreePage);
uint64_t pagesToErase = 0;
if(pagerFile->freePages.size() >= SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD) {
pagesToErase = std::min<uint64_t>(pagerFile->freePages.size() - SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD + 1, pagerFile->pagesAllocated - firstFreePage);
}
debug_printf("%s: Vacuuming: got %llu pages to erase (freePages=%lu, pagesAllocated=%u, vacuumQueueEmpty=%u, minVacuumQueuePage=%u, firstFreePage=%u)\n", pager->pageFileName.c_str(), pagesToErase, pagerFile->freePages.size(), pagerFile->pagesAllocated, pagerFile->vacuumQueue.empty(), pagerFile->minVacuumQueuePage, firstFreePage);
if(pagesToErase > 0) {
PhysicalPageID eraseStartPage = pagerFile->pagesAllocated - pagesToErase;
debug_printf("%s: Vacuuming: truncating last %llu pages starting at %u\n", pager->pageFileName.c_str(), pagesToErase, eraseStartPage);
ASSERT(pagesToErase <= pagerFile->pagesAllocated);
pagerFile->pagesAllocated = eraseStartPage;
pager->logPagesAllocated();
auto freePageItr = pagerFile->freePages.find(eraseStartPage);
ASSERT(freePageItr != pagerFile->freePages.end());
pagerFile->freePages.erase(freePageItr, pagerFile->freePages.end());
ASSERT(pagerFile->vacuumQueue.empty() || pagerFile->vacuumQueue.rbegin()->first < eraseStartPage);
wait(pager->dataFile->truncate((int64_t)pagerFile->pagesAllocated * IndirectShadowPage::PAGE_BYTES));
}
wait(delayUntil(start + (double)IndirectShadowPage::PAGE_BYTES / SERVER_KNOBS->VACUUM_BYTES_PER_SECOND)); // TODO: figure out the correct mechanism here
}
}
PagerFile::PagerFile(IndirectShadowPager *pager) : fileSize(0), pagesAllocated(0), pager(pager), vacuumQueueReady(false), minVacuumQueuePage(0) {}
PhysicalPageID PagerFile::allocatePage(LogicalPageID logicalPageID, Version version) {
ASSERT((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES <= fileSize);
ASSERT(fileSize % IndirectShadowPage::PAGE_BYTES == 0);
PhysicalPageID allocatedPage;
if(!freePages.empty()) {
allocatedPage = *freePages.begin();
freePages.erase(freePages.begin());
}
else {
if((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES == fileSize) {
fileSize += (1 << 24);
// TODO: extend the file before writing beyond the end.
}
ASSERT(pagesAllocated < INVALID_PAGE); // TODO: we should throw a better error here
allocatedPage = pagesAllocated++;
pager->logPagesAllocated();
}
markPageAllocated(logicalPageID, version, allocatedPage);
debug_printf("%s: Allocated physical %u\n", pager->pageFileName.c_str(), allocatedPage);
return allocatedPage;
}
void PagerFile::freePage(PhysicalPageID pageID) {
freePages.insert(pageID);
if(pageID >= minVacuumQueuePage) {
vacuumQueue.erase(pageID);
}
}
void PagerFile::markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
if(physicalPageID != INVALID_PAGE && physicalPageID >= minVacuumQueuePage) {
vacuumQueue[physicalPageID] = std::make_pair(logicalPageID, version);
}
}
void PagerFile::finishedMarkingPages() {
if(minVacuumQueuePage >= pagesAllocated) {
minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
vacuumQueueReady = false;
}
else {
if(!vacuumQueueReady) {
vacuumQueueReady = true;
}
if(pagesAllocated > SERVER_KNOBS->VACUUM_QUEUE_SIZE && minVacuumQueuePage < pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE) {
minVacuumQueuePage = pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE;
auto itr = vacuumQueue.lower_bound(minVacuumQueuePage);
vacuumQueue.erase(vacuumQueue.begin(), itr);
}
}
}
uint64_t PagerFile::size() {
return fileSize;
}
uint32_t PagerFile::getPagesAllocated() {
return pagesAllocated;
}
uint32_t PagerFile::getFreePages() {
return freePages.size();
}
void PagerFile::init(uint64_t fileSize, uint32_t pagesAllocated) {
this->fileSize = fileSize;
this->pagesAllocated = pagesAllocated;
this->minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
}
void PagerFile::startVacuuming() {
vacuuming = Never(); //vacuumer(pager, this);
}
void PagerFile::shutdown() {
vacuuming.cancel();
}
bool PagerFile::canVacuum() {
if(freePages.size() < SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD // Not enough free pages
|| minVacuumQueuePage >= pagesAllocated // We finished processing all pages in the vacuum queue
|| !vacuumQueueReady) // Populating vacuum queue
{
debug_printf("%s: Vacuuming: waiting for vacuumable pages (free list size=%lu, minVacuumQueuePage=%u, pages allocated=%u, vacuumQueueReady=%d)\n", pager->pageFileName.c_str(), freePages.size(), minVacuumQueuePage, pagesAllocated, vacuumQueueReady);
return false;
}
return true;
}
const PhysicalPageID PagerFile::INVALID_PAGE = std::numeric_limits<PhysicalPageID>::max();
extern Future<Void> simplePagerTest(IPager* const& pager);
TEST_CASE("/fdbserver/indirectshadowpager/simple") {
state IPager *pager = new IndirectShadowPager("unittest_pageFile");
wait(simplePagerTest(pager));
Future<Void> closedFuture = pager->onClosed();
pager->close();
wait(closedFuture);
return Void();
}

View File

@ -1,215 +0,0 @@
/*
* IndirectShadowPager.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_INDIRECTSHADOWPAGER_H
#define FDBSERVER_INDIRECTSHADOWPAGER_H
#pragma once
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/IPager.h"
#include "flow/ActorCollection.h"
#include "fdbclient/Notified.h"
#include "fdbrpc/IAsyncFile.h"
typedef uint32_t PhysicalPageID;
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
typedef std::vector<PageVersionMap> LogicalPageTable;
class IndirectShadowPager;
class IndirectShadowPage : public IPage, ReferenceCounted<IndirectShadowPage> {
public:
IndirectShadowPage();
IndirectShadowPage(uint8_t *data, Reference<IAsyncFile> file, PhysicalPageID pageID)
: file(file), physicalPageID(pageID), fastAllocated(false), data(data) {}
virtual ~IndirectShadowPage();
virtual void addref() const {
ReferenceCounted<IndirectShadowPage>::addref();
}
virtual void delref() const {
ReferenceCounted<IndirectShadowPage>::delref();
}
virtual int size() const;
virtual uint8_t const* begin() const;
virtual uint8_t* mutate();
//private:
static const int PAGE_BYTES;
static const int PAGE_OVERHEAD_BYTES;
private:
Reference<IAsyncFile> file;
PhysicalPageID physicalPageID;
bool fastAllocated;
uint8_t *data;
};
class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted<IndirectShadowPagerSnapshot> {
public:
IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version);
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable);
virtual Version getVersion() const {
return version;
}
virtual ~IndirectShadowPagerSnapshot() {
}
virtual void addref() {
ReferenceCounted<IndirectShadowPagerSnapshot>::addref();
}
virtual void delref() {
ReferenceCounted<IndirectShadowPagerSnapshot>::delref();
}
private:
IndirectShadowPager *pager;
Version version;
Future<Void> pagerError;
};
class PagerFile {
public:
PagerFile(IndirectShadowPager *pager);
PhysicalPageID allocatePage(LogicalPageID logicalPageID, Version version);
void freePage(PhysicalPageID physicalPageID);
void markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
void finishedMarkingPages();
uint64_t size();
uint32_t getPagesAllocated();
uint32_t getFreePages();
void init(uint64_t fileSize, uint32_t pagesAllocated);
void startVacuuming();
void shutdown();
//private:
Future<Void> vacuuming;
IndirectShadowPager *pager;
uint32_t pagesAllocated;
uint64_t fileSize;
std::set<PhysicalPageID> freePages;
PhysicalPageID minVacuumQueuePage;
bool vacuumQueueReady;
std::map<PhysicalPageID, std::pair<LogicalPageID, Version>> vacuumQueue;
bool canVacuum();
static const PhysicalPageID INVALID_PAGE;
};
class IndirectShadowPager : public IPager {
public:
IndirectShadowPager(std::string basename);
virtual ~IndirectShadowPager() {
}
virtual Reference<IPage> newPageBuffer();
virtual int getUsablePageSize();
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
virtual LogicalPageID allocateLogicalPage();
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
virtual void forgetVersions(Version begin, Version end);
virtual Future<Void> commit();
virtual void setLatestVersion(Version version);
virtual Future<Version> getLatestVersion();
virtual StorageBytes getStorageBytes();
virtual Future<Void> getError();
virtual Future<Void> onClosed();
virtual void dispose();
virtual void close();
Future<Reference<const IPage>> getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version);
//private:
std::string basename;
std::string pageFileName;
Version latestVersion;
Version committedVersion;
LogicalPageTable pageTable;
IKeyValueStore *pageTableLog;
Reference<IAsyncFile> dataFile;
Future<Void> recovery;
Future<Void> housekeeping;
Future<Void> vacuuming;
Version oldestVersion;
// TODO: This structure maybe isn't needed
struct BusyPage {
Future<Reference<const IPage>> read;
};
typedef std::map<PhysicalPageID, BusyPage> BusyPageMapT;
BusyPageMapT busyPages;
SignalableActorCollection operations;
SignalableActorCollection writeActors;
Future<Void> committing;
Promise<Void> closed;
Promise<Void> errorPromise;
std::deque<LogicalPageID> logicalFreeList;
PagerFile pagerFile;
static PageVersionMap::iterator pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version v);
static PageVersionMap::iterator pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version v);
void freeLogicalPageID(LogicalPageID pageID);
void freePhysicalPageID(PhysicalPageID pageID);
void logVersion(StringRef versionKey, Version version);
void logPagesAllocated();
void logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
void logPageTableClearToEnd(LogicalPageID logicalPageID, Version start);
void logPageTableClear(LogicalPageID logicalPageID, Version start, Version end);
static const StringRef LATEST_VERSION_KEY;
static const StringRef OLDEST_VERSION_KEY;
static const StringRef PAGES_ALLOCATED_KEY;
static const StringRef TABLE_ENTRY_PREFIX;
};
#endif

View File

@ -1,456 +0,0 @@
/*
* MemoryPager.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cinttypes>
#include "fdbserver/MemoryPager.h"
#include "fdbserver/Knobs.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h"
typedef uint8_t* PhysicalPageID;
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
typedef std::vector<PageVersionMap> LogicalPageTable;
class MemoryPager;
class MemoryPage : public IPage, ReferenceCounted<MemoryPage> {
public:
MemoryPage();
MemoryPage(uint8_t *data);
virtual ~MemoryPage();
virtual void addref() const {
ReferenceCounted<MemoryPage>::addref();
}
virtual void delref() const {
ReferenceCounted<MemoryPage>::delref();
}
virtual int size() const;
virtual uint8_t const* begin() const;
virtual uint8_t* mutate();
private:
friend class MemoryPager;
uint8_t *data;
bool allocated;
static const int PAGE_BYTES;
};
class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted<MemoryPagerSnapshot> {
public:
MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable);
virtual Version getVersion() const {
return version;
}
virtual void addref() {
ReferenceCounted<MemoryPagerSnapshot>::addref();
}
virtual void delref() {
ReferenceCounted<MemoryPagerSnapshot>::delref();
}
private:
MemoryPager *pager;
Version version;
};
class MemoryPager : public IPager, ReferenceCounted<MemoryPager> {
public:
MemoryPager();
virtual Reference<IPage> newPageBuffer();
virtual int getUsablePageSize();
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
virtual LogicalPageID allocateLogicalPage();
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
virtual void forgetVersions(Version begin, Version end);
virtual Future<Void> commit();
virtual StorageBytes getStorageBytes() {
// TODO: Get actual values for used and free memory
return StorageBytes();
}
virtual void setLatestVersion(Version version);
virtual Future<Version> getLatestVersion();
virtual Future<Void> getError();
virtual Future<Void> onClosed();
virtual void dispose();
virtual void close();
virtual Reference<const IPage> getPage(LogicalPageID pageID, Version version);
private:
Version latestVersion;
Version committedVersion;
Standalone<VectorRef<VectorRef<uint8_t>>> data;
LogicalPageTable pageTable;
Promise<Void> closed;
std::vector<PhysicalPageID> freeList; // TODO: is this good enough for now?
PhysicalPageID allocatePage(Reference<IPage> contents);
void extendData();
static const PhysicalPageID INVALID_PAGE;
};
IPager * createMemoryPager() {
return new MemoryPager();
}
MemoryPage::MemoryPage() : allocated(true) {
data = (uint8_t*)FastAllocator<4096>::allocate();
}
MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {}
MemoryPage::~MemoryPage() {
if(allocated) {
FastAllocator<4096>::release(data);
}
}
uint8_t const* MemoryPage::begin() const {
return data;
}
uint8_t* MemoryPage::mutate() {
return data;
}
int MemoryPage::size() const {
return PAGE_BYTES;
}
const int MemoryPage::PAGE_BYTES = 4096;
Future<Reference<const IPage>> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) {
return pager->getPage(pageID, version);
}
MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
extendData();
pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
}
Reference<IPage> MemoryPager::newPageBuffer() {
return Reference<IPage>(new MemoryPage());
}
int MemoryPager::getUsablePageSize() {
return MemoryPage::PAGE_BYTES;
}
Reference<IPagerSnapshot> MemoryPager::getReadSnapshot(Version version) {
ASSERT(version <= latestVersion);
return Reference<IPagerSnapshot>(new MemoryPagerSnapshot(this, version));
}
LogicalPageID MemoryPager::allocateLogicalPage() {
ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
pageTable.push_back(PageVersionMap());
return pageTable.size() - 1;
}
void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) {
ASSERT(pageID < pageTable.size());
PageVersionMap &pageVersionMap = pageTable[pageID];
ASSERT(!pageVersionMap.empty());
auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
return p.first < v;
});
pageVersionMap.erase(itr, pageVersionMap.end());
if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
}
}
void MemoryPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
ASSERT(updateVersion > latestVersion || updateVersion == 0);
ASSERT(pageID < pageTable.size());
if(referencePageID != invalidLogicalPageID) {
PageVersionMap &rpv = pageTable[referencePageID];
ASSERT(!rpv.empty());
updateVersion = rpv.back().first;
}
PageVersionMap &pageVersionMap = pageTable[pageID];
ASSERT(updateVersion >= committedVersion || updateVersion == 0);
PhysicalPageID physicalPageID = allocatePage(contents);
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
if(updateVersion == 0) {
ASSERT(pageVersionMap.size());
updateVersion = pageVersionMap.back().first;
pageVersionMap.back().second = physicalPageID;
// TODO: what to do with old page?
}
else {
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
}
}
void MemoryPager::forgetVersions(Version begin, Version end) {
ASSERT(begin <= end);
ASSERT(end <= latestVersion);
// TODO
}
Future<Void> MemoryPager::commit() {
ASSERT(committedVersion < latestVersion);
committedVersion = latestVersion;
return Void();
}
void MemoryPager::setLatestVersion(Version version) {
ASSERT(version > latestVersion);
latestVersion = version;
}
Future<Version> MemoryPager::getLatestVersion() {
return latestVersion;
}
Reference<const IPage> MemoryPager::getPage(LogicalPageID pageID, Version version) {
ASSERT(pageID < pageTable.size());
PageVersionMap const& pageVersionMap = pageTable[pageID];
auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
return v < p.first;
});
if(itr == pageVersionMap.begin()) {
return Reference<IPage>(); // TODO: should this be an error?
}
--itr;
ASSERT(itr->second != INVALID_PAGE);
return Reference<const IPage>(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
}
Future<Void> MemoryPager::getError() {
return Void();
}
Future<Void> MemoryPager::onClosed() {
return closed.getFuture();
}
void MemoryPager::dispose() {
closed.send(Void());
delete this;
}
void MemoryPager::close() {
dispose();
}
PhysicalPageID MemoryPager::allocatePage(Reference<IPage> contents) {
if(freeList.size()) {
PhysicalPageID pageID = freeList.back();
freeList.pop_back();
memcpy(pageID, contents->begin(), contents->size());
return pageID;
}
else {
ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
PhysicalPageID pageID = data.back().end();
data.back().append(data.arena(), contents->begin(), contents->size());
if(data.back().size() == data.back().capacity()) {
extendData();
}
else {
ASSERT(data.back().size() <= data.back().capacity() - 4096);
}
return pageID;
}
}
void MemoryPager::extendData() {
if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
throw io_error();
}
VectorRef<uint8_t> d;
d.reserve(data.arena(), 1 << 22);
data.push_back(data.arena(), d);
}
// TODO: these tests are not MemoryPager specific, we should make them more general
void fillPage(Reference<IPage> page, LogicalPageID pageID, Version version) {
ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
memset(page->mutate(), 0, page->size());
memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
}
bool validatePage(Reference<const IPage> page, LogicalPageID pageID, Version version) {
bool valid = true;
LogicalPageID readPageID = *(LogicalPageID*)page->begin();
if(readPageID != pageID) {
fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID);
valid = false;
}
Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID));
if(readVersion != version) {
fprintf(stderr, "Invalid Version detected on page %u: %" PRId64 "(expected %" PRId64 ")\n", pageID, readVersion, version);
valid = false;
}
return valid;
}
void writePage(IPager *pager, Reference<IPage> page, LogicalPageID pageID, Version version, bool updateVersion=true) {
fillPage(page, pageID, version);
pager->writePage(pageID, page, updateVersion ? version : 0);
}
ACTOR Future<Void> commit(IPager *pager) {
static int commitNum = 1;
state int myCommit = commitNum++;
debug_printf("Commit%d\n", myCommit);
wait(pager->commit());
debug_printf("FinishedCommit%d\n", myCommit);
return Void();
}
ACTOR Future<Void> read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) {
static int readNum = 1;
state int myRead = readNum++;
state Reference<IPagerSnapshot> readSnapshot = pager->getReadSnapshot(version);
debug_printf("Read%d\n", myRead);
Reference<const IPage> readPage = wait(readSnapshot->getPhysicalPage(pageID, true));
debug_printf("FinishedRead%d\n", myRead);
ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
return Void();
}
ACTOR Future<Void> simplePagerTest(IPager *pager) {
state Reference<IPage> page = pager->newPageBuffer();
Version latestVersion = wait(pager->getLatestVersion());
debug_printf("Got latest version: %lld\n", latestVersion);
state Version version = latestVersion+1;
state Version v1 = version;
state LogicalPageID pageID1 = pager->allocateLogicalPage();
writePage(pager, page, pageID1, v1);
pager->setLatestVersion(v1);
wait(commit(pager));
state LogicalPageID pageID2 = pager->allocateLogicalPage();
state Version v2 = ++version;
writePage(pager, page, pageID1, v2);
writePage(pager, page, pageID2, v2);
pager->setLatestVersion(v2);
wait(commit(pager));
wait(read(pager, pageID1, v2));
wait(read(pager, pageID1, v1));
state Version v3 = ++version;
writePage(pager, page, pageID1, v3, false);
pager->setLatestVersion(v3);
wait(read(pager, pageID1, v2, v3));
wait(read(pager, pageID1, v3, v3));
state LogicalPageID pageID3 = pager->allocateLogicalPage();
state Version v4 = ++version;
writePage(pager, page, pageID2, v4);
writePage(pager, page, pageID3, v4);
pager->setLatestVersion(v4);
wait(commit(pager));
wait(read(pager, pageID2, v4, v4));
state Version v5 = ++version;
writePage(pager, page, pageID2, v5);
state LogicalPageID pageID4 = pager->allocateLogicalPage();
writePage(pager, page, pageID4, v5);
state Version v6 = ++version;
pager->freeLogicalPage(pageID2, v5);
pager->freeLogicalPage(pageID3, v3);
pager->setLatestVersion(v6);
wait(commit(pager));
pager->forgetVersions(0, v4);
wait(commit(pager));
wait(delay(3.0));
wait(commit(pager));
return Void();
}
/*
TEST_CASE("/fdbserver/memorypager/simple") {
state IPager *pager = new MemoryPager();
wait(simplePagerTest(pager));
Future<Void> closedFuture = pager->onClosed();
pager->dispose();
wait(closedFuture);
return Void();
}
*/
const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;

View File

@ -1,29 +0,0 @@
/*
* MemoryPager.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_MEMORYPAGER_H
#define FDBSERVER_MEMORYPAGER_H
#pragma once
#include "fdbserver/IPager.h"
IPager * createMemoryPager();
#endif

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,6 @@
<ActorCompiler Include="KeyValueStoreMemory.actor.cpp" />
<ActorCompiler Include="SimulatedCluster.actor.cpp" />
<ActorCompiler Include="KeyValueStoreCompressTestData.actor.cpp" />
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
<ClCompile Include="Knobs.cpp" />
<ActorCompiler Include="FDBExecHelper.actor.cpp" />
<ActorCompiler Include="QuietDatabase.actor.cpp" />
@ -57,7 +56,6 @@
<ActorCompiler Include="Restore.actor.cpp" />
<ActorCompiler Include="LogSystemDiskQueueAdapter.actor.cpp" />
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
<ActorCompiler Include="MemoryPager.actor.cpp" />
<ActorCompiler Include="LogRouter.actor.cpp" />
<ClCompile Include="LatencyBandConfig.cpp" />
<ActorCompiler Include="OldTLogServer_4_6.actor.cpp" />
@ -179,7 +177,6 @@
</ActorCompiler>
<ClInclude Include="IDiskQueue.h" />
<ClInclude Include="IKeyValueStore.h" />
<ClInclude Include="IndirectShadowPager.h" />
<ClInclude Include="IPager.h" />
<ClInclude Include="IVersionedStore.h" />
<ClInclude Include="LatencyBandConfig.h" />
@ -189,7 +186,6 @@
<ClInclude Include="LogSystemConfig.h" />
<ClInclude Include="LogSystemDiskQueueAdapter.h" />
<ClInclude Include="MasterInterface.h" />
<ClInclude Include="MemoryPager.h" />
<ActorCompiler Include="MoveKeys.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>

View File

@ -274,8 +274,6 @@
<ActorCompiler Include="workloads\AtomicRestore.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="MemoryPager.actor.cpp" />
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
<ActorCompiler Include="OldTLogServer.actor.cpp" />
<ActorCompiler Include="LogRouter.actor.cpp" />
<ActorCompiler Include="workloads\SlowTaskWorkload.actor.cpp">
@ -385,8 +383,6 @@
<ClInclude Include="LogProtocolMessage.h" />
<ClInclude Include="IPager.h" />
<ClInclude Include="IVersionedStore.h" />
<ClInclude Include="MemoryPager.h" />
<ClInclude Include="IndirectShadowPager.h" />
<ClInclude Include="template_fdb.h" />
<ClInclude Include="LatencyBandConfig.h" />
</ItemGroup>

View File

@ -139,6 +139,8 @@ add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.txt)
add_fdb_test(TEST_FILES rare/RYWDisable.txt)
add_fdb_test(TEST_FILES rare/RandomReadWriteTest.txt)
add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.txt)
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.txt)
add_fdb_test(
TEST_FILES restarting/ConfigureTestRestart-1.txt
restarting/ConfigureTestRestart-2.txt)

View File

@ -0,0 +1,6 @@
testTitle=UnitTests
testName=UnitTests
startDelay=0
useDB=false
maxTestCases=0
testsMatching=!/redwood/correctness/btree