Fine-grained interleaving
This commit is contained in:
parent
34d1d04904
commit
b610f01c77
|
@ -549,6 +549,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
||||||
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
|
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
|
||||||
init( BUGGIFY_BLOCK_BYTES, 10000 );
|
init( BUGGIFY_BLOCK_BYTES, 10000 );
|
||||||
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
|
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
|
||||||
|
init( STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD, 100000 );
|
||||||
init( STORAGE_DURABILITY_LAG_REJECT_THRESHOLD, 0.25 );
|
init( STORAGE_DURABILITY_LAG_REJECT_THRESHOLD, 0.25 );
|
||||||
init( STORAGE_DURABILITY_LAG_MIN_RATE, 0.1 );
|
init( STORAGE_DURABILITY_LAG_MIN_RATE, 0.1 );
|
||||||
init( STORAGE_COMMIT_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_INTERVAL = 2.0;
|
init( STORAGE_COMMIT_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_INTERVAL = 2.0;
|
||||||
|
|
|
@ -481,6 +481,7 @@ public:
|
||||||
double STORAGE_DURABILITY_LAG_MIN_RATE;
|
double STORAGE_DURABILITY_LAG_MIN_RATE;
|
||||||
int STORAGE_COMMIT_BYTES;
|
int STORAGE_COMMIT_BYTES;
|
||||||
double STORAGE_COMMIT_INTERVAL;
|
double STORAGE_COMMIT_INTERVAL;
|
||||||
|
int STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD;
|
||||||
double UPDATE_SHARD_VERSION_INTERVAL;
|
double UPDATE_SHARD_VERSION_INTERVAL;
|
||||||
int BYTE_SAMPLING_FACTOR;
|
int BYTE_SAMPLING_FACTOR;
|
||||||
int BYTE_SAMPLING_OVERHEAD;
|
int BYTE_SAMPLING_OVERHEAD;
|
||||||
|
|
|
@ -736,8 +736,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
|
||||||
if (deterministicRandom()->random01() < 0.25) db.proxyCount = deterministicRandom()->randomInt(1, 7);
|
if (deterministicRandom()->random01() < 0.25) db.proxyCount = deterministicRandom()->randomInt(1, 7);
|
||||||
if (deterministicRandom()->random01() < 0.25) db.grvProxyCount = deterministicRandom()->randomInt(1, 4);
|
if (deterministicRandom()->random01() < 0.25) db.grvProxyCount = deterministicRandom()->randomInt(1, 4);
|
||||||
if (deterministicRandom()->random01() < 0.25) db.resolverCount = deterministicRandom()->randomInt(1,7);
|
if (deterministicRandom()->random01() < 0.25) db.resolverCount = deterministicRandom()->randomInt(1,7);
|
||||||
// int storage_engine_type = deterministicRandom()->randomInt(0, 4);
|
int storage_engine_type = deterministicRandom()->randomInt(0, 4);
|
||||||
int storage_engine_type = 3;
|
|
||||||
switch (storage_engine_type) {
|
switch (storage_engine_type) {
|
||||||
case 0: {
|
case 0: {
|
||||||
TEST(true); // Simulated cluster using ssd storage engine
|
TEST(true); // Simulated cluster using ssd storage engine
|
||||||
|
|
|
@ -154,8 +154,8 @@ struct StorageServerDisk {
|
||||||
explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}
|
explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}
|
||||||
|
|
||||||
void makeNewStorageServerDurable();
|
void makeNewStorageServerDurable();
|
||||||
bool makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft );
|
// Asyncronously move data from mutation log into SE's commit buffer for next commit.
|
||||||
Future<int64_t> asyncMakeVersionMutationsDurable(Version startStorageVersion, Version desiredVersion, int64_t bytesLeft);
|
Future<bool> asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void>durableDelay);
|
||||||
void makeVersionDurable( Version version );
|
void makeVersionDurable( Version version );
|
||||||
Future<bool> restoreDurableState();
|
Future<bool> restoreDurableState();
|
||||||
|
|
||||||
|
@ -180,13 +180,13 @@ struct StorageServerDisk {
|
||||||
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
|
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
|
||||||
|
|
||||||
bool canPipelineCommits() const {return storage->canPipelineCommits();}
|
bool canPipelineCommits() const {return storage->canPipelineCommits();}
|
||||||
|
void set(KeyValueRef kv) { storage->set(kv);}
|
||||||
|
void clear(KeyRangeRef kr) { storage->clear(kr);}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct StorageServer* data;
|
struct StorageServer* data;
|
||||||
IKeyValueStore* storage;
|
IKeyValueStore* storage;
|
||||||
|
|
||||||
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
|
|
||||||
|
|
||||||
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
|
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
|
||||||
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
|
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
|
||||||
if (r.size()) return r[0].key;
|
if (r.size()) return r[0].key;
|
||||||
|
@ -3076,101 +3076,36 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||||
state Promise<Void> durableInProgress;
|
state Promise<Void> durableInProgress;
|
||||||
data->durableInProgress = durableInProgress.getFuture();
|
data->durableInProgress = durableInProgress.getFuture();
|
||||||
|
|
||||||
state Version startOldestVersion = data->storageVersion();
|
state Version desiredOldestVersion = data->desiredOldestVersion.get();
|
||||||
state Version newOldestVersion = data->storageVersion();
|
|
||||||
state Version desiredVersion = data->desiredOldestVersion.get();
|
|
||||||
state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
|
|
||||||
|
|
||||||
state Future<Void> durableDelay = Void();
|
state Future<Void> durableDelay = Void();
|
||||||
state Future<Void> durable = Void();
|
state Future<Void> durable = Void();
|
||||||
if (data->storage.canPipelineCommits()) {
|
|
||||||
state Future<int64_t> bytesLeftF;
|
|
||||||
state bool done = false;
|
|
||||||
loop{
|
|
||||||
durableInProgress.reset();
|
|
||||||
data->durableInProgress = durableInProgress.getFuture();
|
|
||||||
// Start commit all data up to version newOldestVersion that are ready to be persisted
|
|
||||||
durable = data->storage.commit();
|
|
||||||
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
|
|
||||||
newOldestVersion = data->storageVersion();
|
|
||||||
|
|
||||||
// Start moving data after newOldestVersion from SS into engine's buffer to prepare for next round
|
state int64_t ssCommitQuotaBytes;
|
||||||
startOldestVersion = data->storageVersion();
|
state Version pendingCommitVersion;
|
||||||
desiredVersion = data->desiredOldestVersion.get();
|
state int64_t bytesWritten = 0;
|
||||||
bytesLeftF = data->storage.asyncMakeVersionMutationsDurable(startOldestVersion, desiredVersion, bytesLeft);
|
state bool finalCommit = false;
|
||||||
|
state bool done = false;
|
||||||
wait(durable && durableDelay && success(bytesLeftF));
|
loop {
|
||||||
|
// Keep making data from mutation log durable, until no data left whose version is <= desiredOldestVersion
|
||||||
if (bytesLeftF.get() > 0) {
|
pendingCommitVersion = data->storageVersion();
|
||||||
wait(data->storage.commit());
|
ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
|
||||||
newOldestVersion = data->storageVersion();
|
durableInProgress.reset();
|
||||||
done = true;
|
data->durableInProgress = durableInProgress.getFuture();
|
||||||
}
|
durable = data->storage.commit(); // Commit data up to(inclusive) version pendingCommitVersion
|
||||||
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
|
||||||
|
if (finalCommit) {
|
||||||
if(newOldestVersion > data->rebootAfterDurableVersion) {
|
wait(durable && durableDelay);
|
||||||
TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("NewOldestVersion", newOldestVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
done = true;
|
||||||
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
|
} else {
|
||||||
// never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
|
// Move data start from pendingCommitVersion+1 to SE's commit buffer.
|
||||||
// Otherwise, in the race situation when storage server receives both reboot and
|
bool _finalCommit = wait(data->storage.asyncPrepareVersionsForCommit(desiredOldestVersion, durable, durableDelay));
|
||||||
// brokenPromise of durableInProgress, the worker of the storage server will die.
|
finalCommit = _finalCommit;
|
||||||
// We will eventually end up with no worker for storage server role.
|
|
||||||
// The data distributor's buildTeam() will get stuck in building a team
|
|
||||||
durableInProgress.sendError(please_reboot());
|
|
||||||
throw please_reboot();
|
|
||||||
}
|
|
||||||
|
|
||||||
durableInProgress.send(Void());
|
|
||||||
wait( delay(0, TaskPriority::UpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation
|
|
||||||
|
|
||||||
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
|
|
||||||
// are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
|
|
||||||
// because otherwise the latest version of mutableData might be partially loaded.
|
|
||||||
wait( data->durableVersionLock.take() );
|
|
||||||
data->popVersion( data->durableVersion.get() + 1 );
|
|
||||||
|
|
||||||
while (!changeDurableVersion( data, newOldestVersion )) {
|
|
||||||
if(g_network->check_yield(TaskPriority::UpdateStorage)) {
|
|
||||||
data->durableVersionLock.release();
|
|
||||||
wait(delay(0, TaskPriority::UpdateStorage));
|
|
||||||
wait( data->durableVersionLock.take() );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
data->durableVersionLock.release();
|
|
||||||
if (done) break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
|
|
||||||
loop {
|
|
||||||
state bool mvmdDone = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
|
|
||||||
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
|
|
||||||
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
|
|
||||||
Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage );
|
|
||||||
data->oldestVersion.set( newOldestVersion );
|
|
||||||
wait( finishedForgetting );
|
|
||||||
wait( yield(TaskPriority::UpdateStorage) );
|
|
||||||
if (mvmdDone) break;
|
|
||||||
}
|
}
|
||||||
|
debug_advanceMinCommittedVersion( data->thisServerID, pendingCommitVersion );
|
||||||
|
|
||||||
// Set the new durable version as part of the outstanding change set, before commit
|
if(pendingCommitVersion > data->rebootAfterDurableVersion) {
|
||||||
if (startOldestVersion != newOldestVersion)
|
TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("PendingCommitVersion", pendingCommitVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
||||||
data->storage.makeVersionDurable( newOldestVersion );
|
|
||||||
|
|
||||||
debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
|
|
||||||
durable = data->storage.commit();
|
|
||||||
|
|
||||||
if (bytesLeft > 0) {
|
|
||||||
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
|
|
||||||
}
|
|
||||||
|
|
||||||
wait( durable );
|
|
||||||
|
|
||||||
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
|
||||||
|
|
||||||
if(newOldestVersion > data->rebootAfterDurableVersion) {
|
|
||||||
TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("NewOldestVersion", newOldestVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
|
||||||
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
|
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
|
||||||
// never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
|
// never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
|
||||||
// Otherwise, in the race situation when storage server receives both reboot and
|
// Otherwise, in the race situation when storage server receives both reboot and
|
||||||
|
@ -3190,18 +3125,16 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||||
wait( data->durableVersionLock.take() );
|
wait( data->durableVersionLock.take() );
|
||||||
data->popVersion( data->durableVersion.get() + 1 );
|
data->popVersion( data->durableVersion.get() + 1 );
|
||||||
|
|
||||||
while (!changeDurableVersion( data, newOldestVersion )) {
|
// Update durableVersion to pendingCommitVersion, which has been committed.
|
||||||
|
while (!changeDurableVersion( data, pendingCommitVersion )) {
|
||||||
if(g_network->check_yield(TaskPriority::UpdateStorage)) {
|
if(g_network->check_yield(TaskPriority::UpdateStorage)) {
|
||||||
data->durableVersionLock.release();
|
data->durableVersionLock.release();
|
||||||
wait(delay(0, TaskPriority::UpdateStorage));
|
wait(delay(0, TaskPriority::UpdateStorage));
|
||||||
wait( data->durableVersionLock.take() );
|
wait( data->durableVersionLock.take() );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data->durableVersionLock.release();
|
data->durableVersionLock.release();
|
||||||
|
if (done) break;
|
||||||
//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
|
|
||||||
wait( durableDelay );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3275,60 +3208,101 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion,
|
ACTOR Future<int64_t> asyncWriteMutationsToCommitBuffer(StorageServer* data, VectorRef<MutationRef> mutations, Version debugVersion, const char* debugContext, int64_t ssCommitQuotaBytes) {
|
||||||
const char* debugContext) {
|
state int bytesWritten = 0;
|
||||||
for (const auto& m : mutations) {
|
state int i = 0;
|
||||||
|
for (;i < mutations.size(); i++) {
|
||||||
|
const auto& m = mutations[i];
|
||||||
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
|
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
|
||||||
if (m.type == MutationRef::SetValue) {
|
if (m.type == MutationRef::SetValue) {
|
||||||
storage->set(KeyValueRef(m.param1, m.param2));
|
data->storage.set(KeyValueRef(m.param1, m.param2));
|
||||||
} else if (m.type == MutationRef::ClearRange) {
|
} else if (m.type == MutationRef::ClearRange) {
|
||||||
storage->clear(KeyRangeRef(m.param1, m.param2));
|
data->storage.clear(KeyRangeRef(m.param1, m.param2));
|
||||||
|
}
|
||||||
|
auto mutationBytes = mvccStorageBytes(m);
|
||||||
|
bytesWritten += mutationBytes;
|
||||||
|
ssCommitQuotaBytes -= mutationBytes;
|
||||||
|
if (data->storage.canPipelineCommits() && bytesWritten >= SERVER_KNOBS->STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD) {
|
||||||
|
bytesWritten = 0;
|
||||||
|
wait(yield());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return ssCommitQuotaBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<int64_t> _asyncMakeVersionMutationsDurable(StorageServerDisk* self, StorageServer* data, Version startStorageVersion, Version desiredVersion, int64_t bytesLeft) {
|
ACTOR Future<bool> asyncPrepareVersionsForCommit_impl(StorageServerDisk* self, StorageServer* data, Version desiredOldestVersion, Future<Void> durable, Future<Void>durableDelay) {
|
||||||
state Version newOldestVersion = startStorageVersion;
|
state int64_t ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
|
||||||
// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
|
state bool finalCommit = false;
|
||||||
|
state Version startOldestVersion = data->storageVersion();
|
||||||
|
state Version newOldestVersion = data->storageVersion();
|
||||||
|
state ActorCollection forgetter(true);
|
||||||
|
state bool forgetting = false;
|
||||||
loop {
|
loop {
|
||||||
state bool done = self->makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
|
// While committing previously written data, keep writting new data from later versions until
|
||||||
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
|
// 1.) commit is done, or
|
||||||
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
|
// 2.) ssCommitQuotaBytes <= 0, or
|
||||||
Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage );
|
// 3.) no data in mutaion log to write.
|
||||||
data->oldestVersion.set( newOldestVersion );
|
if (!data->storage.canPipelineCommits()) {
|
||||||
wait( finishedForgetting );
|
// Don't write version data while a commit is going on if the storage engine does not support pipelining
|
||||||
wait( yield(TaskPriority::UpdateStorage) );
|
wait(durable && durableDelay);
|
||||||
if (done) break;
|
}
|
||||||
}
|
// Apply mutations from the mutationLog
|
||||||
|
auto u = data->getMutationLog().upper_bound(newOldestVersion);
|
||||||
|
if (u != data->getMutationLog().end() && u->first <= desiredOldestVersion) {
|
||||||
|
VerUpdateRef const& v = u->second;
|
||||||
|
newOldestVersion = v.version;
|
||||||
|
ASSERT( newOldestVersion > data->storageVersion() && newOldestVersion <= desiredOldestVersion );
|
||||||
|
// TODO(alexmiller): Update to version tracking.
|
||||||
|
DEBUG_KEY_RANGE("asyncPrepareVersionsForCommit", newOldestVersion, KeyRangeRef());
|
||||||
|
int64_t _ssCommitQuotaBytes = wait(asyncWriteMutationsToCommitBuffer(data, v.mutations, newOldestVersion, "asyncPrepareVersionsForCommit", ssCommitQuotaBytes));
|
||||||
|
ssCommitQuotaBytes = _ssCommitQuotaBytes;
|
||||||
|
|
||||||
// Set the new durable version as part of the outstanding change set, before commit
|
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
|
||||||
if (startStorageVersion != newOldestVersion)
|
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
|
||||||
self->makeVersionDurable( newOldestVersion );
|
forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
|
||||||
return bytesLeft;
|
forgetting = true;
|
||||||
|
data->oldestVersion.set( newOldestVersion );
|
||||||
|
if (ssCommitQuotaBytes <= 0) {
|
||||||
|
// No quota left. Wait for previous commit to finish.
|
||||||
|
wait(durable && durableDelay);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (data->storage.canPipelineCommits() && durable.isReady() && durableDelay.isReady()) {
|
||||||
|
// Previous commit is done.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Since there is no data in mutation log, in order to make progress,
|
||||||
|
// advance it to desiredOldestVersion directly
|
||||||
|
newOldestVersion = desiredOldestVersion;
|
||||||
|
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
|
||||||
|
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
|
||||||
|
forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
|
||||||
|
forgetting = true;
|
||||||
|
data->oldestVersion.set( newOldestVersion );
|
||||||
|
|
||||||
|
// No more data in mutation log can be written.
|
||||||
|
finalCommit = true;
|
||||||
|
|
||||||
|
// Wait the previously written data to be committed
|
||||||
|
wait(durable && durableDelay);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (newOldestVersion > startOldestVersion){
|
||||||
|
// Set the new durable version as part of the outstanding change set, before commit
|
||||||
|
data->storage.makeVersionDurable( newOldestVersion );
|
||||||
|
}
|
||||||
|
debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
|
||||||
|
if (forgetting) {
|
||||||
|
wait(forgetter.getResult());
|
||||||
|
}
|
||||||
|
return finalCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<int64_t> StorageServerDisk::asyncMakeVersionMutationsDurable(Version startStorageVersion, Version desiredVersion, int64_t bytesLeft) {
|
Future<bool> StorageServerDisk::asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void>durableDelay) {
|
||||||
return _asyncMakeVersionMutationsDurable(this, data, startStorageVersion, desiredVersion, bytesLeft);
|
return asyncPrepareVersionsForCommit_impl(this, data, desiredOldestVersion, durable, durableDelay);
|
||||||
}
|
|
||||||
|
|
||||||
bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft ) {
|
|
||||||
if (bytesLeft <= 0) return true;
|
|
||||||
|
|
||||||
// Apply mutations from the mutationLog
|
|
||||||
auto u = data->getMutationLog().upper_bound(prevStorageVersion);
|
|
||||||
if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
|
|
||||||
VerUpdateRef const& v = u->second;
|
|
||||||
ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
|
|
||||||
// TODO(alexmiller): Update to version tracking.
|
|
||||||
DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
|
|
||||||
writeMutations(v.mutations, v.version, "makeVersionDurable");
|
|
||||||
for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m);
|
|
||||||
prevStorageVersion = v.version;
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
prevStorageVersion = newStorageVersion;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update data->storage to persist the changes from (data->storageVersion(),version]
|
// Update data->storage to persist the changes from (data->storageVersion(),version]
|
||||||
|
|
Loading…
Reference in New Issue