fix compilation errors

This commit is contained in:
Evan Tschannen 2020-04-06 20:58:43 -07:00
parent 477d66b46d
commit 2a1bd97120
11 changed files with 339 additions and 289 deletions

View File

@ -268,22 +268,13 @@ struct GetStorageServerRejoinInfoRequest {
}
};
struct TreeBroadcastInfo {
std::vector<Endpoint> endpoints;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, endpoints);
}
};
struct TxnStateRequest {
constexpr static FileIdentifier file_identifier = 15250781;
Arena arena;
VectorRef<KeyValueRef> data;
Sequence sequence;
bool last;
TreeBroadcastInfo broadcastInfo;
std::vector<Endpoint> broadcastInfo;
ReplyPromise<Void> reply;
template <class Ar>

View File

@ -1045,7 +1045,7 @@ ACTOR static Future<Void> multiVersionCleanupWorker( TransportData* self ) {
if( self->multiVersionConnections.count(it->second.first) ) {
it = self->incompatiblePeers.erase(it);
} else {
if( now() - it->second.second > SERVER_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
if( now() - it->second.second > 5.0 ) { //INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING
foundIncompatible = true;
}
it++;

View File

@ -164,7 +164,7 @@ public:
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
Future<Void> FlowTransport::onIncompatibleChanged();
Future<Void> onIncompatibleChanged();
// Returns when getIncompatiblePeers has at least one peer which is incompatible.
void addPeerReference(const Endpoint&, bool isStream);

View File

@ -1376,13 +1376,14 @@ public:
serversFailed("ServersFailed", clusterControllerMetrics),
serversUnfailed("ServersUnfailed", clusterControllerMetrics)
{
ServerDBInfo serverInfo;
CachedSerialization<ServerDBInfo> newInfoCache;
auto& serverInfo = newInfoCache.mutate();
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db.dbInfoCount;
serverInfo.masterLifetime.ccID = id;
serverInfo.clusterInterface = ccInterface;
serverInfo.myLocality = locality;
db.serverInfo->set( serverInfo );
db.serverInfo->set( newInfoCache );
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true);
}
@ -1437,7 +1438,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
ServerDBInfo dbInfo;
CachedSerialization<ServerDBInfo> newInfoCache;
auto& dbInfo = newInfoCache.mutate();
dbInfo.master = iMaster;
dbInfo.id = deterministicRandom()->randomUniqueID();
dbInfo.infoGeneration = ++db->dbInfoCount;
@ -1450,7 +1452,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
dbInfo.latencyBandConfig = db->serverInfo->get().read().latencyBandConfig;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( dbInfo );
db->serverInfo->set( newInfoCache );
state Future<Void> spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
@ -1484,6 +1486,18 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
}
}
ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID,
ReplyPromise<CachedSerialization<ServerDBInfo>> reply) {
while(db->serverInfo->get().read().id == knownServerInfoID) {
choose {
when (wait( yieldedFuture(db->serverInfo->onChange()) )) {}
when (wait( delayJittered( 300 ) )) { break; } // The server might be long gone!
}
}
reply.send( db->serverInfo->get() );
return Void();
}
ACTOR Future<Void> clusterOpenDatabase(ClusterControllerData::DBInfo* db, OpenDatabaseRequest req) {
db->clientStatus[req.reply.getEndpoint().getPrimaryAddress()] = std::make_pair(now(), req);
if(db->clientStatus.size() > 10000) {
@ -2104,7 +2118,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
for(auto it : req.incompatiblePeers) {
db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
if(info == self->id_worker.end()) {
@ -2530,7 +2544,7 @@ ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
if(config != serverInfo.latencyBandConfig) {
TraceEvent("LatencyBandConfigChanged").detail("Present", config.present());
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db->serverInfo->dbInfoCount;
serverInfo.infoGeneration = ++db->dbInfoCount;
serverInfo.latencyBandConfig = config;
db->serverInfo->set(cachedInfo);
}
@ -3038,7 +3052,7 @@ ACTOR Future<Void> dbInfoUpdater( ClusterControllerData* self ) {
UpdateServerDBInfoRequest req;
//FIXME: cache serialization
req.dbInfo = self->db.serverInfo->get().read();
req.broadcastInfo.endpoints = self->updateDBInfoEndpoints;
req.broadcastInfo = self->updateDBInfoEndpoints;
choose {
when(std::vector<Endpoint> notUpdated = wait( broadcastDBInfoRequest(req, 2, Optional<Endpoint>(), false) )) {
@ -3145,6 +3159,9 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
++self.registerMasterRequests;
clusterRegisterMaster( &self, req );
}
when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) {
self.addActor.send( clusterGetServerInfo(&self.db, req.knownServerInfoID, req.reply) );
}
when( wait( leaderFail ) ) {
// We are no longer the leader if this has changed.
endRole(Role::CLUSTER_CONTROLLER, interf.id(), "Leader Replaced", true);

View File

@ -31,231 +31,11 @@
#include "fdbserver/BackupInterface.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Knobs.h"
// This interface and its serialization depend on slicing, since the client will deserialize only the first part of this structure
struct ClusterControllerFullInterface {
constexpr static FileIdentifier file_identifier =
ClusterControllerClientInterface::file_identifier;
ClusterInterface clientInterface;
RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration;
RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration;
RequestStream< struct RecruitStorageRequest > recruitStorage;
RequestStream< struct RegisterWorkerRequest > registerWorker;
RequestStream< struct GetWorkersRequest > getWorkers;
RequestStream< struct RegisterMasterRequest > registerMaster;
UID id() const { return clientInterface.id(); }
bool operator == (ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterControllerFullInterface const& r) const { return id() != r.id(); }
bool hasMessage() {
return clientInterface.hasMessage() ||
recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() ||
recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady();
}
void initEndpoints() {
clientInterface.initEndpoints();
recruitFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitRemoteFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitStorage.getEndpoint( TaskPriority::ClusterController );
registerWorker.getEndpoint( TaskPriority::ClusterControllerWorker );
getWorkers.getEndpoint( TaskPriority::ClusterController );
registerMaster.getEndpoint( TaskPriority::ClusterControllerRegister );
}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, clientInterface, recruitFromConfiguration, recruitRemoteFromConfiguration, recruitStorage,
registerWorker, getWorkers, registerMaster);
}
};
struct RecruitFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 2224085;
std::vector<WorkerInterface> backupWorkers;
std::vector<WorkerInterface> tLogs;
std::vector<WorkerInterface> satelliteTLogs;
std::vector<WorkerInterface> proxies;
std::vector<WorkerInterface> resolvers;
std::vector<WorkerInterface> storageServers;
std::vector<WorkerInterface> oldLogRouters;
Optional<Key> dcId;
bool satelliteFallback;
RecruitFromConfigurationReply() : satelliteFallback(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tLogs, satelliteTLogs, proxies, resolvers, storageServers, oldLogRouters, dcId,
satelliteFallback, backupWorkers);
}
};
struct RecruitFromConfigurationRequest {
constexpr static FileIdentifier file_identifier = 2023046;
DatabaseConfiguration configuration;
bool recruitSeedServers;
int maxOldLogRouters;
ReplyPromise< struct RecruitFromConfigurationReply > reply;
RecruitFromConfigurationRequest() {}
explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration, bool recruitSeedServers, int maxOldLogRouters)
: configuration(configuration), recruitSeedServers(recruitSeedServers), maxOldLogRouters(maxOldLogRouters) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, configuration, recruitSeedServers, maxOldLogRouters, reply);
}
};
struct RecruitRemoteFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 9091392;
std::vector<WorkerInterface> remoteTLogs;
std::vector<WorkerInterface> logRouters;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, remoteTLogs, logRouters);
}
};
struct RecruitRemoteFromConfigurationRequest {
constexpr static FileIdentifier file_identifier = 3235995;
DatabaseConfiguration configuration;
Optional<Key> dcId;
int logRouterCount;
std::vector<UID> exclusionWorkerIds;
ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply;
RecruitRemoteFromConfigurationRequest() {}
RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount, const std::vector<UID> &exclusionWorkerIds) : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount), exclusionWorkerIds(exclusionWorkerIds){}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
}
};
struct RecruitStorageReply {
constexpr static FileIdentifier file_identifier = 15877089;
WorkerInterface worker;
ProcessClass processClass;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, worker, processClass);
}
};
struct RecruitStorageRequest {
constexpr static FileIdentifier file_identifier = 905920;
std::vector<Optional<Standalone<StringRef>>> excludeMachines; //< Don't recruit any of these machines
std::vector<AddressExclusion> excludeAddresses; //< Don't recruit any of these addresses
std::vector<Optional<Standalone<StringRef>>> includeDCs;
bool criticalRecruitment; //< True if machine classes are to be ignored
ReplyPromise< RecruitStorageReply > reply;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, excludeMachines, excludeAddresses, includeDCs, criticalRecruitment, reply);
}
};
struct RegisterWorkerReply {
constexpr static FileIdentifier file_identifier = 16475696;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
Optional<uint16_t> storageCache;
RegisterWorkerReply() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
RegisterWorkerReply(ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Optional<uint16_t> storageCache) : processClass(processClass), priorityInfo(priorityInfo), storageCache(storageCache) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, processClass, priorityInfo, storageCache);
}
};
struct RegisterWorkerRequest {
constexpr static FileIdentifier file_identifier = 14332605;
WorkerInterface wi;
ProcessClass initialClass;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
Generation generation;
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise<RegisterWorkerReply> reply;
bool degraded;
RegisterWorkerRequest() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf, Optional<RatekeeperInterface> rkInterf, Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf, bool degraded) :
wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), storageCacheInterf(storageCacheInterf), degraded(degraded) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, ratekeeperInterf, storageCacheInterf, issues, incompatiblePeers, reply, degraded);
}
};
struct GetWorkersRequest {
constexpr static FileIdentifier file_identifier = 1254174;
enum { TESTER_CLASS_ONLY = 0x1, NON_EXCLUDED_PROCESSES_ONLY = 0x2 };
int flags;
ReplyPromise<vector<WorkerDetails>> reply;
GetWorkersRequest() : flags(0) {}
explicit GetWorkersRequest(int fl) : flags(fl) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, flags, reply);
}
};
struct RegisterMasterRequest {
constexpr static FileIdentifier file_identifier = 10773445;
UID id;
LocalityData mi;
LogSystemConfig logSystemConfig;
std::vector<MasterProxyInterface> proxies;
std::vector<ResolverInterface> resolvers;
DBRecoveryCount recoveryCount;
int64_t registrationCount;
Optional<DatabaseConfiguration> configuration;
std::vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
ReplyPromise<Void> reply;
RegisterMasterRequest() : logSystemConfig(0) {}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, id, mi, logSystemConfig, proxies, resolvers, recoveryCount, registrationCount, configuration,
priorCommittedLogServers, recoveryState, recoveryStalled, reply);
}
};
#include "fdbserver/ServerDBInfo.h" // include order hack

View File

@ -53,13 +53,13 @@ ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool
resetReply( req );
std::vector<Future<Void>> replies;
int currentStream = 0;
for(int i = 0; i < sendAmount && currentStream < req.broadcastInfo.endpoints.size(); i++) {
TreeBroadcastInfo info;
RequestStream<TxnStateRequest> cur(req.broadcastInfo.endpoints[currentStream++]);
while(currentStream < req.broadcastInfo.endpoints.size()*(i+1)/sendAmount) {
info.endpoints.push_back(req.broadcastInfo.endpoints[currentStream++]);
for(int i = 0; i < sendAmount && currentStream < req.broadcastInfo.size(); i++) {
std::vector<Endpoint> endpoints;
RequestStream<TxnStateRequest> cur(req.broadcastInfo[currentStream++]);
while(currentStream < req.broadcastInfo.size()*(i+1)/sendAmount) {
endpoints.push_back(req.broadcastInfo[currentStream++]);
}
req.broadcastInfo = info;
req.broadcastInfo = endpoints;
replies.push_back(brokenPromiseToNever( cur.getReply( req ) ));
resetReply( req );
}

View File

@ -29,6 +29,7 @@
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/WorkerInterface.actor.h"
struct ServerDBInfo {
constexpr static FileIdentifier file_identifier = 13838807;
@ -53,7 +54,7 @@ struct ServerDBInfo {
std::vector<std::pair<uint16_t,StorageServerInterface>> storageCaches;
int64_t infoGeneration;
explicit ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {}
ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {}
bool operator == (ServerDBInfo const& r) const { return id == r.id; }
bool operator != (ServerDBInfo const& r) const { return id != r.id; }
@ -64,4 +65,32 @@ struct ServerDBInfo {
}
};
struct UpdateServerDBInfoRequest {
constexpr static FileIdentifier file_identifier = 9467438;
ServerDBInfo dbInfo;
std::vector<Endpoint> broadcastInfo;
ReplyPromise<std::vector<Endpoint>> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, dbInfo, broadcastInfo, reply);
}
};
struct GetServerDBInfoRequest {
constexpr static FileIdentifier file_identifier = 9467439;
UID knownServerInfoID;
ReplyPromise< CachedSerialization<struct ServerDBInfo> > reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, knownServerInfoID, reply);
}
};
Future<Void> broadcastTxnRequest(TxnStateRequest const& req, int const& sendAmount, bool const& sendReply);
Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest const& req, int const& sendAmount, Optional<Endpoint> const& sender, bool const& sendReply);
#endif

View File

@ -37,7 +37,7 @@
#include "fdbserver/LogSystemConfig.h"
#include "fdbrpc/MultiInterface.h"
#include "fdbclient/ClientWorkerInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/RecoveryState.h"
#include "flow/actorcompiler.h"
struct WorkerInterface {
@ -106,6 +106,230 @@ struct WorkerDetails {
}
};
// This interface and its serialization depend on slicing, since the client will deserialize only the first part of this structure
struct ClusterControllerFullInterface {
constexpr static FileIdentifier file_identifier =
ClusterControllerClientInterface::file_identifier;
ClusterInterface clientInterface;
RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration;
RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration;
RequestStream< struct RecruitStorageRequest > recruitStorage;
RequestStream< struct RegisterWorkerRequest > registerWorker;
RequestStream< struct GetWorkersRequest > getWorkers;
RequestStream< struct RegisterMasterRequest > registerMaster;
RequestStream< struct GetServerDBInfoRequest > getServerDBInfo;
UID id() const { return clientInterface.id(); }
bool operator == (ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator != (ClusterControllerFullInterface const& r) const { return id() != r.id(); }
bool hasMessage() {
return clientInterface.hasMessage() ||
recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() ||
recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady();
}
void initEndpoints() {
clientInterface.initEndpoints();
recruitFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitRemoteFromConfiguration.getEndpoint( TaskPriority::ClusterControllerRecruit );
recruitStorage.getEndpoint( TaskPriority::ClusterController );
registerWorker.getEndpoint( TaskPriority::ClusterControllerWorker );
getWorkers.getEndpoint( TaskPriority::ClusterController );
registerMaster.getEndpoint( TaskPriority::ClusterControllerRegister );
getServerDBInfo.getEndpoint( TaskPriority::ClusterController );
}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, clientInterface, recruitFromConfiguration, recruitRemoteFromConfiguration, recruitStorage,
registerWorker, getWorkers, registerMaster, getServerDBInfo);
}
};
struct RegisterWorkerReply {
constexpr static FileIdentifier file_identifier = 16475696;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
Optional<uint16_t> storageCache;
RegisterWorkerReply() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
RegisterWorkerReply(ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Optional<uint16_t> storageCache) : processClass(processClass), priorityInfo(priorityInfo), storageCache(storageCache) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, processClass, priorityInfo, storageCache);
}
};
struct RegisterMasterRequest {
constexpr static FileIdentifier file_identifier = 10773445;
UID id;
LocalityData mi;
LogSystemConfig logSystemConfig;
std::vector<MasterProxyInterface> proxies;
std::vector<ResolverInterface> resolvers;
DBRecoveryCount recoveryCount;
int64_t registrationCount;
Optional<DatabaseConfiguration> configuration;
std::vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
ReplyPromise<Void> reply;
RegisterMasterRequest() : logSystemConfig(0) {}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, id, mi, logSystemConfig, proxies, resolvers, recoveryCount, registrationCount, configuration,
priorCommittedLogServers, recoveryState, recoveryStalled, reply);
}
};
struct RecruitFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 2224085;
std::vector<WorkerInterface> backupWorkers;
std::vector<WorkerInterface> tLogs;
std::vector<WorkerInterface> satelliteTLogs;
std::vector<WorkerInterface> proxies;
std::vector<WorkerInterface> resolvers;
std::vector<WorkerInterface> storageServers;
std::vector<WorkerInterface> oldLogRouters;
Optional<Key> dcId;
bool satelliteFallback;
RecruitFromConfigurationReply() : satelliteFallback(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tLogs, satelliteTLogs, proxies, resolvers, storageServers, oldLogRouters, dcId,
satelliteFallback, backupWorkers);
}
};
struct RecruitFromConfigurationRequest {
constexpr static FileIdentifier file_identifier = 2023046;
DatabaseConfiguration configuration;
bool recruitSeedServers;
int maxOldLogRouters;
ReplyPromise< RecruitFromConfigurationReply > reply;
RecruitFromConfigurationRequest() {}
explicit RecruitFromConfigurationRequest(DatabaseConfiguration const& configuration, bool recruitSeedServers, int maxOldLogRouters)
: configuration(configuration), recruitSeedServers(recruitSeedServers), maxOldLogRouters(maxOldLogRouters) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, configuration, recruitSeedServers, maxOldLogRouters, reply);
}
};
struct RecruitRemoteFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 9091392;
std::vector<WorkerInterface> remoteTLogs;
std::vector<WorkerInterface> logRouters;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, remoteTLogs, logRouters);
}
};
struct RecruitRemoteFromConfigurationRequest {
constexpr static FileIdentifier file_identifier = 3235995;
DatabaseConfiguration configuration;
Optional<Key> dcId;
int logRouterCount;
std::vector<UID> exclusionWorkerIds;
ReplyPromise< RecruitRemoteFromConfigurationReply > reply;
RecruitRemoteFromConfigurationRequest() {}
RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount, const std::vector<UID> &exclusionWorkerIds) : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount), exclusionWorkerIds(exclusionWorkerIds){}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
}
};
struct RecruitStorageReply {
constexpr static FileIdentifier file_identifier = 15877089;
WorkerInterface worker;
ProcessClass processClass;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, worker, processClass);
}
};
struct RecruitStorageRequest {
constexpr static FileIdentifier file_identifier = 905920;
std::vector<Optional<Standalone<StringRef>>> excludeMachines; //< Don't recruit any of these machines
std::vector<AddressExclusion> excludeAddresses; //< Don't recruit any of these addresses
std::vector<Optional<Standalone<StringRef>>> includeDCs;
bool criticalRecruitment; //< True if machine classes are to be ignored
ReplyPromise< RecruitStorageReply > reply;
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, excludeMachines, excludeAddresses, includeDCs, criticalRecruitment, reply);
}
};
struct RegisterWorkerRequest {
constexpr static FileIdentifier file_identifier = 14332605;
WorkerInterface wi;
ProcessClass initialClass;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
Generation generation;
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise<RegisterWorkerReply> reply;
bool degraded;
RegisterWorkerRequest() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf, Optional<RatekeeperInterface> rkInterf, Optional<std::pair<uint16_t,StorageServerInterface>> storageCacheInterf, bool degraded) :
wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), storageCacheInterf(storageCacheInterf), degraded(degraded) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, ratekeeperInterf, storageCacheInterf, issues, incompatiblePeers, reply, degraded);
}
};
struct GetWorkersRequest {
constexpr static FileIdentifier file_identifier = 1254174;
enum { TESTER_CLASS_ONLY = 0x1, NON_EXCLUDED_PROCESSES_ONLY = 0x2 };
int flags;
ReplyPromise<vector<WorkerDetails>> reply;
GetWorkersRequest() : flags(0) {}
explicit GetWorkersRequest(int fl) : flags(fl) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, flags, reply);
}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;
@ -423,22 +647,6 @@ struct DiskStoreRequest {
}
};
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply);
ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest req, int sendAmount, Optional<Endpoint> sender, bool sendReply);
struct UpdateServerDBInfoRequest {
constexpr static FileIdentifier file_identifier = 9467438;
ServerDBInfo dbInfo;
TreeBroadcastInfo broadcastInfo;
ReplyPromise<std::vector<Endpoint>> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, dbInfo, broadcastInfo, reply);
}
};
struct Role {
static const Role WORKER;
static const Role STORAGE_SERVER;
@ -513,10 +721,6 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo);
ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,
@ -550,5 +754,6 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
typedef decltype(&tLog) TLogFn;
#include "fdbserver/ServerDBInfo.h"
#include "flow/unactorcompiler.h"
#endif

