diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h
index 690ebb9865..e4801e8a2a 100644
--- a/fdbclient/FDBTypes.h
+++ b/fdbclient/FDBTypes.h
@@ -591,7 +591,9 @@ struct Traceable : std::true_type {
 struct KeyValueStoreType {
 	constexpr static FileIdentifier file_identifier = 6560359;
-	// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones just before END.
+	// These enumerated values are stored in the database configuration, so should NEVER be changed.
+	// Only add new ones just before END.
+	// SS storeType is END before the storageServerInterface is initialized.
 	enum StoreType {
 		SSD_BTREE_V1,
 		MEMORY,
diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp
index c1a815c50e..d979dfccf7 100644
--- a/fdbclient/ManagementAPI.actor.cpp
+++ b/fdbclient/ManagementAPI.actor.cpp
@@ -484,13 +484,20 @@ ACTOR Future changeConfig( Database cx, std::mapfirst) ); }
-	for(auto i=m.begin(); i!=m.end(); ++i)
+	for (auto i = m.begin(); i != m.end(); ++i) {
+		// Debug purpose
+		TraceEvent("ChangeConfigAPI").detail("Param1", i->first).detail("Param2", i->second);
 		tr.set( StringRef(i->first), StringRef(i->second) );
+	}
 	tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
 	tr.set( moveKeysLockOwnerKey, versionKey );
 	wait( tr.commit() );
+	// Debug purpose
+	TraceEvent("ChangeConfigAPI")
+	    .detail("NewConfig", newConfig.toString())
+	    .detail("OldConfig", oldConfig.toString());
 	break;
 } catch (Error& e) {
 	state Error e1(e);
@@ -1610,6 +1617,11 @@ ACTOR Future waitForFullReplication( Database cx ) {
 	state std::vector> watchFutures;
 	for(int i = 0; i < config.regions.size(); i++) {
 		if( !replicasFutures[i].get().present() || decodeDatacenterReplicasValue(replicasFutures[i].get().get()) < config.storageTeamSize ) {
+			TraceEvent("WaitForFullReplication")
+			    .detail("DecodedReplicas", replicasFutures[i].get().present()
+			                                   ? decodeDatacenterReplicasValue(replicasFutures[i].get().get())
+			                                   : -1)
+			    .detail("ConfigReplicas", config.storageTeamSize);
 			watchFutures.push_back(tr.watch(datacenterReplicasKeyFor(config.regions[i].dcId)));
 		}
 	}
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 2658c29aa2..353744ff2d 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -363,6 +363,8 @@ ProcessClass decodeProcessClassValue( ValueRef const& value ) {
 const KeyRangeRef configKeys( LiteralStringRef("\xff/conf/"), LiteralStringRef("\xff/conf0") );
 const KeyRef configKeysPrefix = configKeys.begin;
+const KeyRef storeTypeConfig(LiteralStringRef("\xff/conf/storage_engine"));
+
 const KeyRangeRef excludedServersKeys( LiteralStringRef("\xff/conf/excluded/"), LiteralStringRef("\xff/conf/excluded0") );
 const KeyRef excludedServersPrefix = excludedServersKeys.begin;
 const KeyRef excludedServersVersionKey = LiteralStringRef("\xff/conf/excluded");
@@ -426,6 +428,10 @@ const KeyRef primaryLocalityPrivateKey = LiteralStringRef("\xff\xff/globals/prim
 const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled");
 const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled");
+// Whenever the configuration changes or a DD related system keyspace is changed (e.g., serverList),
+// an actor must grab the moveKeysLockOwnerKey and update moveKeysLockWriteKey.
+// This prevents concurrent writes to the same system keyspace.
+// When the owner of the DD related system keyspace changes, DD will reboot.
 const KeyRef moveKeysLockOwnerKey = LiteralStringRef("\xff/moveKeysLock/Owner");
 const KeyRef moveKeysLockWriteKey = LiteralStringRef("\xff/moveKeysLock/Write");
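// ---- Editor's illustration (not part of the patch) ----
// A minimal sketch of the locking protocol described above, modeled on
// takeMoveKeysLock() in fdbserver/MoveKeys.actor.cpp. It assumes a MoveKeysLock
// struct carrying prevOwner/myOwner UIDs; the function name and details here are
// illustrative, not authoritative.
ACTOR Future<MoveKeysLock> takeMoveKeysLockSketch(Database cx) {
	state Transaction tr(cx);
	loop {
		try {
			state MoveKeysLock lock;
			Optional<Value> owner = wait(tr.get(moveKeysLockOwnerKey));
			lock.prevOwner =
			    owner.present() ? BinaryReader::fromStringRef<UID>(owner.get(), Unversioned()) : UID();
			lock.myOwner = deterministicRandom()->randomUniqueID();
			// Writing a fresh owner UID invalidates the previous owner: any writer that
			// still carries the old UID will fail its later lock check and must back off.
			tr.set(moveKeysLockOwnerKey, BinaryWriter::toValue(lock.myOwner, Unversioned()));
			wait(tr.commit());
			return lock;
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}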
diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h
index f4bedb8f14..d893d38a9c 100644
--- a/fdbclient/SystemData.h
+++ b/fdbclient/SystemData.h
@@ -121,6 +121,8 @@ UID decodeProcessClassKeyOld( KeyRef const& key );
 // "\xff/conf/[[option]]" := "value"
 extern const KeyRangeRef configKeys;
 extern const KeyRef configKeysPrefix;
+// Debug purpose: Storage engine's store type
+extern const KeyRef storeTypeConfig;
 // "\xff/conf/excluded/1.2.3.4" := ""
 // "\xff/conf/excluded/1.2.3.4:4000" := ""
diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp
index c39c3e2571..8d38d24b1d 100644
--- a/fdbserver/ClusterController.actor.cpp
+++ b/fdbserver/ClusterController.actor.cpp
@@ -1362,7 +1362,10 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
 			}
 		} catch (Error& e) {
 			if (e.code() == error_code_no_more_servers) {
-				TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id).error(e);
+				TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id)
+				    .detail("OutstandingReq", i)
+				    .detail("IsCriticalRecruitment", req.first.criticalRecruitment)
+				    .error(e);
 			} else {
 				TraceEvent(SevError, "RecruitStorageError", self->id).error(e);
 				throw;
@@ -1655,7 +1658,9 @@ void clusterRecruitStorage( ClusterControllerData* self, RecruitStorageRequest r
 	} catch ( Error& e ) {
 		if (e.code() == error_code_no_more_servers) {
 			self->outstandingStorageRequests.push_back( std::make_pair(req, now() + SERVER_KNOBS->RECRUITMENT_TIMEOUT) );
-			TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id).error(e);
+			TraceEvent(SevWarn, "RecruitStorageNotAvailable", self->id)
+			    .detail("IsCriticalRecruitment", req.criticalRecruitment)
+			    .error(e);
 		} else {
 			TraceEvent(SevError, "RecruitStorageError", self->id).error(e);
 			throw; // Any other error will bring down the cluster controller
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index 4306cc0708..8cd617dda3 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -59,10 +59,23 @@ struct TCServerInfo : public ReferenceCounted {
 	bool inDesiredDC;
 	LocalityEntry localityEntry;
 	Promise updated;
+	AsyncTrigger wrongStoreTypeRemoved; // Triggered when this server is flagged for removal due to a wrong store type
+	int toRemove; // Debug purpose: 0: do not remove, >0: to remove due to wrongStoreType
+	// A storage server's StoreType does not change.
+	// To change the storeType for an ip:port, we destroy the old server and create a new one.
+ KeyValueStoreType storeType; // Storage engine type - TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, Reference storageServerSet) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC) { + TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, + Reference storageServerSet) + : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), + onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC), + storeType(KeyValueStoreType::END), toRemove(0) { localityEntry = ((LocalityMap*) storageServerSet.getPtr())->add(ssi.locality, &id); } + + bool isCorrectStoreType(KeyValueStoreType configStoreType) { + return (storeType == configStoreType || storeType == KeyValueStoreType::END); + } }; struct TCMachineInfo : public ReferenceCounted { @@ -428,7 +441,7 @@ ACTOR Future> getInitialDataDistribution( Dat for( int i = 0; i < serverList.get().size(); i++ ) { auto ssi = decodeServerListValue( serverList.get()[i].value ); - result->allServers.push_back( std::make_pair(ssi, id_data[ssi.locality.processId()].processClass) ); + result->allServers.push_back(std::make_pair(ssi, id_data[ssi.locality.processId()].processClass)); server_dc[ssi.id()] = ssi.locality.dcId(); } @@ -603,6 +616,7 @@ struct DDTeamCollection : ReferenceCounted { Future checkTeamDelay; Promise addSubsetComplete; Future badTeamRemover; + Future wrongStoreTypeRemover; Future redundantMachineTeamRemover; Future redundantServerTeamRemover; @@ -645,9 +659,10 @@ struct DDTeamCollection : ReferenceCounted { Reference> zeroHealthyTeams, bool primary, Reference> processingUnhealthy) : cx(cx), distributorId(distributorId), lock(lock), output(output), - shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), - badTeamRemover(Void()), redundantMachineTeamRemover(Void()), redundantServerTeamRemover(Void()), - configuration(configuration), readyToStart(readyToStart), clearHealthyZoneFuture(true), + shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false), + teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), redundantMachineTeamRemover(Void()), + redundantServerTeamRemover(Void()), configuration(configuration), readyToStart(readyToStart), + clearHealthyZoneFuture(true), checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), initialFailureReactionDelay( delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)), @@ -696,6 +711,7 @@ struct DDTeamCollection : ReferenceCounted { } ACTOR static Future interruptableBuildTeams( DDTeamCollection* self ) { + TraceEvent("DDInterruptableBuildTeamsStart", self->distributorId); if(!self->addSubsetComplete.isSet()) { wait( addSubsetOfEmergencyTeams(self) ); self->addSubsetComplete.send(Void()); @@ -712,6 +728,7 @@ struct DDTeamCollection : ReferenceCounted { } ACTOR static Future checkBuildTeams( DDTeamCollection* self ) { + TraceEvent("DDCheckBuildTeamsStart", self->distributorId); wait( self->checkTeamDelay ); while( !self->teamBuilder.isReady() ) wait( self->teamBuilder ); @@ -737,6 +754,7 @@ struct DDTeamCollection : ReferenceCounted { // shardsAffectedByTeamFailure or we could be dropping a shard on 
the floor (since team // tracking is "edge triggered") // SOMEDAY: Account for capacity, load (when shardMetrics load is high) + // Q: How do we enforce the above statement? // self->teams.size() can be 0 under the ConfigureTest.txt test when we change configurations // The situation happens rarely. We may want to eliminate this situation someday @@ -1245,26 +1263,31 @@ struct DDTeamCollection : ReferenceCounted { } void traceConfigInfo() { - TraceEvent("DDConfig") + TraceEvent("DDConfig", distributorId) .detail("StorageTeamSize", configuration.storageTeamSize) .detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER) - .detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER); + .detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER) + .detail("StoreType", configuration.storageServerStoreType); } void traceServerInfo() { int i = 0; - TraceEvent("ServerInfo").detail("Size", server_info.size()); + TraceEvent("ServerInfo", distributorId).detail("Size", server_info.size()); for (auto& server : server_info) { - TraceEvent("ServerInfo") + TraceEvent("ServerInfo", distributorId) .detail("ServerInfoIndex", i++) .detail("ServerID", server.first.toString()) .detail("ServerTeamOwned", server.second->teams.size()) - .detail("MachineID", server.second->machine->machineID.contents().toString()); + .detail("MachineID", server.second->machine->machineID.contents().toString()) + .detail("StoreType", server.second->storeType.toString()) + .detail("InDesiredDC", server.second->inDesiredDC) + .detail("ToRemove", server.second->toRemove); } for (auto& server : server_info) { const UID& uid = server.first; - TraceEvent("ServerStatus", uid) + TraceEvent("ServerStatus", distributorId) + .detail("ServerID", uid) .detail("Healthy", !server_status.get(uid).isUnhealthy()) .detail("MachineIsValid", server_info[uid]->machine.isValid()) .detail("MachineTeamSize", @@ -1275,9 +1298,9 @@ struct DDTeamCollection : ReferenceCounted { void traceServerTeamInfo() { int i = 0; - TraceEvent("ServerTeamInfo").detail("Size", teams.size()); + TraceEvent("ServerTeamInfo", distributorId).detail("Size", teams.size()); for (auto& team : teams) { - TraceEvent("ServerTeamInfo") + TraceEvent("ServerTeamInfo", distributorId) .detail("TeamIndex", i++) .detail("Healthy", team->isHealthy()) .detail("TeamSize", team->size()) @@ -1290,7 +1313,7 @@ struct DDTeamCollection : ReferenceCounted { TraceEvent("MachineInfo").detail("Size", machine_info.size()); for (auto& machine : machine_info) { - TraceEvent("MachineInfo") + TraceEvent("MachineInfo", distributorId) .detail("MachineInfoIndex", i++) .detail("Healthy", isMachineHealthy(machine.second)) .detail("MachineID", machine.first.contents().toString()) @@ -1303,9 +1326,9 @@ struct DDTeamCollection : ReferenceCounted { void traceMachineTeamInfo() { int i = 0; - TraceEvent("MachineTeamInfo").detail("Size", machineTeams.size()); + TraceEvent("MachineTeamInfo", distributorId).detail("Size", machineTeams.size()); for (auto& team : machineTeams) { - TraceEvent("MachineTeamInfo") + TraceEvent("MachineTeamInfo", distributorId) .detail("TeamIndex", i++) .detail("MachineIDs", team->getMachineIDsStr()) .detail("ServerTeams", team->serverTeams.size()); @@ -1326,11 +1349,11 @@ struct DDTeamCollection : ReferenceCounted { void traceMachineLocalityMap() { int i = 0; - TraceEvent("MachineLocalityMap").detail("Size", machineLocalityMap.size()); + TraceEvent("MachineLocalityMap", distributorId).detail("Size", machineLocalityMap.size()); for (auto& uid : 
machineLocalityMap.getObjects()) { Reference record = machineLocalityMap.getRecord(i); if (record.isValid()) { - TraceEvent("MachineLocalityMap") + TraceEvent("MachineLocalityMap", distributorId) .detail("LocalityIndex", i++) .detail("UID", uid->toString()) .detail("LocalityRecord", record->toString()); @@ -1348,7 +1371,7 @@ struct DDTeamCollection : ReferenceCounted { if (!shouldPrint) return; - TraceEvent("TraceAllInfo").detail("Primary", primary); + TraceEvent("TraceAllInfo", distributorId).detail("Primary", primary); traceConfigInfo(); traceServerInfo(); traceServerTeamInfo(); @@ -1591,7 +1614,11 @@ struct DDTeamCollection : ReferenceCounted { } } - return deterministicRandom()->randomChoice(leastUsedServers); + if (leastUsedServers.empty()) { + return Reference(); + } else { + return deterministicRandom()->randomChoice(leastUsedServers); + } } // Randomly choose one machine team that has chosenServer and has the correct size @@ -1879,9 +1906,15 @@ struct DDTeamCollection : ReferenceCounted { std::vector bestServerTeam; int bestScore = std::numeric_limits::max(); int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; // BEST_OF_AMT = 4 + bool earlyQuitBuild = false; for (int i = 0; i < maxAttempts && i < 100; ++i) { // Step 2: Choose 1 least used server and then choose 1 least used machine team from the server Reference chosenServer = findOneLeastUsedServer(); + if (!chosenServer.isValid()) { + TraceEvent(SevWarn, "NoValidServer").detail("Primary", primary); + earlyQuitBuild = true; + break; + } // Note: To avoid creating correlation of picked machine teams, we simply choose a random machine team // instead of choosing the least used machine team. // The correlation happens, for example, when we add two new machines, we may always choose the machine @@ -1945,6 +1978,9 @@ struct DDTeamCollection : ReferenceCounted { } } + if (earlyQuitBuild) { + break; + } if (bestServerTeam.size() != configuration.storageTeamSize) { // Not find any team and will unlikely find a team lastBuildTeamsFailed = true; @@ -2139,6 +2175,10 @@ struct DDTeamCollection : ReferenceCounted { .detail("DoBuildTeams", self->doBuildTeams) .trackLatest("TeamCollectionInfo"); } + } else { + // Recruit more servers in the hope that we will get enough machines + TraceEvent("BuildTeam").detail("RestartRecruiting", "Because not enough machines"); + self->restartRecruiting.trigger(); } self->evaluateTeamQuality(); @@ -2165,6 +2205,7 @@ struct DDTeamCollection : ReferenceCounted { .detail("CurrentTeamCount", teams.size()) .detail("ServerCount", server_info.size()) .detail("NonFailedServerCount", desiredServerSet.size()); + traceAllInfo(true); } bool shouldHandleServer(const StorageServerInterface &newServer) { @@ -2176,6 +2217,11 @@ struct DDTeamCollection : ReferenceCounted { void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise errorOut, Version addedVersion ) { if (!shouldHandleServer(newServer)) { + TraceEvent("AddedStorageServer", distributorId) + .detail("ServerID", newServer.id()) + .detail("ShouldHandleServer", 0) + .detail("ServerDCId", newServer.locality.dcId()) + .detail("IncludedDCSize", includedDCs.size()); return; } allServers.push_back( newServer.id() ); @@ -2400,7 +2446,7 @@ struct DDTeamCollection : ReferenceCounted { TraceEvent(SevInfo, "NoTeamsRemovedWhenServerRemoved") .detail("Primary", primary) .detail("Debug", "ThisShouldRarelyHappen_CheckInfoBelow"); - traceAllInfo(); + traceAllInfo(true); } // Step: Remove machine info related to removedServer @@ -2490,6 +2536,65 @@ ACTOR Future 
removeBadTeams(DDTeamCollection* self) {
 	return Void();
 }
 
+bool inCorrectDC(DDTeamCollection* self, TCServerInfo* server) {
+	return (self->includedDCs.empty() ||
+	        std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) !=
+	            self->includedDCs.end());
+}
+
+ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self) {
+	state int numServersRemoved = 0;
+	state std::map<UID, Reference<TCServerInfo>>::iterator server;
+	state vector<Reference<TCServerInfo>> serversToRemove;
+	state int i = 0;
+
+	loop {
+		wait(delay(1.0));
+		TraceEvent("WrongStoreTypeRemoverStartLoop", self->distributorId)
+		    .detail("Primary", self->primary)
+		    .detail("ServerInfoSize", self->server_info.size())
+		    .detail("SysStoreType", self->configuration.storageServerStoreType);
+		serversToRemove.clear();
+		for (server = self->server_info.begin(); server != self->server_info.end(); ++server) {
+			NetworkAddress a = server->second->lastKnownInterface.address();
+			AddressExclusion addr(a.ip, a.port);
+			TraceEvent("WrongStoreTypeRemover", self->distributorId)
+			    .detail("DDID", self->distributorId)
+			    .detail("Server", server->first)
+			    .detail("Addr", addr.toString())
+			    .detail("StoreType", server->second->storeType)
+			    .detail("IsCorrectStoreType",
+			            server->second->isCorrectStoreType(self->configuration.storageServerStoreType))
+			    .detail("ToRemove", server->second->toRemove);
+			if (!server->second->isCorrectStoreType(self->configuration.storageServerStoreType) ||
+			    !inCorrectDC(self, server->second.getPtr())) {
+				serversToRemove.push_back(server->second);
+			} else {
+				server->second->toRemove =
+				    0; // In case the configuration.storeType is changed back to the server's type
+			}
+		}
+
+		for (i = 0; i < serversToRemove.size(); i++) {
+			Reference<TCServerInfo> s = serversToRemove[i];
+			if (s.isValid()) {
+				s->toRemove++; // The server's location will not be excluded
+				s->wrongStoreTypeRemoved.trigger();
+				ASSERT(s->toRemove >= 0);
+				wait(delay(1.0));
+			}
+		}
+
+		if (!serversToRemove.empty() || self->healthyTeamCount == 0) {
+			TraceEvent("WrongStoreTypeRemover").detail("KickTeamBuilder", "Start");
+			self->restartRecruiting.trigger();
+			self->doBuildTeams = true;
+			wait(delay(5.0)); // I have to add delay here; otherwise, it will immediately go to the next loop and print
+			                  // WrongStoreTypeRemoverStartLoop. Why?!
+		}
+	}
+}
+
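// ---- Editor's note (not part of the patch) ----
// The per-server decision rule that removeWrongStoreType() applies, restated
// standalone for clarity. KeyValueStoreType::END doubles as "not yet known"
// (see the FDBTypes.h comment above), so a freshly recruited server is given
// the benefit of the doubt until keyValueStoreTypeTracker() fills its storeType in.
bool wouldScheduleForRemoval(const TCServerInfo& server, const DatabaseConfiguration& conf,
                             const std::vector<Optional<Key>>& includedDCs) {
	bool storeTypeOk = server.storeType == conf.storageServerStoreType ||
	                   server.storeType == KeyValueStoreType::END; // unknown yet: keep the server
	bool dcOk = includedDCs.empty() ||
	            std::count(includedDCs.begin(), includedDCs.end(),
	                       server.lastKnownInterface.locality.dcId()) > 0;
	return !storeTypeOk || !dcOk;
}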
 ACTOR Future machineTeamRemover(DDTeamCollection* self) {
 	state int numMachineTeamRemoved = 0;
 	loop {
@@ -2737,6 +2842,18 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea
 		team->setHealthy( healthy );	// Unhealthy teams won't be chosen by bestTeam
 		bool optimal = team->isOptimal() && healthy;
 		bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get()));
+		TraceEvent("TeamHealthChangeDetected", self->distributorId)
+		    .detail("Team", team->getDesc())
+		    .detail("ServersLeft", serversLeft)
+		    .detail("LastServersLeft", lastServersLeft)
+		    .detail("AnyUndesired", anyUndesired)
+		    .detail("LastAnyUndesired", lastAnyUndesired)
+		    .detail("AnyWrongConfiguration", anyWrongConfiguration)
+		    .detail("LastWrongConfiguration", lastWrongConfiguration)
+		    .detail("Recheck", recheck)
+		    .detail("BadTeam", badTeam)
+		    .detail("LastZeroHealthy", lastZeroHealthy)
+		    .detail("ZeroHealthyTeam", self->zeroHealthyTeams->get());
 		lastReady = self->initialFailureReactionDelay.isReady();
 		lastZeroHealthy = self->zeroHealthyTeams->get();
@@ -2787,6 +2904,11 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea
 				TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId)
 				    .detail("SignallingTeam", team->getDesc())
 				    .detail("Primary", self->primary);
+				self->traceAllInfo(true);
+				// Create a new team to be safe
+				self->restartRecruiting.trigger();
+				self->doBuildTeams = true;
+				self->restartTeamBuilder.trigger();
 			}
 			if(logTeamEvents) {
@@ -2842,7 +2964,9 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea
 				for(int i=0; igetPriority();
-				if(maxPriority < PRIORITY_TEAM_0_LEFT) {
+				// The shard split/merge and DD rebooting may leave a shard mapped to multiple teams,
+				// so we need to recalculate the shard's priority
+				if (maxPriority < PRIORITY_TEAM_0_LEFT) { // Q: When will maxPriority >= PRIORITY_TEAM_0_LEFT
 					auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
 					for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
 						// t is the team in primary DC or the remote DC
@@ -2919,6 +3043,7 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea
 				if( self->healthyTeamCount == 0 ) {
 					TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId).detail("SignallingTeam", team->getDesc());
 					self->zeroHealthyTeams->set(true);
+					self->restartRecruiting.trigger();
 				}
 			}
 			if (lastOptimal) {
@@ -3004,6 +3129,11 @@ ACTOR Future>> getServerL
 	return results;
 }
 
+// Q: Why do we need this actor?
+// The serverList system keyspace keeps the StorageServerInterface for each serverID. If a storage server process
+// crashes and is restarted on a different machine, will we reuse the StorageServerInterface? A: A storage server's storeType
+// and serverID are decided by the server's filename. By parsing the storage server files' filenames on each disk, the process on
+// each machine creates the TCServer with the correct serverID and StorageServerInterface.
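// ---- Editor's illustration (not part of the patch) ----
// For the Q/A above: a sketch of how a serverID and store type could be
// recovered from a storage server's data filename. The "storage-<UID>.<ext>"
// shape and the extension spellings here are assumptions for illustration;
// the authoritative parsing lives in the worker code that scans data
// directories on process start.
std::pair<UID, KeyValueStoreType> parseStorageFilenameSketch(const std::string& filename) {
	size_t dash = filename.find('-');
	size_t dot = filename.rfind('.');
	UID serverID = UID::fromString(filename.substr(dash + 1, dot - dash - 1));
	std::string ext = filename.substr(dot + 1); // assumed extension spellings below
	KeyValueStoreType storeType = ext == "memory"
	                                  ? KeyValueStoreType::MEMORY
	                                  : ext == "ssd-2" ? KeyValueStoreType::SSD_BTREE_V2
	                                                   : KeyValueStoreType::SSD_BTREE_V1;
	return std::make_pair(serverID, storeType);
}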
ACTOR Future waitServerListChange( DDTeamCollection* self, FutureStream serverRemoved ) { state Future checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY); state Future>> serverListAndProcessClasses = Never(); @@ -3118,9 +3248,24 @@ ACTOR Future serverMetricsPolling( TCServerInfo *server) { //Returns the KeyValueStoreType of server if it is different from self->storeType ACTOR Future keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) { - state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID(TaskPriority::DataDistribution))); - if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end()) ) + try { + // Update server's storeType, especially when it was created + state KeyValueStoreType type = wait( + brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID( + TaskPriority::DataDistribution))); + server->storeType = type; + if (server->storeType == self->configuration.storageServerStoreType) { + server->toRemove = 0; // In case sys config is changed back to the server's storeType + } + if (server->storeType == self->configuration.storageServerStoreType && + (self->includedDCs.empty() || + std::find(self->includedDCs.begin(), self->includedDCs.end(), + server->lastKnownInterface.locality.dcId()) != self->includedDCs.end())) { + wait(Future(Never())); + } + } catch (Error& e) { wait(Future(Never())); + } return type; } @@ -3135,6 +3280,10 @@ ACTOR Future waitForAllDataRemoved( Database cx, UID serverID, Version add //we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server. 
 if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
 	bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
+	TraceEvent("WaitForAllDataRemoved")
+	    .detail("Server", serverID)
+	    .detail("CanRemove", canRemove)
+	    .detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID));
 	if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) {
 		return Void();
 	}
@@ -3184,8 +3333,18 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI
 	}
 	self->server_status.set( interf.id(), *status );
-	if( status->isFailed )
+	TraceEvent("MXTEST")
+	    .detail("DDID", self->distributorId)
+	    .detail("Server", interf.id())
+	    .detail("Unhealthy", status->isUnhealthy())
+	    .detail("Status", status->toString());
+	if (status->isFailed) {
 		self->restartRecruiting.trigger();
+		TraceEvent("MXTESTTriggerRestartRecruiting")
+		    .detail("DDID", self->distributorId)
+		    .detail("Server", interf.id());
+		wait(delay(0.1));
+	}
 	Future healthChanged = Never();
 	if(status->isFailed) {
@@ -3214,6 +3373,17 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI
 				self->healthyZone.set(Optional());
 			}
 		}
+		// if (status->isFailed) {
+		// 	self->restartRecruiting.trigger();
+		// }
+		// self->server_status.set( interf.id(), *status ); // Update the global server status, so that
+		// storageRecruiter can use the updated info for recruiting
+
+		TraceEvent("StatusMapChange", self->distributorId)
+		    .detail("ServerID", interf.id())
+		    .detail("Status", status->toString())
+		    .detail("Available",
+		            IFailureMonitor::failureMonitor().getState(interf.waitFailure.getEndpoint()).isAvailable());
 	}
 	when ( wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, interf.id(), addedVersion, self) : Never() ) ) { break; }
 	when ( wait( self->healthyZone.onChange() ) ) {}
@@ -3226,12 +3396,9 @@ ACTOR Future storageServerFailureTracker(DDTeamCollection* self, TCServerI
 // Check the status of a storage server.
 // Apply all requirements to the server and mark it as excluded if it fails to satisfies these requirements
 ACTOR Future storageServerTracker(
-	DDTeamCollection* self,
-	Database cx,
-	TCServerInfo *server, //This actor is owned by this TCServerInfo
-	Promise errorOut,
-	Version addedVersion)
-{
+	DDTeamCollection* self, Database cx,
+	TCServerInfo* server, // This actor is owned by this TCServerInfo; points to server_info[id]
+	Promise<Void> errorOut, Version addedVersion) {
 	state Future failureTracker;
 	state ServerStatus status( false, false, server->lastKnownInterface.locality );
 	state bool lastIsUnhealthy = false;
@@ -3239,7 +3406,8 @@ ACTOR Future storageServerTracker(
 	state Future> interfaceChanged = server->onInterfaceChanged;
 	state Future storeTracker = keyValueStoreTypeTracker( self, server );
-	state bool hasWrongStoreTypeOrDC = false;
+	state bool hasWrongDC = !inCorrectDC(self, server);
+	state bool toRemoveWrongStoreType = false;
 	state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
 	try {
@@ -3247,7 +3415,9 @@ ACTOR Future storageServerTracker(
 		status.isUndesired = false;
 		status.isWrongConfiguration = false;
-		// If there is any other server on this exact NetworkAddress, this server is undesired and will eventually be eliminated
+		// If there is any other server on this exact NetworkAddress, this server is undesired and will eventually
+		// be eliminated. This sameAddress checking must be redone whenever the server's state (e.g., storeType,
This samAddress checking must be redo whenever the server's state (e.g., storeType, + // dcLocation, interface) is changed. state std::vector> otherChanges; std::vector> wakeUpTrackers; for(const auto& i : self->server_info) { @@ -3263,7 +3433,12 @@ ACTOR Future storageServerTracker( .detail("OtherHealthy", !self->server_status.get( i.second->id ).isUnhealthy()); // wait for the server's ip to be changed otherChanges.push_back(self->server_status.onChange(i.second->id)); - if(!self->server_status.get( i.second->id ).isUnhealthy()) { + // ASSERT(i.first == i.second->id); //MX: TO enable the assert + // When a wrongStoreType server colocate with a correct StoreType server, we should not mark the + // correct one as unhealthy + // TODO: We should not need i.second->toRemove == 0 because this loop should be triggered after + // failureTracker returns + if (!self->server_status.get(i.second->id).isUnhealthy()) { //&& i.second->toRemove == 0 if(self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id) >= self->shardsAffectedByTeamFailure->getNumberOfShards(server->id)) { TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId) @@ -3297,11 +3472,21 @@ ACTOR Future storageServerTracker( status.isUndesired = true; } otherChanges.push_back( self->zeroOptimalTeams.onChange() ); + otherChanges.push_back(self->zeroHealthyTeams->onChange()); } //If this storage server has the wrong key-value store type, then mark it undesired so it will be replaced with a server having the correct type - if(hasWrongStoreTypeOrDC) { - TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId).detail("Server", server->id).detail("StoreType", "?"); + if (hasWrongDC) { + TraceEvent(SevWarn, "UndesiredDC", self->distributorId) + .detail("Server", server->id) + .detail("WrongDC", "?"); + status.isUndesired = true; + status.isWrongConfiguration = true; + } + if (toRemoveWrongStoreType) { // TODO: merge with the above if (hasWrongDC) + TraceEvent(SevWarn, "WrongStoreTypeToRemove", self->distributorId) + .detail("Server", server->id) + .detail("StoreType", "?"); status.isUndesired = true; status.isWrongConfiguration = true; } @@ -3321,8 +3506,7 @@ ACTOR Future storageServerTracker( failureTracker = storageServerFailureTracker(self, server, cx, &status, addedVersion); //We need to recruit new storage servers if the key value store type has changed - if(hasWrongStoreTypeOrDC) - self->restartRecruiting.trigger(); + if (hasWrongDC || toRemoveWrongStoreType) self->restartRecruiting.trigger(); if (lastIsUnhealthy && !status.isUnhealthy() && ( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) { @@ -3447,14 +3631,19 @@ ACTOR Future storageServerTracker( } interfaceChanged = server->onInterfaceChanged; - // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location + // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to + // an invalid location ? + // What does this mean? Why does the old failureTracker has a pointer to an invalid location? + // MXQ: Will the status's isFailed and isUndesired field be reset at the beginning of loop?!! 
 			status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
 			// self->traceTeamCollectionInfo();
 			recordTeamCollectionInfo = true;
 			//Restart the storeTracker for the new interface
-			storeTracker = keyValueStoreTypeTracker(self, server);
-			hasWrongStoreTypeOrDC = false;
+			storeTracker = keyValueStoreTypeTracker(
+			    self, server); // The removal of a server with the wrong store type is delayed.
+			hasWrongDC = false;
+			toRemoveWrongStoreType = false;
 			self->restartTeamBuilder.trigger();
 			if(restartRecruiting)
@@ -3471,7 +3660,14 @@ ACTOR Future storageServerTracker(
 			TEST(true); //KeyValueStore type changed
 			storeTracker = Never();
-			hasWrongStoreTypeOrDC = true;
+			hasWrongDC = !inCorrectDC(self, server);
+		}
+		when(wait(server->wrongStoreTypeRemoved.onTrigger())) {
+			TraceEvent(SevWarn, "UndesiredStorageServerTriggered", self->distributorId)
+			    .detail("Server", server->id)
+			    .detail("StoreType", server->storeType)
+			    .detail("ConfigStoreType", self->configuration.storageServerStoreType);
+			toRemoveWrongStoreType = true;
 		}
 		when( wait( server->wakeUpTracker.getFuture() ) ) {
 			server->wakeUpTracker = Promise();
@@ -3530,8 +3726,14 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl
 	isr.reqId = deterministicRandom()->randomUniqueID();
 	isr.interfaceId = interfaceId;
-	TraceEvent("DDRecruiting").detail("State", "Sending request to worker").detail("WorkerID", candidateWorker.worker.id())
-	.detail("WorkerLocality", candidateWorker.worker.locality.toString()).detail("Interf", interfaceId).detail("Addr", candidateWorker.worker.address());
+	TraceEvent("DDRecruiting")
+	    .detail("Primary", self->primary)
+	    .detail("State", "Sending request to worker")
+	    .detail("WorkerID", candidateWorker.worker.id())
+	    .detail("WorkerLocality", candidateWorker.worker.locality.toString())
+	    .detail("Interf", interfaceId)
+	    .detail("Addr", candidateWorker.worker.address())
+	    .detail("RecruitingStream", self->recruitingStream.get());
 	self->recruitingIds.insert(interfaceId);
 	self->recruitingLocalities.insert(candidateWorker.worker.address());
@@ -3547,8 +3749,14 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl
 	self->recruitingStream.set(self->recruitingStream.get()-1);
-	TraceEvent("DDRecruiting").detail("State", "Finished request").detail("WorkerID", candidateWorker.worker.id())
-	.detail("WorkerLocality", candidateWorker.worker.locality.toString()).detail("Interf", interfaceId).detail("Addr", candidateWorker.worker.address());
+	TraceEvent("DDRecruiting")
+	    .detail("Primary", self->primary)
+	    .detail("State", "Finished request")
+	    .detail("WorkerID", candidateWorker.worker.id())
+	    .detail("WorkerLocality", candidateWorker.worker.locality.toString())
+	    .detail("Interf", interfaceId)
+	    .detail("Addr", candidateWorker.worker.address())
+	    .detail("RecruitingStream", self->recruitingStream.get());
 	if( newServer.present() ) {
 		if( !self->server_info.count( newServer.get().interf.id() ) )
@@ -3568,16 +3776,25 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl
 ACTOR Future storageRecruiter( DDTeamCollection* self, Reference> db ) {
 	state Future fCandidateWorker;
 	state RecruitStorageRequest lastRequest;
+	state bool hasHealthyTeam;
+	state int numRecruitSSPending = 0;
+	state std::map<AddressExclusion, int> numSSPerAddr;
 	loop {
 		try {
+			numSSPerAddr.clear();
+			hasHealthyTeam = (self->healthyTeamCount != 0);
 			RecruitStorageRequest rsr;
 			std::set exclusions;
 			for(auto s = self->server_info.begin(); s != self->server_info.end(); ++s) {
 				auto serverStatus =
self->server_status.get( s->second->lastKnownInterface.id() ); if( serverStatus.excludeOnRecruit() ) { - TraceEvent(SevDebug, "DDRecruitExcl1").detail("Excluding", s->second->lastKnownInterface.address()); + TraceEvent(SevDebug, "DDRecruitExcl1") + .detail("Primary", self->primary) + .detail("Excluding", s->second->lastKnownInterface.address()); auto addr = s->second->lastKnownInterface.address(); - exclusions.insert( AddressExclusion( addr.ip, addr.port ) ); + AddressExclusion addrExcl(addr.ip, addr.port); + exclusions.insert(addrExcl); + numSSPerAddr[addrExcl]++; // increase from 0 } } for(auto addr : self->recruitingLocalities) { @@ -3587,7 +3804,9 @@ ACTOR Future storageRecruiter( DDTeamCollection* self, ReferenceexcludedServers.getKeys(); for(auto& s : excl) if (self->excludedServers.get(s)) { - TraceEvent(SevDebug, "DDRecruitExcl2").detail("Excluding", s.toString()); + TraceEvent(SevDebug, "DDRecruitExcl2") + .detail("Primary", self->primary) + .detail("Excluding", s.toString()); exclusions.insert( s ); } rsr.criticalRecruitment = self->healthyTeamCount == 0; @@ -3597,30 +3816,65 @@ ACTOR Future storageRecruiter( DDTeamCollection* self, ReferenceincludedDCs; - TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting").detail("State", "Sending request to CC") - .detail("Exclusions", rsr.excludeAddresses.size()).detail("Critical", rsr.criticalRecruitment); + TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Sending request to CC") + .detail("Exclusions", rsr.excludeAddresses.size()) + .detail("Critical", rsr.criticalRecruitment) + .detail("IncludedDCsSize", rsr.includeDCs.size()); if( rsr.criticalRecruitment ) { - TraceEvent(SevWarn, "DDRecruitingEmergency", self->distributorId); + TraceEvent(SevWarn, "DDRecruitingEmergency", self->distributorId).detail("Primary", self->primary); } if(!fCandidateWorker.isValid() || fCandidateWorker.isReady() || rsr.excludeAddresses != lastRequest.excludeAddresses || rsr.criticalRecruitment != lastRequest.criticalRecruitment) { + TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting") + .detail("Primary", self->primary) + .detail("State", "Sending rsr request to CC"); lastRequest = rsr; fCandidateWorker = brokenPromiseToNever( db->get().clusterInterface.recruitStorage.getReply( rsr, TaskPriority::DataDistribution ) ); } + TraceEvent("StorageRecruiterMX", self->distributorId) + .detail("Primary", self->primary) + .detail("HasHealthyTeam", hasHealthyTeam) + .detail("SysStoreType", self->configuration.storageServerStoreType); + self->traceAllInfo(true); + choose { when( RecruitStorageReply candidateWorker = wait( fCandidateWorker ) ) { + AddressExclusion candidateSSAddr(candidateWorker.worker.address().ip, + candidateWorker.worker.address().port); + int numExistingSS = numSSPerAddr[candidateSSAddr]; + if (numExistingSS >= 2) { + TraceEvent(SevWarnAlways, "StorageRecruiterTooManySSOnSameAddrMX", self->distributorId) + .detail("Primary", self->primary) + .detail("Addr", candidateSSAddr.toString()) + .detail("NumExistingSS", numExistingSS); + } else { + TraceEvent("DDRecruiting", self->distributorId) + .detail("Primary", self->primary) + .detail("State", "Got worker for SS") + .detail("Addr", candidateSSAddr.toString()) + .detail("NumExistingSS", numExistingSS); + } self->addActor.send(initializeStorage(self, candidateWorker)); } when( wait( db->onChange() ) ) { // SOMEDAY: only if clusterInterface changes? 
fCandidateWorker = Future(); } - when( wait( self->restartRecruiting.onTrigger() ) ) {} + when(wait(self->restartRecruiting.onTrigger())) { + TraceEvent("DDRecruiting", self->distributorId) + .detail("Primary", self->primary) + .detail("State", "Restart recruiting"); + } } wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) ); } catch( Error &e ) { if(e.code() != error_code_timed_out) { + TraceEvent("StorageRecruiterMXExit", self->distributorId) + .detail("Primary", self->primary) + .detail("Error", e.what()); throw; } TEST(true); //Storage recruitment timed out @@ -3637,14 +3891,20 @@ ACTOR Future updateReplicasKey(DDTeamCollection* self, Optional dcId) wait(self->initialFailureReactionDelay && waitForAll(serverUpdates)); wait(waitUntilHealthy(self)); - TraceEvent("DDUpdatingReplicas", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize); + TraceEvent("DDUpdatingReplicas", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize); state Transaction tr(self->cx); loop { try { Optional val = wait( tr.get(datacenterReplicasKeyFor(dcId)) ); state int oldReplicas = val.present() ? decodeDatacenterReplicasValue(val.get()) : 0; if(oldReplicas == self->configuration.storageTeamSize) { - TraceEvent("DDUpdatedAlready", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize); + TraceEvent("DDUpdatedAlready", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize); return Void(); } if(oldReplicas < self->configuration.storageTeamSize) { @@ -3652,7 +3912,11 @@ ACTOR Future updateReplicasKey(DDTeamCollection* self, Optional dcId) } tr.set(datacenterReplicasKeyFor(dcId), datacenterReplicasValue(self->configuration.storageTeamSize)); wait( tr.commit() ); - TraceEvent("DDUpdatedReplicas", self->distributorId).detail("DcId", dcId).detail("Replicas", self->configuration.storageTeamSize).detail("OldReplicas", oldReplicas); + TraceEvent("DDUpdatedReplicas", self->distributorId) + .detail("Primary", self->primary) + .detail("DcId", dcId) + .detail("Replicas", self->configuration.storageTeamSize) + .detail("OldReplicas", oldReplicas); return Void(); } catch( Error &e ) { wait( tr.onError(e) ); @@ -3677,6 +3941,7 @@ ACTOR Future remoteRecovered( Reference> db } ACTOR Future monitorHealthyTeams( DDTeamCollection* self ) { + TraceEvent("DDMonitorHealthyTeamsStart").detail("ZeroHealthyTeams", self->zeroHealthyTeams->get()); loop choose { when ( wait(self->zeroHealthyTeams->get() ? 
delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY) : Never()) ) {
 		self->doBuildTeams = true;
@@ -3722,9 +3987,18 @@ ACTOR Future dataDistributionTeamCollection(
 			self->redundantServerTeamRemover = serverTeamRemover(self);
 			self->addActor.send(self->redundantServerTeamRemover);
 		}
+
+		if (self->wrongStoreTypeRemover.isReady()) {
+			self->wrongStoreTypeRemover = removeWrongStoreType(self);
+			self->addActor.send(self->wrongStoreTypeRemover);
+		}
+
 		self->traceTeamCollectionInfo();
 		if(self->includedDCs.size()) {
+			for (int i = 0; i < self->includedDCs.size(); ++i) {
+				TraceEvent("DDTeamCollectionMXTEST").detail("IncludedDC", i).detail("DC", self->includedDCs[i]);
+			}
 			//start this actor before any potential recruitments can happen
 			self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
 		}
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index 7ea6597c24..720b506658 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -225,6 +225,7 @@ public:
 			(*it)->setHealthy(h);
 		}
 	}
+
 	virtual int getPriority() {
 		int priority = 0;
 		for (auto it = teams.begin(); it != teams.end(); it++) {
@@ -247,6 +248,7 @@ public:
 	}
 };
+// MXQ: Why do we need to count the utilization for each priority? Can a relocationShard have multiple priorities?
 struct Busyness {
 	vector ledger;
@@ -295,6 +297,7 @@ int getWorkFactor( RelocateData const& relocation ) {
 // Data movement's resource control: Do not overload source servers used for the RelocateData
 // return true if servers are not too busy to launch the relocation
+// This ensures source servers will not be overloaded.
 bool canLaunch( RelocateData & relocation, int teamSize, std::map & busymap, std::vector cancellableRelocations ) {
 	// assert this has not already been launched
@@ -356,7 +359,7 @@ struct DDQueueData {
 	int64_t bytesWritten;
 	int teamSize;
-	std::map busymap;
+	std::map<UID, Busyness> busymap; // UID is serverID
 	KeyRangeMap< RelocateData > queueMap;
 	std::set> fetchingSourcesQueue;
@@ -365,7 +368,8 @@ struct DDQueueData {
 	std::map>> queue; //Key UID is serverID, value is the serverID's set of RelocateData to relocate
 	KeyRangeMap< RelocateData > inFlight;
-	KeyRangeActorMap inFlightActors; //Key: RelocatData, Value: Actor to move the data
+	// Track all actors that relocate specified keys to a good place; Key: keyRange; Value: actor
+	KeyRangeActorMap inFlightActors;
 	Promise error;
 	PromiseStream dataTransferComplete;
@@ -760,6 +764,9 @@ struct DDQueueData {
 		launchQueuedWork( combined );
 	}
+	// For each relocateData rd in the queue, check if there exists in-flight relocate data whose keyrange overlaps
+	// with rd. If so, cancel them by cancelling their actors and reducing the src servers' busyness contributed by
+	// those canceled in-flight relocateData; then launch the relocation for rd.
 	void launchQueuedWork( std::set> combined ) {
 		int startedHere = 0;
 		double startTime = now();
@@ -768,6 +775,7 @@ struct DDQueueData {
 		for(; it != combined.end(); it++ ) {
 			RelocateData rd( *it );
+			// Check if there is an in-flight shard that overlaps with the queued relocateShard (rd)
 			bool overlappingInFlight = false;
 			auto intersectingInFlight = inFlight.intersectingRanges( rd.keys );
 			for(auto it = intersectingInFlight.begin(); it != intersectingInFlight.end(); ++it) {
@@ -788,6 +796,7 @@ struct DDQueueData {
 				continue;
 			}
+			// MXQ: What does the if mean in the following comment?
 			// Because the busyness of a server is decreased when a superseding relocation is issued, we
 			//  need to consider what the busyness of a server WOULD be if
 			auto containedRanges = inFlight.containedRanges( rd.keys );
 			for(auto it = containedRanges.begin(); it != containedRanges.end(); ++it) {
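// ---- Editor's note (not part of the patch) ----
// Addressing the MXQ questions above as I read the existing code: a relocation
// has a single priority, but its work is charged to every busyness band at or
// below priority/100. canLaunch() then only inspects the band of the new
// relocation's priority, so a source server saturated with low-priority moves
// still has headroom for high-priority ones. A minimal standalone restatement;
// the cutoff value is illustrative, standing in for the real knob:
static const int WORK_FULL_UTILIZATION_SKETCH = 10000;
struct BusynessSketch {
	std::vector<int> ledger = std::vector<int>(10, 0);
	bool canLaunch(int priority, int work) const {
		// only work of equal or higher priority counts against this launch
		return ledger[priority / 100] <= WORK_FULL_UTILIZATION_SKETCH - work;
	}
	void addWork(int priority, int work) {
		for (int i = 0; i <= priority / 100; i++) ledger[i] += work; // lower bands see everyone's work
	}
};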
@@ -798,8 +807,10 @@ struct DDQueueData {
 				}
 			}
+			// MXQ: I don't understand the SOMEDAY and FIXME statement
 			// Data movement avoids overloading source servers in moving data.
-			// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue
+			// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the
+			// queue
 			// FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
 			if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) {
 				//logRelocation( rd, "SkippingQueuedRelocation" );
@@ -842,6 +853,7 @@ struct DDQueueData {
 			launch( rrs, busymap );
 			activeRelocations++;
 			startRelocation(rrs.priority);
+			// Start the actor that relocates data in the rrs.keys
 			inFlightActors.insert( rrs.keys, dataDistributionRelocator( this, rrs ) );
 		}
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index b71ed41227..3421e0b70b 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -196,6 +196,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( TR_REMOVE_SERVER_TEAM_DELAY, 60.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_DELAY = deterministicRandom()->random01() * 60.0;
 	init( TR_REMOVE_SERVER_TEAM_EXTRA_DELAY, 5.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_EXTRA_DELAY = deterministicRandom()->random01() * 10.0;
+	init( STR_NUM_SERVERS_REMOVED_ONCE, 1 ); if( randomize && BUGGIFY ) STR_NUM_SERVERS_REMOVED_ONCE = deterministicRandom()->random01() * 100.0;
+	init( STR_REMOVE_STORE_ENGINE_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) STR_REMOVE_STORE_ENGINE_TIMEOUT = deterministicRandom()->random01() * 60.0;
+	init( STR_REMOVE_STORE_ENGINE_DELAY, 60.0); if( randomize && BUGGIFY ) STR_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0;
+
 	// Redwood Storage Engine
 	init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );
 	init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN, 0 );
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index 39d9abc85e..8865edaccd 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -155,6 +155,12 @@ public:
 	double TR_REMOVE_SERVER_TEAM_DELAY; // wait for the specified time before try to remove next server team
 	double TR_REMOVE_SERVER_TEAM_EXTRA_DELAY; // serverTeamRemover waits for the delay and check DD healthyness again to ensure it runs after machineTeamRemover
+	// WrongStoreTypeRemover to remove wrong storage engines
+	int STR_NUM_SERVERS_REMOVED_ONCE; // The number of servers with wrong storage engines to remove
+	double STR_REMOVE_STORE_ENGINE_TIMEOUT; // wait for at most this long before removing the next batch of wrong storage
+	                                        // engines
+	double STR_REMOVE_STORE_ENGINE_DELAY; // wait for the specified time before removing the next batch
+
 	double DD_FAILURE_TIME;
 	double DD_ZERO_HEALTHY_TEAM_DELAY;
diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 09d89ef827..286204adda 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -258,6 +258,11 @@ ACTOR Future>> additionalSources(Standalone s
 	return result;
 }
+// keyServers: map from keys to src and dest servers
+// serverKeys: two-dimensional map: [servers][keys], value is the servers' state of having the keys: active(not-have),
+// complete(already has), ""(). MXQ: What does serverKeys[dest][keys] mean? It seems to have the same meaning as
+// serverKeys[servers][keys]?
+
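// ---- Editor's illustration (not part of the patch) ----
// The two keyspaces described above, shown for a move of shard [a,b) from
// servers {S1,S2} to {S3,S4}. Values are symbolic, not the real encodings:
//   \xff/keyServers/a     -> src = {S1,S2}, dest = {S3,S4}   (set by startMoveKeys)
//   \xff/serverKeys/S3/a  -> "active"    (S3 must still fetch [a,b))
//   \xff/serverKeys/S1/a  -> "complete"  (S1 already holds the data)
// On the MXQ above, my reading (not authoritative): serverKeys has one row per
// serverID, and "serverKeys[dest][keys]" simply denotes the rows belonging to
// the destination servers -- it is the same map as serverKeys[servers][keys],
// restricted to the dest servers.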
 // Set keyServers[keys].dest = servers
 // Set serverKeys[servers][keys] = active for each subrange of keys that the server did not already have, complete for each subrange that it already has
 // Set serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that destination is a member of servers OR if the source list is sufficiently degraded)
@@ -276,8 +281,9 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve
 	state int shards = 0;
 	state int maxRetries = 0;
-	//This process can be split up into multiple transactions if there are too many existing overlapping shards
-	//In that case, each iteration of this loop will have begin set to the end of the last processed shard
+	// If it's split into multiple transactions, how do we achieve atomicity?
+	// This process can be split up into multiple transactions if there are too many existing overlapping shards
+	// In that case, each iteration of this loop will have begin set to the end of the last processed shard
 	while(begin < keys.end) {
 		TEST(begin > keys.begin); //Multi-transactional startMoveKeys
 		batches++;
@@ -307,8 +313,9 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve
 			for(int s=0; s startMoveKeys( Database occ, KeyRange keys, vector serve
 			state Key endKey = old.end()[-1].key;
 			currentKeys = KeyRangeRef(currentKeys.begin, endKey);
-			/*TraceEvent("StartMoveKeysBatch", relocationIntervalId)
-			    .detail("KeyBegin", currentKeys.begin.c_str())
-			    .detail("KeyEnd", currentKeys.end.c_str());*/
+			TraceEvent("StartMoveKeysBatch", relocationIntervalId)
+			    .detail("KeyBegin", currentKeys.begin.toString())
+			    .detail("KeyEnd", currentKeys.end.toString());
-			//printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(), keys.end.toString().c_str(), old.size(), servers.size());
-			//for(int i=0; i
 			state vector> addAsSource = wait(additionalSources(old, &tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER*servers.size()));
@@ -340,12 +347,12 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve
 				vector dest;
 				decodeKeyServersValue( old[i].value, src, dest );
-				/*TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
-				    .detail("KeyBegin", rangeIntersectKeys.begin.c_str())
-				    .detail("KeyEnd", rangeIntersectKeys.end.c_str())
-				    .detail("OldSrc", describe(src))
-				    .detail("OldDest", describe(dest))
-				    .detail("ReadVersion", tr.getReadVersion().get());*/
+				TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
+				    .detail("KeyBegin", rangeIntersectKeys.begin.toString())
+				    .detail("KeyEnd", rangeIntersectKeys.end.toString())
+				    .detail("OldSrc", describe(src))
+				    .detail("OldDest", describe(dest))
+				    .detail("ReadVersion", tr.getReadVersion().get());
 				for(auto& uid : addAsSource[i]) {
 					src.push_back(uid);
@@ -358,15 +365,13 @@ ACTOR Future startMoveKeys( Database occ, KeyRange keys, vector serve
 				//Track old destination servers.
They may be removed from serverKeys soon, since they are about to be overwritten in keyServers
 				for(auto s = dest.begin(); s != dest.end(); ++s) {
 					oldDests.insert(*s);
-					/*TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId)
-					    .detail("Server", s->id());*/
+					TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
 				}
 				//Keep track of src shards so that we can preserve their values when we overwrite serverKeys
 				for(auto& uid : src) {
 					shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
-					/*TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId)
-					    .detail("Server", *s);*/
+					TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
 				}
 			}
@@ -819,9 +824,9 @@ ACTOR Future> addStorageServer( Database cx, StorageServ
 		}
 	}
 }
-
+// A SS can be removed only if all data (shards) on the SS have been moved away from the SS.
 ACTOR Future canRemoveStorageServer( Transaction* tr, UID serverID ) {
-	Standalone keys = wait( krmGetRanges( tr, serverKeysPrefixFor(serverID), allKeys, 2 ) );
+	state Standalone<RangeResultRef> keys = wait(krmGetRanges(tr, serverKeysPrefixFor(serverID), allKeys, 2));
 	ASSERT(keys.size() >= 2);
@@ -830,6 +835,16 @@ ACTOR Future canRemoveStorageServer( Transaction* tr, UID serverID ) {
 		ASSERT(false);
 	}
+	// DEBUG purpose
+	if (!(keys[0].value == serverKeysFalse && keys[1].key == allKeys.end)) {
+		Standalone<RangeResultRef> existingKeys =
+		    wait(krmGetRanges(tr, serverKeysPrefixFor(serverID), allKeys, CLIENT_KNOBS->TOO_MANY));
+		TraceEvent("CanNOTRemove").detail("KeysNum", existingKeys.size());
+		for (auto& k : existingKeys) {
+			TraceEvent("CanNOTRemove").detail("Key", k.key).detail("Value", k.value);
+		}
+	}
+
 	//Return true if the entire range is false. Since these values are coalesced, we can return false if there is more than one result
 	return keys[0].value == serverKeysFalse && keys[1].key == allKeys.end;
 }
diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp
index 7a2a5560e9..e5708f4d78 100644
--- a/fdbserver/QuietDatabase.actor.cpp
+++ b/fdbserver/QuietDatabase.actor.cpp
@@ -305,6 +305,9 @@ ACTOR Future getMaxStorageServerQueueSize( Database cx, Reference waitForQuietDatabase( Database cx, Reference dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate ||
 		    tLogQueueInfo.get().second > maxPoppedVersionLag ||
 		    dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
diff --git a/flow/flow.h b/flow/flow.h
index 581255e96f..348e234dcc 100644
--- a/flow/flow.h
+++ b/flow/flow.h
@@ -306,6 +306,7 @@ struct SingleCallback {
 	}
 };
+// SAV is short for Single Assignment Variable: it can be assigned only once!
 template
 struct SAV : private Callback, FastAllocated> {
 	int promises; // one for each promise (and one for an active actor if this is an actor)
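// ---- Editor's illustration (not part of the patch) ----
// What "single assignment" means in practice, seen through the Promise/Future
// API that SAV backs.
void savSingleAssignmentExample() {
	Promise<int> p; // wraps a SAV<int>
	Future<int> f = p.getFuture();
	p.send(42); // the one and only assignment
	ASSERT(f.isReady() && f.get() == 42);
	// A second p.send(43) would be an error: a SAV holds exactly one value
	// (or one error) for its entire lifetime.
}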