Redwood commit versions are now FDB commit versions for storage instances. Redwood no longer makes internal commits during Pager or BTree recovery so that commit version is always set by the user.

This commit is contained in:
Steve Atherton 2021-09-10 03:28:46 -07:00
parent 9df392f447
commit b06cf7a328
9 changed files with 224 additions and 217 deletions

View File

@ -746,7 +746,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
init( REDWOOD_LAZY_CLEAR_MIN_PAGES, 0 );
init( REDWOOD_LAZY_CLEAR_MAX_PAGES, 1e6 );
init( REDWOOD_REMAP_CLEANUP_WINDOW, 50 );
init( REDWOOD_REMAP_CLEANUP_WINDOW_SECONDS, 60 );
init( REDWOOD_REMAP_CLEANUP_LAG, 0.1 );
init( REDWOOD_METRICS_INTERVAL, 5.0 );
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );

View File

@ -692,7 +692,7 @@ public:
// queue is empty
int REDWOOD_LAZY_CLEAR_MAX_PAGES; // Maximum number of pages to free before ending a lazy clear cycle, unless the
// queue is empty
int64_t REDWOOD_REMAP_CLEANUP_WINDOW; // Remap remover lag interval in which to coalesce page writes
int64_t REDWOOD_REMAP_CLEANUP_WINDOW_SECONDS; // Remap remover lag interval in which to coalesce page writes
double REDWOOD_REMAP_CLEANUP_LAG; // Maximum allowed remap remover lag behind the cleanup window as a multiple of
// the window size
double REDWOOD_METRICS_INTERVAL;

View File

