diff --git a/fdbclient/ClusterInterface.h b/fdbclient/ClusterInterface.h index 6b45fa2226..61d6754b8f 100644 --- a/fdbclient/ClusterInterface.h +++ b/fdbclient/ClusterInterface.h @@ -117,7 +117,8 @@ struct OpenDatabaseRequest { // info changes. Returns immediately if the current client info id is different from // knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval) Arena arena; - StringRef issues, traceLogGroup; + StringRef traceLogGroup; + VectorRef issues; VectorRef supportedVersions; int connectedCoordinatorsNum; // Number of coordinators connected by the client UID knownClientInfoID; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8813b01eec..71f2d86f0e 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -549,7 +549,7 @@ ACTOR static Future monitorClientInfo( ReferencefileContentsUpToDate(fileConnectionString)) { - req.issues = LiteralStringRef("incorrect_cluster_file_contents"); + req.issues.push_back_deep(req.arena, LiteralStringRef("incorrect_cluster_file_contents")); std::string connectionString = ccf->getConnectionString().toString(); if(!incorrectTime.present()) { incorrectTime = now(); diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index ddc2828dd8..a6a53254cc 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1270,28 +1270,34 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster } } -void addIssue( ProcessIssuesMap& issueMap, NetworkAddress const& addr, std::string const& issue, UID& issueID ) { - auto& e = issueMap[addr]; - e.first = issue; - e.second = issueID = g_random->randomUniqueID(); - if (!issue.size()) issueMap.erase(addr); +void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef const& issues, + Optional& issueID) { + if (issues.size()) { + auto& e = issueMap[addr]; + e.first = issues; + e.second = g_random->randomUniqueID(); + issueID = e.second; + } else { + issueMap.erase(addr); + issueID = Optional(); + } } -void removeIssue( ProcessIssuesMap& issueMap, NetworkAddress const& addr, std::string const& issue, UID& issueID ) { - if (!issue.size()) return; - if ( issueMap.count(addr) && issueMap[addr].second == issueID ) +void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional& issueID) { + if (!issueID.present()) { + return; + } + if (issueMap.count(addr) && issueMap[addr].second == issueID.get()) { issueMap.erase( addr ); + } } -ACTOR Future clusterGetServerInfo( - ClusterControllerData::DBInfo* db, - UID knownServerInfoID, - std::string issues, - std::vector incompatiblePeers, - ReplyPromise reply) -{ - state UID issueID; - addIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); +ACTOR Future clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID, + Standalone> issues, + std::vector incompatiblePeers, + ReplyPromise reply) { + state Optional issueID; + setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID); for(auto it : incompatiblePeers) { db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL; } @@ -1303,24 +1309,20 @@ ACTOR Future clusterGetServerInfo( } } - removeIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); + removeIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID); reply.send( db->serverInfo->get() ); return Void(); } -ACTOR Future clusterOpenDatabase( - ClusterControllerData::DBInfo* db, - UID knownClientInfoID, - std::string issues, - Standalone> supportedVersions, - int connectedCoordinatorsNum, - Standalone traceLogGroup, - ReplyPromise reply) -{ +ACTOR Future clusterOpenDatabase(ClusterControllerData::DBInfo* db, UID knownClientInfoID, + Standalone> issues, + Standalone> supportedVersions, + int connectedCoordinatorsNum, Standalone traceLogGroup, + ReplyPromise reply) { // NOTE: The client no longer expects this function to return errors - state UID issueID; - addIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); + state Optional issueID; + setIssues(db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID); if(supportedVersions.size() > 0) { db->clientVersionMap[reply.getEndpoint().getPrimaryAddress()] = supportedVersions; @@ -1336,7 +1338,7 @@ ACTOR Future clusterOpenDatabase( } } - removeIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); + removeIssues(db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID); db->clientVersionMap.erase(reply.getEndpoint().getPrimaryAddress()); db->clientStatusInfoMap.erase(reply.getEndpoint().getPrimaryAddress()); @@ -2639,7 +2641,8 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, return Void(); } when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) { - self.addActor.send( clusterOpenDatabase( &self.db, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.connectedCoordinatorsNum, req.traceLogGroup, req.reply ) ); + self.addActor.send(clusterOpenDatabase(&self.db, req.knownClientInfoID, req.issues, req.supportedVersions, + req.connectedCoordinatorsNum, req.traceLogGroup, req.reply)); } when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) { self.addActor.send( clusterRecruitFromConfiguration( &self, req ) ); @@ -2690,7 +2693,8 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, clusterRegisterMaster( &self, req ); } when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) { - self.addActor.send( clusterGetServerInfo( &self.db, req.knownServerInfoID, req.issues.toString(), req.incompatiblePeers, req.reply ) ); + self.addActor.send( + clusterGetServerInfo(&self.db, req.knownServerInfoID, req.issues, req.incompatiblePeers, req.reply)); } when( wait( leaderFail ) ) { // We are no longer the leader if this has changed. diff --git a/fdbserver/ClusterRecruitmentInterface.h b/fdbserver/ClusterRecruitmentInterface.h index 1dc1c4ad8f..f66b88032d 100644 --- a/fdbserver/ClusterRecruitmentInterface.h +++ b/fdbserver/ClusterRecruitmentInterface.h @@ -224,7 +224,7 @@ struct RegisterMasterRequest { struct GetServerDBInfoRequest { UID knownServerInfoID; - Standalone issues; + Standalone> issues; std::vector incompatiblePeers; ReplyPromise< struct ServerDBInfo > reply; diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index c75676167a..5de6f404f4 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -539,20 +539,12 @@ struct RolesInfo { }; ACTOR static Future processStatusFetcher( - Reference> db, - std::vector workers, - WorkerEvents pMetrics, - WorkerEvents mMetrics, - WorkerEvents errors, - WorkerEvents traceFileOpenErrors, - WorkerEvents programStarts, - std::map processIssues, - vector> storageServers, - vector> tLogs, - vector> proxies, - Database cx, - Optional configuration, - std::set *incomplete_reasons) { + Reference> db, std::vector workers, WorkerEvents pMetrics, + WorkerEvents mMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, WorkerEvents programStarts, + std::map> processIssues, + vector> storageServers, + vector> tLogs, vector> proxies, + Database cx, Optional configuration, std::set* incomplete_reasons) { state JsonBuilderObject processMap; state double metric; @@ -771,8 +763,8 @@ ACTOR static Future processStatusFetcher( std::string strAddress = address.toString(); // If this process has a process issue, identified by strAddress, then add it to messages array - if (processIssues.count(strAddress)){ - messages.push_back(processIssues[strAddress]); + for (auto issue : processIssues[strAddress]) { + messages.push_back(issue); } // If this process had a trace file open error, identified by strAddress, then add it to messages array @@ -1655,13 +1647,18 @@ static std::string getIssueDescription(std::string name) { return name; } -static std::map getProcessIssuesAsMessages( ProcessIssuesMap const& _issues ) { - std::map issuesMap; +static std::map> getProcessIssuesAsMessages( + ProcessIssuesMap const& _issues) { + std::map> issuesMap; try { ProcessIssuesMap issues = _issues; - for (auto i : issues) { - issuesMap[i.first.toString()] = JsonString::makeMessage(i.second.first.c_str(), getIssueDescription(i.second.first).c_str()); + for (auto processIssues : issues) { + for (auto issue : processIssues.second.first) { + std::string issueStr = issue.toString(); + issuesMap[processIssues.first.toString()].push_back( + JsonString::makeMessage(issueStr.c_str(), getIssueDescription(issueStr).c_str())); + } } } catch (Error &e) { @@ -1679,8 +1676,11 @@ static JsonBuilderArray getClientIssuesAsMessages( ProcessIssuesMap const& _issu ProcessIssuesMap issues = _issues; std::map> deduplicatedIssues; - for(auto i : issues) { - deduplicatedIssues[i.second.first].push_back(formatIpPort(i.first.ip, i.first.port)); + for (auto processIssues : issues) { + for (auto issue : processIssues.second.first) { + deduplicatedIssues[issue.toString()].push_back( + formatIpPort(processIssues.first.ip, processIssues.first.port)); + } } for (auto i : deduplicatedIssues) { @@ -1911,7 +1911,8 @@ ACTOR Future clusterGetStatus( statusObj["generation"] = db->get().recoveryCount; } - state std::map processIssues = getProcessIssuesAsMessages(workerIssues); + state std::map> processIssues = + getProcessIssuesAsMessages(workerIssues); state vector> storageServers; state vector> tLogs; state vector> proxies; diff --git a/fdbserver/Status.h b/fdbserver/Status.h index c5ad78532b..90b4f19634 100644 --- a/fdbserver/Status.h +++ b/fdbserver/Status.h @@ -27,7 +27,8 @@ #include "fdbserver/MasterInterface.h" #include "fdbclient/ClusterInterface.h" -typedef std::map< NetworkAddress, std::pair > ProcessIssuesMap; +typedef Standalone> ProcessIssues; +typedef std::map> ProcessIssuesMap; typedef std::map< NetworkAddress, Standalone> > ClientVersionMap; struct ClientStatusInfo { diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 35466d5deb..70746e7b49 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -567,7 +567,7 @@ ACTOR Future monitorServerDBInfo( ReferencefileContentsUpToDate(fileConnectionString)) { - req.issues = LiteralStringRef("incorrect_cluster_file_contents"); + req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents")); std::string connectionString = connFile->getConnectionString().toString(); if(!incorrectTime.present()) { incorrectTime = now(); diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 7555438d69..6633b2e635 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -55,7 +55,7 @@ using namespace boost::asio::ip; // // xyzdev // vvvv -const uint64_t currentProtocolVersion = 0x0FDB00B061040001LL; +const uint64_t currentProtocolVersion = 0x0FDB00B061050001LL; const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL; const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;