Merge pull request #2815 from etschannen/feature-timeout-tlog-create
Treat a tlog which takes a long time to create its disk queue as failed
This commit is contained in:
commit
ee3cde0b0d
|
@ -82,6 +82,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( TLOG_DEGRADED_DURATION, 5.0 );
|
||||
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
|
||||
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
|
||||
init( TLOG_MAX_CREATE_DURATION, 10.0 );
|
||||
|
||||
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
|
||||
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
|
||||
|
|
|
@ -84,6 +84,7 @@ public:
|
|||
int DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
|
||||
double TLOG_DEGRADED_DURATION;
|
||||
double TXS_POPPED_MAX_DELAY;
|
||||
double TLOG_MAX_CREATE_DURATION;
|
||||
|
||||
// Data distribution queue
|
||||
double HEALTH_POLL_TIME;
|
||||
|
|
|
@ -1416,7 +1416,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
|
||||
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
|
||||
state IKeyValueStore *storage = self->persistentData;
|
||||
wait(storage->init());
|
||||
wait( ioTimeoutError( storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
storage->set( persistFormat );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
||||
|
@ -1432,7 +1432,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
}
|
||||
|
||||
TraceEvent("TLogInitCommit", logData->logId);
|
||||
wait( self->persistentData->commit() );
|
||||
wait( ioTimeoutError( self->persistentData->commit(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -2332,7 +2332,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
|
|||
if(restoreFromDisk) {
|
||||
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
|
||||
} else {
|
||||
wait( checkEmptyQueue(&self) && checkRecovered(&self) );
|
||||
wait( ioTimeoutError( checkEmptyQueue(&self) && checkRecovered(&self), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
}
|
||||
|
||||
//Disk errors need a chance to kill this actor.
|
||||
|
|
|
@ -1807,7 +1807,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
|
||||
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
|
||||
state IKeyValueStore *storage = self->persistentData;
|
||||
wait(storage->init());
|
||||
wait( ioTimeoutError( storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
storage->set( persistFormat );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
||||
|
@ -1824,7 +1824,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
}
|
||||
|
||||
TraceEvent("TLogInitCommit", logData->logId);
|
||||
wait( self->persistentData->commit() );
|
||||
wait( ioTimeoutError( self->persistentData->commit(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -2766,7 +2766,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
|
|||
if(restoreFromDisk) {
|
||||
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
|
||||
} else {
|
||||
wait( checkEmptyQueue(&self) && checkRecovered(&self) );
|
||||
wait( ioTimeoutError( checkEmptyQueue(&self) && checkRecovered(&self), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
|
||||
}
|
||||
|
||||
//Disk errors need a chance to kill this actor.
|
||||
|
|
|
@ -199,7 +199,6 @@ Future<T> timeoutError( Future<T> what, double time, TaskPriority taskID = TaskP
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> delayed( Future<T> what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay ) {
|
||||
try {
|
||||
|
@ -866,6 +865,22 @@ Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double co
|
|||
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
|
||||
Future<Void> lowPriorityDelay( double const& waitTime );
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> ioTimeoutError( Future<T> what, double time ) {
|
||||
Future<Void> end = lowPriorityDelay( time );
|
||||
choose {
|
||||
when( T t = wait( what ) ) { return t; }
|
||||
when( wait( end ) ) {
|
||||
Error err = io_timeout();
|
||||
if(g_network->isSimulated()) {
|
||||
err = err.asInjectedFault();
|
||||
}
|
||||
TraceEvent(SevError, "IoTimeoutError").error(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue