Merge branch 'main' of github.com:apple/foundationdb into jfu-grv-cache

This commit is contained in:
Jon Fu 2022-02-23 12:35:25 -05:00
commit ce1d71472d
19 changed files with 919 additions and 704 deletions

View File

@ -176,7 +176,7 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
case ConfigurationResult::STORAGE_MIGRATION_DISABLED:
fprintf(stderr,
"ERROR: Storage engine type cannot be changed because "
"storage_migration_mode=disabled.\n");
"storage_migration_type=disabled.\n");
fprintf(stderr,
"Type `configure perpetual_storage_wiggle=1 storage_migration_type=gradual' to enable gradual "
"migration with the perpetual wiggle, or `configure "

View File

@ -100,6 +100,7 @@ ACTOR Future<bool> changeCoordinators(Reference<IDatabase> db, std::vector<Strin
state std::vector<StringRef>::iterator t;
for (t = tokens.begin() + 1; t != tokens.end(); ++t) {
try {
// TODO(renxuan): add hostname parsing here.
auto const& addr = NetworkAddress::parse(t->toString());
if (new_coordinators_addresses.count(addr)) {
fprintf(stderr, "ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str());

View File

@ -1157,7 +1157,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state bool writeMode = false;
state std::string clusterConnectString;
state std::map<Key, std::pair<Value, ClientLeaderRegInterface>> address_interface;
state FdbOptions globalOptions;
@ -1171,6 +1170,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
ClusterConnectionFile::lookupClusterFileName(opt.clusterFile);
try {
ccf = makeReference<ClusterConnectionFile>(resolvedClusterFile.first);
wait(ccf->resolveHostnames());
} catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return 1;

View File

@ -28,6 +28,7 @@
#include "fdbclient/CoordinationInterface.h"
// Determine public IP address by calling the first coordinator.
IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
try {
using namespace boost::asio;
@ -35,6 +36,7 @@ IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
io_service ioService;
ip::udp::socket socket(ioService);
ccs.resolveHostnamesBlocking();
const auto& coordAddr = ccs.coordinators()[0];
const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6()))
: ip::address(ip::address_v4(coordAddr.ip.toV4()));

View File

@ -99,11 +99,11 @@ public:
AsyncTrigger resolveFinish;
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;
private:
void parseConnString();
void parseKey(const std::string& key);
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;
Key key, keyDesc;
std::string connectionString;
};

View File

@ -169,7 +169,7 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
} else if (value == "gradual") {
type = StorageMigrationType::GRADUAL;
} else {
printf("Error: Only disabled|aggressive|gradual are valid for storage_migration_mode.\n");
printf("Error: Only disabled|aggressive|gradual are valid for storage_migration_type.\n");
return out;
}
out[p + key] = format("%d", type);

View File

@ -516,6 +516,7 @@ std::string ClusterConnectionString::toString() const {
}
ClientCoordinators::ClientCoordinators(Reference<IClusterConnectionRecord> ccr) : ccr(ccr) {
ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED);
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s)
clientLeaderServers.push_back(ClientLeaderRegInterface(*s));
@ -544,15 +545,44 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) {
// Nominee is the worker among all workers that are considered as leader by one coordinator
// This function contacts a coordinator coord to ask who is its nominee.
// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor
// to throw `coordinators_changed()` error
ACTOR Future<Void> monitorNominee(Key key,
ClientLeaderRegInterface coord,
AsyncTrigger* nomineeChange,
Optional<LeaderInfo>* info) {
Optional<LeaderInfo>* info,
Optional<Hostname> hostname = Optional<Hostname>()) {
loop {
state Optional<LeaderInfo> li =
state Optional<LeaderInfo> li;
if (coord.getLeader.getEndpoint().getPrimaryAddress().fromHostname) {
state ErrorOr<Optional<LeaderInfo>> rep =
wait(coord.getLeader.tryGetReply(GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
if (rep.isError()) {
// Connecting to nominee failed, most likely due to connection failed.
TraceEvent("MonitorNomineeError")
.detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname")
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString())
.error(rep.getError());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(0.05));
throw coordinators_changed();
} else {
throw rep.getError();
}
} else if (rep.present()) {
li = rep.get();
}
} else {
Optional<LeaderInfo> tmp =
wait(retryBrokenPromise(coord.getLeader,
GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
li = tmp;
}
wait(Future<Void>(Void())); // Make sure we weren't cancelled
TraceEvent("GetLeaderReply")
@ -627,6 +657,9 @@ Optional<std::pair<LeaderInfo, bool>> getLeader(const std::vector<Optional<Leade
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo,
MonitorLeaderInfo info) {
loop {
wait(connRecord->resolveHostnames());
wait(info.intermediateConnRecord->resolveHostnames());
state ClientCoordinators coordinators(info.intermediateConnRecord);
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
@ -634,12 +667,19 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
nominees.resize(coordinators.clientLeaderServers.size());
std::vector<Future<Void>> actors;
state std::vector<Future<Void>> actors;
// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
actors.reserve(coordinators.clientLeaderServers.size());
for (int i = 0; i < coordinators.clientLeaderServers.size(); i++)
actors.push_back(
monitorNominee(coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i]));
for (int i = 0; i < coordinators.clientLeaderServers.size(); i++) {
Optional<Hostname> hostname;
auto r = connRecord->getConnectionString().networkAddressToHostname.find(
coordinators.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress());
if (r != connRecord->getConnectionString().networkAddressToHostname.end()) {
hostname = r->second;
}
actors.push_back(monitorNominee(
coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], hostname));
}
allActors = waitForAll(actors);
loop {
@ -673,7 +713,18 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
outSerializedLeaderInfo->set(leader.get().first.serializedInfo);
}
try {
wait(nomineeChange.onTrigger() || allActors);
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorLeaderCoordinatorsChanged").suppressFor(1.0);
connRecord->getConnectionString().resetToUnresolved();
break;
} else {
throw e;
}
}
}
}
}
@ -793,8 +844,8 @@ ACTOR Future<Void> getClientInfoFromLeader(Reference<AsyncVar<Optional<ClusterCo
when(ClientDBInfo ni =
wait(brokenPromiseToNever(knownLeader->get().get().clientInterface.openDatabase.getReply(req)))) {
TraceEvent("GetClientInfoFromLeaderGotClientInfo", knownLeader->get().get().clientInterface.id())
.detail("CommitProxy0", ni.commitProxies.size() ? ni.commitProxies[0].id() : UID())
.detail("GrvProxy0", ni.grvProxies.size() ? ni.grvProxies[0].id() : UID())
.detail("CommitProxy0", ni.commitProxies.size() ? ni.commitProxies[0].address().toString() : "")
.detail("GrvProxy0", ni.grvProxies.size() ? ni.grvProxies[0].address().toString() : "")
.detail("ClientID", ni.id);
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(ni));
}
@ -806,7 +857,8 @@ ACTOR Future<Void> getClientInfoFromLeader(Reference<AsyncVar<Optional<ClusterCo
ACTOR Future<Void> monitorLeaderAndGetClientInfo(Key clusterKey,
std::vector<NetworkAddress> coordinators,
ClientData* clientData,
Reference<AsyncVar<Optional<LeaderInfo>>> leaderInfo) {
Reference<AsyncVar<Optional<LeaderInfo>>> leaderInfo,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state std::vector<ClientLeaderRegInterface> clientLeaderServers;
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
@ -854,7 +906,14 @@ ACTOR Future<Void> monitorLeaderAndGetClientInfo(Key clusterKey,
leaderInfo->set(leader.get().first);
}
}
try {
wait(nomineeChange.onTrigger() || allActors);
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
coordinatorsChanged->trigger();
}
throw e;
}
}
}
@ -983,9 +1042,15 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
successIndex = index;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cant talk to cluster controller
if (rep.getError().code() == error_code_coordinators_changed) {
throw coordinators_changed();
}
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
// When the client fails talking to all coordinators, we throw coordinators_changed() and let the caller
// re-resolve the connection string and retry.
throw coordinators_changed();
}
}
}
@ -997,8 +1062,11 @@ ACTOR Future<Void> monitorProxies(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
wait(connRecord->get()->resolveHostnames());
state MonitorLeaderInfo info(connRecord->get());
loop {
try {
wait(info.intermediateConnRecord->resolveHostnames());
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
@ -1009,5 +1077,13 @@ ACTOR Future<Void> monitorProxies(
info.intermediateConnRecord = connRecord->get();
}
}
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorProxiesCoordinatorsChanged").suppressFor(1.0);
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}
}
}
}

View File

@ -74,10 +74,11 @@ Future<Void> monitorLeader(Reference<IClusterConnectionRecord> const& connFile,
// This is one place where the leader election algorithm is run. The coodinator contacts all coodinators to collect
// nominees, the nominee with the most nomination is the leader, and collects client data from the leader. This function
// also monitors the change of the leader.
Future<Void> monitorLeaderAndGetClientInfo(Value const& key,
Future<Void> monitorLeaderAndGetClientInfo(Key const& clusterKey,
std::vector<NetworkAddress> const& coordinators,
ClientData* const& clientData,
Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo);
Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo,
Reference<AsyncVar<Void>> const& coordinatorsChanged);
Future<Void> monitorProxies(
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> const& connRecord,

View File

@ -759,16 +759,18 @@ Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* proxyChangeTrigger) {
AsyncTrigger* proxiesChangeTrigger) {
state std::vector<CommitProxyInterface> curCommitProxies;
state std::vector<GrvProxyInterface> curGrvProxies;
state ActorCollection actors(false);
state Future<Void> clientDBInfoOnChange = clientDBInfo->onChange();
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
loop {
choose {
when(wait(clientDBInfo->onChange())) {
when(wait(clientDBInfoOnChange)) {
clientDBInfoOnChange = clientDBInfo->onChange();
if (clientDBInfo->get().commitProxies != curCommitProxies ||
clientDBInfo->get().grvProxies != curGrvProxies) {
// This condition is a bit complicated. Here we want to verify that we're unable to receive a read
@ -785,7 +787,7 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
}
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
proxyChangeTrigger->trigger();
proxiesChangeTrigger->trigger();
}
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
@ -5806,9 +5808,10 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
loop {
try {
state GetReadVersionRequest req(span.context, transactionCount, priority, flags, tags, debugID);
state Future<Void> onProxiesChanged = cx->onProxiesChanged();
choose {
when(wait(cx->onProxiesChanged())) {}
when(wait(onProxiesChanged)) { onProxiesChanged = cx->onProxiesChanged(); }
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies(
flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)),
@ -7017,6 +7020,7 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion
throw;
}
TraceEvent("ExclusionSafetyCheckCoordinators").log();
wait(cx->getConnectionRecord()->resolveHostnames());
state ClientCoordinators coordinatorList(cx->getConnectionRecord());
state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
leaderServers.reserve(coordinatorList.clientLeaderServers.size());

View File

@ -96,6 +96,7 @@ LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local) : Client
}
ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr) : ClientCoordinators(ccr) {
ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED);
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
leaderElectionServers.emplace_back(*s);
@ -205,8 +206,11 @@ ACTOR Future<Void> openDatabase(ClientData* db,
int* clientCount,
Reference<AsyncVar<bool>> hasConnectedClients,
OpenDatabaseCoordRequest req,
Future<Void> checkStuck) {
Future<Void> checkStuck,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state ErrorOr<CachedSerialization<ClientDBInfo>> replyContents;
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> clientInfoOnChange = db->clientInfo->onChange();
++(*clientCount);
hasConnectedClients->set(true);
@ -223,7 +227,15 @@ ACTOR Future<Void> openDatabase(ClientData* db,
replyContents = failed_to_progress();
break;
}
when(wait(yieldedFuture(db->clientInfo->onChange()))) { replyContents = db->clientInfo->get(); }
when(wait(yieldedFuture(clientInfoOnChange))) {
clientInfoOnChange = db->clientInfo->onChange();
replyContents = db->clientInfo->get();
}
when(wait(coordinatorsChangedOnChange)) {
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
replyContents = coordinators_changed();
break;
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
if (db->clientInfo->get().read().id.isValid()) {
replyContents = db->clientInfo->get();
@ -254,18 +266,33 @@ ACTOR Future<Void> openDatabase(ClientData* db,
ACTOR Future<Void> remoteMonitorLeader(int* clientCount,
Reference<AsyncVar<bool>> hasConnectedClients,
Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader,
ElectionResultRequest req) {
ElectionResultRequest req,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state bool coordinatorsChangeDetected = false;
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> currentElectedLeaderOnChange = currentElectedLeader->onChange();
++(*clientCount);
hasConnectedClients->set(true);
while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) {
choose {
when(wait(yieldedFuture(currentElectedLeader->onChange()))) {}
when(wait(yieldedFuture(currentElectedLeaderOnChange))) {
currentElectedLeaderOnChange = currentElectedLeader->onChange();
}
when(wait(coordinatorsChangedOnChange)) {
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
coordinatorsChangeDetected = true;
break;
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { break; }
}
}
if (coordinatorsChangeDetected) {
req.reply.sendError(coordinators_changed());
} else {
req.reply.send(currentElectedLeader->get());
}
if (--(*clientCount) == 0) {
hasConnectedClients->set(false);
@ -296,6 +323,9 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader =
makeReference<AsyncVar<Optional<LeaderInfo>>>();
state LivenessChecker canConnectToLeader(SERVER_KNOBS->COORDINATOR_LEADER_CONNECTION_TIMEOUT);
state Reference<AsyncVar<Void>> coordinatorsChanged = makeReference<AsyncVar<Void>>();
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> hasConnectedClientsOnChange = hasConnectedClients->onChange();
loop choose {
when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {
@ -306,10 +336,14 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
if (!leaderMon.isValid()) {
leaderMon = monitorLeaderAndGetClientInfo(
req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
req.clusterKey, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged);
}
actors.add(
openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck()));
actors.add(openDatabase(&clientData,
&clientCount,
hasConnectedClients,
req,
canConnectToLeader.checkStuck(),
coordinatorsChanged));
}
}
when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
@ -318,10 +352,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
req.reply.send(currentElectedLeader->get());
} else {
if (!leaderMon.isValid()) {
leaderMon =
monitorLeaderAndGetClientInfo(req.key, req.coordinators, &clientData, currentElectedLeader);
leaderMon = monitorLeaderAndGetClientInfo(
req.key, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged);
}
actors.add(remoteMonitorLeader(&clientCount, hasConnectedClients, currentElectedLeader, req));
actors.add(remoteMonitorLeader(
&clientCount, hasConnectedClients, currentElectedLeader, req, coordinatorsChanged));
}
}
when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) {
@ -454,13 +489,18 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
notify.pop_front();
}
}
when(wait(hasConnectedClients->onChange())) {
when(wait(hasConnectedClientsOnChange)) {
hasConnectedClientsOnChange = hasConnectedClients->onChange();
if (!hasConnectedClients->get() && !nextInterval.isValid()) {
TraceEvent("LeaderRegisterUnneeded").detail("Key", key);
return Void();
}
}
when(wait(actors.getResult())) {}
when(wait(coordinatorsChangedOnChange)) {
leaderMon = Future<Void>();
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
}
}
}

View File

@ -5060,7 +5060,8 @@ Future<Void> DDTeamCollection::printSnapshotTeamsInfo(Reference<DDTeamCollection
return DDTeamCollectionImpl::printSnapshotTeamsInfo(self);
}
std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
class DDTeamCollectionUnitTest {
static std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
Reference<IReplicationPolicy> policy,
int processCount) {
Database database = DatabaseContext::create(
@ -5104,7 +5105,7 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
return collection;
}
std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
static std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
Reference<IReplicationPolicy> policy,
int processCount) {
Database database = DatabaseContext::create(
@ -5164,7 +5165,8 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
return collection;
}
TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
public:
ACTOR static Future<Void> AddTeamsBestOf_UseMachineID() {
wait(Future<Void>(Void()));
int teamSize = 3; // replication size
@ -5183,7 +5185,7 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
return Void();
}
TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
ACTOR static Future<Void> AddTeamsBestOf_NotUseMachineID() {
wait(Future<Void>(Void()));
int teamSize = 3; // replication size
@ -5207,9 +5209,9 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
return Void();
}
TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
ACTOR static Future<Void> AddAllTeams_isExhaustive() {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -5225,9 +5227,9 @@ TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
return Void();
}
TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
ACTOR static Future<Void> AddAllTeams_withLimit() {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -5241,10 +5243,10 @@ TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
return Void();
}
TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
ACTOR static Future<Void> AddTeamsBestOf_SkippingBusyServers() {
wait(Future<Void>(Void()));
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -5271,11 +5273,11 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
// Due to the randomness in choosing the machine team and the server team from the machine team, it is possible that
// we may not find the remaining several (e.g., 1 or 2) available teams.
// It is hard to conclude what is the minimum number of teams the addTeamsBestOf() should create in this situation.
TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
ACTOR static Future<Void> AddTeamsBestOf_NotEnoughServers() {
wait(Future<Void>(Void()));
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -5297,7 +5299,8 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
// When we chnage the selectReplicas function to achieve such guarantee, we can enable the following ASSERT
ASSERT(collection->machineTeams.size() == 10); // Should create all machine teams
// We need to guarantee a server always have at least a team so that the server can participate in data distribution
// We need to guarantee a server always have at least a team so that the server can participate in data
// distribution
for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
auto teamCount = process->second->getTeams().size();
ASSERT(teamCount >= 1);
@ -5309,10 +5312,10 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") {
ACTOR static Future<Void> GetTeam_NewServersNotNeeded() {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
@ -5366,10 +5369,9 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
ACTOR static Future<Void> GetTeam_HealthyCompleteSource() {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
@ -5424,10 +5426,10 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") {
ACTOR static Future<Void> GetTeam_TrueBestLeastUtilized() {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
@ -5479,10 +5481,10 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") {
ACTOR static Future<Void> GetTeam_TrueBestMostUtilized() {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
@ -5534,10 +5536,9 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
ACTOR static Future<Void> GetTeam_ServerUtilizationBelowCutoff() {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
@ -5587,18 +5588,19 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") {
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") {
Reference<IReplicationPolicy> policy =
Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
ACTOR static Future<Void> GetTeam_ServerUtilizationNearCutoff() {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int teamSize = 3;
state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);
GetStorageMetricsReply low_avail;
if (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO > 0) {
/* Pick a capacity where MIN_AVAILABLE_SPACE_RATIO of the capacity would be higher than MIN_AVAILABLE_SPACE */
low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * (2 / SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO);
/* Pick a capacity where MIN_AVAILABLE_SPACE_RATIO of the capacity would be higher than MIN_AVAILABLE_SPACE
*/
low_avail.capacity.bytes =
SERVER_KNOBS->MIN_AVAILABLE_SPACE * (2 / SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO);
} else {
low_avail.capacity.bytes = 2000 * 1024 * 1024;
}
@ -5650,3 +5652,64 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") {
return Void();
}
};
TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
wait(DDTeamCollectionUnitTest::AddTeamsBestOf_UseMachineID());
return Void();
}
TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
wait(DDTeamCollectionUnitTest::AddTeamsBestOf_NotUseMachineID());
return Void();
}
TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
wait(DDTeamCollectionUnitTest::AddAllTeams_isExhaustive());
return Void();
}
TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
wait(DDTeamCollectionUnitTest::AddAllTeams_withLimit());
return Void();
}
TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
wait(DDTeamCollectionUnitTest::AddTeamsBestOf_SkippingBusyServers());
return Void();
}
TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
wait(DDTeamCollectionUnitTest::AddTeamsBestOf_NotEnoughServers());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") {
wait(DDTeamCollectionUnitTest::GetTeam_NewServersNotNeeded());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") {
wait(DDTeamCollectionUnitTest::GetTeam_HealthyCompleteSource());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") {
wait(DDTeamCollectionUnitTest::GetTeam_TrueBestLeastUtilized());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") {
wait(DDTeamCollectionUnitTest::GetTeam_TrueBestMostUtilized());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") {
wait(DDTeamCollectionUnitTest::GetTeam_ServerUtilizationBelowCutoff());
return Void();
}
TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") {
wait(DDTeamCollectionUnitTest::GetTeam_ServerUtilizationNearCutoff());
return Void();
}

View File

@ -171,6 +171,7 @@ typedef AsyncMap<UID, ServerStatus> ServerStatusMap;
class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
friend class DDTeamCollectionImpl;
friend class DDTeamCollectionUnitTest;
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3 };
@ -521,6 +522,37 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
void noHealthyTeams() const;
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
// No matter how server teams will be created, we will create the same set of machine teams;
// We should never use server team number in building machine teams.
//
// Five steps to create each machine team, which are document in the function
// Reuse ReplicationPolicy selectReplicas func to select machine team
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
void disableBuildingTeams() { doBuildTeams = false; }
void setCheckTeamDelay() { this->checkTeamDelay = Void(); }
public:
Database cx;
@ -595,39 +627,6 @@ public:
void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }
// FIXME: Public for testing only
void disableBuildingTeams() { doBuildTeams = false; }
// FIXME: Public for testing only
void setCheckTeamDelay() { this->checkTeamDelay = Void(); }
// FIXME: Public for testing only
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
// FIXME: Public for testing only
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
// FIXME: Public for testing only
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
// No matter how server teams will be created, we will create the same set of machine teams;
// We should never use server team number in building machine teams.
//
// Five steps to create each machine team, which are document in the function
// Reuse ReplicationPolicy selectReplicas func to select machine team
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
// FIXME: Public for testing only
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
// Create server teams based on machine teams
// Before the number of machine teams reaches the threshold, build a machine team for each server team
// When it reaches the threshold, first try to build a server team with existing machine teams; if failed,
@ -642,11 +641,6 @@ public:
bool removeTeam(Reference<TCTeamInfo> team);
// FIXME: Public for testing only
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
void removeTSS(UID removedServer);
void removeServer(UID removedServer);

View File

@ -158,8 +158,9 @@ ACTOR Future<std::vector<WorkerInterface>> getCoordWorkers(Database cx,
if (!coordinators.present()) {
throw operation_failed();
}
std::vector<NetworkAddress> coordinatorsAddr =
ClusterConnectionString(coordinators.get().toString()).coordinators();
state ClusterConnectionString ccs(coordinators.get().toString());
wait(ccs.resolveHostnames());
std::vector<NetworkAddress> coordinatorsAddr = ccs.coordinators();
std::set<NetworkAddress> coordinatorsAddrSet;
for (const auto& addr : coordinatorsAddr) {
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", addr);

View File

@ -1905,8 +1905,8 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
TEST(useIPv6); // Use IPv6
TEST(!useIPv6); // Use IPv4
// TODO(renxuan): Use hostname 25% of the time, unless it is disabled
bool useHostname = false; // !testConfig.disableHostname && deterministicRandom()->random01() < 0.25;
// Use hostname 25% of the time, unless it is disabled
bool useHostname = !testConfig.disableHostname && deterministicRandom()->random01() < 0.25;
TEST(useHostname); // Use hostname
TEST(!useHostname); // Use IP address
NetworkAddressFromHostname fromHostname =

View File

@ -833,6 +833,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
NetworkAddressList publicNetworkAddresses;
NetworkAddressList listenNetworkAddresses;
connectionRecord.resolveHostnamesBlocking();
auto& coordinators = connectionRecord.getConnectionString().coordinators();
ASSERT(coordinators.size() > 0);
@ -1022,6 +1023,29 @@ struct CLIOptions {
return opts;
}
// Determine publicAddresses and listenAddresses by calling buildNetworkAddresses().
void buildNetwork(const char* name) {
try {
if (!publicAddressStrs.empty()) {
std::tie(publicAddresses, listenAddresses) =
buildNetworkAddresses(*connectionFile, publicAddressStrs, listenAddressStrs);
}
} catch (Error&) {
printHelpTeaser(name);
flushAndExit(FDB_EXIT_ERROR);
}
if (role == ServerRole::ConsistencyCheck) {
if (!publicAddressStrs.empty()) {
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
printHelpTeaser(name);
flushAndExit(FDB_EXIT_ERROR);
}
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
publicAddresses.address = NetworkAddress(publicIP, ::getpid());
}
}
private:
CLIOptions() = default;
@ -1594,26 +1618,6 @@ private:
// failmon?
}
try {
if (!publicAddressStrs.empty()) {
std::tie(publicAddresses, listenAddresses) =
buildNetworkAddresses(*connectionFile, publicAddressStrs, listenAddressStrs);
}
} catch (Error&) {
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
if (role == ServerRole::ConsistencyCheck) {
if (!publicAddressStrs.empty()) {
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
publicAddresses.address = NetworkAddress(publicIP, ::getpid());
}
if (role == ServerRole::Simulation) {
Optional<bool> buggifyOverride = checkBuggifyOverride(testFile);
if (buggifyOverride.present())
@ -1692,7 +1696,7 @@ int main(int argc, char* argv[]) {
//_set_output_format(_TWO_DIGIT_EXPONENT);
#endif
const auto opts = CLIOptions::parseArgs(argc, argv);
auto opts = CLIOptions::parseArgs(argc, argv);
const auto role = opts.role;
#ifdef _WIN32
@ -1787,6 +1791,7 @@ int main(int argc, char* argv[]) {
if (role == ServerRole::Simulation || role == ServerRole::CreateTemplateDatabase) {
// startOldSimulator();
opts.buildNetwork(argv[0]);
startNewSimulator(opts.printSimTime);
openTraceFile(NetworkAddress(), opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
openTracer(TracerType(deterministicRandom()->randomInt(static_cast<int>(TracerType::DISABLED),
@ -1795,6 +1800,7 @@ int main(int argc, char* argv[]) {
g_network = newNet2(opts.tlsConfig, opts.useThreadPool, true);
g_network->addStopCallback(Net2FileSystem::stop);
FlowTransport::createInstance(false, 1, WLTOKEN_RESERVED_COUNT);
opts.buildNetwork(argv[0]);
const bool expectsPublicAddress =
(role == ServerRole::FDBD || role == ServerRole::NetworkTestServer || role == ServerRole::Restore);

View File

@ -2308,10 +2308,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> in,
Reference<AsyncVar<Optional<ClusterInterface>>> out) {
loop {
if (in->get().present())
if (in->get().present()) {
out->set(in->get().get().clientInterface);
else
} else {
out->set(Optional<ClusterInterface>());
}
wait(in->onChange());
}
}
@ -2509,9 +2510,14 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
}
successIndex = index;
} else {
if (leader.isError() && leader.getError().code() == error_code_coordinators_changed) {
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
throw coordinators_changed();
}
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
throw coordinators_changed();
}
}
}
@ -2519,11 +2525,22 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
ACTOR Future<Void> monitorLeaderWithDelayedCandidacyImplInternal(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo) {
wait(connRecord->resolveHostnames());
state MonitorLeaderInfo info(connRecord);
loop {
try {
wait(info.intermediateConnRecord->resolveHostnames());
MonitorLeaderInfo _info =
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info));
info = _info;
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorLeaderWithDelayedCandidacyCoordinatorsChanged").suppressFor(1.0);
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}
}
}
}
@ -2657,6 +2674,7 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
actors.push_back(serveProcess());
try {
wait(connRecord->resolveHostnames());
ServerCoordinators coordinators(connRecord);
if (g_network->isSimulated()) {
whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,";

View File

@ -926,10 +926,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(coordinator_processes_key.present());
std::vector<std::string> process_addresses;
state std::vector<std::string> process_addresses;
boost::split(
process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size());
ASSERT(process_addresses.size() == cs.coordinators().size() + cs.hostnames.size());
wait(cs.resolveHostnames());
// compare the coordinator process network addresses one by one
for (const auto& network_address : cs.coordinators()) {
ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) !=
@ -970,16 +971,15 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
old_coordinators_processes, processes_key.get().toString(), [](char c) { return c == ','; });
// pick up one non-coordinator process if possible
std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
std::string old_coordinators_processes_string = describe(old_coordinators_processes);
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("OldCoordinators", describe(old_coordinators_processes))
.detail("OldCoordinators", old_coordinators_processes_string)
.detail("WorkerSize", workers.size());
if (workers.size() > old_coordinators_processes.size()) {
loop {
auto worker = deterministicRandom()->randomChoice(workers);
new_coordinator_process = worker.address.toString();
if (std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
worker.address.toString()) == old_coordinators_processes.end()) {
if (old_coordinators_processes_string.find(new_coordinator_process) == std::string::npos) {
break;
}
}
@ -1049,10 +1049,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
ClusterConnectionString cs(res.get().toString());
ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1);
state ClusterConnectionString csNew(res.get().toString());
wait(csNew.resolveHostnames());
ASSERT(csNew.coordinators().size() == old_coordinators_processes.size() + 1);
// verify the coordinators' addresses
for (const auto& network_address : cs.coordinators()) {
for (const auto& network_address : csNew.coordinators()) {
std::string address_str = network_address.toString();
ASSERT(std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
@ -1060,7 +1061,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
new_coordinator_process == address_str);
}
// verify the cluster decription
ASSERT(new_cluster_description == cs.clusterKeyName().toString());
ASSERT(new_cluster_description == csNew.clusterKeyName().toString());
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));

View File

@ -20,12 +20,14 @@
#include "flow/StreamCipher.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/UnitTest.h"
#include <memory>
std::unordered_set<EVP_CIPHER_CTX*> StreamCipher::ctxs;
std::unordered_set<StreamCipherKey*> StreamCipherKey::cipherKeys;
UID StreamCipherKey::globalKeyId;
std::unordered_map<UID, EVP_CIPHER_CTX*> StreamCipher::ctxs;
std::unordered_map<UID, StreamCipherKey*> StreamCipherKey::cipherKeys;
std::unique_ptr<StreamCipherKey> StreamCipherKey::globalKey;
bool StreamCipherKey::isGlobalKeyPresent() {
@ -36,8 +38,9 @@ void StreamCipherKey::allocGlobalCipherKey() {
if (StreamCipherKey::isGlobalKeyPresent()) {
return;
}
StreamCipherKey::globalKeyId = deterministicRandom()->randomUniqueID();
StreamCipherKey::globalKey = std::make_unique<StreamCipherKey>(AES_256_KEY_LENGTH);
StreamCipherKey::cipherKeys.insert(StreamCipherKey::globalKey.get());
StreamCipherKey::cipherKeys[StreamCipherKey::globalKeyId] = StreamCipherKey::globalKey.get();
}
void StreamCipherKey::initializeGlobalRandomTestKey() {
@ -56,8 +59,8 @@ StreamCipherKey const* StreamCipherKey::getGlobalCipherKey() {
}
void StreamCipherKey::cleanup() noexcept {
for (auto cipherKey : cipherKeys) {
cipherKey->reset();
for (const auto& itr : cipherKeys) {
itr.second->reset();
}
}
@ -67,31 +70,33 @@ void StreamCipherKey::initializeKey(uint8_t* data, int len) {
memcpy(arr.get(), data, copyLen);
}
StreamCipherKey::StreamCipherKey(int size) : arr(std::make_unique<uint8_t[]>(size)), keySize(size) {
StreamCipherKey::StreamCipherKey(int size)
: id(deterministicRandom()->randomUniqueID()), arr(std::make_unique<uint8_t[]>(size)), keySize(size) {
memset(arr.get(), 0, keySize);
cipherKeys.insert(this);
cipherKeys[id] = this;
}
StreamCipherKey::~StreamCipherKey() {
reset();
cipherKeys.erase(this);
cipherKeys.erase(this->id);
}
StreamCipher::StreamCipher(int keySize)
: ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()), cipherKey(std::make_unique<StreamCipherKey>(keySize)) {
ctxs.insert(ctx);
: id(deterministicRandom()->randomUniqueID()), ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
cipherKey(std::make_unique<StreamCipherKey>(keySize)) {
ctxs[id] = ctx;
}
StreamCipher::StreamCipher()
: ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
: id(deterministicRandom()->randomUniqueID()), ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
cipherKey(std::make_unique<StreamCipherKey>(AES_256_KEY_LENGTH)) {
ctxs.insert(ctx);
ctxs[id] = ctx;
}
StreamCipher::~StreamCipher() {
HMAC_CTX_free(hmacCtx);
EVP_CIPHER_CTX_free(ctx);
ctxs.erase(ctx);
ctxs.erase(id);
}
EVP_CIPHER_CTX* StreamCipher::getCtx() {
@ -103,8 +108,8 @@ HMAC_CTX* StreamCipher::getHmacCtx() {
}
void StreamCipher::cleanup() noexcept {
for (auto ctx : ctxs) {
EVP_CIPHER_CTX_free(ctx);
for (auto itr : ctxs) {
EVP_CIPHER_CTX_free(itr.second);
}
}

View File

@ -44,8 +44,10 @@
// Wrapper class for openssl implementation of AES GCM
// encryption/decryption
class StreamCipherKey : NonCopyable {
static UID globalKeyId;
static std::unique_ptr<StreamCipherKey> globalKey;
static std::unordered_set<StreamCipherKey*> cipherKeys;
static std::unordered_map<UID, StreamCipherKey*> cipherKeys;
UID id;
std::unique_ptr<uint8_t[]> arr;
int keySize;
@ -67,7 +69,8 @@ public:
};
class StreamCipher final : NonCopyable {
static std::unordered_set<EVP_CIPHER_CTX*> ctxs;
UID id;
static std::unordered_map<UID, EVP_CIPHER_CTX*> ctxs;
EVP_CIPHER_CTX* ctx;
HMAC_CTX* hmacCtx;
std::unique_ptr<StreamCipherKey> cipherKey;