do not register a worker with the cluster controller until it has finished recovering all files from disk

Evan Tschannen 2017-09-15 10:57:58 -07:00
parent f3b7aa615d
commit 36c98f18e9
4 changed files with 33 additions and 14 deletions
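The mechanism: each role that restores state from disk (shared tLog, storage server) now receives a Promise<Void> recovered, which it fulfills once local recovery has finished (or failed), and workerServer collects the matching futures and defers registrationClient until all of them are ready. A condensed Flow-style sketch of the worker-side gating, simplified for illustration (startDiskStoreRole is a hypothetical stand-in for the per-store recruitment code in the last diff below):

	// Sketch only, not verbatim from the commit: how the worker gates registration.
	state std::vector<Future<Void>> recoveries;
	for( int f = 0; f < stores.size(); f++ ) {
		Promise<Void> recovery;
		startDiskStoreRole( stores[f], recovery );     // hypothetical helper; hands the promise to storageServer()/tLog()
		recoveries.push_back( recovery.getFuture() );  // resolves when that store finishes recovering
	}
	Void _ = wait( waitForAll( recoveries ) );         // block until every store is recovered
	errorForwarders.add( registrationClient( ccInterface, interf, processClass ) );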

View File

@@ -1284,7 +1284,7 @@ ACTOR Future<Void> checkEmptyQueue(TLogData* self) {
}
}
-ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality, Promise<Void> oldLog, PromiseStream<InitializeTLogRequest> tlogRequests ) {
+ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality, Promise<Void> oldLog, Promise<Void> recovered, PromiseStream<InitializeTLogRequest> tlogRequests ) {
state double startt = now();
state Reference<LogData> logData;
state KeyRange tagKeys;
@@ -1336,6 +1336,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.confirmRunning );
//FIXME: need for upgrades from 4.X to 5.0, remove once this upgrade path is no longer needed
+if(recovered.canBeSet()) recovered.send(Void());
oldLog.send(Void());
while(!tlogRequests.isEmpty()) {
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
@@ -1725,7 +1726,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
// New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog )
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered )
{
state TLogData self( tlogId, persistentData, persistentQueue, db );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@@ -1734,11 +1735,13 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
try {
if(restoreFromDisk) {
-Void _ = wait( restorePersistentState( &self, locality, oldLog, tlogRequests ) );
+Void _ = wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
} else {
Void _ = wait( checkEmptyQueue(&self) );
}
+if(recovered.canBeSet()) recovered.send(Void());
self.sharedActors.send( cleanupPeekTrackers(&self) );
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );
@@ -1758,6 +1761,8 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
} catch (Error& e) {
TraceEvent("TLogError", tlogId).error(e, true);
+if(recovered.canBeSet()) recovered.send(Void());
while(!tlogRequests.isEmpty()) {
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
}
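Note that the shared tLog fulfills recovered on every exit path: inside restorePersistentState on the legacy 4.X-to-5.0 upgrade path, after recovery (or the empty-queue check for a fresh queue) completes in tLog itself, and again in tLog's error handler. In Flow, destroying the last reference to an unset Promise breaks its Future with a broken_promise error, which would surface through the worker's waitForAll and keep the process from ever registering; the canBeSet() guard makes the redundant sends harmless. The shape of the pattern in isolation (doRecovery is a hypothetical placeholder):

	// Illustrative only: fulfill the completion promise exactly once, on every path.
	try {
		Void _ = wait( doRecovery() );                    // stand-in for restorePersistentState/checkEmptyQueue
		if(recovered.canBeSet()) recovered.send(Void());  // normal completion
	} catch (Error& e) {
		if(recovered.canBeSet()) recovered.send(Void());  // still unblock the worker on failure
		throw;
	}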

View File

@@ -287,10 +287,11 @@ Future<Void> storageServer(
class IKeyValueStore* const& persistentData,
StorageServerInterface const& ssi,
Reference<AsyncVar<ServerDBInfo>> const& db,
-std::string const& folder ); // changes pssi->id() to be the recovered ID
+std::string const& folder,
+Promise<Void> const& recovered); // changes pssi->id() to be the recovered ID
Future<Void> masterServer( MasterInterface const& mi, Reference<AsyncVar<ServerDBInfo>> const& db, class ServerCoordinators const&, LifetimeToken const& lifetime );
Future<Void> masterProxyServer(MasterProxyInterface const& proxy, InitializeMasterProxyRequest const& req, Reference<AsyncVar<ServerDBInfo>> const& db);
-Future<Void> tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& db, LocalityData const& locality, PromiseStream<InitializeTLogRequest> const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise<Void> const& oldLog ); // changes tli->id() to be the recovered ID
+Future<Void> tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& db, LocalityData const& locality, PromiseStream<InitializeTLogRequest> const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise<Void> const& oldLog, Promise<Void> const& recovered ); // changes tli->id() to be the recovered ID
Future<Void> debugQueryServer( DebugQueryRequest const& req );
Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& ccInterface, Reference<ClusterConnectionFile> const&, LocalityData const&, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> resolver( ResolverInterface const& proxy, InitializeResolverRequest const&, Reference<AsyncVar<ServerDBInfo>> const& db );

View File

@@ -3224,7 +3224,7 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
return Void();
}
-ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
+ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered )
{
state StorageServer self(persistentData, db, ssi);
self.folder = folder;
@@ -3233,23 +3233,28 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
state double start = now();
TraceEvent("StorageServerRebootStart", self.thisServerID);
bool ok = wait( self.storage.restoreDurableState() );
-if (!ok) return Void();
+if (!ok) {
+if(recovered.canBeSet()) recovered.send(Void());
+return Void();
+}
TraceEvent("SSTimeRestoreDurableState", self.thisServerID).detail("TimeTaken", now() - start);
ASSERT( self.thisServerID == ssi.id() );
TraceEvent("StorageServerReboot", self.thisServerID)
.detail("Version", self.version.get());
+if(recovered.canBeSet()) recovered.send(Void());
Void _ = wait( replaceInterface( &self, ssi ) );
TraceEvent("StorageServerStartingCore", self.thisServerID).detail("TimeTaken", now() - start);
//Void _ = wait( delay(0) ); // To make sure self->zkMasterInfo.onChanged is available to wait on
Void _ = wait( storageServerCore(&self, ssi) );
throw internal_error();
} catch (Error& e) {
+if(recovered.canBeSet()) recovered.send(Void());
if (storageServerTerminated(self, persistentData, e))
return Void();
throw e;
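The storage server resolves the promise at three points: when restoreDurableState() fails (the store is unusable, but the worker should not wait on it), after a successful restore just before replaceInterface(), and in the catch block so that a terminated or failed server also unblocks registration. Registration is therefore gated only on local disk recovery, never on the interface swap with the cluster. Callers that do not need the gate, such as storageServerRollbackRebooter restarting a server inside an already-registered worker, pass a throwaway promise and ignore its future, as in the first hunk of the next diff:

	// The gate is unused after the first registration; a fresh dummy promise suffices.
	prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );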

View File

@@ -319,7 +319,7 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
Future<Void> kvClosed = kv->onClosed();
filesClosed->add( kvClosed );
-prevStorageServer = storageServer( kv, ssi, db, folder );
+prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
prevStorageServer = handleIOErrors( prevStorageServer, kv, ssi.id(), kvClosed );
}
}
@@ -523,7 +523,6 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
}
errorForwarders.add( loadedPonger( interf.debugPing.getFuture() ) );
-errorForwarders.add( registrationClient( ccInterface, interf, processClass ) );
errorForwarders.add( waitFailureServer( interf.waitFailure.getFuture() ) );
errorForwarders.add( monitorServerDBInfo( ccInterface, connFile, locality, dbInfo ) );
errorForwarders.add( testerServerCore( interf.testerInterface, connFile, dbInfo, locality ) );
@@ -553,7 +552,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
try {
std::vector<DiskStore> stores = getDiskStores( folder );
bool validateDataFiles = deleteFile(joinPath(folder, validationFilename));
+std::vector<Future<Void>> recoveries;
for( int f = 0; f < stores.size(); f++ ) {
DiskStore s = stores[f];
// FIXME: Error handling
@@ -584,7 +583,9 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
-Future<Void> f = storageServer( kv, recruited, dbInfo, folder );
+Promise<Void> recovery;
+Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
+recoveries.push_back(recovery.getFuture());
f = handleIOErrors( f, kv, s.storeID, kvClosed );
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
@@ -600,7 +601,9 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( s.storeID, interf.id(), "SharedTLog", details, "Restored" );
Promise<Void> oldLog;
-Future<Void> tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog );
+Promise<Void> recovery;
+Future<Void> tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
+recoveries.push_back(recovery.getFuture());
tl = handleIOErrors( tl, kv, s.storeID );
tl = handleIOErrors( tl, queue, s.storeID );
if(tlog.isReady()) {
@@ -616,6 +619,11 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
details["StoresPresent"] = format("%d", stores.size());
startRole( interf.id(), interf.id(), "Worker", details );
+Void _ = wait(waitForAll(recoveries));
+errorForwarders.add( registrationClient( ccInterface, interf, processClass ) );
+TraceEvent("RecoveriesComplete", interf.id());
loop choose {
when( RebootRequest req = waitNext( interf.clientInterface.reboot.getFuture() ) ) {
@@ -672,7 +680,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + logId.toString() + "-" ), logId );
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
-tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise<Void>() );
+tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise<Void>(), Promise<Void>() );
tlog = handleIOErrors( tlog, data, logId );
tlog = handleIOErrors( tlog, queue, logId );
errorForwarders.add( forwardError( errors, "SharedTLog", logId, tlog ) );