View File

@ -752,7 +752,7 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
req.data = data;
req.sequence = txnSequence;
req.last = !nextData.size();
req.broadcastInfo.endpoints = endpoints;
req.broadcastInfo = endpoints;
txnReplies.push_back(broadcastTxnRequest(req, sendAmount, false));
dataOutstanding += sendAmount*data.arena().getSize();
data = nextData;

View File

@ -35,6 +35,7 @@
#include "fdbclient/MonitorLeader.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace std;
@ -1017,11 +1018,40 @@ vector<TestSpec> readTests( ifstream& ifs ) {
return result;
}
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
dbInfo->set(localInfo);
loop {
GetServerDBInfoRequest req;
req.knownServerInfoID = dbInfo->get().id;
choose {
when( CachedSerialization<ServerDBInfo> ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
ServerDBInfo localInfo = ni.read();
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
localInfo.myLocality = locality;
dbInfo->set(localInfo);
}
when( wait( ccInterface->onChange() ) ) {
if(ccInterface->get().present())
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
}
}
}
}
ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc, Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, vector< TesterInterface > testers, vector<TestSpec> tests, StringRef startingConfiguration, LocalityData locality ) {
state Database cx;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo> );
state Future<Void> ccMonitor =
monitorServerDBInfo(cc, Reference<ClusterConnectionFile>(), LocalityData(), dbInfo); // FIXME: locality
state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
state bool useDB = false;
state bool waitForQuiescenceBegin = false;
@ -1192,7 +1222,7 @@ ACTOR Future<Void> runTests( Reference<ClusterConnectionFile> connFile, test_typ
if (at == TEST_HERE) {
Reference<AsyncVar<ServerDBInfo>> db( new AsyncVar<ServerDBInfo> );
vector<TesterInterface> iTesters(1);
actors.push_back( reportErrors(monitorServerDBInfo( cc, Reference<ClusterConnectionFile>(), LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality
actors.push_back( reportErrors(monitorServerDBInfo( cc, LocalityData(), db ), "MonitorServerDBInfo") ); // FIXME: Locality
actors.push_back( reportErrors(testerServerCore( iTesters[0], connFile, db, locality ), "TesterServerCore") );
tests = runTests( cc, ci, iTesters, testSpecs, startingConfiguration, locality );
} else {

View File

@ -68,12 +68,12 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
#endif
ACTOR Future<std::vector<Endpoint>> tryDBInfoBroadcast(RequestStream<UpdateServerDBInfoRequest> stream, UpdateServerDBInfoRequest req) {
ErrorOr<std::vector<Endpoint>> rep = wait( stream.getReplyUnlessFailedFor(req, 1.0) );
ErrorOr<std::vector<Endpoint>> rep = wait( stream.getReplyUnlessFailedFor(req, 1.0, 0) );
if(rep.present()) {
return rep.get();
}
req.broadcastInfo.endpoints.push_back(stream.getEndpoint());
return req.broadcastInfo.endpoints;
req.broadcastInfo.push_back(stream.getEndpoint());
return req.broadcastInfo;
}
ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest req, int sendAmount, Optional<Endpoint> sender, bool sendReply) {
@ -81,13 +81,13 @@ ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoReq
state ReplyPromise<std::vector<Endpoint>> reply = req.reply;
resetReply( req );
int currentStream = 0;
for(int i = 0; i < sendAmount && currentStream < req.broadcastInfo.endpoints.size(); i++) {
TreeBroadcastInfo info;
RequestStream<TxnStateRequest> cur(req.broadcastInfo.endpoints[currentStream++]);
while(currentStream < req.broadcastInfo.endpoints.size()*(i+1)/sendAmount) {
info.endpoints.push_back(req.broadcastInfo.endpoints[currentStream++]);
for(int i = 0; i < sendAmount && currentStream < req.broadcastInfo.size(); i++) {
std::vector<Endpoint> endpoints;
RequestStream<UpdateServerDBInfoRequest> cur(req.broadcastInfo[currentStream++]);
while(currentStream < req.broadcastInfo.size()*(i+1)/sendAmount) {
endpoints.push_back(req.broadcastInfo[currentStream++]);
}
req.broadcastInfo = info;
req.broadcastInfo = endpoints;
replies.push_back( tryDBInfoBroadcast( cur, req ) );
resetReply( req );
}
@ -482,7 +482,7 @@ ACTOR Future<Void> registrationClient(
auto peers = FlowTransport::transport().getIncompatiblePeers();
for(auto it = peers->begin(); it != peers->end();) {
if( now() - it->second.second > SERVER_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
req.incompatiblePeers.push_back(it->first);
request.incompatiblePeers.push_back(it->first);
it = peers->erase(it);
} else {
it++;
@ -898,7 +898,6 @@ ACTOR Future<Void> workerServer(
errorForwarders.add( resetAfter(degraded, SERVER_KNOBS->DEGRADED_RESET_INTERVAL, false, SERVER_KNOBS->DEGRADED_WARNING_LIMIT, SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY, "DegradedReset"));
errorForwarders.add( loadedPonger( interf.debugPing.getFuture() ) );
errorForwarders.add( waitFailureServer( interf.waitFailure.getFuture() ) );
errorForwarders.add(monitorServerDBInfo(ccInterface, connFile, locality, dbInfo));
errorForwarders.add( testerServerCore( interf.testerInterface, connFile, dbInfo, locality ) );
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
@ -1014,13 +1013,17 @@ ACTOR Future<Void> workerServer(
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf, degraded, errors, locality, dbInfo ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf, degraded, errors, locality, dbInfo, connFile) );
TraceEvent("RecoveriesComplete", interf.id());
loop choose {
when( UpdateServerDBInfoRequest req = waitNext( interf.updateServerDBInfo.getFuture() ) ) {
if(req.dbInfo.clusterInterface == ccInterface->get().get() && (req.dbInfo.infoGeneration > dbInfo->infoGeneration || dbInfo->clusterInterface != ccInterface->get().get())) {
Optional<Endpoint> notUpdated;
if(req.dbInfo.clusterInterface != ccInterface->get().get() || (req.dbInfo.infoGeneration < dbInfo->get().infoGeneration && dbInfo->get().clusterInterface == ccInterface->get().get())) {
notUpdated = interf.updateServerDBInfo.getEndpoint();
}
if(req.dbInfo.clusterInterface == ccInterface->get().get() && (req.dbInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != ccInterface->get().get())) {
ServerDBInfo localInfo = req.dbInfo;
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
@ -1029,11 +1032,6 @@ ACTOR Future<Void> workerServer(
localInfo.myLocality = locality;
dbInfo->set(localInfo);
}
Optional<Endpoint> notUpdated;
if(req.dbInfo.clusterInterface != ccInterface->get().get() || (req.dbInfo.infoGeneration < dbInfo->infoGeneration && dbInfo->clusterInterface == ccInterface->get().get())) {
notUpdated = interf.updateServerDBInfo.getEndpoint();
}
errorForwarders.add(success(broadcastDBInfoRequest(req, 2, notUpdated, true)));
}
when( RebootRequest req = waitNext( interf.clientInterface.reboot.getFuture() ) ) {