Merge branch 'release-6.2' of github.com:apple/foundationdb into feature-share-mutations
commit 8b09cd16b2
@@ -18,7 +18,7 @@
 # limitations under the License.
 cmake_minimum_required(VERSION 3.12)
 project(foundationdb
-  VERSION 6.2.5
+  VERSION 6.2.6
   DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
   HOMEPAGE_URL "http://www.foundationdb.org/"
   LANGUAGES C CXX ASM)

@@ -2,7 +2,7 @@
 Release Notes
 #############
 
-6.2.5
+6.2.6
 =====
 
 Performance
@@ -50,6 +50,7 @@ Fixes
 * Data distribution was running at too high of a priority, which sometimes caused other roles on the same process to stall. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
 * Loading a 6.1 or newer ``fdb_c`` library as a secondary client using the multi-version client could lead to an infinite recursion when run with API versions older than 610. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
 * Using C API functions that were removed in 6.1 when using API version 610 or above now results in a compilation error. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
+* Coordinator changes could fail to complete if the database wasn't allowing any transactions to start. [6.2.6] `(PR #2191) <https://github.com/apple/foundationdb/pull/2191>`_
 
 Status
 ------
@@ -124,6 +125,7 @@ Fixes only impacting 6.2.0+
 * The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
 * A storage server could crash if it took longer than 10 minutes to fetch a key range from another server. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
 * Excluding or including servers would restart the data distributor. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
+* The data distributor could read invalid memory when estimating database size. [6.2.6] `(PR #2225) <https://github.com/apple/foundationdb/pull/2225>`_.
 
 Earlier release notes
 ---------------------

@@ -916,6 +916,7 @@ ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQuo
 	try {
 		tr.setOption( FDBTransactionOptions::LOCK_AWARE );
+		tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
 		tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
 		Optional<Value> currentKey = wait( tr.get( coordinatorsKey ) );
 
 		if (!currentKey.present())

@@ -1161,11 +1161,36 @@ public:
 	Optional<UID> recruitingRatekeeperID;
 	AsyncVar<bool> recruitRatekeeper;
 
+	CounterCollection clusterControllerMetrics;
+
+	Counter openDatabaseRequests;
+	Counter registerWorkerRequests;
+	Counter getWorkersRequests;
+	Counter getClientWorkersRequests;
+	Counter registerMasterRequests;
+	Counter getServerDBInfoRequests;
+	Counter statusRequests;
+	Counter failureMonitoringRequests;
+
+	Counter serversFailed;
+	Counter serversUnfailed;
+
 	ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
 		: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
 		  id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false),
 		  gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
-		  versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false)
+		  versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false),
+		  clusterControllerMetrics("ClusterController", id.toString()),
+		  openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
+		  registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
+		  getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),
+		  getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
+		  registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
+		  getServerDBInfoRequests("GetServerDBInfoRequests", clusterControllerMetrics),
+		  statusRequests("StatusRequests", clusterControllerMetrics),
+		  failureMonitoringRequests("FailureMonitoringRequests", clusterControllerMetrics),
+		  serversFailed("ServersFailed", clusterControllerMetrics),
+		  serversUnfailed("ServersUnfailed", clusterControllerMetrics)
 	{
 		CachedSerialization<ServerDBInfo> newInfoCache = db.serverInfo->get();
 		auto& serverInfo = newInfoCache.mutate();

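The hunk above wires a CounterCollection plus per-request Counters into ClusterControllerData; the hunks below bump those counters and periodically flush them with traceCounters. A minimal sketch of that pattern, using only the constructor and call shapes visible in this diff (the include path is an assumption and may not match the actual tree):

// Illustrative sketch only -- not part of the commit.
#include "fdbrpc/Stats.h"   // assumed home of Counter / CounterCollection / traceCounters

struct ExampleRoleData {
	CounterCollection metrics;        // one collection per role instance
	Counter openDatabaseRequests;     // one counter per request type

	explicit ExampleRoleData(std::string const& id)
	  : metrics("ClusterController", id),
	    openDatabaseRequests("OpenDatabaseRequests", metrics) {}
};

// In the request loop each served request does ++self.openDatabaseRequests;, and a single
// traceCounters("ClusterControllerMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &metrics, ...)
// actor (see the clusterControllerCore hunk below) emits the whole collection as one trace
// event per logging interval.
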
@@ -1518,7 +1543,7 @@ struct FailureStatusInfo {
 };
 
 //The failure monitor client relies on the fact that the failure detection server will not declare itself failed
-ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::DBInfo* db, FutureStream< FailureMonitoringRequest > requests ) {
+ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData* self, FutureStream< FailureMonitoringRequest > requests ) {
 	state Version currentVersion = 0;
 	state std::map<NetworkAddressList, FailureStatusInfo> currentStatus; // The status at currentVersion
 	state std::deque<SystemFailureStatus> statusHistory; // The last change in statusHistory is from currentVersion-1 to currentVersion
@@ -1527,6 +1552,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 
 	loop choose {
 		when ( FailureMonitoringRequest req = waitNext( requests ) ) {
+			++self->failureMonitoringRequests;
 			if ( req.senderStatus.present() ) {
 				// Update the status of requester, if necessary
 				auto& stat = currentStatus[ req.addresses ];
@@ -1536,6 +1562,12 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 
 				stat.insertRequest(now());
 				if (req.senderStatus != stat.status) {
+					if(newStat.failed) {
+						++self->serversFailed;
+					}
+					else {
+						++self->serversUnfailed;
+					}
 					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", req.addresses.toString()).detail("Status", newStat.failed ? "Failed" : "OK").detail("Why", "Request");
 					statusHistory.push_back( SystemFailureStatus( req.addresses, newStat ) );
 					++currentVersion;
@@ -1615,7 +1647,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 			//TraceEvent("FailureDetectionPoll", uniqueID).detail("PivotDelay", pivotDelay).detail("Clients", currentStatus.size());
 			//TraceEvent("FailureDetectionAcceptableDelay").detail("Delay", acceptableDelay1000);
 
-			bool tooManyLogGenerations = std::max(db->unfinishedRecoveries, db->logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS;
+			bool tooManyLogGenerations = std::max(self->db.unfinishedRecoveries, self->db.logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS;
 
 			for(auto it = currentStatus.begin(); it != currentStatus.end(); ) {
 				double delay = t - it->second.lastRequestTime;
@@ -1624,7 +1656,8 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 					( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) ) {
 					//printf("Failure Detection Server: Status of '%s' is now '%s' after %f sec\n", it->first.toString().c_str(), "Failed", now() - it->second.lastRequestTime);
 					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", describe(it->first)).detail("Status","Failed").detail("Why", "Timeout").detail("LastRequestAge", delay)
-						.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", db->unfinishedRecoveries).detail("LogGenerations", db->logGenerations);
+						.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", self->db.unfinishedRecoveries).detail("LogGenerations", self->db.logGenerations);
+					++self->serversFailed;
 					statusHistory.push_back( SystemFailureStatus( it->first, FailureStatus(true) ) );
 					++currentVersion;
 					it = currentStatus.erase(it);
@@ -2005,6 +2038,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
 	try {
 		// Wait til first request is ready
 		StatusRequest req = waitNext(requests);
+		++self->statusRequests;
 		requests_batch.push_back(req);
 
 		// Earliest time at which we may begin a new request
@@ -2584,7 +2618,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	state uint64_t step = 0;
 	state Future<ErrorOr<Void>> error = errorOr( actorCollection( self.addActor.getFuture() ) );
 
-	self.addActor.send( failureDetectionServer( self.id, &self.db, interf.clientInterface.failureMonitoring.getFuture() ) );
+	self.addActor.send( failureDetectionServer( self.id, &self, interf.clientInterface.failureMonitoring.getFuture() ) );
 	self.addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
 	self.addActor.send( self.updateWorkerList.init( self.db.db ) );
 	self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
@@ -2598,6 +2632,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	self.addActor.send( handleForcedRecoveries(&self, interf) );
 	self.addActor.send( monitorDataDistributor(&self) );
 	self.addActor.send( monitorRatekeeper(&self) );
+	self.addActor.send( traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.clusterControllerMetrics, self.id.toString() + "/ClusterControllerMetrics") );
+
 	//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
 
 	loop choose {
@@ -2613,6 +2649,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			return Void();
 		}
 		when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
+			++self.openDatabaseRequests;
 			self.addActor.send(clusterOpenDatabase(&self.db, req));
 		}
 		when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
@@ -2625,9 +2662,11 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			clusterRecruitStorage( &self, req );
 		}
 		when( RegisterWorkerRequest req = waitNext( interf.registerWorker.getFuture() ) ) {
+			++self.registerWorkerRequests;
 			registerWorker( req, &self );
 		}
 		when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
+			++self.getWorkersRequests;
 			vector<WorkerDetails> workers;
 
 			for(auto& it : self.id_worker) {
@@ -2645,6 +2684,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			req.reply.send( workers );
 		}
 		when( GetClientWorkersRequest req = waitNext( interf.clientInterface.getClientWorkers.getFuture() ) ) {
+			++self.getClientWorkersRequests;
 			vector<ClientWorkerInterface> workers;
 			for(auto& it : self.id_worker) {
 				if (it.second.details.processClass.classType() != ProcessClass::TesterClass) {
@@ -2661,9 +2701,11 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			TraceEvent("CoordinationPingSent", self.id).detail("TimeStep", message.timeStep);
 		}
 		when( RegisterMasterRequest req = waitNext( interf.registerMaster.getFuture() ) ) {
+			++self.registerMasterRequests;
 			clusterRegisterMaster( &self, req );
 		}
 		when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) {
+			++self.getServerDBInfoRequests;
 			self.addActor.send(
 				clusterGetServerInfo(&self.db, req.knownServerInfoID, req.issues, req.incompatiblePeers, req.reply));
 		}

@@ -189,7 +189,7 @@ public:
 	int priority;
 
 	explicit TCTeamInfo(vector<Reference<TCServerInfo>> const& servers)
-	  : servers(servers), healthy(true), priority(PRIORITY_TEAM_HEALTHY), wrongConfiguration(false) {
+	  : servers(servers), healthy(true), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), wrongConfiguration(false) {
 		if (servers.empty()) {
 			TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers");
 		}
@@ -2865,25 +2865,25 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 	state int lastPriority = team->getPriority();
 	if( serversLeft < self->configuration.storageTeamSize ) {
 		if( serversLeft == 0 )
-			team->setPriority( PRIORITY_TEAM_0_LEFT );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_0_LEFT );
 		else if( serversLeft == 1 )
-			team->setPriority( PRIORITY_TEAM_1_LEFT );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_1_LEFT );
 		else if( serversLeft == 2 )
-			team->setPriority( PRIORITY_TEAM_2_LEFT );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_2_LEFT );
 		else
-			team->setPriority( PRIORITY_TEAM_UNHEALTHY );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY );
 	}
 	else if ( badTeam || anyWrongConfiguration ) {
 		if ( redundantTeam ) {
-			team->setPriority( PRIORITY_TEAM_REDUNDANT );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT );
 		} else {
-			team->setPriority( PRIORITY_TEAM_UNHEALTHY );
+			team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY );
 		}
 	}
 	else if( anyUndesired )
-		team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
+		team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
 	else
-		team->setPriority( PRIORITY_TEAM_HEALTHY );
+		team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_HEALTHY );
 
 	if(lastPriority != team->getPriority()) {
 		self->priority_teams[lastPriority]--;
@@ -2901,13 +2901,13 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 
 	for(int i=0; i<shards.size(); i++) {
 		int maxPriority = team->getPriority();
-		if(maxPriority < PRIORITY_TEAM_0_LEFT) {
+		if(maxPriority < SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
 			auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
 			for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
 				// t is the team in primary DC or the remote DC
 				auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
 				if( !t.servers.size() ) {
-					maxPriority = PRIORITY_TEAM_0_LEFT;
+					maxPriority = SERVER_KNOBS->PRIORITY_TEAM_0_LEFT;
 					break;
 				}
 
@@ -2931,8 +2931,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 					// false We want to differentiate the redundant_team from unhealthy_team in
 					// terms of relocate priority
 					maxPriority =
-					    std::max<int>(maxPriority, redundantTeam ? PRIORITY_TEAM_REDUNDANT
-					                                             : PRIORITY_TEAM_UNHEALTHY);
+					    std::max<int>(maxPriority, redundantTeam ? SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT
+					                                             : SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY);
 				}
 			} else {
 				TEST(true); // A removed server is still associated with a team in SABTF
@@ -4174,9 +4174,21 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
 		.detail( "InFlight", 0 )
 		.detail( "InQueue", 0 )
 		.detail( "AverageShardSize", -1 )
-		.detail( "LowPriorityRelocations", 0 )
-		.detail( "HighPriorityRelocations", 0 )
+		.detail( "UnhealthyRelocations", 0 )
 		.detail( "HighestPriority", 0 )
 		.detail( "BytesWritten", 0 )
+		.detail( "PriorityRecoverMove", 0 )
+		.detail( "PriorityRebalanceUnderutilizedTeam", 0 )
+		.detail( "PriorityRebalannceOverutilizedTeam", 0)
+		.detail( "PriorityTeamHealthy", 0 )
+		.detail( "PriorityTeamContainsUndesiredServer", 0 )
+		.detail( "PriorityTeamRedundant", 0 )
+		.detail( "PriorityMergeShard", 0 )
+		.detail( "PriorityTeamUnhealthy", 0 )
+		.detail( "PriorityTeam2Left", 0 )
+		.detail( "PriorityTeam1Left", 0 )
+		.detail( "PriorityTeam0Left", 0 )
+		.detail( "PrioritySplitShard", 0 )
 		.trackLatest( "MovingData" );
+
 	TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
@@ -4219,7 +4231,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
 			if (!unhealthy && configuration.usableRegions > 1) {
 				unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize;
 			}
-			output.send( RelocateShard( keys, unhealthy ? PRIORITY_TEAM_UNHEALTHY : PRIORITY_RECOVER_MOVE ) );
+			output.send( RelocateShard( keys, unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY : SERVER_KNOBS->PRIORITY_RECOVER_MOVE ) );
 		}
 		wait( yield(TaskPriority::DataDistribution) );
 	}

@@ -38,33 +38,6 @@ struct RelocateShard {
 	RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {}
 };
 
-// Higher priorities are executed first
-// Priority/100 is the "priority group"/"superpriority". Priority inversion
-// is possible within but not between priority groups; fewer priority groups
-// mean better worst case time bounds
-enum {
-	PRIORITY_REBALANCE_SHARD = 100,
-	PRIORITY_RECOVER_MOVE = 110,
-	PRIORITY_REBALANCE_UNDERUTILIZED_TEAM = 120,
-	PRIORITY_REBALANCE_OVERUTILIZED_TEAM = 121,
-	PRIORITY_TEAM_HEALTHY = 140,
-	PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER = 150,
-
-	// Set removing_redundant_team priority lower than merge/split_shard_priority,
-	// so that removing redundant teams does not block merge/split shards.
-	PRIORITY_TEAM_REDUNDANT = 200,
-
-	PRIORITY_MERGE_SHARD = 340,
-	PRIORITY_SPLIT_SHARD = 350,
-
-	PRIORITY_TEAM_UNHEALTHY = 800,
-	PRIORITY_TEAM_2_LEFT = 809,
-
-	PRIORITY_TEAM_1_LEFT = 900,
-
-	PRIORITY_TEAM_0_LEFT = 999
-};
-
 enum {
 	SOME_SHARED = 2,
 	NONE_SHARED = 3

@@ -37,6 +37,9 @@
 struct RelocateData {
 	KeyRange keys;
 	int priority;
+	int boundaryPriority;
+	int healthPriority;
+
 	double startTime;
 	UID randomId;
 	int workFactor;
@@ -45,34 +48,42 @@ struct RelocateData {
 	bool wantsNewServers;
 	TraceInterval interval;
 
-	RelocateData() : startTime(-1), priority(-1), workFactor(0), wantsNewServers(false), interval("QueuedRelocation") {}
-	RelocateData( RelocateShard const& rs ) : keys(rs.keys), priority(rs.priority), startTime(now()), randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
+	RelocateData() : startTime(-1), priority(-1), boundaryPriority(-1), healthPriority(-1), workFactor(0), wantsNewServers(false), interval("QueuedRelocation") {}
+	explicit RelocateData( RelocateShard const& rs ) : keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1), healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), startTime(now()), randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
 		wantsNewServers(
-			rs.priority == PRIORITY_REBALANCE_SHARD ||
-			rs.priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
-			rs.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
-			rs.priority == PRIORITY_SPLIT_SHARD ||
-			rs.priority == PRIORITY_TEAM_REDUNDANT ||
+			rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
+			rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
+			rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
+			rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
 			mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {}
 
 	static bool mergeWantsNewServers(KeyRangeRef keys, int priority) {
-		return priority == PRIORITY_MERGE_SHARD &&
+		return priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD &&
 		       (SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 ||
 		        (SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff"))));
 	}
 
+	static bool isHealthPriority(int priority) {
+		return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY ||
+		       priority == SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
+	}
+
+	static bool isBoundaryPriority(int priority) {
+		return priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
+		       priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD;
+	}
+
 	bool operator> (const RelocateData& rhs) const {
 		return priority != rhs.priority ? priority > rhs.priority : ( startTime != rhs.startTime ? startTime < rhs.startTime : randomId > rhs.randomId );
 	}
 
 	bool operator== (const RelocateData& rhs) const {
-		return priority == rhs.priority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && completeSources == rhs.completeSources && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
-	}
-
-	bool changesBoundaries() {
-		return priority == PRIORITY_MERGE_SHARD ||
-			priority == PRIORITY_SPLIT_SHARD ||
-			priority == PRIORITY_RECOVER_MOVE;
+		return priority == rhs.priority && boundaryPriority == rhs.boundaryPriority && healthPriority == rhs.healthPriority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && completeSources == rhs.completeSources && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
 	}
 };
 
@@ -285,9 +296,9 @@ int getWorkFactor( RelocateData const& relocation ) {
 	// Avoid the divide by 0!
 	ASSERT( relocation.src.size() );
 
-	if( relocation.priority >= PRIORITY_TEAM_1_LEFT )
+	if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT )
 		return WORK_FULL_UTILIZATION / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
-	else if( relocation.priority >= PRIORITY_TEAM_2_LEFT )
+	else if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT )
 		return WORK_FULL_UTILIZATION / 2 / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
 	else // for now we assume that any message at a lower priority can best be assumed to have a full team left for work
 		return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
@@ -384,26 +395,28 @@ struct DDQueueData {
 
 	std::map<int, int> priority_relocations;
 	int unhealthyRelocations;
-	void startRelocation(int priority) {
+	void startRelocation(int priority, int healthPriority) {
 		// Although PRIORITY_TEAM_REDUNDANT has lower priority than split and merge shard movement,
 		// we must count it into unhealthyRelocations; because team removers relies on unhealthyRelocations to
 		// ensure a team remover will not start before the previous one finishes removing a team and move away data
 		// NOTE: split and merge shard have higher priority. If they have to wait for unhealthyRelocations = 0,
 		// deadlock may happen: split/merge shard waits for unhealthyRelocations, while blocks team_redundant.
-		if (priority >= PRIORITY_TEAM_UNHEALTHY || priority == PRIORITY_TEAM_REDUNDANT) {
+		if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
+		    healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
 			unhealthyRelocations++;
 			rawProcessingUnhealthy->set(true);
 		}
 		priority_relocations[priority]++;
 	}
-	void finishRelocation(int priority) {
-		if (priority >= PRIORITY_TEAM_UNHEALTHY || priority == PRIORITY_TEAM_REDUNDANT) {
+	void finishRelocation(int priority, int healthPriority) {
+		if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
+		    healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
			unhealthyRelocations--;
 			ASSERT(unhealthyRelocations >= 0);
 			if(unhealthyRelocations == 0) {
 				rawProcessingUnhealthy->set(false);
 			}
 		}
 		priority_relocations[priority]--;
 	}
 
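The comment in startRelocation above explains why PRIORITY_TEAM_REDUNDANT moves are counted as unhealthy even though their numeric priority is low. Stripped of FDB types, the gating idea looks roughly like this self-contained sketch (hypothetical names, not code from the commit):

// Sketch of the unhealthyRelocations gate; names are hypothetical.
#include <cassert>

struct UnhealthyGate {
	int  unhealthyRelocations = 0;
	bool processingUnhealthy = false;    // stands in for rawProcessingUnhealthy->get()

	void start(bool countsAsUnhealthy) {
		if (countsAsUnhealthy) {
			++unhealthyRelocations;
			processingUnhealthy = true;      // e.g. team removers wait while this is set
		}
	}
	void finish(bool countsAsUnhealthy) {
		if (countsAsUnhealthy) {
			--unhealthyRelocations;
			assert(unhealthyRelocations >= 0);
			if (unhealthyRelocations == 0)
				processingUnhealthy = false; // gate reopens only after all such moves drain
		}
	}
};
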
@@ -524,7 +537,7 @@ struct DDQueueData {
 		state Transaction tr(cx);
 
 		// FIXME: is the merge case needed
-		if( input.priority == PRIORITY_MERGE_SHARD ) {
+		if( input.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD ) {
 			wait( delay( 0.5, decrementPriority(decrementPriority(TaskPriority::DataDistribution )) ) );
 		} else {
 			wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) );
@@ -586,10 +599,14 @@ struct DDQueueData {
 	}
 
 	//This function cannot handle relocation requests which split a shard into three pieces
-	void queueRelocation( RelocateData rd, std::set<UID> &serversToLaunchFrom ) {
+	void queueRelocation( RelocateShard rs, std::set<UID> &serversToLaunchFrom ) {
 		//TraceEvent("QueueRelocationBegin").detail("Begin", rd.keys.begin).detail("End", rd.keys.end);
 
 		// remove all items from both queues that are fully contained in the new relocation (i.e. will be overwritten)
+		RelocateData rd(rs);
+		bool hasHealthPriority = RelocateData::isHealthPriority( rd.priority );
+		bool hasBoundaryPriority = RelocateData::isBoundaryPriority( rd.priority );
+
 		auto ranges = queueMap.intersectingRanges( rd.keys );
 		for(auto r = ranges.begin(); r != ranges.end(); ++r ) {
 			RelocateData& rrs = r->value();
@@ -611,9 +628,13 @@ struct DDQueueData {
 			if( foundActiveFetching || foundActiveRelocation ) {
 				rd.wantsNewServers |= rrs.wantsNewServers;
 				rd.startTime = std::min( rd.startTime, rrs.startTime );
-				if ((rrs.priority >= PRIORITY_TEAM_UNHEALTHY || rrs.priority == PRIORITY_TEAM_REDUNDANT) &&
-				    rd.changesBoundaries())
-					rd.priority = std::max( rd.priority, rrs.priority );
+				if(!hasHealthPriority) {
+					rd.healthPriority = std::max(rd.healthPriority, rrs.healthPriority);
+				}
+				if(!hasBoundaryPriority) {
+					rd.boundaryPriority = std::max(rd.boundaryPriority, rrs.boundaryPriority);
+				}
+				rd.priority = std::max(rd.priority, std::max(rd.boundaryPriority, rd.healthPriority));
 			}
 
 			if( rd.keys.contains( rrs.keys ) ) {
@@ -631,7 +652,7 @@ struct DDQueueData {
 				/*TraceEvent(rrs.interval.end(), mi.id()).detail("Result","Cancelled")
 					.detail("WasFetching", foundActiveFetching).detail("Contained", rd.keys.contains( rrs.keys ));*/
 				queuedRelocations--;
-				finishRelocation(rrs.priority);
+				finishRelocation(rrs.priority, rrs.healthPriority);
 			}
 		}
 
@@ -658,7 +679,7 @@ struct DDQueueData {
 					.detail("KeyBegin", rrs.keys.begin).detail("KeyEnd", rrs.keys.end)
 					.detail("Priority", rrs.priority).detail("WantsNewServers", rrs.wantsNewServers);*/
 				queuedRelocations++;
-				startRelocation(rrs.priority);
+				startRelocation(rrs.priority, rrs.healthPriority);
 
 				fetchingSourcesQueue.insert( rrs );
 				getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) );
@@ -678,7 +699,7 @@ struct DDQueueData {
 						.detail("KeyBegin", newData.keys.begin).detail("KeyEnd", newData.keys.end)
 						.detail("Priority", newData.priority).detail("WantsNewServers", newData.wantsNewServers);*/
 					queuedRelocations++;
-					startRelocation(newData.priority);
+					startRelocation(newData.priority, newData.healthPriority);
 					foundActiveRelocation = true;
 				}
 
@@ -773,7 +794,7 @@ struct DDQueueData {
 		for(auto it = intersectingInFlight.begin(); it != intersectingInFlight.end(); ++it) {
 			if (fetchKeysComplete.count(it->value()) && inFlightActors.liveActorAt(it->range().begin) &&
 			    !rd.keys.contains(it->range()) && it->value().priority >= rd.priority &&
-			    rd.priority < PRIORITY_TEAM_UNHEALTHY) {
+			    rd.healthPriority < SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
 				/*TraceEvent("OverlappingInFlight", distributorId)
 					.detail("KeyBegin", it->value().keys.begin)
 					.detail("KeyEnd", it->value().keys.end)
@@ -813,7 +834,7 @@ struct DDQueueData {
 
 		//TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
 		queuedRelocations--;
-		finishRelocation(rd.priority);
+		finishRelocation(rd.priority, rd.healthPriority);
 
 		// now we are launching: remove this entry from the queue of all the src servers
 		for( int i = 0; i < rd.src.size(); i++ ) {
@@ -841,7 +862,7 @@ struct DDQueueData {
 
 			launch( rrs, busymap );
 			activeRelocations++;
-			startRelocation(rrs.priority);
+			startRelocation(rrs.priority, rrs.healthPriority);
 			inFlightActors.insert( rrs.keys, dataDistributionRelocator( this, rrs ) );
 		}
 
@@ -912,10 +933,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
 			bestTeams.clear();
 			while( tciIndex < self->teamCollections.size() ) {
 				double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
-				if(rd.priority >= PRIORITY_TEAM_UNHEALTHY) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
-				if(rd.priority >= PRIORITY_TEAM_1_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
+				if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
+				if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
 
-				auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
+				auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
 				req.sources = rd.src;
 				req.completeSources = rd.completeSources;
 				Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
@@ -1154,7 +1175,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
 	std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
 	for( int i = 0; i < shards.size(); i++ ) {
 		if( moveShard == shards[i] ) {
-			TraceEvent(priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
+			TraceEvent(priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
 				.detail("SourceBytes", sourceBytes)
 				.detail("DestBytes", destBytes)
 				.detail("ShardBytes", metrics.bytes)
@@ -1197,7 +1218,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
 				std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
 			continue;
 		}
-		if (self->priority_relocations[PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
+		if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
 		    SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
 			state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
 				self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true))));
@@ -1208,7 +1229,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
 						GetTeamRequest(true, true, false))));
 				if (loadedTeam.present()) {
 					bool moved =
-						wait(rebalanceTeams(self, PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
+						wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
 						     randomTeam.get(), teamCollectionIndex == 0));
 					if (moved) {
 						resetCount = 0;
@@ -1266,7 +1287,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
 				std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
 			continue;
 		}
-		if (self->priority_relocations[PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
+		if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
 		    SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
 			state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
 				self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false))));
@@ -1276,7 +1297,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
 			if (unloadedTeam.present()) {
 				if (unloadedTeam.get()->getMinFreeSpaceRatio() > SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF) {
 					bool moved =
-						wait(rebalanceTeams(self, PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
+						wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
 						     unloadedTeam.get(), teamCollectionIndex == 0));
 					if (moved) {
 						resetCount = 0;
@@ -1382,7 +1403,7 @@ ACTOR Future<Void> dataDistributionQueue(
 			}
 			when ( RelocateData done = waitNext( self.relocationComplete.getFuture() ) ) {
 				self.activeRelocations--;
-				self.finishRelocation(done.priority);
+				self.finishRelocation(done.priority, done.healthPriority);
 				self.fetchKeysComplete.erase( done );
 				//self.logRelocation( done, "ShardRelocatorDone" );
 				actors.add( tag( delay(0, TaskPriority::DataDistributionLaunch), done.keys, rangesComplete ) );
@@ -1400,24 +1421,32 @@ ACTOR Future<Void> dataDistributionQueue(
 
 				recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL);
 
-				int lowPriorityRelocations = 0, highPriorityRelocations = 0, highestPriorityRelocation = 0;
+				int highestPriorityRelocation = 0;
 				for( auto it = self.priority_relocations.begin(); it != self.priority_relocations.end(); ++it ) {
-					if (it->second)
+					if (it->second) {
 						highestPriorityRelocation = std::max(highestPriorityRelocation, it->first);
-					if( it->first < 200 )
-						lowPriorityRelocations += it->second;
-					else
-						highPriorityRelocations += it->second;
+					}
 				}
 
 				TraceEvent("MovingData", distributorId)
 					.detail( "InFlight", self.activeRelocations )
 					.detail( "InQueue", self.queuedRelocations )
 					.detail( "AverageShardSize", req.getFuture().isReady() ? req.getFuture().get() : -1 )
-					.detail( "LowPriorityRelocations", lowPriorityRelocations )
-					.detail( "HighPriorityRelocations", highPriorityRelocations )
+					.detail( "UnhealthyRelocations", self.unhealthyRelocations )
 					.detail( "HighestPriority", highestPriorityRelocation )
 					.detail( "BytesWritten", self.bytesWritten )
+					.detail( "PriorityRecoverMove", self.priority_relocations[SERVER_KNOBS->PRIORITY_RECOVER_MOVE] )
+					.detail( "PriorityRebalanceUnderutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] )
+					.detail( "PriorityRebalannceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] )
+					.detail( "PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY] )
+					.detail( "PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER] )
+					.detail( "PriorityTeamRedundant", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT] )
+					.detail( "PriorityMergeShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_MERGE_SHARD] )
+					.detail( "PriorityTeamUnhealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY] )
+					.detail( "PriorityTeam2Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_2_LEFT] )
+					.detail( "PriorityTeam1Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_1_LEFT] )
+					.detail( "PriorityTeam0Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_0_LEFT] )
+					.detail( "PrioritySplitShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_SPLIT_SHARD] )
 					.trackLatest( "MovingData" );
 			}
 			when ( wait( self.error.getFuture() ) ) {} // Propagate errors from dataDistributionRelocator

@@ -258,7 +258,7 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<StorageMetrics>>
 	}
 }
 
-ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys, int64_t oldShardsEndingSize ) {
+ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, int64_t oldShardsEndingSize ) {
 	state vector<Future<int64_t>> sizes;
 	state vector<Future<int64_t>> systemSizes;
 	for (auto it : self->shards.intersectingRanges(keys) ) {
@@ -369,12 +369,12 @@ ACTOR Future<Void> shardSplitter(
 		for( int i = 0; i < skipRange; i++ ) {
 			KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
 			self->shardsAffectedByTeamFailure->defineShard( r );
-			self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+			self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
 		}
 		for( int i = numShards-1; i > skipRange; i-- ) {
 			KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
 			self->shardsAffectedByTeamFailure->defineShard( r );
-			self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+			self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
 		}
 
 		self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
@@ -475,7 +475,7 @@ Future<Void> shardMerger(
 
 	restartShardTrackers( self, mergeRange, endingStats );
 	self->shardsAffectedByTeamFailure->defineShard( mergeRange );
-	self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );
+	self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );
 
 	// We are about to be cancelled by the call to restartShardTrackers
 	return Void();

@@ -105,6 +105,19 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
 	init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
 	init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 0 : 2;
 
+	init( PRIORITY_RECOVER_MOVE, 110 );
+	init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
+	init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
+	init( PRIORITY_TEAM_HEALTHY, 140 );
+	init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
+	init( PRIORITY_TEAM_REDUNDANT, 200 );
+	init( PRIORITY_MERGE_SHARD, 340 );
+	init( PRIORITY_TEAM_UNHEALTHY, 700 );
+	init( PRIORITY_TEAM_2_LEFT, 709 );
+	init( PRIORITY_TEAM_1_LEFT, 800 );
+	init( PRIORITY_TEAM_0_LEFT, 809 );
+	init( PRIORITY_SPLIT_SHARD, 900 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;
+
 	// Data distribution
 	init( RETRY_RELOCATESHARD_DELAY, 0.1 );

@@ -106,6 +106,24 @@ public:
 	double INFLIGHT_PENALTY_ONE_LEFT;
 	int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards.
 
+	// Higher priorities are executed first
+	// Priority/100 is the "priority group"/"superpriority". Priority inversion
+	// is possible within but not between priority groups; fewer priority groups
+	// mean better worst case time bounds
+	// Maximum allowable priority is 999.
+	int PRIORITY_RECOVER_MOVE;
+	int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
+	int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
+	int PRIORITY_TEAM_HEALTHY;
+	int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
+	int PRIORITY_TEAM_REDUNDANT;
+	int PRIORITY_MERGE_SHARD;
+	int PRIORITY_TEAM_UNHEALTHY;
+	int PRIORITY_TEAM_2_LEFT;
+	int PRIORITY_TEAM_1_LEFT;
+	int PRIORITY_TEAM_0_LEFT;
+	int PRIORITY_SPLIT_SHARD;
+
 	// Data distribution
 	double RETRY_RELOCATESHARD_DELAY;
 	double DATA_DISTRIBUTION_FAILURE_REACTION_TIME;

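The comment added to Knobs.h describes priorities in terms of "priority groups" (priority/100). With the defaults set in Knobs.cpp above, the grouping works out as in this illustrative sketch (not code from the commit):

// Illustrative only: the "priority group" is the integer quotient by 100.
int priorityGroup(int priority) { return priority / 100; }

// With the new defaults from Knobs.cpp:
//   PRIORITY_RECOVER_MOVE   = 110 -> group 1
//   PRIORITY_TEAM_REDUNDANT = 200 -> group 2
//   PRIORITY_MERGE_SHARD    = 340 -> group 3
//   PRIORITY_TEAM_UNHEALTHY = 700 -> group 7
//   PRIORITY_TEAM_0_LEFT    = 809 -> group 8
//   PRIORITY_SPLIT_SHARD    = 900 -> group 9 (within the stated maximum of 999)
// Priority inversion can occur within a group but not across groups.
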
@@ -1375,10 +1375,10 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
 			bool primary = inFlight.getInt("Primary");
 			int highestPriority = inFlight.getInt("HighestPriority");
 
-			if (movingHighestPriority < PRIORITY_TEAM_REDUNDANT) {
+			if (movingHighestPriority < SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
 				highestPriority = movingHighestPriority;
 			} else if (partitionsInFlight > 0) {
-				highestPriority = std::max<int>(highestPriority, PRIORITY_MERGE_SHARD);
+				highestPriority = std::max<int>(highestPriority, SERVER_KNOBS->PRIORITY_MERGE_SHARD);
 			}
 
 			JsonBuilderObject team_tracker;
@@ -1387,7 +1387,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
 			team_tracker.setKeyRawNumber("unhealthy_servers",inFlight.getValue("UnhealthyServers"));
 
 			JsonBuilderObject stateSectionObj;
-			if (highestPriority >= PRIORITY_TEAM_0_LEFT) {
+			if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
 				stateSectionObj["healthy"] = false;
 				stateSectionObj["name"] = "missing_data";
 				stateSectionObj["description"] = "No replicas remain of some data";
@@ -1396,7 +1396,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
 					*minReplicasRemaining = 0;
 				}
 			}
-			else if (highestPriority >= PRIORITY_TEAM_1_LEFT) {
+			else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
 				stateSectionObj["healthy"] = false;
 				stateSectionObj["name"] = "healing";
 				stateSectionObj["description"] = "Only one replica remains of some data";
@@ -1405,7 +1405,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
 					*minReplicasRemaining = 1;
 				}
 			}
-			else if (highestPriority >= PRIORITY_TEAM_2_LEFT) {
+			else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
 				stateSectionObj["healthy"] = false;
 				stateSectionObj["name"] = "healing";
 				stateSectionObj["description"] = "Only two replicas remain of some data";
@@ -1414,26 +1414,26 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
 					*minReplicasRemaining = 2;
 				}
 			}
-			else if (highestPriority >= PRIORITY_TEAM_UNHEALTHY) {
+			else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
 				stateSectionObj["healthy"] = false;
 				stateSectionObj["name"] = "healing";
 				stateSectionObj["description"] = "Restoring replication factor";
-			} else if (highestPriority >= PRIORITY_MERGE_SHARD) {
+			} else if (highestPriority >= SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy_repartitioning";
 				stateSectionObj["description"] = "Repartitioning.";
-			} else if (highestPriority >= PRIORITY_TEAM_REDUNDANT) {
+			} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "optimizing_team_collections";
 				stateSectionObj["description"] = "Optimizing team collections";
-			} else if (highestPriority >= PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
+			} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy_removing_server";
 				stateSectionObj["description"] = "Removing storage server";
-			} else if (highestPriority == PRIORITY_TEAM_HEALTHY) {
+			} else if (highestPriority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy";
-			} else if (highestPriority >= PRIORITY_REBALANCE_SHARD) {
+			} else if (highestPriority >= SERVER_KNOBS->PRIORITY_RECOVER_MOVE) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy_rebalancing";
 				stateSectionObj["description"] = "Rebalancing";

@@ -44,7 +44,7 @@ struct DDMetricsWorkload : TestWorkload {
 		TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
 			EventLogRequest( LiteralStringRef( "MovingData" ) ) ), 1.0 ) );
 		int relocations;
-		sscanf(md.getValue("HighPriorityRelocations").c_str(), "%d", &relocations);
+		sscanf(md.getValue("UnhealthyRelocations").c_str(), "%d", &relocations);
 		return relocations;
 	}
 

@@ -32,7 +32,7 @@
 
 <Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
     <Product Name='$(var.Title)'
-             Id='{72CE09C5-30F1-44B0-9F1C-E4F494745FF2}'
+             Id='{AE316E98-2C7C-4C06-8C4D-0BAD3183DEDD}'
              UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
              Version='$(var.Version)'
              Manufacturer='$(var.Manufacturer)'

@@ -1,7 +1,7 @@
 <?xml version="1.0"?>
 <Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
-    <Version>6.2.5</Version>
+    <Version>6.2.6</Version>
     <PackageName>6.2</PackageName>
   </PropertyGroup>
 </Project>