re-added the ability to configure the number of log routers. Many log routers are needed to get a sufficient number of sockets involved in copying data across the WAN
This commit is contained in:
parent
9a91dad5bd
commit
8a8914f046
|
@ -362,6 +362,7 @@
|
|||
"remote_redundancy_mode":"remote_single",
|
||||
"remote_log_replicas":3,
|
||||
"remote_logs":5,
|
||||
"log_routers":10,
|
||||
"usable_regions":1,
|
||||
"storage_replicas":1,
|
||||
"resolvers":1,
|
||||
|
|
|
@ -884,6 +884,12 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
|
||||
if (statusObjConfig.get("logs", intVal))
|
||||
outputString += format("\n Desired Logs - %d", intVal);
|
||||
|
||||
if (statusObjConfig.get("remote_logs", intVal))
|
||||
outputString += format("\n Desired Remote Logs - %d", intVal);
|
||||
|
||||
if (statusObjConfig.get("log_routers", intVal))
|
||||
outputString += format("\n Desired Log Routers - %d", intVal);
|
||||
}
|
||||
catch (std::runtime_error& e) {
|
||||
outputString = outputStringCache;
|
||||
|
|
|
@ -29,7 +29,7 @@ DatabaseConfiguration::DatabaseConfiguration()
|
|||
void DatabaseConfiguration::resetInternal() {
|
||||
// does NOT reset rawConfiguration
|
||||
initialized = false;
|
||||
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = storageTeamSize = -1;
|
||||
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = storageTeamSize = desiredLogRouterCount = -1;
|
||||
tLogDataStoreType = storageServerStoreType = KeyValueStoreType::END;
|
||||
autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
|
||||
autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
|
||||
|
@ -297,6 +297,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
if( resolverCount != -1 ) {
|
||||
result["resolvers"] = resolverCount;
|
||||
}
|
||||
if( desiredLogRouterCount != -1 ) {
|
||||
result["log_routers"] = desiredLogRouterCount;
|
||||
}
|
||||
if( remoteDesiredTLogCount != -1 ) {
|
||||
result["remote_logs"] = remoteDesiredTLogCount;
|
||||
}
|
||||
|
@ -336,6 +339,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
|
|||
else if (ck == LiteralStringRef("auto_logs")) parse(&autoDesiredTLogCount, value);
|
||||
else if (ck == LiteralStringRef("storage_replication_policy")) parseReplicationPolicy(&storagePolicy, value);
|
||||
else if (ck == LiteralStringRef("log_replication_policy")) parseReplicationPolicy(&tLogPolicy, value);
|
||||
else if (ck == LiteralStringRef("log_routers")) parse(&desiredLogRouterCount, value);
|
||||
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
|
||||
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
|
||||
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
|
||||
|
|
|
@ -159,6 +159,7 @@ struct DatabaseConfiguration {
|
|||
KeyValueStoreType storageServerStoreType;
|
||||
|
||||
// Remote TLogs
|
||||
int32_t desiredLogRouterCount;
|
||||
int32_t remoteDesiredTLogCount;
|
||||
int32_t remoteTLogReplicationFactor;
|
||||
IRepPolicyRef remoteTLogPolicy;
|
||||
|
|
|
@ -65,14 +65,14 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
|
|||
std::string key = mode.substr(0, pos);
|
||||
std::string value = mode.substr(pos+1);
|
||||
|
||||
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "satellite_logs" || key == "usable_regions") && isInteger(value) ) {
|
||||
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "log_routers" || key == "satellite_logs" || key == "usable_regions") && isInteger(value) ) {
|
||||
out[p+key] = value;
|
||||
}
|
||||
|
||||
if( key == "regions" ) {
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string( value, mv );
|
||||
|
||||
|
||||
StatusObject regionObj;
|
||||
regionObj["regions"] = mv;
|
||||
out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion()).toString();
|
||||
|
|
|
@ -459,7 +459,6 @@ public:
|
|||
|
||||
std::set<Optional<Key>> remoteDC;
|
||||
remoteDC.insert(req.dcId);
|
||||
|
||||
|
||||
auto remoteLogs = getWorkersForTlogs( req.configuration, req.configuration.getRemoteTLogReplicationFactor(), req.configuration.getDesiredRemoteLogs(), req.configuration.getRemoteTLogPolicy(), id_used, false, remoteDC );
|
||||
for(int i = 0; i < remoteLogs.size(); i++) {
|
||||
|
@ -895,13 +894,15 @@ public:
|
|||
|
||||
if(oldRemoteTLogFit < newRemoteTLogFit) return false;
|
||||
|
||||
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));
|
||||
int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count));
|
||||
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
|
||||
RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newTLogFit.count, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
|
||||
RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newRouterCount, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
|
||||
|
||||
if(oldLogRoutersFit.count < oldTLogFit.count) {
|
||||
if(oldLogRoutersFit.count < oldRouterCount) {
|
||||
oldLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
||||
}
|
||||
if(newLogRoutersFit.count < newTLogFit.count) {
|
||||
if(newLogRoutersFit.count < newRouterCount) {
|
||||
newLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
||||
}
|
||||
|
||||
|
|
|
@ -863,6 +863,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
|
|||
ASSERT(false); // Programmer forgot to adjust cases.
|
||||
}
|
||||
|
||||
if (g_random->random01() < 0.25) db.desiredLogRouterCount = g_random->randomInt(1,7);
|
||||
if (g_random->random01() < 0.25) db.remoteDesiredTLogCount = g_random->randomInt(1,7);
|
||||
}
|
||||
|
||||
|
|
|
@ -1600,7 +1600,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
oldLogSystem->recruitmentID = logSystem->recruitmentID;
|
||||
|
||||
if(configuration.usableRegions > 1) {
|
||||
logSystem->logRouterTags = recr.tLogs.size();
|
||||
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1,recr.tLogs.size()));
|
||||
logSystem->expectedLogSets++;
|
||||
} else {
|
||||
logSystem->logRouterTags = 0;
|
||||
|
|
|
@ -311,7 +311,7 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
|
|||
self->dcId_locality[remoteDcId] = loc;
|
||||
}
|
||||
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() ) ) );
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())) ) ) );
|
||||
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
|
||||
self->logSystem = newLogSystem;
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue