Do not rollback while uncommitted sets exist
This commit is contained in:
parent
88e8422511
commit
d8dc8e83b9
|
@ -333,7 +333,7 @@ public:
|
||||||
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
|
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
|
||||||
AsyncVar<bool> byteSampleClearsTooLarge;
|
AsyncVar<bool> byteSampleClearsTooLarge;
|
||||||
Future<Void> byteSampleRecovery;
|
Future<Void> byteSampleRecovery;
|
||||||
Future<Void> durable = Void();
|
Future<Void> durableInProgress;
|
||||||
|
|
||||||
AsyncMap<Key,bool> watches;
|
AsyncMap<Key,bool> watches;
|
||||||
int64_t watchBytes;
|
int64_t watchBytes;
|
||||||
|
@ -416,7 +416,7 @@ public:
|
||||||
: instanceID(g_random->randomUniqueID().first()),
|
: instanceID(g_random->randomUniqueID().first()),
|
||||||
storage(this, storage), db(db),
|
storage(this, storage), db(db),
|
||||||
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||||
updateEagerReads(0),
|
durableInProgress(Void()), updateEagerReads(0),
|
||||||
shardChangeCounter(0),
|
shardChangeCounter(0),
|
||||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
|
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
|
||||||
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0),
|
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0),
|
||||||
|
@ -2481,7 +2481,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
||||||
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
|
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
|
||||||
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
|
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
|
||||||
} else if (e.code() == error_code_please_reboot) {
|
} else if (e.code() == error_code_please_reboot) {
|
||||||
Void _ = wait( data->durable );
|
Void _ = wait( data->durableInProgress );
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -2493,6 +2493,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||||
Void _ = wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
|
Void _ = wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
|
||||||
Void _ = wait( delay(0, TaskUpdateStorage) );
|
Void _ = wait( delay(0, TaskUpdateStorage) );
|
||||||
|
|
||||||
|
state Promise<Void> durableInProgress;
|
||||||
|
data->durableInProgress = durableInProgress.getFuture();
|
||||||
|
|
||||||
state Version startOldestVersion = data->storageVersion();
|
state Version startOldestVersion = data->storageVersion();
|
||||||
state Version newOldestVersion = data->storageVersion();
|
state Version newOldestVersion = data->storageVersion();
|
||||||
state Version desiredVersion = data->desiredOldestVersion.get();
|
state Version desiredVersion = data->desiredOldestVersion.get();
|
||||||
|
@ -2512,16 +2515,19 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||||
data->storage.makeVersionDurable( newOldestVersion );
|
data->storage.makeVersionDurable( newOldestVersion );
|
||||||
|
|
||||||
debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
|
debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
|
||||||
data->durable = data->storage.commit();
|
state Future<Void> durable = data->storage.commit();
|
||||||
state Future<Void> durableDelay = Void();
|
state Future<Void> durableDelay = Void();
|
||||||
|
|
||||||
if (bytesLeft > 0)
|
if (bytesLeft > 0)
|
||||||
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL);
|
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL);
|
||||||
|
|
||||||
Void _ = wait( data->durable );
|
Void _ = wait( durable );
|
||||||
|
|
||||||
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
||||||
|
|
||||||
|
durableInProgress.send(Void());
|
||||||
|
Void _ = wait( delay(0, TaskUpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation
|
||||||
|
|
||||||
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
|
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
|
||||||
// are applied after we change the durable version.
|
// are applied after we change the durable version.
|
||||||
Void _ = wait( data->durableVersionLock.take() );
|
Void _ = wait( data->durableVersionLock.take() );
|
||||||
|
|
Loading…
Reference in New Issue