Implemented page preloading on BTree cursor seeks to enable hiding latency on soon-to-be-read sibling pages. Added random scans with various preload sizes to the set performance unit test. ObjectCache now tracks hits, misses, and pages which were preloaded but then never used prior to eviction. BTree pages no longer store flags because height is sufficient. Removed virtual specifier in classes not designed to be further inherited. Removed old prototype code (PrefixTree, IndirectShadowPager, MemoryPager) as some interface changes are incompatible and they are no longer worth maintaining.
This commit is contained in:
parent
2aa672cb59
commit
61558eea04
|
@ -24,8 +24,6 @@ set(FDBSERVER_SRCS
|
|||
IKeyValueStore.h
|
||||
IPager.h
|
||||
IVersionedStore.h
|
||||
IndirectShadowPager.actor.cpp
|
||||
IndirectShadowPager.h
|
||||
KeyValueStoreCompressTestData.actor.cpp
|
||||
KeyValueStoreMemory.actor.cpp
|
||||
KeyValueStoreSQLite.actor.cpp
|
||||
|
@ -45,8 +43,6 @@ set(FDBSERVER_SRCS
|
|||
MasterInterface.h
|
||||
MasterProxyServer.actor.cpp
|
||||
masterserver.actor.cpp
|
||||
MemoryPager.actor.cpp
|
||||
MemoryPager.h
|
||||
MoveKeys.actor.cpp
|
||||
MoveKeys.actor.h
|
||||
networktest.actor.cpp
|
||||
|
|
|
@ -20,13 +20,89 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "fdbserver/PrefixTree.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include <string.h>
|
||||
|
||||
typedef uint64_t Word;
|
||||
static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) {
|
||||
int i = 0;
|
||||
const int wordEnd = cl - sizeof(Word) + 1;
|
||||
|
||||
for(; i < wordEnd; i += sizeof(Word)) {
|
||||
Word a = *(Word *)ap;
|
||||
Word b = *(Word *)bp;
|
||||
if(a != b) {
|
||||
return i + ctzll(a ^ b) / 8;
|
||||
}
|
||||
ap += sizeof(Word);
|
||||
bp += sizeof(Word);
|
||||
}
|
||||
|
||||
for (; i < cl; i++) {
|
||||
if (*ap != *bp) {
|
||||
return i;
|
||||
}
|
||||
++ap;
|
||||
++bp;
|
||||
}
|
||||
return cl;
|
||||
}
|
||||
|
||||
static int commonPrefixLength(StringRef a, StringRef b) {
|
||||
return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size()));
|
||||
}
|
||||
|
||||
// This appears to be the fastest version
|
||||
static int lessOrEqualPowerOfTwo(int n) {
|
||||
int p;
|
||||
for (p = 1; p+p <= n; p+=p);
|
||||
return p;
|
||||
}
|
||||
|
||||
/*
|
||||
static int _lessOrEqualPowerOfTwo(uint32_t n) {
|
||||
if(n == 0)
|
||||
return n;
|
||||
int trailing = __builtin_ctz(n);
|
||||
int leading = __builtin_clz(n);
|
||||
if(trailing + leading == ((sizeof(n) * 8) - 1))
|
||||
return n;
|
||||
return 1 << ( (sizeof(n) * 8) - leading - 1);
|
||||
}
|
||||
|
||||
static int __lessOrEqualPowerOfTwo(unsigned int n) {
|
||||
int p = 1;
|
||||
for(; p <= n; p <<= 1);
|
||||
return p >> 1;
|
||||
}
|
||||
*/
|
||||
|
||||
static int perfectSubtreeSplitPoint(int subtree_size) {
|
||||
// return the inorder index of the root node in a subtree of the given size
|
||||
// consistent with the resulting binary search tree being "perfect" (having minimal height
|
||||
// and all missing nodes as far right as possible).
|
||||
// There has to be a simpler way to do this.
|
||||
int s = lessOrEqualPowerOfTwo((subtree_size - 1) / 2 + 1) - 1;
|
||||
return std::min(s * 2 + 1, subtree_size - s - 1);
|
||||
}
|
||||
|
||||
static int perfectSubtreeSplitPointCached(int subtree_size) {
|
||||
static uint16_t *points = nullptr;
|
||||
static const int max = 500;
|
||||
if(points == nullptr) {
|
||||
points = new uint16_t[max];
|
||||
for(int i = 0; i < max; ++i)
|
||||
points[i] = perfectSubtreeSplitPoint(i);
|
||||
}
|
||||
|
||||
if(subtree_size < max)
|
||||
return points[subtree_size];
|
||||
return perfectSubtreeSplitPoint(subtree_size);
|
||||
}
|
||||
|
||||
// Delta Tree is a memory mappable binary tree of T objects such that each node's item is
|
||||
// stored as a Delta which can reproduce the node's T item given the node's greatest
|
||||
// lesser ancestor and the node's least greater ancestor.
|
||||
|
|
|
@ -53,8 +53,9 @@
|
|||
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
|
||||
#endif
|
||||
|
||||
typedef uint32_t LogicalPageID; // uint64_t?
|
||||
static const LogicalPageID invalidLogicalPageID = std::numeric_limits<LogicalPageID>::max();
|
||||
typedef uint32_t LogicalPageID;
|
||||
typedef uint32_t PhysicalPageID;
|
||||
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
|
||||
|
||||
class IPage {
|
||||
public:
|
||||
|
@ -85,12 +86,10 @@ public:
|
|||
|
||||
class IPagerSnapshot {
|
||||
public:
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable) = 0;
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
|
||||
virtual Version getVersion() const = 0;
|
||||
|
||||
virtual Key getMetaKey() const {
|
||||
return Key();
|
||||
}
|
||||
virtual Key getMetaKey() const = 0;
|
||||
|
||||
virtual ~IPagerSnapshot() {}
|
||||
|
||||
|
@ -98,65 +97,7 @@ public:
|
|||
virtual void delref() = 0;
|
||||
};
|
||||
|
||||
class IPager : public IClosable {
|
||||
public:
|
||||
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
|
||||
virtual Reference<IPage> newPageBuffer() = 0;
|
||||
|
||||
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
|
||||
// For a given pager instance, separate calls to this function must return the same value.
|
||||
virtual int getUsablePageSize() = 0;
|
||||
|
||||
virtual StorageBytes getStorageBytes() = 0;
|
||||
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version) = 0;
|
||||
|
||||
// Returns an unused LogicalPageID.
|
||||
// LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) do not need to be allocated.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual LogicalPageID allocateLogicalPage() = 0;
|
||||
|
||||
// Signals that the page will no longer be used as of the specified version. Versions prior to the specified version must be kept.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version) = 0;
|
||||
|
||||
// Writes a page with the given LogicalPageID at the specified version. LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES)
|
||||
// can be written without being allocated. All other LogicalPageIDs must be allocated using allocateLogicalPage before writing them.
|
||||
//
|
||||
// If updateVersion is 0, we are signalling to the pager that we are reusing the LogicalPageID entry at the current latest version of pageID.
|
||||
//
|
||||
// Otherwise, we will add a new entry for LogicalPageID at the specified version. In that case, updateVersion must be larger than any version
|
||||
// written to this page previously, and it must be larger than any version committed. If referencePageID is given, the latest version of that
|
||||
// page will be used for the write, which *can* be less than the latest committed version.
|
||||
//
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID = invalidLogicalPageID) = 0;
|
||||
|
||||
// Signals to the pager that no more reads will be performed in the range [begin, end).
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void forgetVersions(Version begin, Version end) = 0;
|
||||
|
||||
// Makes durable all writes and any data structures used for recovery.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual Future<Void> commit() = 0;
|
||||
|
||||
// Returns the latest version of the pager. Permitted to block until recovery is complete, at which point it should always be set immediately.
|
||||
// Some functions in the IPager interface are permitted to fail (ASSERT) during recovery, so users should wait for getLatestVersion to complete
|
||||
// before doing anything else.
|
||||
virtual Future<Version> getLatestVersion() = 0;
|
||||
|
||||
// Sets the latest version of the pager. Must be monotonically increasing.
|
||||
//
|
||||
// Must be called prior to reading the specified version. SOMEDAY: It may be desirable in the future to relax this constraint for performance reasons.
|
||||
//
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void setLatestVersion(Version version) = 0;
|
||||
|
||||
protected:
|
||||
~IPager() {} // Destruction should be done using close()/dispose() from the IClosable interface
|
||||
};
|
||||
|
||||
// This API is probably customized to the behavior of DWALPager and probably needs some changes to be more generic.
|
||||
class IPager2 : public IClosable {
|
||||
public:
|
||||
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
|
||||
|
@ -189,7 +130,10 @@ public:
|
|||
// The data returned will be the later of
|
||||
// - the most recent committed atomic
|
||||
// - the most recent non-atomic write
|
||||
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable) = 0;
|
||||
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
|
||||
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
|
||||
// considered likely to be needed soon.
|
||||
virtual Future<Reference<IPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
|
||||
|
||||
// Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion()
|
||||
// Note that snapshots at any version may still see the results of updatePage() calls.
|
||||
|
|
|
@ -30,10 +30,10 @@
|
|||
class IStoreCursor {
|
||||
public:
|
||||
virtual Future<Void> findEqual(KeyRef key) = 0;
|
||||
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, bool needValue, int prefetchNextBytes) = 0;
|
||||
virtual Future<Void> findLastLessOrEqual(KeyRef key, bool needValue, int prefetchPriorBytes) = 0;
|
||||
virtual Future<Void> next(bool needValue) = 0;
|
||||
virtual Future<Void> prev(bool needValue) = 0;
|
||||
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, int prefetchBytes = 0) = 0;
|
||||
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchBytes = 0) = 0;
|
||||
virtual Future<Void> next() = 0;
|
||||
virtual Future<Void> prev() = 0;
|
||||
|
||||
virtual bool isValid() = 0;
|
||||
virtual KeyRef getKey() = 0;
|
||||
|
@ -41,8 +41,6 @@ public:
|
|||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
|
||||
virtual std::string toString() const = 0;
|
||||
};
|
||||
|
||||
class IVersionedStore : public IClosable {
|
||||
|
|
|
@ -1,960 +0,0 @@
|
|||
/*
|
||||
* IndirectShadowPager.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/IndirectShadowPager.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
#include "fdbrpc/crc32c.h"
|
||||
|
||||
struct SumType {
|
||||
bool operator==(const SumType &rhs) const { return crc == rhs.crc; }
|
||||
uint32_t crc;
|
||||
std::string toString() { return format("0x%08x", crc); }
|
||||
};
|
||||
|
||||
bool checksum(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical, bool write) {
|
||||
// Calculates and then stores or verifies the checksum at the end of the page.
|
||||
// If write is true then the checksum is written into the page
|
||||
// If write is false then the checksum is compared to the in-page sum and
|
||||
// and error will be thrown if they do not match.
|
||||
ASSERT(sizeof(SumType) == IndirectShadowPage::PAGE_OVERHEAD_BYTES);
|
||||
// Adjust pageSize to refer to only usable storage bytes
|
||||
pageSize -= IndirectShadowPage::PAGE_OVERHEAD_BYTES;
|
||||
SumType sum;
|
||||
SumType *pSumInPage = (SumType *)(page + pageSize);
|
||||
// Write sum directly to page or to sum variable based on mode
|
||||
SumType *sumOut = write ? pSumInPage : ∑
|
||||
sumOut->crc = crc32c_append(logical, page, pageSize);
|
||||
VALGRIND_MAKE_MEM_DEFINED(sumOut, sizeof(SumType));
|
||||
|
||||
debug_printf("checksum %s%s logical %d physical %d size %d checksums page %s calculated %s data at %p %s\n",
|
||||
write ? "write" : "read",
|
||||
(!write && sum != *pSumInPage) ? " MISMATCH" : "",
|
||||
logical, physical, pageSize,
|
||||
write ? "NA" : pSumInPage->toString().c_str(),
|
||||
sumOut->toString().c_str(), page, "");
|
||||
|
||||
// Verify if not in write mode
|
||||
if(!write && sum != *pSumInPage) {
|
||||
TraceEvent (SevError, "IndirectShadowPagerPageChecksumFailure")
|
||||
.detail("UserPageSize", pageSize)
|
||||
.detail("Filename", file->getFilename())
|
||||
.detail("LogicalPage", logical)
|
||||
.detail("PhysicalPage", physical)
|
||||
.detail("ChecksumInPage", pSumInPage->toString())
|
||||
.detail("ChecksumCalculated", sum.toString());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
inline bool checksumRead(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) {
|
||||
return checksum(file, page, pageSize, logical, physical, false);
|
||||
}
|
||||
|
||||
inline void checksumWrite(IAsyncFile *file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical) {
|
||||
checksum(file, page, pageSize, logical, physical, true);
|
||||
}
|
||||
|
||||
IndirectShadowPage::IndirectShadowPage() : fastAllocated(true) {
|
||||
data = (uint8_t*)FastAllocator<4096>::allocate();
|
||||
}
|
||||
|
||||
IndirectShadowPage::~IndirectShadowPage() {
|
||||
if(fastAllocated) {
|
||||
FastAllocator<4096>::release(data);
|
||||
}
|
||||
else if(file) {
|
||||
file->releaseZeroCopy(data, PAGE_BYTES, (int64_t) physicalPageID * PAGE_BYTES);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t const* IndirectShadowPage::begin() const {
|
||||
return data;
|
||||
}
|
||||
|
||||
uint8_t* IndirectShadowPage::mutate() {
|
||||
return data;
|
||||
}
|
||||
|
||||
int IndirectShadowPage::size() const {
|
||||
return PAGE_BYTES - PAGE_OVERHEAD_BYTES;
|
||||
}
|
||||
|
||||
const int IndirectShadowPage::PAGE_BYTES = 4096;
|
||||
const int IndirectShadowPage::PAGE_OVERHEAD_BYTES = sizeof(SumType);
|
||||
|
||||
IndirectShadowPagerSnapshot::IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version)
|
||||
: pager(pager), version(version), pagerError(pager->getError())
|
||||
{
|
||||
}
|
||||
|
||||
Future<Reference<const IPage>> IndirectShadowPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) {
|
||||
if(pagerError.isReady())
|
||||
pagerError.get();
|
||||
return pager->getPage(Reference<IndirectShadowPagerSnapshot>::addRef(this), pageID, version);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T bigEndian(T val) {
|
||||
static_assert(sizeof(T) <= 8, "Can't compute bigEndian on integers larger than 8 bytes");
|
||||
uint64_t b = bigEndian64(val);
|
||||
return *(T*)((uint8_t*)&b+8-sizeof(T));
|
||||
}
|
||||
|
||||
ACTOR Future<Void> recover(IndirectShadowPager *pager) {
|
||||
try {
|
||||
TraceEvent("PagerRecovering").detail("Filename", pager->pageFileName);
|
||||
pager->pageTableLog = keyValueStoreMemory(pager->basename, UID(), 1e9, "pagerlog");
|
||||
|
||||
// TODO: this can be done synchronously with the log recovery
|
||||
int64_t flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK;
|
||||
state bool exists = fileExists(pager->pageFileName);
|
||||
if(!exists) {
|
||||
flags |= IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE;
|
||||
}
|
||||
|
||||
Reference<IAsyncFile> dataFile = wait(IAsyncFileSystem::filesystem()->open(pager->pageFileName, flags, 0600));
|
||||
pager->dataFile = dataFile;
|
||||
|
||||
TraceEvent("PagerOpenedDataFile").detail("Filename", pager->pageFileName);
|
||||
|
||||
if(!exists) {
|
||||
wait(pager->dataFile->sync());
|
||||
}
|
||||
TraceEvent("PagerSyncdDataFile").detail("Filename", pager->pageFileName);
|
||||
|
||||
state int64_t fileSize = wait(pager->dataFile->size());
|
||||
TraceEvent("PagerGotFileSize").detail("Size", fileSize).detail("Filename", pager->pageFileName);
|
||||
|
||||
if(fileSize > 0) {
|
||||
TraceEvent("PagerRecoveringFromLogs").detail("Filename", pager->pageFileName);
|
||||
Optional<Value> pagesAllocatedValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::PAGES_ALLOCATED_KEY));
|
||||
if(pagesAllocatedValue.present()) {
|
||||
BinaryReader pr(pagesAllocatedValue.get(), Unversioned());
|
||||
uint32_t pagesAllocated;
|
||||
pr >> pagesAllocated;
|
||||
pager->pagerFile.init(fileSize, pagesAllocated);
|
||||
|
||||
debug_printf("%s: Recovered pages allocated: %d\n", pager->pageFileName.c_str(), pager->pagerFile.pagesAllocated);
|
||||
ASSERT(pager->pagerFile.pagesAllocated != PagerFile::INVALID_PAGE);
|
||||
|
||||
Optional<Value> latestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::LATEST_VERSION_KEY));
|
||||
ASSERT(latestVersionValue.present());
|
||||
|
||||
BinaryReader vr(latestVersionValue.get(), Unversioned());
|
||||
vr >> pager->latestVersion;
|
||||
|
||||
Optional<Value> oldestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::OLDEST_VERSION_KEY));
|
||||
|
||||
if(oldestVersionValue.present()) {
|
||||
BinaryReader vr(oldestVersionValue.get(), Unversioned());
|
||||
vr >> pager->oldestVersion;
|
||||
}
|
||||
|
||||
debug_printf("%s: Recovered version info: earliest v%lld latest v%lld\n", pager->pageFileName.c_str(), pager->oldestVersion, pager->latestVersion);
|
||||
pager->committedVersion = pager->latestVersion;
|
||||
|
||||
Standalone<VectorRef<KeyValueRef>> tableEntries = wait(pager->pageTableLog->readRange(KeyRangeRef(IndirectShadowPager::TABLE_ENTRY_PREFIX, strinc(IndirectShadowPager::TABLE_ENTRY_PREFIX))));
|
||||
|
||||
if(tableEntries.size() > 0) {
|
||||
BinaryReader kr(tableEntries.back().key, Unversioned());
|
||||
|
||||
uint8_t prefix;
|
||||
LogicalPageID logicalPageID;
|
||||
|
||||
kr >> prefix;
|
||||
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
|
||||
|
||||
kr >> logicalPageID;
|
||||
logicalPageID = bigEndian(logicalPageID);
|
||||
|
||||
LogicalPageID pageTableSize = std::max<LogicalPageID>(logicalPageID+1, SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
pager->pageTable.resize(pageTableSize);
|
||||
debug_printf("%s: Recovered page table size: %d\n", pager->pageFileName.c_str(), pageTableSize);
|
||||
}
|
||||
else {
|
||||
debug_printf("%s: Recovered no page table entries\n", pager->pageFileName.c_str());
|
||||
}
|
||||
|
||||
LogicalPageID nextPageID = SERVER_KNOBS->PAGER_RESERVED_PAGES;
|
||||
std::set<PhysicalPageID> allocatedPhysicalPages;
|
||||
for(auto entry : tableEntries) {
|
||||
BinaryReader kr(entry.key, Unversioned());
|
||||
BinaryReader vr(entry.value, Unversioned());
|
||||
|
||||
uint8_t prefix;
|
||||
LogicalPageID logicalPageID;
|
||||
Version version;
|
||||
PhysicalPageID physicalPageID;
|
||||
|
||||
kr >> prefix;
|
||||
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
|
||||
|
||||
kr >> logicalPageID;
|
||||
logicalPageID = bigEndian(logicalPageID);
|
||||
|
||||
kr >> version;
|
||||
version = bigEndian(version);
|
||||
vr >> physicalPageID;
|
||||
|
||||
ASSERT(version <= pager->latestVersion);
|
||||
|
||||
pager->pageTable[logicalPageID].push_back(std::make_pair(version, physicalPageID));
|
||||
|
||||
if(physicalPageID != PagerFile::INVALID_PAGE) {
|
||||
allocatedPhysicalPages.insert(physicalPageID);
|
||||
pager->pagerFile.markPageAllocated(logicalPageID, version, physicalPageID);
|
||||
}
|
||||
|
||||
while(nextPageID < logicalPageID) {
|
||||
pager->logicalFreeList.push_back(nextPageID++);
|
||||
}
|
||||
if(logicalPageID == nextPageID) {
|
||||
++nextPageID;
|
||||
}
|
||||
|
||||
debug_printf("%s: Recovered page table entry logical %d -> (v%lld, physical %d)\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID);
|
||||
}
|
||||
|
||||
debug_printf("%s: Building physical free list\n", pager->pageFileName.c_str());
|
||||
// TODO: can we do this better? does it require storing extra info in the log?
|
||||
PhysicalPageID nextPhysicalPageID = 0;
|
||||
for(auto itr = allocatedPhysicalPages.begin(); itr != allocatedPhysicalPages.end(); ++itr) {
|
||||
while(nextPhysicalPageID < *itr) {
|
||||
pager->pagerFile.freePage(nextPhysicalPageID++);
|
||||
}
|
||||
++nextPhysicalPageID;
|
||||
}
|
||||
|
||||
while(nextPhysicalPageID < pager->pagerFile.pagesAllocated) {
|
||||
pager->pagerFile.freePage(nextPhysicalPageID++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(pager->pageTable.size() < SERVER_KNOBS->PAGER_RESERVED_PAGES) {
|
||||
pager->pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
}
|
||||
|
||||
pager->pagerFile.finishedMarkingPages();
|
||||
pager->pagerFile.startVacuuming();
|
||||
|
||||
debug_printf("%s: Finished recovery at v%lld\n", pager->pageFileName.c_str(), pager->latestVersion);
|
||||
TraceEvent("PagerFinishedRecovery").detail("LatestVersion", pager->latestVersion).detail("OldestVersion", pager->oldestVersion).detail("Filename", pager->pageFileName);
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent(SevError, "PagerRecoveryFailed").error(e, true).detail("Filename", pager->pageFileName);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> housekeeper(IndirectShadowPager *pager) {
|
||||
wait(pager->recovery);
|
||||
wait(Never());
|
||||
loop {
|
||||
state LogicalPageID pageID = 0;
|
||||
for(; pageID < pager->pageTable.size(); ++pageID) {
|
||||
// TODO: pick an appropriate rate for this loop and determine the right way to implement it
|
||||
// Right now, this delays 10ms every 400K pages, which means we have 1s of delay for every
|
||||
// 40M pages. In total, we introduce 100s delay for a max size 4B page file.
|
||||
if(pageID % 400000 == 0) {
|
||||
wait(delay(0.01));
|
||||
}
|
||||
else {
|
||||
wait(yield());
|
||||
}
|
||||
|
||||
auto& pageVersionMap = pager->pageTable[pageID];
|
||||
|
||||
if(pageVersionMap.size() > 0) {
|
||||
auto itr = pageVersionMap.begin();
|
||||
for(auto prev = itr; prev != pageVersionMap.end() && prev->first < pager->oldestVersion; prev=itr) {
|
||||
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
|
||||
++itr;
|
||||
if(prev->second != PagerFile::INVALID_PAGE && (itr == pageVersionMap.end() || itr->first <= pager->oldestVersion)) {
|
||||
pager->freePhysicalPageID(prev->second);
|
||||
}
|
||||
if(itr == pageVersionMap.end() || itr->first >= pager->oldestVersion) {
|
||||
debug_printf("%s: Updating oldest version for logical %u: v%lld\n", pager->pageFileName.c_str(), pageID, pager->oldestVersion);
|
||||
pager->logPageTableClear(pageID, 0, pager->oldestVersion);
|
||||
|
||||
if(itr != pageVersionMap.end() && itr->first > pager->oldestVersion) {
|
||||
debug_printf("%s: Erasing pages to prev from pageVersionMap for %d (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr->first, prev->first);
|
||||
prev->first = pager->oldestVersion;
|
||||
pager->logPageTableUpdate(pageID, pager->oldestVersion, prev->second);
|
||||
itr = pageVersionMap.erase(pageVersionMap.begin(), prev);
|
||||
}
|
||||
else {
|
||||
debug_printf("%s: Erasing pages to itr from pageVersionMap for %d (%d) (itr=%lld, prev=%lld)\n", pager->pageFileName.c_str(), pageID, itr == pageVersionMap.end(), itr==pageVersionMap.end() ? -1 : itr->first, prev->first);
|
||||
itr = pageVersionMap.erase(pageVersionMap.begin(), itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(; itr != pageVersionMap.end(); ++itr) {
|
||||
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
|
||||
}
|
||||
|
||||
if(pageVersionMap.size() == 0) {
|
||||
pager->freeLogicalPageID(pageID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pager->pagerFile.finishedMarkingPages();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> forwardError(Future<Void> f, Promise<Void> target) {
|
||||
try {
|
||||
wait(f);
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(e.code() != error_code_actor_cancelled && target.canBeSet()) {
|
||||
target.sendError(e);
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
IndirectShadowPager::IndirectShadowPager(std::string basename)
|
||||
: basename(basename), latestVersion(0), committedVersion(0), committing(Void()), oldestVersion(0), pagerFile(this)
|
||||
{
|
||||
pageFileName = basename;
|
||||
recovery = forwardError(recover(this), errorPromise);
|
||||
housekeeping = forwardError(housekeeper(this), errorPromise);
|
||||
}
|
||||
|
||||
StorageBytes IndirectShadowPager::getStorageBytes() {
|
||||
int64_t free;
|
||||
int64_t total;
|
||||
g_network->getDiskBytes(parentDirectory(basename), free, total);
|
||||
return StorageBytes(free, total, pagerFile.size(), free + IndirectShadowPage::PAGE_BYTES * pagerFile.getFreePages());
|
||||
}
|
||||
|
||||
Reference<IPage> IndirectShadowPager::newPageBuffer() {
|
||||
return Reference<IPage>(new IndirectShadowPage());
|
||||
}
|
||||
|
||||
int IndirectShadowPager::getUsablePageSize() {
|
||||
return IndirectShadowPage::PAGE_BYTES - IndirectShadowPage::PAGE_OVERHEAD_BYTES;
|
||||
}
|
||||
|
||||
Reference<IPagerSnapshot> IndirectShadowPager::getReadSnapshot(Version version) {
|
||||
debug_printf("%s: Getting read snapshot v%lld latest v%lld oldest v%lld\n", pageFileName.c_str(), version, latestVersion, oldestVersion);
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(version <= latestVersion);
|
||||
ASSERT(version >= oldestVersion);
|
||||
|
||||
return Reference<IPagerSnapshot>(new IndirectShadowPagerSnapshot(this, version));
|
||||
}
|
||||
|
||||
LogicalPageID IndirectShadowPager::allocateLogicalPage() {
|
||||
ASSERT(recovery.isReady());
|
||||
|
||||
LogicalPageID allocatedPage;
|
||||
if(logicalFreeList.size() > 0) {
|
||||
allocatedPage = logicalFreeList.front();
|
||||
logicalFreeList.pop_front();
|
||||
}
|
||||
else {
|
||||
ASSERT(pageTable.size() < std::numeric_limits<LogicalPageID>::max()); // TODO: different error?
|
||||
allocatedPage = pageTable.size();
|
||||
pageTable.push_back(PageVersionMap());
|
||||
}
|
||||
|
||||
ASSERT(allocatedPage >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
debug_printf("%s: op=allocate id=%u\n", pageFileName.c_str(), allocatedPage);
|
||||
return allocatedPage;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(committing.isReady());
|
||||
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
ASSERT(!pageVersionMap.empty());
|
||||
|
||||
// 0 will mean delete as of latest version, similar to write at latest version
|
||||
if(version == 0) {
|
||||
version = pageVersionMap.back().first;
|
||||
}
|
||||
|
||||
auto itr = pageVersionMapLowerBound(pageVersionMap, version);
|
||||
// TODO: Is this correct, that versions from the past *forward* can be deleted?
|
||||
for(auto i = itr; i != pageVersionMap.end(); ++i) {
|
||||
freePhysicalPageID(i->second);
|
||||
}
|
||||
|
||||
if(itr != pageVersionMap.end()) {
|
||||
debug_printf("%s: Clearing newest versions for logical %u: v%lld\n", pageFileName.c_str(), pageID, version);
|
||||
logPageTableClearToEnd(pageID, version);
|
||||
pageVersionMap.erase(itr, pageVersionMap.end());
|
||||
}
|
||||
|
||||
if(pageVersionMap.size() == 0) {
|
||||
debug_printf("%s: Freeing logical %u (freeLogicalPage)\n", pageFileName.c_str(), pageID);
|
||||
logicalFreeList.push_back(pageID);
|
||||
}
|
||||
else if(pageVersionMap.back().second != PagerFile::INVALID_PAGE) {
|
||||
pageVersionMap.push_back(std::make_pair(version, PagerFile::INVALID_PAGE));
|
||||
logPageTableUpdate(pageID, version, PagerFile::INVALID_PAGE);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitAndFreePhysicalPageID(IndirectShadowPager *pager, PhysicalPageID pageID, Future<Void> canFree) {
|
||||
wait(canFree);
|
||||
pager->pagerFile.freePage(pageID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// TODO: Freeing physical pages must be done *after* committing the page map changes that cause the physical page to no longer be used.
|
||||
// Otherwise, the physical page could be reused by a write followed by a power loss in which case the mapping change would not
|
||||
// have been committed and so the physical page should still contain its previous data but it's been overwritten.
|
||||
void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) {
|
||||
debug_printf("%s: Freeing physical %u\n", pageFileName.c_str(), pageID);
|
||||
pagerFile.freePage(pageID);
|
||||
}
|
||||
|
||||
void IndirectShadowPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(committing.isReady());
|
||||
|
||||
ASSERT(updateVersion > latestVersion || updateVersion == 0);
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != PagerFile::INVALID_PAGE);
|
||||
|
||||
// TODO: should this be conditional on the write succeeding?
|
||||
bool updateExisting = updateVersion == 0;
|
||||
if(updateExisting) {
|
||||
// If there is no existing latest version to update then there must be a referencePageID from which to get a latest version
|
||||
// so get that version and change this to a normal update
|
||||
if(pageVersionMap.empty()) {
|
||||
ASSERT(referencePageID != invalidLogicalPageID);
|
||||
PageVersionMap &rpv = pageTable[referencePageID];
|
||||
ASSERT(!rpv.empty());
|
||||
updateVersion = rpv.back().first;
|
||||
updateExisting = false;
|
||||
}
|
||||
else {
|
||||
ASSERT(pageVersionMap.size());
|
||||
updateVersion = pageVersionMap.back().first;
|
||||
}
|
||||
}
|
||||
|
||||
PhysicalPageID physicalPageID = pagerFile.allocatePage(pageID, updateVersion);
|
||||
|
||||
debug_printf("%s: Writing logical %d v%lld physical %d\n", pageFileName.c_str(), pageID, updateVersion, physicalPageID);
|
||||
|
||||
if(updateExisting) {
|
||||
// TODO: Physical page cannot be freed now, it must be done after the page mapping change above is committed
|
||||
//freePhysicalPageID(pageVersionMap.back().second);
|
||||
pageVersionMap.back().second = physicalPageID;
|
||||
}
|
||||
else {
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
|
||||
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
|
||||
}
|
||||
|
||||
logPageTableUpdate(pageID, updateVersion, physicalPageID);
|
||||
|
||||
checksumWrite(dataFile.getPtr(), contents->mutate(), IndirectShadowPage::PAGE_BYTES, pageID, physicalPageID);
|
||||
|
||||
Future<Void> write = holdWhile(contents, dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES));
|
||||
|
||||
if(write.isError()) {
|
||||
if(errorPromise.canBeSet()) {
|
||||
errorPromise.sendError(write.getError());
|
||||
}
|
||||
throw write.getError();
|
||||
}
|
||||
writeActors.add(forwardError(write, errorPromise));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::forgetVersions(Version begin, Version end) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(begin <= end);
|
||||
ASSERT(end <= latestVersion);
|
||||
|
||||
// TODO: support forgetting arbitrary ranges
|
||||
if(begin <= oldestVersion) {
|
||||
oldestVersion = std::max(end, oldestVersion);
|
||||
logVersion(OLDEST_VERSION_KEY, oldestVersion);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitImpl(IndirectShadowPager *pager, Future<Void> previousCommit) {
|
||||
state Future<Void> outstandingWrites = pager->writeActors.signalAndCollapse();
|
||||
state Version commitVersion = pager->latestVersion;
|
||||
|
||||
wait(previousCommit);
|
||||
|
||||
pager->logVersion(IndirectShadowPager::LATEST_VERSION_KEY, commitVersion);
|
||||
|
||||
// TODO: we need to prevent writes that happen now from being committed in the subsequent log commit
|
||||
// This is probably best done once we have better control of the log, where we can write a commit entry
|
||||
// here without syncing the file.
|
||||
|
||||
wait(outstandingWrites);
|
||||
|
||||
wait(pager->dataFile->sync());
|
||||
wait(pager->pageTableLog->commit());
|
||||
|
||||
pager->committedVersion = std::max(pager->committedVersion, commitVersion);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::commit() {
|
||||
ASSERT(recovery.isReady());
|
||||
Future<Void> f = commitImpl(this, committing);
|
||||
committing = f;
|
||||
return committing;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::setLatestVersion(Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
latestVersion = version;
|
||||
}
|
||||
|
||||
ACTOR Future<Version> getLatestVersionImpl(IndirectShadowPager *pager) {
|
||||
wait(pager->recovery);
|
||||
return pager->latestVersion;
|
||||
}
|
||||
|
||||
Future<Version> IndirectShadowPager::getLatestVersion() {
|
||||
return getLatestVersionImpl(this);
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::getError() {
|
||||
return errorPromise.getFuture();
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::onClosed() {
|
||||
return closed.getFuture();
|
||||
}
|
||||
|
||||
ACTOR void shutdown(IndirectShadowPager *pager, bool dispose) {
|
||||
if(pager->errorPromise.canBeSet())
|
||||
pager->errorPromise.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress
|
||||
|
||||
// Cancel all outstanding reads
|
||||
auto i = pager->busyPages.begin();
|
||||
auto iEnd = pager->busyPages.end();
|
||||
|
||||
while(i != iEnd) {
|
||||
// Advance before calling cancel as the rawRead cancel will destroy the map entry it lives in
|
||||
(i++)->second.read.cancel();
|
||||
}
|
||||
ASSERT(pager->busyPages.empty());
|
||||
|
||||
wait(ready(pager->writeActors.signal()));
|
||||
wait(ready(pager->operations.signal()));
|
||||
wait(ready(pager->committing));
|
||||
|
||||
pager->housekeeping.cancel();
|
||||
pager->pagerFile.shutdown();
|
||||
|
||||
state Future<Void> pageTableClosed = pager->pageTableLog->onClosed();
|
||||
if(dispose) {
|
||||
wait(ready(IAsyncFileSystem::filesystem()->deleteFile(pager->pageFileName, true)));
|
||||
pager->pageTableLog->dispose();
|
||||
}
|
||||
else {
|
||||
pager->pageTableLog->close();
|
||||
}
|
||||
|
||||
wait(ready(pageTableClosed));
|
||||
|
||||
pager->closed.send(Void());
|
||||
delete pager;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::dispose() {
|
||||
shutdown(this, true);
|
||||
}
|
||||
|
||||
void IndirectShadowPager::close() {
|
||||
shutdown(this, false);
|
||||
}
|
||||
|
||||
ACTOR Future<Reference<const IPage>> rawRead(IndirectShadowPager *pager, LogicalPageID logicalPageID, PhysicalPageID physicalPageID) {
|
||||
state void *data;
|
||||
state int len = IndirectShadowPage::PAGE_BYTES;
|
||||
state bool readSuccess = false;
|
||||
|
||||
try {
|
||||
wait(pager->dataFile->readZeroCopy(&data, &len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES));
|
||||
readSuccess = true;
|
||||
|
||||
if(!checksumRead(pager->dataFile.getPtr(), (uint8_t *)data, len, logicalPageID, physicalPageID)) {
|
||||
throw checksum_failed();
|
||||
}
|
||||
|
||||
pager->busyPages.erase(physicalPageID);
|
||||
return Reference<const IPage>(new IndirectShadowPage((uint8_t *)data, pager->dataFile, physicalPageID));
|
||||
}
|
||||
catch(Error &e) {
|
||||
pager->busyPages.erase(physicalPageID);
|
||||
if(readSuccess || e.code() == error_code_actor_cancelled) {
|
||||
pager->dataFile->releaseZeroCopy(data, len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID logicalPageID, Version version) {
|
||||
ASSERT(logicalPageID < pager->pageTable.size());
|
||||
PageVersionMap &pageVersionMap = pager->pageTable[logicalPageID];
|
||||
|
||||
auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version);
|
||||
if(itr == pageVersionMap.begin()) {
|
||||
debug_printf("%s: Page version map empty! op=error id=%u @%lld\n", pager->pageFileName.c_str(), logicalPageID, version);
|
||||
ASSERT(false);
|
||||
}
|
||||
--itr;
|
||||
PhysicalPageID physicalPageID = itr->second;
|
||||
ASSERT(physicalPageID != PagerFile::INVALID_PAGE);
|
||||
|
||||
debug_printf("%s: Reading logical %d v%lld physical %d mapSize %lu\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID, pageVersionMap.size());
|
||||
|
||||
IndirectShadowPager::BusyPage &bp = pager->busyPages[physicalPageID];
|
||||
if(!bp.read.isValid()) {
|
||||
Future<Reference<const IPage>> get = rawRead(pager, logicalPageID, physicalPageID);
|
||||
if(!get.isReady()) {
|
||||
bp.read = get;
|
||||
}
|
||||
return get;
|
||||
}
|
||||
return bp.read;
|
||||
}
|
||||
|
||||
Future<Reference<const IPage>> IndirectShadowPager::getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
|
||||
if(!recovery.isReady()) {
|
||||
debug_printf("%s: getPage failure, recovery not ready - op=error id=%u @%lld\n", pageFileName.c_str(), pageID, version);
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
Future<Reference<const IPage>> f = getPageImpl(this, snapshot, pageID, version);
|
||||
operations.add(forwardError(ready(f), errorPromise)); // For some reason if success is ready() then shutdown hangs when waiting on operations
|
||||
return f;
|
||||
}
|
||||
|
||||
PageVersionMap::iterator IndirectShadowPager::pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version version) {
|
||||
return std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
|
||||
return p.first < v;
|
||||
});
|
||||
}
|
||||
|
||||
PageVersionMap::iterator IndirectShadowPager::pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version version) {
|
||||
return std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
|
||||
return v < p.first;
|
||||
});
|
||||
}
|
||||
|
||||
void IndirectShadowPager::freeLogicalPageID(LogicalPageID pageID) {
|
||||
if(pageID >= SERVER_KNOBS->PAGER_RESERVED_PAGES) {
|
||||
debug_printf("%s: Freeing logical %u\n", pageFileName.c_str(), pageID);
|
||||
logicalFreeList.push_back(pageID);
|
||||
}
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logVersion(StringRef versionKey, Version version) {
|
||||
BinaryWriter v(Unversioned());
|
||||
v << version;
|
||||
|
||||
pageTableLog->set(KeyValueRef(versionKey, v.toValue()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPagesAllocated() {
|
||||
BinaryWriter v(Unversioned());
|
||||
v << pagerFile.getPagesAllocated();
|
||||
|
||||
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toValue()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
|
||||
BinaryWriter k(Unversioned());
|
||||
k << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(version);
|
||||
|
||||
BinaryWriter v(Unversioned());
|
||||
v << physicalPageID;
|
||||
|
||||
pageTableLog->set(KeyValueRef(k.toValue(), v.toValue()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) {
|
||||
BinaryWriter b(Unversioned());
|
||||
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
|
||||
|
||||
BinaryWriter e(Unversioned());
|
||||
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID);
|
||||
|
||||
pageTableLog->clear(KeyRangeRef(b.toValue(), strinc(e.toValue())));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) {
|
||||
BinaryWriter b(Unversioned());
|
||||
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
|
||||
|
||||
BinaryWriter e(Unversioned());
|
||||
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end);
|
||||
|
||||
pageTableLog->clear(KeyRangeRef(b.toValue(), e.toValue()));
|
||||
}
|
||||
|
||||
const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion");
|
||||
const StringRef IndirectShadowPager::OLDEST_VERSION_KEY = LiteralStringRef("\xff/OldestVersion");
|
||||
const StringRef IndirectShadowPager::PAGES_ALLOCATED_KEY = LiteralStringRef("\xff/PagesAllocated");
|
||||
const StringRef IndirectShadowPager::TABLE_ENTRY_PREFIX = LiteralStringRef("\x00");
|
||||
|
||||
ACTOR Future<Void> copyPage(IndirectShadowPager *pager, Reference<IPage> page, LogicalPageID logical, PhysicalPageID from, PhysicalPageID to) {
|
||||
state bool zeroCopied = true;
|
||||
state int bytes = IndirectShadowPage::PAGE_BYTES;
|
||||
state void *data = nullptr;
|
||||
|
||||
try {
|
||||
try {
|
||||
wait(pager->dataFile->readZeroCopy(&data, &bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES));
|
||||
}
|
||||
catch(Error &e) {
|
||||
zeroCopied = false;
|
||||
data = page->mutate();
|
||||
int _bytes = wait(pager->dataFile->read(data, page->size(), (int64_t)from * IndirectShadowPage::PAGE_BYTES));
|
||||
bytes = _bytes;
|
||||
}
|
||||
|
||||
ASSERT(bytes == IndirectShadowPage::PAGE_BYTES);
|
||||
checksumWrite(pager->dataFile.getPtr(), page->mutate(), bytes, logical, to);
|
||||
wait(pager->dataFile->write(data, bytes, (int64_t)to * IndirectShadowPage::PAGE_BYTES));
|
||||
if(zeroCopied) {
|
||||
pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
|
||||
}
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(zeroCopied) {
|
||||
pager->dataFile->releaseZeroCopy(data, bytes, (int64_t)from * IndirectShadowPage::PAGE_BYTES);
|
||||
}
|
||||
pager->pagerFile.freePage(to);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> vacuumer(IndirectShadowPager *pager, PagerFile *pagerFile) {
|
||||
state Reference<IPage> page(new IndirectShadowPage());
|
||||
|
||||
loop {
|
||||
state double start = now();
|
||||
while(!pagerFile->canVacuum()) {
|
||||
wait(delay(1.0));
|
||||
}
|
||||
|
||||
ASSERT(!pagerFile->freePages.empty());
|
||||
|
||||
if(!pagerFile->vacuumQueue.empty()) {
|
||||
state PhysicalPageID lastUsedPage = pagerFile->vacuumQueue.rbegin()->first;
|
||||
PhysicalPageID lastFreePage = *pagerFile->freePages.rbegin();
|
||||
debug_printf("%s: Vacuuming: evaluating (free list size=%lu, lastFreePage=%u, lastUsedPage=%u, pagesAllocated=%u)\n", pager->pageFileName.c_str(), pagerFile->freePages.size(), lastFreePage, lastUsedPage, pagerFile->pagesAllocated);
|
||||
ASSERT(lastFreePage < pagerFile->pagesAllocated);
|
||||
ASSERT(lastUsedPage < pagerFile->pagesAllocated);
|
||||
ASSERT(lastFreePage != lastUsedPage);
|
||||
|
||||
if(lastFreePage < lastUsedPage) {
|
||||
state std::pair<LogicalPageID, Version> logicalPageInfo = pagerFile->vacuumQueue[lastUsedPage];
|
||||
state PhysicalPageID newPage = pagerFile->allocatePage(logicalPageInfo.first, logicalPageInfo.second);
|
||||
|
||||
debug_printf("%s: Vacuuming: copying page %u to %u\n", pager->pageFileName.c_str(), lastUsedPage, newPage);
|
||||
wait(copyPage(pager, page, logicalPageInfo.first, lastUsedPage, newPage));
|
||||
|
||||
auto &pageVersionMap = pager->pageTable[logicalPageInfo.first];
|
||||
auto itr = IndirectShadowPager::pageVersionMapLowerBound(pageVersionMap, logicalPageInfo.second);
|
||||
if(itr != pageVersionMap.end() && itr->second == lastUsedPage) {
|
||||
itr->second = newPage;
|
||||
pager->logPageTableUpdate(logicalPageInfo.first, itr->first, newPage);
|
||||
pagerFile->freePage(lastUsedPage);
|
||||
}
|
||||
else {
|
||||
TEST(true); // page was freed while vacuuming
|
||||
pagerFile->freePage(newPage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PhysicalPageID firstFreePage = pagerFile->vacuumQueue.empty() ? pagerFile->minVacuumQueuePage : (pagerFile->vacuumQueue.rbegin()->first + 1);
|
||||
ASSERT(pagerFile->pagesAllocated >= firstFreePage);
|
||||
|
||||
uint64_t pagesToErase = 0;
|
||||
if(pagerFile->freePages.size() >= SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD) {
|
||||
pagesToErase = std::min<uint64_t>(pagerFile->freePages.size() - SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD + 1, pagerFile->pagesAllocated - firstFreePage);
|
||||
}
|
||||
|
||||
debug_printf("%s: Vacuuming: got %llu pages to erase (freePages=%lu, pagesAllocated=%u, vacuumQueueEmpty=%u, minVacuumQueuePage=%u, firstFreePage=%u)\n", pager->pageFileName.c_str(), pagesToErase, pagerFile->freePages.size(), pagerFile->pagesAllocated, pagerFile->vacuumQueue.empty(), pagerFile->minVacuumQueuePage, firstFreePage);
|
||||
|
||||
if(pagesToErase > 0) {
|
||||
PhysicalPageID eraseStartPage = pagerFile->pagesAllocated - pagesToErase;
|
||||
debug_printf("%s: Vacuuming: truncating last %llu pages starting at %u\n", pager->pageFileName.c_str(), pagesToErase, eraseStartPage);
|
||||
|
||||
ASSERT(pagesToErase <= pagerFile->pagesAllocated);
|
||||
|
||||
pagerFile->pagesAllocated = eraseStartPage;
|
||||
pager->logPagesAllocated();
|
||||
|
||||
auto freePageItr = pagerFile->freePages.find(eraseStartPage);
|
||||
ASSERT(freePageItr != pagerFile->freePages.end());
|
||||
|
||||
pagerFile->freePages.erase(freePageItr, pagerFile->freePages.end());
|
||||
ASSERT(pagerFile->vacuumQueue.empty() || pagerFile->vacuumQueue.rbegin()->first < eraseStartPage);
|
||||
|
||||
wait(pager->dataFile->truncate((int64_t)pagerFile->pagesAllocated * IndirectShadowPage::PAGE_BYTES));
|
||||
}
|
||||
|
||||
wait(delayUntil(start + (double)IndirectShadowPage::PAGE_BYTES / SERVER_KNOBS->VACUUM_BYTES_PER_SECOND)); // TODO: figure out the correct mechanism here
|
||||
}
|
||||
}
|
||||
|
||||
PagerFile::PagerFile(IndirectShadowPager *pager) : fileSize(0), pagesAllocated(0), pager(pager), vacuumQueueReady(false), minVacuumQueuePage(0) {}
|
||||
|
||||
PhysicalPageID PagerFile::allocatePage(LogicalPageID logicalPageID, Version version) {
|
||||
ASSERT((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES <= fileSize);
|
||||
ASSERT(fileSize % IndirectShadowPage::PAGE_BYTES == 0);
|
||||
|
||||
PhysicalPageID allocatedPage;
|
||||
if(!freePages.empty()) {
|
||||
allocatedPage = *freePages.begin();
|
||||
freePages.erase(freePages.begin());
|
||||
}
|
||||
else {
|
||||
if((int64_t)pagesAllocated * IndirectShadowPage::PAGE_BYTES == fileSize) {
|
||||
fileSize += (1 << 24);
|
||||
// TODO: extend the file before writing beyond the end.
|
||||
}
|
||||
|
||||
ASSERT(pagesAllocated < INVALID_PAGE); // TODO: we should throw a better error here
|
||||
allocatedPage = pagesAllocated++;
|
||||
pager->logPagesAllocated();
|
||||
}
|
||||
|
||||
markPageAllocated(logicalPageID, version, allocatedPage);
|
||||
|
||||
debug_printf("%s: Allocated physical %u\n", pager->pageFileName.c_str(), allocatedPage);
|
||||
return allocatedPage;
|
||||
}
|
||||
|
||||
void PagerFile::freePage(PhysicalPageID pageID) {
|
||||
freePages.insert(pageID);
|
||||
|
||||
if(pageID >= minVacuumQueuePage) {
|
||||
vacuumQueue.erase(pageID);
|
||||
}
|
||||
}
|
||||
|
||||
void PagerFile::markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
|
||||
if(physicalPageID != INVALID_PAGE && physicalPageID >= minVacuumQueuePage) {
|
||||
vacuumQueue[physicalPageID] = std::make_pair(logicalPageID, version);
|
||||
}
|
||||
}
|
||||
|
||||
void PagerFile::finishedMarkingPages() {
|
||||
if(minVacuumQueuePage >= pagesAllocated) {
|
||||
minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
|
||||
vacuumQueueReady = false;
|
||||
}
|
||||
else {
|
||||
if(!vacuumQueueReady) {
|
||||
vacuumQueueReady = true;
|
||||
}
|
||||
if(pagesAllocated > SERVER_KNOBS->VACUUM_QUEUE_SIZE && minVacuumQueuePage < pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE) {
|
||||
minVacuumQueuePage = pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE;
|
||||
auto itr = vacuumQueue.lower_bound(minVacuumQueuePage);
|
||||
vacuumQueue.erase(vacuumQueue.begin(), itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t PagerFile::size() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
uint32_t PagerFile::getPagesAllocated() {
|
||||
return pagesAllocated;
|
||||
}
|
||||
|
||||
uint32_t PagerFile::getFreePages() {
|
||||
return freePages.size();
|
||||
}
|
||||
|
||||
void PagerFile::init(uint64_t fileSize, uint32_t pagesAllocated) {
|
||||
this->fileSize = fileSize;
|
||||
this->pagesAllocated = pagesAllocated;
|
||||
this->minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
|
||||
}
|
||||
|
||||
void PagerFile::startVacuuming() {
|
||||
vacuuming = Never(); //vacuumer(pager, this);
|
||||
}
|
||||
|
||||
void PagerFile::shutdown() {
|
||||
vacuuming.cancel();
|
||||
}
|
||||
|
||||
bool PagerFile::canVacuum() {
|
||||
if(freePages.size() < SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD // Not enough free pages
|
||||
|| minVacuumQueuePage >= pagesAllocated // We finished processing all pages in the vacuum queue
|
||||
|| !vacuumQueueReady) // Populating vacuum queue
|
||||
{
|
||||
debug_printf("%s: Vacuuming: waiting for vacuumable pages (free list size=%lu, minVacuumQueuePage=%u, pages allocated=%u, vacuumQueueReady=%d)\n", pager->pageFileName.c_str(), freePages.size(), minVacuumQueuePage, pagesAllocated, vacuumQueueReady);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
const PhysicalPageID PagerFile::INVALID_PAGE = std::numeric_limits<PhysicalPageID>::max();
|
||||
|
||||
extern Future<Void> simplePagerTest(IPager* const& pager);
|
||||
|
||||
TEST_CASE("/fdbserver/indirectshadowpager/simple") {
|
||||
state IPager *pager = new IndirectShadowPager("unittest_pageFile");
|
||||
|
||||
wait(simplePagerTest(pager));
|
||||
|
||||
Future<Void> closedFuture = pager->onClosed();
|
||||
pager->close();
|
||||
wait(closedFuture);
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -1,215 +0,0 @@
|
|||
/*
|
||||
* IndirectShadowPager.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_INDIRECTSHADOWPAGER_H
|
||||
#define FDBSERVER_INDIRECTSHADOWPAGER_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/IPager.h"
|
||||
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
|
||||
#include "fdbrpc/IAsyncFile.h"
|
||||
|
||||
typedef uint32_t PhysicalPageID;
|
||||
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
|
||||
typedef std::vector<PageVersionMap> LogicalPageTable;
|
||||
|
||||
class IndirectShadowPager;
|
||||
|
||||
class IndirectShadowPage : public IPage, ReferenceCounted<IndirectShadowPage> {
|
||||
public:
|
||||
IndirectShadowPage();
|
||||
IndirectShadowPage(uint8_t *data, Reference<IAsyncFile> file, PhysicalPageID pageID)
|
||||
: file(file), physicalPageID(pageID), fastAllocated(false), data(data) {}
|
||||
virtual ~IndirectShadowPage();
|
||||
|
||||
virtual void addref() const {
|
||||
ReferenceCounted<IndirectShadowPage>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() const {
|
||||
ReferenceCounted<IndirectShadowPage>::delref();
|
||||
}
|
||||
|
||||
virtual int size() const;
|
||||
virtual uint8_t const* begin() const;
|
||||
virtual uint8_t* mutate();
|
||||
|
||||
//private:
|
||||
static const int PAGE_BYTES;
|
||||
static const int PAGE_OVERHEAD_BYTES;
|
||||
|
||||
private:
|
||||
Reference<IAsyncFile> file;
|
||||
PhysicalPageID physicalPageID;
|
||||
bool fastAllocated;
|
||||
uint8_t *data;
|
||||
};
|
||||
|
||||
class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted<IndirectShadowPagerSnapshot> {
|
||||
public:
|
||||
IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version);
|
||||
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable);
|
||||
|
||||
virtual Version getVersion() const {
|
||||
return version;
|
||||
}
|
||||
|
||||
virtual ~IndirectShadowPagerSnapshot() {
|
||||
}
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<IndirectShadowPagerSnapshot>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() {
|
||||
ReferenceCounted<IndirectShadowPagerSnapshot>::delref();
|
||||
}
|
||||
|
||||
private:
|
||||
IndirectShadowPager *pager;
|
||||
Version version;
|
||||
Future<Void> pagerError;
|
||||
};
|
||||
|
||||
class PagerFile {
|
||||
public:
|
||||
PagerFile(IndirectShadowPager *pager);
|
||||
|
||||
PhysicalPageID allocatePage(LogicalPageID logicalPageID, Version version);
|
||||
void freePage(PhysicalPageID physicalPageID);
|
||||
void markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
|
||||
|
||||
void finishedMarkingPages();
|
||||
|
||||
uint64_t size();
|
||||
uint32_t getPagesAllocated();
|
||||
uint32_t getFreePages();
|
||||
|
||||
void init(uint64_t fileSize, uint32_t pagesAllocated);
|
||||
void startVacuuming();
|
||||
void shutdown();
|
||||
|
||||
//private:
|
||||
Future<Void> vacuuming;
|
||||
IndirectShadowPager *pager;
|
||||
|
||||
uint32_t pagesAllocated;
|
||||
uint64_t fileSize;
|
||||
|
||||
std::set<PhysicalPageID> freePages;
|
||||
|
||||
PhysicalPageID minVacuumQueuePage;
|
||||
bool vacuumQueueReady;
|
||||
std::map<PhysicalPageID, std::pair<LogicalPageID, Version>> vacuumQueue;
|
||||
|
||||
bool canVacuum();
|
||||
|
||||
static const PhysicalPageID INVALID_PAGE;
|
||||
};
|
||||
|
||||
class IndirectShadowPager : public IPager {
|
||||
public:
|
||||
IndirectShadowPager(std::string basename);
|
||||
virtual ~IndirectShadowPager() {
|
||||
}
|
||||
|
||||
virtual Reference<IPage> newPageBuffer();
|
||||
virtual int getUsablePageSize();
|
||||
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
|
||||
|
||||
virtual LogicalPageID allocateLogicalPage();
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
|
||||
virtual void forgetVersions(Version begin, Version end);
|
||||
virtual Future<Void> commit();
|
||||
|
||||
virtual void setLatestVersion(Version version);
|
||||
virtual Future<Version> getLatestVersion();
|
||||
|
||||
virtual StorageBytes getStorageBytes();
|
||||
|
||||
virtual Future<Void> getError();
|
||||
virtual Future<Void> onClosed();
|
||||
virtual void dispose();
|
||||
virtual void close();
|
||||
|
||||
Future<Reference<const IPage>> getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version);
|
||||
|
||||
//private:
|
||||
std::string basename;
|
||||
std::string pageFileName;
|
||||
|
||||
Version latestVersion;
|
||||
Version committedVersion;
|
||||
|
||||
LogicalPageTable pageTable;
|
||||
IKeyValueStore *pageTableLog;
|
||||
|
||||
Reference<IAsyncFile> dataFile;
|
||||
Future<Void> recovery;
|
||||
|
||||
Future<Void> housekeeping;
|
||||
Future<Void> vacuuming;
|
||||
Version oldestVersion;
|
||||
|
||||
// TODO: This structure maybe isn't needed
|
||||
struct BusyPage {
|
||||
Future<Reference<const IPage>> read;
|
||||
};
|
||||
|
||||
typedef std::map<PhysicalPageID, BusyPage> BusyPageMapT;
|
||||
BusyPageMapT busyPages;
|
||||
|
||||
SignalableActorCollection operations;
|
||||
SignalableActorCollection writeActors;
|
||||
Future<Void> committing;
|
||||
|
||||
Promise<Void> closed;
|
||||
Promise<Void> errorPromise;
|
||||
|
||||
std::deque<LogicalPageID> logicalFreeList;
|
||||
PagerFile pagerFile;
|
||||
|
||||
static PageVersionMap::iterator pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version v);
|
||||
static PageVersionMap::iterator pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version v);
|
||||
|
||||
void freeLogicalPageID(LogicalPageID pageID);
|
||||
void freePhysicalPageID(PhysicalPageID pageID);
|
||||
|
||||
void logVersion(StringRef versionKey, Version version);
|
||||
void logPagesAllocated();
|
||||
void logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
|
||||
void logPageTableClearToEnd(LogicalPageID logicalPageID, Version start);
|
||||
void logPageTableClear(LogicalPageID logicalPageID, Version start, Version end);
|
||||
|
||||
static const StringRef LATEST_VERSION_KEY;
|
||||
static const StringRef OLDEST_VERSION_KEY;
|
||||
static const StringRef PAGES_ALLOCATED_KEY;
|
||||
static const StringRef TABLE_ENTRY_PREFIX;
|
||||
|
||||
};
|
||||
|
||||
#endif
|
|
@ -1,456 +0,0 @@
|
|||
/*
|
||||
* MemoryPager.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <cinttypes>
|
||||
#include "fdbserver/MemoryPager.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
typedef uint8_t* PhysicalPageID;
|
||||
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
|
||||
typedef std::vector<PageVersionMap> LogicalPageTable;
|
||||
|
||||
class MemoryPager;
|
||||
|
||||
class MemoryPage : public IPage, ReferenceCounted<MemoryPage> {
|
||||
public:
|
||||
MemoryPage();
|
||||
MemoryPage(uint8_t *data);
|
||||
virtual ~MemoryPage();
|
||||
|
||||
virtual void addref() const {
|
||||
ReferenceCounted<MemoryPage>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() const {
|
||||
ReferenceCounted<MemoryPage>::delref();
|
||||
}
|
||||
|
||||
virtual int size() const;
|
||||
virtual uint8_t const* begin() const;
|
||||
virtual uint8_t* mutate();
|
||||
|
||||
private:
|
||||
friend class MemoryPager;
|
||||
uint8_t *data;
|
||||
bool allocated;
|
||||
|
||||
static const int PAGE_BYTES;
|
||||
};
|
||||
|
||||
class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted<MemoryPagerSnapshot> {
|
||||
public:
|
||||
MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable);
|
||||
virtual Version getVersion() const {
|
||||
return version;
|
||||
}
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<MemoryPagerSnapshot>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() {
|
||||
ReferenceCounted<MemoryPagerSnapshot>::delref();
|
||||
}
|
||||
|
||||
private:
|
||||
MemoryPager *pager;
|
||||
Version version;
|
||||
};
|
||||
|
||||
class MemoryPager : public IPager, ReferenceCounted<MemoryPager> {
|
||||
public:
|
||||
MemoryPager();
|
||||
|
||||
virtual Reference<IPage> newPageBuffer();
|
||||
virtual int getUsablePageSize();
|
||||
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
|
||||
|
||||
virtual LogicalPageID allocateLogicalPage();
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
|
||||
virtual void forgetVersions(Version begin, Version end);
|
||||
virtual Future<Void> commit();
|
||||
|
||||
virtual StorageBytes getStorageBytes() {
|
||||
// TODO: Get actual values for used and free memory
|
||||
return StorageBytes();
|
||||
}
|
||||
|
||||
virtual void setLatestVersion(Version version);
|
||||
virtual Future<Version> getLatestVersion();
|
||||
|
||||
virtual Future<Void> getError();
|
||||
virtual Future<Void> onClosed();
|
||||
virtual void dispose();
|
||||
virtual void close();
|
||||
|
||||
virtual Reference<const IPage> getPage(LogicalPageID pageID, Version version);
|
||||
|
||||
private:
|
||||
Version latestVersion;
|
||||
Version committedVersion;
|
||||
Standalone<VectorRef<VectorRef<uint8_t>>> data;
|
||||
LogicalPageTable pageTable;
|
||||
|
||||
Promise<Void> closed;
|
||||
|
||||
std::vector<PhysicalPageID> freeList; // TODO: is this good enough for now?
|
||||
|
||||
PhysicalPageID allocatePage(Reference<IPage> contents);
|
||||
void extendData();
|
||||
|
||||
static const PhysicalPageID INVALID_PAGE;
|
||||
};
|
||||
|
||||
IPager * createMemoryPager() {
|
||||
return new MemoryPager();
|
||||
}
|
||||
|
||||
MemoryPage::MemoryPage() : allocated(true) {
|
||||
data = (uint8_t*)FastAllocator<4096>::allocate();
|
||||
}
|
||||
|
||||
MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {}
|
||||
|
||||
MemoryPage::~MemoryPage() {
|
||||
if(allocated) {
|
||||
FastAllocator<4096>::release(data);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t const* MemoryPage::begin() const {
|
||||
return data;
|
||||
}
|
||||
|
||||
uint8_t* MemoryPage::mutate() {
|
||||
return data;
|
||||
}
|
||||
|
||||
int MemoryPage::size() const {
|
||||
return PAGE_BYTES;
|
||||
}
|
||||
|
||||
const int MemoryPage::PAGE_BYTES = 4096;
|
||||
|
||||
Future<Reference<const IPage>> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID, bool cacheable) {
|
||||
return pager->getPage(pageID, version);
|
||||
}
|
||||
|
||||
MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
|
||||
extendData();
|
||||
pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
}
|
||||
|
||||
Reference<IPage> MemoryPager::newPageBuffer() {
|
||||
return Reference<IPage>(new MemoryPage());
|
||||
}
|
||||
|
||||
int MemoryPager::getUsablePageSize() {
|
||||
return MemoryPage::PAGE_BYTES;
|
||||
}
|
||||
|
||||
Reference<IPagerSnapshot> MemoryPager::getReadSnapshot(Version version) {
|
||||
ASSERT(version <= latestVersion);
|
||||
return Reference<IPagerSnapshot>(new MemoryPagerSnapshot(this, version));
|
||||
}
|
||||
|
||||
LogicalPageID MemoryPager::allocateLogicalPage() {
|
||||
ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
pageTable.push_back(PageVersionMap());
|
||||
return pageTable.size() - 1;
|
||||
}
|
||||
|
||||
void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
ASSERT(!pageVersionMap.empty());
|
||||
|
||||
auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
|
||||
return p.first < v;
|
||||
});
|
||||
|
||||
pageVersionMap.erase(itr, pageVersionMap.end());
|
||||
if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
|
||||
pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
|
||||
}
|
||||
}
|
||||
|
||||
void MemoryPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
|
||||
ASSERT(updateVersion > latestVersion || updateVersion == 0);
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
if(referencePageID != invalidLogicalPageID) {
|
||||
PageVersionMap &rpv = pageTable[referencePageID];
|
||||
ASSERT(!rpv.empty());
|
||||
updateVersion = rpv.back().first;
|
||||
}
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
|
||||
ASSERT(updateVersion >= committedVersion || updateVersion == 0);
|
||||
PhysicalPageID physicalPageID = allocatePage(contents);
|
||||
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
|
||||
|
||||
if(updateVersion == 0) {
|
||||
ASSERT(pageVersionMap.size());
|
||||
updateVersion = pageVersionMap.back().first;
|
||||
pageVersionMap.back().second = physicalPageID;
|
||||
// TODO: what to do with old page?
|
||||
}
|
||||
else {
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
|
||||
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void MemoryPager::forgetVersions(Version begin, Version end) {
|
||||
ASSERT(begin <= end);
|
||||
ASSERT(end <= latestVersion);
|
||||
// TODO
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::commit() {
|
||||
ASSERT(committedVersion < latestVersion);
|
||||
committedVersion = latestVersion;
|
||||
return Void();
|
||||
}
|
||||
|
||||
void MemoryPager::setLatestVersion(Version version) {
|
||||
ASSERT(version > latestVersion);
|
||||
latestVersion = version;
|
||||
}
|
||||
|
||||
Future<Version> MemoryPager::getLatestVersion() {
|
||||
return latestVersion;
|
||||
}
|
||||
|
||||
Reference<const IPage> MemoryPager::getPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(pageID < pageTable.size());
|
||||
PageVersionMap const& pageVersionMap = pageTable[pageID];
|
||||
|
||||
auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
|
||||
return v < p.first;
|
||||
});
|
||||
|
||||
if(itr == pageVersionMap.begin()) {
|
||||
return Reference<IPage>(); // TODO: should this be an error?
|
||||
}
|
||||
|
||||
--itr;
|
||||
|
||||
ASSERT(itr->second != INVALID_PAGE);
|
||||
return Reference<const IPage>(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::getError() {
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::onClosed() {
|
||||
return closed.getFuture();
|
||||
}
|
||||
|
||||
void MemoryPager::dispose() {
|
||||
closed.send(Void());
|
||||
delete this;
|
||||
}
|
||||
|
||||
void MemoryPager::close() {
|
||||
dispose();
|
||||
}
|
||||
|
||||
PhysicalPageID MemoryPager::allocatePage(Reference<IPage> contents) {
|
||||
if(freeList.size()) {
|
||||
PhysicalPageID pageID = freeList.back();
|
||||
freeList.pop_back();
|
||||
|
||||
memcpy(pageID, contents->begin(), contents->size());
|
||||
return pageID;
|
||||
}
|
||||
else {
|
||||
ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
|
||||
PhysicalPageID pageID = data.back().end();
|
||||
|
||||
data.back().append(data.arena(), contents->begin(), contents->size());
|
||||
if(data.back().size() == data.back().capacity()) {
|
||||
extendData();
|
||||
}
|
||||
else {
|
||||
ASSERT(data.back().size() <= data.back().capacity() - 4096);
|
||||
}
|
||||
|
||||
return pageID;
|
||||
}
|
||||
}
|
||||
|
||||
void MemoryPager::extendData() {
|
||||
if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
|
||||
throw io_error();
|
||||
}
|
||||
|
||||
VectorRef<uint8_t> d;
|
||||
d.reserve(data.arena(), 1 << 22);
|
||||
data.push_back(data.arena(), d);
|
||||
}
|
||||
|
||||
// TODO: these tests are not MemoryPager specific, we should make them more general
|
||||
|
||||
void fillPage(Reference<IPage> page, LogicalPageID pageID, Version version) {
|
||||
ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
|
||||
|
||||
memset(page->mutate(), 0, page->size());
|
||||
memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
|
||||
memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
|
||||
}
|
||||
|
||||
bool validatePage(Reference<const IPage> page, LogicalPageID pageID, Version version) {
|
||||
bool valid = true;
|
||||
|
||||
LogicalPageID readPageID = *(LogicalPageID*)page->begin();
|
||||
if(readPageID != pageID) {
|
||||
fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID));
|
||||
if(readVersion != version) {
|
||||
fprintf(stderr, "Invalid Version detected on page %u: %" PRId64 "(expected %" PRId64 ")\n", pageID, readVersion, version);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
return valid;
|
||||
}
|
||||
|
||||
void writePage(IPager *pager, Reference<IPage> page, LogicalPageID pageID, Version version, bool updateVersion=true) {
|
||||
fillPage(page, pageID, version);
|
||||
pager->writePage(pageID, page, updateVersion ? version : 0);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commit(IPager *pager) {
|
||||
static int commitNum = 1;
|
||||
state int myCommit = commitNum++;
|
||||
|
||||
debug_printf("Commit%d\n", myCommit);
|
||||
wait(pager->commit());
|
||||
debug_printf("FinishedCommit%d\n", myCommit);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) {
|
||||
static int readNum = 1;
|
||||
state int myRead = readNum++;
|
||||
state Reference<IPagerSnapshot> readSnapshot = pager->getReadSnapshot(version);
|
||||
debug_printf("Read%d\n", myRead);
|
||||
Reference<const IPage> readPage = wait(readSnapshot->getPhysicalPage(pageID, true));
|
||||
debug_printf("FinishedRead%d\n", myRead);
|
||||
ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> simplePagerTest(IPager *pager) {
|
||||
state Reference<IPage> page = pager->newPageBuffer();
|
||||
|
||||
Version latestVersion = wait(pager->getLatestVersion());
|
||||
debug_printf("Got latest version: %lld\n", latestVersion);
|
||||
|
||||
state Version version = latestVersion+1;
|
||||
state Version v1 = version;
|
||||
|
||||
state LogicalPageID pageID1 = pager->allocateLogicalPage();
|
||||
|
||||
writePage(pager, page, pageID1, v1);
|
||||
pager->setLatestVersion(v1);
|
||||
wait(commit(pager));
|
||||
|
||||
state LogicalPageID pageID2 = pager->allocateLogicalPage();
|
||||
|
||||
state Version v2 = ++version;
|
||||
|
||||
writePage(pager, page, pageID1, v2);
|
||||
writePage(pager, page, pageID2, v2);
|
||||
pager->setLatestVersion(v2);
|
||||
wait(commit(pager));
|
||||
|
||||
wait(read(pager, pageID1, v2));
|
||||
wait(read(pager, pageID1, v1));
|
||||
|
||||
state Version v3 = ++version;
|
||||
writePage(pager, page, pageID1, v3, false);
|
||||
pager->setLatestVersion(v3);
|
||||
|
||||
wait(read(pager, pageID1, v2, v3));
|
||||
wait(read(pager, pageID1, v3, v3));
|
||||
|
||||
state LogicalPageID pageID3 = pager->allocateLogicalPage();
|
||||
|
||||
state Version v4 = ++version;
|
||||
writePage(pager, page, pageID2, v4);
|
||||
writePage(pager, page, pageID3, v4);
|
||||
pager->setLatestVersion(v4);
|
||||
wait(commit(pager));
|
||||
|
||||
wait(read(pager, pageID2, v4, v4));
|
||||
|
||||
state Version v5 = ++version;
|
||||
writePage(pager, page, pageID2, v5);
|
||||
|
||||
state LogicalPageID pageID4 = pager->allocateLogicalPage();
|
||||
writePage(pager, page, pageID4, v5);
|
||||
|
||||
state Version v6 = ++version;
|
||||
pager->freeLogicalPage(pageID2, v5);
|
||||
pager->freeLogicalPage(pageID3, v3);
|
||||
pager->setLatestVersion(v6);
|
||||
wait(commit(pager));
|
||||
|
||||
pager->forgetVersions(0, v4);
|
||||
wait(commit(pager));
|
||||
|
||||
wait(delay(3.0));
|
||||
|
||||
wait(commit(pager));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
/*
|
||||
TEST_CASE("/fdbserver/memorypager/simple") {
|
||||
state IPager *pager = new MemoryPager();
|
||||
|
||||
wait(simplePagerTest(pager));
|
||||
|
||||
Future<Void> closedFuture = pager->onClosed();
|
||||
pager->dispose();
|
||||
|
||||
wait(closedFuture);
|
||||
return Void();
|
||||
}
|
||||
*/
|
||||
|
||||
const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* MemoryPager.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_MEMORYPAGER_H
|
||||
#define FDBSERVER_MEMORYPAGER_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbserver/IPager.h"
|
||||
|
||||
IPager * createMemoryPager();
|
||||
|
||||
#endif
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -46,7 +46,6 @@
|
|||
<ActorCompiler Include="KeyValueStoreMemory.actor.cpp" />
|
||||
<ActorCompiler Include="SimulatedCluster.actor.cpp" />
|
||||
<ActorCompiler Include="KeyValueStoreCompressTestData.actor.cpp" />
|
||||
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
|
||||
<ClCompile Include="Knobs.cpp" />
|
||||
<ActorCompiler Include="FDBExecHelper.actor.cpp" />
|
||||
<ActorCompiler Include="QuietDatabase.actor.cpp" />
|
||||
|
@ -57,7 +56,6 @@
|
|||
<ActorCompiler Include="Restore.actor.cpp" />
|
||||
<ActorCompiler Include="LogSystemDiskQueueAdapter.actor.cpp" />
|
||||
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
|
||||
<ActorCompiler Include="MemoryPager.actor.cpp" />
|
||||
<ActorCompiler Include="LogRouter.actor.cpp" />
|
||||
<ClCompile Include="LatencyBandConfig.cpp" />
|
||||
<ActorCompiler Include="OldTLogServer_4_6.actor.cpp" />
|
||||
|
@ -179,7 +177,6 @@
|
|||
</ActorCompiler>
|
||||
<ClInclude Include="IDiskQueue.h" />
|
||||
<ClInclude Include="IKeyValueStore.h" />
|
||||
<ClInclude Include="IndirectShadowPager.h" />
|
||||
<ClInclude Include="IPager.h" />
|
||||
<ClInclude Include="IVersionedStore.h" />
|
||||
<ClInclude Include="LatencyBandConfig.h" />
|
||||
|
@ -189,7 +186,6 @@
|
|||
<ClInclude Include="LogSystemConfig.h" />
|
||||
<ClInclude Include="LogSystemDiskQueueAdapter.h" />
|
||||
<ClInclude Include="MasterInterface.h" />
|
||||
<ClInclude Include="MemoryPager.h" />
|
||||
<ActorCompiler Include="MoveKeys.actor.h">
|
||||
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
|
||||
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
|
||||
|
|
|
@ -274,8 +274,6 @@
|
|||
<ActorCompiler Include="workloads\AtomicRestore.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="MemoryPager.actor.cpp" />
|
||||
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
|
||||
<ActorCompiler Include="OldTLogServer.actor.cpp" />
|
||||
<ActorCompiler Include="LogRouter.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\SlowTaskWorkload.actor.cpp">
|
||||
|
@ -385,8 +383,6 @@
|
|||
<ClInclude Include="LogProtocolMessage.h" />
|
||||
<ClInclude Include="IPager.h" />
|
||||
<ClInclude Include="IVersionedStore.h" />
|
||||
<ClInclude Include="MemoryPager.h" />
|
||||
<ClInclude Include="IndirectShadowPager.h" />
|
||||
<ClInclude Include="template_fdb.h" />
|
||||
<ClInclude Include="LatencyBandConfig.h" />
|
||||
</ItemGroup>
|
||||
|
|
Loading…
Reference in New Issue