@ -35,6 +35,7 @@ set(FDBSERVER_SRCS
IDiskQueue.h
IKeyValueContainer.h
IKeyValueStore.h
IKeyValueStore.cpp
IPager.h
KeyValueStoreCompressTestData.actor.cpp
KeyValueStoreMemory.actor.cpp

View File

@ -0,0 +1,37 @@
/*
* IKeyValueStore.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/IKeyValueStore.h"
#include "fdbclient/FDBTypes.h"
static const StringRef persistVersion = "\xff\xffVersion"_sr;
Future<Version> IKeyValueStore::getCommittedVersion() {
return map(readValue(persistVersion), [](Optional<Value> val) -> Version {
if (val.present()) {
return BinaryReader::fromStringRef<Version>(val.get(), Unversioned());
}
return invalidVersion;
});
}
void IKeyValueStore::setCommitVersion(Version v) {
set(KeyValueRef(persistVersion, BinaryWriter::toValue(v, Unversioned())));
}

View File

@ -58,6 +58,12 @@ public:
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) = 0;
// Set the version of the next commit.
// Must be greater than 0 and greater than previous commit version.
virtual void setCommitVersion(Version v);
// Get latest durable committed version
virtual Future<Version> getCommittedVersion();
// To debug MEMORY_RADIXTREE type ONLY
// Returns (1) how many key & value pairs have been inserted (2) how many nodes have been created (3) how many
// key size is less than 12 bytes

View File

@ -257,8 +257,10 @@ public:
// The snapshot shall be usable until setOldVersion() is called with a version > v.
virtual Reference<IPagerSnapshot> getReadSnapshot(Version v) = 0;
// Atomically make durable all pending page writes, page frees, and update the metadata string.
virtual Future<Void> commit() = 0;
// Atomically make durable all pending page writes, page frees, and update the metadata string,
// setting the committed version to v
// v must be >= the highest versioned page write.
virtual Future<Void> commit(Version v) = 0;
// Get the latest meta key set or committed
virtual Key getMetaKey() const = 0;
@ -266,9 +268,6 @@ public:
// Set the metakey which will be stored in the next commit
virtual void setMetaKey(KeyRef metaKey) = 0;
// Sets the next commit version
virtual void setCommitVersion(Version v) = 0;
virtual StorageBytes getStorageBytes() const = 0;
virtual int64_t getPageCount() = 0;
@ -282,16 +281,16 @@ public:
virtual Future<Void> init() = 0;
// Returns latest committed version
virtual Version getLatestVersion() const = 0;
virtual Version getLastCommittedVersion() const = 0;
// Returns the oldest readable version as of the most recent committed version
virtual Version getOldestVersion() const = 0;
virtual Version getOldestReadableVersion() const = 0;
// Sets the oldest readable version to be put into affect at the next commit.
// The pager can reuse pages that were freed at a version less than v.
// If any snapshots are in use at a version less than v, the pager can either forcefully
// invalidate them or keep their versions around until the snapshots are no longer in use.
virtual void setOldestVersion(Version v) = 0;
virtual void setOldestReadableVersion(Version v) = 0;
protected:
~IPager2() {} // Destruction should be done using close()/dispose() from the IClosable interface

View File

@ -2187,8 +2187,8 @@ public:
self->pHeader = (Header*)self->headerPage->begin();
if (self->pHeader->formatVersion != Header::FORMAT_VERSION) {
Error e = internal_error(); // TODO: Something better?
TraceEvent(SevError, "RedwoodRecoveryFailedWrongVersion")
Error e = wrong_format_version();
TraceEvent(SevWarn, "RedwoodRecoveryFailedWrongVersion")
.detail("Filename", self->filename)
.detail("Version", self->pHeader->formatVersion)
.detail("ExpectedVersion", Header::FORMAT_VERSION)
@ -2277,7 +2277,7 @@ public:
// Update the last committed header with the one that was recovered (which is the last known committed
// header)
self->updateCommittedHeader();
self->addLatestSnapshot();
self->addSnapshot(self->pHeader->committedVersion, self->pHeader->getMetaKey());
// Reset the remapQueue head reader for normal reads
self->remapQueue.resetHeadReader();
@ -2308,8 +2308,9 @@ public:
// Write new header using desiredPageSize
self->pHeader->formatVersion = Header::FORMAT_VERSION;
self->pHeader->committedVersion = 1;
self->pHeader->oldestVersion = 1;
constexpr int initialVersion = 0;
self->pHeader->committedVersion = initialVersion;
self->pHeader->oldestVersion = initialVersion;
// No meta key until a user sets one and commits
self->pHeader->setMetaKey(Key());
@ -2342,14 +2343,12 @@ public:
0xff,
self->headerPage->size() - self->pHeader->size());
// Since there is no previously committed header use the initial header for the initial commit.
// There is no previously committed header, but the current header state is sufficient to use as the backup
// header for the next commit, which if recovered would result in a valid empty pager at version 0.
self->updateCommittedHeader();
// TODO: Double check this - need to do this as extentUsedList was pushed into
self->addLatestSnapshot();
self->addSnapshot(initialVersion, KeyRef());
self->remapCleanupFuture = Void();
wait(self->commit());
}
debug_printf("DWALPager(%s) recovered. committedVersion=%" PRId64 " logicalPageSize=%d physicalPageSize=%d\n",
@ -2427,7 +2426,6 @@ public:
// Try to reuse pages up to the earlier of the oldest version set by the user or the oldest snapshot still in
// the snapshots list
ASSERT(!self->snapshots.empty());
Optional<DelayedFreePage> delayedFreePageID =
wait(self->delayedFreeList.pop(DelayedFreePage{ self->effectiveOldestVersion(), 0 }));
if (delayedFreePageID.present()) {
@ -2996,10 +2994,25 @@ public:
// Get snapshot as of the most recent committed version of the pager
Reference<IPagerSnapshot> getReadSnapshot(Version v) override;
void addLatestSnapshot();
void addSnapshot(Version version, KeyRef meta) {
if (!snapshots.empty() && snapshots.back().version == version) {
// Replacing the latest snapshot is currently only allowed for the initial commit at version 0
// Committing at the same version multiple times in a row could be supported (and likely is already
// supported physically) but it would be a bad practice for a user to do since after recovery it would
// be unclear which "version" of the same version was recovered to unless the user stores some other
// metadata to differentiate them.
ASSERT(version == 0);
snapshots.back().snapshot =
makeReference<DWALPagerSnapshot>(this, meta, version, snapshots.back().expired.getFuture());
} else {
Promise<Void> expired;
snapshots.push_back(
{ version, expired, makeReference<DWALPagerSnapshot>(this, meta, version, expired.getFuture()) });
}
}
// Set the pending oldest versiont to keep as of the next commit
void setOldestVersion(Version v) override {
void setOldestReadableVersion(Version v) override {
ASSERT(v >= pHeader->oldestVersion);
ASSERT(v <= pHeader->committedVersion);
pHeader->oldestVersion = v;
@ -3008,7 +3021,7 @@ public:
// Get the oldest *readable* version, which is not the same as the oldest retained version as the version
// returned could have been set as the oldest version in the pending commit
Version getOldestVersion() const override { return pHeader->oldestVersion; };
Version getOldestReadableVersion() const override { return pHeader->oldestVersion; };
// Calculate the *effective* oldest version, which can be older than the one set in the last commit since we
// are allowing active snapshots to temporarily delay page reuse.
@ -3247,9 +3260,10 @@ public:
return Void();
}
ACTOR static Future<Void> commit_impl(DWALPager* self) {
debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str());
ACTOR static Future<Void> commit_impl(DWALPager* self, Version v) {
debug_printf("DWALPager(%s) commit begin %s\n", self->filename.c_str(), ::toString(v).c_str());
ASSERT(v >= self->pLastCommittedHeader->committedVersion);
// Write old committed header to Page 1
self->writeHeaderPage(1, self->lastCommittedHeaderPage);
@ -3259,6 +3273,7 @@ public:
wait(flushQueues(self));
self->pHeader->committedVersion = v;
self->pHeader->remapQueue = self->remapQueue.getState();
self->pHeader->extentFreeList = self->extentFreeList.getState();
self->pHeader->extentUsedList = self->extentUsedList.getState();
@ -3297,7 +3312,7 @@ public:
// Update the last committed header for use in the next commit.
self->updateCommittedHeader();
self->addLatestSnapshot();
self->addSnapshot(v, self->pHeader->getMetaKey());
// Try to expire snapshots up to the oldest version, in case some were being kept around due to being in use,
// because maybe some are no longer in use.
@ -3309,17 +3324,15 @@ public:
return Void();
}
Future<Void> commit() override {
Future<Void> commit(Version v) override {
// Can't have more than one commit outstanding.
ASSERT(commitFuture.isReady());
commitFuture = forwardError(commit_impl(this), errorPromise);
commitFuture = forwardError(commit_impl(this, v), errorPromise);
return commitFuture;
}
Key getMetaKey() const override { return pHeader->getMetaKey(); }
void setCommitVersion(Version v) override { pHeader->committedVersion = v; }
void setMetaKey(KeyRef metaKey) override { pHeader->setMetaKey(metaKey); }
ACTOR void shutdown(DWALPager* self, bool dispose) {
@ -3435,7 +3448,7 @@ public:
Future<Void> init() override { return recoverFuture; }
Version getLatestVersion() const override { return pLastCommittedHeader->committedVersion; }
Version getLastCommittedVersion() const override { return pLastCommittedHeader->committedVersion; }
private:
~DWALPager() {}
@ -3446,7 +3459,7 @@ private:
#pragma pack(push, 1)
// Header is the format of page 0 of the database
struct Header {
static constexpr int FORMAT_VERSION = 3;
static constexpr int FORMAT_VERSION = 6;
uint16_t formatVersion;
uint32_t queueCount;
uint32_t pageSize;
@ -3637,15 +3650,6 @@ Reference<IPagerSnapshot> DWALPager::getReadSnapshot(Version v) {
return i->snapshot;
}
void DWALPager::addLatestSnapshot() {
Promise<Void> expired;
snapshots.push_back(
{ pLastCommittedHeader->committedVersion,
expired,
makeReference<DWALPagerSnapshot>(
this, pLastCommittedHeader->getMetaKey(), pLastCommittedHeader->committedVersion, expired.getFuture()) });
}
// TODO: Move this to a flow header once it is mature.
struct SplitStringRef {
StringRef a;
@ -4413,7 +4417,7 @@ public:
#pragma pack(push, 1)
struct MetaKey {
static constexpr int FORMAT_VERSION = 12;
static constexpr int FORMAT_VERSION = 13;
// This serves as the format version for the entire tree, individual pages will not be versioned
uint16_t formatVersion;
uint8_t height;
@ -4458,10 +4462,8 @@ public:
StorageBytes getStorageBytes() const { return m_pager->getStorageBytes(); }
// Writes are provided in an ordered stream.
// A write is considered part of (a change leading to) the version determined by the previous call to
// setWriteVersion() A write shall not become durable until the following call to commit() begins, and shall be
// durable once the following call to commit() returns
// Set key to value as of the next commit
// The new value is not readable until after the next commit is completed.
void set(KeyValueRef keyValue) {
++g_redwoodMetrics.metric.opSet;
g_redwoodMetrics.metric.opSetKeyBytes += keyValue.key.size();
@ -4488,23 +4490,14 @@ public:
m_pBuffer->erase(iBegin, iEnd);
}
void setOldestVersion(Version v) { m_newOldestVersion = v; }
void setOldestReadableVersion(Version v) { m_newOldestVersion = v; }
Version getOldestVersion() const { return m_pager->getOldestVersion(); }
Version getOldestReadableVersion() const { return m_pager->getOldestReadableVersion(); }
Version getLatestVersion() const {
if (m_writeVersion != invalidVersion)
return m_writeVersion;
return m_pager->getLatestVersion();
}
Version getWriteVersion() const { return m_writeVersion; }
Version getLastCommittedVersion() const { return m_lastCommittedVersion; }
Version getLastCommittedVersion() const { return m_pager->getLastCommittedVersion(); }
VersionedBTree(IPager2* pager, std::string name)
: m_pager(pager), m_pBuffer(nullptr), m_writeVersion(invalidVersion), m_lastCommittedVersion(invalidVersion),
m_name(name), m_pHeader(nullptr), m_headerSpace(0) {
: m_pager(pager), m_pBuffer(nullptr), m_name(name), m_pHeader(nullptr), m_headerSpace(0) {
m_lazyClearActor = 0;
m_init = init_impl(this);
@ -4516,7 +4509,8 @@ public:
self->m_lazyClearStop = false;
// TODO: Is it contractually okay to always to read at the latest version?
state Reference<IPagerSnapshot> snapshot = self->m_pager->getReadSnapshot(self->m_pager->getLatestVersion());
state Reference<IPagerSnapshot> snapshot =
self->m_pager->getReadSnapshot(self->m_pager->getLastCommittedVersion());
state int freedPages = 0;
loop {
@ -4615,47 +4609,48 @@ public:
ACTOR static Future<Void> init_impl(VersionedBTree* self) {
wait(self->m_pager->init());
self->m_pBuffer.reset(new MutationBuffer());
// TODO: Get actual max MetaKey size limit from Pager
self->m_headerSpace = self->m_pager->getUsablePageSize();
self->m_pHeader = (MetaKey*)new uint8_t[self->m_headerSpace];
self->m_blockSize = self->m_pager->getUsablePageSize();
state Version latest = self->m_pager->getLatestVersion();
self->m_newOldestVersion = self->m_pager->getOldestVersion();
self->m_newOldestVersion = self->m_pager->getOldestReadableVersion();
debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n",
self->m_newOldestVersion);
state Key meta = self->m_pager->getMetaKey();
if (meta.size() == 0) {
// Create new BTree
self->m_pHeader->formatVersion = MetaKey::FORMAT_VERSION;
LogicalPageID id = wait(self->m_pager->newPageID());
BTreePageIDRef newRoot((LogicalPageID*)&id, 1);
debug_printf("new root %s\n", toString(newRoot).c_str());
self->m_pHeader->root.set(newRoot, self->m_headerSpace - sizeof(MetaKey));
self->m_pHeader->height = 1;
++latest;
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
makeEmptyRoot(page);
self->m_pager->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, id, page);
self->m_pager->setCommitVersion(latest);
LogicalPageID newQueuePage = wait(self->m_pager->newPageID());
self->m_lazyClearQueue.create(
self->m_pager, newQueuePage, "LazyClearQueue", self->m_pager->newLastQueueID(), false);
self->m_pHeader->lazyDeleteQueue = self->m_lazyClearQueue.getState();
self->m_pager->setMetaKey(self->m_pHeader->asKeyRef());
wait(self->m_pager->commit());
debug_printf("Committed initial commit.\n");
debug_printf("BTree created (but not committed)\n");
} else {
self->m_pHeader->fromKeyRef(meta);
self->m_lazyClearQueue.recover(self->m_pager, self->m_pHeader->lazyDeleteQueue, "LazyClearQueueRecovered");
debug_printf("BTree recovered.\n");
}
debug_printf("Recovered btree at version %" PRId64 ": %s\n", latest, self->m_pHeader->toString().c_str());
self->m_lazyClearActor = 0;
self->m_lastCommittedVersion = latest;
self->m_lazyClearActor = incrementalLazyClear(self);
debug_printf("Recovered btree at version %" PRId64 ": %s\n",
self->m_pager->getLastCommittedVersion(),
self->m_pHeader->toString().c_str());
return Void();
}
@ -4673,38 +4668,17 @@ public:
}
}
// Must be nondecreasing
void setWriteVersion(Version v) {
ASSERT(v > m_lastCommittedVersion);
// If there was no current mutation buffer, create one in the buffer map and update m_pBuffer
if (m_pBuffer == nullptr) {
// When starting a new mutation buffer its start version must be greater than the last write version
ASSERT(v > m_writeVersion);
m_pBuffer = &m_mutationBuffers[v];
} else {
// It's OK to set the write version to the same version repeatedly so long as m_pBuffer is not null
ASSERT(v >= m_writeVersion);
}
m_writeVersion = v;
}
Future<Void> commit() {
if (m_pBuffer == nullptr)
return m_latestCommit;
return commit_impl(this);
}
Future<Void> commit(Version v) { return commit_impl(this, v); }
ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
ASSERT(g_network->isSimulated());
debug_printf("Clearing tree.\n");
self->setWriteVersion(self->getLatestVersion() + 1);
self->clear(KeyRangeRef(dbBegin.key, dbEnd.key));
wait(self->commit());
// Loop commits until the the lazy delete queue is completely processed.
loop {
wait(self->commit());
wait(self->commit(self->getLastCommittedVersion() + 1));
// If the lazy delete queue is completely processed then the last time the lazy delete actor
// was started it, after the last commit, it would exist immediately and do no work, so its
@ -4712,14 +4686,12 @@ public:
if (self->m_lazyClearActor.isReady() && self->m_lazyClearActor.get() == 0) {
break;
}
self->setWriteVersion(self->getLatestVersion() + 1);
}
// Forget all but the latest version of the tree.
debug_printf("Discarding all old versions.\n");
self->setOldestVersion(self->getLastCommittedVersion());
self->setWriteVersion(self->getLatestVersion() + 1);
wait(self->commit());
self->setOldestReadableVersion(self->getLastCommittedVersion());
wait(self->commit(self->getLastCommittedVersion() + 1));
// The lazy delete queue should now be empty and contain only the new page to start writing to
// on the next commit.
@ -4953,11 +4925,17 @@ private:
*/
IPager2* m_pager;
MutationBuffer* m_pBuffer;
std::map<Version, MutationBuffer> m_mutationBuffers;
Version m_writeVersion;
Version m_lastCommittedVersion;
// The mutation buffer currently being written to
std::unique_ptr<MutationBuffer> m_pBuffer;
struct CommitBatch {
Version readVersion;
Version writeVersion;
std::unique_ptr<MutationBuffer> mutations;
Reference<IPagerSnapshot> snapshot;
};
Version m_newOldestVersion;
Future<Void> m_latestCommit;
Future<Void> m_init;
@ -5831,8 +5809,7 @@ private:
ACTOR static Future<Void> commitSubtree(
VersionedBTree* self,
Reference<IPagerSnapshot> snapshot,
MutationBuffer* mutationBuffer,
CommitBatch* batch,
BTreePageIDRef rootID,
int height,
MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key
@ -5861,8 +5838,7 @@ private:
}
state Reference<const ArenaPage> page =
wait(readPage(PagerEventReasons::Commit, height, snapshot, rootID, height, false, true));
state Version writeVersion = self->getLastCommittedVersion() + 1;
wait(readPage(PagerEventReasons::Commit, height, batch->snapshot, rootID, height, false, true));
// If the page exists in the cache, it must be copied before modification.
// That copy will be referenced by pageCopy, as page must stay in scope in case anything references its
@ -5883,7 +5859,9 @@ private:
debug_printf(
"%s commitSubtree(): %s\n",
context.c_str(),
btPage->toString(false, rootID, snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
btPage
->toString(
false, rootID, batch->snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
.c_str());
state BTreePage::BinaryTree::Cursor cursor =
@ -6135,20 +6113,18 @@ private:
context.c_str());
}
writeVersion = self->getLastCommittedVersion() + 1;
if (updating) {
// If the tree is now empty, delete the page
if (cursor.tree->numItems == 0) {
update->cleared();
self->freeBTreePage(rootID, writeVersion);
self->freeBTreePage(rootID, batch->writeVersion);
debug_printf("%s Page updates cleared all entries, returning %s\n",
context.c_str(),
toString(*update).c_str());
} else {
// Otherwise update it.
BTreePageIDRef newID = wait(self->updateBTreePage(
self, rootID, &update->newLinks.arena(), pageCopy.castTo<ArenaPage>(), writeVersion));
self, rootID, &update->newLinks.arena(), pageCopy.castTo<ArenaPage>(), batch->writeVersion));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf(
@ -6160,7 +6136,7 @@ private:
// If everything in the page was deleted then this page should be deleted as of the new version
if (merged.empty()) {
update->cleared();
self->freeBTreePage(rootID, writeVersion);
self->freeBTreePage(rootID, batch->writeVersion);
debug_printf("%s All leaf page contents were cleared, returning %s\n",
context.c_str(),
@ -6169,8 +6145,13 @@ private:
}
// Rebuild new page(s).
state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(
self, &update->subtreeLowerBound, &update->subtreeUpperBound, merged, height, writeVersion, rootID));
state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(self,
&update->subtreeLowerBound,
&update->subtreeUpperBound,
merged,
height,
batch->writeVersion,
rootID));
// Put new links into update and tell update that pages were rebuilt
update->rebuilt(entries);
@ -6241,7 +6222,7 @@ private:
u.skipLen = 0; // TODO: set this
// Find the mutation buffer range that includes all changes to the range described by u
mEnd = mutationBuffer->lower_bound(u.subtreeUpperBound.key);
mEnd = batch->mutations->lower_bound(u.subtreeUpperBound.key);
// If the mutation range described by mBegin extends to mEnd, then see if the part of that range
// that overlaps with u's subtree range is being fully cleared or fully unchanged.
@ -6315,13 +6296,13 @@ private:
debug_printf("%s: freeing child page in cleared subtree range: %s\n",
context.c_str(),
::toString(rec.getChildPage()).c_str());
self->freeBTreePage(rec.getChildPage(), writeVersion);
self->freeBTreePage(rec.getChildPage(), batch->writeVersion);
} else {
debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
context.c_str(),
::toString(rec.getChildPage()).c_str());
self->m_lazyClearQueue.pushBack(LazyClearQueueEntry{
(uint8_t)(height - 1), writeVersion, rec.getChildPage() });
(uint8_t)(height - 1), batch->writeVersion, rec.getChildPage() });
}
}
c.moveNext();
@ -6341,8 +6322,7 @@ private:
}
// If this page has height of 2 then its children are leaf nodes
recursions.push_back(
self->commitSubtree(self, snapshot, mutationBuffer, pageID, height - 1, mBegin, mEnd, &u));
recursions.push_back(self->commitSubtree(self, batch, pageID, height - 1, mBegin, mEnd, &u));
}
debug_printf(
@ -6419,7 +6399,7 @@ private:
debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
context.c_str(),
toString(*update).c_str());
self->freeBTreePage(rootID, writeVersion);
self->freeBTreePage(rootID, batch->writeVersion);
self->childUpdateTracker.erase(rootID.front());
} else {
if (modifier.updating) {
@ -6439,7 +6419,8 @@ private:
if (cursor.get().value.present()) {
for (auto& p : cursor.get().getChildPage()) {
if (parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
LogicalPageID newID =
self->m_pager->detachRemappedPage(p, batch->writeVersion);
if (newID != invalidLogicalPageID) {
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
p = newID;
@ -6460,16 +6441,19 @@ private:
}
}
BTreePageIDRef newID = wait(self->updateBTreePage(
self, rootID, &update->newLinks.arena(), pageCopy.castTo<ArenaPage>(), writeVersion));
BTreePageIDRef newID = wait(self->updateBTreePage(self,
rootID,
&update->newLinks.arena(),
pageCopy.castTo<ArenaPage>(),
batch->writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n",
context.c_str(),
toString(writeVersion).c_str(),
toString(batch->writeVersion).c_str(),
btPage
->toString(false,
newID,
snapshot->getVersion(),
batch->snapshot->getVersion(),
update->decodeLowerBound,
update->decodeUpperBound)
.c_str());
@ -6492,7 +6476,8 @@ private:
for (int i = 0; i < oldPages.size(); ++i) {
LogicalPageID p = oldPages[i];
if (parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
LogicalPageID newID =
self->m_pager->detachRemappedPage(p, batch->writeVersion);
if (newID != invalidLogicalPageID) {
// Rebuild record values reference original page memory so make a copy
if (newPages.empty()) {
@ -6516,7 +6501,7 @@ private:
&update->subtreeUpperBound,
modifier.rebuild,
height,
writeVersion,
batch->writeVersion,
rootID));
update->rebuilt(newChildEntries);
@ -6531,16 +6516,12 @@ private:
}
}
ACTOR static Future<Void> commit_impl(VersionedBTree* self) {
state MutationBuffer* mutations = self->m_pBuffer;
// No more mutations are allowed to be written to this mutation buffer we will commit
// at m_writeVersion, which we must save locally because it could change during commit.
self->m_pBuffer = nullptr;
state Version writeVersion = self->m_writeVersion;
// The latest mutation buffer start version is the one we will now (or eventually) commit.
state Version mutationBufferStartVersion = self->m_mutationBuffers.rbegin()->first;
ACTOR static Future<Void> commit_impl(VersionedBTree* self, Version writeVersion) {
// Take ownership of the current mutation buffer and make a new one
state CommitBatch batch;
batch.mutations = std::move(self->m_pBuffer);
self->m_pBuffer.reset(new MutationBuffer());
batch.writeVersion = writeVersion;
// Replace the lastCommit future with a new one and then wait on the old one
state Promise<Void> committed;
@ -6550,15 +6531,18 @@ private:
// Wait for the latest commit to be finished.
wait(previousCommit);
self->m_pager->setOldestVersion(self->m_newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", new oldest version set to %" PRId64 "\n",
// For this commit, use the latest snapshot that was just committed.
batch.readVersion = self->m_pager->getLastCommittedVersion();
self->m_pager->setOldestReadableVersion(self->m_newOldestVersion);
debug_printf("%s: Beginning commit of version %" PRId64 ", read version %" PRId64
", new oldest version set to %" PRId64 "\n",
self->m_name.c_str(),
writeVersion,
batch.writeVersion,
batch.readVersion,
self->m_newOldestVersion);
// Get the latest version from the pager, which is what we will read at
state Version latestVersion = self->m_pager->getLatestVersion();
debug_printf("%s: pager latestVersion %" PRId64 "\n", self->m_name.c_str(), latestVersion);
batch.snapshot = self->m_pager->getReadSnapshot(batch.readVersion);
state Standalone<BTreePageIDRef> rootPageID = self->m_pHeader->root.get();
state InternalPageSliceUpdate all;
@ -6569,18 +6553,11 @@ private:
all.decodeUpperBound = dbEnd;
all.skipLen = 0;
MutationBuffer::const_iterator mBegin = mutations->upper_bound(all.subtreeLowerBound.key);
MutationBuffer::const_iterator mBegin = batch.mutations->upper_bound(all.subtreeLowerBound.key);
--mBegin;
MutationBuffer::const_iterator mEnd = mutations->lower_bound(all.subtreeUpperBound.key);
MutationBuffer::const_iterator mEnd = batch.mutations->lower_bound(all.subtreeUpperBound.key);
wait(commitSubtree(self,
self->m_pager->getReadSnapshot(latestVersion),
mutations,
rootPageID,
self->m_pHeader->height,
mBegin,
mEnd,
&all));
wait(commitSubtree(self, &batch, rootPageID, self->m_pHeader->height, mBegin, mEnd, &all));
// If the old root was deleted, write a new empty tree root node and free the old roots
if (all.childrenChanged) {
@ -6599,7 +6576,7 @@ private:
} else {
// If the new root level's size is not 1 then build new root level(s)
Standalone<VectorRef<RedwoodRecordRef>> newRootPage =
wait(buildNewRoot(self, latestVersion, newRootRecords, self->m_pHeader->height));
wait(buildNewRoot(self, batch.writeVersion, newRootRecords, self->m_pHeader->height));
rootPageID = newRootPage.front().getChildPage();
}
}
@ -6612,8 +6589,6 @@ private:
wait(success(self->m_lazyClearActor));
debug_printf("Lazy delete freed %u pages\n", self->m_lazyClearActor.get());
self->m_pager->setCommitVersion(writeVersion);
wait(self->m_lazyClearQueue.flush());
self->m_pHeader->lazyDeleteQueue = self->m_lazyClearQueue.getState();
@ -6621,15 +6596,9 @@ private:
self->m_pager->setMetaKey(self->m_pHeader->asKeyRef());
debug_printf("%s: Committing pager %" PRId64 "\n", self->m_name.c_str(), writeVersion);
wait(self->m_pager->commit());
wait(self->m_pager->commit(writeVersion));
debug_printf("%s: Committed version %" PRId64 "\n", self->m_name.c_str(), writeVersion);
// Now that everything is committed we must delete the mutation buffer.
// Our buffer's start version should be the oldest mutation buffer version in the map.
ASSERT(mutationBufferStartVersion == self->m_mutationBuffers.begin()->first);
self->m_mutationBuffers.erase(self->m_mutationBuffers.begin());
self->m_lastCommittedVersion = writeVersion;
++g_redwoodMetrics.metric.opCommit;
self->m_lazyClearActor = incrementalLazyClear(self);
@ -6742,7 +6711,7 @@ public:
path.clear();
path.reserve(6);
valid = false;
return pushPage(root);
return root.empty() ? Void() : pushPage(root);
}
// Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid
@ -6754,7 +6723,6 @@ public:
// If there is a record in the tree > query then moveNext() will move to it.
// If non-zero is returned then the cursor is valid and the return value is logically equivalent
// to query.compare(cursor.get())
ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query) {
state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
self->path.resize(1);
@ -6790,7 +6758,7 @@ public:
}
}
Future<int> seek(RedwoodRecordRef query) { return seek_impl(this, query); }
Future<int> seek(RedwoodRecordRef query) { return path.empty() ? 0 : seek_impl(this, query); }
ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query) {
debug_printf("seekGTE(%s) start\n", query.toString().c_str());
@ -6930,19 +6898,18 @@ public:
return Void();
}
Future<Void> moveNext() { return move_impl(this, true); }
Future<Void> movePrev() { return move_impl(this, false); }
Future<Void> moveNext() { return path.empty() ? Void() : move_impl(this, true); }
Future<Void> movePrev() { return path.empty() ? Void() : move_impl(this, false); }
};
Future<Void> initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion, PagerEventReasons reason) {
// Only committed versions can be read.
ASSERT(snapshotVersion <= m_lastCommittedVersion);
Reference<IPagerSnapshot> snapshot = m_pager->getReadSnapshot(snapshotVersion);
// This is a ref because snapshot will continue to hold the metakey value memory
KeyRef m = snapshot->getMetaKey();
return cursor->init(this, reason, snapshot, ((MetaKey*)m.begin())->root.get());
return cursor->init(
this, reason, snapshot, m.size() == 0 ? BTreePageIDRef() : ((MetaKey*)m.begin())->root.get());
}
};
@ -6951,9 +6918,9 @@ public:
RedwoodRecordRef VersionedBTree::dbBegin(LiteralStringRef(""));
RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff"));
class KeyValueStoreRedwoodUnversioned : public IKeyValueStore {
class KeyValueStoreRedwood : public IKeyValueStore {
public:
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID)
KeyValueStoreRedwood(std::string filePrefix, UID logID)
: m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
@ -6966,7 +6933,8 @@ public:
: FLOW_KNOBS->SIM_PAGE_CACHE_4K)
: FLOW_KNOBS->PAGE_CACHE_4K;
Version remapCleanupWindow =
BUGGIFY ? deterministicRandom()->randomInt64(0, 1000) : SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;
SERVER_KNOBS->VERSIONS_PER_SECOND *
(BUGGIFY ? deterministicRandom()->randomInt64(0, 100) : SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW_SECONDS);
IPager2* pager = new DWALPager(pageSize,
extentSize,
@ -6982,16 +6950,17 @@ public:
Future<Void> init() override { return m_init; }
ACTOR Future<Void> init_impl(KeyValueStoreRedwoodUnversioned* self) {
ACTOR Future<Void> init_impl(KeyValueStoreRedwood* self) {
TraceEvent(SevInfo, "RedwoodInit").detail("FilePrefix", self->m_filePrefix);
wait(self->m_tree->init());
Version v = self->m_tree->getLatestVersion();
self->m_tree->setWriteVersion(v + 1);
TraceEvent(SevInfo, "RedwoodInitComplete").detail("FilePrefix", self->m_filePrefix);
self->m_nextCommitVersion = self->m_tree->getLastCommittedVersion() + 1;
TraceEvent(SevInfo, "RedwoodInitComplete")
.detail("FilePrefix", self->m_filePrefix)
.detail("Version", self->m_tree->getLastCommittedVersion());
return Void();
}
ACTOR void shutdown(KeyValueStoreRedwoodUnversioned* self, bool dispose) {
ACTOR void shutdown(KeyValueStoreRedwood* self, bool dispose) {
TraceEvent(SevInfo, "RedwoodShutdown").detail("FilePrefix", self->m_filePrefix).detail("Dispose", dispose);
if (self->m_error.canBeSet()) {
self->m_error.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress
@ -7017,9 +6986,10 @@ public:
Future<Void> onClosed() override { return m_closed.getFuture(); }
Future<Void> commit(bool sequential = false) override {
Future<Void> c = m_tree->commit();
m_tree->setOldestVersion(m_tree->getLatestVersion());
m_tree->setWriteVersion(m_tree->getWriteVersion() + 1);
Future<Void> c = m_tree->commit(m_nextCommitVersion);
// Currently not keeping history
m_tree->setOldestReadableVersion(m_nextCommitVersion);
++m_nextCommitVersion;
return catchError(c);
}
@ -7044,7 +7014,7 @@ public:
return catchError(readRange_impl(this, keys, rowLimit, byteLimit));
}
ACTOR static Future<RangeResult> readRange_impl(KeyValueStoreRedwoodUnversioned* self,
ACTOR static Future<RangeResult> readRange_impl(KeyValueStoreRedwood* self,
KeyRange keys,
int rowLimit,
int byteLimit) {
@ -7168,9 +7138,7 @@ public:
return result;
}
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwoodUnversioned* self,
Key key,
Optional<UID> debugID) {
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwood* self, Key key, Optional<UID> debugID) {
state VersionedBTree::BTreeCursor cur;
wait(
self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead));
@ -7206,7 +7174,7 @@ public:
}));
}
~KeyValueStoreRedwoodUnversioned() override{};
~KeyValueStoreRedwood() override{};
private:
std::string m_filePrefix;
@ -7216,6 +7184,7 @@ private:
Promise<Void> m_error;
PriorityMultiLock m_concurrentReads;
bool prefetch;
Version m_nextCommitVersion;
template <typename T>
inline Future<T> catchError(Future<T> f) {
@ -7224,7 +7193,7 @@ private:
};
IKeyValueStore* keyValueStoreRedwoodV1(std::string const& filename, UID logID) {
return new KeyValueStoreRedwoodUnversioned(filename, logID);
return new KeyValueStoreRedwood(filename, logID);
}
int randomSize(int max) {
@ -7396,7 +7365,7 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
// Randomly use a new cursor at the same version for the reverse range read, if the version is still available for
// opening new cursors
if (v >= btree->getOldestVersion() && deterministicRandom()->coinflip()) {
if (v >= btree->getOldestReadableVersion() && deterministicRandom()->coinflip()) {
cur = VersionedBTree::BTreeCursor();
wait(btree->initBTreeCursor(&cur, v, PagerEventReasons::RangeRead));
}
@ -7535,7 +7504,7 @@ ACTOR Future<Void> verify(VersionedBTree* btree,
committedVersions.push_back(v);
// Remove expired versions
while (!committedVersions.empty() && committedVersions.front() < btree->getOldestVersion()) {
while (!committedVersions.empty() && committedVersions.front() < btree->getOldestReadableVersion()) {
committedVersions.pop_front();
}
@ -8920,11 +8889,10 @@ TEST_CASE("/redwood/correctness/btree") {
state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
state std::set<Key> keys;
state Version lastVer = btree->getLatestVersion();
state Version lastVer = btree->getLastCommittedVersion();
printf("Starting from version: %" PRId64 "\n", lastVer);
state Version version = lastVer + 1;
btree->setWriteVersion(version);
state SimpleCounter mutationBytes;
state SimpleCounter keyBytesInserted;
@ -8948,7 +8916,6 @@ TEST_CASE("/redwood/correctness/btree") {
// Sometimes increment the version
if (deterministicRandom()->random01() < 0.10) {
++version;
btree->setWriteVersion(version);
}
// Sometimes do a clear range
@ -9059,17 +9026,16 @@ TEST_CASE("/redwood/correctness/btree") {
keyBytesCleared.rate() / 1e6,
mutationBytes.rate() / 1e6);
Version v = version; // Avoid capture of version as a member of *this
// Sometimes advance the oldest version to close the gap between the oldest and latest versions by a random
// amount.
if (deterministicRandom()->random01() < advanceOldVersionProbability) {
btree->setOldestVersion(btree->getLastCommittedVersion() -
deterministicRandom()->randomInt64(
0, btree->getLastCommittedVersion() - btree->getOldestVersion() + 1));
btree->setOldestReadableVersion(
btree->getLastCommittedVersion() -
deterministicRandom()->randomInt64(
0, btree->getLastCommittedVersion() - btree->getOldestReadableVersion() + 1));
}
commit = map(btree->commit(), [=, &ops = totalPageOps](Void) {
commit = map(btree->commit(version), [=, &ops = totalPageOps, v = version](Void) {
// Update pager ops before clearing metrics
ops += g_redwoodMetrics.pageOps();
printf("Committed %s PageOps %" PRId64 "/%" PRId64 " (%.2f%%) VerificationMapEntries %d/%d (%.2f%%)\n",
@ -9124,7 +9090,7 @@ TEST_CASE("/redwood/correctness/btree") {
btree = new VersionedBTree(pager, fileName);
wait(btree->init());
Version v = btree->getLatestVersion();
Version v = btree->getLastCommittedVersion();
printf("Recovered from disk. Latest recovered version %" PRId64 " highest written version %" PRId64
"\n",
v,
@ -9141,7 +9107,6 @@ TEST_CASE("/redwood/correctness/btree") {
}
version += versionIncrement;
btree->setWriteVersion(version);
}
// Check for errors
@ -9177,7 +9142,7 @@ TEST_CASE("/redwood/correctness/btree") {
}
ACTOR Future<Void> randomSeeks(VersionedBTree* btree, int count, char firstChar, char lastChar) {
state Version readVer = btree->getLatestVersion();
state Version readVer = btree->getLastCommittedVersion();
state int c = 0;
state double readStart = timer();
state VersionedBTree::BTreeCursor cur;
@ -9198,7 +9163,7 @@ ACTOR Future<Void> randomScans(VersionedBTree* btree,
int prefetchBytes,
char firstChar,
char lastChar) {
state Version readVer = btree->getLatestVersion();
state Version readVer = btree->getLastCommittedVersion();
state int c = 0;
state double readStart = timer();
state VersionedBTree::BTreeCursor cur;
@ -9251,7 +9216,7 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
memset(p->mutate(), (char)id, p->size());
pager->updatePage(PagerEventReasons::MetaData, nonBtreeLevel, id, p);
pager->setMetaKey(LiteralStringRef("asdfasdf"));
wait(pager->commit());
wait(pager->commit(pager->getLastCommittedVersion() + 1));
Reference<ArenaPage> p2 =
wait(pager->readPage(PagerEventReasons::PointRead, nonBtreeLevel, id, ioMinPriority, true, false));
printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str());
@ -9332,7 +9297,7 @@ TEST_CASE(":/redwood/performance/extentQueue") {
cumulativeCommitSize,
pager->getPageCacheCount());
wait(m_extentQueue.flush());
wait(pager->commit());
wait(pager->commit(pager->getLastCommittedVersion() + 1));
cumulativeCommitSize += currentCommitSize;
targetCommitSize = deterministicRandom()->randomInt(2e6, 30e6);
currentCommitSize = 0;
@ -9355,7 +9320,7 @@ TEST_CASE(":/redwood/performance/extentQueue") {
extentQueueState = m_extentQueue.getState();
printf("Commit ExtentQueue getState(): %s\n", extentQueueState.toString().c_str());
pager->setMetaKey(extentQueueState.asKeyRef());
wait(pager->commit());
wait(pager->commit(pager->getLastCommittedVersion() + 1));
Future<Void> onClosed = pager->onClosed();
pager->close();
@ -9450,8 +9415,7 @@ TEST_CASE(":/redwood/performance/set") {
state int maxConsecutiveRun = params.getInt("maxConsecutiveRun").orDefault(100);
state char firstKeyChar = params.get("firstKeyChar").orDefault("a")[0];
state char lastKeyChar = params.get("lastKeyChar").orDefault("m")[0];
state Version remapCleanupWindow =
params.getInt("remapCleanupWindow").orDefault(SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW);
state Version remapCleanupWindow = params.getInt("remapCleanupWindow").orDefault(100);
state int concurrentExtentReads =
params.getInt("concurrentExtentReads").orDefault(SERVER_KNOBS->REDWOOD_EXTENT_CONCURRENT_READS);
state bool openExisting = params.getInt("openExisting").orDefault(0);
@ -9521,9 +9485,8 @@ TEST_CASE(":/redwood/performance/set") {
if (insertRecords) {
while (kvBytesTotal < kvBytesTarget) {
Version lastVer = btree->getLatestVersion();
Version lastVer = btree->getLastCommittedVersion();
state Version version = lastVer + 1;
btree->setWriteVersion(version);
state int changesThisVersion =
deterministicRandom()->randomInt(0, maxRecordsPerCommit - recordsThisCommit + 1);
@ -9557,7 +9520,7 @@ TEST_CASE(":/redwood/performance/set") {
}
if (kvBytesThisCommit >= maxKVBytesPerCommit || recordsThisCommit >= maxRecordsPerCommit) {
btree->setOldestVersion(btree->getLastCommittedVersion());
btree->setOldestReadableVersion(btree->getLastCommittedVersion());
wait(commit);
printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n",
kvBytesTotal / 1e6,
@ -9571,7 +9534,7 @@ TEST_CASE(":/redwood/performance/set") {
// actor state object
double* pIntervalStart = &intervalStart;
commit = map(btree->commit(), [=](Void result) {
commit = map(btree->commit(version), [=](Void result) {
if (!traceMetrics) {
printf("%s\n", g_redwoodMetrics.toString(true).c_str());
}

View File

@ -849,6 +849,7 @@ public:
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), maxQueryQueue(0), counters(this) {
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
@ -3411,7 +3412,6 @@ static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairI
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
static const KeyRangeRef persistShardAssignedKeys =
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0"));
static const KeyRangeRef persistShardAvailableKeys =
@ -4115,7 +4115,7 @@ void StorageServerDisk::makeNewStorageServerDurable() {
if (data->tssPairID.present()) {
storage->set(KeyValueRef(persistTssPairID, BinaryWriter::toValue(data->tssPairID.get(), Unversioned())));
}
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())));
storage->setCommitVersion(data->version.get());
storage->set(KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")));
storage->set(KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")));
}
@ -4220,7 +4220,7 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
// Update data->storage to persist the changes from (data->storageVersion(),version]
void StorageServerDisk::makeVersionDurable(Version version) {
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())));
storage->setCommitVersion(version);
// TraceEvent("MakeDurable", data->thisServerID)
// .detail("FromVersion", prevStorageVersion)
@ -4349,7 +4349,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> ftssPairID = storage->readValue(persistTssPairID);
state Future<Optional<Value>> fTssQuarantine = storage->readValue(persistTssQuarantine);
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Version> fVersion = storage->getCommittedVersion();
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
state Future<RangeResult> fShardAssigned = storage->readRange(persistShardAssignedKeys);
@ -4361,7 +4361,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
restoreByteSample(data, storage, byteSampleSampleRecovered, startByteSampleRestore.getFuture());
TraceEvent("ReadingDurableState", data->thisServerID).log();
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID).log();
@ -4402,7 +4402,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
if (fPrimaryLocality.get().present())
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());
state Version version = BinaryReader::fromStringRef<Version>(fVersion.get().get(), Unversioned());
state Version version = wait(fVersion);
debug_checkRestoredVersion(data->thisServerID, version, "StorageServer");
data->setInitialVersion(version);

View File

@ -78,6 +78,7 @@ ERROR( wrong_connection_file, 1054, "Connection file mismatch")
ERROR( version_already_compacted, 1055, "The requested changes have been compacted away")
ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" )
ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" )
ERROR( wrong_format_version, 1058, "Format version not recognize." )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )