Recruit backup worker in newEpoch

This commit is contained in:
Jingyu Zhou 2019-05-20 14:22:31 -07:00
parent ece3cadf8e
commit a4d6ebe79e
8 changed files with 56 additions and 30 deletions

View File

@ -110,10 +110,8 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t r
}
}
ACTOR Future<Void> backupWorker(
BackupInterface interf, InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo>> db)
{
ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
state BackupData self(interf.id(), req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());
@ -122,6 +120,7 @@ ACTOR Future<Void> backupWorker(
TraceEvent("BackupWorkerStart", interf.id());
try {
addActor.send(pullAsyncData(&self));
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
loop choose {
when(wait(dbInfoChange)) {

View File

@ -25,13 +25,14 @@
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
// The interface for backup workers.
struct BackupInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltBackupRequest> haltBackup;
struct LocalityData locality;
UID myId;
BackupInterface() {}
BackupInterface() = default;
explicit BackupInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
void initEndpoints() {}

View File

@ -355,6 +355,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( LAST_LIMITED_RATIO, 2.0 );
// Backup Worker
init( BACKUP_TIMEOUT, 0.4 );
init( BACKUP_FAILURE_TIME, 1.0 );
init( WAIT_FOR_BACKUP_JOIN_DELAY, 1.0 );

View File

@ -41,6 +41,7 @@ class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logServers;
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers

View File

@ -96,7 +96,7 @@ struct TLogSet {
explicit TLogSet(const LogSet& rhs);
std::string toString() const {
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s backupWorkers: %s locality: %d",
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s backupWorkers: %d locality: %d",
tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(),
backupWorkers.size(), locality);
}

View File

@ -81,12 +81,15 @@ LogSet::LogSet(const TLogSet& tLogSet) :
locality(tLogSet.locality), startVersion(tLogSet.startVersion),
satelliteTagLocations(tLogSet.satelliteTagLocations)
{
for(const auto& log : tLogSet.tLogs) {
for (const auto& log : tLogSet.tLogs) {
logServers.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(log));
}
for(const auto& log : tLogSet.logRouters) {
for (const auto& log : tLogSet.logRouters) {
logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(log));
}
for (const auto& log : tLogSet.backupWorkers) {
backupWorkers.emplace_back(new AsyncVar<OptionalInterface<BackupInterface>>(log));
}
filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities);
updateLocalitySet(tLogLocalities);
}
@ -117,10 +120,12 @@ TLogSet::TLogSet(const LogSet& rhs) :
for (const auto& tlog : rhs.logServers) {
tLogs.push_back(tlog->get());
}
for (const auto& logRouter : rhs.logRouters) {
logRouters.push_back(logRouter->get());
}
for (const auto& worker : rhs.backupWorkers) {
backupWorkers.push_back(worker->get());
}
}
OldTLogConf::OldTLogConf(const OldLogData& oldLogData) :
@ -399,6 +404,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
changes.push_back(t->onChange());
}
}
for (const auto& worker : it->backupWorkers) {
if (worker->get().present()) {
failed.push_back(waitFailureClient(
worker->get().interf().waitFailure, SERVER_KNOBS->BACKUP_TIMEOUT,
-SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY));
} else {
changes.push_back(worker->onChange());
}
}
}
if(!self->recoveryCompleteWrittenToCoreState.get()) {
@ -1833,6 +1847,30 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return localTags;
}
// Recruits backup workers for a new epoch: sends one InitializeBackupRequest to
// every interface in recruits.backupWorkers, waits for all of them to reply, and
// appends the acknowledged interfaces to logSet->backupWorkers.
// NOTE(review): this is a diff fragment added by this commit inside
// TagPartitionedLogSystem; surrounding class context is not visible here.
ACTOR static Future<Void> newBackupWorkers(Reference<LogSet> logSet, RecruitFromConfigurationReply recruits,
LogEpoch recoveryCount) {
// One pending reply per recruited worker; gathered with getAll() below.
std::vector<Future<BackupInterface>> initializationReplies;
for (const auto& worker : recruits.backupWorkers) {
// Fresh request ID per worker; tag the request with the epoch and the
// version from which the worker should start pulling data.
InitializeBackupRequest req(g_random->randomUniqueID());
req.recoveryCount = recoveryCount;
req.startVersion = logSet->startVersion;
TraceEvent("BackupRecruitment").detail("WorkerID", worker.id()).detail("RecoveryCount", recoveryCount)
.detail("StartVersion", req.startVersion);
// A worker that stays failed longer than BACKUP_TIMEOUT (scaled by
// MASTER_FAILURE_SLOPE_DURING_RECOVERY) converts into master_recovery_failed,
// aborting this recovery attempt.
initializationReplies.push_back(transformErrors(
throwErrorOr(worker.backup.getReplyUnlessFailedFor(req, SERVER_KNOBS->BACKUP_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
}
// Block until every recruited worker has acknowledged initialization;
// any single failure throws out of this actor.
std::vector<BackupInterface> newRecruits = wait(getAll(initializationReplies));
for (const auto& interf : newRecruits) {
// Wrap each interface in an AsyncVar<OptionalInterface<...>> so later code
// can watch for changes/failures (see the waitFailureClient loop elsewhere
// in this commit).
logSet->backupWorkers.emplace_back(
new AsyncVar<OptionalInterface<BackupInterface>>(OptionalInterface<BackupInterface>(interf)));
}
return Void();
}
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector<Tag> allTags ) {
TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
@ -2162,6 +2200,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < recr.tLogs.size(); i++ )
initializationReplies.push_back( transformErrors( throwErrorOr( recr.tLogs[i].tLog.getReplyUnlessFailedFor( reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
state Future<Void> recruitBackup = newBackupWorkers(logSystem->tLogs[0], recr, recoveryCount);
state std::vector<Future<Void>> recoveryComplete;
if(region.satelliteTLogReplicationFactor > 0) {
@ -2235,6 +2274,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
wait( waitForAll( initializationReplies ) || oldRouterRecruitment );
wait(recruitBackup);
for( int i = 0; i < initializationReplies.size(); i++ ) {
logSystem->tLogs[0]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );

View File

@ -197,7 +197,6 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::vector<MasterProxyInterface> proxies;
std::vector<MasterProxyInterface> provisionalProxies;
std::vector<ResolverInterface> resolvers;
std::vector<BackupInterface> backupWorkers;
std::map<UID, ProxyVersionReplies> lastProxyVersionReplies;
@ -339,23 +338,6 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
return Void();
}
// Master-side backup-worker recruitment: sends an InitializeBackupRequest to each
// recruited worker and stores the acknowledged interfaces on MasterData.
// NOTE(review): this diff fragment is the version DELETED by this commit, which
// moves recruitment into TagPartitionedLogSystem instead.
ACTOR Future<Void> newBackupWorkers(Reference<MasterData> self, RecruitFromConfigurationReply recruits) {
// One pending reply per recruited worker; gathered with getAll() below.
std::vector<Future<BackupInterface>> initializationReplies;
for (int i = 0; i < recruits.backupWorkers.size(); i++) {
InitializeBackupRequest req(g_random->randomUniqueID());
TraceEvent("BackupReplies", self->dbgid).detail("WorkerID", recruits.backupWorkers[i].id());
// A worker failed for longer than BACKUP_TIMEOUT (scaled by
// MASTER_FAILURE_SLOPE_DURING_RECOVERY) becomes master_recovery_failed.
initializationReplies.push_back(
transformErrors(throwErrorOr(recruits.backupWorkers[i].backup.getReplyUnlessFailedFor(
req, SERVER_KNOBS->BACKUP_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
}
// All workers must acknowledge before recovery proceeds; replace the master's
// worker list wholesale with the new recruits.
std::vector<BackupInterface> newRecruits = wait(getAll(initializationReplies));
self->backupWorkers = newRecruits;
return Void();
}
ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfigurationReply recruits, vector<StorageServerInterface>* servers ) {
// This is only necessary if the database is at version 0
servers->clear();
@ -625,7 +607,7 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything( Refere
wait( newSeedServers( self, recruits, seedServers ) );
state vector<Standalone<CommitTransactionRef>> confChanges;
wait(newProxies(self, recruits) && newResolvers(self, recruits) &&
newTLogServers(self, recruits, oldLogSystem, &confChanges) && newBackupWorkers(self, recruits));
newTLogServers(self, recruits, oldLogSystem, &confChanges));
return confChanges;
}
@ -1112,6 +1094,7 @@ static std::set<int> const& normalMasterErrors() {
s.insert( error_code_master_tlog_failed );
s.insert( error_code_master_proxy_failed );
s.insert( error_code_master_resolver_failed );
s.insert( error_code_master_backup_worker_failed );
s.insert( error_code_recruitment_failed );
s.insert( error_code_no_more_servers );
s.insert( error_code_master_recovery_failed );
@ -1508,9 +1491,9 @@ ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDB
TEST(err.code() == error_code_master_tlog_failed); // Master: terminated because of a tLog failure
TEST(err.code() == error_code_master_proxy_failed); // Master: terminated because of a proxy failure
TEST(err.code() == error_code_master_resolver_failed); // Master: terminated because of a resolver failure
TEST(err.code() == error_code_master_backup_worker_failed); // Master: terminated because of a backup worker failure
if (normalMasterErrors().count(err.code()))
{
if (normalMasterErrors().count(err.code())) {
TraceEvent("MasterTerminated", mi.id()).error(err);
return Void();
}

View File

@ -86,6 +86,7 @@ ERROR( please_reboot_delete, 1208, "Reboot of server process requested, with del
ERROR( master_proxy_failed, 1209, "Master terminating because a Proxy failed" )
ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver failed" )
ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )