Status: Use new data distributor worker to publish status

After adding the new data distributor role, we publish the data
related to the data distributor and the rate keeper through that
role (and its new worker).

So status now needs to contact the data distributor worker, instead of
the master worker, to get the status information.
This commit is contained in:
Meng Xu 2019-02-21 18:05:46 -08:00
parent 3e703dc2d1
commit 9445ac0b0c
6 changed files with 28 additions and 21 deletions

View File

@ -127,7 +127,7 @@ The following format informally describes the JSON containing the status data. T
"name": < "initializing" "name": < "initializing"
| "missing_data" | "missing_data"
| "healing" | "healing"
| "healthy_removing_redundant_teams" | "removing_redundant_teams"
| "healthy_repartitioning" | "healthy_repartitioning"
| "healthy_removing_server" | "healthy_removing_server"
| "healthy_rebalancing" | "healthy_rebalancing"

View File

@ -496,7 +496,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"initializing", "initializing",
"missing_data", "missing_data",
"healing", "healing",
"healthy_removing_redundant_teams", "removing_redundant_teams",
"healthy_repartitioning", "healthy_repartitioning",
"healthy_removing_server", "healthy_removing_server",
"healthy_rebalancing", "healthy_rebalancing",
@ -529,7 +529,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"initializing", "initializing",
"missing_data", "missing_data",
"healing", "healing",
"healthy_removing_redundant_teams", "removing_redundant_teams",
"healthy_repartitioning", "healthy_repartitioning",
"healthy_removing_server", "healthy_removing_server",
"healthy_rebalancing", "healthy_rebalancing",

View File

@ -2490,11 +2490,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
team->setPriority( PRIORITY_TEAM_1_LEFT ); team->setPriority( PRIORITY_TEAM_1_LEFT );
else if( serversLeft == 2 ) else if( serversLeft == 2 )
team->setPriority( PRIORITY_TEAM_2_LEFT ); team->setPriority( PRIORITY_TEAM_2_LEFT );
else if ( redundantTeam ) { else
team->setPriority( PRIORITY_TEAM_REDUNDANT ); team->setPriority( PRIORITY_TEAM_UNHEALTHY );
} else {
team->setPriority( PRIORITY_TEAM_UNHEALTHY );
}
} }
else if ( badTeam || anyWrongConfiguration ) { else if ( badTeam || anyWrongConfiguration ) {
if ( redundantTeam ) { if ( redundantTeam ) {

View File

@ -890,7 +890,6 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
bestTeams.clear(); bestTeams.clear();
while( tciIndex < self->teamCollections.size() ) { while( tciIndex < self->teamCollections.size() ) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY; double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if(rd.priority >= PRIORITY_TEAM_REDUNDANT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_REDUNDANT;
if(rd.priority >= PRIORITY_TEAM_UNHEALTHY) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY; if(rd.priority >= PRIORITY_TEAM_UNHEALTHY) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if(rd.priority >= PRIORITY_TEAM_1_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT; if(rd.priority >= PRIORITY_TEAM_1_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;

View File

@ -86,7 +86,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( BG_DD_DECREASE_RATE, 1.02 ); init( BG_DD_DECREASE_RATE, 1.02 );
init( BG_DD_SATURATION_DELAY, 1.0 ); init( BG_DD_SATURATION_DELAY, 1.0 );
init( INFLIGHT_PENALTY_HEALTHY, 1.0 ); init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
init( INFLIGHT_PENALTY_REDUNDANT, 9.0 );
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 ); init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 ); init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );

View File

@ -1131,18 +1131,18 @@ static JsonBuilderObject configurationFetcher(Optional<DatabaseConfiguration> co
return statusObj; return statusObj;
} }
ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, int *minReplicasRemaining) { ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> ddWorker, int *minReplicasRemaining) {
state JsonBuilderObject statusObjData; state JsonBuilderObject statusObjData;
try { try {
std::vector<Future<TraceEventFields>> futures; std::vector<Future<TraceEventFields>> futures;
// TODO: Should this be serial? // TODO: Should this be serial?
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStarting"))), 1.0)); futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStarting"))), 1.0));
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStats"))), 1.0)); futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStats"))), 1.0));
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MovingData"))), 1.0)); futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MovingData"))), 1.0));
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0)); futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0));
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0)); futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0));
std::vector<TraceEventFields> dataInfo = wait(getAll(futures)); std::vector<TraceEventFields> dataInfo = wait(getAll(futures));
@ -1235,7 +1235,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
} }
else if (highestPriority >= PRIORITY_TEAM_REDUNDANT) { else if (highestPriority >= PRIORITY_TEAM_REDUNDANT) {
stateSectionObj["healthy"] = true; stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_removing_redundant_teams"; stateSectionObj["name"] = "removing_redundant_teams";
stateSectionObj["description"] = "Removing redundant machine teams"; stateSectionObj["description"] = "Removing redundant machine teams";
} }
else if (highestPriority >= PRIORITY_MERGE_SHARD) { else if (highestPriority >= PRIORITY_MERGE_SHARD) {
@ -1388,7 +1388,7 @@ static int getExtraTLogEligibleMachines(const vector<std::pair<WorkerInterface,
return extraTlogEligibleMachines; return extraTlogEligibleMachines;
} }
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture) JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{ {
state JsonBuilderObject statusObj; state JsonBuilderObject statusObj;
@ -1440,7 +1440,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions // Transactions
try { try {
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) ); TraceEventFields md = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(md.getValue("TPSLimit")); double tpsLimit = parseDouble(md.getValue("TPSLimit"));
double transPerSec = parseDouble(md.getValue("ReleasedTPS")); double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
int ssCount = parseInt(md.getValue("StorageServers")); int ssCount = parseInt(md.getValue("StorageServers"));
@ -1820,6 +1820,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state JsonBuilderArray messages; state JsonBuilderArray messages;
state std::set<std::string> status_incomplete_reasons; state std::set<std::string> status_incomplete_reasons;
state std::pair<WorkerInterface, ProcessClass> mWorker; state std::pair<WorkerInterface, ProcessClass> mWorker;
state std::pair<WorkerInterface, ProcessClass> ddWorker; // DataDistributor worker
try { try {
// Get the master Worker interface // Get the master Worker interface
@ -1829,6 +1830,17 @@ ACTOR Future<StatusReply> clusterGetStatus(
} else { } else {
messages.push_back(JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker.")); messages.push_back(JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker."));
} }
// Get the DataDistributor worker interface
Optional<std::pair<WorkerInterface, ProcessClass>> _ddWorker;
if (db->get().distributor.present()) {
_ddWorker = getWorker( workers, db->get().distributor.get().address() );
}
if (!db->get().distributor.present() || !_ddWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_dataDistributor_worker", "Unable to locate the data distributor worker."));
} else {
ddWorker = _ddWorker.get();
}
// Get latest events for various event types from ALL workers // Get latest events for various event types from ALL workers
// WorkerEvents is a map of worker's NetworkAddress to its event string // WorkerEvents is a map of worker's NetworkAddress to its event string
@ -1932,8 +1944,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state int minReplicasRemaining = -1; state int minReplicasRemaining = -1;
std::vector<Future<JsonBuilderObject>> futures2; std::vector<Future<JsonBuilderObject>> futures2;
futures2.push_back(dataStatusFetcher(mWorker, &minReplicasRemaining)); futures2.push_back(dataStatusFetcher(ddWorker, &minReplicasRemaining));
futures2.push_back(workloadStatusFetcher(db, workers, mWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture)); futures2.push_back(workloadStatusFetcher(db, workers, mWorker, ddWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons)); futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons)); futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));