COWPager will no longer expire read Snapshots that are still in use.

This commit is contained in:
Stephen Atherton 2019-10-18 01:27:00 -07:00
parent 0e9d082805
commit 44175e0921
3 changed files with 73 additions and 36 deletions

View File

@ -213,13 +213,14 @@ public:
// After the returned future is ready, future calls must not wait.
virtual Future<Version> getLatestVersion() = 0;
// The pager can invalidate snapshots at versions < v and reuse
// any pages that were freed as of version v
virtual void setOldestVersion(Version v) = 0;
// Get the oldest readable version
// Returns the oldest readable version as of the most recent committed version
virtual Future<Version> getOldestVersion() = 0;
// The pager can reuse pages that were freed at a version less than v.
// If any snapshots are in use at a version less than v, the pager can invalidate them
// or keep their versions around until the snapshots are no longer in use.
virtual void setOldestVersion(Version v) = 0;
protected:
~IPager2() {} // Destruction should be done using close()/dispose() from the IClosable interface
};

View File

@ -58,7 +58,8 @@ public:
virtual void clear(KeyRangeRef range) = 0;
virtual void mutate(int op, StringRef param1, StringRef param2) = 0;
virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing
virtual void forgetVersions(Version begin, Version end) = 0; // Versions [begin, end) no longer readable
virtual void setOldestVersion(Version v) = 0; // Set oldest readable version to be used in next commit
virtual Version getOldestVersion() = 0; // Get oldest readable version
virtual Future<Void> commit() = 0;
virtual Future<Version> getLatestVersion() = 0;

View File

