warmRange needs to get a read version occasionally to prevent it from overwhelming the proxy
quietDatabase waits for all data distribution to be completely finished so that databases are cached in a cleaner state
This commit is contained in:
parent
be643d6937
commit
645dc5ead6
|
@ -1082,13 +1082,30 @@ Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations(
|
|||
|
||||
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
|
||||
state int totalRanges = 0;
|
||||
state int totalRequests = 0;
|
||||
loop {
|
||||
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations_internal(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
|
||||
totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
|
||||
totalRequests++;
|
||||
if(locations.size() == 0 || totalRanges >= cx->locationCacheSize || locations[locations.size()-1].first.end >= keys.end)
|
||||
break;
|
||||
|
||||
keys = KeyRangeRef(locations[locations.size()-1].first.end, keys.end);
|
||||
|
||||
if(totalRequests%20 == 0) {
|
||||
//To avoid blocking the proxies from starting other transactions, occasionally get a read version.
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
|
||||
tr.setOption( FDBTransactionOptions::CAUSAL_READ_RISKY );
|
||||
Version _ = wait( tr.getReadVersion() );
|
||||
break;
|
||||
} catch( Error &e ) {
|
||||
Void _ = wait( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -292,7 +292,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
|
|||
if(g_network->isSimulated())
|
||||
Void _ = wait(delay(5.0));
|
||||
|
||||
//Require 2 consecutive successful quiet database checks spaced 1 second apart
|
||||
//Require 3 consecutive successful quiet database checks spaced 2 second apart
|
||||
state int numSuccesses = 0;
|
||||
|
||||
loop {
|
||||
|
@ -322,7 +322,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
|
|||
Void _ = wait( delay( 1.0 ) );
|
||||
numSuccesses = 0;
|
||||
} else {
|
||||
if(++numSuccesses == 2) {
|
||||
if(++numSuccesses == 3) {
|
||||
TraceEvent(("QuietDatabase" + phase + "Done").c_str());
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1063,7 +1063,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
|
|||
if(tests.empty() || useDB) {
|
||||
if(waitForQuiescenceEnd) {
|
||||
try {
|
||||
Void _ = wait( quietDatabase( cx, dbInfo, "End", 1e6, 2e6, 2e6 ) ||
|
||||
Void _ = wait( quietDatabase( cx, dbInfo, "End", 0, 2e6, 2e6 ) ||
|
||||
( databasePingDelay == 0.0 ? Never() : testDatabaseLiveness( cx, databasePingDelay, "QuietDatabaseEnd" ) ) );
|
||||
} catch( Error& e ) {
|
||||
if( e.code() != error_code_actor_cancelled )
|
||||
|
|
Loading…
Reference in New Issue