address cr comments:

This commit is contained in:
mengranwo 2019-06-20 17:41:00 -07:00 committed by Alex Miller
parent d96cdacdd5
commit c7148bbb14
1 changed files with 12 additions and 14 deletions

View File

@ -3627,10 +3627,10 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
return false;
}
ACTOR Future<bool> memoryStoreRecover(IKeyValueStore* store, Reference<ClusterConnectionFile> connFile, UID id)
ACTOR Future<Void> memoryStoreRecover(IKeyValueStore* store, Reference<ClusterConnectionFile> connFile, UID id)
{
if(store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) {
return false;
return Never();
}
// create a temp client connect to DB
@ -3650,7 +3650,7 @@ ACTOR Future<bool> memoryStoreRecover(IKeyValueStore* store, Reference<ClusterCo
TraceEvent("RemoveStorageServerRetrying").detail("Count", noCanRemoveCount++).detail("ServerID", id).detail("CanRemove", canRemove);
} else {
wait(tr.commit());
return true;
return Void();
}
} catch (Error& e) {
state Error err = e;
@ -3792,19 +3792,17 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
state double start = now();
TraceEvent("StorageServerRebootStart", self.thisServerID);
state Future<bool> dispose = memoryStoreRecover (persistentData, connFile, self.thisServerID);
wait(self.storage.init());
choose {
//after a rollback there might be uncommitted changes.
//for memory storage engine type, wait until recovery is done before commit
state Future<Void> committed = self.storage.commit();
wait(success(dispose) || success(committed));
if (committed.isReady()) {
// recovery finished before dispose
dispose.cancel();
} else if (dispose.isReady() && dispose.get()){
wait(self.storage.commit()) {}
wait(memoryStoreRecover (persistentData, connFile, self.thisServerID)) {
TraceEvent("DisposeStorageServer", self.thisServerID);
throw worker_removed();
}
}
bool ok = wait( self.storage.restoreDurableState() );
if (!ok) {