Merge branch 'master' into feature-redwood
# Conflicts: # fdbserver/fdbserver.vcxproj # fdbserver/fdbserver.vcxproj.filters
This commit is contained in:
parent
ac8c289296
commit
b65ad3563c
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* IPager.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_IPAGER_H
|
||||
#define FDBSERVER_IPAGER_H
|
||||
#pragma once
|
||||
|
||||
#include "IKeyValueStore.h"
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
||||
typedef uint32_t LogicalPageID; // uint64_t?
|
||||
|
||||
class IPage {
|
||||
public:
|
||||
virtual uint8_t const* begin() const = 0;
|
||||
virtual uint8_t* mutate() = 0;
|
||||
|
||||
// Must return the same size for all pages created by the same pager instance
|
||||
virtual int size() const = 0;
|
||||
|
||||
virtual ~IPage() {}
|
||||
|
||||
virtual void addref() const = 0;
|
||||
virtual void delref() const = 0;
|
||||
};
|
||||
|
||||
class IPagerSnapshot {
|
||||
public:
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID) = 0;
|
||||
virtual void invalidateReturnedPages() = 0;
|
||||
|
||||
virtual ~IPagerSnapshot() {}
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
};
|
||||
|
||||
class IPager : public IClosable {
|
||||
public:
|
||||
// Returns an IPage that can be passed to writePage. The data in the returned IPage might not be zeroed.
|
||||
virtual Reference<IPage> newPageBuffer() = 0;
|
||||
|
||||
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
|
||||
// For a given pager instance, separate calls to this function must return the same value.
|
||||
virtual int getUsablePageSize() = 0;
|
||||
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version) = 0;
|
||||
|
||||
// Returns an unused LogicalPageID.
|
||||
// LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES) do not need to be allocated.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual LogicalPageID allocateLogicalPage() = 0;
|
||||
|
||||
// Signals that the page will no longer be used as of the specified version. Versions prior to the specified version must be kept.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version) = 0;
|
||||
|
||||
// Writes a page with the given LogicalPageID at the specified version. LogicalPageIDs in the range [0, SERVER_KNOBS->PAGER_RESERVED_PAGES)
|
||||
// can be written without being allocated. All other LogicalPageIDs must be allocated using allocateLogicalPage before writing them.
|
||||
//
|
||||
// If updateVersion is 0, we are signalling to the pager that we are reusing the LogicalPageID entry at the current latest version of pageID.
|
||||
//
|
||||
// Otherwise, we will add a new entry for LogicalPageID at the specified version. In that case, updateVersion must be larger than any version
|
||||
// written to this page previously, and it must be larger than any version committed
|
||||
//
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion) = 0;
|
||||
|
||||
// Signals to the pager that no more reads will be performed in the range [begin, end).
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void forgetVersions(Version begin, Version end) = 0;
|
||||
|
||||
// Makes durable all writes and any data structures used for recovery.
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual Future<Void> commit() = 0;
|
||||
|
||||
// Returns the latest version of the pager. Permitted to block until recovery is complete, at which point it should always be set immediately.
|
||||
// Some functions in the IPager interface are permitted to fail (ASSERT) during recovery, so users should wait for getLatestVersion to complete
|
||||
// before doing anything else.
|
||||
virtual Future<Version> getLatestVersion() = 0;
|
||||
|
||||
// Sets the latest version of the pager. Must be monotonically increasing.
|
||||
//
|
||||
// Must be called prior to reading the specified version. SOMEDAY: It may be desirable in the future to relax this constraint for performance reasons.
|
||||
//
|
||||
// Permitted to fail (ASSERT) during recovery.
|
||||
virtual void setLatestVersion(Version version) = 0;
|
||||
|
||||
protected:
|
||||
~IPager() {} // Destruction should be done using close()/dispose() from the IClosable interface
|
||||
};
|
||||
|
||||
#endif
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* IVersionedStore.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_IVERSIONEDSTORE_H
|
||||
#define FDBSERVER_IVERSIONEDSTORE_H
|
||||
#pragma once
|
||||
|
||||
#include "IKeyValueStore.h"
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
||||
class IStoreCursor {
|
||||
public:
|
||||
virtual Future<Void> findEqual(KeyRef key) = 0;
|
||||
virtual Future<Void> findFirstGreaterOrEqual(KeyRef key, int prefetchNextBytes) = 0;
|
||||
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchPriorBytes) = 0;
|
||||
virtual Future<Void> next(bool needValue) = 0;
|
||||
virtual Future<Void> prev(bool needValue) = 0;
|
||||
|
||||
virtual bool isValid() = 0;
|
||||
virtual KeyRef getKey() = 0;
|
||||
//virtual StringRef getCompressedKey() = 0;
|
||||
virtual ValueRef getValue() = 0;
|
||||
|
||||
virtual void invalidateReturnedStrings() = 0;
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
};
|
||||
|
||||
class IVersionedStore : public IClosable {
|
||||
public:
|
||||
virtual KeyValueStoreType getType() = 0;
|
||||
virtual bool supportsMutation(int op) = 0; // If this returns true, then mutate(op, ...) may be called
|
||||
virtual StorageBytes getStorageBytes() = 0;
|
||||
|
||||
// Writes are provided in an ordered stream.
|
||||
// A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion()
|
||||
// A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns
|
||||
virtual void set(KeyValueRef keyValue) = 0;
|
||||
virtual void clear(KeyRangeRef range) = 0;
|
||||
virtual void mutate(int op, StringRef param1, StringRef param2) = 0;
|
||||
virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing
|
||||
virtual void forgetVersions(Version begin, Version end) = 0; // Versions [begin, end) no longer readable
|
||||
virtual Future<Void> commit() = 0;
|
||||
|
||||
virtual Future<Version> getLatestVersion() = 0;
|
||||
|
||||
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed
|
||||
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations.
|
||||
// The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less
|
||||
// than or equal to the given version.
|
||||
// If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same
|
||||
// write version, OR it may represent a snapshot as of the call to readAtVersion().
|
||||
virtual Reference<IStoreCursor> readAtVersion(Version) = 0;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -0,0 +1,837 @@
|
|||
/*
|
||||
* IndirectShadowPager.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "IndirectShadowPager.h"
|
||||
#include "Knobs.h"
|
||||
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
IndirectShadowPage::IndirectShadowPage() : allocated(true) {
|
||||
data = (uint8_t*)FastAllocator<4096>::allocate();
|
||||
}
|
||||
|
||||
IndirectShadowPage::IndirectShadowPage(uint8_t *data) : data(data), allocated(false) {}
|
||||
|
||||
IndirectShadowPage::~IndirectShadowPage() {
|
||||
if(allocated) {
|
||||
FastAllocator<4096>::release(data);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t const* IndirectShadowPage::begin() const {
|
||||
return data;
|
||||
}
|
||||
|
||||
uint8_t* IndirectShadowPage::mutate() {
|
||||
return data;
|
||||
}
|
||||
|
||||
int IndirectShadowPage::size() const {
|
||||
return PAGE_BYTES - PAGE_OVERHEAD_BYTES;
|
||||
}
|
||||
|
||||
const int IndirectShadowPage::PAGE_BYTES = 4096;
|
||||
const int IndirectShadowPage::PAGE_OVERHEAD_BYTES = 0;
|
||||
|
||||
Future<Reference<const IPage>> IndirectShadowPagerSnapshot::getPhysicalPage(LogicalPageID pageID) {
|
||||
return pager->getPage(Reference<IndirectShadowPagerSnapshot>::addRef(this), pageID, version);
|
||||
}
|
||||
|
||||
void IndirectShadowPagerSnapshot::invalidateReturnedPages() {
|
||||
arena = Arena();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T bigEndian(T val) {
|
||||
static_assert(sizeof(T) <= 8, "Can't compute bigEndian on integers larger than 8 bytes");
|
||||
uint64_t b = bigEndian64(val);
|
||||
return *(T*)((uint8_t*)&b+8-sizeof(T));
|
||||
}
|
||||
|
||||
ACTOR Future<Void> recover(IndirectShadowPager *pager) {
|
||||
try {
|
||||
TraceEvent("PagerRecovering").detail("Basename", pager->basename);
|
||||
pager->pageTableLog = openKVStore(KeyValueStoreType::MEMORY, pager->basename, UID(), 1e9);
|
||||
|
||||
// TODO: this can be done synchronously with the log recovery
|
||||
int64_t flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK;
|
||||
if(!fileExists(pager->basename + ".redwood")) {
|
||||
flags |= IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE;
|
||||
}
|
||||
|
||||
Reference<IAsyncFile> dataFile = wait(IAsyncFileSystem::filesystem()->open(pager->basename + ".redwood", flags, 0600));
|
||||
pager->dataFile = dataFile;
|
||||
|
||||
TraceEvent("PagerOpenedDataFile").detail("Filename", pager->basename + ".redwood");
|
||||
|
||||
Void _ = wait(pager->dataFile->sync());
|
||||
|
||||
state int64_t fileSize = wait(pager->dataFile->size());
|
||||
TraceEvent("PagerGotFileSize").detail("Size", fileSize);
|
||||
|
||||
if(fileSize > 0) {
|
||||
TraceEvent("PagerRecoveringFromLogs");
|
||||
Optional<Value> pagesAllocatedValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::PAGES_ALLOCATED_KEY));
|
||||
if(pagesAllocatedValue.present()) {
|
||||
BinaryReader pr(pagesAllocatedValue.get(), Unversioned());
|
||||
uint32_t pagesAllocated;
|
||||
pr >> pagesAllocated;
|
||||
pager->pagerFile.init(fileSize, pagesAllocated);
|
||||
|
||||
fprintf(stderr, "Recovered pages allocated: %d\n", pager->pagerFile.pagesAllocated);
|
||||
ASSERT(pager->pagerFile.pagesAllocated != PagerFile::INVALID_PAGE);
|
||||
|
||||
Optional<Value> latestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::LATEST_VERSION_KEY));
|
||||
ASSERT(latestVersionValue.present());
|
||||
|
||||
BinaryReader vr(latestVersionValue.get(), Unversioned());
|
||||
vr >> pager->latestVersion;
|
||||
|
||||
Optional<Value> oldestVersionValue = wait(pager->pageTableLog->readValue(IndirectShadowPager::OLDEST_VERSION_KEY));
|
||||
|
||||
if(oldestVersionValue.present()) {
|
||||
BinaryReader vr(oldestVersionValue.get(), Unversioned());
|
||||
vr >> pager->oldestVersion;
|
||||
}
|
||||
|
||||
fprintf(stderr, "Recovered version info: %d - %d\n", pager->oldestVersion, pager->latestVersion);
|
||||
pager->committedVersion = pager->latestVersion;
|
||||
|
||||
Standalone<VectorRef<KeyValueRef>> tableEntries = wait(pager->pageTableLog->readRange(KeyRangeRef(IndirectShadowPager::TABLE_ENTRY_PREFIX, strinc(IndirectShadowPager::TABLE_ENTRY_PREFIX))));
|
||||
|
||||
if(tableEntries.size() > 0) {
|
||||
BinaryReader kr(tableEntries.back().key, Unversioned());
|
||||
|
||||
uint8_t prefix;
|
||||
LogicalPageID logicalPageID;
|
||||
|
||||
kr >> prefix;
|
||||
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
|
||||
|
||||
kr >> logicalPageID;
|
||||
logicalPageID = bigEndian(logicalPageID);
|
||||
|
||||
LogicalPageID pageTableSize = std::max<LogicalPageID>(logicalPageID+1, SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
pager->pageTable.resize(pageTableSize);
|
||||
fprintf(stderr, "Recovered page table size: %d\n", pageTableSize);
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "Recovered no page table entries\n");
|
||||
}
|
||||
|
||||
LogicalPageID nextPageID = SERVER_KNOBS->PAGER_RESERVED_PAGES;
|
||||
std::set<PhysicalPageID> allocatedPhysicalPages;
|
||||
for(auto entry : tableEntries) {
|
||||
BinaryReader kr(entry.key, Unversioned());
|
||||
BinaryReader vr(entry.value, Unversioned());
|
||||
|
||||
uint8_t prefix;
|
||||
LogicalPageID logicalPageID;
|
||||
Version version;
|
||||
PhysicalPageID physicalPageID;
|
||||
|
||||
kr >> prefix;
|
||||
ASSERT(prefix == IndirectShadowPager::TABLE_ENTRY_PREFIX.begin()[0]);
|
||||
|
||||
kr >> logicalPageID;
|
||||
logicalPageID = bigEndian(logicalPageID);
|
||||
|
||||
kr >> version;
|
||||
version = bigEndian(version);
|
||||
|
||||
vr >> physicalPageID;
|
||||
|
||||
pager->pageTable[logicalPageID].push_back(std::make_pair(version, physicalPageID));
|
||||
|
||||
if(physicalPageID != PagerFile::INVALID_PAGE) {
|
||||
allocatedPhysicalPages.insert(physicalPageID);
|
||||
pager->pagerFile.markPageAllocated(logicalPageID, version, physicalPageID);
|
||||
}
|
||||
|
||||
while(nextPageID < logicalPageID) {
|
||||
pager->logicalFreeList.push_back(nextPageID++);
|
||||
}
|
||||
if(logicalPageID == nextPageID) {
|
||||
++nextPageID;
|
||||
}
|
||||
|
||||
fprintf(stderr, "Recovered page table entry %d -> (%d, %d)\n", logicalPageID, version, physicalPageID);
|
||||
}
|
||||
|
||||
fprintf(stderr, "Building physical free list\n");
|
||||
// TODO: can we do this better? does it require storing extra info in the log?
|
||||
PhysicalPageID nextPhysicalPageID = 0;
|
||||
for(auto itr = allocatedPhysicalPages.begin(); itr != allocatedPhysicalPages.end(); ++itr) {
|
||||
while(nextPhysicalPageID < *itr) {
|
||||
pager->pagerFile.freePage(nextPhysicalPageID++);
|
||||
}
|
||||
++nextPhysicalPageID;
|
||||
}
|
||||
|
||||
while(nextPhysicalPageID < pager->pagerFile.pagesAllocated) {
|
||||
pager->pagerFile.freePage(nextPhysicalPageID++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(pager->pageTable.size() < SERVER_KNOBS->PAGER_RESERVED_PAGES) {
|
||||
pager->pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
}
|
||||
|
||||
pager->pagerFile.finishedMarkingPages();
|
||||
pager->pagerFile.startVacuuming();
|
||||
|
||||
fprintf(stderr, "Finished recovery\n", pager->oldestVersion);
|
||||
TraceEvent("PagerFinishedRecovery");
|
||||
}
|
||||
catch(Error &e) {
|
||||
TraceEvent(SevError, "PagerRecoveryFailed").error(e, true);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> housekeeper(IndirectShadowPager *pager) {
|
||||
Void _ = wait(pager->recovery);
|
||||
|
||||
loop {
|
||||
state LogicalPageID pageID = 0;
|
||||
for(; pageID < pager->pageTable.size(); ++pageID) {
|
||||
// TODO: pick an appropriate rate for this loop and determine the right way to implement it
|
||||
// Right now, this delays 10ms every 400K pages, which means we have 1s of delay for every
|
||||
// 40M pages. In total, we introduce 100s delay for a max size 4B page file.
|
||||
if(pageID % 400000 == 0) {
|
||||
Void _ = wait(delay(0.01));
|
||||
}
|
||||
else {
|
||||
Void _ = wait(yield());
|
||||
}
|
||||
|
||||
auto& pageVersionMap = pager->pageTable[pageID];
|
||||
|
||||
if(pageVersionMap.size() > 0) {
|
||||
auto itr = pageVersionMap.begin();
|
||||
for(auto prev = itr; prev != pageVersionMap.end() && prev->first < pager->oldestVersion; prev=itr) {
|
||||
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
|
||||
++itr;
|
||||
if(prev->second != PagerFile::INVALID_PAGE && (itr == pageVersionMap.end() || itr->first <= pager->oldestVersion)) {
|
||||
pager->freePhysicalPageID(prev->second);
|
||||
}
|
||||
if(itr == pageVersionMap.end() || itr->first >= pager->oldestVersion) {
|
||||
//fprintf(stderr, "Updating oldest version for logical page %d: %d\n", pageID, pager->oldestVersion);
|
||||
pager->logPageTableClear(pageID, 0, pager->oldestVersion);
|
||||
|
||||
if(itr != pageVersionMap.end() && itr->first > pager->oldestVersion) {
|
||||
//fprintf(stderr, "Erasing pages to prev from pageVersionMap for %d (itr=%d, prev=%d)\n", pageID, itr->first, prev->first);
|
||||
prev->first = pager->oldestVersion;
|
||||
pager->logPageTableUpdate(pageID, pager->oldestVersion, prev->second);
|
||||
itr = pageVersionMap.erase(pageVersionMap.begin(), prev);
|
||||
}
|
||||
else {
|
||||
//fprintf(stderr, "Erasing pages to itr from pageVersionMap for %d (%d) (itr=%d, prev=%d)\n", pageID, itr == pageVersionMap.end(), itr==pageVersionMap.end() ? -1 : itr->first, prev->first);
|
||||
itr = pageVersionMap.erase(pageVersionMap.begin(), itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(; itr != pageVersionMap.end(); ++itr) {
|
||||
pager->pagerFile.markPageAllocated(pageID, itr->first, itr->second);
|
||||
}
|
||||
|
||||
if(pageVersionMap.size() == 0) {
|
||||
pager->freeLogicalPageID(pageID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pager->pagerFile.finishedMarkingPages();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> forwardError(Future<Void> f, Promise<Void> target) {
|
||||
try {
|
||||
Void _ = wait(f);
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(target.canBeSet()) {
|
||||
target.sendError(e);
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
IndirectShadowPager::IndirectShadowPager(std::string basename)
|
||||
: basename(basename), latestVersion(0), committedVersion(0), committing(Void()), oldestVersion(0), pagerFile(this)
|
||||
{
|
||||
recovery = forwardError(recover(this), errorPromise);
|
||||
housekeeping = forwardError(housekeeper(this), errorPromise);
|
||||
}
|
||||
|
||||
Reference<IPage> IndirectShadowPager::newPageBuffer() {
|
||||
return Reference<IPage>(new IndirectShadowPage());
|
||||
}
|
||||
|
||||
int IndirectShadowPager::getUsablePageSize() {
|
||||
return IndirectShadowPage::PAGE_BYTES - IndirectShadowPage::PAGE_OVERHEAD_BYTES;
|
||||
}
|
||||
|
||||
Reference<IPagerSnapshot> IndirectShadowPager::getReadSnapshot(Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(version <= latestVersion);
|
||||
ASSERT(version >= oldestVersion);
|
||||
|
||||
return Reference<IPagerSnapshot>(new IndirectShadowPagerSnapshot(this, version));
|
||||
}
|
||||
|
||||
LogicalPageID IndirectShadowPager::allocateLogicalPage() {
|
||||
ASSERT(recovery.isReady());
|
||||
|
||||
LogicalPageID allocatedPage;
|
||||
if(logicalFreeList.size() > 0) {
|
||||
allocatedPage = logicalFreeList.front();
|
||||
logicalFreeList.pop_front();
|
||||
}
|
||||
else {
|
||||
ASSERT(pageTable.size() < std::numeric_limits<LogicalPageID>::max()); // TODO: different error?
|
||||
allocatedPage = pageTable.size();
|
||||
pageTable.push_back(PageVersionMap());
|
||||
}
|
||||
|
||||
ASSERT(allocatedPage >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
return allocatedPage;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::freeLogicalPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(committing.isReady());
|
||||
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
ASSERT(!pageVersionMap.empty());
|
||||
|
||||
auto itr = pageVersionMapLowerBound(pageVersionMap, version);
|
||||
for(auto i = itr; i != pageVersionMap.end(); ++i) {
|
||||
freePhysicalPageID(i->second);
|
||||
}
|
||||
|
||||
if(itr != pageVersionMap.end()) {
|
||||
//fprintf(stderr, "Clearing newest versions for logical page %d: %d\n", pageID, version);
|
||||
logPageTableClearToEnd(pageID, version);
|
||||
pageVersionMap.erase(itr, pageVersionMap.end());
|
||||
}
|
||||
|
||||
if(pageVersionMap.size() == 0) {
|
||||
fprintf(stderr, "Freeing logical page %d (freeLogicalPage)\n", pageID);
|
||||
logicalFreeList.push_back(pageID);
|
||||
}
|
||||
else if(pageVersionMap.back().second != PagerFile::INVALID_PAGE) {
|
||||
pageVersionMap.push_back(std::make_pair(version, PagerFile::INVALID_PAGE));
|
||||
logPageTableUpdate(pageID, version, PagerFile::INVALID_PAGE);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitAndFreePhysicalPageID(IndirectShadowPager *pager, PhysicalPageID pageID, Future<Void> canFree) {
|
||||
Void _ = wait(canFree);
|
||||
pager->pagerFile.freePage(pageID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// TODO: we need to be more careful about making sure the action that caused the page to be freed is durable before freeing the page
|
||||
// Otherwise, the page could be rewritten prematurely.
|
||||
void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) {
|
||||
fprintf(stderr, "Freeing physical page: %u\n", pageID);
|
||||
auto itr = readCounts.find(pageID);
|
||||
if(itr != readCounts.end()) {
|
||||
operations.add(waitAndFreePhysicalPageID(this, pageID, itr->second.second.getFuture()));
|
||||
}
|
||||
else {
|
||||
pagerFile.freePage(pageID);
|
||||
}
|
||||
}
|
||||
|
||||
void IndirectShadowPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(committing.isReady());
|
||||
|
||||
ASSERT(updateVersion > latestVersion || updateVersion == 0);
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != PagerFile::INVALID_PAGE);
|
||||
|
||||
// TODO: should this be conditional on the write succeeding?
|
||||
bool updateExisting = updateVersion == 0;
|
||||
if(updateExisting) {
|
||||
ASSERT(pageVersionMap.size());
|
||||
updateVersion = pageVersionMap.back().first;
|
||||
}
|
||||
|
||||
PhysicalPageID physicalPageID = pagerFile.allocatePage(pageID, updateVersion);
|
||||
|
||||
if(updateExisting) {
|
||||
freePhysicalPageID(pageVersionMap.back().second);
|
||||
pageVersionMap.back().second = physicalPageID;
|
||||
}
|
||||
else {
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
|
||||
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
|
||||
}
|
||||
|
||||
logPageTableUpdate(pageID, updateVersion, physicalPageID);
|
||||
Future<Void> write = dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES);
|
||||
|
||||
if(write.isError()) {
|
||||
if(errorPromise.canBeSet()) {
|
||||
errorPromise.sendError(write.getError());
|
||||
}
|
||||
throw write.getError();
|
||||
}
|
||||
writeActors.add(forwardError(write, errorPromise));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::forgetVersions(Version begin, Version end) {
|
||||
ASSERT(recovery.isReady());
|
||||
ASSERT(begin <= end);
|
||||
ASSERT(end <= latestVersion);
|
||||
|
||||
// TODO: support forgetting arbitrary ranges
|
||||
if(begin <= oldestVersion) {
|
||||
oldestVersion = std::max(end, oldestVersion);
|
||||
logVersion(OLDEST_VERSION_KEY, oldestVersion);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitImpl(IndirectShadowPager *pager, Future<Void> previousCommit) {
|
||||
state Future<Void> outstandingWrites = pager->writeActors.signalAndCollapse();
|
||||
state Version commitVersion = pager->latestVersion;
|
||||
|
||||
Void _ = wait(previousCommit);
|
||||
|
||||
pager->logVersion(IndirectShadowPager::LATEST_VERSION_KEY, commitVersion);
|
||||
|
||||
// TODO: we need to prevent writes that happen now from being committed in the subsequent log commit
|
||||
// This is probably best done once we have better control of the log, where we can write a commit entry
|
||||
// here without syncing the file.
|
||||
|
||||
Void _ = wait(outstandingWrites);
|
||||
|
||||
Void _ = wait(pager->dataFile->sync());
|
||||
Void _ = wait(pager->pageTableLog->commit());
|
||||
|
||||
pager->committedVersion = std::max(pager->committedVersion, commitVersion);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::commit() {
|
||||
ASSERT(recovery.isReady());
|
||||
Future<Void> f = commitImpl(this, committing);
|
||||
committing = f;
|
||||
return committing;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::setLatestVersion(Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
latestVersion = version;
|
||||
}
|
||||
|
||||
ACTOR Future<Version> getLatestVersionImpl(IndirectShadowPager *pager) {
|
||||
Void _ = wait(pager->recovery);
|
||||
return pager->latestVersion;
|
||||
}
|
||||
|
||||
Future<Version> IndirectShadowPager::getLatestVersion() {
|
||||
return getLatestVersionImpl(this);
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::getError() {
|
||||
return errorPromise.getFuture();
|
||||
}
|
||||
|
||||
Future<Void> IndirectShadowPager::onClosed() {
|
||||
return closed.getFuture();
|
||||
}
|
||||
|
||||
ACTOR void shutdown(IndirectShadowPager *pager, bool dispose) {
|
||||
pager->recovery = Never(); // TODO: this is a hacky way to prevent users from performing operations after calling shutdown. Implement a better mechanism
|
||||
|
||||
Void _ = wait(pager->writeActors.signal());
|
||||
Void _ = wait(pager->operations.signal());
|
||||
Void _ = wait(pager->committing);
|
||||
|
||||
pager->housekeeping.cancel();
|
||||
pager->pagerFile.shutdown();
|
||||
|
||||
state Future<Void> pageTableClosed = pager->pageTableLog->onClosed();
|
||||
if(dispose) {
|
||||
Void _ = wait(IAsyncFileSystem::filesystem()->deleteFile(pager->basename + ".redwood", true));
|
||||
pager->pageTableLog->dispose();
|
||||
}
|
||||
else {
|
||||
pager->pageTableLog->close();
|
||||
}
|
||||
|
||||
Void _ = wait(pageTableClosed);
|
||||
|
||||
pager->closed.send(Void());
|
||||
delete pager;
|
||||
}
|
||||
|
||||
void IndirectShadowPager::dispose() {
|
||||
shutdown(this, true);
|
||||
}
|
||||
|
||||
void IndirectShadowPager::close() {
|
||||
shutdown(this, false);
|
||||
}
|
||||
|
||||
ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
|
||||
ASSERT(pageID < pager->pageTable.size());
|
||||
PageVersionMap &pageVersionMap = pager->pageTable[pageID];
|
||||
|
||||
auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version);
|
||||
if(itr == pageVersionMap.begin()) {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
--itr;
|
||||
|
||||
state PhysicalPageID physicalPageID = itr->second;
|
||||
fprintf(stderr, "Reading %d at v%d from %d\n", pageID, version, physicalPageID);
|
||||
|
||||
ASSERT(physicalPageID != PagerFile::INVALID_PAGE);
|
||||
++pager->readCounts[physicalPageID].first;
|
||||
|
||||
// We are relying on the use of AsyncFileCached for performance here. We expect that all write actors will complete immediately (with a possible yield()),
|
||||
// so this wait should either be nonexistent or just a yield.
|
||||
Void _ = wait(pager->writeActors.signalAndCollapse());
|
||||
|
||||
state uint8_t *buf = new (snapshot->arena) uint8_t[IndirectShadowPage::PAGE_BYTES];
|
||||
int read = wait(pager->dataFile->read(buf, IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES));
|
||||
ASSERT(read == IndirectShadowPage::PAGE_BYTES);
|
||||
|
||||
auto readCountItr = pager->readCounts.find(physicalPageID);
|
||||
ASSERT(readCountItr != pager->readCounts.end());
|
||||
if(readCountItr->second.first == 1) {
|
||||
readCountItr->second.second.send(Void());
|
||||
pager->readCounts.erase(readCountItr);
|
||||
}
|
||||
else {
|
||||
--readCountItr->second.first;
|
||||
}
|
||||
|
||||
return Reference<const IPage>(new IndirectShadowPage(buf));
|
||||
}
|
||||
|
||||
Future<Reference<const IPage>> IndirectShadowPager::getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
|
||||
ASSERT(recovery.isReady());
|
||||
|
||||
Future<Reference<const IPage>> f = getPageImpl(this, snapshot, pageID, version);
|
||||
operations.add(success(f));
|
||||
return f;
|
||||
}
|
||||
|
||||
PageVersionMap::iterator IndirectShadowPager::pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version version) {
|
||||
return std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
|
||||
return p.first < v;
|
||||
});
|
||||
}
|
||||
|
||||
PageVersionMap::iterator IndirectShadowPager::pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version version) {
|
||||
return std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
|
||||
return v < p.first;
|
||||
});
|
||||
}
|
||||
|
||||
void IndirectShadowPager::freeLogicalPageID(LogicalPageID pageID) {
|
||||
if(pageID >= SERVER_KNOBS->PAGER_RESERVED_PAGES) {
|
||||
fprintf(stderr, "Freeing logical page: %u\n", pageID);
|
||||
logicalFreeList.push_back(pageID);
|
||||
}
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logVersion(StringRef versionKey, Version version) {
|
||||
BinaryWriter v(Unversioned());
|
||||
v << version;
|
||||
|
||||
pageTableLog->set(KeyValueRef(versionKey, v.toStringRef()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPagesAllocated() {
|
||||
BinaryWriter v(Unversioned());
|
||||
v << pagerFile.getPagesAllocated();
|
||||
|
||||
pageTableLog->set(KeyValueRef(PAGES_ALLOCATED_KEY, v.toStringRef()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
|
||||
BinaryWriter k(Unversioned());
|
||||
k << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(version);
|
||||
|
||||
BinaryWriter v(Unversioned());
|
||||
v << physicalPageID;
|
||||
|
||||
pageTableLog->set(KeyValueRef(k.toStringRef(), v.toStringRef()));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableClearToEnd(LogicalPageID logicalPageID, Version start) {
|
||||
BinaryWriter b(Unversioned());
|
||||
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
|
||||
|
||||
BinaryWriter e(Unversioned());
|
||||
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID);
|
||||
|
||||
pageTableLog->clear(KeyRangeRef(b.toStringRef(), strinc(e.toStringRef())));
|
||||
}
|
||||
|
||||
void IndirectShadowPager::logPageTableClear(LogicalPageID logicalPageID, Version start, Version end) {
|
||||
BinaryWriter b(Unversioned());
|
||||
b << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(start);
|
||||
|
||||
BinaryWriter e(Unversioned());
|
||||
e << TABLE_ENTRY_PREFIX.begin()[0] << bigEndian(logicalPageID) << bigEndian(end);
|
||||
|
||||
pageTableLog->clear(KeyRangeRef(b.toStringRef(), e.toStringRef()));
|
||||
}
|
||||
|
||||
const StringRef IndirectShadowPager::LATEST_VERSION_KEY = LiteralStringRef("\xff/LatestVersion");
|
||||
const StringRef IndirectShadowPager::OLDEST_VERSION_KEY = LiteralStringRef("\xff/OldestVersion");
|
||||
const StringRef IndirectShadowPager::PAGES_ALLOCATED_KEY = LiteralStringRef("\xff/PagesAllocated");
|
||||
const StringRef IndirectShadowPager::TABLE_ENTRY_PREFIX = LiteralStringRef("\x00");
|
||||
|
||||
ACTOR Future<Void> copyPage(IndirectShadowPager *pager, Reference<IPage> page, PhysicalPageID from, PhysicalPageID to) {
|
||||
state bool zeroCopied = true;
|
||||
state int bytes = IndirectShadowPage::PAGE_BYTES;
|
||||
state void *data = nullptr;
|
||||
|
||||
try {
|
||||
try {
|
||||
Void _ = wait(pager->dataFile->readZeroCopy(&data, &bytes, from * IndirectShadowPage::PAGE_BYTES));
|
||||
}
|
||||
catch(Error &e) {
|
||||
zeroCopied = false;
|
||||
data = page->mutate();
|
||||
int _bytes = wait(pager->dataFile->read(data, page->size(), from * IndirectShadowPage::PAGE_BYTES));
|
||||
bytes = _bytes;
|
||||
}
|
||||
|
||||
ASSERT(bytes == IndirectShadowPage::PAGE_BYTES);
|
||||
Void _ = wait(pager->dataFile->write(data, bytes, to * IndirectShadowPage::PAGE_BYTES));
|
||||
if(zeroCopied) {
|
||||
pager->dataFile->releaseZeroCopy(data, bytes, from * IndirectShadowPage::PAGE_BYTES);
|
||||
}
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(zeroCopied) {
|
||||
pager->dataFile->releaseZeroCopy(data, bytes, from * IndirectShadowPage::PAGE_BYTES);
|
||||
}
|
||||
pager->pagerFile.freePage(to);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> vacuumer(IndirectShadowPager *pager, PagerFile *pagerFile) {
|
||||
state Reference<IPage> page(new IndirectShadowPage());
|
||||
|
||||
loop {
|
||||
state double start = now();
|
||||
while(!pagerFile->canVacuum()) {
|
||||
Void _ = wait(delay(1.0));
|
||||
}
|
||||
|
||||
ASSERT(!pagerFile->freePages.empty());
|
||||
|
||||
if(!pagerFile->vacuumQueue.empty()) {
|
||||
state PhysicalPageID lastUsedPage = pagerFile->vacuumQueue.rbegin()->first;
|
||||
PhysicalPageID lastFreePage = *pagerFile->freePages.rbegin();
|
||||
//fprintf(stderr, "Vacuuming: evaluating (free list size=%u, lastFreePage=%u, lastUsedPage=%u, pagesAllocated=%u)\n", pagerFile->freePages.size(), lastFreePage, lastUsedPage, pagerFile->pagesAllocated);
|
||||
ASSERT(lastFreePage < pagerFile->pagesAllocated);
|
||||
ASSERT(lastUsedPage < pagerFile->pagesAllocated);
|
||||
ASSERT(lastFreePage != lastUsedPage);
|
||||
|
||||
if(lastFreePage < lastUsedPage) {
|
||||
state std::pair<LogicalPageID, Version> logicalPageInfo = pagerFile->vacuumQueue[lastUsedPage];
|
||||
state PhysicalPageID newPage = pagerFile->allocatePage(logicalPageInfo.first, logicalPageInfo.second);
|
||||
|
||||
fprintf(stderr, "Vacuuming: copying page %u to %u\n", lastUsedPage, newPage);
|
||||
Void _ = wait(copyPage(pager, page, lastUsedPage, newPage));
|
||||
|
||||
auto &pageVersionMap = pager->pageTable[logicalPageInfo.first];
|
||||
auto itr = IndirectShadowPager::pageVersionMapLowerBound(pageVersionMap, logicalPageInfo.second);
|
||||
if(itr != pageVersionMap.end() && itr->second == lastUsedPage) {
|
||||
itr->second = newPage;
|
||||
pager->logPageTableUpdate(logicalPageInfo.first, itr->first, newPage);
|
||||
pagerFile->freePage(lastUsedPage);
|
||||
}
|
||||
else {
|
||||
TEST(true); // page was freed while vacuuming
|
||||
pagerFile->freePage(newPage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PhysicalPageID firstFreePage = pagerFile->vacuumQueue.empty() ? pagerFile->minVacuumQueuePage : (pagerFile->vacuumQueue.rbegin()->first + 1);
|
||||
ASSERT(pagerFile->pagesAllocated >= firstFreePage);
|
||||
|
||||
uint64_t pagesToErase = 0;
|
||||
if(pagerFile->freePages.size() >= SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD) {
|
||||
pagesToErase = std::min<uint64_t>(pagerFile->freePages.size() - SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD + 1, pagerFile->pagesAllocated - firstFreePage);
|
||||
}
|
||||
|
||||
//fprintf(stderr, "Vacuuming: got %u pages to erase (freePages=%u, pagesAllocated=%u, vacuumQueueEmpty=%u, minVacuumQueuePage=%u, firstFreePage=%u)\n", pagesToErase, pagerFile->freePages.size(), pagerFile->pagesAllocated, pagerFile->vacuumQueue.empty(), pagerFile->minVacuumQueuePage, firstFreePage);
|
||||
|
||||
if(pagesToErase > 0) {
|
||||
PhysicalPageID eraseStartPage = pagerFile->pagesAllocated - pagesToErase;
|
||||
fprintf(stderr, "Vacuuming: truncating last %u pages starting at %u\n", pagesToErase, eraseStartPage);
|
||||
|
||||
ASSERT(pagesToErase <= pagerFile->pagesAllocated);
|
||||
|
||||
pagerFile->pagesAllocated = eraseStartPage;
|
||||
pager->logPagesAllocated();
|
||||
|
||||
auto freePageItr = pagerFile->freePages.find(eraseStartPage);
|
||||
ASSERT(freePageItr != pagerFile->freePages.end());
|
||||
|
||||
pagerFile->freePages.erase(freePageItr, pagerFile->freePages.end());
|
||||
ASSERT(pagerFile->vacuumQueue.empty() || pagerFile->vacuumQueue.rbegin()->first < eraseStartPage);
|
||||
|
||||
Void _ = wait(pager->dataFile->truncate(pagerFile->pagesAllocated * IndirectShadowPage::PAGE_BYTES));
|
||||
}
|
||||
|
||||
Void _ = wait(delayUntil(start + (double)IndirectShadowPage::PAGE_BYTES / SERVER_KNOBS->VACUUM_BYTES_PER_SECOND)); // TODO: figure out the correct mechanism here
|
||||
}
|
||||
}
|
||||
|
||||
PagerFile::PagerFile(IndirectShadowPager *pager) : fileSize(0), pagesAllocated(0), pager(pager), vacuumQueueReady(false), minVacuumQueuePage(0) {}
|
||||
|
||||
PhysicalPageID PagerFile::allocatePage(LogicalPageID logicalPageID, Version version) {
|
||||
ASSERT(pagesAllocated * IndirectShadowPage::PAGE_BYTES <= fileSize);
|
||||
ASSERT(fileSize % IndirectShadowPage::PAGE_BYTES == 0);
|
||||
|
||||
PhysicalPageID allocatedPage;
|
||||
if(!freePages.empty()) {
|
||||
allocatedPage = *freePages.begin();
|
||||
freePages.erase(freePages.begin());
|
||||
}
|
||||
else {
|
||||
if(pagesAllocated * IndirectShadowPage::PAGE_BYTES == fileSize) {
|
||||
fileSize += (1 << 24);
|
||||
// TODO: extend the file before writing beyond the end.
|
||||
}
|
||||
|
||||
ASSERT(pagesAllocated < INVALID_PAGE); // TODO: we should throw a better error here
|
||||
allocatedPage = pagesAllocated++;
|
||||
pager->logPagesAllocated();
|
||||
}
|
||||
|
||||
markPageAllocated(logicalPageID, version, allocatedPage);
|
||||
|
||||
fprintf(stderr, "Allocated physical page %u\n", allocatedPage);
|
||||
return allocatedPage;
|
||||
}
|
||||
|
||||
void PagerFile::freePage(PhysicalPageID pageID) {
|
||||
freePages.insert(pageID);
|
||||
|
||||
if(pageID >= minVacuumQueuePage) {
|
||||
vacuumQueue.erase(pageID);
|
||||
}
|
||||
}
|
||||
|
||||
void PagerFile::markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID) {
|
||||
if(physicalPageID != INVALID_PAGE && physicalPageID >= minVacuumQueuePage) {
|
||||
vacuumQueue[physicalPageID] = std::make_pair(logicalPageID, version);
|
||||
}
|
||||
}
|
||||
|
||||
void PagerFile::finishedMarkingPages() {
|
||||
if(minVacuumQueuePage >= pagesAllocated) {
|
||||
minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
|
||||
vacuumQueueReady = false;
|
||||
}
|
||||
else {
|
||||
if(!vacuumQueueReady) {
|
||||
vacuumQueueReady = true;
|
||||
}
|
||||
if(pagesAllocated > SERVER_KNOBS->VACUUM_QUEUE_SIZE && minVacuumQueuePage < pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE) {
|
||||
minVacuumQueuePage = pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE;
|
||||
auto itr = vacuumQueue.lower_bound(minVacuumQueuePage);
|
||||
vacuumQueue.erase(vacuumQueue.begin(), itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t PagerFile::size() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
uint32_t PagerFile::getPagesAllocated() {
|
||||
return pagesAllocated;
|
||||
}
|
||||
|
||||
void PagerFile::init(uint64_t fileSize, uint32_t pagesAllocated) {
|
||||
this->fileSize = fileSize;
|
||||
this->pagesAllocated = pagesAllocated;
|
||||
this->minVacuumQueuePage = pagesAllocated >= SERVER_KNOBS->VACUUM_QUEUE_SIZE ? pagesAllocated - SERVER_KNOBS->VACUUM_QUEUE_SIZE : 0;
|
||||
}
|
||||
|
||||
void PagerFile::startVacuuming() {
|
||||
vacuuming = vacuumer(pager, this);
|
||||
}
|
||||
|
||||
void PagerFile::shutdown() {
|
||||
vacuuming.cancel();
|
||||
}
|
||||
|
||||
bool PagerFile::canVacuum() {
|
||||
if(freePages.size() < SERVER_KNOBS->FREE_PAGE_VACUUM_THRESHOLD // Not enough free pages
|
||||
|| minVacuumQueuePage >= pagesAllocated // We finished processing all pages in the vacuum queue
|
||||
|| !vacuumQueueReady) // Populating vacuum queue
|
||||
{
|
||||
//fprintf(stderr, "Vacuuming: waiting for vacuumable pages (free list size=%u, minVacuumQueuePage=%u, pages allocated=%u, vacuumQueueReady=%d)\n", freePages.size(), minVacuumQueuePage, pagesAllocated, vacuumQueueReady);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
const PhysicalPageID PagerFile::INVALID_PAGE = std::numeric_limits<PhysicalPageID>::max();
|
||||
|
||||
extern Future<Void> simplePagerTest(IPager* const& pager);
|
||||
|
||||
TEST_CASE("fdbserver/indirectshadowpager/simple") {
|
||||
state IPager *pager = new IndirectShadowPager("data/test");
|
||||
|
||||
Void _ = wait(simplePagerTest(pager));
|
||||
|
||||
Future<Void> closedFuture = pager->onClosed();
|
||||
pager->close();
|
||||
Void _ = wait(closedFuture);
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -0,0 +1,192 @@
|
|||
/*
|
||||
* IndirectShadowPager.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_INDIRECTSHADOWPAGER_H
|
||||
#define FDBSERVER_INDIRECTSHADOWPAGER_H
|
||||
#pragma once
|
||||
|
||||
#include "IKeyValueStore.h"
|
||||
#include "IPager.h"
|
||||
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Notified.h"
|
||||
|
||||
#include "fdbrpc/IAsyncFile.h"
|
||||
|
||||
typedef uint32_t PhysicalPageID;
|
||||
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
|
||||
typedef std::vector<PageVersionMap> LogicalPageTable;
|
||||
|
||||
class IndirectShadowPager;
|
||||
|
||||
class IndirectShadowPage : public IPage, ReferenceCounted<IndirectShadowPage> {
|
||||
public:
|
||||
IndirectShadowPage();
|
||||
IndirectShadowPage(uint8_t *data);
|
||||
virtual ~IndirectShadowPage();
|
||||
|
||||
virtual void addref() const {
|
||||
ReferenceCounted<IndirectShadowPage>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() const {
|
||||
ReferenceCounted<IndirectShadowPage>::delref();
|
||||
}
|
||||
|
||||
virtual int size() const;
|
||||
virtual uint8_t const* begin() const;
|
||||
virtual uint8_t* mutate();
|
||||
|
||||
//private:
|
||||
static const int PAGE_BYTES;
|
||||
static const int PAGE_OVERHEAD_BYTES;
|
||||
|
||||
private:
|
||||
uint8_t *data;
|
||||
bool allocated;
|
||||
};
|
||||
|
||||
class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted<IndirectShadowPagerSnapshot> {
|
||||
public:
|
||||
IndirectShadowPagerSnapshot(IndirectShadowPager *pager, Version version) : pager(pager), version(version) {}
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID);
|
||||
virtual void invalidateReturnedPages();
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<IndirectShadowPagerSnapshot>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() {
|
||||
ReferenceCounted<IndirectShadowPagerSnapshot>::delref();
|
||||
}
|
||||
|
||||
Arena arena;
|
||||
private:
|
||||
IndirectShadowPager *pager;
|
||||
Version version;
|
||||
};
|
||||
|
||||
class PagerFile {
|
||||
public:
|
||||
PagerFile(IndirectShadowPager *pager);
|
||||
|
||||
PhysicalPageID allocatePage(LogicalPageID logicalPageID, Version version);
|
||||
void freePage(PhysicalPageID physicalPageID);
|
||||
void markPageAllocated(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
|
||||
|
||||
void finishedMarkingPages();
|
||||
|
||||
uint64_t size();
|
||||
uint32_t getPagesAllocated();
|
||||
|
||||
void init(uint64_t fileSize, uint32_t pagesAllocated);
|
||||
void startVacuuming();
|
||||
void shutdown();
|
||||
|
||||
//private:
|
||||
Future<Void> vacuuming;
|
||||
IndirectShadowPager *pager;
|
||||
|
||||
uint32_t pagesAllocated;
|
||||
uint64_t fileSize;
|
||||
|
||||
std::set<PhysicalPageID> freePages;
|
||||
|
||||
PhysicalPageID minVacuumQueuePage;
|
||||
bool vacuumQueueReady;
|
||||
std::map<PhysicalPageID, std::pair<LogicalPageID, Version>> vacuumQueue;
|
||||
|
||||
bool canVacuum();
|
||||
|
||||
static const PhysicalPageID INVALID_PAGE;
|
||||
};
|
||||
|
||||
class IndirectShadowPager : public IPager {
|
||||
public:
|
||||
IndirectShadowPager(std::string basename);
|
||||
|
||||
virtual Reference<IPage> newPageBuffer();
|
||||
virtual int getUsablePageSize();
|
||||
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
|
||||
|
||||
virtual LogicalPageID allocateLogicalPage();
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion);
|
||||
virtual void forgetVersions(Version begin, Version end);
|
||||
virtual Future<Void> commit();
|
||||
|
||||
virtual void setLatestVersion(Version version);
|
||||
virtual Future<Version> getLatestVersion();
|
||||
|
||||
virtual Future<Void> getError();
|
||||
virtual Future<Void> onClosed();
|
||||
virtual void dispose();
|
||||
virtual void close();
|
||||
|
||||
Future<Reference<const IPage>> getPage(Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version);
|
||||
|
||||
//private:
|
||||
std::string basename;
|
||||
|
||||
Version latestVersion;
|
||||
Version committedVersion;
|
||||
|
||||
LogicalPageTable pageTable;
|
||||
IKeyValueStore *pageTableLog;
|
||||
|
||||
Reference<IAsyncFile> dataFile;
|
||||
Future<Void> recovery;
|
||||
|
||||
Future<Void> housekeeping;
|
||||
Future<Void> vacuuming;
|
||||
Version oldestVersion;
|
||||
|
||||
std::map<PhysicalPageID, std::pair<int, Promise<Void>>> readCounts;
|
||||
SignalableActorCollection operations;
|
||||
SignalableActorCollection writeActors;
|
||||
Future<Void> committing;
|
||||
|
||||
Promise<Void> closed;
|
||||
Promise<Void> errorPromise;
|
||||
|
||||
std::deque<LogicalPageID> logicalFreeList;
|
||||
PagerFile pagerFile;
|
||||
|
||||
static PageVersionMap::iterator pageVersionMapLowerBound(PageVersionMap &pageVersionMap, Version v);
|
||||
static PageVersionMap::iterator pageVersionMapUpperBound(PageVersionMap &pageVersionMap, Version v);
|
||||
|
||||
void freeLogicalPageID(LogicalPageID pageID);
|
||||
void freePhysicalPageID(PhysicalPageID pageID);
|
||||
|
||||
void logVersion(StringRef versionKey, Version version);
|
||||
void logPagesAllocated();
|
||||
void logPageTableUpdate(LogicalPageID logicalPageID, Version version, PhysicalPageID physicalPageID);
|
||||
void logPageTableClearToEnd(LogicalPageID logicalPageID, Version start);
|
||||
void logPageTableClear(LogicalPageID logicalPageID, Version start, Version end);
|
||||
|
||||
static const StringRef LATEST_VERSION_KEY;
|
||||
static const StringRef OLDEST_VERSION_KEY;
|
||||
static const StringRef PAGES_ALLOCATED_KEY;
|
||||
static const StringRef TABLE_ENTRY_PREFIX;
|
||||
|
||||
};
|
||||
|
||||
#endif
|
|
@ -361,6 +361,14 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( STATUS_MIN_TIME_BETWEEN_REQUESTS, 0.0 );
|
||||
init( CONFIGURATION_ROWS_TO_FETCH, 20000 );
|
||||
|
||||
// IPager
|
||||
init( PAGER_RESERVED_PAGES, 1 );
|
||||
|
||||
// IndirectShadowPager
|
||||
init( FREE_PAGE_VACUUM_THRESHOLD, 1 );
|
||||
init( VACUUM_QUEUE_SIZE, 100000 );
|
||||
init( VACUUM_BYTES_PER_SECOND, 1e6 );
|
||||
|
||||
if(clientKnobs)
|
||||
clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND);
|
||||
}
|
||||
|
|
|
@ -303,6 +303,14 @@ public:
|
|||
double STATUS_MIN_TIME_BETWEEN_REQUESTS;
|
||||
int CONFIGURATION_ROWS_TO_FETCH;
|
||||
|
||||
// IPager
|
||||
int PAGER_RESERVED_PAGES;
|
||||
|
||||
// IndirectShadowPager
|
||||
int FREE_PAGE_VACUUM_THRESHOLD;
|
||||
int VACUUM_QUEUE_SIZE;
|
||||
int VACUUM_BYTES_PER_SECOND;
|
||||
|
||||
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
|
||||
};
|
||||
|
||||
|
|
|
@ -0,0 +1,343 @@
|
|||
/*
|
||||
* MemoryPager.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "MemoryPager.h"
|
||||
#include "Knobs.h"
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
MemoryPage::MemoryPage() : allocated(true) {
|
||||
data = (uint8_t*)FastAllocator<4096>::allocate();
|
||||
}
|
||||
|
||||
MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {}
|
||||
|
||||
MemoryPage::~MemoryPage() {
|
||||
if(allocated) {
|
||||
FastAllocator<4096>::release(data);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t const* MemoryPage::begin() const {
|
||||
return data;
|
||||
}
|
||||
|
||||
uint8_t* MemoryPage::mutate() {
|
||||
return data;
|
||||
}
|
||||
|
||||
int MemoryPage::size() const {
|
||||
return PAGE_BYTES;
|
||||
}
|
||||
|
||||
const int MemoryPage::PAGE_BYTES = 4096;
|
||||
|
||||
Future<Reference<const IPage>> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID) {
|
||||
return pager->getPage(pageID, version);
|
||||
}
|
||||
|
||||
MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
|
||||
extendData();
|
||||
pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
}
|
||||
|
||||
Reference<IPage> MemoryPager::newPageBuffer() {
|
||||
return Reference<IPage>(new MemoryPage());
|
||||
}
|
||||
|
||||
int MemoryPager::getUsablePageSize() {
|
||||
return MemoryPage::PAGE_BYTES;
|
||||
}
|
||||
|
||||
Reference<IPagerSnapshot> MemoryPager::getReadSnapshot(Version version) {
|
||||
ASSERT(version <= latestVersion);
|
||||
return Reference<IPagerSnapshot>(new MemoryPagerSnapshot(this, version));
|
||||
}
|
||||
|
||||
LogicalPageID MemoryPager::allocateLogicalPage() {
|
||||
ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
|
||||
pageTable.push_back(PageVersionMap());
|
||||
return pageTable.size() - 1;
|
||||
}
|
||||
|
||||
void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
ASSERT(!pageVersionMap.empty());
|
||||
|
||||
auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
|
||||
return p.first < v;
|
||||
});
|
||||
|
||||
pageVersionMap.erase(itr, pageVersionMap.end());
|
||||
if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
|
||||
pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
|
||||
}
|
||||
}
|
||||
|
||||
void MemoryPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion) {
|
||||
ASSERT(updateVersion > latestVersion || updateVersion == 0);
|
||||
ASSERT(pageID < pageTable.size());
|
||||
|
||||
PageVersionMap &pageVersionMap = pageTable[pageID];
|
||||
|
||||
ASSERT(updateVersion >= committedVersion || updateVersion == 0);
|
||||
PhysicalPageID physicalPageID = allocatePage(contents);
|
||||
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
|
||||
|
||||
if(updateVersion == 0) {
|
||||
ASSERT(pageVersionMap.size());
|
||||
updateVersion = pageVersionMap.back().first;
|
||||
pageVersionMap.back().second = physicalPageID;
|
||||
// TODO: what to do with old page?
|
||||
}
|
||||
else {
|
||||
ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
|
||||
pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void MemoryPager::forgetVersions(Version begin, Version end) {
|
||||
ASSERT(begin <= end);
|
||||
ASSERT(end <= latestVersion);
|
||||
// TODO
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::commit() {
|
||||
ASSERT(committedVersion < latestVersion);
|
||||
committedVersion = latestVersion;
|
||||
return Void();
|
||||
}
|
||||
|
||||
void MemoryPager::setLatestVersion(Version version) {
|
||||
ASSERT(version > latestVersion);
|
||||
latestVersion = version;
|
||||
}
|
||||
|
||||
Future<Version> MemoryPager::getLatestVersion() {
|
||||
return latestVersion;
|
||||
}
|
||||
|
||||
Reference<const IPage> MemoryPager::getPage(LogicalPageID pageID, Version version) {
|
||||
ASSERT(pageID < pageTable.size());
|
||||
PageVersionMap const& pageVersionMap = pageTable[pageID];
|
||||
|
||||
auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
|
||||
return v < p.first;
|
||||
});
|
||||
|
||||
if(itr == pageVersionMap.begin()) {
|
||||
return Reference<IPage>(); // TODO: should this be an error?
|
||||
}
|
||||
|
||||
--itr;
|
||||
|
||||
ASSERT(itr->second != INVALID_PAGE);
|
||||
return Reference<const IPage>(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::getError() {
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> MemoryPager::onClosed() {
|
||||
return closed.getFuture();
|
||||
}
|
||||
|
||||
void MemoryPager::dispose() {
|
||||
closed.send(Void());
|
||||
delete this;
|
||||
}
|
||||
|
||||
void MemoryPager::close() {
|
||||
dispose();
|
||||
}
|
||||
|
||||
PhysicalPageID MemoryPager::allocatePage(Reference<IPage> contents) {
|
||||
if(freeList.size()) {
|
||||
PhysicalPageID pageID = freeList.back();
|
||||
freeList.pop_back();
|
||||
|
||||
memcpy(pageID, contents->begin(), contents->size());
|
||||
return pageID;
|
||||
}
|
||||
else {
|
||||
ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
|
||||
PhysicalPageID pageID = data.back().end();
|
||||
|
||||
data.back().append(data.arena(), contents->begin(), contents->size());
|
||||
if(data.back().size() == data.back().capacity()) {
|
||||
extendData();
|
||||
}
|
||||
else {
|
||||
ASSERT(data.back().size() <= data.back().capacity() - 4096);
|
||||
}
|
||||
|
||||
return pageID;
|
||||
}
|
||||
}
|
||||
|
||||
void MemoryPager::extendData() {
|
||||
if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
|
||||
throw io_error();
|
||||
}
|
||||
|
||||
VectorRef<uint8_t> d;
|
||||
d.reserve(data.arena(), 1 << 22);
|
||||
data.push_back(data.arena(), d);
|
||||
}
|
||||
|
||||
// TODO: these tests are not MemoryPager specific, we should make them more general
|
||||
|
||||
void fillPage(Reference<IPage> page, LogicalPageID pageID, Version version) {
|
||||
ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
|
||||
|
||||
memset(page->mutate(), 0, page->size());
|
||||
memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
|
||||
memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
|
||||
}
|
||||
|
||||
bool validatePage(Reference<const IPage> page, LogicalPageID pageID, Version version) {
|
||||
bool valid = true;
|
||||
|
||||
LogicalPageID readPageID = *(LogicalPageID*)page->begin();
|
||||
if(readPageID != pageID) {
|
||||
fprintf(stderr, "Invalid PageID detected: %lld (expected %lld)\n", readPageID, pageID);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID));
|
||||
if(readVersion != version) {
|
||||
fprintf(stderr, "Invalid Version detected on page %lld: %lld (expected %lld)\n", pageID, readVersion, version);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
return valid;
|
||||
}
|
||||
|
||||
void writePage(IPager *pager, Reference<IPage> page, LogicalPageID pageID, Version version, bool updateVersion=true) {
|
||||
fillPage(page, pageID, version);
|
||||
pager->writePage(pageID, page, updateVersion ? version : 0);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commit(IPager *pager) {
|
||||
static int commitNum = 1;
|
||||
state int myCommit = commitNum++;
|
||||
|
||||
fprintf(stderr, "Commit%d\n", myCommit);
|
||||
Void _ = wait(pager->commit());
|
||||
fprintf(stderr, "FinishedCommit%d\n", myCommit);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) {
|
||||
static int readNum = 1;
|
||||
state int myRead = readNum++;
|
||||
state Reference<IPagerSnapshot> readSnapshot = pager->getReadSnapshot(version);
|
||||
fprintf(stderr, "Read%d\n", myRead);
|
||||
Reference<const IPage> readPage = wait(readSnapshot->getPhysicalPage(pageID));
|
||||
fprintf(stderr, "FinishedRead%d\n", myRead);
|
||||
ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> simplePagerTest(IPager *pager) {
|
||||
state Reference<IPage> page = pager->newPageBuffer();
|
||||
|
||||
Version latestVersion = wait(pager->getLatestVersion());
|
||||
fprintf(stderr, "Got latest version: %d\n", latestVersion);
|
||||
|
||||
state Version version = latestVersion+1;
|
||||
state Version v1 = version;
|
||||
|
||||
state LogicalPageID pageID1 = pager->allocateLogicalPage();
|
||||
|
||||
writePage(pager, page, pageID1, v1);
|
||||
pager->setLatestVersion(v1);
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
state LogicalPageID pageID2 = pager->allocateLogicalPage();
|
||||
|
||||
state Version v2 = ++version;
|
||||
|
||||
writePage(pager, page, pageID1, v2);
|
||||
writePage(pager, page, pageID2, v2);
|
||||
pager->setLatestVersion(v2);
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
Void _ = wait(read(pager, pageID1, v2));
|
||||
Void _ = wait(read(pager, pageID1, v1));
|
||||
|
||||
state Version v3 = ++version;
|
||||
writePage(pager, page, pageID1, v3, false);
|
||||
pager->setLatestVersion(v3);
|
||||
|
||||
Void _ = wait(read(pager, pageID1, v2, v3));
|
||||
Void _ = wait(read(pager, pageID1, v3, v3));
|
||||
|
||||
state LogicalPageID pageID3 = pager->allocateLogicalPage();
|
||||
|
||||
state Version v4 = ++version;
|
||||
writePage(pager, page, pageID2, v4);
|
||||
writePage(pager, page, pageID3, v4);
|
||||
pager->setLatestVersion(v4);
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
Void _ = wait(read(pager, pageID2, v4, v4));
|
||||
|
||||
state Version v5 = ++version;
|
||||
writePage(pager, page, pageID2, v5);
|
||||
|
||||
state LogicalPageID pageID4 = pager->allocateLogicalPage();
|
||||
writePage(pager, page, pageID4, v5);
|
||||
|
||||
state Version v6 = ++version;
|
||||
pager->freeLogicalPage(pageID2, v5);
|
||||
pager->freeLogicalPage(pageID3, v3);
|
||||
pager->setLatestVersion(v6);
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
pager->forgetVersions(0, v4);
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
Void _ = wait(delay(3.0));
|
||||
|
||||
Void _ = wait(commit(pager));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("fdbserver/memorypager/simple") {
|
||||
state IPager *pager = new MemoryPager();
|
||||
|
||||
Void _ = wait(simplePagerTest(pager));
|
||||
|
||||
Future<Void> closedFuture = pager->onClosed();
|
||||
pager->dispose();
|
||||
|
||||
Void _ = wait(closedFuture);
|
||||
return Void();
|
||||
}
|
||||
|
||||
const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* MemoryPager.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_MEMORYPAGER_H
|
||||
#define FDBSERVER_MEMORYPAGER_H
|
||||
#pragma once
|
||||
|
||||
#include "IPager.h"
|
||||
|
||||
typedef uint8_t* PhysicalPageID;
|
||||
typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
|
||||
typedef std::vector<PageVersionMap> LogicalPageTable;
|
||||
|
||||
class MemoryPager;
|
||||
|
||||
class MemoryPage : public IPage, ReferenceCounted<MemoryPage> {
|
||||
public:
|
||||
MemoryPage();
|
||||
MemoryPage(uint8_t *data);
|
||||
virtual ~MemoryPage();
|
||||
|
||||
virtual void addref() const {
|
||||
ReferenceCounted<MemoryPage>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() const {
|
||||
ReferenceCounted<MemoryPage>::delref();
|
||||
}
|
||||
|
||||
virtual int size() const;
|
||||
virtual uint8_t const* begin() const;
|
||||
virtual uint8_t* mutate();
|
||||
|
||||
private:
|
||||
friend class MemoryPager;
|
||||
uint8_t *data;
|
||||
bool allocated;
|
||||
|
||||
static const int PAGE_BYTES;
|
||||
};
|
||||
|
||||
class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted<MemoryPagerSnapshot> {
|
||||
public:
|
||||
MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
|
||||
virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID);
|
||||
virtual void invalidateReturnedPages() {}
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<MemoryPagerSnapshot>::addref();
|
||||
}
|
||||
|
||||
virtual void delref() {
|
||||
ReferenceCounted<MemoryPagerSnapshot>::delref();
|
||||
}
|
||||
|
||||
private:
|
||||
MemoryPager *pager;
|
||||
Version version;
|
||||
};
|
||||
|
||||
class MemoryPager : public IPager, ReferenceCounted<MemoryPager> {
|
||||
public:
|
||||
MemoryPager();
|
||||
|
||||
virtual Reference<IPage> newPageBuffer();
|
||||
virtual int getUsablePageSize();
|
||||
|
||||
virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
|
||||
|
||||
virtual LogicalPageID allocateLogicalPage();
|
||||
virtual void freeLogicalPage(LogicalPageID pageID, Version version);
|
||||
virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion);
|
||||
virtual void forgetVersions(Version begin, Version end);
|
||||
virtual Future<Void> commit();
|
||||
|
||||
virtual void setLatestVersion(Version version);
|
||||
virtual Future<Version> getLatestVersion();
|
||||
|
||||
virtual Future<Void> getError();
|
||||
virtual Future<Void> onClosed();
|
||||
virtual void dispose();
|
||||
virtual void close();
|
||||
|
||||
virtual Reference<const IPage> getPage(LogicalPageID pageID, Version version);
|
||||
|
||||
private:
|
||||
Version latestVersion;
|
||||
Version committedVersion;
|
||||
Standalone<VectorRef<VectorRef<uint8_t>>> data;
|
||||
LogicalPageTable pageTable;
|
||||
|
||||
Promise<Void> closed;
|
||||
|
||||
std::vector<PhysicalPageID> freeList; // TODO: is this good enough for now?
|
||||
|
||||
PhysicalPageID allocatePage(Reference<IPage> contents);
|
||||
void extendData();
|
||||
|
||||
static const PhysicalPageID INVALID_PAGE;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -0,0 +1,673 @@
|
|||
/*
|
||||
* VersionedBTree.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "IVersionedStore.h"
|
||||
#include "IPager.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "flow/serialize.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "MemoryPager.h"
|
||||
#include <map>
|
||||
#include <vector>
|
||||
|
||||
#define INTERNAL_PAGES_HAVE_TUPLES 1
|
||||
|
||||
struct SimpleFixedSizeMapRef {
|
||||
typedef std::vector<std::pair<std::string, std::string>> KVPairsT;
|
||||
|
||||
SimpleFixedSizeMapRef() : flags(0) {}
|
||||
|
||||
static SimpleFixedSizeMapRef decode(StringRef buf) {
|
||||
SimpleFixedSizeMapRef result;
|
||||
BinaryReader r(buf, AssumeVersion(currentProtocolVersion));
|
||||
r >> result.flags;
|
||||
r >> result.entries;
|
||||
return result;
|
||||
};
|
||||
|
||||
// Returns -1 if key is less than first key, otherwise index into entries
|
||||
int findLastLessOrEqual(StringRef key) {
|
||||
return std::upper_bound(entries.begin(), entries.end(),
|
||||
key,
|
||||
[](StringRef const& a, KVPairsT::value_type const& b) { return a<b.first; })
|
||||
- entries.begin() - 1;
|
||||
}
|
||||
|
||||
template<typename Allocator>
|
||||
static Reference<IPage> emptyPage(uint8_t newFlags, Allocator const &newPageFn) {
|
||||
Reference<IPage> page = newPageFn();
|
||||
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
|
||||
bw << newFlags;
|
||||
bw << KVPairsT();
|
||||
memcpy(page->mutate(), bw.getData(), bw.getLength());
|
||||
return page;
|
||||
}
|
||||
|
||||
template<typename Allocator>
|
||||
static vector<std::pair<int, Reference<IPage>>> buildMany(const KVPairsT &kvPairs, uint8_t newFlags, Allocator const &newPageFn, int page_size_override = -1) {
|
||||
vector<std::pair<int, Reference<IPage>>> pages;
|
||||
|
||||
Reference<IPage> page = newPageFn();
|
||||
int pageSize = page->size();
|
||||
if(page_size_override > 0 && page_size_override < pageSize)
|
||||
pageSize = page_size_override;
|
||||
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
|
||||
bw << newFlags;
|
||||
uint32_t i = 0;
|
||||
uint32_t start = i;
|
||||
int mapSizeOffset = bw.getLength();
|
||||
bw << start; // placeholder for map size
|
||||
|
||||
for(auto const &kv : kvPairs) {
|
||||
// If page would overflow, output it and start new one
|
||||
if(bw.getLength() + 8 + kv.first.size() + kv.second.size() > pageSize) {
|
||||
memcpy(page->mutate(), bw.getData(), bw.getLength());
|
||||
*(uint32_t *)(page->mutate() + mapSizeOffset) = i - start;
|
||||
pages.push_back({start, page});
|
||||
bw = BinaryWriter(AssumeVersion(currentProtocolVersion));
|
||||
bw << newFlags;
|
||||
page = newPageFn();
|
||||
start = i;
|
||||
int mapSizeOffset = bw.getLength();
|
||||
bw << start; // placeholder for map size;
|
||||
}
|
||||
|
||||
bw << kv;
|
||||
++i;
|
||||
}
|
||||
|
||||
if(bw.getLength() != sizeof(newFlags)) {
|
||||
memcpy(page->mutate(), bw.getData(), bw.getLength());
|
||||
*(uint32_t *)(page->mutate() + mapSizeOffset) = i - start;
|
||||
pages.push_back({start, page});
|
||||
}
|
||||
|
||||
return pages;
|
||||
}
|
||||
|
||||
std::string toString();
|
||||
|
||||
KVPairsT entries;
|
||||
uint8_t flags;
|
||||
};
|
||||
|
||||
#define NOT_IMPLEMENTED { UNSTOPPABLE_ASSERT(false); }
|
||||
|
||||
class VersionedBTree : public IVersionedStore {
|
||||
public:
|
||||
enum EPageFlags { IS_LEAF = 1};
|
||||
|
||||
typedef SimpleFixedSizeMapRef FixedSizeMap;
|
||||
|
||||
virtual Future<Void> getError() NOT_IMPLEMENTED
|
||||
virtual Future<Void> onClosed() NOT_IMPLEMENTED
|
||||
virtual void dispose() NOT_IMPLEMENTED
|
||||
virtual void close() NOT_IMPLEMENTED
|
||||
|
||||
virtual KeyValueStoreType getType() NOT_IMPLEMENTED
|
||||
virtual bool supportsMutation(int op) NOT_IMPLEMENTED
|
||||
virtual StorageBytes getStorageBytes() NOT_IMPLEMENTED
|
||||
|
||||
// Writes are provided in an ordered stream.
|
||||
// A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion()
|
||||
// A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns
|
||||
virtual void set(KeyValueRef keyValue) {
|
||||
ASSERT(m_writeVersion != invalidVersion);
|
||||
m_buffer[keyValue.key.toString()].push_back({m_writeVersion, keyValue.value.toString()});
|
||||
}
|
||||
virtual void clear(KeyRangeRef range) NOT_IMPLEMENTED
|
||||
virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED
|
||||
|
||||
// Versions [begin, end) no longer readable
|
||||
virtual void forgetVersions(Version begin, Version end) NOT_IMPLEMENTED
|
||||
|
||||
virtual Future<Version> getLatestVersion() {
|
||||
if(m_writeVersion != invalidVersion)
|
||||
return m_writeVersion;
|
||||
return m_pager->getLatestVersion();
|
||||
}
|
||||
|
||||
VersionedBTree(IPager *pager, int page_size_override = -1) : m_pager(pager), m_writeVersion(invalidVersion), m_page_size_override(page_size_override) {
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> init(VersionedBTree *self) {
|
||||
// TODO: don't just create a new root, load the existing one
|
||||
self->m_root = self->m_pager->allocateLogicalPage();
|
||||
Version latest = wait(self->m_pager->getLatestVersion());
|
||||
Version v = latest + 1;
|
||||
IPager *pager = self->m_pager;
|
||||
self->writePage(self->m_root, FixedSizeMap::emptyPage(EPageFlags::IS_LEAF, [pager](){ return pager->newPageBuffer(); }), v);
|
||||
self->m_pager->setLatestVersion(v);
|
||||
Void _ = wait(self->m_pager->commit());
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> init() { return init(this); }
|
||||
|
||||
virtual ~VersionedBTree() {}
|
||||
|
||||
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed
|
||||
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations.
|
||||
// The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less
|
||||
// than or equal to the given version.
|
||||
// If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same
|
||||
// write version, OR it may represent a snapshot as of the call to readAtVersion().
|
||||
virtual Reference<IStoreCursor> readAtVersion(Version v) {
|
||||
// TODO: Use the buffer to return uncommitted data
|
||||
return Reference<IStoreCursor>(new Cursor(v, m_pager, m_root));
|
||||
}
|
||||
|
||||
// Must be nondecreasing
|
||||
virtual void setWriteVersion(Version v) {
|
||||
ASSERT(v >= m_writeVersion);
|
||||
m_writeVersion = v;
|
||||
//m_pager->setLatestVersion(v);
|
||||
}
|
||||
|
||||
virtual Future<Void> commit() {
|
||||
return commit_impl(this);
|
||||
}
|
||||
|
||||
private:
|
||||
void writePage(LogicalPageID id, Reference<IPage> page, Version ver) {
|
||||
FixedSizeMap map = FixedSizeMap::decode(StringRef(page->begin(), page->size()));
|
||||
printf("Writing page: id=%d ver=%lld %s\n", id, ver, map.toString().c_str());
|
||||
m_pager->writePage(id, page, ver);
|
||||
}
|
||||
|
||||
LogicalPageID m_root;
|
||||
|
||||
typedef std::vector<std::pair<Version, std::vector<std::pair<std::string, LogicalPageID>>>> VersionedChildrenT;
|
||||
typedef std::map<std::string, std::vector<std::pair<Version, std::string>>> MutationBufferT;
|
||||
struct KeyVersionValue {
|
||||
KeyVersionValue(Key k, Version ver, Value val) : key(k), version(ver), value(val) {}
|
||||
bool operator< (KeyVersionValue const &rhs) const {
|
||||
int64_t cmp = key.compare(rhs.key);
|
||||
if(cmp == 0) {
|
||||
cmp = version - rhs.version;
|
||||
if(cmp == 0)
|
||||
return false;
|
||||
}
|
||||
return cmp < 0;
|
||||
}
|
||||
Key key;
|
||||
Version version;
|
||||
Value value;
|
||||
};
|
||||
|
||||
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, Reference<IPagerSnapshot> snapshot, LogicalPageID root, std::string lowerBoundKey, MutationBufferT::const_iterator bufBegin, MutationBufferT::const_iterator bufEnd) {
|
||||
printf("commit subtree from page %u\n", root);
|
||||
if(bufBegin == bufEnd) {
|
||||
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
|
||||
}
|
||||
|
||||
state FixedSizeMap map;
|
||||
printf("commitSubtree: Reading page %d\n", root);
|
||||
Reference<const IPage> rawPage = wait(snapshot->getPhysicalPage(root));
|
||||
map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
|
||||
|
||||
if(map.flags & EPageFlags::IS_LEAF) {
|
||||
VersionedChildrenT results;
|
||||
FixedSizeMap::KVPairsT kvpairs;
|
||||
|
||||
// Fill existing with records from the roof page (which is a leaf)
|
||||
std::vector<KeyVersionValue> existing;
|
||||
for(auto const &kv : map.entries) {
|
||||
Tuple t = Tuple::unpack(kv.first);
|
||||
existing.push_back(KeyVersionValue(t.getString(0), t.getInt(1), StringRef(kv.second)));
|
||||
}
|
||||
|
||||
// Fill mutations with changes begin committed
|
||||
std::vector<KeyVersionValue> mutations;
|
||||
Version minVersion = std::numeric_limits<Version>::max();
|
||||
MutationBufferT::const_iterator iBuf = bufBegin;
|
||||
while(iBuf != bufEnd) {
|
||||
Key k = StringRef(iBuf->first);
|
||||
for(auto const &vv : iBuf->second) {
|
||||
mutations.push_back(KeyVersionValue(StringRef(k), vv.first, StringRef(vv.second)));
|
||||
minVersion = std::min(minVersion, vv.first);
|
||||
}
|
||||
++iBuf;
|
||||
}
|
||||
|
||||
std::vector<KeyVersionValue> merged;
|
||||
std::merge(existing.cbegin(), existing.cend(), mutations.cbegin(), mutations.cend(), std::back_inserter(merged));
|
||||
|
||||
// TODO: Make version and key splits based on contents of merged list
|
||||
|
||||
FixedSizeMap::KVPairsT leafEntries;
|
||||
for(auto const &kvv : merged) {
|
||||
Tuple t;
|
||||
t.append(kvv.key);
|
||||
t.append(kvv.version);
|
||||
leafEntries.push_back({t.pack().toString(), kvv.value.toString()});
|
||||
}
|
||||
|
||||
IPager *pager = self->m_pager;
|
||||
vector< std::pair<int, Reference<IPage>> > pages = FixedSizeMap::buildMany( leafEntries, EPageFlags::IS_LEAF, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override);
|
||||
|
||||
if(pages.size() != 1)
|
||||
results.push_back( {0, {{lowerBoundKey, root}}} );
|
||||
|
||||
// Verify that no consecutive split keys are equal
|
||||
StringRef lastSplitKey(LiteralStringRef("\xff\xff\xff"));
|
||||
for(auto const &p : pages) {
|
||||
if(p.first != 0) {
|
||||
StringRef splitKey = merged[p.first].key;
|
||||
ASSERT(splitKey != lastSplitKey);
|
||||
lastSplitKey = splitKey;
|
||||
}
|
||||
}
|
||||
|
||||
// For each IPage of data, assign a logical pageID.
|
||||
std::vector<LogicalPageID> logicalPages;
|
||||
|
||||
// Only reuse first page if only one page is being returned or if root is not the btree root.
|
||||
if(pages.size() == 1 || root != self->m_root)
|
||||
logicalPages.push_back(root);
|
||||
|
||||
// Allocate enough pageIDs for all of the pages
|
||||
for(int i=logicalPages.size(); i<pages.size(); i++)
|
||||
logicalPages.push_back(self->m_pager->allocateLogicalPage() );
|
||||
|
||||
// Write each page using its assigned page ID
|
||||
printf("Writing leaf pages, subtreeRoot=%u\n", root);
|
||||
for(int i=0; i<pages.size(); i++)
|
||||
self->writePage(logicalPages[i], pages[i].second, minVersion);
|
||||
|
||||
results.push_back({minVersion, {}});
|
||||
|
||||
for(int i=0; i<pages.size(); i++) {
|
||||
// Actorcompiler doesn't like using #if here since there are no lines of code after this loop
|
||||
if(INTERNAL_PAGES_HAVE_TUPLES)
|
||||
results.back().second.push_back( {leafEntries[pages[i].first].first, logicalPages[i]} );
|
||||
else {
|
||||
Tuple t = Tuple::unpack(leafEntries[pages[i].first].first);
|
||||
results.back().second.push_back( {t.getString(0).toString(), logicalPages[i]} );
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
else {
|
||||
state std::vector<Future<VersionedChildrenT>> m_futureChildren;
|
||||
|
||||
auto childMutBegin = bufBegin;
|
||||
|
||||
for(int i=0; i<map.entries.size(); i++) {
|
||||
auto childMutEnd = bufEnd;
|
||||
if (i+1 != map.entries.size()) {
|
||||
if(INTERNAL_PAGES_HAVE_TUPLES) {
|
||||
Tuple t = Tuple::unpack(map.entries[i+1].first);
|
||||
childMutEnd = self->m_buffer.lower_bound( t.getString(0).toString() );
|
||||
}
|
||||
else
|
||||
childMutEnd = self->m_buffer.lower_bound( map.entries[i+1].first );
|
||||
}
|
||||
|
||||
m_futureChildren.push_back(self->commitSubtree(self, snapshot, *(uint32_t*)map.entries[i].second.data(), map.entries[i].first, childMutBegin, childMutEnd));
|
||||
childMutBegin = childMutEnd;
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(m_futureChildren));
|
||||
|
||||
bool unmodified = true;
|
||||
for( auto &c : m_futureChildren) {
|
||||
if(c.get().size() != 1 || c.get()[0].second.size() != 1) {
|
||||
unmodified = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(unmodified)
|
||||
return VersionedChildrenT({{0, {{lowerBoundKey, root}}}});
|
||||
|
||||
Version version = 0;
|
||||
VersionedChildrenT result;
|
||||
|
||||
loop { // over version splits of this page
|
||||
Version nextVersion = std::numeric_limits<Version>::max();
|
||||
|
||||
FixedSizeMap::KVPairsT childEntries; // Logically std::vector<std::pair<std::string, LogicalPageID>> childEntries;
|
||||
|
||||
// For each Future<VersionedChildrenT>
|
||||
//printf("LOOP: Version %lld\n", version);
|
||||
|
||||
for( auto& c : m_futureChildren ) {
|
||||
const VersionedChildrenT &children = c.get();
|
||||
/*
|
||||
printf(" versioned page set size: %d\n", children.size());
|
||||
for(auto &versionedPageSet : children) {
|
||||
printf(" version: %lld\n", versionedPageSet.first);
|
||||
for(auto &boundaryPage : versionedPageSet.second) {
|
||||
printf(" %s -> %u\n", boundaryPage.first.c_str(), boundaryPage.second);
|
||||
}
|
||||
}
|
||||
printf(" Current version: %lld\n", version);
|
||||
*/
|
||||
|
||||
// Find the first version greater than the current version we are writing
|
||||
auto cv = std::upper_bound( children.begin(), children.end(), version, [](Version a, VersionedChildrenT::value_type const &b) { return a < b.first; } );
|
||||
|
||||
// If there are no versions before the one we found, just update nextVersion and continue.
|
||||
if(cv == children.begin()) {
|
||||
//printf(" First version (%lld) in set is greater than current, setting nextVersion and continuing\n", cv->first);
|
||||
nextVersion = std::min(nextVersion, cv->first);
|
||||
//printf(" curr %lld next %lld\n", version, nextVersion);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If a version greater than the current version being written was found, update nextVersion
|
||||
if(cv != children.end()) {
|
||||
//printf(" First greater version found is %lld\n");
|
||||
nextVersion = std::min(nextVersion, cv->first);
|
||||
//printf(" curr %lld next %lld\n", version, nextVersion);
|
||||
}
|
||||
|
||||
// Go back one to the last version that was valid prior to or at the current version we are writing
|
||||
--cv;
|
||||
|
||||
//printf(" Using children for version %lld\n", cv->first);
|
||||
|
||||
// Add the children at this version to the child entries list for the current version being built.
|
||||
for (auto &childPage : cv->second) {
|
||||
//printf(" Adding child page '%s'\n", childPage.first.c_str());
|
||||
childEntries.push_back( {childPage.first, std::string((char *)&childPage.second, sizeof(uint32_t))});
|
||||
}
|
||||
}
|
||||
|
||||
//printf("Finished pass through futurechildren. childEntries=%d version=%lld nextVersion=%lld\n", childEntries.size(), version, nextVersion);
|
||||
|
||||
// TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and
|
||||
// cause unnecessary path copying
|
||||
|
||||
IPager *pager = self->m_pager;
|
||||
vector< std::pair<int, Reference<IPage>> > pages = FixedSizeMap::buildMany( childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override);
|
||||
|
||||
// For each IPage of data, assign a logical pageID.
|
||||
std::vector<LogicalPageID> logicalPages;
|
||||
|
||||
// Only reuse first page if only one page is being returned or if root is not the btree root.
|
||||
if(pages.size() == 1 || root != self->m_root)
|
||||
logicalPages.push_back(root);
|
||||
|
||||
// Allocate enough pageIDs for all of the pages
|
||||
for(int i=logicalPages.size(); i<pages.size(); i++)
|
||||
logicalPages.push_back( self->m_pager->allocateLogicalPage() );
|
||||
|
||||
// Write each page using its assigned page ID
|
||||
printf("Writing internal pages, subtreeRoot=%u\n", root);
|
||||
for(int i=0; i<pages.size(); i++)
|
||||
self->writePage( logicalPages[i], pages[i].second, version );
|
||||
|
||||
result.resize(result.size()+1);
|
||||
result.back().first = version;
|
||||
|
||||
for(int i=0; i<pages.size(); i++)
|
||||
result.back().second.push_back( {childEntries[pages[i].first].first, logicalPages[i]} );
|
||||
|
||||
if (result.size() > 1 && result.back().second == result.end()[-2].second)
|
||||
result.pop_back();
|
||||
|
||||
if (nextVersion == std::numeric_limits<Version>::max())
|
||||
break;
|
||||
version = nextVersion;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> commit_impl(VersionedBTree *self) {
|
||||
Version latestVersion = wait(self->m_pager->getLatestVersion());
|
||||
|
||||
VersionedChildrenT rootNodes = wait(commitSubtree(self, self->m_pager->getReadSnapshot(latestVersion), self->m_root, std::string(), self->m_buffer.begin(), self->m_buffer.end()));
|
||||
|
||||
for(auto const &versionedPages : rootNodes) {
|
||||
// If the version of the root page set is 0 and the page set size is 1 then there is nothing to write.
|
||||
if(versionedPages.first == 0 & versionedPages.second.size() == 1)
|
||||
continue;
|
||||
|
||||
FixedSizeMap::KVPairsT childEntries;
|
||||
for (auto &childPage : versionedPages.second)
|
||||
childEntries.push_back( {childPage.first, std::string((char *)&childPage.second, sizeof(uint32_t))});
|
||||
|
||||
IPager *pager = self->m_pager;
|
||||
vector< std::pair<int, Reference<IPage>> > pages = FixedSizeMap::buildMany( childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override);
|
||||
// Until we have only one root, write new multi-page top levels of the tree
|
||||
while(pages.size() != 1) {
|
||||
printf("Root level would be %d pages\n", pages.size());
|
||||
FixedSizeMap::KVPairsT newRootLevelEntries;
|
||||
printf("Writing new root level at version %lld\n", versionedPages.first);
|
||||
for(auto const &p : pages) {
|
||||
LogicalPageID pageID = self->m_pager->allocateLogicalPage();
|
||||
self->writePage(pageID, p.second, versionedPages.first);
|
||||
newRootLevelEntries.push_back( {childEntries[p.first].first, std::string((char *)&pageID, sizeof(LogicalPageID))});
|
||||
}
|
||||
|
||||
pages = FixedSizeMap::buildMany( newRootLevelEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_page_size_override);
|
||||
childEntries = std::move(newRootLevelEntries);
|
||||
}
|
||||
|
||||
printf("Writing new root id %d\n", self->m_root);
|
||||
self->writePage(self->m_root, pages[0].second, versionedPages.first);
|
||||
}
|
||||
|
||||
self->m_pager->setLatestVersion(self->m_writeVersion);
|
||||
self->m_pager->commit();
|
||||
|
||||
self->m_buffer.clear();
|
||||
return Void();
|
||||
}
|
||||
|
||||
IPager *m_pager;
|
||||
MutationBufferT m_buffer;
|
||||
Version m_writeVersion;
|
||||
int m_page_size_override;
|
||||
|
||||
class Cursor : public IStoreCursor, public ReferenceCounted<Cursor> {
|
||||
public:
|
||||
Cursor(Version version, IPager *pager, LogicalPageID root)
|
||||
: m_version(version), m_pager(pager->getReadSnapshot(version)), m_root(root) {
|
||||
}
|
||||
virtual ~Cursor() {}
|
||||
|
||||
virtual Future<Void> findFirstGreaterOrEqual(KeyRef key, int prefetchNextBytes) NOT_IMPLEMENTED
|
||||
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchPriorBytes) NOT_IMPLEMENTED
|
||||
virtual Future<Void> next(bool needValue) NOT_IMPLEMENTED
|
||||
virtual Future<Void> prev(bool needValue) NOT_IMPLEMENTED
|
||||
|
||||
virtual bool isValid() {
|
||||
return m_kv.present();
|
||||
}
|
||||
|
||||
virtual KeyRef getKey() {
|
||||
return m_kv.get().key;
|
||||
}
|
||||
//virtual StringRef getCompressedKey() = 0;
|
||||
virtual ValueRef getValue() {
|
||||
return m_kv.get().value;
|
||||
}
|
||||
|
||||
virtual void invalidateReturnedStrings() {
|
||||
m_pager->invalidateReturnedPages();
|
||||
}
|
||||
|
||||
Version m_version;
|
||||
Reference<IPagerSnapshot> m_pager;
|
||||
Optional<KeyValueRef> m_kv;
|
||||
Arena m_arena;
|
||||
LogicalPageID m_root;
|
||||
|
||||
void addref() { ReferenceCounted<Cursor>::addref(); }
|
||||
void delref() { ReferenceCounted<Cursor>::delref(); }
|
||||
|
||||
ACTOR static Future<Void> findEqual_impl(Reference<Cursor> self, KeyRef key) {
|
||||
state LogicalPageID pageNumber = self->m_root;
|
||||
|
||||
state Tuple t;
|
||||
t.append(key);
|
||||
t.append(self->m_version);
|
||||
state KeyRef tupleKey = t.pack();
|
||||
|
||||
loop {
|
||||
printf("findEqual: Reading page %d @%lld\n", pageNumber, self->m_version);
|
||||
Reference<const IPage> rawPage = wait(self->m_pager->getPhysicalPage(pageNumber));
|
||||
FixedSizeMap map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
|
||||
|
||||
// Special case of empty page (which should only happen for root)
|
||||
if(map.entries.empty()) {
|
||||
ASSERT(pageNumber == self->m_root);
|
||||
self->m_kv = Optional<KeyValueRef>();
|
||||
return Void();
|
||||
}
|
||||
|
||||
if(map.flags && EPageFlags::IS_LEAF) {
|
||||
int i = map.findLastLessOrEqual(tupleKey);
|
||||
if(i >= 0 && Tuple::unpack(map.entries[i].first).getString(0) == key) {
|
||||
self->m_kv = Standalone<KeyValueRef>(KeyValueRef(key, map.entries[i].second), self->m_arena);
|
||||
}
|
||||
else {
|
||||
self->m_kv = Optional<KeyValueRef>();
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
else {
|
||||
int i = map.findLastLessOrEqual(INTERNAL_PAGES_HAVE_TUPLES ? tupleKey : key);
|
||||
i = std::max(i, 0);
|
||||
pageNumber = *(uint32_t *)map.entries[i].second.data();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual Future<Void> findEqual(KeyRef key) {
|
||||
return findEqual_impl(Reference<Cursor>::addRef(this), key);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
KeyValue randomKV() {
|
||||
int kLen = g_random->randomInt(1, 10);
|
||||
int vLen = g_random->randomInt(0, 5);
|
||||
KeyValue kv;
|
||||
kv.key = makeString(kLen, kv.arena());
|
||||
kv.value = makeString(vLen, kv.arena());
|
||||
for(int i = 0; i < kLen; ++i)
|
||||
mutateString(kv.key)[i] = (uint8_t)g_random->randomInt('a', 'm');
|
||||
for(int i = 0; i < vLen; ++i)
|
||||
mutateString(kv.value)[i] = (uint8_t)g_random->randomInt('n', 'z');
|
||||
return kv;
|
||||
}
|
||||
|
||||
TEST_CASE("/redwood/set") {
|
||||
state IPager *pager = new MemoryPager();
|
||||
state VersionedBTree *btree = new VersionedBTree(pager, g_random->randomInt(50, 200));
|
||||
Void _ = wait(btree->init());
|
||||
|
||||
state std::map<std::pair<std::string, Version>, std::string> written;
|
||||
|
||||
Version lastVer = wait(btree->getLatestVersion());
|
||||
state Version version = lastVer + 1;
|
||||
state int commits = g_random->randomInt(1, 20);
|
||||
printf("Will do %d commits\n", commits);
|
||||
while(commits--) {
|
||||
int versions = g_random->randomInt(1, 20);
|
||||
printf(" Commit will have %d versions\n", versions);
|
||||
while(versions--) {
|
||||
btree->setWriteVersion(version);
|
||||
int changes = g_random->randomInt(0, 20);
|
||||
printf(" Version will have %d changes\n", changes);
|
||||
while(changes--) {
|
||||
KeyValue kv = randomKV();
|
||||
printf(" Set '%s' -> '%s' @%lld\n", kv.key.toString().c_str(), kv.value.toString().c_str(), version);
|
||||
btree->set(kv);
|
||||
written[std::make_pair(kv.key.toString(), version)] = kv.value.toString();
|
||||
}
|
||||
++version;
|
||||
}
|
||||
Void _ = wait(btree->commit());
|
||||
|
||||
// Check that all writes can be read at their written versions
|
||||
state std::map<std::pair<std::string, Version>, std::string>::const_iterator i = written.cbegin();
|
||||
state std::map<std::pair<std::string, Version>, std::string>::const_iterator iEnd = written.cend();
|
||||
state int errors = 0;
|
||||
|
||||
printf("Checking changes committed thus far.\n");
|
||||
while(i != iEnd) {
|
||||
state std::string key = i->first.first;
|
||||
state Version ver = i->first.second;
|
||||
state std::string val = i->second;
|
||||
|
||||
state Reference<IStoreCursor> cur = btree->readAtVersion(ver);
|
||||
|
||||
Void _ = wait(cur->findEqual(i->first.first));
|
||||
|
||||
if(!(cur->isValid() && cur->getKey() == key && cur->getValue() == val)) {
|
||||
++errors;
|
||||
if(!cur->isValid())
|
||||
printf("Verify failed: key_not_found: '%s' -> '%s' @%lld\n", key.c_str(), val.c_str(), ver);
|
||||
else if(cur->getKey() != key)
|
||||
printf("Verify failed: key_incorrect: found '%s' expected '%s' @%lld\n", cur->getKey().toString().c_str(), key.c_str(), ver);
|
||||
else if(cur->getValue() != val)
|
||||
printf("Verify failed: value_incorrect: for '%s' found '%s' expected '%s' @%lld\n", cur->getKey().toString().c_str(), cur->getValue().toString().c_str(), val.c_str(), ver);
|
||||
}
|
||||
++i;
|
||||
}
|
||||
|
||||
printf("%d sets, %d errors\n", (int)written.size(), errors);
|
||||
|
||||
if(errors != 0)
|
||||
throw internal_error();
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
std::string SimpleFixedSizeMapRef::toString() {
|
||||
std::string result;
|
||||
result.append(format("flags=0x%x data: ", flags));
|
||||
for(auto const &kv : entries) {
|
||||
result.append(" ");
|
||||
if(INTERNAL_PAGES_HAVE_TUPLES || flags && VersionedBTree::IS_LEAF) {
|
||||
Tuple t = Tuple::unpack(kv.first);
|
||||
result.append("[");
|
||||
for(int i = 0; i < t.size(); ++i) {
|
||||
if(i != 0)
|
||||
result.append(",");
|
||||
if(t.getType(i) == Tuple::ElementType::BYTES)
|
||||
result.append(format("%s", t.getString(i).toString().c_str()));
|
||||
if(t.getType(i) == Tuple::ElementType::INT)
|
||||
result.append(format("%lld", t.getInt(i)));
|
||||
}
|
||||
}
|
||||
else
|
||||
result.append(format("'%s'", printable(StringRef(kv.first)).c_str()));
|
||||
result.append("->");
|
||||
if(flags && VersionedBTree::IS_LEAF)
|
||||
result.append(format("'%s'", printable(StringRef(kv.second)).c_str()));
|
||||
else
|
||||
result.append(format("%u", *(const uint32_t *)kv.second.data()));
|
||||
result.append("]");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
|
@ -33,6 +33,7 @@
|
|||
<ActorCompiler Include="worker.actor.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ActorCompiler Include="VersionedBTree.actor.cpp" />
|
||||
<ActorCompiler Include="Coordination.actor.cpp" />
|
||||
<ActorCompiler Include="CoordinatedState.actor.cpp" />
|
||||
<ActorCompiler Include="CoroFlow.actor.cpp" />
|
||||
|
@ -45,6 +46,7 @@
|
|||
<ActorCompiler Include="KeyValueStoreMemory.actor.cpp" />
|
||||
<ActorCompiler Include="SimulatedCluster.actor.cpp" />
|
||||
<ActorCompiler Include="KeyValueStoreCompressTestData.actor.cpp" />
|
||||
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
|
||||
<ClCompile Include="Knobs.cpp" />
|
||||
<ActorCompiler Include="QuietDatabase.actor.cpp" />
|
||||
<ActorCompiler Include="networktest.actor.cpp" />
|
||||
|
@ -53,6 +55,7 @@
|
|||
<ActorCompiler Include="Resolver.actor.cpp" />
|
||||
<ActorCompiler Include="LogSystemDiskQueueAdapter.actor.cpp" />
|
||||
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
|
||||
<ActorCompiler Include="MemoryPager.actor.cpp" />
|
||||
<ActorCompiler Include="OldTLogServer.actor.cpp" />
|
||||
<ClCompile Include="SkipList.cpp" />
|
||||
<ActorCompiler Include="WaitFailure.actor.cpp" />
|
||||
|
@ -150,12 +153,16 @@
|
|||
<ClInclude Include="DBCoreState.h" />
|
||||
<ClInclude Include="IDiskQueue.h" />
|
||||
<ClInclude Include="IKeyValueStore.h" />
|
||||
<ClInclude Include="IndirectShadowPager.h" />
|
||||
<ClInclude Include="IPager.h" />
|
||||
<ClInclude Include="IVersionedStore.h" />
|
||||
<ClInclude Include="LeaderElection.h" />
|
||||
<ClInclude Include="LogProtocolMessage.h" />
|
||||
<ClInclude Include="LogSystem.h" />
|
||||
<ClInclude Include="LogSystemConfig.h" />
|
||||
<ClInclude Include="LogSystemDiskQueueAdapter.h" />
|
||||
<ClInclude Include="MasterInterface.h" />
|
||||
<ClInclude Include="MemoryPager.h" />
|
||||
<ClInclude Include="MoveKeys.h" />
|
||||
<ClInclude Include="NetworkTest.h" />
|
||||
<ActorCompiler Include="Orderer.actor.h">
|
||||
|
|
|
@ -246,6 +246,9 @@
|
|||
<ActorCompiler Include="workloads\AtomicRestore.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="workloads\SlowTaskWorkload.actor.cpp" />
|
||||
<ActorCompiler Include="MemoryPager.actor.cpp" />
|
||||
<ActorCompiler Include="IndirectShadowPager.actor.cpp" />
|
||||
<ActorCompiler Include="OldTLogServer.actor.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
|
@ -330,6 +333,10 @@
|
|||
<ClInclude Include="ApplyMetadataMutation.h" />
|
||||
<ClInclude Include="RecoveryState.h" />
|
||||
<ClInclude Include="LogProtocolMessage.h" />
|
||||
<ClInclude Include="IPager.h" />
|
||||
<ClInclude Include="IVersionedStore.h" />
|
||||
<ClInclude Include="MemoryPager.h" />
|
||||
<ClInclude Include="IndirectShadowPager.h" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Filter Include="workloads">
|
||||
|
|
|
@ -63,4 +63,44 @@ public:
|
|||
void clear( bool returnWhenEmptied ) { m_out.cancel(); m_out = actorCollection(m_add.getFuture(), NULL, NULL, NULL, NULL, returnWhenEmptied ); }
|
||||
};
|
||||
|
||||
class SignalableActorCollection : NonCopyable {
|
||||
PromiseStream<Future<Void>> m_add;
|
||||
Promise<Void> stopSignal;
|
||||
Future<Void> m_out;
|
||||
|
||||
void init() {
|
||||
PromiseStream<Future<Void>> addStream;
|
||||
m_out = actorCollection(addStream.getFuture(), NULL, NULL, NULL, NULL, true);
|
||||
m_add = addStream;
|
||||
stopSignal = Promise<Void>();
|
||||
m_add.send(stopSignal.getFuture());
|
||||
}
|
||||
|
||||
public:
|
||||
explicit SignalableActorCollection() {
|
||||
init();
|
||||
}
|
||||
|
||||
Future<Void> signal() {
|
||||
stopSignal.send(Void());
|
||||
Future<Void> result = holdWhile(m_add, m_out);
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Void> signalAndReset() {
|
||||
Future<Void> result = signal();
|
||||
clear();
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Void> signalAndCollapse() {
|
||||
Future<Void> result = signalAndReset();
|
||||
add(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
void add(Future<Void> a) { m_add.send(a); }
|
||||
void clear() { init(); }
|
||||
};
|
||||
|
||||
#endif
|
|
@ -0,0 +1,6 @@
|
|||
testTitle=UnitTests
|
||||
testName=UnitTests
|
||||
startDelay=0
|
||||
useDB=false
|
||||
maxTestCases=0
|
||||
testsMatching=/redwood
|
Loading…
Reference in New Issue