parent
40ad06b0ac
commit
eb72427923
|
@ -57,8 +57,8 @@ public:
|
|||
|
||||
virtual void enableSnapshot() {}
|
||||
|
||||
virtual void reset() {}
|
||||
/*
|
||||
virtual void reset() {}
|
||||
/*
|
||||
Concurrency contract
|
||||
Causal consistency:
|
||||
A read which begins after a commit ends sees the effects of the commit.
|
||||
|
|
|
@ -191,9 +191,11 @@ public:
|
|||
return c;
|
||||
}
|
||||
|
||||
virtual void reset() { log_op(OpRollback, StringRef(), StringRef()); }
|
||||
// Calls log_op() with OpRollback and two empty StringRef arguments.
// NOTE(review): presumably this appends a rollback record to the store's operation log -- confirm against log_op().
virtual void reset() {
|
||||
log_op( OpRollback, StringRef(), StringRef() );
|
||||
}
|
||||
|
||||
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) {
|
||||
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) {
|
||||
if(recovering.isError()) throw recovering.getError();
|
||||
if (!recovering.isReady()) return waitAndReadValue(this, key);
|
||||
|
||||
|
|
|
@ -52,9 +52,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = g_random->coinflip() ? 0.1 : 60;
|
||||
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
|
||||
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
|
||||
init(REUSE_MEMORY_STORE_ON_ROLLBACK, 1);
|
||||
init( REUSE_MEMORY_STORE_ON_ROLLBACK, 1 );
|
||||
|
||||
// Versions
|
||||
// Versions
|
||||
init( MAX_VERSIONS_IN_FLIGHT, 100000000 );
|
||||
init( VERSIONS_PER_SECOND, 1000000 );
|
||||
init( MAX_READ_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
|
||||
|
|
|
@ -57,9 +57,9 @@ public:
|
|||
double PEEK_TRACKER_EXPIRATION_TIME;
|
||||
int PARALLEL_GET_MORE_REQUESTS;
|
||||
int64_t MAX_QUEUE_COMMIT_BYTES;
|
||||
int REUSE_MEMORY_STORE_ON_ROLLBACK;
|
||||
int REUSE_MEMORY_STORE_ON_ROLLBACK;
|
||||
|
||||
// Versions
|
||||
// Versions
|
||||
int MAX_VERSIONS_IN_FLIGHT;
|
||||
int MAX_READ_TRANSACTION_LIFE_VERSIONS;
|
||||
int MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
|
||||
|
|
|
@ -3146,18 +3146,17 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
|
|||
self.shards.insert( allKeys, Reference<ShardInfo>() );
|
||||
|
||||
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
|
||||
if (e.code() == error_code_please_reboot &&
|
||||
persistentData->getType() == KeyValueStoreType::MEMORY &&
|
||||
SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK) {
|
||||
// do nothing on memory storage server reboot when feature is enabled.
|
||||
} else if (e.code() == error_code_worker_removed ||
|
||||
e.code() == error_code_recruitment_failed) {
|
||||
persistentData->dispose();
|
||||
} else {
|
||||
persistentData->close();
|
||||
}
|
||||
if ( e.code() == error_code_please_reboot &&
|
||||
persistentData->getType() == KeyValueStoreType::MEMORY &&
|
||||
SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK) {
|
||||
// do nothing on memory storage server reboot when feature is enabled.
|
||||
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
|
||||
persistentData->dispose();
|
||||
} else {
|
||||
persistentData->close();
|
||||
}
|
||||
|
||||
if ( e.code() == error_code_worker_removed ||
|
||||
if ( e.code() == error_code_worker_removed ||
|
||||
e.code() == error_code_recruitment_failed ||
|
||||
e.code() == error_code_file_not_found ||
|
||||
e.code() == error_code_actor_cancelled )
|
||||
|
|
|
@ -110,35 +110,27 @@ ACTOR Future<Void> forwardError( PromiseStream<ErrorInfo> errors,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable *store, UID id,
|
||||
Future<Void> onClosed = Void(),
|
||||
bool ignoreReboots = false) {
|
||||
choose {
|
||||
when(state ErrorOr<Void> e = wait(errorOr(actor))) {
|
||||
if (ignoreReboots && e.isError() &&
|
||||
e.getError().code() == error_code_please_reboot) {
|
||||
// no need to wait.
|
||||
} else {
|
||||
Void _ = wait(onClosed);
|
||||
}
|
||||
if (e.isError())
|
||||
throw e.getError();
|
||||
else
|
||||
return e.get();
|
||||
}
|
||||
when(ErrorOr<Void> e =
|
||||
wait(actor.isReady() ? Never() : errorOr(store->getError()))) {
|
||||
TraceEvent("WorkerTerminatingByIOError", id).error(e.getError(), true);
|
||||
actor.cancel();
|
||||
// file_not_found can occur due to attempting to open a partially deleted
|
||||
// DiskQueue, which should not be reported SevError.
|
||||
if (e.getError().code() == error_code_file_not_found) {
|
||||
TEST(true); // Worker terminated with file_not_found error
|
||||
return Void();
|
||||
}
|
||||
throw e.getError();
|
||||
}
|
||||
}
|
||||
// Supervises `actor` while also watching `store` for an error reported through store->getError().
//   actor         - future being supervised; its value or error is passed on to the caller.
//   store         - only store->getError() is consulted here.
//   id            - attached to the WorkerTerminatingByIOError trace event.
//   onClosed      - waited on after `actor` finishes and before its outcome is passed on (defaults to Void()).
//   ignoreReboots - when true, an `actor` failure with error_code_please_reboot skips the wait on onClosed.
// Returns Void() if `actor` succeeds, or if the store reports file_not_found; otherwise throws the actor's or the
// store's error.
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void(), bool ignoreReboots = false) {
|
||||
choose {
|
||||
// Arm 1: the supervised actor finished, with a value or an error. `e` is declared `state` and is read again
// after the wait on onClosed below.
when (state ErrorOr<Void> e = wait( errorOr(actor) )) {
|
||||
if (ignoreReboots && e.isError() && e.getError().code() == error_code_please_reboot) {
|
||||
// no need to wait.
|
||||
} else {
|
||||
Void _ = wait(onClosed);
|
||||
}
|
||||
// Pass the actor's outcome through unchanged.
if (e.isError()) throw e.getError(); else return e.get();
|
||||
}
|
||||
// Arm 2: the store reported an error. If `actor` is already ready this arm waits on Never() instead, so only
// arm 1 can fire.
when (ErrorOr<Void> e = wait( actor.isReady() ? Never() : errorOr( store->getError() ) )) {
|
||||
TraceEvent("WorkerTerminatingByIOError", id).error(e.getError(), true);
|
||||
// Cancel the supervised actor before returning or rethrowing the store's error.
actor.cancel();
|
||||
// file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be reported SevError.
|
||||
if (e.getError().code() == error_code_file_not_found) {
|
||||
TEST(true); // Worker terminated with file_not_found error
|
||||
return Void();
|
||||
}
|
||||
throw e.getError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
|
||||
|
@ -355,51 +347,39 @@ ACTOR Future<Void> runProfiler(ProfilerRequest req) {
|
|||
}
|
||||
}
|
||||
|
||||
// Keeps a storage server running across please_reboot requests.
// Each loop iteration waits for prevStorageServer and then:
//   - if it finished without error, returns Void();
//   - if it failed with anything other than error_code_please_reboot, rethrows that error;
//   - if it failed with error_code_please_reboot, logs StorageServerRequestedReboot and starts a replacement
//     storageServer, which becomes the new prevStorageServer.
// The replacement reuses `store` (after store->reset()) when SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK is set and
// storeType is KeyValueStoreType::MEMORY. Otherwise a new store is opened with openKVStore() and its onClosed()
// future is added to filesClosed. In both cases a fresh StorageServerInterface is built from `id` and `locality`,
// and the new storageServer future is wrapped in handleIOErrors() before the loop repeats.
ACTOR Future<Void> storageServerRollbackRebooter(
|
||||
Future<Void> prevStorageServer, KeyValueStoreType storeType,
|
||||
std::string filename, UID id, LocalityData locality,
|
||||
Reference<AsyncVar<ServerDBInfo>> db, std::string folder,
|
||||
ActorCollection *filesClosed, int64_t memoryLimit, IKeyValueStore *store) {
|
||||
loop {
|
||||
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
|
||||
if (!e.isError())
|
||||
return Void();
|
||||
else if (e.getError().code() != error_code_please_reboot)
|
||||
throw e.getError();
|
||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit, IKeyValueStore* store ) {
|
||||
loop {
|
||||
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
|
||||
if (!e.isError()) return Void();
|
||||
else if (e.getError().code() != error_code_please_reboot) throw e.getError();
|
||||

||||
TraceEvent("StorageServerRequestedReboot", id);
|
||||
TraceEvent("StorageServerRequestedReboot", id);
|
||||

||||
// for memory servers, we won't reload from disk if
|
||||
// REUSE_MEMORY_STORE_ON_ROLLBACK is active.
|
||||
if (SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK &&
|
||||
storeType == KeyValueStoreType::MEMORY) {
|
||||
store->reset();
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
// for memory servers, we won't reload from disk if REUSE_MEMORY_STORE_ON_ROLLBACK is active.
|
||||
if (SERVER_KNOBS -> REUSE_MEMORY_STORE_ON_ROLLBACK && storeType == KeyValueStoreType::MEMORY) {
|
||||
store->reset();
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||

||||
prevStorageServer =
|
||||
storageServer(store, ssi, db, folder, Promise<Void>());
|
||||
prevStorageServer =
|
||||
handleIOErrors(prevStorageServer, store, id, store->onClosed(), true);
|
||||
} else {
|
||||
// if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as
|
||||
// zombie()
|
||||
// We need a new interface, since the new storageServer will do
|
||||
// replaceInterface(). And we need to destroy the old one so the failure
|
||||
// detector will know it is gone.
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
auto *kv = openKVStore(storeType, filename, ssi.uniqueID, memoryLimit);
|
||||
Future<Void> kvClosed = kv->onClosed();
|
||||
filesClosed->add(kvClosed);
|
||||
prevStorageServer = storageServer(kv, ssi, db, folder, Promise<Void>());
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, kv, id, kvClosed);
|
||||
}
|
||||
}
|
||||
prevStorageServer = storageServer( store, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed(), true);
|
||||
} else {
|
||||
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
|
||||
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
||||
// the old one so the failure detector will know it is gone.
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
auto *kv = openKVStore(storeType, filename, ssi.uniqueID, memoryLimit);
|
||||
Future<Void> kvClosed = kv->onClosed();
|
||||
filesClosed->add(kvClosed);
|
||||
prevStorageServer = storageServer(kv, ssi, db, folder, Promise<Void>());
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, kv, id, kvClosed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: This will not work correctly in simulation as all workers would share the same roles map
|
||||
|
@ -644,17 +624,9 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Promise<Void> recovery;
|
||||
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors(
|
||||
f, kv, s.storeID, kvClosed,
|
||||
kv->getType() ==
|
||||
KeyValueStoreType::MEMORY &&
|
||||
SERVER_KNOBS
|
||||
->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
f = storageServerRollbackRebooter(
|
||||
f, s.storeType, s.filename, recruited.id(),
|
||||
recruited.locality, dbInfo, folder,
|
||||
&filesClosed, memoryLimit, kv);
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed, kv->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
||||
} else if( s.storedComponent == DiskStore::TLogData ) {
|
||||
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
|
||||
IDiskQueue* queue = openDiskQueue(
|
||||
|
@ -804,19 +776,10 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
ReplyPromise<StorageServerInterface> storageReady = req.reply;
|
||||
storageCache.set( req.reqId, storageReady.getFuture() );
|
||||
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
||||
s = handleIOErrors(
|
||||
s, data, recruited.id(), kvClosed,
|
||||
data->getType() ==
|
||||
KeyValueStoreType::MEMORY &&
|
||||
SERVER_KNOBS
|
||||
->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
s = storageCache.removeOnReady( req.reqId, s );
|
||||
s = storageServerRollbackRebooter(
|
||||
s, req.storeType, filename,
|
||||
recruited.id(), recruited.locality,
|
||||
dbInfo, folder, &filesClosed,
|
||||
memoryLimit, data);
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed, data->getType() == KeyValueStoreType::MEMORY && SERVER_KNOBS->REUSE_MEMORY_STORE_ON_ROLLBACK);
|
||||
s = storageCache.removeOnReady( req.reqId, s );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, data );
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
||||
} else
|
||||
forwardPromise( req.reply, storageCache.get( req.reqId ) );
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue