fix: clients would waste time attempting to read from a remote region when it was in the process of catching up
This commit is contained in:
parent
d55e56993d
commit
4aab9b7bc8
|
@ -1453,6 +1453,17 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> getRawVersion( Database cx ) {
|
||||
loop {
|
||||
choose {
|
||||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE ), cx->taskID ) ) ) {
|
||||
return v.version;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readVersionBatcher(
|
||||
DatabaseContext* cx, FutureStream<std::pair<Promise<GetReadVersionReply>, Optional<UID>>> versionStream,
|
||||
uint32_t flags);
|
||||
|
@ -2133,6 +2144,10 @@ ACTOR Future<Void> watch( Reference<Watch> watch, Database cx, Transaction *self
|
|||
return Void();
|
||||
}
|
||||
|
||||
Future<Version> Transaction::getRawReadVersion() {
|
||||
return ::getRawVersion(cx);
|
||||
}
|
||||
|
||||
Future< Void > Transaction::watch( Reference<Watch> watch ) {
|
||||
return ::watch(watch, cx, this);
|
||||
}
|
||||
|
|
|
@ -211,6 +211,7 @@ public:
|
|||
|
||||
void setVersion( Version v );
|
||||
Future<Version> getReadVersion() { return getReadVersion(0); }
|
||||
Future<Version> getRawReadVersion();
|
||||
|
||||
Future< Optional<Value> > get( const Key& key, bool snapshot = false );
|
||||
Future< Void > watch( Reference<Watch> watch );
|
||||
|
|
|
@ -473,6 +473,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( BYTE_SAMPLE_LOAD_DELAY, 0.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_LOAD_DELAY = 0.1;
|
||||
init( BYTE_SAMPLE_START_DELAY, 1.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_START_DELAY = 0.0;
|
||||
init( UPDATE_STORAGE_PROCESS_STATS_INTERVAL, 5.0 );
|
||||
init( BEHIND_CHECK_DELAY, 2.0 );
|
||||
init( BEHIND_CHECK_COUNT, 2 );
|
||||
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
|
||||
|
||||
//Wait Failure
|
||||
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
|
||||
|
|
|
@ -416,6 +416,9 @@ public:
|
|||
double BYTE_SAMPLE_LOAD_DELAY;
|
||||
double BYTE_SAMPLE_START_DELAY;
|
||||
double UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
|
||||
double BEHIND_CHECK_DELAY;
|
||||
int BEHIND_CHECK_COUNT;
|
||||
int64_t BEHIND_CHECK_VERSIONS;
|
||||
|
||||
//Wait Failure
|
||||
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
|
||||
|
|
|
@ -451,6 +451,7 @@ public:
|
|||
bool shuttingDown;
|
||||
|
||||
bool behind;
|
||||
bool versionBehind;
|
||||
|
||||
bool debug_inApplyUpdate;
|
||||
double debug_lastValidateTime;
|
||||
|
@ -545,7 +546,7 @@ public:
|
|||
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
|
||||
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
|
||||
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
|
||||
behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
|
||||
behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
|
||||
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
|
||||
{
|
||||
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
|
||||
|
@ -780,7 +781,7 @@ ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
|
|||
else if (version <= data->version.get())
|
||||
return version;
|
||||
|
||||
if(data->behind && version > data->version.get()) {
|
||||
if((data->behind || data->versionBehind) && version > data->version.get()) {
|
||||
throw process_behind();
|
||||
}
|
||||
|
||||
|
@ -3449,6 +3450,29 @@ ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkBehind( StorageServer* self ) {
|
||||
state int behindCount = 0;
|
||||
loop {
|
||||
wait( delay(SERVER_KNOBS->BEHIND_CHECK_DELAY) );
|
||||
loop {
|
||||
try {
|
||||
state Transaction tr(self->cx);
|
||||
Version readVersion = wait( tr.getRawReadVersion() );
|
||||
Version storageVersion = self->version.get();
|
||||
if( readVersion > storageVersion + SERVER_KNOBS->BEHIND_CHECK_VERSIONS ) {
|
||||
behindCount++;
|
||||
} else {
|
||||
behindCount = 0;
|
||||
}
|
||||
self->versionBehind = behindCount >= SERVER_KNOBS->BEHIND_CHECK_COUNT;
|
||||
break;
|
||||
} catch( Error &e ) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
|
||||
{
|
||||
state Future<Void> doUpdate = Void();
|
||||
|
@ -3465,6 +3489,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
actors.add(self->otherError.getFuture());
|
||||
actors.add(metricsCore(self, ssi));
|
||||
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
|
||||
actors.add(checkBehind(self));
|
||||
|
||||
self->coreStarted.send( Void() );
|
||||
|
||||
|
|
Loading…
Reference in New Issue