Rename more places from proxy to commit proxy

This commit is contained in:
Young Liu 2020-09-15 22:29:49 -07:00
parent 35bef73a1c
commit cc5bc16bd8
22 changed files with 100 additions and 91 deletions

View File

@ -471,8 +471,7 @@ void initHelp() {
helpMap["configure"] = CommandHelp( helpMap["configure"] = CommandHelp(
"configure [new] " "configure [new] "
"<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|commit_proxies=<" "<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|commit_proxies=<"
"COMMIT_PROXIES>|grv_" "COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"change the database configuration", "change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing " "The `new' option, if present, initializes a new database with the given configuration rather than changing "
"the configuration of an existing one. When used, both a redundancy mode and a storage engine must be " "the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "
@ -481,15 +480,13 @@ void initHelp() {
"See the Admin Guide.\n three_datacenter - See the Admin Guide.\n\nStorage engine:\n ssd - B-Tree storage " "See the Admin Guide.\n three_datacenter - See the Admin Guide.\n\nStorage engine:\n ssd - B-Tree storage "
"engine optimized for solid state disks.\n memory - Durable in-memory storage engine for small " "engine optimized for solid state disks.\n memory - Durable in-memory storage engine for small "
"datasets.\n\ncommit_proxies=<COMMIT_PROXIES>: Sets the desired number of commit proxies in the cluster. Must " "datasets.\n\ncommit_proxies=<COMMIT_PROXIES>: Sets the desired number of commit proxies in the cluster. Must "
"be at least 1, or set " "be at least 1, or set to -1 which restores the number of commit proxies to the default "
"to -1 which restores the number of commit proxies to the default value.\n\ngrv_proxies=<GRV_PROXIES>: Sets " "value.\n\ngrv_proxies=<GRV_PROXIES>: Sets the desired number of GRV proxies in the cluster. Must be at least "
"the " "1, or set to -1 which restores the number of GRV proxies to the default value.\n\nlogs=<LOGS>: Sets the "
"desired number of GRV proxies in the cluster. Must be at least 1, or set to -1 which restores the number of " "desired number of log servers in the cluster. Must be at least 1, or set to -1 which restores the number of "
"GRV proxies to the default value.\n\nlogs=<LOGS>: Sets the desired number of log servers in the cluster. Must " "logs to the default value.\n\nresolvers=<RESOLVERS>: Sets the desired number of resolvers in the cluster. "
"be " "Must be at least 1, or set to -1 which restores the number of resolvers to the default value.\n\nSee the "
"at least 1, or set to -1 which restores the number of logs to the default value.\n\nresolvers=<RESOLVERS>: " "FoundationDB Administration Guide for more information.");
"Sets the desired number of resolvers in the cluster. Must be at least 1, or set to -1 which restores the "
"number of resolvers to the default value.\n\nSee the FoundationDB Administration Guide for more information.");
helpMap["fileconfigure"] = CommandHelp( helpMap["fileconfigure"] = CommandHelp(
"fileconfigure [new] <FILENAME>", "fileconfigure [new] <FILENAME>",
"change the database configuration from a file", "change the database configuration from a file",

View File

@ -100,10 +100,10 @@ struct CommitProxyInterface {
struct ClientDBInfo { struct ClientDBInfo {
constexpr static FileIdentifier file_identifier = 5355080; constexpr static FileIdentifier file_identifier = 5355080;
UID id; // Changes each time anything else changes UID id; // Changes each time anything else changes
vector< GrvProxyInterface > grvProxies; vector<GrvProxyInterface> grvProxies;
vector<CommitProxyInterface> commitProxies; vector<CommitProxyInterface> commitProxies;
Optional<CommitProxyInterface> Optional<CommitProxyInterface>
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the proxies vector has been shrunk firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
double clientTxnInfoSampleRate; double clientTxnInfoSampleRate;
int64_t clientTxnInfoSizeLimit; int64_t clientTxnInfoSizeLimit;
Optional<Value> forward; Optional<Value> forward;

View File

@ -164,24 +164,40 @@ void DatabaseConfiguration::setDefaultReplicationPolicy() {
} }
bool DatabaseConfiguration::isValid() const { bool DatabaseConfiguration::isValid() const {
if (!(initialized && tLogWriteAntiQuorum >= 0 && tLogWriteAntiQuorum <= tLogReplicationFactor / 2 && if( !(initialized &&
tLogReplicationFactor >= 1 && storageTeamSize >= 1 && getDesiredCommitProxies() >= 1 && tLogWriteAntiQuorum >= 0 &&
getDesiredGrvProxies() >= 1 && getDesiredLogs() >= 1 && getDesiredResolvers() >= 1 && tLogWriteAntiQuorum <= tLogReplicationFactor/2 &&
tLogVersion != TLogVersion::UNSET && tLogVersion >= TLogVersion::MIN_RECRUITABLE && tLogReplicationFactor >= 1 &&
tLogVersion <= TLogVersion::MAX_SUPPORTED && tLogDataStoreType != KeyValueStoreType::END && storageTeamSize >= 1 &&
getDesiredCommitProxies() >= 1 &&
getDesiredGrvProxies() >= 1 &&
getDesiredLogs() >= 1 &&
getDesiredResolvers() >= 1 &&
tLogVersion != TLogVersion::UNSET &&
tLogVersion >= TLogVersion::MIN_RECRUITABLE &&
tLogVersion <= TLogVersion::MAX_SUPPORTED &&
tLogDataStoreType != KeyValueStoreType::END &&
tLogSpillType != TLogSpillType::UNSET && tLogSpillType != TLogSpillType::UNSET &&
!(tLogSpillType == TLogSpillType::REFERENCE && tLogVersion < TLogVersion::V3) && !(tLogSpillType == TLogSpillType::REFERENCE && tLogVersion < TLogVersion::V3) &&
storageServerStoreType != KeyValueStoreType::END && autoCommitProxyCount >= 1 && autoGrvProxyCount >= 1 && storageServerStoreType != KeyValueStoreType::END &&
autoResolverCount >= 1 && autoDesiredTLogCount >= 1 && storagePolicy && tLogPolicy && autoCommitProxyCount >= 1 &&
getDesiredRemoteLogs() >= 1 && remoteTLogReplicationFactor >= 0 && repopulateRegionAntiQuorum >= 0 && autoGrvProxyCount >= 1 &&
repopulateRegionAntiQuorum <= 1 && usableRegions >= 1 && usableRegions <= 2 && regions.size() <= 2 && autoResolverCount >= 1 &&
(usableRegions == 1 || regions.size() == 2) && (regions.size() == 0 || regions[0].priority >= 0) && autoDesiredTLogCount >= 1 &&
(regions.size() == 0 || storagePolicy &&
tLogPolicy->info() != tLogPolicy &&
"dcid^2 x zoneid^2 x 1"))) { // We cannot specify regions with three_datacenter replication getDesiredRemoteLogs() >= 1 &&
remoteTLogReplicationFactor >= 0 &&
repopulateRegionAntiQuorum >= 0 &&
repopulateRegionAntiQuorum <= 1 &&
usableRegions >= 1 &&
usableRegions <= 2 &&
regions.size() <= 2 &&
( usableRegions == 1 || regions.size() == 2 ) &&
( regions.size() == 0 || regions[0].priority >= 0 ) &&
( regions.size() == 0 || tLogPolicy->info() != "dcid^2 x zoneid^2 x 1") ) ) { //We cannot specify regions with three_datacenter replication
return false; return false;
} }
std::set<Key> dcIds; std::set<Key> dcIds;
dcIds.insert(Key()); dcIds.insert(Key());
for(auto& r : regions) { for(auto& r : regions) {

View File

@ -221,7 +221,7 @@ public:
Future<Void> monitorProxiesInfoChange; Future<Void> monitorProxiesInfoChange;
Reference<CommitProxyInfo> commitProxies; Reference<CommitProxyInfo> commitProxies;
Reference<GrvProxyInfo> grvProxies; Reference<GrvProxyInfo> grvProxies;
bool proxyProvisional; bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
UID proxiesLastChange; UID proxiesLastChange;
LocalityData clientLocality; LocalityData clientLocality;
QueueModel queueModel; QueueModel queueModel;

View File

@ -747,7 +747,6 @@ ConfigureAutoResult parseConfig( StatusObject const& status ) {
proxyCount = result.old_commit_proxies; proxyCount = result.old_commit_proxies;
} }
// Need to configure a good number.
result.desired_grv_proxies = std::max(std::min(4, processCount / 20), 1); result.desired_grv_proxies = std::max(std::min(4, processCount / 20), 1);
int grvProxyCount; int grvProxyCount;
if (!statusObjConfig.get("grv_proxies", result.old_grv_proxies)) { if (!statusObjConfig.get("grv_proxies", result.old_grv_proxies)) {

View File

@ -1597,9 +1597,9 @@ Reference<GrvProxyInfo> DatabaseContext::getGrvProxies(bool useProvisionalProxie
// Actor which will wait until the MultiInterface<CommitProxyInterface> returned by the DatabaseContext cx is not NULL // Actor which will wait until the MultiInterface<CommitProxyInterface> returned by the DatabaseContext cx is not NULL
ACTOR Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(DatabaseContext* cx, bool useProvisionalProxies) { ACTOR Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(DatabaseContext* cx, bool useProvisionalProxies) {
loop{ loop{
Reference<CommitProxyInfo> proxies = cx->getCommitProxies(useProvisionalProxies); Reference<CommitProxyInfo> commitProxies = cx->getCommitProxies(useProvisionalProxies);
if (proxies) if (commitProxies)
return proxies; return commitProxies;
wait( cx->onProxiesChanged() ); wait( cx->onProxiesChanged() );
} }
} }

View File

@ -195,7 +195,7 @@ description is not currently required but encouraged.
<Option name="next_write_no_write_conflict_range" code="30" <Option name="next_write_no_write_conflict_range" code="30"
description="The next write performed on this transaction will not generate a write conflict range. As a result, other transactions which read the key(s) being modified by the next write will not conflict with this transaction. Care needs to be taken when using this option on a transaction that is shared between multiple threads. When setting this option, write conflict ranges will be disabled on the next write operation, regardless of what thread it is on." /> description="The next write performed on this transaction will not generate a write conflict range. As a result, other transactions which read the key(s) being modified by the next write will not conflict with this transaction. Care needs to be taken when using this option on a transaction that is shared between multiple threads. When setting this option, write conflict ranges will be disabled on the next write operation, regardless of what thread it is on." />
<Option name="commit_on_first_proxy" code="40" <Option name="commit_on_first_proxy" code="40"
description="Committing this transaction will bypass the normal load balancing across proxies and go directly to the specifically nominated 'first proxy'." description="Committing this transaction will bypass the normal load balancing across commit proxies and go directly to the specifically nominated 'first commit proxy'."
hidden="true" /> hidden="true" />
<Option name="check_writes_enable" code="50" <Option name="check_writes_enable" code="50"
hidden="true" /> hidden="true" />

View File

@ -77,7 +77,6 @@ public:
if (s=="storage") _class = StorageClass; if (s=="storage") _class = StorageClass;
else if (s=="transaction") _class = TransactionClass; else if (s=="transaction") _class = TransactionClass;
else if (s=="resolution") _class = ResolutionClass; else if (s=="resolution") _class = ResolutionClass;
// else if (s=="proxy") _class = CommitProxyClass;
else if (s=="commit_proxy") _class = CommitProxyClass; else if (s=="commit_proxy") _class = CommitProxyClass;
else if (s=="grv_proxy") _class = GrvProxyClass; else if (s=="grv_proxy") _class = GrvProxyClass;
else if (s=="master") _class = MasterClass; else if (s=="master") _class = MasterClass;
@ -100,7 +99,6 @@ public:
if (classStr=="storage") _class = StorageClass; if (classStr=="storage") _class = StorageClass;
else if (classStr=="transaction") _class = TransactionClass; else if (classStr=="transaction") _class = TransactionClass;
else if (classStr=="resolution") _class = ResolutionClass; else if (classStr=="resolution") _class = ResolutionClass;
// else if (classStr=="proxy") _class = CommitProxyClass;
else if (classStr=="commit_proxy") _class = CommitProxyClass; else if (classStr=="commit_proxy") _class = CommitProxyClass;
else if (classStr=="grv_proxy") _class = GrvProxyClass; else if (classStr=="grv_proxy") _class = GrvProxyClass;
else if (classStr=="master") _class = MasterClass; else if (classStr=="master") _class = MasterClass;
@ -344,7 +342,6 @@ struct LBLocalityData {
template <class Interface> template <class Interface>
struct LBLocalityData<Interface, typename std::enable_if< Interface::LocationAwareLoadBalance >::type> { struct LBLocalityData<Interface, typename std::enable_if< Interface::LocationAwareLoadBalance >::type> {
enum { Present = 1 }; enum { Present = 1 };
// TODO: figure out why some interfaces don't have locality.
static LocalityData getLocality( Interface const& i ) { return i.locality; } static LocalityData getLocality( Interface const& i ) { return i.locality; }
static NetworkAddress getAddress( Interface const& i ) { return i.address(); } static NetworkAddress getAddress( Interface const& i ) { return i.address(); }
static bool alwaysFresh() { return Interface::AlwaysFresh; } static bool alwaysFresh() { return Interface::AlwaysFresh; }

View File

@ -43,8 +43,8 @@ Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInf
} }
// It is incredibly important that any modifications to txnStateStore are done in such a way that // It is incredibly important that any modifications to txnStateStore are done in such a way that
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in // the same operations will be done on all commit proxies at the same time. Otherwise, the data
// txnStateStore will become corrupted. // stored in txnStateStore will become corrupted.
void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRef> const& mutations, void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange,
Reference<ILogSystem> logSystem, Version popVersion, Reference<ILogSystem> logSystem, Version popVersion,

View File

@ -801,7 +801,7 @@ public:
RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId),
ProcessClass::TLog) ProcessClass::TLog)
.betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog))) || .betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog))) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredCommitProxies(), RoleFitness(SERVER_KNOBS->EXPECTED_COMMIT_PROXY_FITNESS, req.configuration.getDesiredCommitProxies(),
ProcessClass::CommitProxy) ProcessClass::CommitProxy)
.betterCount(RoleFitness(commit_proxies, ProcessClass::CommitProxy)) || .betterCount(RoleFitness(commit_proxies, ProcessClass::CommitProxy)) ||
RoleFitness(SERVER_KNOBS->EXPECTED_GRV_PROXY_FITNESS, req.configuration.getDesiredGrvProxies(), RoleFitness(SERVER_KNOBS->EXPECTED_GRV_PROXY_FITNESS, req.configuration.getDesiredGrvProxies(),
@ -994,7 +994,7 @@ public:
(RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), (RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(),
ProcessClass::TLog) ProcessClass::TLog)
.betterCount(RoleFitness(tlogs, ProcessClass::TLog)) || .betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredCommitProxies(), RoleFitness(SERVER_KNOBS->EXPECTED_COMMIT_PROXY_FITNESS, req.configuration.getDesiredCommitProxies(),
ProcessClass::CommitProxy) ProcessClass::CommitProxy)
.betterCount(bestFitness.proxy) || .betterCount(bestFitness.proxy) ||
RoleFitness(SERVER_KNOBS->EXPECTED_GRV_PROXY_FITNESS, req.configuration.getDesiredGrvProxies(), RoleFitness(SERVER_KNOBS->EXPECTED_GRV_PROXY_FITNESS, req.configuration.getDesiredGrvProxies(),

View File

@ -438,7 +438,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
init( EXPECTED_TLOG_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_TLOG_FITNESS, ProcessClass::UnsetFit );
init( EXPECTED_LOG_ROUTER_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_LOG_ROUTER_FITNESS, ProcessClass::UnsetFit );
init( EXPECTED_PROXY_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_COMMIT_PROXY_FITNESS, ProcessClass::UnsetFit );
init( EXPECTED_GRV_PROXY_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_GRV_PROXY_FITNESS, ProcessClass::UnsetFit );
init( EXPECTED_RESOLVER_FITNESS, ProcessClass::UnsetFit ); init( EXPECTED_RESOLVER_FITNESS, ProcessClass::UnsetFit );
init( RECRUITMENT_TIMEOUT, 600 ); if( randomize && BUGGIFY ) RECRUITMENT_TIMEOUT = deterministicRandom()->coinflip() ? 60.0 : 1.0; init( RECRUITMENT_TIMEOUT, 600 ); if( randomize && BUGGIFY ) RECRUITMENT_TIMEOUT = deterministicRandom()->coinflip() ? 60.0 : 1.0;

View File

@ -369,7 +369,7 @@ public:
int EXPECTED_MASTER_FITNESS; int EXPECTED_MASTER_FITNESS;
int EXPECTED_TLOG_FITNESS; int EXPECTED_TLOG_FITNESS;
int EXPECTED_LOG_ROUTER_FITNESS; int EXPECTED_LOG_ROUTER_FITNESS;
int EXPECTED_PROXY_FITNESS; int EXPECTED_COMMIT_PROXY_FITNESS;
int EXPECTED_GRV_PROXY_FITNESS; int EXPECTED_GRV_PROXY_FITNESS;
int EXPECTED_RESOLVER_FITNESS; int EXPECTED_RESOLVER_FITNESS;
double RECRUITMENT_TIMEOUT; double RECRUITMENT_TIMEOUT;

View File

@ -38,7 +38,7 @@ struct MasterInterface {
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators; RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
RequestStream< struct GetCommitVersionRequest > getCommitVersion; RequestStream< struct GetCommitVersionRequest > getCommitVersion;
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone; RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
// Get the centralized live committed version reported by proxies. // Get the centralized live committed version reported by commit proxies.
RequestStream< struct GetRawCommittedVersionRequest > getLiveCommittedVersion; RequestStream< struct GetRawCommittedVersionRequest > getLiveCommittedVersion;
// Report a proxy's committed version. // Report a proxy's committed version.
RequestStream< struct ReportRawCommittedVersionRequest> reportLiveCommittedVersion; RequestStream< struct ReportRawCommittedVersionRequest> reportLiveCommittedVersion;

View File

@ -527,7 +527,7 @@ struct RatekeeperLimits {
{} {}
}; };
struct CommitProxyInfo { struct GrvProxyInfo {
int64_t totalTransactions; int64_t totalTransactions;
int64_t batchTransactions; int64_t batchTransactions;
uint64_t lastThrottledTagChangeId; uint64_t lastThrottledTagChangeId;
@ -535,7 +535,7 @@ struct CommitProxyInfo {
double lastUpdateTime; double lastUpdateTime;
double lastTagPushTime; double lastTagPushTime;
CommitProxyInfo() GrvProxyInfo()
: totalTransactions(0), batchTransactions(0), lastUpdateTime(0), lastThrottledTagChangeId(0), lastTagPushTime(0) { : totalTransactions(0), batchTransactions(0), lastUpdateTime(0), lastThrottledTagChangeId(0), lastTagPushTime(0) {
} }
}; };
@ -547,7 +547,7 @@ struct RatekeeperData {
Map<UID, StorageQueueInfo> storageQueueInfo; Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo; Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, CommitProxyInfo> commitProxyInfo; std::map<UID, GrvProxyInfo> grvProxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes; Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics; HealthMetrics healthMetrics;
DatabaseConfiguration configuration; DatabaseConfiguration configuration;
@ -1269,7 +1269,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate()) .detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps) .detail("TPSBasis", actualTps)
.detail("StorageServers", sscount) .detail("StorageServers", sscount)
.detail("GrvProxies", self->commitProxyInfo.size()) .detail("GrvProxies", self->grvProxyInfo.size())
.detail("TLogs", tlcount) .detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer) .detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog) .detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
@ -1371,9 +1371,9 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0; double tooOld = now() - 1.0;
for (auto p = self.commitProxyInfo.begin(); p != self.commitProxyInfo.end();) { for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) {
if (p->second.lastUpdateTime < tooOld) if (p->second.lastUpdateTime < tooOld)
p = self.commitProxyInfo.erase(p); p = self.grvProxyInfo.erase(p);
else else
++p; ++p;
} }
@ -1382,7 +1382,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
GetRateInfoReply reply; GetRateInfoReply reply;
auto& p = self.commitProxyInfo[req.requesterID]; auto& p = self.grvProxyInfo[req.requesterID];
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions); //TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions);
if (p.totalTransactions > 0) { if (p.totalTransactions > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions ); self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions );
@ -1399,8 +1399,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
p.batchTransactions = req.batchReleasedTransactions; p.batchTransactions = req.batchReleasedTransactions;
p.lastUpdateTime = now(); p.lastUpdateTime = now();
reply.transactionRate = self.normalLimits.tpsLimit / self.commitProxyInfo.size(); reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.commitProxyInfo.size(); reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE; reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
if(p.lastThrottledTagChangeId != self.throttledTagChangeId || now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { if(p.lastThrottledTagChangeId != self.throttledTagChangeId || now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) {

View File

@ -44,7 +44,7 @@ struct ProxyRequestsInfo {
namespace{ namespace{
struct Resolver : ReferenceCounted<Resolver> { struct Resolver : ReferenceCounted<Resolver> {
UID dbgid; UID dbgid;
int proxyCount, resolverCount; int commitProxyCount, resolverCount;
NotifiedVersion version; NotifiedVersion version;
AsyncVar<Version> neededVersion; AsyncVar<Version> neededVersion;
@ -77,8 +77,8 @@ struct Resolver : ReferenceCounted<Resolver> {
Future<Void> logger; Future<Void> logger;
Resolver( UID dbgid, int proxyCount, int resolverCount ) Resolver( UID dbgid, int commitProxyCount, int resolverCount )
: dbgid(dbgid), proxyCount(proxyCount), resolverCount(resolverCount), version(-1), conflictSet( newConflictSet() ), iopsSample( SERVER_KNOBS->KEY_BYTES_PER_SAMPLE ), debugMinRecentStateVersion(0), : dbgid(dbgid), commitProxyCount(commitProxyCount), resolverCount(resolverCount), version(-1), conflictSet( newConflictSet() ), iopsSample( SERVER_KNOBS->KEY_BYTES_PER_SAMPLE ), debugMinRecentStateVersion(0),
cc("Resolver", dbgid.toString()), cc("Resolver", dbgid.toString()),
resolveBatchIn("ResolveBatchIn", cc), resolveBatchStart("ResolveBatchStart", cc), resolvedTransactions("ResolvedTransactions", cc), resolvedBytes("ResolvedBytes", cc), resolveBatchIn("ResolveBatchIn", cc), resolveBatchStart("ResolveBatchStart", cc), resolvedTransactions("ResolvedTransactions", cc), resolvedBytes("ResolvedBytes", cc),
resolvedReadConflictRanges("ResolvedReadConflictRanges", cc), resolvedWriteConflictRanges("ResolvedWriteConflictRanges", cc), transactionsAccepted("TransactionsAccepted", cc), resolvedReadConflictRanges("ResolvedReadConflictRanges", cc), resolvedWriteConflictRanges("ResolvedWriteConflictRanges", cc), transactionsAccepted("TransactionsAccepted", cc),
@ -238,7 +238,7 @@ ACTOR Future<Void> resolveBatch(
//TraceEvent("ResolveBatch", self->dbgid).detail("PrevVersion", req.prevVersion).detail("Version", req.version).detail("StateTransactionVersions", self->recentStateTransactionSizes.size()).detail("StateBytes", stateBytes).detail("FirstVersion", self->recentStateTransactionSizes.empty() ? -1 : self->recentStateTransactionSizes.front().first).detail("StateMutationsIn", req.txnStateTransactions.size()).detail("StateMutationsOut", reply.stateMutations.size()).detail("From", proxyAddress); //TraceEvent("ResolveBatch", self->dbgid).detail("PrevVersion", req.prevVersion).detail("Version", req.version).detail("StateTransactionVersions", self->recentStateTransactionSizes.size()).detail("StateBytes", stateBytes).detail("FirstVersion", self->recentStateTransactionSizes.empty() ? -1 : self->recentStateTransactionSizes.front().first).detail("StateMutationsIn", req.txnStateTransactions.size()).detail("StateMutationsOut", reply.stateMutations.size()).detail("From", proxyAddress);
ASSERT(!proxyInfo.outstandingBatches.empty()); ASSERT(!proxyInfo.outstandingBatches.empty());
ASSERT(self->proxyInfoMap.size() <= self->proxyCount+1); ASSERT(self->proxyInfoMap.size() <= self->commitProxyCount+1);
// SOMEDAY: This is O(n) in number of proxies. O(log n) solution using appropriate data structure? // SOMEDAY: This is O(n) in number of proxies. O(log n) solution using appropriate data structure?
Version oldestProxyVersion = req.version; Version oldestProxyVersion = req.version;
@ -257,7 +257,7 @@ ACTOR Future<Void> resolveBatch(
TEST(oldestProxyVersion != req.version); // The proxy that sent this request does not have the oldest current version TEST(oldestProxyVersion != req.version); // The proxy that sent this request does not have the oldest current version
bool anyPopped = false; bool anyPopped = false;
if(firstUnseenVersion <= oldestProxyVersion && self->proxyInfoMap.size() == self->proxyCount+1) { if(firstUnseenVersion <= oldestProxyVersion && self->proxyInfoMap.size() == self->commitProxyCount+1) {
TEST(true); // Deleting old state transactions TEST(true); // Deleting old state transactions
self->recentStateTransactions.erase( self->recentStateTransactions.begin(), self->recentStateTransactions.upper_bound( oldestProxyVersion ) ); self->recentStateTransactions.erase( self->recentStateTransactions.begin(), self->recentStateTransactions.upper_bound( oldestProxyVersion ) );
self->debugMinRecentStateVersion = oldestProxyVersion + 1; self->debugMinRecentStateVersion = oldestProxyVersion + 1;

View File

@ -1063,14 +1063,14 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails
// Add additional metadata for certain statuses // Add additional metadata for certain statuses
if (mStatusCode == RecoveryStatus::recruiting_transaction_servers) { if (mStatusCode == RecoveryStatus::recruiting_transaction_servers) {
int requiredLogs = atoi( md.getValue("RequiredTLogs").c_str() ); int requiredLogs = atoi( md.getValue("RequiredTLogs").c_str() );
int requiredProxies = atoi(md.getValue("RequiredCommitProxies").c_str()); int requiredCommitProxies = atoi(md.getValue("RequiredCommitProxies").c_str());
int requiredGrvProxies = atoi(md.getValue("RequiredGrvProxies").c_str()); int requiredGrvProxies = atoi(md.getValue("RequiredGrvProxies").c_str());
int requiredResolvers = atoi( md.getValue("RequiredResolvers").c_str() ); int requiredResolvers = atoi( md.getValue("RequiredResolvers").c_str() );
//int requiredProcesses = std::max(requiredLogs, std::max(requiredResolvers, requiredProxies)); //int requiredProcesses = std::max(requiredLogs, std::max(requiredResolvers, requiredCommitProxies));
//int requiredMachines = std::max(requiredLogs, 1); //int requiredMachines = std::max(requiredLogs, 1);
message["required_logs"] = requiredLogs; message["required_logs"] = requiredLogs;
message["required_commit_proxies"] = requiredProxies; message["required_commit_proxies"] = requiredCommitProxies;
message["required_grv_proxies"] = requiredGrvProxies; message["required_grv_proxies"] = requiredGrvProxies;
message["required_resolvers"] = requiredResolvers; message["required_resolvers"] = requiredResolvers;
} else if (mStatusCode == RecoveryStatus::locking_old_transaction_servers) { } else if (mStatusCode == RecoveryStatus::locking_old_transaction_servers) {
@ -2443,7 +2443,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
getProcessIssuesAsMessages(workerIssues); getProcessIssuesAsMessages(workerIssues);
state vector<std::pair<StorageServerInterface, EventMap>> storageServers; state vector<std::pair<StorageServerInterface, EventMap>> storageServers;
state vector<std::pair<TLogInterface, EventMap>> tLogs; state vector<std::pair<TLogInterface, EventMap>> tLogs;
state vector<std::pair<CommitProxyInterface, EventMap>> commit_proxies; state vector<std::pair<CommitProxyInterface, EventMap>> commitProxies;
state vector<std::pair<GrvProxyInterface, EventMap>> grvProxies; state vector<std::pair<GrvProxyInterface, EventMap>> grvProxies;
state JsonBuilderObject qos; state JsonBuilderObject qos;
state JsonBuilderObject data_overlay; state JsonBuilderObject data_overlay;
@ -2592,9 +2592,9 @@ ACTOR Future<StatusReply> clusterGetStatus(
} }
// ...also commit proxies // ...also commit proxies
ErrorOr<vector<std::pair<CommitProxyInterface, EventMap>>> _commit_proxies = wait(commitProxyFuture); ErrorOr<vector<std::pair<CommitProxyInterface, EventMap>>> _commitProxies = wait(commitProxyFuture);
if (_commit_proxies.present()) { if (_commitProxies.present()) {
commit_proxies = _commit_proxies.get(); commitProxies = _commitProxies.get();
} else { } else {
messages.push_back( messages.push_back(
JsonBuilder::makeMessage("commit_proxies_error", "Timed out trying to retrieve commit proxies.")); JsonBuilder::makeMessage("commit_proxies_error", "Timed out trying to retrieve commit proxies."));
@ -2620,7 +2620,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
JsonBuilderObject processStatus = wait(processStatusFetcher( JsonBuilderObject processStatus = wait(processStatusFetcher(
db, workers, pMetrics, mMetrics, networkMetrics, latestError, traceFileOpenErrors, programStarts, db, workers, pMetrics, mMetrics, networkMetrics, latestError, traceFileOpenErrors, programStarts,
processIssues, storageServers, tLogs, commit_proxies, grvProxies, coordinators, cx, configuration, processIssues, storageServers, tLogs, commitProxies, grvProxies, coordinators, cx, configuration,
loadResult.present() ? loadResult.get().healthyZone : Optional<Key>(), &status_incomplete_reasons)); loadResult.present() ? loadResult.get().healthyZone : Optional<Key>(), &status_incomplete_reasons));
statusObj["processes"] = processStatus; statusObj["processes"] = processStatus;
statusObj["clients"] = clientStatusFetcher(clientStatus); statusObj["clients"] = clientStatusFetcher(clientStatus);

View File

@ -52,18 +52,18 @@ using std::vector;
using std::min; using std::min;
using std::max; using std::max;
struct ProxyVersionReplies { struct CommitProxyVersionReplies {
std::map<uint64_t, GetCommitVersionReply> replies; std::map<uint64_t, GetCommitVersionReply> replies;
NotifiedVersion latestRequestNum; NotifiedVersion latestRequestNum;
ProxyVersionReplies(ProxyVersionReplies&& r) noexcept CommitProxyVersionReplies(CommitProxyVersionReplies&& r) noexcept
: replies(std::move(r.replies)), latestRequestNum(std::move(r.latestRequestNum)) {} : replies(std::move(r.replies)), latestRequestNum(std::move(r.latestRequestNum)) {}
void operator=(ProxyVersionReplies&& r) noexcept { void operator=(CommitProxyVersionReplies&& r) noexcept {
replies = std::move(r.replies); replies = std::move(r.replies);
latestRequestNum = std::move(r.latestRequestNum); latestRequestNum = std::move(r.latestRequestNum);
} }
ProxyVersionReplies() : latestRequestNum(0) {} CommitProxyVersionReplies() : latestRequestNum(0) {}
}; };
ACTOR Future<Void> masterTerminateOnConflict( UID dbgid, Promise<Void> fullyRecovered, Future<Void> onConflict, Future<Void> switchedState ) { ACTOR Future<Void> masterTerminateOnConflict( UID dbgid, Promise<Void> fullyRecovered, Future<Void> onConflict, Future<Void> switchedState ) {
@ -177,7 +177,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
recoveryTransactionVersion; // The first version in this epoch recoveryTransactionVersion; // The first version in this epoch
double lastCommitTime; double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by proxies. Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
bool databaseLocked; bool databaseLocked;
Optional<Value> proxyMetadataVersion; Optional<Value> proxyMetadataVersion;
Version minKnownCommittedVersion; Version minKnownCommittedVersion;
@ -213,7 +213,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::vector<GrvProxyInterface> provisionalGrvProxies; std::vector<GrvProxyInterface> provisionalGrvProxies;
std::vector<ResolverInterface> resolvers; std::vector<ResolverInterface> resolvers;
std::map<UID, ProxyVersionReplies> lastProxyVersionReplies; std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
Standalone<StringRef> dbId; Standalone<StringRef> dbId;
@ -299,7 +299,7 @@ ACTOR Future<Void> newCommitProxies(Reference<MasterData> self, RecruitFromConfi
} }
vector<CommitProxyInterface> newRecruits = wait(getAll(initializationReplies)); vector<CommitProxyInterface> newRecruits = wait(getAll(initializationReplies));
// It is required for the correctness of COMMIT_ON_FIRST_PROXY that self->proxies[0] is the firstCommitProxy. // It is required for the correctness of COMMIT_ON_FIRST_PROXY that self->commitProxies[0] is the firstCommitProxy.
self->commitProxies = newRecruits; self->commitProxies = newRecruits;
return Void(); return Void();
@ -502,14 +502,14 @@ ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
} }
Future<Void> sendMasterRegistration(MasterData* self, LogSystemConfig const& logSystemConfig, Future<Void> sendMasterRegistration(MasterData* self, LogSystemConfig const& logSystemConfig,
vector<CommitProxyInterface> proxies, vector<GrvProxyInterface> grvProxies, vector<CommitProxyInterface> commitProxies, vector<GrvProxyInterface> grvProxies,
vector<ResolverInterface> resolvers, DBRecoveryCount recoveryCount, vector<ResolverInterface> resolvers, DBRecoveryCount recoveryCount,
vector<UID> priorCommittedLogServers) { vector<UID> priorCommittedLogServers) {
RegisterMasterRequest masterReq; RegisterMasterRequest masterReq;
masterReq.id = self->myInterface.id(); masterReq.id = self->myInterface.id();
masterReq.mi = self->myInterface.locality; masterReq.mi = self->myInterface.locality;
masterReq.logSystemConfig = logSystemConfig; masterReq.logSystemConfig = logSystemConfig;
masterReq.commitProxies = proxies; masterReq.commitProxies = commitProxies;
masterReq.grvProxies = grvProxies; masterReq.grvProxies = grvProxies;
masterReq.resolvers = resolvers; masterReq.resolvers = resolvers;
masterReq.recoveryCount = recoveryCount; masterReq.recoveryCount = recoveryCount;
@ -978,9 +978,9 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) { ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, { req.spanContext }); state Span span("M:getVersion"_loc, { req.spanContext });
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr = self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes
if (proxyItr == self->lastProxyVersionReplies.end()) { if (proxyItr == self->lastCommitProxyVersionReplies.end()) {
// Request from invalid proxy (e.g. from duplicate recruitment request) // Request from invalid proxy (e.g. from duplicate recruitment request)
req.reply.send(Never()); req.reply.send(Never());
return Void(); return Void();
@ -1047,7 +1047,7 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
ACTOR Future<Void> provideVersions(Reference<MasterData> self) { ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
state ActorCollection versionActors(false); state ActorCollection versionActors(false);
for (auto& p : self->commitProxies) self->lastProxyVersionReplies[p.id()] = ProxyVersionReplies(); for (auto& p : self->commitProxies) self->lastCommitProxyVersionReplies[p.id()] = CommitProxyVersionReplies();
loop { loop {
choose { choose {
@ -1626,7 +1626,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
state Future<ErrorOr<CommitID>> recoveryCommit = self->commitProxies[0].commit.tryGetReply(recoveryCommitRequest); state Future<ErrorOr<CommitID>> recoveryCommit = self->commitProxies[0].commit.tryGetReply(recoveryCommitRequest);
self->addActor.send( self->logSystem->onError() ); self->addActor.send( self->logSystem->onError() );
self->addActor.send( waitResolverFailure( self->resolvers ) ); self->addActor.send( waitResolverFailure( self->resolvers ) );
self->addActor.send(waitCommitProxyFailure(self->commitProxies)); self->addActor.send( waitCommitProxyFailure(self->commitProxies));
self->addActor.send( waitGrvProxyFailure( self->grvProxies ) ); self->addActor.send( waitGrvProxyFailure( self->grvProxies ) );
self->addActor.send( provideVersions(self) ); self->addActor.send( provideVersions(self) );
self->addActor.send( serveLiveCommittedVersion(self) ); self->addActor.send( serveLiveCommittedVersion(self) );

View File

@ -3974,11 +3974,11 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
loop { loop {
state Future<Void> infoChanged = self->db->onChange(); state Future<Void> infoChanged = self->db->onChange();
state Reference<CommitProxyInfo> proxies(new CommitProxyInfo(self->db->get().client.commitProxies)); state Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(self->db->get().client.commitProxies));
choose { choose {
when(GetStorageServerRejoinInfoReply _rep = when(GetStorageServerRejoinInfoReply _rep =
wait(proxies->size() wait(commitProxies->size()
? basicLoadBalance(proxies, &CommitProxyInterface::getStorageServerRejoinInfo, ? basicLoadBalance(commitProxies, &CommitProxyInterface::getStorageServerRejoinInfo,
GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId())) GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()))
: Never())) { : Never())) {
state GetStorageServerRejoinInfoReply rep = _rep; state GetStorageServerRejoinInfoReply rep = _rep;

View File

@ -302,7 +302,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
config += generateRegions(); config += generateRegions();
if (deterministicRandom()->random01() < 0.5) config += " logs=" + format("%d", randomRoleNumber()); if (deterministicRandom()->random01() < 0.5) config += " logs=" + format("%d", randomRoleNumber());
if (deterministicRandom()->random01() < 0.5) config += " proxies=" + format("%d", randomRoleNumber()); if (deterministicRandom()->random01() < 0.5) config += " commit_proxies=" + format("%d", randomRoleNumber());
if (deterministicRandom()->random01() < 0.5) if (deterministicRandom()->random01() < 0.5)
config += " grv_proxies=" + format("%d", randomRoleNumber()); config += " grv_proxies=" + format("%d", randomRoleNumber());
if (deterministicRandom()->random01() < 0.5) config += " resolvers=" + format("%d", randomRoleNumber()); if (deterministicRandom()->random01() < 0.5) config += " resolvers=" + format("%d", randomRoleNumber());

View File

@ -88,14 +88,14 @@ struct TargetedKillWorkload : TestWorkload {
if( self->machineToKill == "master" ) { if( self->machineToKill == "master" ) {
machine = self->dbInfo->get().master.address(); machine = self->dbInfo->get().master.address();
} else if (self->machineToKill == "commitproxy") { } else if (self->machineToKill == "commitproxy") {
auto proxies = cx->getCommitProxies(false); auto commitProxies = cx->getCommitProxies(false);
int o = deterministicRandom()->randomInt(0, proxies->size()); int o = deterministicRandom()->randomInt(0, commitProxies->size());
for( int i = 0; i < proxies->size(); i++) { for( int i = 0; i < commitProxies->size(); i++) {
CommitProxyInterface mpi = proxies->getInterface(o); CommitProxyInterface mpi = commitProxies->getInterface(o);
machine = mpi.address(); machine = mpi.address();
if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress()) if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress())
break; break;
o = ++o%proxies->size(); o = ++o%commitProxies->size();
} }
} else if (self->machineToKill == "grvproxy") { } else if (self->machineToKill == "grvproxy") {
auto grvProxies = cx->getGrvProxies(false); auto grvProxies = cx->getGrvProxies(false);

View File

@ -2,6 +2,6 @@ Using cluster file `tcf/separatecoordinator.cluster'.
Recruiting new transaction servers. Recruiting new transaction servers.
Need at least 3 log servers, 1 proxies, 1 grv proxies and 1 resolvers. Need at least 3 log servers, 1 commit proxies, 1 grv proxies and 1 resolvers.
Have 1 processes on 1 machines. Have 1 processes on 1 machines.