fix: if the disk queue adapter peek hangs for a while, switch to a peek from a different locality

This commit is contained in:
Evan Tschannen 2018-10-03 13:58:55 -07:00
parent 28545e0f8d
commit 636420abee
4 changed files with 16 additions and 1 deletions

View File

@ -65,6 +65,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
init( VERSIONS_PER_BATCH, VERSIONS_PER_SECOND/20 ); if( randomize && BUGGIFY ) VERSIONS_PER_BATCH = std::max<int64_t>(1,VERSIONS_PER_SECOND/1000);
init( CONCURRENT_LOG_ROUTER_READS, 1 );
// How long the disk queue adapter waits on a peek before switching peek type (value is passed to delay(); presumably seconds).
// MIN is used before the first switch (peekTypeSwitches==0), MAX for every later switch.
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );

View File

@ -69,6 +69,8 @@ public:
int64_t MAX_QUEUE_COMMIT_BYTES;
int64_t VERSIONS_PER_BATCH;
int CONCURRENT_LOG_ROUTER_READS;
// Wait before the disk queue adapter switches peek type: MIN applies to the first switch, MAX to later ones.
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;
double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME;
// Data distribution queue
double HEALTH_POLL_TIME;

View File

@ -44,6 +44,16 @@ public:
self->cursor = self->logSystem->peekSpecial( UID(), self->recoveryLoc, self->tag, self->peekLocality ? self->peekLocality->get().first : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().second : invalidVersion );
self->localityChanged = self->peekLocality->onChange();
}
// Timeout branch: if this delay fires before the other branches of the enclosing choose,
// the current peek is treated as hung and the cursor is rebuilt with the other peek type.
// The first switch uses the MIN knob as its timeout; every later switch uses the MAX knob.
when(Void _ = wait( delay(self->peekTypeSwitches==0 ? SERVER_KNOBS->DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME : SERVER_KNOBS->DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME))) {
	self->peekTypeSwitches++;
	if(self->peekTypeSwitches%2==1) {
		// Odd switch count: fall back to a plain peek and stop reacting to locality changes.
		self->cursor = self->logSystem->peek( UID(), self->recoveryLoc, self->tag, true );
		self->localityChanged = Never();
	} else {
		// Even switch count: return to the locality-directed peek.
		self->cursor = self->logSystem->peekSpecial( UID(), self->recoveryLoc, self->tag, self->peekLocality ? self->peekLocality->get().first : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().second : invalidVersion );
		// peekLocality may be null (the peekSpecial arguments above and the constructor both guard for it),
		// and this branch fires on a timer regardless, so guard the dereference here as well.
		self->localityChanged = self->peekLocality ? self->peekLocality->onChange() : Never();
	}
}
}
}
TraceEvent("PeekNextGetMore").detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Loc", self->recoveryLoc).detail("End", self->logSystem->getEnd());

View File

@ -40,7 +40,7 @@ public:
// It does, however, peek the specified tag directly at recovery time.
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0) {
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0) {
if (enableRecovery) {
localityChanged = peekLocality ? peekLocality->onChange() : Never();
cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get().first : tagLocalityInvalid, peekLocality ? peekLocality->get().second : invalidVersion );
@ -79,6 +79,7 @@ private:
Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality;
Future<Void> localityChanged;
Reference<ILogSystem::IPeekCursor> cursor;
// Number of times the peek timeout has fired and the cursor was switched; starts at 0.
// Odd values mean the cursor is a plain peek(), even values mean peekSpecial() by locality.
int peekTypeSwitches;
Tag tag;
// Recovery state (used while readNext() is being called repeatedly)