@ -779,6 +779,8 @@ ACTOR template<class T> Future<T> forwardError(Future<T> f, Promise<Void> target
}
}
class COWPagerSnapshot;
class COWPager : public IPager2 {
public:
typedef FastAllocatedPage Page;
@ -940,8 +942,8 @@ public:
self->pHeader->pageCount = 2;
// Create a new free list
self->freeList.create(self, self->newPageID().get(), "FreeList");
self->delayedFreeList.create(self, self->newPageID().get(), "delayedFreeList");
self->freeList.create(self, self->newLastPageID(), "FreeList");
self->delayedFreeList.create(self, self->newLastPageID(), "delayedFreeList");
// The first commit() below will flush the queues and update the queue states in the header,
// but since the queues will not be used between now and then their states will not change.
@ -982,19 +984,28 @@ public:
return freePageID.get();
}
Optional<DelayedFreePage> delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{self->pLastCommittedHeader->oldestVersion, 0}));
// Try to reuse pages up to the earlier of the oldest version set by the user or the oldest snapshot still in the snapshots list
ASSERT(!self->snapshots.empty());
Version oldestVersion = std::min(self->pLastCommittedHeader->oldestVersion, self->snapshots.front().version);
Optional<DelayedFreePage> delayedFreePageID = wait(self->delayedFreeList.pop(DelayedFreePage{oldestVersion, 0}));
if(delayedFreePageID.present()) {
debug_printf("COWPager(%s) newPageID() returning %s from delayed free list\n", self->filename.c_str(), toString(delayedFreePageID.get()).c_str());
return delayedFreePageID.get().pageID;
}
// Lastly, grow the pager file by a page and return it.
LogicalPageID id = self->pHeader->pageCount;
++self->pHeader->pageCount;
// Lastly, add a new page to the pager
LogicalPageID id = self->newLastPageID();
debug_printf("COWPager(%s) newPageID() returning %s at end of file\n", self->filename.c_str(), toString(id).c_str());
return id;
};
// Grow the pager file by pone page and return it
LogicalPageID newLastPageID() {
LogicalPageID id = pHeader->pageCount;
++pHeader->pageCount;
return id;
}
Future<LogicalPageID> newPageID() override {
return forwardError(newPageID_impl(this), errorPromise);
}
@ -1131,7 +1142,7 @@ public:
// Get snapshot as of the most recent committed version of the pager
Reference<IPagerSnapshot> getReadSnapshot(Version v) override;
void addLatestSnapshot() override;
void addLatestSnapshot();
void setOldestVersion(Version v) override {
ASSERT(v >= pHeader->oldestVersion);
@ -1156,6 +1167,10 @@ public:
loop {
state bool freeBusy = wait(self->freeList.preFlush());
state bool delayedFreeBusy = wait(self->delayedFreeList.preFlush());
// Once preFlush() returns false for both queues then there are no more operations pending
// on either queue. If preFlush() returns true for either queue in one loop execution then
// it could have generated new work for itself or the other queue.
if(!freeBusy && !delayedFreeBusy) {
break;
}
@ -1184,6 +1199,9 @@ public:
self->updateCommittedHeader();
self->addLatestSnapshot();
// Try to expire snapshots up to the oldest version, in case some were being kept around due to being in use,
// because maybe some are no longer in use.
self->expireSnapshots(self->pHeader->oldestVersion);
return Void();
}
@ -1268,15 +1286,8 @@ public:
private:
~COWPager() {}
// Expire snapshots up to but not including v
void expireSnapshots(Version v) {
debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size());
while(snapshots.size() > 1 && snapshots.front().version < v) {
debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version);
snapshots.front().expired.sendError(transaction_too_old());
snapshots.pop_front();
}
}
// Try to expire snapshots up to but not including v, but do not expire any snapshots that are in use.
void expireSnapshots(Version v);
#pragma pack(push, 1)
// Header is the format of page 0 of the database
@ -1373,7 +1384,7 @@ private:
struct SnapshotEntry {
Version version;
Promise<Void> expired;
Reference<IPagerSnapshot> snapshot;
Reference<COWPagerSnapshot> snapshot;
};
struct SnapshotEntryLessThanVersion {
@ -1390,7 +1401,7 @@ private:
};
// Prevents pager from reusing freed pages from version until the snapshot is destroyed
class COWPagerSnapshot : public IPagerSnapshot, ReferenceCounted<COWPagerSnapshot> {
class COWPagerSnapshot : public IPagerSnapshot, public ReferenceCounted<COWPagerSnapshot> {
public:
COWPagerSnapshot(COWPager *pager, Key meta, Version version, Future<Void> expiredFuture) : pager(pager), metaKey(meta), version(version), expired(expiredFuture) {
}
@ -1428,6 +1439,18 @@ public:
Key metaKey;
};
void COWPager::expireSnapshots(Version v) {
debug_printf("COWPager(%s) expiring snapshots through %" PRId64 " snapshot count %d\n", filename.c_str(), v, (int)snapshots.size());
while(snapshots.size() > 1 && snapshots.front().version < v && snapshots.front().snapshot->isSoleOwner()) {
debug_printf("COWPager(%s) expiring snapshot for %" PRId64 "\n", filename.c_str(), snapshots.front().version);
// The snapshot contract could be made such that the expired promise isn't need anymore. In practice it
// probably is already not needed but it will gracefully handle the case where a user begins a page read
// with a snapshot reference, keeps the page read future, and drops the snapshot reference.
snapshots.front().expired.sendError(transaction_too_old());
snapshots.pop_front();
}
}
Reference<IPagerSnapshot> COWPager::getReadSnapshot(Version v) {
ASSERT(!snapshots.empty());
@ -1444,7 +1467,7 @@ void COWPager::addLatestSnapshot() {
snapshots.push_back({
pLastCommittedHeader->committedVersion,
expired,
Reference<IPagerSnapshot>(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture()))
Reference<COWPagerSnapshot>(new COWPagerSnapshot(this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture()))
});
}
@ -2479,8 +2502,13 @@ public:
virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED
// Versions [begin, end) no longer readable
virtual void forgetVersions(Version begin, Version end) NOT_IMPLEMENTED
virtual void setOldestVersion(Version v) {
m_newOldestVersion = v;
}
virtual Version getOldestVersion() {
return m_pager->getOldestVersion().get();
}
virtual Future<Version> getLatestVersion() {
if(m_writeVersion != invalidVersion)
@ -2567,7 +2595,9 @@ public:
ACTOR static Future<Void> init_impl(VersionedBTree *self) {
state Version latest = wait(self->m_pager->getLatestVersion());
debug_printf("Recovered pager to version %" PRId64 "\n", latest);
self->m_newOldestVersion = self->m_pager->getOldestVersion().get();
debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n", self->m_newOldestVersion);
state Key meta = self->m_pager->getMetaKey();
if(meta.size() == 0) {
@ -2612,12 +2642,11 @@ public:
m_latestCommit.cancel();
}
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations.
// readAtVersion() may only be called on a committed v which has previously been passed to setWriteVersion() and never previously passed
// to setOldestVersion. The returned results when violating this precondition are unspecified; the store is not required to be able to detect violations.
// The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes done with write versions less
// than or equal to the given version.
// If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes at the same
// write version, OR it may represent a snapshot as of the call to readAtVersion().
// v must be a committed version.
virtual Reference<IStoreCursor> readAtVersion(Version v) {
// Only committed versions can be read.
Version recordVersion = singleVersion ? 0 : v;
@ -2909,6 +2938,7 @@ private:
Version m_writeVersion;
Version m_lastCommittedVersion;
Version m_newOldestVersion;
Future<Void> m_latestCommit;
Future<Void> m_init;
std::string m_name;
@ -3680,10 +3710,8 @@ private:
// Wait for the latest commit that started to be finished.
wait(previousCommit);
// Advance oldest version by a random number between 0 and the difference between the latest and oldest versions.
Version newOldestVersion = self->m_pager->getOldestVersion().get() + deterministicRandom()->randomInt(0, self->m_pager->getLatestVersion().get() - self->m_pager->getOldestVersion().get() + 1);
self->m_pager->setOldestVersion(newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, newOldestVersion);
self->m_pager->setOldestVersion(self->m_newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n", self->m_name.c_str(), writeVersion, self->m_newOldestVersion);
state Future<Void> lazyDelete = incrementalLazyDelete(self, 100);
@ -5277,6 +5305,7 @@ TEST_CASE("!/redwood/correctness/btree") {
state int mutationBytesTarget = shortTest ? 5000 : randomSize(std::min<int>(maxCommitSize * 100, 100e6));
state double clearProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = deterministicRandom()->random01();
state double advanceOldVersionProbability = deterministicRandom()->random01();
state double maxWallClockDuration = 60;
printf("\n");
@ -5290,6 +5319,7 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("mutationBytesTarget: %d\n", mutationBytesTarget);
printf("clearProbability: %f\n", clearProbability);
printf("coldStartProbability: %f\n", coldStartProbability);
printf("advanceOldVersionProbability: %f\n", advanceOldVersionProbability);
printf("\n");
printf("Deleting existing test data...\n");
@ -5431,6 +5461,11 @@ TEST_CASE("!/redwood/correctness/btree") {
Version v = version; // Avoid capture of version as a member of *this
// Sometimes advance the oldest version to close the gap between the oldest and latest versions by a random amount.
if(deterministicRandom()->random01() < advanceOldVersionProbability) {
btree->setOldestVersion(btree->getLastCommittedVersion() - deterministicRandom()->randomInt(0, btree->getLastCommittedVersion() - btree->getOldestVersion() + 1));
}
commit = map(btree->commit(), [=](Void) {
printf("Committed: %s\n", VersionedBTree::counts.toString(true).c_str());
// Notify the background verifier that version is committed and therefore readable