Merge branch 'main' of https://github.com/apple/foundationdb into readaware

This commit is contained in:
Xiaoxi Wang 2022-05-23 15:09:03 -07:00
commit fd35fde481
21 changed files with 231 additions and 213 deletions

View File

@ -280,43 +280,31 @@ def suspend(logger):
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
def extract_version_epoch(cli_output):
return int(cli_output.split("\n")[-1].split(" ")[-1])
@enable_logging()
def targetversion(logger):
version1 = run_fdbcli_command('targetversion getepoch')
def versionepoch(logger):
version1 = run_fdbcli_command('versionepoch')
assert version1 == "Version epoch is unset"
version2 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version2))
assert version2 >= 0
# set the version epoch to the default value
logger.debug("setting version epoch to default")
run_fdbcli_command('targetversion add 0')
# get the version epoch
versionepoch1 = extract_version_epoch(run_fdbcli_command('targetversion getepoch'))
logger.debug("version epoch: {}".format(versionepoch1))
# make sure the version increased
version3 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version3))
assert version3 >= version2
# slightly increase the version epoch
versionepoch2 = extract_version_epoch(run_fdbcli_command("targetversion setepoch {}".format(versionepoch1 + 1000000)))
logger.debug("version epoch: {}".format(versionepoch2))
assert versionepoch2 == versionepoch1 + 1000000
# slightly decrease the version epoch
versionepoch3 = extract_version_epoch(run_fdbcli_command("targetversion add {}".format(-1000000)))
logger.debug("version epoch: {}".format(versionepoch3))
assert versionepoch3 == versionepoch2 - 1000000 == versionepoch1
# the versions should still be increasing
version4 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version4))
assert version4 >= version3
# clear the version epoch and make sure it is now unset
run_fdbcli_command("targetversion clearepoch")
version5 = run_fdbcli_command('targetversion getepoch')
assert version5 == "Version epoch is unset"
version2 = run_fdbcli_command('versionepoch get')
assert version2 == "Version epoch is unset"
version3 = run_fdbcli_command('versionepoch commit')
assert version3 == "Must set the version epoch before committing it (see `versionepoch enable`)"
version4 = run_fdbcli_command('versionepoch enable')
assert version4 == "Version epoch enabled. Run `versionepoch commit` to irreversibly jump to the target version"
version5 = run_fdbcli_command('versionepoch get')
assert version5 == "Current version epoch is 0"
version6 = run_fdbcli_command('versionepoch set 10')
assert version6 == "Version epoch enabled. Run `versionepoch commit` to irreversibly jump to the target version"
version7 = run_fdbcli_command('versionepoch get')
assert version7 == "Current version epoch is 10"
run_fdbcli_command('versionepoch disable')
version8 = run_fdbcli_command('versionepoch get')
assert version8 == "Version epoch is unset"
version9 = run_fdbcli_command('versionepoch enable')
assert version9 == "Version epoch enabled. Run `versionepoch commit` to irreversibly jump to the target version"
version10 = run_fdbcli_command('versionepoch get')
assert version10 == "Current version epoch is 0"
version11 = run_fdbcli_command('versionepoch commit')
assert version11.startswith("Current read version is ")
def get_value_from_status_json(retry, *args):
@ -747,9 +735,7 @@ if __name__ == '__main__':
throttle()
triggerddteaminfolog()
tenants()
# TODO: similar to advanceversion, this seems to cause some issues, so disable for now
# This must go last, otherwise the version advancement can mess with the other tests
# targetversion()
versionepoch()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -13,8 +13,6 @@ Overview
A tenant in a FoundationDB cluster maps a byte-string name to a key-space that can be used to store data associated with that tenant. This key-space is stored in the clusters global key-space under a prefix assigned to that tenant, with each tenant being assigned a separate non-intersecting prefix.
In addition to being each assigned a separate tenant prefix, tenants can be configured to have a common shared prefix. By default, the shared prefix is empty and tenants are allocated prefixes throughout the normal key-space. To configure an alternate shared prefix, set the ``\xff/tenantDataPrefix`` key to have the desired prefix as the value.
Tenant operations are implicitly confined to the key-space associated with the tenant. It is not necessary for client applications to use or be aware of the prefix assigned to the tenant.
Enabling tenants

View File

@ -158,7 +158,7 @@ ACTOR Future<bool> versionEpochCommandActor(Reference<IDatabase> db, Database cx
CommandFactory versionEpochFactory(
"versionepoch",
CommandHelp("versionepoch [<enable|commit|set|disable> [EPOCH]]",
CommandHelp("versionepoch [<enable|commit|get|set|disable> [EPOCH]]",
"Read or write the version epoch",
"If no arguments are specified, reports the offset between the expected version "
"and the actual version. Otherwise, enables, disables, or commits the version epoch. "

View File

@ -31,7 +31,7 @@
// Determine public IP address by calling the first available coordinator.
// If fail connecting all coordinators, throw bind_failed().
IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
int size = ccs.coordinators().size() + ccs.hostnames.size();
int size = ccs.coords.size() + ccs.hostnames.size();
int index = 0;
loop {
try {
@ -42,10 +42,10 @@ IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
NetworkAddress coordAddr;
// Try coords first, because they don't need to be resolved.
if (index < ccs.coordinators().size()) {
coordAddr = ccs.coordinators()[index];
if (index < ccs.coords.size()) {
coordAddr = ccs.coords[index];
} else {
Hostname& h = ccs.hostnames[index - ccs.coordinators().size()];
Hostname& h = ccs.hostnames[index - ccs.coords.size()];
Optional<NetworkAddress> resolvedAddr = h.resolveBlocking();
if (!resolvedAddr.present()) {
throw lookup_failed();

View File

@ -66,7 +66,6 @@ public:
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
ClusterConnectionString(const std::vector<Hostname>& hosts, Key key);
std::vector<NetworkAddress> const& coordinators() const { return coords; }
Key clusterKey() const { return key; }
Key clusterKeyName() const {
return keyDesc;

View File

@ -108,6 +108,11 @@ void IKnobCollection::setupKnobs(const std::vector<std::pair<std::string, std::s
TraceEvent(SevWarnAlways, "InvalidKnobValue")
.detail("Knob", printable(knobName))
.detail("Value", printable(knobValueString));
} else if (e.code() == error_code_invalid_option) {
std::cerr << "WARNING: Invalid knob option '" << knobName << "'\n";
TraceEvent(SevWarnAlways, "InvalidKnobName")
.detail("Knob", printable(knobName))
.detail("Value", printable(knobValueString));
} else {
std::cerr << "ERROR: Failed to set knob option '" << knobName << "': " << e.what() << "\n";
TraceEvent(SevError, "FailedToSetKnob")

View File

@ -799,8 +799,8 @@ ACTOR Future<Optional<ClusterConnectionString>> getConnectionString(Database cx)
}
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress> desiredCoordinators) {
ClusterConnectionString* conn,
std::string newName) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
@ -816,34 +816,30 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
tr->getDatabase()->getConnectionRecord()->getConnectionString().clusterKeyName())
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state std::vector<NetworkAddress> oldCoordinators = wait(old.tryResolveHostnames());
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if (!desiredCoordinators.size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait(change->getDesiredCoordinators(
tr,
oldCoordinators,
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
desiredCoordinators = _desiredCoordinators;
if (conn->hostnames.size() + conn->coords.size() == 0) {
conn->hostnames = old.hostnames;
conn->coords = old.coords;
}
std::vector<NetworkAddress> desiredCoordinators = wait(conn->tryResolveHostnames());
if (desiredCoordinators.size() != conn->hostnames.size() + conn->coords.size()) {
TraceEvent("ChangeQuorumCheckerEarlyTermination")
.detail("Reason", "One or more hostnames are unresolvable")
.backtrace();
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
}
if (result != CoordinatorsResult::SUCCESS)
return result;
if (!desiredCoordinators.size())
return CoordinatorsResult::INVALID_NETWORK_ADDRESSES;
std::sort(desiredCoordinators.begin(), desiredCoordinators.end());
std::string newName = change->getDesiredClusterKeyName();
if (newName.empty())
if (newName.empty()) {
newName = old.clusterKeyName().toString();
if (oldCoordinators == desiredCoordinators && old.clusterKeyName() == newName)
}
std::sort(conn->hostnames.begin(), conn->hostnames.end());
std::sort(conn->coords.begin(), conn->coords.end());
std::sort(old.hostnames.begin(), old.hostnames.end());
std::sort(old.coords.begin(), old.coords.end());
if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) {
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
}
state ClusterConnectionString conn(desiredCoordinators,
StringRef(newName + ':' + deterministicRandom()->randomAlphaNumeric(32)));
conn->parseKey(newName + ':' + deterministicRandom()->randomAlphaNumeric(32));
if (g_network->isSimulated()) {
int i = 0;
@ -868,19 +864,27 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
}
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(conn)));
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(*conn)));
leaderServers.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader,
GetLeaderRequest(coord.clusterKey, UID()),
TaskPriority::CoordinationReply));
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
if (coord.clientLeaderServers[i].hostname.present()) {
leaderServers.push_back(retryGetReplyFromHostname(GetLeaderRequest(coord.clusterKey, UID()),
coord.clientLeaderServers[i].hostname.get(),
WLTOKEN_CLIENTLEADERREG_GETLEADER,
TaskPriority::CoordinationReply));
} else {
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader,
GetLeaderRequest(coord.clusterKey, UID()),
TaskPriority::CoordinationReply));
}
}
choose {
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
}
tr->set(coordinatorsKey, conn.toString());
tr->set(coordinatorsKey, conn->toString());
return Optional<CoordinatorsResult>();
}
@ -990,32 +994,6 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
}
}
struct SpecifiedQuorumChange final : IQuorumChange {
std::vector<NetworkAddress> desired;
explicit SpecifiedQuorumChange(std::vector<NetworkAddress> const& desired) : desired(desired) {}
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return desired;
}
};
Reference<IQuorumChange> specifiedQuorumChange(std::vector<NetworkAddress> const& addresses) {
return Reference<IQuorumChange>(new SpecifiedQuorumChange(addresses));
}
struct NoQuorumChange final : IQuorumChange {
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return oldCoordinators;
}
};
Reference<IQuorumChange> noQuorumChange() {
return Reference<IQuorumChange>(new NoQuorumChange);
}
struct NameQuorumChange final : IQuorumChange {
std::string newName;
Reference<IQuorumChange> otherChange;
@ -1062,12 +1040,30 @@ struct AutoQuorumChange final : IQuorumChange {
Reference<IClusterConnectionRecord> ccr,
int desiredCount,
std::set<AddressExclusion>* excluded) {
ClusterConnectionString cs = ccr->getConnectionString();
if (oldCoordinators.size() != cs.hostnames.size() + cs.coords.size()) {
return false;
}
// Are there enough coordinators for the redundancy level?
if (oldCoordinators.size() < desiredCount)
return false;
if (oldCoordinators.size() % 2 != 1)
return false;
// Check exclusions
for (auto& c : oldCoordinators) {
if (addressExcluded(*excluded, c))
return false;
}
// Check locality
// FIXME: Actual locality!
std::sort(oldCoordinators.begin(), oldCoordinators.end());
for (int i = 1; i < oldCoordinators.size(); i++)
if (oldCoordinators[i - 1].ip == oldCoordinators[i].ip)
return false; // Multiple coordinators share an IP
// Check availability
ClientCoordinators coord(ccr);
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
@ -1095,19 +1091,6 @@ struct AutoQuorumChange final : IQuorumChange {
}
}
// Check exclusions
for (auto& c : oldCoordinators) {
if (addressExcluded(*excluded, c))
return false;
}
// Check locality
// FIXME: Actual locality!
std::sort(oldCoordinators.begin(), oldCoordinators.end());
for (int i = 1; i < oldCoordinators.size(); i++)
if (oldCoordinators[i - 1].ip == oldCoordinators[i].ip)
return false; // Multiple coordinators share an IP
return true; // The status quo seems fine
}
@ -1149,8 +1132,10 @@ struct AutoQuorumChange final : IQuorumChange {
if (checkAcceptable) {
bool ok = wait(isAcceptable(self.getPtr(), tr, oldCoordinators, ccr, desiredCount, &excluded));
if (ok)
if (ok) {
*err = CoordinatorsResult::SAME_NETWORK_ADDRESSES;
return oldCoordinators;
}
}
std::vector<NetworkAddress> chosen;

View File

@ -55,12 +55,10 @@ struct IQuorumChange : ReferenceCounted<IQuorumChange> {
// Change to use the given set of coordination servers
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress> desiredCoordinators);
ClusterConnectionString* conn,
std::string newName);
ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChange> change);
Reference<IQuorumChange> autoQuorumChange(int desired = -1);
Reference<IQuorumChange> noQuorumChange();
Reference<IQuorumChange> specifiedQuorumChange(std::vector<NetworkAddress> const&);
Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQuorumChange> const& other);
// Exclude the given set of servers from use as state servers. Returns as soon as the change is durable, without

View File

@ -250,7 +250,7 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString/hostname") {
ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
ASSERT(cs.coords.size() == 0);
ASSERT(cs.toString() == connectionString);
}
@ -301,7 +301,7 @@ TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") {
INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address });
ClusterConnectionString cs(connectionString);
state std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames());
std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames());
ASSERT(allCoordinators.size() == 1 &&
std::find(allCoordinators.begin(), allCoordinators.end(), address) != allCoordinators.end());
@ -460,7 +460,7 @@ ClientCoordinators::ClientCoordinators(Reference<IClusterConnectionRecord> ccr)
for (auto h : cs.hostnames) {
clientLeaderServers.push_back(ClientLeaderRegInterface(h));
}
for (auto s : cs.coordinators()) {
for (auto s : cs.coords) {
clientLeaderServers.push_back(ClientLeaderRegInterface(s));
}
}
@ -866,7 +866,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state int coordinatorsSize = cs.hostnames.size() + cs.coordinators().size();
state int coordinatorsSize = cs.hostnames.size() + cs.coords.size();
state int index = 0;
state int successIndex = 0;
state Optional<double> incorrectTime;
@ -880,7 +880,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
for (const auto& h : cs.hostnames) {
clientLeaderServers.push_back(ClientLeaderRegInterface(h));
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
clientLeaderServers.push_back(ClientLeaderRegInterface(c));
}
@ -892,7 +892,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
req.clusterKey = cs.clusterKey();
req.hostnames = cs.hostnames;
req.coordinators = cs.coordinators();
req.coordinators = cs.coords;
req.knownClientInfoID = clientInfo->get().id;
req.supportedVersions = supportedVersions->get();
req.traceLogGroup = traceLogGroup;

View File

@ -409,11 +409,11 @@ public:
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) {
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
ctis.reserve(cs.hostnames.size() + cs.coordinators().size());
ctis.reserve(cs.hostnames.size() + cs.coords.size());
for (const auto& h : cs.hostnames) {
ctis.emplace_back(h);
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
ctis.emplace_back(c);
}
getGenerationQuorum = GetGenerationQuorum{ ctis };

View File

@ -162,8 +162,8 @@ class SimpleConfigTransactionImpl {
public:
SimpleConfigTransactionImpl(Database const& cx) : cx(cx) {
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
if (cs.coordinators().size()) {
std::vector<NetworkAddress> coordinators = cs.coordinators();
if (cs.coords.size()) {
std::vector<NetworkAddress> coordinators = cs.coords;
std::sort(coordinators.begin(), coordinators.end());
cti = ConfigTransactionInterface(coordinators[0]);
} else {

View File

@ -1658,7 +1658,6 @@ Future<RangeResult> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw,
}
ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
state Reference<IQuorumChange> change;
state ClusterConnectionString conn; // We don't care about the Key here.
state std::vector<std::string> process_address_or_hostname_strs;
state Optional<std::string> msg;
@ -1704,15 +1703,7 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
}
std::vector<NetworkAddress> addressesVec = wait(conn.tryResolveHostnames());
if (addressesVec.size() != conn.hostnames.size() + conn.coordinators().size()) {
return ManagementAPIError::toJsonString(false, "coordinators", "One or more hostnames are not resolvable.");
} else if (addressesVec.size()) {
change = specifiedQuorumChange(addressesVec);
} else {
change = noQuorumChange();
}
std::string newName;
// check update for cluster_description
Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin);
auto entry = ryw->getSpecialKeySpaceWriteMap()[cluster_decription_key];
@ -1720,7 +1711,7 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
// check valid description [a-zA-Z0-9_]+
if (entry.second.present() && isAlphaNumeric(entry.second.get().toString())) {
// do the name change
change = nameQuorumChange(entry.second.get().toString(), change);
newName = entry.second.get().toString();
} else {
// throw the error
return ManagementAPIError::toJsonString(
@ -1728,13 +1719,11 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
}
ASSERT(change.isValid());
TraceEvent(SevDebug, "SKSChangeCoordinatorsStart")
.detail("NewAddresses", describe(addressesVec))
.detail("NewConnectionString", conn.toString())
.detail("Description", entry.first ? entry.second.get().toString() : "");
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), change, addressesVec));
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), &conn, newName));
TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish")
.detail("Result", r.present() ? static_cast<int>(r.get()) : -1); // -1 means success
@ -1803,9 +1792,20 @@ ACTOR static Future<RangeResult> CoordinatorsAutoImplActor(ReadYourWritesTransac
throw special_keys_api_failure();
}
for (const auto& address : _desiredCoordinators) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += address.toString();
if (result == CoordinatorsResult::SAME_NETWORK_ADDRESSES) {
for (const auto& host : old.hostnames) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += host.toString();
}
for (const auto& coord : old.coords) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += coord.toString();
}
} else {
for (const auto& address : _desiredCoordinators) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += address.toString();
}
}
res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey)));
return res;
@ -1853,6 +1853,12 @@ Future<RangeResult> AdvanceVersionImpl::getRange(ReadYourWritesTransaction* ryw,
}
ACTOR static Future<Optional<std::string>> advanceVersionCommitActor(ReadYourWritesTransaction* ryw, Version v) {
Optional<Standalone<StringRef>> versionEpochValue = wait(ryw->getTransaction().get(versionEpochKey));
if (versionEpochValue.present()) {
return ManagementAPIError::toJsonString(
false, "advanceversion", "Illegal to modify the version while the version epoch is enabled");
}
// Max version we can set for minRequiredCommitVersionKey,
// making sure the cluster can still be alive for 1000 years after the recovery
static const Version maxAllowedVerion =

View File

@ -95,7 +95,7 @@ inline void save(Archive& ar, const Reference<IReplicationPolicy>& value) {
}
}
struct PolicyOne final : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
struct PolicyOne final : IReplicationPolicy {
PolicyOne(){};
explicit PolicyOne(const PolicyOne& o) {}
std::string name() const override { return "One"; }
@ -115,7 +115,7 @@ struct PolicyOne final : IReplicationPolicy, public ReferenceCounted<PolicyOne>
void attributeKeys(std::set<std::string>* set) const override { return; }
};
struct PolicyAcross final : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
struct PolicyAcross final : IReplicationPolicy {
friend struct serializable_traits<PolicyAcross*>;
PolicyAcross(int count, std::string const& attribKey, Reference<IReplicationPolicy> const policy);
explicit PolicyAcross();
@ -168,7 +168,7 @@ protected:
Arena _arena;
};
struct PolicyAnd final : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
struct PolicyAnd final : IReplicationPolicy {
friend struct serializable_traits<PolicyAnd*>;
PolicyAnd(std::vector<Reference<IReplicationPolicy>> policies) : _policies(policies), _sortedPolicies(policies) {
// Sort the policy array

View File

@ -104,7 +104,7 @@ ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr)
stateServers.emplace_back(h);
configServers.emplace_back(h);
}
for (auto s : cs.coordinators()) {
for (auto s : cs.coords) {
leaderElectionServers.emplace_back(s);
stateServers.emplace_back(s);
configServers.emplace_back(s);

View File

@ -127,18 +127,9 @@ class LocalConfigurationImpl {
} catch (Error& e) {
if (e.code() == error_code_invalid_option) {
TEST(true); // Attempted to manually set invalid knob option
if (!g_network->isSimulated()) {
fprintf(stderr, "WARNING: Unrecognized knob option '%s'\n", knobName.c_str());
}
TraceEvent(SevWarnAlways, "UnrecognizedKnobOption").detail("Knob", printable(knobName));
} else if (e.code() == error_code_invalid_option_value) {
TEST(true); // Invalid manually set knob value
if (!g_network->isSimulated()) {
fprintf(stderr,
"WARNING: Invalid value '%s' for knob option '%s'\n",
knobValueString.c_str(),
knobName.c_str());
}
TraceEvent(SevWarnAlways, "InvalidKnobValue")
.detail("Knob", printable(knobName))
.detail("Value", printable(knobValueString));

View File

@ -857,7 +857,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
NetworkAddressList listenNetworkAddresses;
std::vector<Hostname>& hostnames = connectionRecord.getConnectionString().hostnames;
const std::vector<NetworkAddress>& coords = connectionRecord.getConnectionString().coordinators();
const std::vector<NetworkAddress>& coords = connectionRecord.getConnectionString().coords;
ASSERT(hostnames.size() + coords.size() > 0);
for (int ii = 0; ii < publicAddressStrs.size(); ++ii) {

View File

@ -119,6 +119,28 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
~MasterData() = default;
};
Version figureVersion(Version current,
double now,
Version reference,
int64_t toAdd,
double maxVersionRateModifier,
int64_t maxVersionRateOffset) {
// Versions should roughly follow wall-clock time, based on the
// system clock of the current machine and an FDB-specific epoch.
// Calculate the expected version and determine whether we need to
// hand out versions faster or slower to stay in sync with the
// clock.
Version expected = now * SERVER_KNOBS->VERSIONS_PER_SECOND - reference;
// Attempt to jump directly to the expected version. But make
// sure that versions are still being handed out at a rate
// around VERSIONS_PER_SECOND. This rate is scaled depending on
// how far off the calculated version is from the expected
// version.
int64_t maxOffset = std::min(static_cast<int64_t>(toAdd * maxVersionRateModifier), maxVersionRateOffset);
return std::clamp(expected, current + toAdd - maxOffset, current + toAdd + maxOffset);
}
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, req.spanContext);
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
@ -158,11 +180,6 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
t1 = self->lastVersionTime;
}
// Versions should roughly follow wall-clock time, based on the
// system clock of the current machine and an FDB-specific epoch.
// Calculate the expected version and determine whether we need to
// hand out versions faster or slower to stay in sync with the
// clock.
Version toAdd =
std::max<Version>(1,
std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS,
@ -170,18 +187,12 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
rep.prevVersion = self->version;
if (self->referenceVersion.present()) {
Version expected =
g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - self->referenceVersion.get();
// Attempt to jump directly to the expected version. But make
// sure that versions are still being handed out at a rate
// around VERSIONS_PER_SECOND. This rate is scaled depending on
// how far off the calculated version is from the expected
// version.
int64_t maxOffset = std::min(static_cast<int64_t>(toAdd * SERVER_KNOBS->MAX_VERSION_RATE_MODIFIER),
SERVER_KNOBS->MAX_VERSION_RATE_OFFSET);
self->version =
std::clamp(expected, self->version + toAdd - maxOffset, self->version + toAdd + maxOffset);
self->version = figureVersion(self->version,
g_network->timer(),
self->referenceVersion.get(),
toAdd,
SERVER_KNOBS->MAX_VERSION_RATE_MODIFIER,
SERVER_KNOBS->MAX_VERSION_RATE_OFFSET);
ASSERT_GT(self->version, rep.prevVersion);
} else {
self->version = self->version + toAdd;
@ -433,3 +444,44 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
throw err;
}
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/Simple") {
ASSERT_EQ(
figureVersion(0, 1.0, 0, 1e6, SERVER_KNOBS->MAX_VERSION_RATE_MODIFIER, SERVER_KNOBS->MAX_VERSION_RATE_OFFSET),
1e6);
ASSERT_EQ(figureVersion(1e6, 1.5, 0, 100, 0.1, 1e6), 1000110);
ASSERT_EQ(figureVersion(1e6, 1.5, 0, 550000, 0.1, 1e6), 1500000);
return Void();
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/Small") {
// Should always advance by at least 1 version.
ASSERT_EQ(figureVersion(1e6, 2.0, 0, 1, 0.0001, 1e6), 1000001);
ASSERT_EQ(figureVersion(1e6, 0.0, 0, 1, 0.1, 1e6), 1000001);
return Void();
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/MaxOffset") {
ASSERT_EQ(figureVersion(1e6, 10.0, 0, 5e6, 0.1, 1e6), 6500000);
ASSERT_EQ(figureVersion(1e6, 20.0, 0, 15e6, 0.1, 1e6), 17e6);
return Void();
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/PositiveReferenceVersion") {
ASSERT_EQ(figureVersion(1e6, 3.0, 1e6, 1e6, 0.1, 1e6), 2e6);
ASSERT_EQ(figureVersion(1e6, 3.0, 1e6, 100, 0.1, 1e6), 1000110);
return Void();
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/NegativeReferenceVersion") {
ASSERT_EQ(figureVersion(0, 2.0, -1e6, 3e6, 0.1, 1e6), 3e6);
ASSERT_EQ(figureVersion(0, 2.0, -1e6, 5e5, 0.1, 1e6), 550000);
return Void();
}
TEST_CASE("/fdbserver/MasterServer/FigureVersion/Overflow") {
// The upper range used in std::clamp should overflow.
ASSERT_EQ(figureVersion(std::numeric_limits<Version>::max() - static_cast<Version>(1e6), 1.0, 0, 1e6, 0.1, 1e6),
std::numeric_limits<Version>::max() - static_cast<Version>(1e6 * 0.1));
return Void();
}

View File

@ -2978,7 +2978,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
Reference<AsyncVar<Value>> result,
MonitorLeaderInfo info) {
ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state int coordinatorsSize = cs.hostnames.size() + cs.coordinators().size();
state int coordinatorsSize = cs.hostnames.size() + cs.coords.size();
state ElectionResultRequest request;
state int index = 0;
state int successIndex = 0;
@ -2988,14 +2988,14 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
for (const auto& h : cs.hostnames) {
leaderElectionServers.push_back(LeaderElectionRegInterface(h));
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
leaderElectionServers.push_back(LeaderElectionRegInterface(c));
}
deterministicRandom()->randomShuffle(leaderElectionServers);
request.key = cs.clusterKey();
request.hostnames = cs.hostnames;
request.coordinators = cs.coordinators();
request.coordinators = cs.coords;
loop {
LeaderElectionRegInterface interf = leaderElectionServers[index];

View File

@ -956,7 +956,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
state std::vector<std::string> process_addresses;
boost::split(
process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size() + cs.hostnames.size());
ASSERT(process_addresses.size() == cs.coords.size() + cs.hostnames.size());
// compare the coordinator process network addresses one by one
std::vector<NetworkAddress> coordinators = wait(cs.tryResolveHostnames());
for (const auto& network_address : coordinators) {
@ -1080,8 +1080,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
ClusterConnectionString csNew(res.get().toString());
// verify the cluster decription
ASSERT(new_cluster_description == csNew.clusterKeyName().toString());
ASSERT(csNew.hostnames.size() + csNew.coordinators().size() ==
old_coordinators_processes.size() + 1);
ASSERT(csNew.hostnames.size() + csNew.coords.size() == old_coordinators_processes.size() + 1);
std::vector<NetworkAddress> newCoordinators = wait(csNew.tryResolveHostnames());
// verify the coordinators' addresses
for (const auto& network_address : newCoordinators) {

View File

@ -1847,35 +1847,34 @@ ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl(Net2* s
Promise<std::vector<NetworkAddress>> promise;
state Future<std::vector<NetworkAddress>> result = promise.getFuture();
tcpResolver.async_resolve(tcp::resolver::query(host, service),
[=](const boost::system::error_code& ec, tcp::resolver::iterator iter) {
if (ec) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
return;
}
tcpResolver.async_resolve(host, service, [=](const boost::system::error_code& ec, tcp::resolver::iterator iter) {
if (ec) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
return;
}
std::vector<NetworkAddress> addrs;
std::vector<NetworkAddress> addrs;
tcp::resolver::iterator end;
while (iter != end) {
auto endpoint = iter->endpoint();
auto addr = endpoint.address();
if (addr.is_v6()) {
addrs.emplace_back(IPAddress(addr.to_v6().to_bytes()), endpoint.port());
} else {
addrs.emplace_back(addr.to_v4().to_ulong(), endpoint.port());
}
++iter;
}
tcp::resolver::iterator end;
while (iter != end) {
auto endpoint = iter->endpoint();
auto addr = endpoint.address();
if (addr.is_v6()) {
addrs.emplace_back(IPAddress(addr.to_v6().to_bytes()), endpoint.port());
} else {
addrs.emplace_back(addr.to_v4().to_ulong(), endpoint.port());
}
++iter;
}
if (addrs.empty()) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
} else {
promise.send(addrs);
}
});
if (addrs.empty()) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
} else {
promise.send(addrs);
}
});
wait(ready(result));
tcpResolver.cancel();

View File

@ -45,7 +45,7 @@ def is_unknown_option(output):
def is_unknown_knob(output):
return output.startswith("ERROR: Failed to set knob option")
return output.startswith("WARNING: Invalid knob option")
def is_cli_usage(output):