fix: seed storage servers are recruited based on the storage policy
commit f3b7aa615d
parent 9404d226d0
@@ -211,15 +211,56 @@ public:
 			throw no_more_servers();
 		}
 
+		std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForSeedServers( DatabaseConfiguration const& conf ) {
+			std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
+			std::vector<std::pair<WorkerInterface, ProcessClass>> results;
+			LocalitySetRef logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
+			LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap = (LocalityMap<std::pair<WorkerInterface, ProcessClass>>*) logServerSet.getPtr();
+			bool bCompleted = false;
+
+			for( auto& it : id_worker ) {
+				auto fitness = it.second.processClass.machineClassFitness( ProcessClass::Storage );
+				if( workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign ) {
+					fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
+				}
+			}
+
+			for( auto& it : fitness_workers ) {
+				for (auto& worker : it.second ) {
+					logServerMap->add(worker.first.locality, &worker);
+				}
+
+				std::vector<LocalityEntry> bestSet;
+				if( logServerSet->selectReplicas(conf.storagePolicy, bestSet) ) {
+					results.reserve(bestSet.size());
+					for (auto& entry : bestSet) {
+						auto object = logServerMap->getObject(entry);
+						results.push_back(*object);
+					}
+					bCompleted = true;
+					break;
+				}
+			}
+
+			logServerSet->clear();
+			logServerSet.clear();
+
+			if (!bCompleted) {
+				throw no_more_servers();
+			}
+
+			return results;
+		}
+
 		std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDatacenters( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
 		{
 			std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
 			std::vector<std::pair<WorkerInterface, ProcessClass>> results;
 			std::vector<LocalityData> unavailableLocals;
 			LocalitySetRef logServerSet;
 			LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap;
 			UID functionId = g_nondeterministic_random->randomUniqueID();
 			bool bCompleted = false;
 
 			logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
 			logServerMap = (LocalityMap<std::pair<WorkerInterface, ProcessClass>>*) logServerSet.getPtr();
 
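Note: the new getWorkersForSeedServers selects seed storage servers the way tlog recruitment does: candidates are bucketed by ProcessClass fitness, then each successively worse bucket is merged into the candidate set until conf.storagePolicy can be satisfied. Below is a minimal, self-contained sketch of that tier-by-fitness pattern; Worker, selectDistinctZones, and seedWorkersSketch are hypothetical stand-ins (the "policy" here is simply "replicas in distinct zones"), not the real LocalityMap/selectReplicas machinery.

// Hypothetical sketch of the tier-by-fitness selection pattern (not FoundationDB code).
#include <iostream>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

struct Worker { std::string id; std::string zone; };

// Stand-in for conf.storagePolicy + LocalitySet::selectReplicas: choose `replicas`
// workers from distinct zones, or return an empty vector if the candidates cannot
// satisfy the policy.
std::vector<Worker> selectDistinctZones(const std::vector<Worker>& candidates, size_t replicas) {
	std::vector<Worker> picked;
	std::set<std::string> usedZones;
	for (const auto& w : candidates) {
		if (usedZones.insert(w.zone).second) picked.push_back(w);
		if (picked.size() == replicas) return picked;
	}
	return {};
}

std::vector<Worker> seedWorkersSketch(const std::map<int, std::vector<Worker>>& byFitness,
                                      size_t replicas) {
	std::vector<Worker> candidates;
	for (const auto& tier : byFitness) {   // std::map iterates lowest (best) fitness first
		// Accumulate this tier's workers on top of all better tiers, as the real
		// loop does by adding each fitness bucket into logServerMap without clearing it.
		candidates.insert(candidates.end(), tier.second.begin(), tier.second.end());
		auto picked = selectDistinctZones(candidates, replicas);
		if (!picked.empty()) return picked;            // mirrors bCompleted = true; break;
	}
	throw std::runtime_error("no_more_servers");       // mirrors throw no_more_servers();
}

int main() {
	std::map<int, std::vector<Worker>> byFitness = {
		{0, {{"w1", "zoneA"}, {"w2", "zoneA"}}},       // best tier alone spans only one zone
		{1, {{"w3", "zoneB"}, {"w4", "zoneC"}}},       // next tier completes the team
	};
	for (const auto& w : seedWorkersSketch(byFitness, 3))
		std::cout << w.id << " in " << w.zone << "\n"; // w1, w3, w4
}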
@@ -570,6 +611,12 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		RecruitFromConfigurationReply result;
 		std::map< Optional<Standalone<StringRef>>, int> id_used;
 
+		if(req.recruitSeedServers) {
+			auto storageServers = getWorkersForSeedServers(req.configuration);
+			for(int i = 0; i < storageServers.size(); i++)
+				result.storageServers.push_back(storageServers[i].first);
+		}
+
 		id_used[masterProcessId]++;
 		auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
 		for(int i = 0; i < tlogs.size(); i++)
@@ -64,15 +64,16 @@ struct ClusterControllerFullInterface {
 
 struct RecruitFromConfigurationRequest {
 	DatabaseConfiguration configuration;
+	bool recruitSeedServers;
 	ReplyPromise< struct RecruitFromConfigurationReply > reply;
 
 	RecruitFromConfigurationRequest() {}
-	explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration)
-		: configuration(configuration) {}
+	explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration, bool recruitSeedServers)
+		: configuration(configuration), recruitSeedServers(recruitSeedServers) {}
 
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		ar & configuration & reply;
+		ar & configuration & recruitSeedServers & reply;
 	}
 };
 
@@ -80,10 +81,11 @@ struct RecruitFromConfigurationReply {
 	vector<WorkerInterface> tLogs;
 	vector<WorkerInterface> proxies;
 	vector<WorkerInterface> resolvers;
+	vector<WorkerInterface> storageServers;
 
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		ar & tLogs & proxies & resolvers;
+		ar & tLogs & proxies & resolvers & storageServers;
 	}
 };
 
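Note: both serialize() methods stream fields through the chained & operator, so the wire format is exactly the order in which the fields are listed; adding recruitSeedServers and storageServers means both endpoints must be built with the matching field lists. A toy illustration of the chaining idiom follows, using a hypothetical BinaryWriter rather than FoundationDB's actual serializer.

// Hypothetical illustration of `ar & a & b` chaining (not the FDB serializer).
#include <iostream>
#include <vector>

struct BinaryWriter {
	std::vector<unsigned char> bytes;
	template <class T>
	BinaryWriter& operator&(const T& v) {    // POD-only for this sketch
		const unsigned char* p = reinterpret_cast<const unsigned char*>(&v);
		bytes.insert(bytes.end(), p, p + sizeof(T));
		return *this;                        // returning *this enables left-to-right chaining
	}
};

struct Request {
	int configuration = 7;                   // stand-ins for the real fields
	bool recruitSeedServers = true;
	template <class Ar> void serialize(Ar& ar) { ar & configuration & recruitSeedServers; }
};

int main() {
	BinaryWriter w;
	Request r;
	r.serialize(w);                          // writes configuration, then recruitSeedServers
	std::cout << w.bytes.size() << " bytes written\n";
}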
@@ -250,53 +250,34 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, Future< RecruitFr
 	return Void();
 }
 
-ACTOR Future<Void> newSeedServers( Reference<MasterData> self, vector<StorageServerInterface>* servers ) {
+ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfigurationReply recruits, vector<StorageServerInterface>* servers ) {
 	// This is only necessary if the database is at version 0
 	servers->clear();
 	if (self->lastEpochEnd) return Void();
 
 	state Tag tag = 0;
-	state std::set<Optional<Standalone<StringRef>>> dataCenters;
-	while( servers->size() < self->configuration.storageTeamSize ) {
-		try {
-			RecruitStorageRequest req;
-			req.criticalRecruitment = true;
-			for(auto s = servers->begin(); s != servers->end(); ++s)
-				req.excludeMachines.push_back(s->locality.zoneId());
-
-			TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
-				.detail("ExcludingMachines", req.excludeMachines.size())
-				.detail("ExcludingDataCenters", req.excludeDCs.size());
-
-			RecruitStorageReply candidateWorker = wait( brokenPromiseToNever( self->clusterController.recruitStorage.getReply( req ) ) );
-
-			TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
-				.detail("CandidateWorker", candidateWorker.worker.locality.toString());
-
-			InitializeStorageRequest isr;
-			isr.seedTag = tag;
-			isr.storeType = self->configuration.storageServerStoreType;
-			isr.reqId = g_random->randomUniqueID();
-			isr.interfaceId = g_random->randomUniqueID();
-
-			ErrorOr<StorageServerInterface> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr ) );
-
-			if( newServer.isError() ) {
-				if( !newServer.isError( error_code_recruitment_failed ) && !newServer.isError( error_code_request_maybe_delivered ) )
-					throw newServer.getError();
-
-				TEST( true ); // masterserver initial storage recuitment loop failed to get new server
-				Void _ = wait( delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY) );
-			}
-			else {
-				servers->push_back( newServer.get() );
-				dataCenters.insert( newServer.get().locality.dcId() );
-				tag++;
-			}
-		} catch ( Error &e ) {
-			if(e.code() != error_code_timed_out) {
-				throw;
-			}
-		}
-	}
+	while( tag < recruits.storageServers.size() ) {
+		TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
+			.detail("CandidateWorker", recruits.storageServers[tag].locality.toString());
+
+		InitializeStorageRequest isr;
+		isr.seedTag = tag;
+		isr.storeType = self->configuration.storageServerStoreType;
+		isr.reqId = g_random->randomUniqueID();
+		isr.interfaceId = g_random->randomUniqueID();
+
+		ErrorOr<StorageServerInterface> newServer = wait( recruits.storageServers[tag].storage.tryGetReply( isr ) );
+
+		if( newServer.isError() ) {
+			if( !newServer.isError( error_code_recruitment_failed ) && !newServer.isError( error_code_request_maybe_delivered ) )
+				throw newServer.getError();
+
+			TEST( true ); // masterserver initial storage recuitment loop failed to get new server
+			Void _ = wait( delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY) );
+		}
+		else {
+			servers->push_back( newServer.get() );
+			tag++;
+		}
+	}
 
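Note: with this change newSeedServers no longer recruits workers itself; the cluster controller has already chosen recruits.storageServers to satisfy the storage policy, and the master only walks that list by seed tag, retrying the same candidate after a recoverable failure and advancing the tag only on success. A rough sketch of that control flow in plain C++ follows, with synchronous, hypothetical stand-ins for the actor/RPC machinery.

// Hypothetical sketch of the rewritten newSeedServers control flow (no flow/actor framework).
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Candidate { std::string locality; };
struct StorageServerInterface { std::string id; };

// Stand-in for recruits.storageServers[tag].storage.tryGetReply(isr): may fail transiently.
std::optional<StorageServerInterface> tryInitializeStorage(const Candidate& c, int tag) {
	static int attempts = 0;
	if (++attempts % 3 == 0) return std::nullopt;        // simulate a recoverable failure
	return StorageServerInterface{c.locality + "/ss" + std::to_string(tag)};
}

std::vector<StorageServerInterface> seedServers(const std::vector<Candidate>& recruits) {
	std::vector<StorageServerInterface> servers;
	int tag = 0;
	while (tag < static_cast<int>(recruits.size())) {
		auto newServer = tryInitializeStorage(recruits[tag], tag);
		if (!newServer) {
			// Real code: TEST(true); wait(delay(STORAGE_RECRUITMENT_DELAY)); then retry same tag.
			continue;
		}
		servers.push_back(*newServer);
		++tag;                                           // advance only on success
	}
	return servers;
}

int main() {
	for (const auto& s : seedServers({{"dc0/z0"}, {"dc0/z1"}, {"dc1/z0"}}))
		std::cout << s.id << "\n";
}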
@@ -486,7 +467,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
 
 	RecruitFromConfigurationReply recruits = wait(
 		brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
-			RecruitFromConfigurationRequest( self->configuration ) ) ) );
+			RecruitFromConfigurationRequest( self->configuration, self->lastEpochEnd==0 ) ) ) );
 
 	TraceEvent("MasterRecoveryState", self->dbgid)
 		.detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
@@ -499,7 +480,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
 	// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are
 	// past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
 
-	Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) && newSeedServers( self, seedServers ) );
+	Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) && newSeedServers( self, recruits, seedServers ) );
 	return Void();
 }