Merge pull request #1345 from ajbeamon/support-multiple-client-or-worker-issues

Add support for a client or worker having multiple issues.
This commit is contained in:
Evan Tschannen 2019-03-24 17:27:50 -07:00 committed by GitHub
commit 5e03e178de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 61 deletions

View File

@ -117,7 +117,8 @@ struct OpenDatabaseRequest {
// info changes. Returns immediately if the current client info id is different from
// knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
Arena arena;
StringRef issues, traceLogGroup;
StringRef traceLogGroup;
VectorRef<StringRef> issues;
VectorRef<ClientVersionRef> supportedVersions;
int connectedCoordinatorsNum; // Number of coordinators connected by the client
UID knownClientInfoID;

View File

@ -549,7 +549,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
ClusterConnectionString fileConnectionString;
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
req.issues = LiteralStringRef("incorrect_cluster_file_contents");
req.issues.push_back_deep(req.arena, LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = ccf->getConnectionString().toString();
if(!incorrectTime.present()) {
incorrectTime = now();

View File

@ -1270,28 +1270,34 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
}
}
void addIssue( ProcessIssuesMap& issueMap, NetworkAddress const& addr, std::string const& issue, UID& issueID ) {
auto& e = issueMap[addr];
e.first = issue;
e.second = issueID = g_random->randomUniqueID();
if (!issue.size()) issueMap.erase(addr);
void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef<StringRef> const& issues,
Optional<UID>& issueID) {
if (issues.size()) {
auto& e = issueMap[addr];
e.first = issues;
e.second = g_random->randomUniqueID();
issueID = e.second;
} else {
issueMap.erase(addr);
issueID = Optional<UID>();
}
}
void removeIssue( ProcessIssuesMap& issueMap, NetworkAddress const& addr, std::string const& issue, UID& issueID ) {
if (!issue.size()) return;
if ( issueMap.count(addr) && issueMap[addr].second == issueID )
void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional<UID>& issueID) {
if (!issueID.present()) {
return;
}
if (issueMap.count(addr) && issueMap[addr].second == issueID.get()) {
issueMap.erase( addr );
}
}
ACTOR Future<Void> clusterGetServerInfo(
ClusterControllerData::DBInfo* db,
UID knownServerInfoID,
std::string issues,
std::vector<NetworkAddress> incompatiblePeers,
ReplyPromise<ServerDBInfo> reply)
{
state UID issueID;
addIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID,
Standalone<VectorRef<StringRef>> issues,
std::vector<NetworkAddress> incompatiblePeers,
ReplyPromise<ServerDBInfo> reply) {
state Optional<UID> issueID;
setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID);
for(auto it : incompatiblePeers) {
db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
@ -1303,24 +1309,20 @@ ACTOR Future<Void> clusterGetServerInfo(
}
}
removeIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
removeIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID);
reply.send( db->serverInfo->get() );
return Void();
}
ACTOR Future<Void> clusterOpenDatabase(
ClusterControllerData::DBInfo* db,
UID knownClientInfoID,
std::string issues,
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
int connectedCoordinatorsNum,
Standalone<StringRef> traceLogGroup,
ReplyPromise<ClientDBInfo> reply)
{
ACTOR Future<Void> clusterOpenDatabase(ClusterControllerData::DBInfo* db, UID knownClientInfoID,
Standalone<VectorRef<StringRef>> issues,
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
int connectedCoordinatorsNum, Standalone<StringRef> traceLogGroup,
ReplyPromise<ClientDBInfo> reply) {
// NOTE: The client no longer expects this function to return errors
state UID issueID;
addIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
state Optional<UID> issueID;
setIssues(db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID);
if(supportedVersions.size() > 0) {
db->clientVersionMap[reply.getEndpoint().getPrimaryAddress()] = supportedVersions;
@ -1336,7 +1338,7 @@ ACTOR Future<Void> clusterOpenDatabase(
}
}
removeIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID );
removeIssues(db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID);
db->clientVersionMap.erase(reply.getEndpoint().getPrimaryAddress());
db->clientStatusInfoMap.erase(reply.getEndpoint().getPrimaryAddress());
@ -2639,7 +2641,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
return Void();
}
when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
self.addActor.send( clusterOpenDatabase( &self.db, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.connectedCoordinatorsNum, req.traceLogGroup, req.reply ) );
self.addActor.send(clusterOpenDatabase(&self.db, req.knownClientInfoID, req.issues, req.supportedVersions,
req.connectedCoordinatorsNum, req.traceLogGroup, req.reply));
}
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
self.addActor.send( clusterRecruitFromConfiguration( &self, req ) );
@ -2690,7 +2693,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
clusterRegisterMaster( &self, req );
}
when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) {
self.addActor.send( clusterGetServerInfo( &self.db, req.knownServerInfoID, req.issues.toString(), req.incompatiblePeers, req.reply ) );
self.addActor.send(
clusterGetServerInfo(&self.db, req.knownServerInfoID, req.issues, req.incompatiblePeers, req.reply));
}
when( wait( leaderFail ) ) {
// We are no longer the leader if this has changed.

View File

@ -224,7 +224,7 @@ struct RegisterMasterRequest {
struct GetServerDBInfoRequest {
UID knownServerInfoID;
Standalone<StringRef> issues;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise< struct ServerDBInfo > reply;

View File

@ -539,20 +539,12 @@ struct RolesInfo {
};
ACTOR static Future<JsonBuilderObject> processStatusFetcher(
Reference<AsyncVar<struct ServerDBInfo>> db,
std::vector<WorkerDetails> workers,
WorkerEvents pMetrics,
WorkerEvents mMetrics,
WorkerEvents errors,
WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts,
std::map<std::string, JsonBuilderObject> processIssues,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
vector<std::pair<TLogInterface, EventMap>> tLogs,
vector<std::pair<MasterProxyInterface, EventMap>> proxies,
Database cx,
Optional<DatabaseConfiguration> configuration,
std::set<std::string> *incomplete_reasons) {
Reference<AsyncVar<struct ServerDBInfo>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
WorkerEvents mMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, WorkerEvents programStarts,
std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
vector<std::pair<TLogInterface, EventMap>> tLogs, vector<std::pair<MasterProxyInterface, EventMap>> proxies,
Database cx, Optional<DatabaseConfiguration> configuration, std::set<std::string>* incomplete_reasons) {
state JsonBuilderObject processMap;
state double metric;
@ -771,8 +763,8 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
std::string strAddress = address.toString();
// If this process has a process issue, identified by strAddress, then add it to messages array
if (processIssues.count(strAddress)){
messages.push_back(processIssues[strAddress]);
for (auto issue : processIssues[strAddress]) {
messages.push_back(issue);
}
// If this process had a trace file open error, identified by strAddress, then add it to messages array
@ -1655,13 +1647,18 @@ static std::string getIssueDescription(std::string name) {
return name;
}
static std::map<std::string, JsonBuilderObject> getProcessIssuesAsMessages( ProcessIssuesMap const& _issues ) {
std::map<std::string, JsonBuilderObject> issuesMap;
static std::map<std::string, std::vector<JsonBuilderObject>> getProcessIssuesAsMessages(
ProcessIssuesMap const& _issues) {
std::map<std::string, std::vector<JsonBuilderObject>> issuesMap;
try {
ProcessIssuesMap issues = _issues;
for (auto i : issues) {
issuesMap[i.first.toString()] = JsonString::makeMessage(i.second.first.c_str(), getIssueDescription(i.second.first).c_str());
for (auto processIssues : issues) {
for (auto issue : processIssues.second.first) {
std::string issueStr = issue.toString();
issuesMap[processIssues.first.toString()].push_back(
JsonString::makeMessage(issueStr.c_str(), getIssueDescription(issueStr).c_str()));
}
}
}
catch (Error &e) {
@ -1679,8 +1676,11 @@ static JsonBuilderArray getClientIssuesAsMessages( ProcessIssuesMap const& _issu
ProcessIssuesMap issues = _issues;
std::map<std::string, std::vector<std::string>> deduplicatedIssues;
for(auto i : issues) {
deduplicatedIssues[i.second.first].push_back(formatIpPort(i.first.ip, i.first.port));
for (auto processIssues : issues) {
for (auto issue : processIssues.second.first) {
deduplicatedIssues[issue.toString()].push_back(
formatIpPort(processIssues.first.ip, processIssues.first.port));
}
}
for (auto i : deduplicatedIssues) {
@ -1911,7 +1911,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["generation"] = db->get().recoveryCount;
}
state std::map<std::string, JsonBuilderObject> processIssues = getProcessIssuesAsMessages(workerIssues);
state std::map<std::string, std::vector<JsonBuilderObject>> processIssues =
getProcessIssuesAsMessages(workerIssues);
state vector<std::pair<StorageServerInterface, EventMap>> storageServers;
state vector<std::pair<TLogInterface, EventMap>> tLogs;
state vector<std::pair<MasterProxyInterface, EventMap>> proxies;

View File

@ -27,7 +27,8 @@
#include "fdbserver/MasterInterface.h"
#include "fdbclient/ClusterInterface.h"
typedef std::map< NetworkAddress, std::pair<std::string,UID> > ProcessIssuesMap;
typedef Standalone<VectorRef<StringRef>> ProcessIssues;
typedef std::map<NetworkAddress, std::pair<ProcessIssues, UID>> ProcessIssuesMap;
typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > ClientVersionMap;
struct ClientStatusInfo {

View File

@ -567,7 +567,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
req.issues = LiteralStringRef("incorrect_cluster_file_contents");
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
if(!incorrectTime.present()) {
incorrectTime = now();

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061040001LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061050001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;