Address review comments.
This commit is contained in:
parent
d8dc8e83b9
commit
3ceec01392
|
@ -56,8 +56,6 @@ public:
|
|||
virtual void resyncLog() {}
|
||||
|
||||
virtual void enableSnapshot() {}
|
||||
|
||||
virtual void reset() {}
|
||||
/*
|
||||
Concurrency contract
|
||||
Causal consistency:
|
||||
|
|
|
@ -191,11 +191,6 @@ public:
|
|||
return c;
|
||||
}
|
||||
|
||||
virtual void reset() {
|
||||
log_op( OpRollback, StringRef(), StringRef() );
|
||||
semiCommit();
|
||||
}
|
||||
|
||||
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) {
|
||||
if(recovering.isError()) throw recovering.getError();
|
||||
if (!recovering.isReady()) return waitAndReadValue(this, key);
|
||||
|
|
|
@ -52,7 +52,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = g_random->coinflip() ? 0.1 : 60;
|
||||
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
|
||||
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
|
||||
init( REUSE_MEMORY_STORE_ON_ROLLBACK, 1 );
|
||||
|
||||
// Versions
|
||||
init( MAX_VERSIONS_IN_FLIGHT, 100000000 );
|
||||
|
|
|
@ -57,7 +57,6 @@ public:
|
|||
double PEEK_TRACKER_EXPIRATION_TIME;
|
||||
int PARALLEL_GET_MORE_REQUESTS;
|
||||
int64_t MAX_QUEUE_COMMIT_BYTES;
|
||||
int REUSE_MEMORY_STORE_ON_ROLLBACK;
|
||||
|
||||
// Versions
|
||||
int MAX_VERSIONS_IN_FLIGHT;
|
||||
|
|
|
@ -3157,10 +3157,8 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
|
|||
self.shards.insert( allKeys, Reference<ShardInfo>() );
|
||||
|
||||
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
|
||||
if ( e.code() == error_code_please_reboot &&
|
||||
persistentData->getType() == KeyValueStoreType::MEMORY &&
|
||||
SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK) {
|
||||
// do nothing on memory storage server reboot when feature is enabled.
|
||||
if (e.code() == error_code_please_reboot) {
|
||||
// do nothing.
|
||||
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
|
||||
persistentData->dispose();
|
||||
} else {
|
||||
|
|
|
@ -110,10 +110,10 @@ ACTOR Future<Void> forwardError( PromiseStream<ErrorInfo> errors,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void(), bool ignoreReboots = false) {
|
||||
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void()) {
|
||||
choose {
|
||||
when (state ErrorOr<Void> e = wait( errorOr(actor) )) {
|
||||
if (ignoreReboots && e.isError() && e.getError().code() == error_code_please_reboot) {
|
||||
if (e.isError() && e.getError().code() == error_code_please_reboot) {
|
||||
// no need to wait.
|
||||
} else {
|
||||
Void _ = wait(onClosed);
|
||||
|
@ -355,30 +355,13 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
|
|||
|
||||
TraceEvent("StorageServerRequestedReboot", id);
|
||||
|
||||
// for memory servers, we won't reload from disk if REUSE_MEMORY_STORE_ON_ROLLBACK is active.
|
||||
if (SERVER_KNOBS -> REUSE_MEMORY_STORE_ON_ROLLBACK && storeType == KeyValueStoreType::MEMORY) {
|
||||
store->reset();
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
|
||||
prevStorageServer = storageServer( store, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed(), true);
|
||||
} else {
|
||||
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
|
||||
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
||||
// the old one so the failure detector will know it is gone.
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
auto *kv = openKVStore(storeType, filename, ssi.uniqueID, memoryLimit);
|
||||
Future<Void> kvClosed = kv->onClosed();
|
||||
filesClosed->add(kvClosed);
|
||||
prevStorageServer = storageServer(kv, ssi, db, folder, Promise<Void>());
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, kv, id, kvClosed);
|
||||
}
|
||||
prevStorageServer = storageServer( store, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -624,7 +607,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Promise<Void> recovery;
|
||||
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed, kv->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed);
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
||||
} else if( s.storedComponent == DiskStore::TLogData ) {
|
||||
|
@ -776,7 +759,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
ReplyPromise<StorageServerInterface> storageReady = req.reply;
|
||||
storageCache.set( req.reqId, storageReady.getFuture() );
|
||||
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed, data->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||
s = storageCache.removeOnReady( req.reqId, s );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, data );
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
||||
|
|
Loading…
Reference in New Issue