Merge commit '2a34115e65639b7aad368a148de3c4189bc34bfc'

# Conflicts:
#	fdbserver/storageserver.actor.cpp
#	fdbserver/worker.actor.cpp
commit 6bb2ba5d92
fdbserver/storageserver.actor.cpp

@@ -367,6 +367,7 @@ public:
 	CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
 	AsyncVar<bool> byteSampleClearsTooLarge;
 	Future<Void> byteSampleRecovery;
+	Future<Void> durableInProgress;
 
 	AsyncMap<Key,bool> watches;
 	int64_t watchBytes;
@ -466,6 +467,7 @@ public:
|
|||
: instanceID(g_random->randomUniqueID().first()),
|
||||
storage(this, storage), db(db),
|
||||
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||
durableInProgress(Void()),
|
||||
versionLag(0),
|
||||
updateEagerReads(0),
|
||||
shardChangeCounter(0),
|
||||
|
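The new durableInProgress member is initialized to Void(), i.e. an already-fulfilled future, so any actor that waits on it before updateStorage has installed a live promise falls straight through instead of blocking. A minimal sketch of that idiom in standard C++ rather than Flow (the ready_future helper is illustrative, not FoundationDB code):

#include <cassert>
#include <future>

// Build a std::shared_future that is already fulfilled, mirroring
// Future<Void> durableInProgress = Void(): waiters never block on it
// until someone swaps in a live promise's future.
std::shared_future<void> ready_future() {
    std::promise<void> p;
    p.set_value();
    return p.get_future().share();
}

int main() {
    std::shared_future<void> durableInProgress = ready_future();
    durableInProgress.wait(); // returns immediately; nothing is pending yet
    assert(durableInProgress.valid());
}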
@ -2617,10 +2619,14 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
}
|
||||
|
||||
return Void(); // update will get called again ASAP
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot)
|
||||
} catch (Error& err) {
|
||||
state Error e = err;
|
||||
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
|
||||
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
|
||||
throw;
|
||||
} else if (e.code() == error_code_please_reboot) {
|
||||
wait( data->durableInProgress );
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
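The rewritten catch block first copies the caught error into a state variable (a Flow actor cannot wait() while holding only the caught reference), logs a trace event only for genuinely unexpected errors, and for please_reboot blocks on durableInProgress so the in-flight commit finishes before the error propagates. A rough equivalent of that control flow in plain C++, with illustrative stand-ins (PleaseReboot, WorkerRemoved are not FDB's real error types):

#include <cstdio>
#include <future>
#include <stdexcept>

struct PleaseReboot : std::runtime_error { PleaseReboot() : std::runtime_error("please_reboot") {} };
struct WorkerRemoved : std::runtime_error { WorkerRemoved() : std::runtime_error("worker_removed") {} };

void update_once(std::shared_future<void> durableInProgress) {
    try {
        throw PleaseReboot(); // stand-in for a failure surfacing inside update()
    } catch (const PleaseReboot&) {
        // Wait for the in-flight durability pass before rebooting, so the
        // storage engine is not torn down mid-commit.
        durableInProgress.wait();
        throw;
    } catch (const WorkerRemoved&) {
        throw; // expected during removal; no error trace
    } catch (const std::exception& e) {
        std::fprintf(stderr, "SSUpdateError: %s\n", e.what()); // unexpected: log it
        throw;
    }
}

int main() {
    std::promise<void> durable;
    durable.set_value(); // pretend the current pass already committed
    try { update_once(durable.get_future().share()); }
    catch (const std::exception& e) { std::puts(e.what()); }
}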
@ -2630,6 +2636,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
|
||||
wait( delay(0, TaskUpdateStorage) );
|
||||
|
||||
state Promise<Void> durableInProgress;
|
||||
data->durableInProgress = durableInProgress.getFuture();
|
||||
|
||||
state Version startOldestVersion = data->storageVersion();
|
||||
state Version newOldestVersion = data->storageVersion();
|
||||
state Version desiredVersion = data->desiredOldestVersion.get();
|
||||
|
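Each updateStorage pass creates a fresh promise and publishes its future through data->durableInProgress, replacing the previous (already-fulfilled) one, so anyone waiting on the member observes the durability pass currently in flight. A sketch of that publish step in standard C++ (ServerState is an illustrative stand-in for StorageServer):

#include <future>

struct ServerState {
    // Future observed by other tasks; replaced at the start of every pass.
    std::shared_future<void> durableInProgress;
};

// Begin a durability pass: install a fresh, unfulfilled future so waiters
// block until this pass calls done.set_value().
std::promise<void> beginDurablePass(ServerState& data) {
    std::promise<void> done;
    data.durableInProgress = done.get_future().share();
    return done;
}

int main() {
    ServerState data;
    std::promise<void> done = beginDurablePass(data);
    done.set_value();              // the pass finishes
    data.durableInProgress.wait(); // waiters are released
}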
@ -2658,6 +2667,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
wait( durable );
|
||||
|
||||
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
||||
|
||||
durableInProgress.send(Void());
|
||||
wait( delay(0, TaskUpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation
|
||||
|
||||
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
|
||||
// are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
|
||||
|
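Fulfilling the promise can wake an actor that immediately shuts the storage server down (the please_reboot path above), so the code yields with delay(0, TaskUpdateStorage) right after send() to give cancellation a chance to land before this actor touches more state. A minimal illustration of the "signal, then yield and re-check" shape, using an atomic flag in place of actor cancellation:

#include <atomic>
#include <future>
#include <thread>

std::atomic<bool> cancelled{false};

void finishDurablePass(std::promise<void>& done) {
    done.set_value();             // may wake a waiter that requests shutdown
    std::this_thread::yield();    // analogue of wait(delay(0, TaskUpdateStorage))
    if (cancelled.load()) return; // re-check before doing any further work
    // ... safe to continue the update-storage loop ...
}

int main() {
    std::promise<void> done;
    finishDurablePass(done);
}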
@ -3301,10 +3313,13 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
|
|||
self.shards.insert( allKeys, Reference<ShardInfo>() );
|
||||
|
||||
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
|
||||
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed)
|
||||
if (e.code() == error_code_please_reboot) {
|
||||
// do nothing.
|
||||
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
|
||||
persistentData->dispose();
|
||||
else
|
||||
} else {
|
||||
persistentData->close();
|
||||
}
|
||||
|
||||
if ( e.code() == error_code_worker_removed ||
|
||||
e.code() == error_code_recruitment_failed ||
|
||||
|
|
|
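The shutdown path now distinguishes three outcomes: on please_reboot the store is left open so the rebooted server can reuse it, on permanent removal it is disposed (its data destroyed), and in every other case it is merely closed. A compact sketch of that decision, with an illustrative enum rather than FDB's error codes:

enum class StopReason { PleaseReboot, WorkerRemoved, RecruitmentFailed, Other };
enum class StoreAction { Keep, Dispose, Close };

// Keep the store across a reboot; destroy it only when removal is permanent.
StoreAction onStorageServerTerminated(StopReason r) {
    switch (r) {
        case StopReason::PleaseReboot:      return StoreAction::Keep;
        case StopReason::WorkerRemoved:
        case StopReason::RecruitmentFailed: return StoreAction::Dispose;
        default:                            return StoreAction::Close;
    }
}

int main() {
    return onStorageServerTerminated(StopReason::PleaseReboot) == StoreAction::Keep ? 0 : 1;
}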
@ -113,8 +113,12 @@ ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id,
|
|||
state Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr( store->getError() );
|
||||
choose {
|
||||
when (state ErrorOr<Void> e = wait( errorOr(actor) )) {
|
||||
wait(onClosed);
|
||||
if(storeError.isReady()) throw storeError.getError();
|
||||
if (e.isError() && e.getError().code() == error_code_please_reboot) {
|
||||
// no need to wait.
|
||||
} else {
|
||||
wait(onClosed);
|
||||
}
|
||||
if(storeError.isReady()) throw storeError.get().getError();
|
||||
if (e.isError()) throw e.getError(); else return e.get();
|
||||
}
|
||||
when (ErrorOr<Void> e = wait( storeError )) {
|
||||
|
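On please_reboot the underlying store is not being closed at all (it will be handed to the restarted server), so handleIOErrors no longer waits for an onClosed event that would never fire. A sketch of that conditional wait in standard C++ (PleaseReboot and afterActorExit are illustrative):

#include <exception>
#include <future>
#include <stdexcept>

struct PleaseReboot : std::runtime_error { PleaseReboot() : std::runtime_error("please_reboot") {} };

// On please_reboot the store stays open for reuse, so there is no close
// event to wait for; any other exit waits for the store to finish closing.
void afterActorExit(const std::exception_ptr& err, std::shared_future<void> onClosed) {
    bool rebooting = false;
    if (err) {
        try { std::rethrow_exception(err); }
        catch (const PleaseReboot&) { rebooting = true; }
        catch (...) {}
    }
    if (!rebooting)
        onClosed.wait(); // the store really is closing; let its errors surface
}

int main() {
    std::promise<void> closed;
    closed.set_value();
    afterActorExit(std::make_exception_ptr(PleaseReboot()), closed.get_future().share());
}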
@ -354,7 +358,7 @@ ACTOR Future<Void> runProfiler(ProfilerRequest req) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
|
||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit, IKeyValueStore* store ) {
|
||||
loop {
|
||||
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
|
||||
if (!e.isError()) return Void();
|
||||
|
@ -362,18 +366,13 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
|
|||
|
||||
TraceEvent("StorageServerRequestedReboot", id);
|
||||
|
||||
//if (BUGGIFY) wait(delay(1.0)); // This does the same thing as zombie()
|
||||
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
||||
// the old one so the failure detector will know it is gone.
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
|
||||
Future<Void> kvClosed = kv->onClosed();
|
||||
filesClosed->add( kvClosed );
|
||||
prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors( prevStorageServer, kv, id, kvClosed );
|
||||
|
||||
prevStorageServer = storageServer( store, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
|
||||
}
|
||||
}
|
||||
|
||||
|
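Instead of reopening the key-value store from filename on every reboot, the rebooter now receives the live IKeyValueStore* and hands the same instance to the restarted server; only the interface is rebuilt, since the new server will replaceInterface() and the old identity must die so failure detection notices. A sketch of the restart loop's new shape (KVStore, ServerInterface, and runStorageServer are hypothetical stand-ins):

#include <string>

struct KVStore { /* stands in for IKeyValueStore */ };
struct ServerInterface { std::string uniqueID; };

// Stub: pretend the server ran and did not request another reboot.
bool runStorageServer(KVStore* store, const ServerInterface& ssi) {
    (void)store; (void)ssi;
    return false;
}

// Hypothetical restart loop: the store outlives each server incarnation,
// while the interface is recreated so the old endpoint is seen as dead.
void rollbackRebooter(KVStore* store, const std::string& id) {
    for (;;) {
        ServerInterface ssi;
        ssi.uniqueID = id; // same identity, fresh endpoints
        bool rebootRequested = runStorageServer(store, ssi); // reuse, don't reopen
        if (!rebootRequested) return;
    }
}

int main() {
    KVStore store;
    rollbackRebooter(&store, "ss-1");
}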
@ -590,8 +589,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Promise<Void> recovery;
|
||||
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed );
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed );
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
|
||||
errorForwarders.add( forwardError( errors, Role::STORAGE_SERVER, recruited.id(), f ) );
|
||||
} else if( s.storedComponent == DiskStore::TLogData ) {
|
||||
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
|
||||
|
@ -748,7 +747,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||
s = storageCache.removeOnReady( req.reqId, s );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, data );
|
||||
errorForwarders.add( forwardError( errors, Role::STORAGE_SERVER, recruited.id(), s ) );
|
||||
} else
|
||||
forwardPromise( req.reply, storageCache.get( req.reqId ) );
|
||||
|
|