From a588710376c0701315113e866e73d1548609f07c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Mon, 12 Aug 2019 10:08:12 -0700 Subject: [PATCH] StorageEngineSwitch: Graceful switch When fdbcli changes the storeType for storage engines, we switch the store type of storage servers one by one, gracefully. This avoids recruiting multiple storage servers on the same process, which can cause OOM errors. --- fdbclient/FDBTypes.h | 4 +- fdbclient/ManagementAPI.actor.cpp | 14 +- fdbclient/SystemData.cpp | 6 + fdbclient/SystemData.h | 2 + fdbserver/ClusterController.actor.cpp | 9 +- fdbserver/DataDistribution.actor.cpp | 386 ++++++++++++++++++---- fdbserver/DataDistributionQueue.actor.cpp | 18 +- fdbserver/Knobs.cpp | 4 + fdbserver/Knobs.h | 6 + fdbserver/MoveKeys.actor.cpp | 59 ++-- fdbserver/QuietDatabase.actor.cpp | 25 +- flow/flow.h | 1 + 12 files changed, 441 insertions(+), 93 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 690ebb9865..e4801e8a2a 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -591,7 +591,9 @@ struct Traceable : std::true_type { struct KeyValueStoreType { constexpr static FileIdentifier file_identifier = 6560359; - // These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones just before END. + // These enumerated values are stored in the database configuration, so should NEVER be changed. + // Only add new ones just before END. + // SS storeType is END before the storageServerInterface is initialized. enum StoreType { SSD_BTREE_V1, MEMORY, diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index c1a815c50e..d979dfccf7 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -484,13 +484,20 @@ ACTOR Future changeConfig( Database cx, std::mapfirst) ); } - for(auto i=m.begin(); i!=m.end(); ++i) + for (auto i = m.begin(); i != m.end(); ++i) { + // Debug purpose + TraceEvent("ChangeConfigAPI").detail("Param1", i->first).detail("Param2", i->second); tr.set( StringRef(i->first), StringRef(i->second) ); + } tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); tr.set( moveKeysLockOwnerKey, versionKey ); wait( tr.commit() ); + // Debug purpose + TraceEvent("ChangeConfigAPI") + .detail("NewConfig", newConfig.toString()) + .detail("OldConfig", oldConfig.toString()); break; } catch (Error& e) { state Error e1(e); @@ -1610,6 +1617,11 @@ ACTOR Future waitForFullReplication( Database cx ) { state std::vector> watchFutures; for(int i = 0; i < config.regions.size(); i++) { if( !replicasFutures[i].get().present() || decodeDatacenterReplicasValue(replicasFutures[i].get().get()) < config.storageTeamSize ) { + TraceEvent("WaitForFullReplication") + .detail("DecodedReplicas", replicasFutures[i].get().present() + ?
decodeDatacenterReplicasValue(replicasFutures[i].get().get()) + : -1) + .detail("ConfigReplicas", config.storageTeamSize); watchFutures.push_back(tr.watch(datacenterReplicasKeyFor(config.regions[i].dcId))); } } diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 2658c29aa2..353744ff2d 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -363,6 +363,8 @@ ProcessClass decodeProcessClassValue( ValueRef const& value ) { const KeyRangeRef configKeys( LiteralStringRef("\xff/conf/"), LiteralStringRef("\xff/conf0") ); const KeyRef configKeysPrefix = configKeys.begin; +const KeyRef storeTypeConfig(LiteralStringRef("\xff/conf/storage_engine")); + const KeyRangeRef excludedServersKeys( LiteralStringRef("\xff/conf/excluded/"), LiteralStringRef("\xff/conf/excluded0") ); const KeyRef excludedServersPrefix = excludedServersKeys.begin; const KeyRef excludedServersVersionKey = LiteralStringRef("\xff/conf/excluded"); @@ -426,6 +428,10 @@ const KeyRef primaryLocalityPrivateKey = LiteralStringRef("\xff\xff/globals/prim const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled"); const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled"); +// Whenever configuration changes or DD related system keyspace is changed(e.g.., serverList), +// actor must grab the moveKeysLockOwnerKey and update moveKeysLockWriteKey. +// This prevents concurrent write to the same system keyspace. +// When the owner of the DD related system keyspace changes, DD will reboot const KeyRef moveKeysLockOwnerKey = LiteralStringRef("\xff/moveKeysLock/Owner"); const KeyRef moveKeysLockWriteKey = LiteralStringRef("\xff/moveKeysLock/Write"); diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index f4bedb8f14..d893d38a9c 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -121,6 +121,8 @@ UID decodeProcessClassKeyOld( KeyRef const& key ); // "\xff/conf/[[option]]" := "value" extern const KeyRangeRef configKeys; extern const KeyRef configKeysPrefix; +// Debug purpose: Storage engine's store type +extern const KeyRef storeTypeConfig; // "\xff/conf/excluded/1.2.3.4" := "" // "\xff/conf/excluded/1.2.3.4:4000" := "" diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index c39c3e2571..8d38d24b1d 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1362,7 +1362,10 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) { } } catch (Error& e) { if (e.code() == error_code_no_more_servers) { - TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id).error(e); + TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id) + .detail("OutstandingReq", i) + .detail("IsCriticalRecruitment", req.first.criticalRecruitment) + .error(e); } else { TraceEvent(SevError, "RecruitStorageError", self->id).error(e); throw; @@ -1655,7 +1658,9 @@ void clusterRecruitStorage( ClusterControllerData* self, RecruitStorageRequest r } catch ( Error& e ) { if (e.code() == error_code_no_more_servers) { self->outstandingStorageRequests.push_back( std::make_pair(req, now() + SERVER_KNOBS->RECRUITMENT_TIMEOUT) ); - TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id).error(e); + TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id) + .detail("IsCriticalRecruitment", req.criticalRecruitment) + .error(e); } else { TraceEvent(SevError, "RecruitStorageError", self->id).error(e); throw; // Any other error will bring down the cluster controller diff 
--git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 4306cc0708..8cd617dda3 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -59,10 +59,23 @@ struct TCServerInfo : public ReferenceCounted { bool inDesiredDC; LocalityEntry localityEntry; Promise updated; + AsyncTrigger wrongStoreTypeRemoved; // wrongStoreTypeRemoved + int toRemove; // Debug purpose: 0: not remove, >0: to remove due to wrongStoreType + // A storage server's StoreType does not change. + //To change storeType for an ip:port, we destroy the old one and create a new one. + KeyValueStoreType storeType; // Storage engine type - TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, Reference storageServerSet) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC) { + TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, + Reference storageServerSet) + : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), + onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC), + storeType(KeyValueStoreType::END), toRemove(0) { localityEntry = ((LocalityMap*) storageServerSet.getPtr())->add(ssi.locality, &id); } + + bool isCorrectStoreType(KeyValueStoreType configStoreType) { + return (storeType == configStoreType || storeType == KeyValueStoreType::END); + } }; struct TCMachineInfo : public ReferenceCounted { @@ -428,7 +441,7 @@ ACTOR Future> getInitialDataDistribution( Dat for( int i = 0; i < serverList.get().size(); i++ ) { auto ssi = decodeServerListValue( serverList.get()[i].value ); - result->allServers.push_back( std::make_pair(ssi, id_data[ssi.locality.processId()].processClass) ); + result->allServers.push_back(std::make_pair(ssi, id_data[ssi.locality.processId()].processClass)); server_dc[ssi.id()] = ssi.locality.dcId(); } @@ -603,6 +616,7 @@ struct DDTeamCollection : ReferenceCounted { Future checkTeamDelay; Promise addSubsetComplete; Future badTeamRemover; + Future wrongStoreTypeRemover; Future redundantMachineTeamRemover; Future redundantServerTeamRemover; @@ -645,9 +659,10 @@ struct DDTeamCollection : ReferenceCounted { Reference> zeroHealthyTeams, bool primary, Reference> processingUnhealthy) : cx(cx), distributorId(distributorId), lock(lock), output(output), - shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), - badTeamRemover(Void()), redundantMachineTeamRemover(Void()), redundantServerTeamRemover(Void()), - configuration(configuration), readyToStart(readyToStart), clearHealthyZoneFuture(true), + shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false), + teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), redundantMachineTeamRemover(Void()), + redundantServerTeamRemover(Void()), configuration(configuration), readyToStart(readyToStart), + clearHealthyZoneFuture(true), checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), initialFailureReactionDelay( delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)), @@ -696,6 +711,7 @@ struct DDTeamCollection : ReferenceCounted { } ACTOR static Future interruptableBuildTeams( 
DDTeamCollection* self ) { + TraceEvent("DDInterruptableBuildTeamsStart", self->distributorId); if(!self->addSubsetComplete.isSet()) { wait( addSubsetOfEmergencyTeams(self) ); self->addSubsetComplete.send(Void()); @@ -712,6 +728,7 @@ struct DDTeamCollection : ReferenceCounted { } ACTOR static Future checkBuildTeams( DDTeamCollection* self ) { + TraceEvent("DDCheckBuildTeamsStart", self->distributorId); wait( self->checkTeamDelay ); while( !self->teamBuilder.isReady() ) wait( self->teamBuilder ); @@ -737,6 +754,7 @@ struct DDTeamCollection : ReferenceCounted { // shardsAffectedByTeamFailure or we could be dropping a shard on the floor (since team // tracking is "edge triggered") // SOMEDAY: Account for capacity, load (when shardMetrics load is high) + // Q: How do we enforce the above statement? // self->teams.size() can be 0 under the ConfigureTest.txt test when we change configurations // The situation happens rarely. We may want to eliminate this situation someday @@ -1245,26 +1263,31 @@ struct DDTeamCollection : ReferenceCounted { } void traceConfigInfo() { - TraceEvent("DDConfig") + TraceEvent("DDConfig", distributorId) .detail("StorageTeamSize", configuration.storageTeamSize) .detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER) - .detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER); + .detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER) + .detail("StoreType", configuration.storageServerStoreType); } void traceServerInfo() { int i = 0; - TraceEvent("ServerInfo").detail("Size", server_info.size()); + TraceEvent("ServerInfo", distributorId).detail("Size", server_info.size()); for (auto& server : server_info) { - TraceEvent("ServerInfo") + TraceEvent("ServerInfo", distributorId) .detail("ServerInfoIndex", i++) .detail("ServerID", server.first.toString()) .detail("ServerTeamOwned", server.second->teams.size()) - .detail("MachineID", server.second->machine->machineID.contents().toString()); + .detail("MachineID", server.second->machine->machineID.contents().toString()) + .detail("StoreType", server.second->storeType.toString()) + .detail("InDesiredDC", server.second->inDesiredDC) + .detail("ToRemove", server.second->toRemove); } for (auto& server : server_info) { const UID& uid = server.first; - TraceEvent("ServerStatus", uid) + TraceEvent("ServerStatus", distributorId) + .detail("ServerID", uid) .detail("Healthy", !server_status.get(uid).isUnhealthy()) .detail("MachineIsValid", server_info[uid]->machine.isValid()) .detail("MachineTeamSize", @@ -1275,9 +1298,9 @@ struct DDTeamCollection : ReferenceCounted { void traceServerTeamInfo() { int i = 0; - TraceEvent("ServerTeamInfo").detail("Size", teams.size()); + TraceEvent("ServerTeamInfo", distributorId).detail("Size", teams.size()); for (auto& team : teams) { - TraceEvent("ServerTeamInfo") + TraceEvent("ServerTeamInfo", distributorId) .detail("TeamIndex", i++) .detail("Healthy", team->isHealthy()) .detail("TeamSize", team->size()) @@ -1290,7 +1313,7 @@ struct DDTeamCollection : ReferenceCounted { TraceEvent("MachineInfo").detail("Size", machine_info.size()); for (auto& machine : machine_info) { - TraceEvent("MachineInfo") + TraceEvent("MachineInfo", distributorId) .detail("MachineInfoIndex", i++) .detail("Healthy", isMachineHealthy(machine.second)) .detail("MachineID", machine.first.contents().toString()) @@ -1303,9 +1326,9 @@ struct DDTeamCollection : ReferenceCounted { void traceMachineTeamInfo() { int i = 0; - TraceEvent("MachineTeamInfo").detail("Size", machineTeams.size()); + 
TraceEvent("MachineTeamInfo", distributorId).detail("Size", machineTeams.size()); for (auto& team : machineTeams) { - TraceEvent("MachineTeamInfo") + TraceEvent("MachineTeamInfo", distributorId) .detail("TeamIndex", i++) .detail("MachineIDs", team->getMachineIDsStr()) .detail("ServerTeams", team->serverTeams.size()); @@ -1326,11 +1349,11 @@ struct DDTeamCollection : ReferenceCounted { void traceMachineLocalityMap() { int i = 0; - TraceEvent("MachineLocalityMap").detail("Size", machineLocalityMap.size()); + TraceEvent("MachineLocalityMap", distributorId).detail("Size", machineLocalityMap.size()); for (auto& uid : machineLocalityMap.getObjects()) { Reference record = machineLocalityMap.getRecord(i); if (record.isValid()) { - TraceEvent("MachineLocalityMap") + TraceEvent("MachineLocalityMap", distributorId) .detail("LocalityIndex", i++) .detail("UID", uid->toString()) .detail("LocalityRecord", record->toString()); @@ -1348,7 +1371,7 @@ struct DDTeamCollection : ReferenceCounted { if (!shouldPrint) return; - TraceEvent("TraceAllInfo").detail("Primary", primary); + TraceEvent("TraceAllInfo", distributorId).detail("Primary", primary); traceConfigInfo(); traceServerInfo(); traceServerTeamInfo(); @@ -1591,7 +1614,11 @@ struct DDTeamCollection : ReferenceCounted { } } - return deterministicRandom()->randomChoice(leastUsedServers); + if (leastUsedServers.empty()) { + return Reference(); + } else { + return deterministicRandom()->randomChoice(leastUsedServers); + } } // Randomly choose one machine team that has chosenServer and has the correct size @@ -1879,9 +1906,15 @@ struct DDTeamCollection : ReferenceCounted { std::vector bestServerTeam; int bestScore = std::numeric_limits::max(); int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; // BEST_OF_AMT = 4 + bool earlyQuitBuild = false; for (int i = 0; i < maxAttempts && i < 100; ++i) { // Step 2: Choose 1 least used server and then choose 1 least used machine team from the server Reference chosenServer = findOneLeastUsedServer(); + if (!chosenServer.isValid()) { + TraceEvent(SevWarn, "NoValidServer").detail("Primary", primary); + earlyQuitBuild = true; + break; + } // Note: To avoid creating correlation of picked machine teams, we simply choose a random machine team // instead of choosing the least used machine team. 
// The correlation happens, for example, when we add two new machines, we may always choose the machine @@ -1945,6 +1978,9 @@ struct DDTeamCollection : ReferenceCounted { } } + if (earlyQuitBuild) { + break; + } if (bestServerTeam.size() != configuration.storageTeamSize) { // Not find any team and will unlikely find a team lastBuildTeamsFailed = true; @@ -2139,6 +2175,10 @@ struct DDTeamCollection : ReferenceCounted { .detail("DoBuildTeams", self->doBuildTeams) .trackLatest("TeamCollectionInfo"); } + } else { + // Recruit more servers in the hope that we will get enough machines + TraceEvent("BuildTeam").detail("RestartRecruiting", "Because not enough machines"); + self->restartRecruiting.trigger(); } self->evaluateTeamQuality(); @@ -2165,6 +2205,7 @@ struct DDTeamCollection : ReferenceCounted { .detail("CurrentTeamCount", teams.size()) .detail("ServerCount", server_info.size()) .detail("NonFailedServerCount", desiredServerSet.size()); + traceAllInfo(true); } bool shouldHandleServer(const StorageServerInterface &newServer) { @@ -2176,6 +2217,11 @@ struct DDTeamCollection : ReferenceCounted { void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise errorOut, Version addedVersion ) { if (!shouldHandleServer(newServer)) { + TraceEvent("AddedStorageServer", distributorId) + .detail("ServerID", newServer.id()) + .detail("ShouldHandleServer", 0) + .detail("ServerDCId", newServer.locality.dcId()) + .detail("IncludedDCSize", includedDCs.size()); return; } allServers.push_back( newServer.id() ); @@ -2400,7 +2446,7 @@ struct DDTeamCollection : ReferenceCounted { TraceEvent(SevInfo, "NoTeamsRemovedWhenServerRemoved") .detail("Primary", primary) .detail("Debug", "ThisShouldRarelyHappen_CheckInfoBelow"); - traceAllInfo(); + traceAllInfo(true); } // Step: Remove machine info related to removedServer @@ -2490,6 +2536,65 @@ ACTOR Future removeBadTeams(DDTeamCollection* self) { return Void(); } +bool inCorrectDC(DDTeamCollection* self, TCServerInfo* server) { + return (self->includedDCs.empty() || + std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != + self->includedDCs.end()); +} + +ACTOR Future removeWrongStoreType(DDTeamCollection* self) { + state int numServersRemoved = 0; + state std::map>::iterator server; + state vector> serversToRemove; + state int i = 0; + + loop { + wait(delay(1.0)); + TraceEvent("WrongStoreTypeRemoverStartLoop", self->distributorId) + .detail("Primary", self->primary) + .detail("ServerInfoSize", self->server_info.size()) + .detail("SysRestoreType", self->configuration.storageServerStoreType); + serversToRemove.clear(); + for (server = self->server_info.begin(); server != self->server_info.end(); ++server) { + NetworkAddress a = server->second->lastKnownInterface.address(); + AddressExclusion addr(a.ip, a.port); + TraceEvent("WrongStoreTypeRemover", self->distributorId) + .detail("DDID", self->distributorId) + .detail("Server", server->first) + .detail("Addr", addr.toString()) + .detail("StoreType", server->second->storeType) + .detail("IsCorrectStoreType", + server->second->isCorrectStoreType(self->configuration.storageServerStoreType)) + .detail("ToRemove", server->second->toRemove); + if (!server->second->isCorrectStoreType(self->configuration.storageServerStoreType) || + !inCorrectDC(self, server->second.getPtr())) { + serversToRemove.push_back(server->second); + } else { + server->second->toRemove = + 0; // In case the configuration.storeType is changed back to the server's type + } + } + + 
for (i = 0; i < serversToRemove.size(); i++) { + Reference s = serversToRemove[i]; + if (s.isValid()) { + s->toRemove++; // The server's location will not be excluded + s->wrongStoreTypeRemoved.trigger(); + ASSERT(s->toRemove >= 0); + wait(delay(1.0)); + } + } + + if (!serversToRemove.empty() || self->healthyTeamCount == 0) { + TraceEvent("WrongStoreTypeRemover").detail("KickTeamBuilder", "Start"); + self->restartRecruiting.trigger(); + self->doBuildTeams = true; + wait(delay(5.0)); // I have to add delay here; otherwise, it will immediately go to the next loop and print + // WrongStoreTypeRemoverStartLoop. Why?! + } + } +} + ACTOR Future machineTeamRemover(DDTeamCollection* self) { state int numMachineTeamRemoved = 0; loop { @@ -2737,6 +2842,18 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea team->setHealthy( healthy ); // Unhealthy teams won't be chosen by bestTeam bool optimal = team->isOptimal() && healthy; bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get())); + TraceEvent("TeamHealthChangeDetected", self->distributorId) + .detail("Team", team->getDesc()) + .detail("ServersLeft", serversLeft) + .detail("LastServersLeft", lastServersLeft) + .detail("AnyUndesired", anyUndesired) + .detail("LastAnyUndesired", lastAnyUndesired) + .detail("AnyWrongConfiguration", anyWrongConfiguration) + .detail("LastWrongConfiguration", lastWrongConfiguration) + .detail("Recheck", recheck) + .detail("BadTeam", badTeam) + .detail("LastZeroHealthy", lastZeroHealthy) + .detail("ZeroHealthyTeam", self->zeroHealthyTeams->get()); lastReady = self->initialFailureReactionDelay.isReady(); lastZeroHealthy = self->zeroHealthyTeams->get(); @@ -2787,6 +2904,11 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId) .detail("SignallingTeam", team->getDesc()) .detail("Primary", self->primary); + self->traceAllInfo(true); + // Create a new team for safe + self->restartRecruiting.trigger(); + self->doBuildTeams = true; + self->restartTeamBuilder.trigger(); } if(logTeamEvents) { @@ -2842,7 +2964,9 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea for(int i=0; igetPriority(); - if(maxPriority < PRIORITY_TEAM_0_LEFT) { + // The shard split/merge and DD rebooting may make a shard mapped to multiple teams, + // so we need to recalculate the shard's priority + if (maxPriority < PRIORITY_TEAM_0_LEFT) { // Q: When will maxPriority >= PRIORITY_TEAM_0_LEFT auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] ); for( int j=0; j < teams.first.size()+teams.second.size(); j++) { // t is the team in primary DC or the remote DC @@ -2919,6 +3043,7 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea if( self->healthyTeamCount == 0 ) { TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId).detail("SignallingTeam", team->getDesc()); self->zeroHealthyTeams->set(true); + self->restartRecruiting.trigger(); } } if (lastOptimal) { @@ -3004,6 +3129,11 @@ ACTOR Future>> getServerL return results; } +// Q: Why do we need this actor? +// The serverList system keyspace keeps the StorageServerInterface for each serverID. If a storage server process +// crashes and restarted at a different machine, will we reuse the StorageServerInterface? A: Storage server's storeType +// and serverID are decided by the server's filename. 
By parsing storage server file's filename on each disk, process on +// each machine creates the TCServer with the correct serverID and StorageServerInterface. ACTOR Future waitServerListChange( DDTeamCollection* self, FutureStream serverRemoved ) { state Future checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY); state Future>> serverListAndProcessClasses = Never(); @@ -3118,9 +3248,24 @@ ACTOR Future serverMetricsPolling( TCServerInfo *server) { //Returns the KeyValueStoreType of server if it is different from self->storeType ACTOR Future keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) { - state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID(TaskPriority::DataDistribution))); - if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end()) ) + try { + // Update server's storeType, especially when it was created + state KeyValueStoreType type = wait( + brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID( + TaskPriority::DataDistribution))); + server->storeType = type; + if (server->storeType == self->configuration.storageServerStoreType) { + server->toRemove = 0; // In case sys config is changed back to the server's storeType + } + if (server->storeType == self->configuration.storageServerStoreType && + (self->includedDCs.empty() || + std::find(self->includedDCs.begin(), self->includedDCs.end(), + server->lastKnownInterface.locality.dcId()) != self->includedDCs.end())) { + wait(Future(Never())); + } + } catch (Error& e) { wait(Future(Never())); + } return type; } @@ -3135,6 +3280,10 @@ ACTOR Future waitForAllDataRemoved( Database cx, UID serverID, Version add //we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server. 
if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) { bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) ); + TraceEvent("WaitForAllDataRemoved") + .detail("Server", serverID) + .detail("CanRemove", canRemove) + .detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID)); if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) { return Void(); } @@ -3184,8 +3333,18 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI } self->server_status.set( interf.id(), *status ); - if( status->isFailed ) + TraceEvent("MXTEST") + .detail("DDID", self->distributorId) + .detail("Server", interf.id()) + .detail("Unhealthy", status->isUnhealthy()) + .detail("Status", status->toString()); + if (status->isFailed) { self->restartRecruiting.trigger(); + TraceEvent("MXTESTTriggerRestartRecruiting") + .detail("DDID", self->distributorId) + .detail("Server", interf.id()); + wait(delay(0.1)); + } Future healthChanged = Never(); if(status->isFailed) { @@ -3214,6 +3373,17 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI self->healthyZone.set(Optional()); } } + // if (status->isFailed) { + // self->restartRecruiting.trigger(); + // } + // self->server_status.set( interf.id(), *status ); // Update the global server status, so that + // storageRecruiter can use the updated info for recruiting + + TraceEvent("StatusMapChange", self->distributorId) + .detail("ServerID", interf.id()) + .detail("Status", status->toString()) + .detail("Available", + IFailureMonitor::failureMonitor().getState(interf.waitFailure.getEndpoint()).isAvailable()); } when ( wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, interf.id(), addedVersion, self) : Never() ) ) { break; } when ( wait( self->healthyZone.onChange() ) ) {} @@ -3226,12 +3396,9 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI // Check the status of a storage server. // Apply all requirements to the server and mark it as excluded if it fails to satisfies these requirements ACTOR Future storageServerTracker( - DDTeamCollection* self, - Database cx, - TCServerInfo *server, //This actor is owned by this TCServerInfo - Promise errorOut, - Version addedVersion) -{ + DDTeamCollection* self, Database cx, + TCServerInfo* server, // This actor is owned by this TCServerInfo, point to server_info[id] + Promise errorOut, Version addedVersion) { state Future failureTracker; state ServerStatus status( false, false, server->lastKnownInterface.locality ); state bool lastIsUnhealthy = false; @@ -3239,7 +3406,8 @@ ACTOR Future storageServerTracker( state Future> interfaceChanged = server->onInterfaceChanged; state Future storeTracker = keyValueStoreTypeTracker( self, server ); - state bool hasWrongStoreTypeOrDC = false; + state bool hasWrongDC = !inCorrectDC(self, server); + state bool toRemoveWrongStoreType = false; state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2; try { @@ -3247,7 +3415,9 @@ ACTOR Future storageServerTracker( status.isUndesired = false; status.isWrongConfiguration = false; - // If there is any other server on this exact NetworkAddress, this server is undesired and will eventually be eliminated + // If there is any other server on this exact NetworkAddress, this server is undesired and will eventually + // be eliminated. 
This samAddress checking must be redo whenever the server's state (e.g., storeType, + // dcLocation, interface) is changed. state std::vector> otherChanges; std::vector> wakeUpTrackers; for(const auto& i : self->server_info) { @@ -3263,7 +3433,12 @@ ACTOR Future storageServerTracker( .detail("OtherHealthy", !self->server_status.get( i.second->id ).isUnhealthy()); // wait for the server's ip to be changed otherChanges.push_back(self->server_status.onChange(i.second->id)); - if(!self->server_status.get( i.second->id ).isUnhealthy()) { + // ASSERT(i.first == i.second->id); //MX: TO enable the assert + // When a wrongStoreType server colocate with a correct StoreType server, we should not mark the + // correct one as unhealthy + // TODO: We should not need i.second->toRemove == 0 because this loop should be triggered after + // failureTracker returns + if (!self->server_status.get(i.second->id).isUnhealthy()) { //&& i.second->toRemove == 0 if(self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id) >= self->shardsAffectedByTeamFailure->getNumberOfShards(server->id)) { TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId) @@ -3297,11 +3472,21 @@ ACTOR Future storageServerTracker( status.isUndesired = true; } otherChanges.push_back( self->zeroOptimalTeams.onChange() ); + otherChanges.push_back(self->zeroHealthyTeams->onChange()); } //If this storage server has the wrong key-value store type, then mark it undesired so it will be replaced with a server having the correct type - if(hasWrongStoreTypeOrDC) { - TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId).detail("Server", server->id).detail("StoreType", "?"); + if (hasWrongDC) { + TraceEvent(SevWarn, "UndesiredDC", self->distributorId) + .detail("Server", server->id) + .detail("WrongDC", "?"); + status.isUndesired = true; + status.isWrongConfiguration = true; + } + if (toRemoveWrongStoreType) { // TODO: merge with the above if (hasWrongDC) + TraceEvent(SevWarn, "WrongStoreTypeToRemove", self->distributorId) + .detail("Server", server->id) + .detail("StoreType", "?"); status.isUndesired = true; status.isWrongConfiguration = true; } @@ -3321,8 +3506,7 @@ ACTOR Future storageServerTracker( failureTracker = storageServerFailureTracker(self, server, cx, &status, addedVersion); //We need to recruit new storage servers if the key value store type has changed - if(hasWrongStoreTypeOrDC) - self->restartRecruiting.trigger(); + if (hasWrongDC || toRemoveWrongStoreType) self->restartRecruiting.trigger(); if (lastIsUnhealthy && !status.isUnhealthy() && ( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) { @@ -3447,14 +3631,19 @@ ACTOR Future storageServerTracker( } interfaceChanged = server->onInterfaceChanged; - // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location + // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to + // an invalid location ? + // What does this mean? Why does the old failureTracker has a pointer to an invalid location? + // MXQ: Will the status's isFailed and isUndesired field be reset at the beginning of loop?!! 
status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality ); // self->traceTeamCollectionInfo(); recordTeamCollectionInfo = true; //Restart the storeTracker for the new interface - storeTracker = keyValueStoreTypeTracker(self, server); - hasWrongStoreTypeOrDC = false; + storeTracker = keyValueStoreTypeTracker( + self, server); // hasWrongStoretype server will be delayed to be deleted. + hasWrongDC = false; + toRemoveWrongStoreType = false; self->restartTeamBuilder.trigger(); if(restartRecruiting) @@ -3471,7 +3660,14 @@ ACTOR Future storageServerTracker( TEST(true); //KeyValueStore type changed storeTracker = Never(); - hasWrongStoreTypeOrDC = true; + hasWrongDC = !inCorrectDC(self, server); + } + when(wait(server->wrongStoreTypeRemoved.onTrigger())) { + TraceEvent(SevWarn, "UndesiredStorageServerTriggered", self->distributorId) + .detail("Server", server->id) + .detail("StoreType", server->storeType) + .detail("ConfigStoreType", self->configuration.storageServerStoreType); + toRemoveWrongStoreType = true; } when( wait( server->wakeUpTracker.getFuture() ) ) { server->wakeUpTracker = Promise(); @@ -3530,8 +3726,14 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl isr.reqId = deterministicRandom()->randomUniqueID(); isr.interfaceId = interfaceId; - TraceEvent("DDRecruiting").detail("State", "Sending request to worker").detail("WorkerID", candidateWorker.worker.id()) - .detail("WorkerLocality", candidateWorker.worker.locality.toString()).detail("Interf", interfaceId).detail("Addr", candidateWorker.worker.address()); + TraceEvent("DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Sending request to worker") + .detail("WorkerID", candidateWorker.worker.id()) + .detail("WorkerLocality", candidateWorker.worker.locality.toString()) + .detail("Interf", interfaceId) + .detail("Addr", candidateWorker.worker.address()) + .detail("RecruitingStream", self->recruitingStream.get()); self->recruitingIds.insert(interfaceId); self->recruitingLocalities.insert(candidateWorker.worker.address()); @@ -3547,8 +3749,14 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl self->recruitingStream.set(self->recruitingStream.get()-1); - TraceEvent("DDRecruiting").detail("State", "Finished request").detail("WorkerID", candidateWorker.worker.id()) - .detail("WorkerLocality", candidateWorker.worker.locality.toString()).detail("Interf", interfaceId).detail("Addr", candidateWorker.worker.address()); + TraceEvent("DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Finished request") + .detail("WorkerID", candidateWorker.worker.id()) + .detail("WorkerLocality", candidateWorker.worker.locality.toString()) + .detail("Interf", interfaceId) + .detail("Addr", candidateWorker.worker.address()) + .detail("RecruitingStream", self->recruitingStream.get()); if( newServer.present() ) { if( !self->server_info.count( newServer.get().interf.id() ) ) @@ -3568,16 +3776,25 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl ACTOR Future storageRecruiter( DDTeamCollection* self, Reference> db ) { state Future fCandidateWorker; state RecruitStorageRequest lastRequest; + state bool hasHealthyTeam; + state int numRecuitSSPending = 0; + state std::map numSSPerAddr; loop { try { + numSSPerAddr.clear(); + hasHealthyTeam = (self->healthyTeamCount != 0); RecruitStorageRequest rsr; std::set exclusions; for(auto s = self->server_info.begin(); s != self->server_info.end(); ++s) { auto serverStatus = 
self->server_status.get( s->second->lastKnownInterface.id() ); if( serverStatus.excludeOnRecruit() ) { - TraceEvent(SevDebug, "DDRecruitExcl1").detail("Excluding", s->second->lastKnownInterface.address()); + TraceEvent(SevDebug, "DDRecruitExcl1") + .detail("Primary", self->primary) + .detail("Excluding", s->second->lastKnownInterface.address()); auto addr = s->second->lastKnownInterface.address(); - exclusions.insert( AddressExclusion( addr.ip, addr.port ) ); + AddressExclusion addrExcl(addr.ip, addr.port); + exclusions.insert(addrExcl); + numSSPerAddr[addrExcl]++; // increase from 0 } } for(auto addr : self->recruitingLocalities) { @@ -3587,7 +3804,9 @@ ACTOR Future storageRecruiter( DDTeamCollection* self, ReferenceexcludedServers.getKeys(); for(auto& s : excl) if (self->excludedServers.get(s)) { - TraceEvent(SevDebug, "DDRecruitExcl2").detail("Excluding", s.toString()); + TraceEvent(SevDebug, "DDRecruitExcl2") + .detail("Primary", self->primary) + .detail("Excluding", s.toString()); exclusions.insert( s ); } rsr.criticalRecruitment = self->healthyTeamCount == 0; @@ -3597,30 +3816,65 @@ ACTOR Future storageRecruiter( DDTeamCollection* self, ReferenceincludedDCs; - TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting").detail("State", "Sending request to CC") - .detail("Exclusions", rsr.excludeAddresses.size()).detail("Critical", rsr.criticalRecruitment); + TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Sending request to CC") + .detail("Exclusions", rsr.excludeAddresses.size()) + .detail("Critical", rsr.criticalRecruitment) + .detail("IncludedDCsSize", rsr.includeDCs.size()); if( rsr.criticalRecruitment ) { - TraceEvent(SevWarn, "DDRecruitingEmergency", self->distributorId); + TraceEvent(SevWarn, "DDRecruitingEmergency", self->distributorId).detail("Primary", self->primary); } if(!fCandidateWorker.isValid() || fCandidateWorker.isReady() || rsr.excludeAddresses != lastRequest.excludeAddresses || rsr.criticalRecruitment != lastRequest.criticalRecruitment) { + TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Sending rsr request to CC"); lastRequest = rsr; fCandidateWorker = brokenPromiseToNever( db->get().clusterInterface.recruitStorage.getReply( rsr, TaskPriority::DataDistribution ) ); } + TraceEvent("StorageRecruiterMX", self->distributorId) + .detail("Primary", self->primary) + .detail("HasHealthyTeam", hasHealthyTeam) + .detail("SysStoreType", self->configuration.storageServerStoreType); + self->traceAllInfo(true); + choose { when( RecruitStorageReply candidateWorker = wait( fCandidateWorker ) ) { + AddressExclusion candidateSSAddr(candidateWorker.worker.address().ip, + candidateWorker.worker.address().port); + int numExistingSS = numSSPerAddr[candidateSSAddr]; + if (numExistingSS >= 2) { + TraceEvent(SevWarnAlways, "StorageRecruiterTooManySSOnSameAddrMX", self->distributorId) + .detail("Primary", self->primary) + .detail("Addr", candidateSSAddr.toString()) + .detail("NumExistingSS", numExistingSS); + } else { + TraceEvent("DDRecruiting", self->distributorId) + .detail("Primary", self->primary) + .detail("State", "Got worker for SS") + .detail("Addr", candidateSSAddr.toString()) + .detail("NumExistingSS", numExistingSS); + } self->addActor.send(initializeStorage(self, candidateWorker)); } when( wait( db->onChange() ) ) { // SOMEDAY: only if clusterInterface changes? 
fCandidateWorker = Future(); } - when( wait( self->restartRecruiting.onTrigger() ) ) {} + when(wait(self->restartRecruiting.onTrigger())) { + TraceEvent("DDRecruiting", self->distributorId) + .detail("Primary", self->primary) + .detail("State", "Restart recruiting"); + } } wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) ); } catch( Error &e ) { if(e.code() != error_code_timed_out) { + TraceEvent("StorageRecruiterMXExit", self->distributorId) + .detail("Primary", self->primary) + .detail("Error", e.what()); throw; } TEST(true); //Storage recruitment timed out @@ -3637,14 +3891,20 @@ ACTOR Future updateReplicasKey(DDTeamCollection* self, Optional dcId) wait(self->initialFailureReactionDelay && waitForAll(serverUpdates)); wait(waitUntilHealthy(self)); - TraceEvent("DDUpdatingReplicas", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize); + TraceEvent("DDUpdatingReplicas", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize); state Transaction tr(self->cx); loop { try { Optional val = wait( tr.get(datacenterReplicasKeyFor(dcId)) ); state int oldReplicas = val.present() ? decodeDatacenterReplicasValue(val.get()) : 0; if(oldReplicas == self->configuration.storageTeamSize) { - TraceEvent("DDUpdatedAlready", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize); + TraceEvent("DDUpdatedAlready", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize); return Void(); } if(oldReplicas < self->configuration.storageTeamSize) { @@ -3652,7 +3912,11 @@ ACTOR Future updateReplicasKey(DDTeamCollection* self, Optional dcId) } tr.set(datacenterReplicasKeyFor(dcId), datacenterReplicasValue(self->configuration.storageTeamSize)); wait( tr.commit() ); - TraceEvent("DDUpdatedReplicas", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize).detail("OldReplicas", oldReplicas); + TraceEvent("DDUpdatedReplicas", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize) + .detail("OldReplicas", oldReplicas); return Void(); } catch( Error &e ) { wait( tr.onError(e) ); @@ -3677,6 +3941,7 @@ ACTOR Future remoteRecovered( Reference> db } ACTOR Future monitorHealthyTeams( DDTeamCollection* self ) { + TraceEvent("DDMonitorHealthyTeamsStart").detail("ZeroHealthyTeams", self->zeroHealthyTeams->get()); loop choose { when ( wait(self->zeroHealthyTeams->get() ? 
delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY) : Never()) ) { self->doBuildTeams = true; @@ -3722,9 +3987,18 @@ ACTOR Future dataDistributionTeamCollection( self->redundantServerTeamRemover = serverTeamRemover(self); self->addActor.send(self->redundantServerTeamRemover); } + + if (self->wrongStoreTypeRemover.isReady()) { + self->wrongStoreTypeRemover = removeWrongStoreType(self); + self->addActor.send(self->wrongStoreTypeRemover); + } + self->traceTeamCollectionInfo(); if(self->includedDCs.size()) { + for (int i = 0; i < self->includedDCs.size(); ++i) { + TraceEvent("DDTeamCollectionMXTEST").detail("IncludedDC", i).detail("DC", self->includedDCs[i]); + } //start this actor before any potential recruitments can happen self->addActor.send(updateReplicasKey(self, self->includedDCs[0])); } diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 7ea6597c24..720b506658 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -225,6 +225,7 @@ public: (*it)->setHealthy(h); } } + virtual int getPriority() { int priority = 0; for (auto it = teams.begin(); it != teams.end(); it++) { @@ -247,6 +248,7 @@ public: } }; +// MXQ: Why do we need to count the utilization for each priority? Can a relocationShard have multiple priorities? struct Busyness { vector ledger; @@ -295,6 +297,7 @@ int getWorkFactor( RelocateData const& relocation ) { // Data movement's resource control: Do not overload source servers used for the RelocateData // return true if servers are not too busy to launch the relocation +// This ensures source servers will not be overloaded. bool canLaunch( RelocateData & relocation, int teamSize, std::map & busymap, std::vector cancellableRelocations ) { // assert this has not already been launched @@ -356,7 +359,7 @@ struct DDQueueData { int64_t bytesWritten; int teamSize; - std::map busymap; + std::map busymap; // UID is serverID KeyRangeMap< RelocateData > queueMap; std::set> fetchingSourcesQueue; @@ -365,7 +368,8 @@ struct DDQueueData { std::map>> queue; //Key UID is serverID, value is the serverID's set of RelocateData to relocate KeyRangeMap< RelocateData > inFlight; - KeyRangeActorMap inFlightActors; //Key: RelocatData, Value: Actor to move the data + // Track all actors that relocate the specified keys to a good place; Key: keyRange; Value: actor + KeyRangeActorMap inFlightActors; Promise error; PromiseStream dataTransferComplete; @@ -760,6 +764,9 @@ struct DDQueueData { launchQueuedWork( combined ); } + // For each relocateData rd in the queue, check if there is in-flight relocate data whose keyrange overlaps + // with rd. If there is, cancel it by cancelling its actor and reducing the source servers' busyness for the + // cancelled in-flight relocateData. Then launch the relocation for rd. void launchQueuedWork( std::set> combined ) { int startedHere = 0; double startTime = now(); @@ -768,6 +775,7 @@ struct DDQueueData { for(; it != combined.end(); it++ ) { RelocateData rd( *it ); + // Check if there is an inflight shard that overlaps with the queued relocateShard (rd) bool overlappingInFlight = false; auto intersectingInFlight = inFlight.intersectingRanges( rd.keys ); for(auto it = intersectingInFlight.begin(); it != intersectingInFlight.end(); ++it) { @@ -788,6 +796,7 @@ struct DDQueueData { continue; } + // MXQ: What does the if mean in the following comment?
// Because the busyness of a server is decreased when a superseding relocation is issued, we // need to consider what the busyness of a server WOULD be if auto containedRanges = inFlight.containedRanges( rd.keys ); @@ -798,8 +807,10 @@ struct DDQueueData { } } + // MXQ: I don't understand the SOMEDAY and FIXME statement // Data movement avoids overloading source servers in moving data. - // SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue + // SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the + // queue // FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) { //logRelocation( rd, "SkippingQueuedRelocation" ); @@ -842,6 +853,7 @@ struct DDQueueData { launch( rrs, busymap ); activeRelocations++; startRelocation(rrs.priority); + // Start the actor that relocates data in the rrs.keys inFlightActors.insert( rrs.keys, dataDistributionRelocator( this, rrs ) ); } diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index b71ed41227..3421e0b70b 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -196,6 +196,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( TR_REMOVE_SERVER_TEAM_DELAY, 60.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_DELAY = deterministicRandom()->random01() * 60.0; init( TR_REMOVE_SERVER_TEAM_EXTRA_DELAY, 5.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_EXTRA_DELAY = deterministicRandom()->random01() * 10.0; + init( STR_NUM_SERVERS_REMOVED_ONCE, 1 ); if( randomize && BUGGIFY ) STR_NUM_SERVERS_REMOVED_ONCE = deterministicRandom()->random01() * 100.0; + init( STR_REMOVE_STORE_ENGINE_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) STR_REMOVE_STORE_ENGINE_TIMEOUT = deterministicRandom()->random01() * 60.0; + init( STR_REMOVE_STORE_ENGINE_DELAY, 60.0); if( randomize && BUGGIFY ) STR_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0; + // Redwood Storage Engine init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 ); init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN, 0 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 39d9abc85e..8865edaccd 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -155,6 +155,12 @@ public: double TR_REMOVE_SERVER_TEAM_DELAY; // wait for the specified time before try to remove next server team double TR_REMOVE_SERVER_TEAM_EXTRA_DELAY; // serverTeamRemover waits for the delay and check DD healthyness again to ensure it runs after machineTeamRemover + // WrongStoreTypeRemover to remove wrong storage engines + int STR_NUM_SERVERS_REMOVED_ONCE; // The number of servers with wrong storage engines to remove + double STR_REMOVE_STORE_ENGINE_TIMEOUT; // wait for at most the timeout before removing the next batch of wrong storage + // engines + double STR_REMOVE_STORE_ENGINE_DELAY; // wait for the specified time before removing the next batch + double DD_FAILURE_TIME; double DD_ZERO_HEALTHY_TEAM_DELAY; diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 09d89ef827..286204adda 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -258,6 +258,11 @@ ACTOR Future>> additionalSources(Standalone s return result; } +// keyServer: map from keys to destination servers +// serverKeys: two-dimensional map: [servers][keys], value is the servers' state of having the keys: active(not-have), +// complete(already has), ""() MXQ: What does
serverKeys[dest][keys] mean? It seems having the same meaning with +// serverKeys[servers][keys]? + // Set keyServers[keys].dest = servers // Set serverKeys[servers][keys] = active for each subrange of keys that the server did not already have, complete for each subrange that it already has // Set serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that destination is a member of servers OR if the source list is sufficiently degraded) @@ -276,8 +281,9 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve state int shards = 0; state int maxRetries = 0; - //This process can be split up into multiple transactions if there are too many existing overlapping shards - //In that case, each iteration of this loop will have begin set to the end of the last processed shard + // If it's multiple transaction, how do we achieve atomicity? + // This process can be split up into multiple transactions if there are too many existing overlapping shards + // In that case, each iteration of this loop will have begin set to the end of the last processed shard while(begin < keys.end) { TEST(begin > keys.begin); //Multi-transactional startMoveKeys batches++; @@ -307,8 +313,9 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve for(int s=0; s startMoveKeys( Database occ, KeyRange keys, vector serve state Key endKey = old.end()[-1].key; currentKeys = KeyRangeRef(currentKeys.begin, endKey); - /*TraceEvent("StartMoveKeysBatch", relocationIntervalId) - .detail("KeyBegin", currentKeys.begin.c_str()) - .detail("KeyEnd", currentKeys.end.c_str());*/ + TraceEvent("StartMoveKeysBatch", relocationIntervalId) + .detail("KeyBegin", currentKeys.begin.toString()) + .detail("KeyEnd", currentKeys.end.toString()); - //printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(), keys.end.toString().c_str(), old.size(), servers.size()); - //for(int i=0; i> addAsSource = wait(additionalSources(old, &tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER*servers.size())); @@ -340,12 +347,12 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve vector dest; decodeKeyServersValue( old[i].value, src, dest ); - /*TraceEvent("StartMoveKeysOldRange", relocationIntervalId) - .detail("KeyBegin", rangeIntersectKeys.begin.c_str()) - .detail("KeyEnd", rangeIntersectKeys.end.c_str()) - .detail("OldSrc", describe(src)) - .detail("OldDest", describe(dest)) - .detail("ReadVersion", tr.getReadVersion().get());*/ + TraceEvent("StartMoveKeysOldRange", relocationIntervalId) + .detail("KeyBegin", rangeIntersectKeys.begin.toString()) + .detail("KeyEnd", rangeIntersectKeys.end.toString()) + .detail("OldSrc", describe(src)) + .detail("OldDest", describe(dest)) + .detail("ReadVersion", tr.getReadVersion().get()); for(auto& uid : addAsSource[i]) { src.push_back(uid); @@ -358,15 +365,13 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve //Track old destination servers. 
They may be removed from serverKeys soon, since they are about to be overwritten in keyServers for(auto s = dest.begin(); s != dest.end(); ++s) { oldDests.insert(*s); - /*TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId) - .detail("Server", s->id());*/ + TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s); } //Keep track of src shards so that we can preserve their values when we overwrite serverKeys for(auto& uid : src) { shardMap[uid].push_back(old.arena(), rangeIntersectKeys); - /*TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId) - .detail("Server", *s);*/ + TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid); } } @@ -819,9 +824,9 @@ ACTOR Future> addStorageServer( Database cx, StorageServ } } } - +// An SS can be removed only if all data (shards) on the SS have been moved away from it. ACTOR Future canRemoveStorageServer( Transaction* tr, UID serverID ) { - Standalone keys = wait( krmGetRanges( tr, serverKeysPrefixFor(serverID), allKeys, 2 ) ); + state Standalone keys = wait(krmGetRanges(tr, serverKeysPrefixFor(serverID), allKeys, 2)); ASSERT(keys.size() >= 2); @@ -830,6 +835,16 @@ ACTOR Future canRemoveStorageServer( Transaction* tr, UID serverID ) { ASSERT(false); } + // DEBUG purpose + if (!(keys[0].value == serverKeysFalse && keys[1].key == allKeys.end)) { + Standalone allKeys = + wait(krmGetRanges(tr, serverKeysPrefixFor(serverID), allKeys, CLIENT_KNOBS->TOO_MANY)); + TraceEvent("CanNOTRemove").detail("KeysNum", allKeys.size()); + for (auto& k : allKeys) { + TraceEvent("CanNOTRemove").detail("Key", k.key).detail("Value", k.value); + } + } + //Return true if the entire range is false. Since these values are coalesced, we can return false if there is more than one result return keys[0].value == serverKeysFalse && keys[1].key == allKeys.end; } diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 7a2a5560e9..e5708f4d78 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -305,6 +305,9 @@ ACTOR Future getMaxStorageServerQueueSize( Database cx, Reference waitForQuietDatabase( Database cx, Reference dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate || tLogQueueInfo.get().second > maxPoppedVersionLag || dataDistributionQueueSize.get() > maxDataDistributionQueueSize || diff --git a/flow/flow.h b/flow/flow.h index 581255e96f..348e234dcc 100644 --- a/flow/flow.h +++ b/flow/flow.h @@ -306,6 +306,7 @@ struct SingleCallback { } }; +// SAV is short for Single Assignment Variable: it can be assigned only once. template struct SAV : private Callback, FastAllocated> { int promises; // one for each promise (and one for an active actor if this is an actor)
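
Illustration (not part of the patch): the core policy the removeWrongStoreType() actor above implements is to replace servers with a wrong storage engine in small batches rather than marking them all undesired at once, which is what previously caused several new storage servers to be recruited on one process and run it out of memory. The sketch below is a minimal, standalone C++ model of that policy under stated assumptions: the Server struct, markOneBatch() helper, and batchSize are hypothetical stand-ins, and batchSize plays the role of the STR_NUM_SERVERS_REMOVED_ONCE knob; it is not FoundationDB code and omits recruitment, data movement, and the actor machinery.

#include <iostream>
#include <string>
#include <vector>

struct Server {                // hypothetical stand-in for TCServerInfo
    std::string id;
    std::string storeType;     // e.g. "memory" or "ssd-2"
    bool markedForRemoval;     // analogous to toRemove / wrongStoreTypeRemoved being triggered
};

// Mark at most `batchSize` servers whose store type differs from the configured one.
// Returns how many servers were newly marked in this pass.
int markOneBatch(std::vector<Server>& servers, const std::string& configStoreType, int batchSize) {
    int marked = 0;
    for (auto& s : servers) {
        if (s.storeType == configStoreType) {
            s.markedForRemoval = false;  // config may have been switched back to this type
            continue;
        }
        if (!s.markedForRemoval && marked < batchSize) {
            s.markedForRemoval = true;   // in the patch, this triggers wrongStoreTypeRemoved
            ++marked;
        }
    }
    return marked;
}

int main() {
    std::vector<Server> servers = { {"ss1", "memory", false}, {"ss2", "memory", false}, {"ss3", "ssd-2", false} };
    const std::string configStoreType = "ssd-2";  // the newly configured storage engine
    const int batchSize = 1;                      // analogous to STR_NUM_SERVERS_REMOVED_ONCE = 1

    // Each pass marks at most one wrong-store-type server; a real system would wait for the
    // replacement to be recruited and the data to be moved away before starting the next pass.
    while (markOneBatch(servers, configStoreType, batchSize) > 0) {
        for (const auto& s : servers)
            std::cout << s.id << (s.markedForRemoval ? " -> replace\n" : " -> keep\n");
        for (auto& s : servers)
            if (s.markedForRemoval) s.storeType = configStoreType;  // pretend it was replaced
    }
    return 0;
}

Run as a plain C++ program, this prints one "replace" decision per pass, mirroring the one-by-one switch described in the commit message; the actual patch drives the equivalent loop from the wrongStoreTypeRemover actor and relies on storageServerTracker and recruitment to perform the replacement.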