fixed a number of bugs related to running fearless without remote logs
parent d3116fb336
commit 9630deba3a

@@ -2013,6 +2013,8 @@ ACTOR Future<Void> dataDistribution(
     PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
     Reference<ILogSystem> logSystem,
     Version recoveryCommitVersion,
+    std::vector<Optional<Key>> primaryDcId,
+    std::vector<Optional<Key>> remoteDcId,
     double* lastLimited)
 {
     state Database cx = openDBOnServer(db, TaskDataDistributionLaunch, true, true);

@@ -2110,17 +2112,12 @@ ACTOR Future<Void> dataDistribution(
     Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
     vector<Future<Void>> actors;

-    std::vector<Optional<Key>> primaryDcId;
-    primaryDcId.push_back(configuration.primaryDcId);
-
     actors.push_back( pollMoveKeysLock(cx, lock) );
     actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
     actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
     actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, configuration.storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
     actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, serverChanges, readyToStart.getFuture() ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
     if (configuration.remoteTLogReplicationFactor > 0) {
-        std::vector<Optional<Key>> remoteDcId;
-        remoteDcId.push_back(configuration.remoteDcId);
         actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcId, serverChanges, readyToStart.getFuture() ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
     }

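The two hunks above move the dcId bookkeeping out of dataDistribution(): the vectors now arrive as parameters from the master, and the secondary (remote-DC) team collection is only started when remoteTLogReplicationFactor is positive. A minimal compile-checked sketch of that gating pattern, using invented stand-in types rather than the real flow/actor machinery:

// Illustrative sketch only (invented stand-in types, not FoundationDB's actor
// framework): the secondary team collection is gated on
// remoteTLogReplicationFactor, so a single-region cluster never spawns
// remote-DC actors.
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Config {
    int remoteTLogReplicationFactor = 0;           // 0: no remote logs configured
    std::optional<std::string> primaryDcId = "dc0";
    std::optional<std::string> remoteDcId;         // unset when fearless is off
};

int main() {
    Config configuration;                          // remote replication left at 0
    std::vector<std::function<void()>> actors;     // stand-in for vector<Future<Void>>

    actors.push_back([] { std::cout << "DDTeamCollectionPrimary started\n"; });
    if (configuration.remoteTLogReplicationFactor > 0) {   // same guard as the diff
        actors.push_back([] { std::cout << "DDTeamCollectionSecondary started\n"; });
    }
    for (auto& a : actors) a();                    // only the primary actor runs
}
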
@@ -171,6 +171,8 @@ Future<Void> dataDistribution(
     PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
     Reference<ILogSystem> const& logSystem,
     Version const& recoveryCommitVersion,
+    std::vector<Optional<Key>> const& primaryDcId,
+    std::vector<Optional<Key>> const& remoteDcId,
     double* const& lastLimited);

 Future<Void> dataDistributionTracker(

@@ -510,6 +510,8 @@ struct ILogSystem {

     virtual void getPushLocations( std::vector<Tag> const& tags, vector<int>& locations ) = 0;

+    virtual bool hasRemoteLogs() = 0;
+
     virtual void addRemoteTags( int logSet, std::vector<Tag> const& originalTags, std::vector<int>& tags ) = 0;

     virtual Tag getRandomRouterTag() = 0;

@@ -564,7 +566,9 @@ struct LogPushData : NonCopyable {
     void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations = false ) {
         if( !usePreviousLocations ) {
             prev_tags.clear();
-            prev_tags.push_back( logSystem->getRandomRouterTag() );
+            if(logSystem->hasRemoteLogs()) {
+                prev_tags.push_back( logSystem->getRandomRouterTag() );
+            }
             for(auto& tag : next_message_tags) {
                 prev_tags.push_back(tag);
             }

@@ -587,7 +591,9 @@ struct LogPushData : NonCopyable {
     template <class T>
     void addTypedMessage( T const& item ) {
         prev_tags.clear();
-        prev_tags.push_back( logSystem->getRandomRouterTag() );
+        if(logSystem->hasRemoteLogs()) {
+            prev_tags.push_back( logSystem->getRandomRouterTag() );
+        }
         for(auto& tag : next_message_tags) {
             prev_tags.push_back(tag);
         }

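Both LogPushData hunks make the same fix: a log-router tag is pushed onto prev_tags only when the log system actually has remote logs, where before one was pushed unconditionally, which is invalid on a cluster with no log routers. A standalone sketch of the guard, with stub types in place of the real Tag and ILogSystem:

// Standalone sketch (stub types; the real Tag, ILogSystem, and
// tagLocalityLogRouter live in FDB's LogSystem headers): the push now
// happens only if hasRemoteLogs().
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <vector>

struct Tag { int locality; int id; };

struct LogSystemStub {
    int minRouters = 0;                        // 0 routers: remote logs absent
    bool hasRemoteLogs() const { return minRouters > 0; }
    Tag getRandomRouterTag() const {
        assert(minRouters > 0);                // would fire on the old code path
        return Tag{ -2 /* router locality */, std::rand() % minRouters };
    }
};

int main() {
    LogSystemStub logSystem;                   // fearless disabled: minRouters == 0
    std::vector<Tag> prev_tags;

    prev_tags.clear();
    if (logSystem.hasRemoteLogs()) {           // the guard the commit introduces
        prev_tags.push_back(logSystem.getRandomRouterTag());
    }
    std::cout << "router tags pushed: " << prev_tags.size() << "\n";   // prints 0
}
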
@@ -644,10 +644,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
         return std::numeric_limits<Version>::max();
     }

-    virtual void addRemoteTags( int logSet, std::vector<Tag> const& originalTags, std::vector<int>& tags ) {
-        tLogs[logSet]->getPushLocations(originalTags, tags, 0);
-    }
-
     virtual void getPushLocations( std::vector<Tag> const& tags, std::vector<int>& locations ) {
         int locationOffset = 0;
         for(auto& log : tLogs) {

@@ -658,6 +654,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
         }
     }

+    virtual bool hasRemoteLogs() {
+        return minRouters > 0;
+    }
+
+    virtual void addRemoteTags( int logSet, std::vector<Tag> const& originalTags, std::vector<int>& tags ) {
+        tLogs[logSet]->getPushLocations(originalTags, tags, 0);
+    }
+
     virtual Tag getRandomRouterTag() {
         ASSERT(minRouters < 1e6);
         return Tag(tagLocalityLogRouter, g_random->randomInt(0, minRouters));

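Taken together with the ILogSystem hunk earlier, the interface gains a pure-virtual hasRemoteLogs() and TagPartitionedLogSystem answers it from minRouters, which is positive only when log routers were recruited. A compile-checked sketch of that shape (stub classes, not the real hierarchy):

// Sketch only: ILogSystemStub/TagPartitionedStub are invented names standing
// in for ILogSystem and TagPartitionedLogSystem.
#include <iostream>

struct ILogSystemStub {
    virtual ~ILogSystemStub() = default;
    virtual bool hasRemoteLogs() = 0;          // new hook callers can query
};

struct TagPartitionedStub : ILogSystemStub {
    int minRouters;                            // 0 when running without remote logs
    explicit TagPartitionedStub(int routers) : minRouters(routers) {}
    bool hasRemoteLogs() override { return minRouters > 0; }
};

int main() {
    TagPartitionedStub singleRegion(0), fearless(4);
    std::cout << singleRegion.hasRemoteLogs() << " "   // 0
              << fearless.hasRemoteLogs() << "\n";     // 1
}
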
@@ -172,6 +172,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {

     DatabaseConfiguration originalConfiguration;
     DatabaseConfiguration configuration;
+    std::vector<Optional<Key>> primaryDcId;
+    std::vector<Optional<Key>> remoteDcId;
     bool hasConfiguration;

     ServerCoordinators coordinators;

@@ -242,21 +244,16 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
     ~MasterData() { if(txnStateStore) txnStateStore->close(); }
 };

-ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
-    self->proxies.clear();
-
-    RecruitFromConfigurationReply recr = wait( recruits );
-    state std::vector<WorkerInterface> workers = recr.proxies;
-
-    state vector<Future<MasterProxyInterface>> initializationReplies;
-    for( int i = 0; i < workers.size(); i++ ) {
+ACTOR Future<Void> newProxies( Reference<MasterData> self, RecruitFromConfigurationReply recr ) {
+    vector<Future<MasterProxyInterface>> initializationReplies;
+    for( int i = 0; i < recr.proxies.size(); i++ ) {
         InitializeMasterProxyRequest req;
         req.master = self->myInterface;
         req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
         req.recoveryTransactionVersion = self->recoveryTransactionVersion;
         req.firstProxy = i == 0;
-        TraceEvent("ProxyReplies",self->dbgid).detail("workerID",workers[i].id());
-        initializationReplies.push_back( transformErrors( throwErrorOr( workers[i].masterProxy.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
+        TraceEvent("ProxyReplies",self->dbgid).detail("workerID", recr.proxies[i].id());
+        initializationReplies.push_back( transformErrors( throwErrorOr( recr.proxies[i].masterProxy.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
     }

     vector<MasterProxyInterface> newRecruits = wait( getAll( initializationReplies ) );

@@ -266,20 +263,15 @@ ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
     return Void();
 }

-ACTOR Future<Void> newResolvers( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
-    self->resolvers.clear();
-
-    RecruitFromConfigurationReply recr = wait( recruits );
-    state std::vector<WorkerInterface> workers = recr.resolvers;
-
-    state vector<Future<ResolverInterface>> initializationReplies;
-    for( int i = 0; i < workers.size(); i++ ) {
+ACTOR Future<Void> newResolvers( Reference<MasterData> self, RecruitFromConfigurationReply recr ) {
+    vector<Future<ResolverInterface>> initializationReplies;
+    for( int i = 0; i < recr.resolvers.size(); i++ ) {
         InitializeResolverRequest req;
         req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
         req.proxyCount = recr.proxies.size();
         req.resolverCount = recr.resolvers.size();
-        TraceEvent("ResolverReplies",self->dbgid).detail("workerID",workers[i].id());
-        initializationReplies.push_back( transformErrors( throwErrorOr( workers[i].resolver.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
+        TraceEvent("ResolverReplies",self->dbgid).detail("workerID", recr.resolvers[i].id());
+        initializationReplies.push_back( transformErrors( throwErrorOr( recr.resolvers[i].resolver.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
     }

     vector<ResolverInterface> newRecruits = wait( getAll( initializationReplies ) );

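The newProxies and newResolvers hunks apply the same refactor: rather than handing each actor a Future and letting each one wait on it, the caller resolves recruitment once and passes the reply by value, which also removes the need for `state` variables across the wait. A sketch of the pattern (std types and invented names in place of flow futures and WorkerInterface):

// Sketch only: Reply/newProxies/newResolvers here are stand-ins, and
// std::future stands in for flow's Future.
#include <future>
#include <iostream>
#include <string>
#include <vector>

struct Reply { std::vector<std::string> proxies, resolvers; };

void newProxies(const Reply& recr) {           // value in, no wait inside
    for (const auto& p : recr.proxies) std::cout << "init proxy on " << p << "\n";
}

void newResolvers(const Reply& recr) {
    for (const auto& r : recr.resolvers) std::cout << "init resolver on " << r << "\n";
}

int main() {
    std::future<Reply> recruits = std::async(std::launch::async,
        [] { return Reply{ {"w1", "w2"}, {"w3"} }; });
    Reply recr = recruits.get();               // single wait at the call site
    newProxies(recr);                          // every consumer reuses the result
    newResolvers(recr);
}
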
@@ -288,16 +280,16 @@ ACTOR Future<Void> newResolvers( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
     return Void();
 }

-ACTOR Future<Void> newTLogServers( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits, Reference<ILogSystem> oldLogSystem ) {
-    if(!self->dcId_locality.count(self->configuration.primaryDcId) || !self->dcId_locality.count(self->configuration.remoteDcId)) {
+ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfigurationReply recr, Reference<ILogSystem> oldLogSystem ) {
+    state Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
+    if( !self->dcId_locality.count(primaryDcId) || (self->configuration.remoteTLogReplicationFactor > 0 && !self->dcId_locality.count(recr.remoteDcId)) ) {
         TraceEvent(SevWarnAlways, "UnknownDCID", self->dbgid).detail("primaryFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("remoteFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("primaryId", printable(self->configuration.primaryDcId)).detail("remoteId", printable(self->configuration.remoteDcId));
         Void _ = wait( Future<Void>(Never()) );
     }

-    RecruitFromConfigurationReply recr = wait( recruits );
-    Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) );
+    Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = self->configuration.remoteTLogReplicationFactor > 0 ? brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) ) : Never();

-    Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
-
     Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) );
     self->logSystem = newLogSystem;

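The newTLogServers hunk only kicks off remote recruitment when remote logs are configured; otherwise fRemoteWorkers becomes a future that never becomes ready, so the remote path is simply never taken. A rough analogue with std::future (flow's Never() has no exact std counterpart; the static-promise helper below is purely illustrative and callable once):

// Sketch only: recruitRemoteWorkers() and never() are invented helpers.
#include <chrono>
#include <future>
#include <iostream>
#include <string>

std::future<std::string> recruitRemoteWorkers() {      // pretend recruitment RPC
    return std::async(std::launch::deferred, [] { return std::string("remote workers"); });
}

std::future<std::string> never() {                     // never-ready future
    static std::promise<std::string> p;                // kept alive, never fulfilled
    return p.get_future();                             // (single use in this sketch)
}

int main() {
    int remoteTLogReplicationFactor = 0;               // fearless disabled
    std::future<std::string> fRemoteWorkers =
        remoteTLogReplicationFactor > 0 ? recruitRemoteWorkers() : never();

    auto status = fRemoteWorkers.wait_for(std::chrono::milliseconds(10));
    std::cout << (status == std::future_status::ready ? "ready\n" : "still pending\n");
}
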
@@ -534,6 +526,11 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers ) {
         brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
             RecruitFromConfigurationRequest( self->configuration, self->lastEpochEnd==0 ) ) ) );

+    self->primaryDcId.clear();
+    self->primaryDcId.push_back(recruits.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId);
+    self->remoteDcId.clear();
+    self->remoteDcId.push_back(recruits.remoteDcId);
+
     TraceEvent("MasterRecoveryState", self->dbgid)
         .detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
         .detail("Status", RecoveryStatus::names[RecoveryStatus::initializing_transaction_servers])

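Sketch of the dcId bookkeeping added above (plain std types, invented names): if the recruiter returned the configured remote DC as remote, the configured primary really is primary; otherwise the two datacenters have swapped roles for this recovery and the configured remote acts as primary.

// Sketch only: configuredPrimary/configuredRemote/recruitedRemote are
// stand-ins for self->configuration.*DcId and recruits.remoteDcId.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

int main() {
    std::optional<std::string> configuredPrimary = "dc0";
    std::optional<std::string> configuredRemote  = "dc1";
    std::optional<std::string> recruitedRemote   = "dc0";  // recruiter picked dc0 as remote

    std::vector<std::optional<std::string>> primaryDcId, remoteDcId;
    primaryDcId.push_back(recruitedRemote == configuredRemote ? configuredPrimary
                                                              : configuredRemote);
    remoteDcId.push_back(recruitedRemote);

    std::cout << "primary=" << primaryDcId[0].value()      // dc1: roles swapped
              << " remote=" << remoteDcId[0].value() << "\n";
}
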
@@ -1230,7 +1227,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
     {
         PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
         state double lastLimited = 0;
-        self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
+        self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, self->primaryDcId, self->remoteDcId, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
         self->addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
     }