Merge pull request #526 from etschannen/feature-remote-logs
Added configurable log routers and updated wait for good recruitment logic
This commit is contained in:
commit
4fd303b46c
|
@ -365,6 +365,7 @@
|
|||
"remote_redundancy_mode":"remote_single",
|
||||
"remote_log_replicas":3,
|
||||
"remote_logs":5,
|
||||
"log_routers":10,
|
||||
"usable_regions":1,
|
||||
"storage_replicas":1,
|
||||
"resolvers":1,
|
||||
|
|
|
@ -884,6 +884,12 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
|
||||
if (statusObjConfig.get("logs", intVal))
|
||||
outputString += format("\n Desired Logs - %d", intVal);
|
||||
|
||||
if (statusObjConfig.get("remote_logs", intVal))
|
||||
outputString += format("\n Desired Remote Logs - %d", intVal);
|
||||
|
||||
if (statusObjConfig.get("log_routers", intVal))
|
||||
outputString += format("\n Desired Log Routers - %d", intVal);
|
||||
}
|
||||
catch (std::runtime_error& e) {
|
||||
outputString = outputStringCache;
|
||||
|
|
|
@ -29,7 +29,7 @@ DatabaseConfiguration::DatabaseConfiguration()
|
|||
void DatabaseConfiguration::resetInternal() {
|
||||
// does NOT reset rawConfiguration
|
||||
initialized = false;
|
||||
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = storageTeamSize = -1;
|
||||
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = storageTeamSize = desiredLogRouterCount = -1;
|
||||
tLogDataStoreType = storageServerStoreType = KeyValueStoreType::END;
|
||||
autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
|
||||
autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
|
||||
|
@ -297,6 +297,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
if( resolverCount != -1 ) {
|
||||
result["resolvers"] = resolverCount;
|
||||
}
|
||||
if( desiredLogRouterCount != -1 ) {
|
||||
result["log_routers"] = desiredLogRouterCount;
|
||||
}
|
||||
if( remoteDesiredTLogCount != -1 ) {
|
||||
result["remote_logs"] = remoteDesiredTLogCount;
|
||||
}
|
||||
|
@ -336,6 +339,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
|
|||
else if (ck == LiteralStringRef("auto_logs")) parse(&autoDesiredTLogCount, value);
|
||||
else if (ck == LiteralStringRef("storage_replication_policy")) parseReplicationPolicy(&storagePolicy, value);
|
||||
else if (ck == LiteralStringRef("log_replication_policy")) parseReplicationPolicy(&tLogPolicy, value);
|
||||
else if (ck == LiteralStringRef("log_routers")) parse(&desiredLogRouterCount, value);
|
||||
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
|
||||
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
|
||||
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
|
||||
|
|
|
@ -159,6 +159,7 @@ struct DatabaseConfiguration {
|
|||
KeyValueStoreType storageServerStoreType;
|
||||
|
||||
// Remote TLogs
|
||||
int32_t desiredLogRouterCount;
|
||||
int32_t remoteDesiredTLogCount;
|
||||
int32_t remoteTLogReplicationFactor;
|
||||
IRepPolicyRef remoteTLogPolicy;
|
||||
|
|
|
@ -65,14 +65,14 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
|
|||
std::string key = mode.substr(0, pos);
|
||||
std::string value = mode.substr(pos+1);
|
||||
|
||||
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "satellite_logs" || key == "usable_regions") && isInteger(value) ) {
|
||||
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "log_routers" || key == "satellite_logs" || key == "usable_regions") && isInteger(value) ) {
|
||||
out[p+key] = value;
|
||||
}
|
||||
|
||||
if( key == "regions" ) {
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string( value, mv );
|
||||
|
||||
|
||||
StatusObject regionObj;
|
||||
regionObj["regions"] = mv;
|
||||
out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion()).toString();
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
struct ProcessClass {
|
||||
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
|
||||
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, InvalidClass = -1 };
|
||||
enum Fitness { BestFit, GoodFit, OkayFit, UnsetFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
|
||||
enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
|
||||
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController };
|
||||
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
|
||||
int16_t _class;
|
||||
|
|
|
@ -439,6 +439,11 @@ public:
|
|||
return false;
|
||||
}
|
||||
|
||||
// Count-first comparison against another RoleFitness.
// Returns true when this object's count is strictly greater than r.count;
// otherwise returns whether this object's worstFit is strictly less than
// r.worstFit. bestFit is not consulted here.
// The recruitment checks in this file call it as expected.betterCount(actual)
// to decide whether to keep waiting for a better recruitment.
// NOTE(review): presumably a smaller worstFit value means a better fit
// (BestFit is listed first in ProcessClass::Fitness) - confirm.
bool betterCount (RoleFitness const& r) const {
|
||||
if(count > r.count) return true; // a strictly larger count decides the result before fitness is looked at
|
||||
return worstFit < r.worstFit; // also reached when count < r.count, so a smaller count can still compare as "better"
|
||||
}
|
||||
|
||||
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count; }
|
||||
};
|
||||
|
||||
|
@ -459,7 +464,6 @@ public:
|
|||
|
||||
std::set<Optional<Key>> remoteDC;
|
||||
remoteDC.insert(req.dcId);
|
||||
|
||||
|
||||
auto remoteLogs = getWorkersForTlogs( req.configuration, req.configuration.getRemoteTLogReplicationFactor(), req.configuration.getDesiredRemoteLogs(), req.configuration.getRemoteTLogPolicy(), id_used, false, remoteDC );
|
||||
for(int i = 0; i < remoteLogs.size(); i++) {
|
||||
|
@ -471,9 +475,13 @@ public:
|
|||
result.logRouters.push_back(logRouters[i].first);
|
||||
}
|
||||
|
||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||
( ( RoleFitness(remoteLogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ) ||
|
||||
( RoleFitness(logRouters, ProcessClass::LogRouter) > RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount) ) ) ) {
|
||||
if(!remoteStartTime.present()) {
|
||||
remoteStartTime = now();
|
||||
}
|
||||
|
||||
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
|
||||
( ( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()).betterCount(RoleFitness(remoteLogs, ProcessClass::TLog)) ) ||
|
||||
( RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount).betterCount(RoleFitness(logRouters, ProcessClass::LogRouter)) ) ) ) {
|
||||
throw operation_failed();
|
||||
}
|
||||
|
||||
|
@ -562,10 +570,10 @@ public:
|
|||
}
|
||||
|
||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||
( RoleFitness(tlogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
|
||||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(satelliteLogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId)) ) ||
|
||||
RoleFitness(proxies, ProcessClass::Proxy) > RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies()) ||
|
||||
RoleFitness(resolvers, ProcessClass::Resolver) > RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers()) ) ) {
|
||||
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
||||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId)).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
|
||||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies()).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
|
||||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers()).betterCount(RoleFitness(resolvers, ProcessClass::Resolver)) ) ) {
|
||||
return operation_failed();
|
||||
}
|
||||
|
||||
|
@ -593,7 +601,7 @@ public:
|
|||
}
|
||||
throw no_more_servers();
|
||||
} catch( Error& e ) {
|
||||
if (e.code() != error_code_no_more_servers || regions[1].priority < 0 || now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
|
||||
if (e.code() != error_code_no_more_servers || regions[1].priority < 0 || now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
|
||||
throw;
|
||||
}
|
||||
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
|
||||
|
@ -703,8 +711,8 @@ public:
|
|||
.detail("DesiredResolvers", req.configuration.getDesiredResolvers()).detail("ActualResolvers", result.resolvers.size());
|
||||
|
||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||
( RoleFitness(tlogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
|
||||
bestFitness > RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers()) ) ) {
|
||||
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
||||
RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers()).betterCount(bestFitness) ) ) {
|
||||
throw operation_failed();
|
||||
}
|
||||
|
||||
|
@ -895,13 +903,15 @@ public:
|
|||
|
||||
if(oldRemoteTLogFit < newRemoteTLogFit) return false;
|
||||
|
||||
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));
|
||||
int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count));
|
||||
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
|
||||
RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newTLogFit.count, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
|
||||
RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newRouterCount, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
|
||||
|
||||
if(oldLogRoutersFit.count < oldTLogFit.count) {
|
||||
if(oldLogRoutersFit.count < oldRouterCount) {
|
||||
oldLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
||||
}
|
||||
if(newLogRoutersFit.count < newTLogFit.count) {
|
||||
if(newLogRoutersFit.count < newRouterCount) {
|
||||
newLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
||||
}
|
||||
|
||||
|
@ -958,6 +968,7 @@ public:
|
|||
DBInfo db;
|
||||
Database cx;
|
||||
double startTime;
|
||||
Optional<double> remoteStartTime;
|
||||
Version datacenterVersionDifference;
|
||||
bool versionDifferenceUpdated;
|
||||
|
||||
|
@ -1481,7 +1492,7 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
|
|||
req.reply.send( self->findRemoteWorkersForConfiguration( req ) );
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
|
||||
if (e.code() == error_code_no_more_servers && self->remoteStartTime.present() && now() - self->remoteStartTime.get() >= SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
|
||||
self->outstandingRemoteRecruitmentRequests.push_back( req );
|
||||
TraceEvent(SevWarn, "RecruitRemoteFromConfigurationNotAvailable", self->id).error(e);
|
||||
return Void();
|
||||
|
|
|
@ -270,7 +270,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
|
||||
init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
|
||||
init( CC_CHANGE_DELAY, 0.1 );
|
||||
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 );
|
||||
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );
|
||||
init( WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY, 5.0 );
|
||||
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
|
||||
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
|
||||
init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001;
|
||||
|
@ -278,11 +279,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
|
||||
|
||||
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
|
||||
init( EXPECTED_MASTER_FITNESS, ProcessClass::GoodFit );
|
||||
init( EXPECTED_TLOG_FITNESS, ProcessClass::GoodFit );
|
||||
init( EXPECTED_LOG_ROUTER_FITNESS, ProcessClass::GoodFit );
|
||||
init( EXPECTED_PROXY_FITNESS, ProcessClass::GoodFit );
|
||||
init( EXPECTED_RESOLVER_FITNESS, ProcessClass::GoodFit );
|
||||
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
|
||||
init( EXPECTED_TLOG_FITNESS, ProcessClass::UnsetFit );
|
||||
init( EXPECTED_LOG_ROUTER_FITNESS, ProcessClass::UnsetFit );
|
||||
init( EXPECTED_PROXY_FITNESS, ProcessClass::UnsetFit );
|
||||
init( EXPECTED_RESOLVER_FITNESS, ProcessClass::UnsetFit );
|
||||
init( RECRUITMENT_TIMEOUT, 600 ); if( randomize && BUGGIFY ) RECRUITMENT_TIMEOUT = g_random->coinflip() ? 60.0 : 1.0;
|
||||
|
||||
init( POLICY_RATING_TESTS, 200 ); if( randomize && BUGGIFY ) POLICY_RATING_TESTS = 20;
|
||||
|
|
|
@ -212,6 +212,7 @@ public:
|
|||
double MASTER_SPIN_DELAY;
|
||||
double CC_CHANGE_DELAY;
|
||||
double WAIT_FOR_GOOD_RECRUITMENT_DELAY;
|
||||
double WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY;
|
||||
double ATTEMPT_RECRUITMENT_DELAY;
|
||||
double WORKER_FAILURE_TIME;
|
||||
double CHECK_BETTER_MASTER_INTERVAL;
|
||||
|
|
|
@ -82,7 +82,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
|
|||
state bool iAmLeader = false;
|
||||
state UID prevChangeID;
|
||||
|
||||
if( asyncPriorityInfo->get().processClassFitness > ProcessClass::UnsetFit || asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessBad || asyncPriorityInfo->get().isExcluded ) {
|
||||
|
||||
if(asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessBad || asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessRemote || asyncPriorityInfo->get().isExcluded) {
|
||||
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) );
|
||||
} else if( asyncPriorityInfo->get().processClassFitness > ProcessClass::UnsetFit ) {
|
||||
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
|
||||
}
|
||||
|
||||
|
|
|
@ -864,6 +864,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
|
|||
ASSERT(false); // Programmer forgot to adjust cases.
|
||||
}
|
||||
|
||||
if (g_random->random01() < 0.25) db.desiredLogRouterCount = g_random->randomInt(1,7);
|
||||
if (g_random->random01() < 0.25) db.remoteDesiredTLogCount = g_random->randomInt(1,7);
|
||||
}
|
||||
|
||||
|
|
|
@ -1600,7 +1600,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
oldLogSystem->recruitmentID = logSystem->recruitmentID;
|
||||
|
||||
if(configuration.usableRegions > 1) {
|
||||
logSystem->logRouterTags = recr.tLogs.size();
|
||||
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1,recr.tLogs.size()));
|
||||
logSystem->expectedLogSets++;
|
||||
} else {
|
||||
logSystem->logRouterTags = 0;
|
||||
|
|
|
@ -311,7 +311,7 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
|
|||
self->dcId_locality[remoteDcId] = loc;
|
||||
}
|
||||
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() ) ) );
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())) ) ) );
|
||||
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
|
||||
self->logSystem = newLogSystem;
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue