Merge pull request #2941 from etschannen/feature-tree-broadcast

Implemented a tree broadcast for the proxy and cluster controller
This commit is contained in:
Evan Tschannen 2020-04-29 13:38:09 -07:00 committed by GitHub
commit 627eb93dd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 769 additions and 661 deletions

View File

@ -494,11 +494,16 @@ Optional<ValueRef> DatabaseConfiguration::get( KeyRef key ) const {
}
}
bool DatabaseConfiguration::isExcludedServer( NetworkAddress a ) const {
return get( encodeExcludedServersKey( AddressExclusion(a.ip, a.port) ) ).present() ||
get( encodeExcludedServersKey( AddressExclusion(a.ip) ) ).present() ||
get( encodeFailedServersKey( AddressExclusion(a.ip, a.port) ) ).present() ||
get( encodeFailedServersKey( AddressExclusion(a.ip) ) ).present();
// Reports whether any address of the given process appears under the excluded-servers
// or failed-servers keys of this configuration. Each address is looked up both as an
// ip:port exclusion and as an ip-only exclusion. The secondary address is consulted
// only when it is present, and only if the primary address did not already match.
bool DatabaseConfiguration::isExcludedServer( NetworkAddressList a ) const {
	// The four lookups performed for a single address, in the same order for both addresses.
	auto isListed = [this]( const NetworkAddress& addr ) {
		return get( encodeExcludedServersKey( AddressExclusion(addr.ip, addr.port) ) ).present() ||
			get( encodeExcludedServersKey( AddressExclusion(addr.ip) ) ).present() ||
			get( encodeFailedServersKey( AddressExclusion(addr.ip, addr.port) ) ).present() ||
			get( encodeFailedServersKey( AddressExclusion(addr.ip) ) ).present();
	};

	if( isListed(a.address) ) {
		return true;
	}
	return a.secondaryAddress.present() && isListed(a.secondaryAddress.get());
}
std::set<AddressExclusion> DatabaseConfiguration::getExcludedServers() const {
const_cast<DatabaseConfiguration*>(this)->makeConfigurationImmutable();

View File

@ -187,7 +187,7 @@ struct DatabaseConfiguration {
std::vector<RegionInfo> regions;
// Excluded servers (no state should be here)
bool isExcludedServer( NetworkAddress ) const;
bool isExcludedServer( NetworkAddressList ) const;
std::set<AddressExclusion> getExcludedServers() const;
int32_t getDesiredProxies() const { if(masterProxyCount == -1) return autoMasterProxyCount; return masterProxyCount; }

View File

@ -278,11 +278,12 @@ struct TxnStateRequest {
VectorRef<KeyValueRef> data;
Sequence sequence;
bool last;
std::vector<Endpoint> broadcastInfo;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, data, sequence, last, reply, arena);
serializer(ar, data, sequence, last, broadcastInfo, reply, arena);
}
};

View File

@ -74,6 +74,7 @@ struct StorageServerInterface {
explicit StorageServerInterface(UID uid) : uniqueID( uid ) {}
StorageServerInterface() : uniqueID( deterministicRandom()->randomUniqueID() ) {}
NetworkAddress address() const { return getValue.getEndpoint().getPrimaryAddress(); }
NetworkAddress stableAddress() const { return getValue.getEndpoint().getStableAddress(); }
Optional<NetworkAddress> secondaryAddress() const { return getValue.getEndpoint().addresses.secondaryAddress; }
UID id() const { return uniqueID; }
std::string toString() const { return id().shortString(); }

View File

@ -208,6 +208,7 @@ public:
Int64MetricHandle countConnClosedWithoutError;
std::map<NetworkAddress, std::pair<uint64_t, double>> incompatiblePeers;
AsyncTrigger incompatiblePeersChanged;
uint32_t numIncompatibleConnections;
std::map<uint64_t, double> multiVersionConnections;
double lastIncompatibleMessage;
@ -1122,10 +1123,14 @@ bool TransportData::isLocalAddress(const NetworkAddress& address) const {
ACTOR static Future<Void> multiVersionCleanupWorker( TransportData* self ) {
loop {
wait(delay(FLOW_KNOBS->CONNECTION_CLEANUP_DELAY));
bool foundIncompatible = false;
for(auto it = self->incompatiblePeers.begin(); it != self->incompatiblePeers.end();) {
if( self->multiVersionConnections.count(it->second.first) ) {
it = self->incompatiblePeers.erase(it);
} else {
if( now() - it->second.second > FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
foundIncompatible = true;
}
it++;
}
}
@ -1137,6 +1142,10 @@ ACTOR static Future<Void> multiVersionCleanupWorker( TransportData* self ) {
it++;
}
}
if(foundIncompatible) {
self->incompatiblePeersChanged.trigger();
}
}
}
@ -1167,6 +1176,10 @@ std::map<NetworkAddress, std::pair<uint64_t, double>>* FlowTransport::getIncompa
return &self->incompatiblePeers;
}
// Returns the future produced by incompatiblePeersChanged.onTrigger().
// That trigger is fired by multiVersionCleanupWorker after a cleanup pass in which it saw
// an incompatible peer entry older than FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING.
Future<Void> FlowTransport::onIncompatibleChanged() {
return self->incompatiblePeersChanged.onTrigger();
}
Future<Void> FlowTransport::bind( NetworkAddress publicAddress, NetworkAddress listenAddress ) {
ASSERT( publicAddress.isPublic() );
if(self->localAddresses.address == NetworkAddress()) {

View File

@ -45,7 +45,9 @@ public:
}
void choosePrimaryAddress() {
if(addresses.secondaryAddress.present() && !g_network->getLocalAddresses().secondaryAddress.present() && (addresses.address.isTLS() != g_network->getLocalAddresses().address.isTLS())) {
if(addresses.secondaryAddress.present() &&
((!g_network->getLocalAddresses().secondaryAddress.present() && (addresses.address.isTLS() != g_network->getLocalAddresses().address.isTLS())) ||
(g_network->getLocalAddresses().secondaryAddress.present() && !addresses.address.isTLS()))) {
std::swap(addresses.address, addresses.secondaryAddress.get());
}
}
@ -59,6 +61,10 @@ public:
return addresses.address;
}
// Returns addresses.getTLSAddress() instead of addresses.address.
// NOTE(review): presumably called "stable" because choosePrimaryAddress() can swap
// addresses.address with the secondary address, whereas the TLS address choice should not
// depend on that swap -- confirm against NetworkAddressList::getTLSAddress.
NetworkAddress getStableAddress() const {
return addresses.getTLSAddress();
}
bool operator == (Endpoint const& r) const {
return getPrimaryAddress() == r.getPrimaryAddress() && token == r.token;
}
@ -162,6 +168,9 @@ public:
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the set of all peers that have attempted to connect, but have incompatible protocol versions
Future<Void> onIncompatibleChanged();
// Returns when getIncompatiblePeers has at least one peer which is incompatible.
void addPeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is being used, even if no messages are currently being sent to the peer

View File

@ -6,7 +6,6 @@ set(FDBSERVER_SRCS
BackupProgress.actor.h
BackupWorker.actor.cpp
ClusterController.actor.cpp
ClusterRecruitmentInterface.h
ConflictSet.h
CoordinatedState.actor.cpp
CoordinatedState.h

View File

@ -36,7 +36,6 @@
#include "fdbserver/LeaderElection.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/Status.h"
@ -63,14 +62,15 @@ struct WorkerInfo : NonCopyable {
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
Optional<uint16_t> storageCacheInfo;
Standalone<VectorRef<StringRef>> issues;
WorkerInfo() : gen(-1), reboots(0), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded, Standalone<VectorRef<StringRef>> issues ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded), issues(issues) {}
WorkerInfo( WorkerInfo&& r ) BOOST_NOEXCEPT : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)),
haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), storageCacheInfo(r.storageCacheInfo) {}
haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), storageCacheInfo(r.storageCacheInfo), issues(r.issues) {}
void operator=( WorkerInfo&& r ) BOOST_NOEXCEPT {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -82,6 +82,7 @@ struct WorkerInfo : NonCopyable {
haltRatekeeper = r.haltRatekeeper;
haltDistributor = r.haltDistributor;
storageCacheInfo = r.storageCacheInfo;
issues = r.issues;
}
};
@ -98,13 +99,11 @@ class ClusterControllerData {
public:
struct DBInfo {
Reference<AsyncVar<ClientDBInfo>> clientInfo;
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> serverInfo;
CachedSerialization<ServerDBInfo> serverInfoMasterOnly;
std::set<NetworkAddress> requiredAddresses;
ProcessIssuesMap workersWithIssues;
Reference<AsyncVar<ServerDBInfo>> serverInfo;
std::map<NetworkAddress, double> incompatibleConnections;
AsyncTrigger forceMasterFailure;
int64_t masterRegistrationCount;
int64_t dbInfoCount;
bool recoveryStalled;
bool forceRecovery;
DatabaseConfiguration config; // Asynchronously updated via master registration
@ -117,42 +116,36 @@ public:
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false), unfinishedRecoveries(0), logGenerations(0), cachePopulated(false),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
serverInfo( new AsyncVar<CachedSerialization<ServerDBInfo>>( CachedSerialization<ServerDBInfo>() ) ),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ), dbInfoCount(0),
serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo() ) ),
db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskPriority::DefaultEndpoint, true ) ) // SOMEDAY: Locality!
{
}
void addRequiredAddresses(const std::vector<WorkerInterface>& interfaces) {
for(auto& it : interfaces) {
requiredAddresses.insert(it.address());
}
}
void setDistributor(const DataDistributorInterface& interf) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.distributor = interf;
serverInfo->set( newInfoCache );
serverInfo->set( newInfo );
}
void setRatekeeper(const RatekeeperInterface& interf) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.ratekeeper = interf;
serverInfo->set( newInfoCache );
serverInfo->set( newInfo );
}
void setStorageCache(uint16_t id, const StorageServerInterface& interf) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
auto newInfo = serverInfo->get();
bool found = false;
for(auto& it : newInfo.storageCaches) {
if(it.first == id) {
if(it.second != interf) {
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
it.second = interf;
}
found = true;
@ -161,36 +154,36 @@ public:
}
if(!found) {
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.storageCaches.push_back(std::make_pair(id, interf));
}
serverInfo->set( newInfoCache );
serverInfo->set( newInfo );
}
void clearInterf(ProcessClass::ClassType t) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
if (t == ProcessClass::DataDistributorClass) {
newInfo.distributor = Optional<DataDistributorInterface>();
} else if (t == ProcessClass::RatekeeperClass) {
newInfo.ratekeeper = Optional<RatekeeperInterface>();
}
serverInfo->set( newInfoCache );
serverInfo->set( newInfo );
}
void clearStorageCache(uint16_t id) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
auto newInfo = serverInfo->get();
for(auto it = newInfo.storageCaches.begin(); it != newInfo.storageCaches.end(); ++it) {
if(it->first == id) {
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.storageCaches.erase(it);
break;
}
}
serverInfo->set( newInfoCache );
serverInfo->set( newInfo );
}
};
struct UpdateWorkerList {
@ -256,8 +249,8 @@ public:
}
bool isLongLivedStateless( Optional<Key> const& processId ) {
return (db.serverInfo->get().read().distributor.present() && db.serverInfo->get().read().distributor.get().locality.processId() == processId) ||
(db.serverInfo->get().read().ratekeeper.present() && db.serverInfo->get().read().ratekeeper.get().locality.processId() == processId);
return (db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == processId) ||
(db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == processId);
}
WorkerDetails getStorageWorker( RecruitStorageRequest const& req ) {
@ -270,6 +263,7 @@ public:
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) &&
( !it.second.details.interf.secondaryAddress().present() || !addressExcluded(excludedAddresses, it.second.details.interf.secondaryAddress().get()) ) &&
it.second.details.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
return it.second.details;
}
@ -306,7 +300,7 @@ public:
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && ( !dcId.present() || it.second.details.interf.locality.dcId()==dcId.get() ) ) {
if( workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.details.interf.addresses()) && fitness != ProcessClass::NeverAssign && ( !dcId.present() || it.second.details.interf.locality.dcId()==dcId.get() ) ) {
fitness_workers[ fitness ].push_back(it.second.details);
}
}
@ -351,7 +345,7 @@ public:
for( auto& it : id_worker ) {
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), it.second.details.interf.id()) == exclusionWorkerIds.end()) {
auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::TLog);
if (workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId()))) {
if (workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.addresses()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId()))) {
fitness_workers[std::make_pair(fitness, it.second.details.degraded)].push_back(it.second.details);
}
else {
@ -507,7 +501,7 @@ public:
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( role );
if(conf.isExcludedServer(it.second.details.interf.address())) {
if(conf.isExcludedServer(it.second.details.interf.addresses())) {
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.details.interf.locality.dcId()==dcId ) {
@ -545,7 +539,7 @@ public:
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && it.second.details.interf.locality.dcId() == dcId &&
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.addresses()) && it.second.details.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.details.interf.id() != minWorker.get().worker.interf.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
if (isLongLivedStateless(it.first)) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details);
@ -664,7 +658,7 @@ public:
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
std::set<Optional<Standalone<StringRef>>> result;
for( auto& it : id_worker )
if( workerAvailable( it.second, checkStable ) && !conf.isExcludedServer( it.second.details.interf.address() ) )
if( workerAvailable( it.second, checkStable ) && !conf.isExcludedServer( it.second.details.interf.addresses() ) )
result.insert(it.second.details.interf.locality.dcId());
return result;
}
@ -984,7 +978,7 @@ public:
}
void checkRecoveryStalled() {
if( (db.serverInfo->get().read().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().read().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().read().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) {
if( (db.serverInfo->get().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) {
if (db.config.regions.size() > 1) {
auto regions = db.config.regions;
if(clusterControllerDcId.get() == regions[0].dcId) {
@ -998,7 +992,7 @@ public:
//FIXME: determine when to fail the cluster controller when a primaryDC has not been set
bool betterMasterExists() {
const ServerDBInfo dbi = db.serverInfo->get().read();
const ServerDBInfo dbi = db.serverInfo->get();
if(dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false;
@ -1094,7 +1088,7 @@ public:
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we still need master for recovery.
ProcessClass::Fitness oldMasterFit = masterWorker->second.details.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
if(db.config.isExcludedServer(dbi.master.addresses())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
}
@ -1102,7 +1096,7 @@ public:
id_used[clusterControllerProcessId]++;
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
auto newMasterFit = mworker.worker.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(mworker.worker.interf.address())) {
if(db.config.isExcludedServer(mworker.worker.interf.addresses())) {
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
@ -1263,7 +1257,7 @@ public:
ASSERT(masterProcessId.present());
if (processId == masterProcessId) return false;
auto& dbInfo = db.serverInfo->get().read();
auto& dbInfo = db.serverInfo->get();
for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) {
if (tlog.present() && tlog.interf().locality.processId() == processId) return true;
@ -1293,7 +1287,7 @@ public:
std::map<Optional<Standalone<StringRef>>, int> idUsed;
updateKnownIds(&idUsed);
auto& dbInfo = db.serverInfo->get().read();
auto& dbInfo = db.serverInfo->get();
for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) {
if (tlog.present()) {
@ -1331,6 +1325,9 @@ public:
UpdateWorkerList updateWorkerList;
Future<Void> outstandingRequestChecker;
Future<Void> outstandingRemoteRequestChecker;
AsyncTrigger updateDBInfo;
std::set<Endpoint> updateDBInfoEndpoints;
std::set<Endpoint> removedDBInfoEndpoints;
DBInfo db;
Database cx;
@ -1351,7 +1348,6 @@ public:
Counter getWorkersRequests;
Counter getClientWorkersRequests;
Counter registerMasterRequests;
Counter getServerDBInfoRequests;
Counter statusRequests;
Counter failureMonitoringRequests;
@ -1370,18 +1366,18 @@ public:
getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),
getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
getServerDBInfoRequests("GetServerDBInfoRequests", clusterControllerMetrics),
statusRequests("StatusRequests", clusterControllerMetrics),
failureMonitoringRequests("FailureMonitoringRequests", clusterControllerMetrics),
serversFailed("ServersFailed", clusterControllerMetrics),
serversUnfailed("ServersUnfailed", clusterControllerMetrics)
{
auto& serverInfo = db.serverInfoMasterOnly.mutate();
auto serverInfo = ServerDBInfo();
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db.dbInfoCount;
serverInfo.masterLifetime.ccID = id;
serverInfo.clusterInterface = ccInterface;
serverInfo.myLocality = locality;
db.serverInfo->set( db.serverInfoMasterOnly );
db.serverInfo->set( serverInfo );
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true);
}
@ -1416,7 +1412,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
continue;
}
RecruitMasterRequest rmq;
rmq.lifetime = db->serverInfo->get().read().masterLifetime;
rmq.lifetime = db->serverInfo->get().masterLifetime;
rmq.forceRecovery = db->forceRecovery;
cluster->masterProcessId = masterWorker.worker.interf.locality.processId();
@ -1436,22 +1432,20 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
db->serverInfoMasterOnly = CachedSerialization<ServerDBInfo>();
auto& dbInfo = db->serverInfoMasterOnly.mutate();
auto dbInfo = ServerDBInfo();
dbInfo.master = iMaster;
dbInfo.id = deterministicRandom()->randomUniqueID();
dbInfo.masterLifetime = db->serverInfo->get().read().masterLifetime;
dbInfo.infoGeneration = ++db->dbInfoCount;
dbInfo.masterLifetime = db->serverInfo->get().masterLifetime;
++dbInfo.masterLifetime;
dbInfo.clusterInterface = db->serverInfo->get().read().clusterInterface;
dbInfo.distributor = db->serverInfo->get().read().distributor;
dbInfo.ratekeeper = db->serverInfo->get().read().ratekeeper;
dbInfo.storageCaches = db->serverInfo->get().read().storageCaches;
dbInfo.latencyBandConfig = db->serverInfo->get().read().latencyBandConfig;
dbInfo.clusterInterface = db->serverInfo->get().clusterInterface;
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.storageCaches = db->serverInfo->get().storageCaches;
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->requiredAddresses.clear();
db->serverInfo->set( db->serverInfoMasterOnly );
db->serverInfo->set( dbInfo );
state Future<Void> spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
@ -1486,30 +1480,14 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
}
ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID,
Standalone<VectorRef<StringRef>> issues,
std::vector<NetworkAddress> incompatiblePeers,
ReplyPromise<CachedSerialization<ServerDBInfo>> reply) {
state Optional<UID> issueID;
state bool useMasterOnly = false;
setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID);
for(auto it : incompatiblePeers) {
db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
loop {
useMasterOnly = db->serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS && !db->requiredAddresses.count(reply.getEndpoint().getPrimaryAddress());
if((useMasterOnly ? db->serverInfoMasterOnly.read().id : db->serverInfo->get().read().id) != knownServerInfoID) {
break;
}
ReplyPromise<ServerDBInfo> reply) {
while(db->serverInfo->get().id == knownServerInfoID) {
choose {
when (wait( yieldedFuture(db->serverInfo->onChange()) )) {}
when (wait( delayJittered( 300 ) )) { break; } // The server might be long gone!
}
}
removeIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID);
reply.send( useMasterOnly ? db->serverInfoMasterOnly : db->serverInfo->get() );
reply.send( db->serverInfo->get() );
return Void();
}
@ -1535,12 +1513,6 @@ void checkOutstandingRecruitmentRequests( ClusterControllerData* self ) {
RecruitFromConfigurationRequest& req = self->outstandingRecruitmentRequests[i];
try {
RecruitFromConfigurationReply rep = self->findWorkersForConfiguration( req );
self->db.addRequiredAddresses(rep.oldLogRouters);
self->db.addRequiredAddresses(rep.proxies);
self->db.addRequiredAddresses(rep.resolvers);
self->db.addRequiredAddresses(rep.satelliteTLogs);
self->db.addRequiredAddresses(rep.tLogs);
self->db.serverInfo->trigger();
req.reply.send( rep );
swapAndPop( &self->outstandingRecruitmentRequests, i-- );
} catch (Error& e) {
@ -1559,9 +1531,6 @@ void checkOutstandingRemoteRecruitmentRequests( ClusterControllerData* self ) {
RecruitRemoteFromConfigurationRequest& req = self->outstandingRemoteRecruitmentRequests[i];
try {
RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
self->db.addRequiredAddresses(rep.remoteTLogs);
self->db.addRequiredAddresses(rep.logRouters);
self->db.serverInfo->trigger();
req.reply.send( rep );
swapAndPop( &self->outstandingRemoteRecruitmentRequests, i-- );
} catch (Error& e) {
@ -1609,7 +1578,7 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
}
void checkBetterDDOrRK(ClusterControllerData* self) {
if (!self->masterProcessId.present() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
if (!self->masterProcessId.present() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return;
}
@ -1628,11 +1597,11 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
newDDWorker = self->id_worker[self->masterProcessId.get()].details;
}
auto bestFitnessForRK = newRKWorker.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(self->db.config.isExcludedServer(newRKWorker.interf.address())) {
if(self->db.config.isExcludedServer(newRKWorker.interf.addresses())) {
bestFitnessForRK = std::max(bestFitnessForRK, ProcessClass::ExcludeFit);
}
auto bestFitnessForDD = newDDWorker.processClass.machineClassFitness(ProcessClass::DataDistributor);
if(self->db.config.isExcludedServer(newDDWorker.interf.address())) {
if(self->db.config.isExcludedServer(newDDWorker.interf.addresses())) {
bestFitnessForDD = std::max(bestFitnessForDD, ProcessClass::ExcludeFit);
}
//TraceEvent("CheckBetterDDorRKNewRecruits", self->id).detail("MasterProcessId", self->masterProcessId)
@ -1641,7 +1610,7 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
Optional<Standalone<StringRef>> currentRKProcessId;
Optional<Standalone<StringRef>> currentDDProcessId;
auto& db = self->db.serverInfo->get().read();
auto& db = self->db.serverInfo->get();
bool ratekeeperHealthy = false;
if (db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId()) &&
(!self->recruitingRatekeeperID.present() || (self->recruitingRatekeeperID.get() == db.ratekeeper.get().id()))) {
@ -1700,7 +1669,7 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
self->checkRecoveryStalled();
if (self->betterMasterExists()) {
self->db.forceMasterFailure.trigger();
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().read().master.id());
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
}
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
@ -1757,12 +1726,14 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
return Void();
}
ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) {
ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster) {
state Future<Void> failed =
(worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass)
? Never()
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) );
cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.stableAddress()) );
cluster->updateDBInfoEndpoints.insert(worker.updateServerDBInfo.getEndpoint());
cluster->updateDBInfo.trigger();
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker.
wait(delay(0));
@ -1801,6 +1772,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
if (worker.locality.processId() == cluster->masterProcessId) {
cluster->masterProcessId = Optional<Key>();
}
cluster->removedDBInfoEndpoints.insert(worker.updateServerDBInfo.getEndpoint());
cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
return Void();
@ -1996,12 +1968,6 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
loop {
try {
auto rep = self->findWorkersForConfiguration( req );
self->db.addRequiredAddresses(rep.oldLogRouters);
self->db.addRequiredAddresses(rep.proxies);
self->db.addRequiredAddresses(rep.resolvers);
self->db.addRequiredAddresses(rep.satelliteTLogs);
self->db.addRequiredAddresses(rep.tLogs);
self->db.serverInfo->trigger();
req.reply.send( rep );
return Void();
} catch (Error& e) {
@ -2027,9 +1993,6 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
loop {
try {
RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
self->db.addRequiredAddresses(rep.remoteTLogs);
self->db.addRequiredAddresses(rep.logRouters);
self->db.serverInfo->trigger();
req.reply.send( rep );
return Void();
} catch (Error& e) {
@ -2066,8 +2029,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
//make sure the request comes from an active database
auto db = &self->db;
if ( db->serverInfo->get().read().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) {
TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().read().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
if ( db->serverInfo->get().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) {
TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
return;
}
@ -2088,7 +2051,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.details.interf.address());
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.details.interf.addresses());
if ( it.second.priorityInfo.isExcluded != isExcludedFromConfig ) {
it.second.priorityInfo.isExcluded = isExcludedFromConfig;
if( !it.second.reply.isSet() ) {
@ -2100,8 +2063,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
bool isChanged = false;
auto cachedInfo = self->db.serverInfo->get();
auto& dbInfo = cachedInfo.mutate();
auto dbInfo = self->db.serverInfo->get();
if (dbInfo.recoveryState != req.recoveryState) {
dbInfo.recoveryState = req.recoveryState;
@ -2142,7 +2104,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
if( isChanged ) {
dbInfo.id = deterministicRandom()->randomUniqueID();
self->db.serverInfo->set( cachedInfo );
dbInfo.infoGeneration = ++self->db.dbInfoCount;
self->db.serverInfo->set( dbInfo );
}
checkOutstandingRequests(self);
@ -2155,6 +2118,11 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo;
newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
for(auto it : req.incompatiblePeers) {
self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
self->removedDBInfoEndpoints.erase(w.updateServerDBInfo.getEndpoint());
if(info == self->id_worker.end()) {
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerId",w.id()).detail("ProcessId", w.locality.processId()).detail("ZoneId", w.locality.zoneId()).detail("DataHall", w.locality.dataHallId()).detail("PClass", req.processClass.toString()).detail("Workers", self->id_worker.size());
self->goodRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY);
@ -2194,13 +2162,13 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
}
if ( self->gotFullyRecoveredConfig ) {
newPriorityInfo.isExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.address());
newPriorityInfo.isExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.addresses());
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded );
if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().read().master.locality.processId()) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded, req.issues );
if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
}
checkOutstandingRequests( self );
@ -2214,8 +2182,10 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
info->second.initialClass = req.initialClass;
info->second.details.degraded = req.degraded;
info->second.gen = req.generation;
info->second.issues = req.issues;
if(info->second.details.interf.id() != w.id()) {
self->removedDBInfoEndpoints.insert(info->second.details.interf.updateServerDBInfo.getEndpoint());
info->second.details.interf = w;
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
@ -2224,7 +2194,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
TEST(true); // Received an old worker registration request.
}
if (req.distributorInterf.present() && !self->db.serverInfo->get().read().distributor.present() &&
if (req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() &&
self->clusterControllerDcId == req.distributorInterf.get().locality.dcId() &&
!self->recruitingDistributor) {
const DataDistributorInterface& di = req.distributorInterf.get();
@ -2244,7 +2214,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
} else if (!self->recruitingRatekeeperID.present()) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper;
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CCRegisterRatekeeper", self->id).detail("RKID", rki.id());
if (ratekeeper.present() && ratekeeper.get().id() != rki.id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CCHaltPreviousRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
@ -2425,8 +2395,14 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
// Get status but trap errors to send back to client.
vector<WorkerDetails> workers;
for(auto& it : self->id_worker)
std::vector<ProcessIssues> workerIssues;
for(auto& it : self->id_worker) {
workers.push_back(it.second.details);
if(it.second.issues.size()) {
workerIssues.push_back(ProcessIssues(it.second.details.interf.address(), it.second.issues));
}
}
std::vector<NetworkAddress> incompatibleConnections;
for(auto it = self->db.incompatibleConnections.begin(); it != self->db.incompatibleConnections.end();) {
@ -2438,7 +2414,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, &self->db.clientStatus, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, workerIssues, &self->db.clientStatus, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
@ -2565,13 +2541,13 @@ ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
config = LatencyBandConfig::parse(configVal.get());
}
auto cachedInfo = db->serverInfo->get();
auto& serverInfo = cachedInfo.mutate();
auto serverInfo = db->serverInfo->get();
if(config != serverInfo.latencyBandConfig) {
TraceEvent("LatencyBandConfigChanged").detail("Present", config.present());
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db->dbInfoCount;
serverInfo.latencyBandConfig = config;
db->serverInfo->set(cachedInfo);
db->serverInfo->set(serverInfo);
}
state Future<Void> configChangeFuture = tr.watch(latencyBandConfigKey);
@ -2799,7 +2775,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state double lastLogTime = 0;
loop {
self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE;
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0;
@ -2814,8 +2790,8 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for(auto& logSet : self->db.serverInfo->get().read().logSystemConfig.tLogs) {
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
@ -2916,12 +2892,12 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
TraceEvent("CCStartDataDistributor", self->id);
loop {
try {
state bool no_distributor = !self->db.serverInfo->get().read().distributor.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
state bool no_distributor = !self->db.serverInfo->get().distributor.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (no_distributor && self->db.serverInfo->get().read().distributor.present()) {
return self->db.serverInfo->get().read().distributor.get();
if (no_distributor && self->db.serverInfo->get().distributor.present()) {
return self->db.serverInfo->get().distributor.get();
}
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
@ -2951,15 +2927,15 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
}
ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
if ( self->db.serverInfo->get().read().distributor.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().read().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
if ( self->db.serverInfo->get().distributor.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
TraceEvent("CCDataDistributorDied", self->id)
.detail("DistributorId", self->db.serverInfo->get().read().distributor.get().id());
.detail("DistributorId", self->db.serverInfo->get().distributor.get().id());
self->db.clearInterf(ProcessClass::DataDistributorClass);
} else {
self->recruitingDistributor = true;
@ -2976,11 +2952,11 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
TraceEvent("CCStartRatekeeper", self->id);
loop {
try {
state bool no_ratekeeper = !self->db.serverInfo->get().read().ratekeeper.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
state bool no_ratekeeper = !self->db.serverInfo->get().ratekeeper.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (no_ratekeeper && self->db.serverInfo->get().read().ratekeeper.present()) {
if (no_ratekeeper && self->db.serverInfo->get().ratekeeper.present()) {
// Existing ratekeeper registers while waiting, so skip.
return Void();
}
@ -3000,7 +2976,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
if (interf.present()) {
self->recruitRatekeeper.set(false);
self->recruitingRatekeeperID = interf.get().id();
const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper;
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CCRatekeeperRecruited", self->id).detail("Addr", worker.interf.address()).detail("RKID", interf.get().id());
if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CCHaltRatekeeperAfterRecruit", self->id).detail("RKID", ratekeeper.get().id())
@ -3025,16 +3001,16 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
}
ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
if ( self->db.serverInfo->get().read().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
choose {
when(wait(waitFailureClient( self->db.serverInfo->get().read().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
TraceEvent("CCRatekeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().read().ratekeeper.get().id());
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RatekeeperClass);
}
when(wait(self->recruitRatekeeper.onChange())) {}
@ -3045,6 +3021,54 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
}
}
// Runs forever, pushing the cluster controller's current ServerDBInfo to workers via
// broadcastDBInfoRequest. A round starts when self->db.serverInfo changes or when
// self->updateDBInfo is triggered (this actor triggers it itself after a partially failed round).
ACTOR Future<Void> dbInfoUpdater( ClusterControllerData* self ) {
	state Future<Void> dbInfoChange = self->db.serverInfo->onChange();
	state Future<Void> updateDBInfo = self->updateDBInfo.onTrigger();
	loop {
		choose {
			when(wait(updateDBInfo)) {
				// Triggered round: hold off for DBINFO_BATCH_DELAY unless the db info itself changes first.
				wait(delay(SERVER_KNOBS->DBINFO_BATCH_DELAY) || dbInfoChange);
			}
			when(wait(dbInfoChange)) {}
		}

		UpdateServerDBInfoRequest req;
		if(!dbInfoChange.isReady()) {
			// The db info has not changed: only target the endpoints recorded in updateDBInfoEndpoints,
			// after dropping every endpoint listed in removedDBInfoEndpoints.
			for(const auto& removedEndpoint : self->removedDBInfoEndpoints) {
				self->updateDBInfoEndpoints.erase(removedEndpoint);
			}
			req.broadcastInfo = std::vector<Endpoint>(self->updateDBInfoEndpoints.begin(), self->updateDBInfoEndpoints.end());
		} else {
			// The db info changed: target the updateServerDBInfo endpoint of every entry in id_worker.
			for(auto& worker : self->id_worker) {
				req.broadcastInfo.push_back(worker.second.details.interf.updateServerDBInfo.getEndpoint());
			}
		}

		// Both bookkeeping sets are reset on every round, whichever branch built the target list.
		self->updateDBInfoEndpoints.clear();
		self->removedDBInfoEndpoints.clear();

		// Re-arm both futures before the broadcast so changes/triggers that arrive during it are observed.
		dbInfoChange = self->db.serverInfo->onChange();
		updateDBInfo = self->updateDBInfo.onTrigger();

		req.serializedDbInfo = BinaryWriter::toValue(self->db.serverInfo->get(), AssumeVersion(currentProtocolVersion));

		TraceEvent("DBInfoStartBroadcast", self->id);
		choose {
			when(std::vector<Endpoint> notUpdated = wait( broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, Optional<Endpoint>(), false) )) {
				TraceEvent("DBInfoFinishBroadcast", self->id);
				if(!notUpdated.empty()) {
					for(auto& endpoint : notUpdated) {
						TraceEvent("DBInfoNotUpdated", self->id).detail("Addr", endpoint.getPrimaryAddress());
					}
					// Remember the endpoints reported back as not updated and schedule another round for them.
					self->updateDBInfoEndpoints.insert(notUpdated.begin(), notUpdated.end());
					self->updateDBInfo.trigger();
				}
			}
			// A newer db info ends this round without looking at the broadcast result; the next loop
			// iteration then targets every worker. NOTE(review): presumably the in-flight broadcast is
			// cancelled when its future is dropped here -- confirm against Flow semantics.
			when(wait(dbInfoChange)) {}
		}
	}
}
ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators, LocalityData locality ) {
state ClusterControllerData self( interf, locality );
state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
@ -3066,6 +3090,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
self.addActor.send( monitorDataDistributor(&self) );
self.addActor.send( monitorRatekeeper(&self) );
self.addActor.send( monitorStorageCache(&self) );
self.addActor.send( dbInfoUpdater(&self) );
self.addActor.send( traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.clusterControllerMetrics, self.id.toString() + "/ClusterControllerMetrics") );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
@ -3103,7 +3128,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
vector<WorkerDetails> workers;
for(auto& it : self.id_worker) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.details.interf.address()) ) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.details.interf.addresses()) ) {
continue;
}
@ -3138,9 +3163,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
clusterRegisterMaster( &self, req );
}
when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) {
++self.getServerDBInfoRequests;
self.addActor.send(
clusterGetServerInfo(&self.db, req.knownServerInfoID, req.issues, req.incompatiblePeers, req.reply));
self.addActor.send( clusterGetServerInfo(&self.db, req.knownServerInfoID, req.reply) );
}
when( wait( leaderFail ) ) {
// We are no longer the leader if this has changed.

View File

@ -1,263 +0,0 @@
/*
* ClusterRecruitmentInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_CLUSTERRECRUITMENTINTERFACE_H
#define FDBSERVER_CLUSTERRECRUITMENTINTERFACE_H
#pragma once
#include <vector>
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Knobs.h"
// This interface and its serialization depend on slicing, since the client will deserialize only the first part of this structure
struct ClusterControllerFullInterface {
constexpr static FileIdentifier file_identifier =
ClusterControllerClientInterface::file_identifier;
ClusterInterface clientInterface;
RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration;
RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration;
RequestStream< struct RecruitStorageRequest > recruitStorage;
RequestStream< struct RegisterWorkerRequest > registerWorker;
RequestStream< struct GetWorkersRequest > getWorkers;
RequestStream< struct RegisterMasterRequest > registerMaster;
RequestStream< struct GetServerDBInfoRequest > getServerDBInfo;
UID id() const { return clientInterface.id(); }
bool operator == (ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterControllerFullInterface const& r) const { return id() != r.id(); }
bool hasMessage() {
return clientInterface.hasMessage() ||
recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() ||
recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady();
}
void initEndpoints() {
clientInterface.initEndpoints();
recruitFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitRemoteFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitStorage.getEndpoint( TaskPriority::ClusterController );
registerWorker.getEndpoint( TaskPriority::ClusterControllerWorker );
getWorkers.getEndpoint( TaskPriority::ClusterController );
registerMaster.getEndpoint( TaskPriority::ClusterControllerRegister );
getServerDBInfo.getEndpoint( TaskPriority::ClusterController );
}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, clientInterface, recruitFromConfiguration, recruitRemoteFromConfiguration, recruitStorage,
registerWorker, getWorkers, registerMaster, getServerDBInfo);
}
};
// Reply to RecruitFromConfigurationRequest: the workers chosen for each role.
struct RecruitFromConfigurationReply {
	constexpr static FileIdentifier file_identifier = 2224085;

	// Recruited workers, one list per role.
	std::vector<WorkerInterface> backupWorkers;
	std::vector<WorkerInterface> tLogs;
	std::vector<WorkerInterface> satelliteTLogs;
	std::vector<WorkerInterface> proxies;
	std::vector<WorkerInterface> resolvers;
	std::vector<WorkerInterface> storageServers;
	std::vector<WorkerInterface> oldLogRouters;

	Optional<Key> dcId;
	bool satelliteFallback;

	// satelliteFallback defaults to false; every other member is default-constructed.
	RecruitFromConfigurationReply() : satelliteFallback(false) {}

	template <class Ar>
	void serialize(Ar& ar) {
		// backupWorkers is declared first but serialized last; keep this field order unchanged.
		serializer(ar,
		           tLogs,
		           satelliteTLogs,
		           proxies,
		           resolvers,
		           storageServers,
		           oldLogRouters,
		           dcId,
		           satelliteFallback,
		           backupWorkers);
	}
};
// Request asking the cluster controller to pick workers for the given database configuration.
// Answered with a RecruitFromConfigurationReply.
struct RecruitFromConfigurationRequest {
	constexpr static FileIdentifier file_identifier = 2023046;

	DatabaseConfiguration configuration; // configuration to recruit for
	bool recruitSeedServers;
	int maxOldLogRouters;
	ReplyPromise< struct RecruitFromConfigurationReply > reply;

	// The default constructor (used e.g. as a deserialization target) now gives the scalar members
	// defined values instead of leaving them indeterminate, matching how the sibling structs in
	// this header initialize their own scalars.
	RecruitFromConfigurationRequest() : recruitSeedServers(false), maxOldLogRouters(0) {}

	explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration, bool recruitSeedServers, int maxOldLogRouters)
	  : configuration(configuration), recruitSeedServers(recruitSeedServers), maxOldLogRouters(maxOldLogRouters) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		// Field order is part of the wire format; do not reorder.
		serializer(ar, configuration, recruitSeedServers, maxOldLogRouters, reply);
	}
};
// Reply to RecruitRemoteFromConfigurationRequest: the workers chosen as remote tLogs and log routers.
struct RecruitRemoteFromConfigurationReply {
	constexpr static FileIdentifier file_identifier = 9091392;

	std::vector<WorkerInterface> remoteTLogs;
	std::vector<WorkerInterface> logRouters;

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar,
		           remoteTLogs,
		           logRouters);
	}
};
// Request asking the cluster controller to pick remote workers for the given database configuration.
// Answered with a RecruitRemoteFromConfigurationReply.
struct RecruitRemoteFromConfigurationRequest {
	constexpr static FileIdentifier file_identifier = 3235995;

	DatabaseConfiguration configuration;
	Optional<Key> dcId;
	int logRouterCount;
	std::vector<UID> exclusionWorkerIds;
	ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply;

	// The default constructor now zero-initializes logRouterCount so a default-constructed request
	// never carries an indeterminate value.
	RecruitRemoteFromConfigurationRequest() : logRouterCount(0) {}

	RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount, const std::vector<UID> &exclusionWorkerIds)
	  : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount), exclusionWorkerIds(exclusionWorkerIds) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		// Field order is part of the wire format; do not reorder.
		serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
	}
};
// Reply to RecruitStorageRequest: a worker interface together with a process class.
struct RecruitStorageReply {
	constexpr static FileIdentifier file_identifier = 15877089;

	WorkerInterface worker;
	ProcessClass processClass;

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar,
		           worker,
		           processClass);
	}
};
// Request asking the cluster controller for a worker on which to recruit a storage server.
// Answered with a RecruitStorageReply.
struct RecruitStorageRequest {
	constexpr static FileIdentifier file_identifier = 905920;

	std::vector<Optional<Standalone<StringRef>>> excludeMachines; //< Don't recruit any of these machines
	std::vector<AddressExclusion> excludeAddresses; //< Don't recruit any of these addresses
	std::vector<Optional<Standalone<StringRef>>> includeDCs;
	// Default member initializer: the struct declares no constructor, so without it a
	// default-constructed request would hold an indeterminate bool.
	bool criticalRecruitment = false; //< True if machine classes are to be ignored
	ReplyPromise< RecruitStorageReply > reply;

	template <class Ar>
	void serialize( Ar& ar ) {
		// Field order is part of the wire format; do not reorder.
		serializer(ar, excludeMachines, excludeAddresses, includeDCs, criticalRecruitment, reply);
	}
};
// Reply to RegisterWorkerRequest, carrying a process class, a cluster-controller priority info and
// an optional storage cache id.
struct RegisterWorkerReply {
	constexpr static FileIdentifier file_identifier = 16475696;

	ProcessClass processClass;
	ClusterControllerPriorityInfo priorityInfo;
	Optional<uint16_t> storageCache;

	// Default state: unset fitness, not excluded, unknown datacenter fitness.
	RegisterWorkerReply()
	  : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}

	RegisterWorkerReply(ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Optional<uint16_t> storageCache)
	  : processClass(processClass),
	    priorityInfo(priorityInfo),
	    storageCache(storageCache) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar,
		           processClass,
		           priorityInfo,
		           storageCache);
	}
};
// Request a worker sends to register itself with the cluster controller.
// Answered with a RegisterWorkerReply.
struct RegisterWorkerRequest {
	constexpr static FileIdentifier file_identifier = 14332605;

	WorkerInterface wi;
	ProcessClass initialClass;
	ProcessClass processClass;
	ClusterControllerPriorityInfo priorityInfo;
	Generation generation;
	// Optional singleton-role interfaces carried along with the registration.
	Optional<DataDistributorInterface> distributorInterf;
	Optional<RatekeeperInterface> ratekeeperInterf;
	Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf;
	ReplyPromise<RegisterWorkerReply> reply;
	bool degraded;

	// Default state: unset fitness, not excluded, unknown datacenter fitness, not degraded.
	RegisterWorkerRequest()
	  : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown),
	    degraded(false) {}

	RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf, Optional<RatekeeperInterface> rkInterf, Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf, bool degraded)
	  : wi(wi),
	    initialClass(initialClass),
	    processClass(processClass),
	    priorityInfo(priorityInfo),
	    generation(generation),
	    distributorInterf(ddInterf),
	    ratekeeperInterf(rkInterf),
	    storageCacheInterf(storageCacheInterf),
	    degraded(degraded) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		// degraded is serialized after reply; keep this field order unchanged.
		serializer(ar,
		           wi,
		           initialClass,
		           processClass,
		           priorityInfo,
		           generation,
		           distributorInterf,
		           ratekeeperInterf,
		           storageCacheInterf,
		           reply,
		           degraded);
	}
};
// Request for the list of workers known to the cluster controller; the reply is a vector of
// WorkerDetails. `flags` is a bitmask built from the enumerators below.
struct GetWorkersRequest {
	constexpr static FileIdentifier file_identifier = 1254174;

	// Filter bits that may be OR-ed together into `flags`.
	enum {
		TESTER_CLASS_ONLY = 0x1,
		NON_EXCLUDED_PROCESSES_ONLY = 0x2
	};

	int flags;
	ReplyPromise<vector<WorkerDetails>> reply;

	// No filter bits set by default.
	GetWorkersRequest() : flags(0) {}
	explicit GetWorkersRequest(int fl) : flags(fl) {}

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar,
		           flags,
		           reply);
	}
};
struct RegisterMasterRequest {
constexpr static FileIdentifier file_identifier = 10773445;
UID id;
LocalityData mi;
LogSystemConfig logSystemConfig;
std::vector<MasterProxyInterface> proxies;
std::vector<ResolverInterface> resolvers;
DBRecoveryCount recoveryCount;
int64_t registrationCount;
Optional<DatabaseConfiguration> configuration;
std::vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
ReplyPromise<Void> reply;
RegisterMasterRequest() : logSystemConfig(0) {}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, id, mi, logSystemConfig, proxies, resolvers, recoveryCount, registrationCount, configuration,
priorCommittedLogServers, recoveryState, recoveryStalled, reply);
}
};
#include "fdbserver/ServerDBInfo.h" // include order hack
#endif

View File

@ -1020,7 +1020,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
TraceEvent(SevWarnAlways, "MissingLocality")
.detail("Server", i->first.uniqueID)
.detail("Locality", i->first.locality.toString());
auto addr = i->first.address();
auto addr = i->first.stableAddress();
self->invalidLocalityAddr.insert(AddressExclusion(addr.ip, addr.port));
if (self->checkInvalidLocalities.isReady()) {
self->checkInvalidLocalities = checkAndRemoveInvalidLocalityAddr(self);
@ -2914,6 +2914,14 @@ bool teamContainsFailedServer(DDTeamCollection* self, Reference<TCTeamInfo> team
self->excludedServers.get(ipaddr) == DDTeamCollection::Status::FAILED) {
return true;
}
if(ssi.secondaryAddress().present()) {
AddressExclusion saddr(ssi.secondaryAddress().get().ip, ssi.secondaryAddress().get().port);
AddressExclusion sipaddr(ssi.secondaryAddress().get().ip);
if (self->excludedServers.get(saddr) == DDTeamCollection::Status::FAILED ||
self->excludedServers.get(sipaddr) == DDTeamCollection::Status::FAILED) {
return true;
}
}
}
return false;
}
@ -3626,29 +3634,41 @@ ACTOR Future<Void> storageServerTracker(
// If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address();
state AddressExclusion addr( a.ip, a.port );
state AddressExclusion ipaddr( a.ip );
state DDTeamCollection::Status addrStatus = self->excludedServers.get(addr);
state DDTeamCollection::Status ipaddrStatus = self->excludedServers.get(ipaddr);
if (addrStatus != DDTeamCollection::Status::NONE || ipaddrStatus != DDTeamCollection::Status::NONE) {
AddressExclusion worstAddr( a.ip, a.port );
DDTeamCollection::Status worstStatus = self->excludedServers.get( worstAddr );
otherChanges.push_back( self->excludedServers.onChange( worstAddr ) );
for(int i = 0; i < 3; i++) {
if(i > 0 && !server->lastKnownInterface.secondaryAddress().present()) {
break;
}
AddressExclusion testAddr;
if(i == 0) testAddr = AddressExclusion(a.ip);
else if(i == 1) testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip, server->lastKnownInterface.secondaryAddress().get().port);
else if(i == 2) testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip);
DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr);
if(testStatus > worstStatus) {
worstStatus = testStatus;
worstAddr = testAddr;
}
otherChanges.push_back( self->excludedServers.onChange( testAddr ) );
}
if (worstStatus != DDTeamCollection::Status::NONE) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("Excluded",
ipaddrStatus == DDTeamCollection::Status::NONE ? addr.toString() : ipaddr.toString());
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
status.isUndesired = true;
status.isWrongConfiguration = true;
if (addrStatus == DDTeamCollection::Status::FAILED ||
ipaddrStatus == DDTeamCollection::Status::FAILED) {
if (worstStatus == DDTeamCollection::Status::FAILED) {
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
.detail("Address", addr.toString())
.detail("ServerID", server->id);
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
wait(removeKeysFromFailedServer(cx, server->id, self->lock));
if (BUGGIFY) wait(delay(5.0));
self->shardsAffectedByTeamFailure->eraseServer(server->id);
}
}
otherChanges.push_back( self->excludedServers.onChange( addr ) );
otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );
failureTracker = storageServerFailureTracker(self, server, cx, &status, addedVersion);
//We need to recruit new storage servers if the key value store type has changed
@ -3921,7 +3941,7 @@ ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) {
int numExistingSSOnAddr(DDTeamCollection* self, const AddressExclusion& addr) {
int numExistingSS = 0;
for (auto& server : self->server_info) {
const NetworkAddress& netAddr = server.second->lastKnownInterface.address();
const NetworkAddress& netAddr = server.second->lastKnownInterface.stableAddress();
AddressExclusion usedAddr(netAddr.ip, netAddr.port);
if (usedAddr == addr) {
++numExistingSS;
@ -3935,10 +3955,10 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply
// SOMEDAY: Cluster controller waits for availability, retry quickly if a server's Locality changes
self->recruitingStream.set(self->recruitingStream.get() + 1);
const NetworkAddress& netAddr = candidateWorker.worker.address();
const NetworkAddress& netAddr = candidateWorker.worker.stableAddress();
AddressExclusion workerAddr(netAddr.ip, netAddr.port);
if (numExistingSSOnAddr(self, workerAddr) <= 2 &&
self->recruitingLocalities.find(candidateWorker.worker.address()) == self->recruitingLocalities.end()) {
self->recruitingLocalities.find(candidateWorker.worker.stableAddress()) == self->recruitingLocalities.end()) {
// Only allow at most 2 storage servers on an address, because
// too many storage server on the same address (i.e., process) can cause OOM.
// Ask the candidateWorker to initialize a SS only if the worker does not have a pending request
@ -3959,7 +3979,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply
.detail("RecruitingStream", self->recruitingStream.get());
self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.address());
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());
state ErrorOr<InitializeStorageReply> newServer =
wait(candidateWorker.worker.storage.tryGetReply(isr, TaskPriority::DataDistribution));
if (newServer.isError()) {
@ -3970,7 +3990,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply
wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::DataDistribution));
}
self->recruitingIds.erase(interfaceId);
self->recruitingLocalities.erase(candidateWorker.worker.address());
self->recruitingLocalities.erase(candidateWorker.worker.stableAddress());
TraceEvent("DDRecruiting")
.detail("Primary", self->primary)
@ -4016,7 +4036,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
TraceEvent(SevDebug, "DDRecruitExcl1")
.detail("Primary", self->primary)
.detail("Excluding", s->second->lastKnownInterface.address());
auto addr = s->second->lastKnownInterface.address();
auto addr = s->second->lastKnownInterface.stableAddress();
AddressExclusion addrExcl(addr.ip, addr.port);
exclusions.insert(addrExcl);
numSSPerAddr[addrExcl]++; // increase from 0
@ -4067,8 +4087,8 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
choose {
when( RecruitStorageReply candidateWorker = wait( fCandidateWorker ) ) {
AddressExclusion candidateSSAddr(candidateWorker.worker.address().ip,
candidateWorker.worker.address().port);
AddressExclusion candidateSSAddr(candidateWorker.worker.stableAddress().ip,
candidateWorker.worker.stableAddress().port);
int numExistingSS = numSSPerAddr[candidateSSAddr];
if (numExistingSS >= 2) {
TraceEvent(SevWarnAlways, "StorageRecruiterTooManySSOnSameAddr", self->distributorId)
@ -4802,7 +4822,7 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
// Go through storage server interfaces and translate Address -> server ID (UID)
for (const AddressExclusion& excl : req.exclusions) {
for (const auto& ssi : ssis) {
if (excl.excludes(ssi.address())) {
if (excl.excludes(ssi.address()) || (ssi.secondaryAddress().present() && excl.excludes(ssi.secondaryAddress().get()))) {
excludeServerIDs.push_back(ssi.id());
}
}

View File

@ -25,7 +25,6 @@
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/LogSystem.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -349,6 +349,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( MAX_PROXY_COMPUTE, 2.0 );
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 2 );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistribution)
@ -416,6 +417,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( POLICY_RATING_TESTS, 200 ); if( randomize && BUGGIFY ) POLICY_RATING_TESTS = 20;
init( POLICY_GENERATIONS, 100 ); if( randomize && BUGGIFY ) POLICY_GENERATIONS = 10;
init( DBINFO_SEND_AMOUNT, 2 );
init( DBINFO_BATCH_DELAY, 0.1 );
//Move Keys
init( SHARD_READY_DELAY, 0.25 );
@ -527,13 +530,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
//Worker
init( WORKER_LOGGING_INTERVAL, 5.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( HEAP_PROFILER_INTERVAL, 30.0 );
init( DEGRADED_RESET_INTERVAL, 24*60*60 ); if ( randomize && BUGGIFY ) DEGRADED_RESET_INTERVAL = 10;
init( DEGRADED_WARNING_LIMIT, 1 );
init( DEGRADED_WARNING_RESET_DELAY, 7*24*60*60 );
init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS, 10 );
init( TRACE_LOG_PING_TIMEOUT_SECONDS, 5.0 );
init( DBINFO_FAILED_DELAY, 1.0 );
// Test harness
init( WORKER_POLL_DELAY, 1.0 );

View File

@ -286,6 +286,7 @@ public:
double MAX_PROXY_COMPUTE;
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT;
// Master Server
double COMMIT_SLEEP_TIME;
@ -350,6 +351,8 @@ public:
int EXPECTED_PROXY_FITNESS;
int EXPECTED_RESOLVER_FITNESS;
double RECRUITMENT_TIMEOUT;
int DBINFO_SEND_AMOUNT;
double DBINFO_BATCH_DELAY;
//Move Keys
double SHARD_READY_DELAY;
@ -462,13 +465,13 @@ public:
//Worker
double WORKER_LOGGING_INTERVAL;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double HEAP_PROFILER_INTERVAL;
double DEGRADED_RESET_INTERVAL;
double DEGRADED_WARNING_LIMIT;
double DEGRADED_WARNING_RESET_DELAY;
int64_t TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS;
double TRACE_LOG_PING_TIMEOUT_SECONDS;
double DBINFO_FAILED_DELAY;
// Test harness
double WORKER_POLL_DELAY;

View File

@ -20,7 +20,6 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -40,6 +40,7 @@ struct MasterInterface {
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
NetworkAddressList addresses() const { return changeCoordinators.getEndpoint().addresses; }
UID id() const { return changeCoordinators.getEndpoint().token; }
template <class Archive>

View File

@ -48,6 +48,29 @@
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Forwards a TxnStateRequest to the endpoints listed in req.broadcastInfo using a tree-shaped fan-out.
//
// The endpoint list is cut into at most `sendAmount` contiguous groups. The first endpoint of each
// group receives the request directly; the remaining endpoints of that group are handed to it in
// the forwarded request's broadcastInfo (presumably so that the receiver repeats this procedure
// for its own sub-list -- confirm against the receiver of TxnStateRequest).
//
// req        - request to forward; taken by value because its reply and broadcastInfo are
//              overwritten for every outgoing copy.
// sendAmount - maximum number of endpoints this call contacts directly. If it is <= 0 the loop
//              body never runs, so the division by sendAmount below is never evaluated.
// sendReply  - when true, Void() is sent on the reply promise the request arrived with, but only
//              after every directly contacted endpoint has replied.
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
// Keep the promise the request arrived with; req.reply is reset before each outgoing send.
state ReplyPromise<Void> reply = req.reply;
resetReply( req );
std::vector<Future<Void>> replies;
// Index of the next endpoint in broadcastEndpoints that has not been assigned to a group yet.
int currentStream = 0;
std::vector<Endpoint> broadcastEndpoints = req.broadcastInfo;
for(int i = 0; i < sendAmount && currentStream < broadcastEndpoints.size(); i++) {
// Endpoints delegated to the head of group i.
std::vector<Endpoint> endpoints;
// The head of group i is contacted directly by this call.
RequestStream<TxnStateRequest> cur(broadcastEndpoints[currentStream++]);
// Group i extends up to (exclusive) index size*(i+1)/sendAmount (integer division),
// so the last group (i == sendAmount-1) runs to the end of the list.
while(currentStream < broadcastEndpoints.size()*(i+1)/sendAmount) {
endpoints.push_back(broadcastEndpoints[currentStream++]);
}
req.broadcastInfo = endpoints;
// NOTE(review): brokenPromiseToNever presumably turns a broken_promise error into a future
// that never becomes ready, in which case waitForAll below would not complete -- confirm.
replies.push_back(brokenPromiseToNever( cur.getReply( req ) ));
// Reset the reply again so the next outgoing copy does not reuse the one just sent.
resetReply( req );
}
wait( waitForAll(replies) );
if(sendReply) {
reply.send(Void());
}
return Void();
}
struct ProxyStats {
CounterCollection cc;
Counter txnRequestIn, txnRequestOut, txnRequestErrors;
@ -1954,7 +1977,7 @@ ACTOR Future<Void> masterProxyServerCore(
when(ExclusionSafetyCheckRequest exclCheckReq = waitNext(proxy.exclusionSafetyCheckReq.getFuture())) {
addActor.send(proxyCheckSafeExclusion(db, exclCheckReq));
}
when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
when(state TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
state ReplyPromise<Void> reply = req.reply;
if(req.last) maxSequence = req.sequence + 1;
if (!txnSequences.count(req.sequence)) {
@ -2022,7 +2045,7 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.txnStateStore->enableSnapshot();
}
}
reply.send(Void());
addActor.send(broadcastTxnRequest(req, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
wait(yield());
}
}

View File

@ -774,6 +774,7 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer( Database cx, StorageServ
try {
state Future<Standalone<RangeResultRef>> fTagLocalities = tr.getRange( tagLocalityListKeys, CLIENT_KNOBS->TOO_MANY );
state Future<Optional<Value>> fv = tr.get( serverListKeyFor(server.id()) );
state Future<Optional<Value>> fExclProc = tr.get(
StringRef(encodeExcludedServersKey( AddressExclusion( server.address().ip, server.address().port ))) );
state Future<Optional<Value>> fExclIP = tr.get(
@ -782,14 +783,28 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer( Database cx, StorageServ
StringRef(encodeFailedServersKey( AddressExclusion( server.address().ip, server.address().port ))) );
state Future<Optional<Value>> fFailIP = tr.get(
StringRef(encodeFailedServersKey( AddressExclusion( server.address().ip ))) );
state Future<Optional<Value>> fExclProc2 = server.secondaryAddress().present() ? tr.get(
StringRef(encodeExcludedServersKey( AddressExclusion( server.secondaryAddress().get().ip, server.secondaryAddress().get().port ))) ) : Future<Optional<Value>>( Optional<Value>() );
state Future<Optional<Value>> fExclIP2 = server.secondaryAddress().present() ? tr.get(
StringRef(encodeExcludedServersKey( AddressExclusion( server.secondaryAddress().get().ip ))) ) : Future<Optional<Value>>( Optional<Value>() );
state Future<Optional<Value>> fFailProc2 = server.secondaryAddress().present() ? tr.get(
StringRef(encodeFailedServersKey( AddressExclusion( server.secondaryAddress().get().ip, server.secondaryAddress().get().port ))) ) : Future<Optional<Value>>( Optional<Value>() );
state Future<Optional<Value>> fFailIP2 = server.secondaryAddress().present() ? tr.get(
StringRef(encodeFailedServersKey( AddressExclusion( server.secondaryAddress().get().ip ))) ) : Future<Optional<Value>>( Optional<Value>() );
state Future<Standalone<RangeResultRef>> fTags = tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY, true);
state Future<Standalone<RangeResultRef>> fHistoryTags = tr.getRange( serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY, true);
wait( success(fTagLocalities) && success(fv) && success(fExclProc) && success(fExclIP) && success(fFailProc) && success(fFailIP) && success(fTags) && success(fHistoryTags) );
wait( success(fTagLocalities) && success(fv) && success(fTags) && success(fHistoryTags) &&
success(fExclProc) && success(fExclIP) && success(fFailProc) && success(fFailIP) &&
success(fExclProc2) && success(fExclIP2) && success(fFailProc2) && success(fFailIP2) );
// If we have been added to the excluded/failed state servers list, we have to fail
if (fExclProc.get().present() || fExclIP.get().present() || fFailProc.get().present() || fFailIP.get().present() )
if (fExclProc.get().present() || fExclIP.get().present() || fFailProc.get().present() || fFailIP.get().present() ||
fExclProc2.get().present() || fExclIP2.get().present() || fFailProc2.get().present() || fFailIP2.get().present() ) {
throw recruitment_failed();
}
if(fTagLocalities.get().more || fTags.get().more || fHistoryTags.get().more)
ASSERT(false);

View File

@ -22,13 +22,13 @@
#define FDBSERVER_SERVERDBINFO_H
#pragma once
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/WorkerInterface.actor.h"
struct ServerDBInfo {
constexpr static FileIdentifier file_identifier = 13838807;
@ -51,29 +51,45 @@ struct ServerDBInfo {
std::vector<UID> priorCommittedLogServers; // If !fullyRecovered and logSystemConfig refers to a new log system which may not have been committed to the coordinated state yet, then priorCommittedLogServers are the previous, fully committed generation which need to stay alive in case this recovery fails
Optional<LatencyBandConfig> latencyBandConfig;
std::vector<std::pair<uint16_t,StorageServerInterface>> storageCaches;
int64_t infoGeneration;
explicit ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0) {}
ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {}
bool operator == (ServerDBInfo const& r) const { return id == r.id; }
bool operator != (ServerDBInfo const& r) const { return id != r.id; }
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, id, clusterInterface, client, distributor, master, ratekeeper, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, priorCommittedLogServers, latencyBandConfig, storageCaches);
serializer(ar, id, clusterInterface, client, distributor, master, ratekeeper, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, priorCommittedLogServers, latencyBandConfig, storageCaches, infoGeneration);
}
};
// Request that carries an already-serialized ServerDBInfo together with a list of endpoints.
// It is the request type of WorkerInterface::updateServerDBInfo and the argument of
// broadcastDBInfoRequest().
struct UpdateServerDBInfoRequest {
constexpr static FileIdentifier file_identifier = 9467438;
// The ServerDBInfo payload, kept in serialized form.
Standalone<StringRef> serializedDbInfo;
// Endpoints associated with this broadcast.
// NOTE(review): presumably the endpoints the receiver should forward the update to -- confirm.
std::vector<Endpoint> broadcastInfo;
// NOTE(review): the meaning of the returned endpoint list is not visible here
// (possibly the endpoints that could not be notified) -- confirm against broadcastDBInfoRequest.
ReplyPromise<std::vector<Endpoint>> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, serializedDbInfo, broadcastInfo, reply);
}
};
struct GetServerDBInfoRequest {
constexpr static FileIdentifier file_identifier = 9467438;
constexpr static FileIdentifier file_identifier = 9467439;
UID knownServerInfoID;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise< CachedSerialization<struct ServerDBInfo> > reply;
ReplyPromise<struct ServerDBInfo> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, knownServerInfoID, issues, incompatiblePeers, reply);
serializer(ar, knownServerInfoID, reply);
}
};
Future<Void> broadcastTxnRequest(TxnStateRequest const& req, int const& sendAmount, bool const& sendReply);
Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest const& req, int const& sendAmount, Optional<Endpoint> const& sender, bool const& sendReply);
#endif

View File

@ -25,7 +25,6 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbmonitor/SimpleIni.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h"

View File

@ -25,7 +25,6 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include <time.h>
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/DataDistribution.actor.h"
@ -35,28 +34,6 @@
#include "fdbclient/JsonBuilder.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Records, replaces, or clears the issue list stored for one process address.
//
// issueMap - map from address to (issues, UID) that is updated in place.
// addr     - address whose entry is written or erased.
// issues   - the issues to store; an empty list means "no issues" and erases the entry.
// issueID  - out-parameter: set to the freshly generated UID of the stored entry, or reset to an
//            empty Optional when the entry was erased.
void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef<StringRef> const& issues,
               Optional<UID>& issueID) {
	if (issues.size() == 0) {
		// Nothing to report: drop any existing entry and clear the caller's handle.
		issueMap.erase(addr);
		issueID = Optional<UID>();
		return;
	}

	// Create or overwrite the entry and tag it with a new random UID.
	auto& entry = issueMap[addr];
	entry.first = issues;
	entry.second = deterministicRandom()->randomUniqueID();
	issueID = entry.second;
}
// Erases the issue entry stored for `addr`, but only if that entry is still tagged with the
// UID held in `issueID`.
//
// issueMap - map from address to (issues, UID) that is updated in place.
// addr     - address whose entry may be erased.
// issueID  - UID to match against the stored entry. If it is not present, or the map holds no
//            entry for `addr`, or the stored UID differs, the map is left untouched.
//            issueID itself is never modified.
void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional<UID>& issueID) {
	if (!issueID.present()) {
		return;
	}

	// One lookup, reused for both the comparison and the erase
	// (previously count() + operator[] + erase(key) on the same key).
	auto entry = issueMap.find(addr);
	if (entry != issueMap.end() && entry->second.second == issueID.get()) {
		issueMap.erase(entry);
	}
}
const char* RecoveryStatus::names[] = {
"reading_coordinated_state", "locking_coordinated_state", "locking_old_transaction_servers", "reading_transaction_system_state",
"configuration_missing", "configuration_never_created", "configuration_invalid",
@ -364,7 +341,10 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<Work
machineJsonMap[machineId] = statusObj;
}
if (configuration.present() && !configuration.get().isExcludedServer(it->first))
//FIXME: this will not catch if the secondary address of the process was excluded
NetworkAddressList tempList;
tempList.address = it->first;
if (configuration.present() && !configuration.get().isExcludedServer(tempList))
notExcludedMap[machineId] = false;
workerContribMap[machineId] ++;
}
@ -569,7 +549,7 @@ struct RolesInfo {
};
ACTOR static Future<JsonBuilderObject> processStatusFetcher(
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
Reference<AsyncVar<ServerDBInfo>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
WorkerEvents mMetrics, WorkerEvents nMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts, std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
@ -627,18 +607,18 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
state RolesInfo roles;
roles.addRole("master", db->get().read().master);
roles.addRole("cluster_controller", db->get().read().clusterInterface.clientInterface);
roles.addRole("master", db->get().master);
roles.addRole("cluster_controller", db->get().clusterInterface.clientInterface);
if (db->get().read().distributor.present()) {
roles.addRole("data_distributor", db->get().read().distributor.get());
if (db->get().distributor.present()) {
roles.addRole("data_distributor", db->get().distributor.get());
}
if (db->get().read().ratekeeper.present()) {
roles.addRole("ratekeeper", db->get().read().ratekeeper.get());
if (db->get().ratekeeper.present()) {
roles.addRole("ratekeeper", db->get().ratekeeper.get());
}
for(auto& tLogSet : db->get().read().logSystemConfig.tLogs) {
for(auto& tLogSet : db->get().logSystemConfig.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
roles.addRole("router", it.interf());
@ -646,7 +626,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
}
for(auto& old : db->get().read().logSystemConfig.oldTLogs) {
for(auto& old : db->get().logSystemConfig.oldTLogs) {
for(auto& tLogSet : old.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
@ -689,7 +669,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
state std::vector<ResolverInterface>::const_iterator res;
state std::vector<ResolverInterface> resolvers = db->get().read().resolvers;
state std::vector<ResolverInterface> resolvers = db->get().resolvers;
for(res = resolvers.begin(); res != resolvers.end(); ++res) {
roles.addRole( "resolver", *res );
wait(yield());
@ -850,7 +830,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj["roles"] = roles.getStatusForAddress(address);
if (configuration.present()){
statusObj["excluded"] = configuration.get().isExcludedServer(address);
statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses());
}
statusObj["class_type"] = workerItr->processClass.toString();
@ -1551,17 +1531,17 @@ ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStor
return results;
}
ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().read().logSystemConfig.allPresentLogs();
ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(Reference<AsyncVar<ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().logSystemConfig.allPresentLogs();
vector<std::pair<TLogInterface, EventMap>> results =
wait(getServerMetrics(servers, address_workers, std::vector<std::string>{ "TLogMetrics" }));
return results;
}
ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxiesAndMetrics(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxiesAndMetrics(Reference<AsyncVar<ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<std::pair<MasterProxyInterface, EventMap>> results = wait(getServerMetrics(
db->get().read().client.proxies, address_workers, std::vector<std::string>{ "GRVLatencyMetrics", "CommitLatencyMetrics" }));
db->get().client.proxies, address_workers, std::vector<std::string>{ "GRVLatencyMetrics", "CommitLatencyMetrics" }));
return results;
}
@ -1571,7 +1551,7 @@ static int getExtraTLogEligibleZones(const vector<WorkerDetails>& workers, const
std::map<Key,std::set<StringRef>> dcId_zone;
for(auto const& worker : workers) {
if(worker.processClass.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign
&& !configuration.isExcludedServer(worker.interf.address()))
&& !configuration.isExcludedServer(worker.interf.addresses()))
{
allZones.insert(worker.interf.locality.zoneId().get());
if(worker.interf.locality.dcId().present()) {
@ -1629,7 +1609,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP
return perfLimit;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, vector<WorkerDetails> workers, WorkerDetails mWorker, WorkerDetails rkWorker,
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<ServerDBInfo>> db, vector<WorkerDetails> workers, WorkerDetails mWorker, WorkerDetails rkWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
state JsonBuilderObject statusObj;
@ -1644,7 +1624,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
for (auto const& w : workers) {
workersMap[w.interf.address()] = w;
}
for (auto &p : db->get().read().client.proxies) {
for (auto &p : db->get().client.proxies) {
auto worker = getWorker(workersMap, p.address());
if (worker.present())
proxyStatFutures.push_back(timeoutError(worker.get().interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("ProxyMetrics"))), 1.0));
@ -1859,11 +1839,11 @@ ACTOR static Future<JsonBuilderObject> clusterSummaryStatisticsFetcher(WorkerEve
return statusObj;
}
static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
JsonBuilderArray oldTlogsArray;
if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto it : db->get().read().logSystemConfig.oldTLogs) {
if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto it : db->get().logSystemConfig.oldTLogs) {
JsonBuilderObject statusObj;
JsonBuilderArray logsObj;
Optional<int32_t> sat_log_replication_factor, sat_log_write_anti_quorum, sat_log_fault_tolerance, log_replication_factor, log_write_anti_quorum, log_fault_tolerance, remote_log_replication_factor, remote_log_fault_tolerance;
@ -1986,15 +1966,14 @@ static std::string getIssueDescription(std::string name) {
}
static std::map<std::string, std::vector<JsonBuilderObject>> getProcessIssuesAsMessages(
ProcessIssuesMap const& _issues) {
std::vector<ProcessIssues> const& issues) {
std::map<std::string, std::vector<JsonBuilderObject>> issuesMap;
try {
ProcessIssuesMap issues = _issues;
for (auto processIssues : issues) {
for (auto issue : processIssues.second.first) {
for (auto issue : processIssues.issues) {
std::string issueStr = issue.toString();
issuesMap[processIssues.first.toString()].push_back(
issuesMap[processIssues.address.toString()].push_back(
JsonString::makeMessage(issueStr.c_str(), getIssueDescription(issueStr).c_str()));
}
}
@ -2109,7 +2088,7 @@ ACTOR Future<JsonBuilderObject> layerStatusFetcher(Database cx, JsonBuilderArray
return statusObj;
}
ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<ServerDBInfo>> db, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
state JsonBuilderObject statusObj;
state Database cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, false); // Open a new database connection that isn't lock-aware
@ -2181,10 +2160,10 @@ ACTOR Future<Optional<Value>> getActivePrimaryDC(Database cx, JsonBuilderArray*
// constructs the cluster section of the json status output
ACTOR Future<StatusReply> clusterGetStatus(
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db,
Reference<AsyncVar<ServerDBInfo>> db,
Database cx,
vector<WorkerDetails> workers,
ProcessIssuesMap workerIssues,
std::vector<ProcessIssues> workerIssues,
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>>* clientStatus,
ServerCoordinators coordinators,
std::vector<NetworkAddress> incompatibleConnections,
@ -2201,7 +2180,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
try {
// Get the master Worker interface
Optional<WorkerDetails> _mWorker = getWorker( workers, db->get().read().master.address() );
Optional<WorkerDetails> _mWorker = getWorker( workers, db->get().master.address() );
if (_mWorker.present()) {
mWorker = _mWorker.get();
} else {
@ -2209,11 +2188,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// Get the DataDistributor worker interface
Optional<WorkerDetails> _ddWorker;
if (db->get().read().distributor.present()) {
_ddWorker = getWorker( workers, db->get().read().distributor.get().address() );
if (db->get().distributor.present()) {
_ddWorker = getWorker( workers, db->get().distributor.get().address() );
}
if (!db->get().read().distributor.present() || !_ddWorker.present()) {
if (!db->get().distributor.present() || !_ddWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_dataDistributor_worker", "Unable to locate the data distributor worker."));
} else {
ddWorker = _ddWorker.get();
@ -2221,11 +2200,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
// Get the Ratekeeper worker interface
Optional<WorkerDetails> _rkWorker;
if (db->get().read().ratekeeper.present()) {
_rkWorker = getWorker( workers, db->get().read().ratekeeper.get().address() );
if (db->get().ratekeeper.present()) {
_rkWorker = getWorker( workers, db->get().ratekeeper.get().address() );
}
if (!db->get().read().ratekeeper.present() || !_rkWorker.present()) {
if (!db->get().ratekeeper.present() || !_rkWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_ratekeeper_worker", "Unable to locate the ratekeeper worker."));
} else {
rkWorker = _rkWorker.get();
@ -2283,8 +2262,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state WorkerEvents programStarts = workerEventsVec[5].present() ? workerEventsVec[5].get().first : WorkerEvents();
state JsonBuilderObject statusObj;
if(db->get().read().recoveryCount > 0) {
statusObj["generation"] = db->get().read().recoveryCount;
if(db->get().recoveryCount > 0) {
statusObj["generation"] = db->get().recoveryCount;
}
state std::map<std::string, std::vector<JsonBuilderObject>> processIssues =
@ -2367,7 +2346,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::vector<JsonBuilderObject> workerStatuses = wait(getAll(futures2));
int oldLogFaultTolerance = 100;
if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().read().logSystemConfig.oldTLogs.size() > 0) {
if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().logSystemConfig.oldTLogs.size() > 0) {
statusObj["old_logs"] = oldTlogFetcher(&oldLogFaultTolerance, db, address_workers);
}

View File

@ -27,14 +27,14 @@
#include "fdbserver/MasterInterface.h"
#include "fdbclient/ClusterInterface.h"
typedef Standalone<VectorRef<StringRef>> ProcessIssues;
typedef std::map<NetworkAddress, std::pair<ProcessIssues, UID>> ProcessIssuesMap;
struct ProcessIssues {
NetworkAddress address;
Standalone<VectorRef<StringRef>> issues;
void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef<StringRef> const& issues, Optional<UID>& issueID);
ProcessIssues(NetworkAddress address, Standalone<VectorRef<StringRef>> issues) : address(address), issues(issues) {}
};
void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional<UID>& issueID);
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<CachedSerialization<struct ServerDBInfo>>> const& db, Database const& cx, vector<WorkerDetails> const& workers,
ProcessIssuesMap const& workerIssues, std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>>* const& clientStatus, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<WorkerDetails> const& workers, std::vector<ProcessIssues> const& workerIssues,
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>>* const& clientStatus, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );
#endif

View File

@ -37,6 +37,7 @@
#include "fdbserver/LogSystemConfig.h"
#include "fdbrpc/MultiInterface.h"
#include "fdbclient/ClientWorkerInterface.h"
#include "fdbserver/RecoveryState.h"
#include "flow/actorcompiler.h"
struct WorkerInterface {
@ -60,14 +61,17 @@ struct WorkerInterface {
RequestStream< struct EventLogRequest > eventLogRequest;
RequestStream< struct TraceBatchDumpRequest > traceBatchDumpRequest;
RequestStream< struct DiskStoreRequest > diskStoreRequest;
RequestStream<struct ExecuteRequest> execReq;
RequestStream<struct WorkerSnapRequest> workerSnapReq;
RequestStream< struct ExecuteRequest> execReq;
RequestStream< struct WorkerSnapRequest> workerSnapReq;
RequestStream< struct UpdateServerDBInfoRequest > updateServerDBInfo;
TesterInterface testerInterface;
UID id() const { return tLog.getEndpoint().token; }
NetworkAddress address() const { return tLog.getEndpoint().getPrimaryAddress(); }
NetworkAddress stableAddress() const { return tLog.getEndpoint().getStableAddress(); }
Optional<NetworkAddress> secondaryAddress() const { return tLog.getEndpoint().addresses.secondaryAddress; }
NetworkAddressList addresses() const { return tLog.getEndpoint().addresses; }
WorkerInterface() {}
WorkerInterface( const LocalityData& locality ) : locality( locality ) {}
@ -81,12 +85,13 @@ struct WorkerInterface {
logRouter.getEndpoint( TaskPriority::Worker );
debugPing.getEndpoint( TaskPriority::Worker );
coordinationPing.getEndpoint( TaskPriority::Worker );
updateServerDBInfo.getEndpoint( TaskPriority::Worker );
eventLogRequest.getEndpoint( TaskPriority::Worker );
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq, backup);
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq, backup, updateServerDBInfo);
}
};
@ -105,6 +110,230 @@ struct WorkerDetails {
}
};
// This interface and its serialization depend on slicing, since the client will deserialize only the first part of this structure
// Full set of request streams exposed by the cluster controller.
// It shares its file_identifier with ClusterControllerClientInterface, and clientInterface is the
// first field passed to serializer() below.
struct ClusterControllerFullInterface {
constexpr static FileIdentifier file_identifier =
ClusterControllerClientInterface::file_identifier;
ClusterInterface clientInterface;
RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration;
RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration;
RequestStream< struct RecruitStorageRequest > recruitStorage;
RequestStream< struct RegisterWorkerRequest > registerWorker;
RequestStream< struct GetWorkersRequest > getWorkers;
RequestStream< struct RegisterMasterRequest > registerMaster;
RequestStream< struct GetServerDBInfoRequest > getServerDBInfo; //only used by testers; the cluster controller will send the serverDBInfo to workers
// Identity of this interface is the identity of the embedded client interface.
UID id() const { return clientInterface.id(); }
// Two interfaces compare equal exactly when their ids are equal.
bool operator == (ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterControllerFullInterface const& r) const { return id() != r.id(); }
// Returns true if the client interface reports a message or if the future of any of the
// request streams declared above is ready.
bool hasMessage() {
return clientInterface.hasMessage() ||
recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() ||
recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady();
}
// Initializes the client interface's endpoints and calls getEndpoint() on every request stream
// with the task priority it should be served at.
void initEndpoints() {
clientInterface.initEndpoints();
recruitFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitRemoteFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitStorage.getEndpoint( TaskPriority::ClusterController );
registerWorker.getEndpoint( TaskPriority::ClusterControllerWorker );
getWorkers.getEndpoint( TaskPriority::ClusterController );
registerMaster.getEndpoint( TaskPriority::ClusterControllerRegister );
getServerDBInfo.getEndpoint( TaskPriority::ClusterController );
}
template <class Ar>
void serialize(Ar& ar) {
// For archives that are not is_fb_function (presumably the non-flatbuffers serializers --
// confirm), require that a valid protocol version has been set on the archive.
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
// clientInterface is serialized first, followed by the streams in declaration order.
serializer(ar, clientInterface, recruitFromConfiguration, recruitRemoteFromConfiguration, recruitStorage,
registerWorker, getWorkers, registerMaster, getServerDBInfo);
}
};
// Reply type of RegisterWorkerRequest: the process class, cluster controller priority info and optional
// storage cache id being handed back to the worker.
struct RegisterWorkerReply {
	constexpr static FileIdentifier file_identifier = 16475696;

	ProcessClass processClass;
	ClusterControllerPriorityInfo priorityInfo;
	Optional<uint16_t> storageCache;

	// Default state: unset fitness, second argument false, unknown DC fitness.
	RegisterWorkerReply()
	  : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}

	RegisterWorkerReply(ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo,
	                    Optional<uint16_t> storageCache)
	  : processClass(processClass), priorityInfo(priorityInfo), storageCache(storageCache) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, processClass, priorityInfo, storageCache);
	}
};
// Request sent on ClusterControllerFullInterface::registerMaster. It carries the master's id and locality
// together with the log system configuration, proxies, resolvers and recovery progress it reports.
struct RegisterMasterRequest {
	constexpr static FileIdentifier file_identifier = 10773445;

	UID id;
	LocalityData mi;
	LogSystemConfig logSystemConfig;
	std::vector<MasterProxyInterface> proxies;
	std::vector<ResolverInterface> resolvers;
	// The fields below are value-initialized so that a default-constructed request never serializes
	// indeterminate values when a caller forgets to fill one of them in.
	DBRecoveryCount recoveryCount{};
	int64_t registrationCount = 0;
	Optional<DatabaseConfiguration> configuration;
	std::vector<UID> priorCommittedLogServers;
	RecoveryState recoveryState{};
	bool recoveryStalled = false;
	ReplyPromise<Void> reply;

	RegisterMasterRequest() : logSystemConfig(0) {}

	template <class Ar>
	void serialize(Ar& ar) {
		if constexpr (!is_fb_function<Ar>) {
			// Non-flatbuffers archives must carry a valid protocol version.
			ASSERT(ar.protocolVersion().isValid());
		}
		serializer(ar, id, mi, logSystemConfig, proxies, resolvers, recoveryCount, registrationCount, configuration,
		           priorCommittedLogServers, recoveryState, recoveryStalled, reply);
	}
};
// Reply type of RecruitFromConfigurationRequest: one list of worker interfaces per recruited role, plus the
// data center id and the satellite fallback flag.
struct RecruitFromConfigurationReply {
	constexpr static FileIdentifier file_identifier = 2224085;

	std::vector<WorkerInterface> backupWorkers;
	std::vector<WorkerInterface> tLogs;
	std::vector<WorkerInterface> satelliteTLogs;
	std::vector<WorkerInterface> proxies;
	std::vector<WorkerInterface> resolvers;
	std::vector<WorkerInterface> storageServers;
	std::vector<WorkerInterface> oldLogRouters;
	Optional<Key> dcId;
	bool satelliteFallback;

	RecruitFromConfigurationReply() : satelliteFallback(false) {}

	// Note: backupWorkers is declared first but serialized last; the serialized field order below is
	// independent of the declaration order above.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, tLogs, satelliteTLogs, proxies, resolvers, storageServers, oldLogRouters, dcId,
		           satelliteFallback, backupWorkers);
	}
};
// Request sent on ClusterControllerFullInterface::recruitFromConfiguration; answered with a
// RecruitFromConfigurationReply.
struct RecruitFromConfigurationRequest {
	constexpr static FileIdentifier file_identifier = 2023046;

	DatabaseConfiguration configuration;
	// Default member initializers keep a default-constructed request from carrying (and serializing)
	// indeterminate values; the explicit constructor below still overrides them.
	bool recruitSeedServers = false;
	int maxOldLogRouters = 0;
	ReplyPromise< RecruitFromConfigurationReply > reply;

	RecruitFromConfigurationRequest() {}
	explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration, bool recruitSeedServers, int maxOldLogRouters)
	  : configuration(configuration), recruitSeedServers(recruitSeedServers), maxOldLogRouters(maxOldLogRouters) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, configuration, recruitSeedServers, maxOldLogRouters, reply);
	}
};
// Reply type of RecruitRemoteFromConfigurationRequest: the worker interfaces chosen as remote tLogs and as
// log routers.
struct RecruitRemoteFromConfigurationReply {
	constexpr static FileIdentifier file_identifier = 9091392;

	std::vector<WorkerInterface> remoteTLogs;
	std::vector<WorkerInterface> logRouters;

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, remoteTLogs, logRouters);
	}
};
// Request sent on ClusterControllerFullInterface::recruitRemoteFromConfiguration; answered with a
// RecruitRemoteFromConfigurationReply.
struct RecruitRemoteFromConfigurationRequest {
	constexpr static FileIdentifier file_identifier = 3235995;

	DatabaseConfiguration configuration;
	Optional<Key> dcId;
	// Initialized here so a default-constructed request never serializes an indeterminate count.
	int logRouterCount = 0;
	std::vector<UID> exclusionWorkerIds;
	ReplyPromise< RecruitRemoteFromConfigurationReply > reply;

	RecruitRemoteFromConfigurationRequest() {}
	RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId,
	                                      int logRouterCount, const std::vector<UID> &exclusionWorkerIds)
	  : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount),
	    exclusionWorkerIds(exclusionWorkerIds) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
	}
};
// Reply type of RecruitStorageRequest: the selected worker together with its process class.
struct RecruitStorageReply {
	constexpr static FileIdentifier file_identifier = 15877089;

	WorkerInterface worker;
	ProcessClass processClass;

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, worker, processClass);
	}
};
// Request sent on ClusterControllerFullInterface::recruitStorage; answered with a RecruitStorageReply.
struct RecruitStorageRequest {
	constexpr static FileIdentifier file_identifier = 905920;

	std::vector<Optional<Standalone<StringRef>>> excludeMachines; //< Don't recruit any of these machines
	std::vector<AddressExclusion> excludeAddresses; //< Don't recruit any of these addresses
	std::vector<Optional<Standalone<StringRef>>> includeDCs;
	// This struct has no constructor, so without the initializer the flag would be indeterminate (and
	// reading it undefined behavior) whenever a caller does not assign it before sending.
	bool criticalRecruitment = false; //< True if machine classes are to be ignored
	ReplyPromise< RecruitStorageReply > reply;

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, excludeMachines, excludeAddresses, includeDCs, criticalRecruitment, reply);
	}
};
// Request sent on ClusterControllerFullInterface::registerWorker; answered with a RegisterWorkerReply.
// Besides the worker's interface and class information it carries optional data distributor, ratekeeper and
// storage cache interfaces, a list of issues, a list of incompatible peers, and a degraded flag.
struct RegisterWorkerRequest {
	constexpr static FileIdentifier file_identifier = 14332605;

	WorkerInterface wi;
	ProcessClass initialClass;
	ProcessClass processClass;
	ClusterControllerPriorityInfo priorityInfo;
	// Value-initialized: the default constructor does not set it, and serialize() reads it.
	Generation generation{};
	Optional<DataDistributorInterface> distributorInterf;
	Optional<RatekeeperInterface> ratekeeperInterf;
	Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf;
	Standalone<VectorRef<StringRef>> issues;
	std::vector<NetworkAddress> incompatiblePeers;
	ReplyPromise<RegisterWorkerReply> reply;
	bool degraded;

	RegisterWorkerRequest()
	  : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}

	// issues and incompatiblePeers are not constructor arguments; callers fill them in afterwards.
	RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf, Optional<RatekeeperInterface> rkInterf, Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf, bool degraded)
	  : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
	    generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf),
	    storageCacheInterf(storageCacheInterf), degraded(degraded) {}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, ratekeeperInterf, storageCacheInterf, issues, incompatiblePeers, reply, degraded);
	}
};
// Request sent on ClusterControllerFullInterface::getWorkers; the reply is a list of WorkerDetails.
struct GetWorkersRequest {
	constexpr static FileIdentifier file_identifier = 1254174;

	// Bit flags that may be OR-ed together in `flags`.
	enum {
		TESTER_CLASS_ONLY = 0x1,
		NON_EXCLUDED_PROCESSES_ONLY = 0x2
	};

	int flags;
	ReplyPromise<vector<WorkerDetails>> reply;

	// No flags set by default.
	GetWorkersRequest() : flags(0) {}
	explicit GetWorkersRequest(int fl) : flags(fl) {}

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, flags, reply);
	}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;
@ -463,7 +692,6 @@ void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error
struct ServerDBInfo;
class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false );
class Database openDBOnServer( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false );
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> a,
Reference<AsyncVar<Optional<struct ClusterInterface>>> b);
@ -497,12 +725,6 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<std::string>>>> issues =
Optional<Reference<AsyncVar<std::set<std::string>>>>());
ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,
@ -536,5 +758,6 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
typedef decltype(&tLog) TLogFn;
#include "fdbserver/ServerDBInfo.h"
#include "flow/unactorcompiler.h"
#endif

View File

@ -34,7 +34,6 @@
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/ConflictSet.h"

View File

@ -33,7 +33,6 @@
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
@ -740,22 +739,27 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
ASSERT(self->recoveryTransactionVersion);
state Standalone<RangeResultRef> data = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
state vector<Future<Void>> txnReplies;
state std::vector<Future<Void>> txnReplies;
state int64_t dataOutstanding = 0;
state std::vector<Endpoint> endpoints;
for(auto& it : self->proxies) {
endpoints.push_back(it.txnState.getEndpoint());
}
loop {
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Standalone<RangeResultRef> nextData = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
for(auto& r : self->proxies) {
TxnStateRequest req;
req.arena = data.arena();
req.data = data;
req.sequence = txnSequence;
req.last = !nextData.size();
txnReplies.push_back( brokenPromiseToNever( r.txnState.getReply( req ) ) );
dataOutstanding += data.arena().getSize();
}
TxnStateRequest req;
req.arena = data.arena();
req.data = data;
req.sequence = txnSequence;
req.last = !nextData.size();
req.broadcastInfo = endpoints;
txnReplies.push_back(broadcastTxnRequest(req, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, false));
dataOutstanding += SERVER_KNOBS->TXN_STATE_SEND_AMOUNT*data.arena().getSize();
data = nextData;
txnSequence++;

View File

@ -28,13 +28,13 @@
#include "fdbclient/SystemData.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/Status.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace std;
@ -1017,11 +1017,40 @@ vector<TestSpec> readTests( ifstream& ifs ) {
return result;
}
// Keeps `dbInfo` up to date by repeatedly asking the current cluster controller for its ServerDBInfo.
//   ccInterface - the (possibly absent, possibly changing) cluster controller interface to query
//   locality    - written into myLocality of every ServerDBInfo stored in dbInfo
//   dbInfo      - output variable; set once up front and then on every reply
// The loop has no exit, so the returned future is never completed by this code.
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
dbInfo->set(localInfo);
loop {
// Tell the cluster controller which info id we already hold.
GetServerDBInfoRequest req;
req.knownServerInfoID = dbInfo->get().id;
choose {
// With no cluster controller known, this branch waits on Never() and only the onChange branch can fire.
// NOTE(review): brokenPromiseToNever presumably turns a broken promise into a future that never fires - confirm.
when( ServerDBInfo _localInfo = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
// Work on a copy so that myLocality can be overwritten before publishing.
ServerDBInfo localInfo = _localInfo;
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
localInfo.myLocality = locality;
dbInfo->set(localInfo);
}
// The cluster controller interface changed: log it (if present) and loop to issue a new request.
when( wait( ccInterface->onChange() ) ) {
if(ccInterface->get().present())
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
}
}
}
}
ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc, Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, vector< TesterInterface > testers, vector<TestSpec> tests, StringRef startingConfiguration, LocalityData locality ) {
state Database cx;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo> );
state Future<Void> ccMonitor =
monitorServerDBInfo(cc, Reference<ClusterConnectionFile>(), LocalityData(), dbInfo); // FIXME: locality
state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
state bool useDB = false;
state bool waitForQuiescenceBegin = false;
@ -1192,7 +1221,7 @@ ACTOR Future<Void> runTests( Reference<ClusterConnectionFile> connFile, test_typ
if (at == TEST_HERE) {
Reference<AsyncVar<ServerDBInfo>> db( new AsyncVar<ServerDBInfo> );
vector<TesterInterface> iTesters(1);
actors.push_back( reportErrors(monitorServerDBInfo( cc, Reference<ClusterConnectionFile>(), LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality
actors.push_back( reportErrors(monitorServerDBInfo( cc, LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality
actors.push_back( reportErrors(testerServerCore( iTesters[0], connFile, db, locality ), "TesterServerCore") );
tests = runTests( cc, ci, iTesters, testSpecs, startingConfiguration, locality );
} else {

View File

@ -33,7 +33,6 @@
#include "fdbserver/TesterInterface.actor.h" // for poisson()
#include "fdbserver/IDiskQueue.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/FDBExecHelper.actor.h"
@ -68,6 +67,44 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
# define KV_STORE(filename,uid) keyValueStoreMemory(filename,uid)
#endif
// Sends `req` to `stream` and returns the endpoint list carried by the reply. When getReplyUnlessFailedFor
// yields an error instead of a reply, the target's own endpoint is appended to the endpoints it had been asked
// to forward to (req.broadcastInfo) and that combined list is returned instead.
ACTOR Future<std::vector<Endpoint>> tryDBInfoBroadcast(RequestStream<UpdateServerDBInfoRequest> stream, UpdateServerDBInfoRequest req) {
	ErrorOr<std::vector<Endpoint>> outcome = wait( stream.getReplyUnlessFailedFor(req, SERVER_KNOBS->DBINFO_FAILED_DELAY, 0) );
	if(!outcome.present()) {
		// No reply: report the target itself along with everything it was supposed to reach.
		req.broadcastInfo.push_back(stream.getEndpoint());
		return req.broadcastInfo;
	}
	return outcome.get();
}
// Fans `req` out to the endpoints listed in req.broadcastInfo as a tree.
// The endpoint list is split into at most `sendAmount` consecutive groups. The first endpoint of each group is
// sent the request directly, with the rest of its group as the request's broadcastInfo.
//   req        - request to forward; its reply promise is captured before the request is reused for sending
//   sendAmount - maximum number of direct sends from this call
//   sender     - if present, this endpoint is placed first in the returned list
//   sendReply  - if true, the returned list is also sent on the reply promise captured from `req`
// Returns `sender` (if present) followed by every endpoint returned by the tryDBInfoBroadcast calls.
ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest req, int sendAmount, Optional<Endpoint> sender, bool sendReply) {
state std::vector<Future<std::vector<Endpoint>>> replies;
// Keep the caller's reply promise before req is reset and copied for forwarding.
state ReplyPromise<std::vector<Endpoint>> reply = req.reply;
// NOTE(review): resetReply presumably gives req a fresh reply promise - confirm against its definition.
resetReply( req );
int currentStream = 0;
std::vector<Endpoint> broadcastEndpoints = req.broadcastInfo;
for(int i = 0; i < sendAmount && currentStream < broadcastEndpoints.size(); i++) {
std::vector<Endpoint> endpoints;
// The next unassigned endpoint receives the request directly.
RequestStream<UpdateServerDBInfoRequest> cur(broadcastEndpoints[currentStream++]);
// Group i ends at index size*(i+1)/sendAmount; everything up to there is delegated to `cur`.
while(currentStream < broadcastEndpoints.size()*(i+1)/sendAmount) {
endpoints.push_back(broadcastEndpoints[currentStream++]);
}
req.broadcastInfo = endpoints;
replies.push_back( tryDBInfoBroadcast( cur, req ) );
// Reset again so the next send does not reuse the reply of the request just sent.
resetReply( req );
}
wait( waitForAll(replies) );
std::vector<Endpoint> notUpdated;
if(sender.present()) {
notUpdated.push_back(sender.get());
}
// Concatenate the endpoint lists returned by every direct send.
for(auto& it : replies) {
notUpdated.insert(notUpdated.end(), it.get().begin(), it.get().end());
}
if(sendReply) {
reply.send(notUpdated);
}
return notUpdated;
}
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
state std::vector<UID> lastProxyUIDs;
@ -80,27 +117,11 @@ ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<ServerDBInfo>> d
}
}
// Mirrors the `client` member of the ServerDBInfo held in `db` into `info`, once immediately and again every
// time `db` changes. The loop has no exit, so the returned future is never completed by this code.
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
// Passed to shrinkProxyList on every iteration, so they persist across waits as actor state.
state std::vector<UID> lastProxyUIDs;
state std::vector<MasterProxyInterface> lastProxies;
loop {
ClientDBInfo ni = db->get().read().client;
// NOTE(review): shrinkProxyList presumably trims ni's proxy list while keeping the selection stable via the two state vectors - confirm.
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
info->set( ni );
wait( db->onChange() );
}
}
// Builds a Database whose client info is derived from the server-side `db` variable via extractClientInfo.
// The locality stored in `db` is handed to the database only when locality load balancing is enabled;
// otherwise a default-constructed LocalityData is used.
Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) {
	Reference<AsyncVar<ClientDBInfo>> clientInfo( new AsyncVar<ClientDBInfo> );
	LocalityData clientLocality = enableLocalityLoadBalance ? db->get().myLocality : LocalityData();
	return DatabaseContext::create( clientInfo, extractClientInfo(db, clientInfo), clientLocality, enableLocalityLoadBalance, taskID, lockAware );
}
// Overload for a `db` variable that holds a CachedSerialization<ServerDBInfo>: identical to the plain
// ServerDBInfo overload except that the info is reached through read().
Database openDBOnServer( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) {
	Reference<AsyncVar<ClientDBInfo>> clientInfo( new AsyncVar<ClientDBInfo> );
	LocalityData clientLocality = enableLocalityLoadBalance ? db->get().read().myLocality : LocalityData();
	return DatabaseContext::create( clientInfo, extractClientInfo(db, clientInfo), clientLocality, enableLocalityLoadBalance, taskID, lockAware );
}
struct ErrorInfo {
Error error;
const Role &role;
@ -413,7 +434,9 @@ ACTOR Future<Void> registrationClient(
Reference<AsyncVar<bool>> degraded,
PromiseStream< ErrorInfo > errors,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>>> issues) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
// The registration request piggybacks optional distributor interface if it exists.
@ -422,8 +445,41 @@ ACTOR Future<Void> registrationClient(
state Reference<AsyncVar<Optional<std::pair<uint16_t,StorageServerInterface>>>> scInterf( new AsyncVar<Optional<std::pair<uint16_t,StorageServerInterface>>>() );
state Future<Void> cacheProcessFuture;
state Future<Void> cacheErrorsFuture;
state Optional<double> incorrectTime;
loop {
RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get(), rkInterf->get(), scInterf->get(), degraded->get());
for (auto const& i : issues->get()) {
request.issues.push_back_deep(request.issues.arena(), i);
}
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
request.issues.push_back_deep(request.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
if(!incorrectTime.present()) {
incorrectTime = now();
}
if(connFile->canGetFilename()) {
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
}
}
else {
incorrectTime = Optional<double>();
}
auto peers = FlowTransport::transport().getIncompatiblePeers();
for(auto it = peers->begin(); it != peers->end();) {
if( now() - it->second.second > FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
request.incompatiblePeers.push_back(it->first);
it = peers->erase(it);
} else {
it++;
}
}
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply(request) ) : Never();
choose {
when ( RegisterWorkerReply reply = wait( registrationReply )) {
@ -464,6 +520,8 @@ ACTOR Future<Void> registrationClient(
when ( wait( rkInterf->onChange() ) ) {}
when ( wait( scInterf->onChange() ) ) {}
when ( wait( degraded->onChange() ) ) {}
when ( wait( FlowTransport::transport().onIncompatibleChanged() ) ) {}
when ( wait( issues->onChange() ) ) {}
}
}
}
@ -749,7 +807,10 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
return Void();
}
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
// TODO: `issues` is right now only updated by `monitorTraceLogIssues` and thus is being `set` on every update.
// It could be changed to `insert` and `trigger` later if we want to use it as a generic way for the caller of this
// function to report issues to cluster controller.
ACTOR Future<Void> monitorTraceLogIssues(Reference<AsyncVar<std::set<std::string>>> issues) {
state bool pingTimeout = false;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
@ -764,87 +825,14 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<st
throw;
}
}
if (issues.present()) {
std::set<std::string> _issues;
retriveTraceLogIssues(_issues);
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.insert("trace_log_writer_thread_unresponsive");
pingTimeout = false;
}
issues.get()->set(_issues);
}
}
}
// TODO: `issues` is right now only updated by `monitorTraceLogIssues` and thus is being `set` on every update.
// It could be changed to `insert` and `trigger` later if we want to use it as a generic way for the caller of this
// function to report issues to cluster controller.
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> connFile, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
dbInfo->set(localInfo);
state Optional<double> incorrectTime;
loop {
GetServerDBInfoRequest req;
req.knownServerInfoID = dbInfo->get().id;
if (issues.present()) {
for (auto const& i : issues.get()->get()) {
req.issues.push_back_deep(req.issues.arena(), i);
}
}
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
if(!incorrectTime.present()) {
incorrectTime = now();
}
if(connFile->canGetFilename()) {
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
}
}
else {
incorrectTime = Optional<double>();
}
auto peers = FlowTransport::transport().getIncompatiblePeers();
for(auto it = peers->begin(); it != peers->end();) {
if( now() - it->second.second > SERVER_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
req.incompatiblePeers.push_back(it->first);
it = peers->erase(it);
} else {
it++;
}
}
choose {
when( CachedSerialization<ServerDBInfo> ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
ServerDBInfo localInfo = ni.read();
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
localInfo.myLocality = locality;
dbInfo->set(localInfo);
}
when( wait( ccInterface->onChange() ) ) {
if(ccInterface->get().present())
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
}
when(wait(issues.present() ? issues.get()->onChange() : Never())) {}
std::set<std::string> _issues;
retriveTraceLogIssues(_issues);
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.insert("trace_log_writer_thread_unresponsive");
pingTimeout = false;
}
issues->set(_issues);
}
}
@ -934,8 +922,7 @@ ACTOR Future<Void> workerServer(
errorForwarders.add( resetAfter(degraded, SERVER_KNOBS->DEGRADED_RESET_INTERVAL, false, SERVER_KNOBS->DEGRADED_WARNING_LIMIT, SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY, "DegradedReset"));
errorForwarders.add( loadedPonger( interf.debugPing.getFuture() ) );
errorForwarders.add( waitFailureServer( interf.waitFailure.getFuture() ) );
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(monitorServerDBInfo(ccInterface, connFile, locality, dbInfo, issues));
errorForwarders.add( monitorTraceLogIssues(issues) );
errorForwarders.add( testerServerCore( interf.testerInterface, connFile, dbInfo, locality ) );
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
@ -958,6 +945,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.setMetricsRate);
DUMPTOKEN(recruited.eventLogRequest);
DUMPTOKEN(recruited.traceBatchDumpRequest);
DUMPTOKEN(recruited.updateServerDBInfo);
}
state std::vector<Future<Void>> recoveries;
@ -1051,12 +1039,34 @@ ACTOR Future<Void> workerServer(
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf, degraded, errors, locality, dbInfo ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf, degraded, errors, locality, dbInfo, connFile, issues) );
TraceEvent("RecoveriesComplete", interf.id());
loop choose {
when( UpdateServerDBInfoRequest req = waitNext( interf.updateServerDBInfo.getFuture() ) ) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(req.serializedDbInfo, AssumeVersion(currentProtocolVersion));
localInfo.myLocality = locality;
if(localInfo.infoGeneration < dbInfo->get().infoGeneration && localInfo.clusterInterface == dbInfo->get().clusterInterface) {
std::vector<Endpoint> rep = req.broadcastInfo;
rep.push_back(interf.updateServerDBInfo.getEndpoint());
req.reply.send(rep);
} else {
Optional<Endpoint> notUpdated;
if(!ccInterface->get().present() || localInfo.clusterInterface != ccInterface->get().get()) {
notUpdated = interf.updateServerDBInfo.getEndpoint();
}
else if(localInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != ccInterface->get().get()) {
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
dbInfo->set(localInfo);
}
errorForwarders.add(success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
}
}
when( RebootRequest req = waitNext( interf.clientInterface.reboot.getFuture() ) ) {
state RebootRequest rebootReq = req;
// If suspendDuration is INT_MAX, the trace will not be logged if it was inside the next block

View File

@ -1141,12 +1141,12 @@ struct ConsistencyCheckWorkload : TestWorkload
std::set<Optional<Key>> missingStorage;
for( int i = 0; i < workers.size(); i++ ) {
NetworkAddress addr = workers[i].interf.tLog.getEndpoint().addresses.getTLSAddress();
if( !configuration.isExcludedServer(addr) &&
NetworkAddress addr = workers[i].interf.stableAddress();
if( !configuration.isExcludedServer(workers[i].interf.addresses()) &&
( workers[i].processClass == ProcessClass::StorageClass || workers[i].processClass == ProcessClass::UnsetClass ) ) {
bool found = false;
for( int j = 0; j < storageServers.size(); j++ ) {
if( storageServers[j].getValue.getEndpoint().addresses.getTLSAddress() == addr ) {
if( storageServers[j].stableAddress() == addr ) {
found = true;
break;
}

View File

@ -20,7 +20,6 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"

View File

@ -22,7 +22,6 @@
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct PerformanceWorkload : TestWorkload {

View File

@ -28,7 +28,6 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -4,7 +4,6 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbmonitor/SimpleIni.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/Status.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"

View File

@ -80,6 +80,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );

View File

@ -94,6 +94,7 @@ public:
double RECONNECTION_TIME_GROWTH_RATE;
double RECONNECTION_RESET_TIME;
int ACCEPT_BATCH_SIZE;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;