Instead of ignoring onClosed on all IOErrors, only ignore reboots (and only for memory and if the flag is on).

This commit is contained in:
Clement Pang 2018-10-07 02:53:20 -07:00
parent 6fd5c8e0ff
commit ebc42ba609
1 changed files with 8 additions and 12 deletions

View File

@ -110,10 +110,14 @@ ACTOR Future<Void> forwardError( PromiseStream<ErrorInfo> errors,
}
}
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void() ) {
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void(), boolean ignoreReboots = false) {
choose {
when (state ErrorOr<Void> e = wait( errorOr(actor) )) {
Void _ = wait(onClosed);
if (ignoreReboots && e.isError() && e.getError().code() == error_code_please_reboot) {
// no need to wait.
} else {
Void _ = wait(onClosed);
}
if (e.isError()) throw e.getError(); else return e.get();
}
when (ErrorOr<Void> e = wait( actor.isReady() ? Never() : errorOr( store->getError() ) )) {
@ -621,11 +625,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Promise<Void> recovery;
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
recoveries.push_back(recovery.getFuture());
if (kv->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK) {
f = handleIOErrors( f, kv, s.storeID);
} else {
f = handleIOErrors( f, kv, s.storeID, kvClosed);
}
f = handleIOErrors( f, kv, s.storeID, kvClosed, kv->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
} else if( s.storedComponent == DiskStore::TLogData ) {
@ -777,11 +777,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
ReplyPromise<StorageServerInterface> storageReady = req.reply;
storageCache.set( req.reqId, storageReady.getFuture() );
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
if (data->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK) {
s = handleIOErrors(s, data, recruited.id());
} else {
s = handleIOErrors(s, data, recruited.id(), kvClosed);
}
s = handleIOErrors(s, data, recruited.id(), kvClosed, data->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
s = storageCache.removeOnReady( req.reqId, s );
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, data );
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );