Merge pull request #182 from cie/log-group-for-unsupported-clients
Support log group field in status json
This commit is contained in:
commit
01a4888627
|
@ -115,7 +115,7 @@ struct OpenDatabaseRequest {
|
|||
// info changes. Returns immediately if the current client info id is different from
|
||||
// knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
|
||||
Arena arena;
|
||||
StringRef dbName, issues;
|
||||
StringRef dbName, issues, traceLogGroup;
|
||||
VectorRef<ClientVersionRef> supportedVersions;
|
||||
UID knownClientInfoID;
|
||||
ReplyPromise< struct ClientDBInfo > reply;
|
||||
|
@ -123,7 +123,7 @@ struct OpenDatabaseRequest {
|
|||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ASSERT( ar.protocolVersion() >= 0x0FDB00A400040001LL );
|
||||
ar & dbName & issues & supportedVersions & knownClientInfoID & reply & arena;
|
||||
ar & dbName & issues & supportedVersions & traceLogGroup & knownClientInfoID & reply & arena;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -474,6 +474,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
|
|||
req.knownClientInfoID = outInfo->get().id;
|
||||
req.dbName = dbName;
|
||||
req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
|
||||
req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
|
||||
|
||||
ClusterConnectionString fileConnectionString;
|
||||
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
|
||||
|
|
|
@ -75,6 +75,7 @@ public:
|
|||
ProcessIssuesMap clientsWithIssues, workersWithIssues;
|
||||
std::map<NetworkAddress, double> incompatibleConnections;
|
||||
ClientVersionMap clientVersionMap;
|
||||
std::map<NetworkAddress, std::string> traceLogGroupMap;
|
||||
Promise<Void> forceMasterFailure;
|
||||
int64_t masterRegistrationCount;
|
||||
DatabaseConfiguration config; // Asynchronously updated via master registration
|
||||
|
@ -951,6 +952,7 @@ ACTOR Future<Void> clusterOpenDatabase(
|
|||
UID knownClientInfoID,
|
||||
std::string issues,
|
||||
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
|
||||
Standalone<StringRef> traceLogGroup,
|
||||
ReplyPromise<ClientDBInfo> reply)
|
||||
{
|
||||
// NOTE: The client no longer expects this function to return errors
|
||||
|
@ -961,6 +963,8 @@ ACTOR Future<Void> clusterOpenDatabase(
|
|||
db->clientVersionMap[reply.getEndpoint().address] = supportedVersions;
|
||||
}
|
||||
|
||||
db->traceLogGroupMap[reply.getEndpoint().address] = traceLogGroup.toString();
|
||||
|
||||
while (db->clientInfo->get().id == knownClientInfoID) {
|
||||
choose {
|
||||
when (Void _ = wait( db->clientInfo->onChange() )) {}
|
||||
|
@ -970,6 +974,7 @@ ACTOR Future<Void> clusterOpenDatabase(
|
|||
|
||||
removeIssue( db->clientsWithIssues, reply.getEndpoint().address, issues, issueID );
|
||||
db->clientVersionMap.erase(reply.getEndpoint().address);
|
||||
db->traceLogGroupMap.erase(reply.getEndpoint().address);
|
||||
|
||||
reply.send( db->clientInfo->get() );
|
||||
return Void();
|
||||
|
@ -1501,7 +1506,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
|
|||
}
|
||||
}
|
||||
|
||||
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, coordinators, incompatibleConnections)));
|
||||
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections)));
|
||||
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
|
||||
throw result.getError();
|
||||
|
||||
|
@ -1669,7 +1674,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
|
|||
return Void();
|
||||
}
|
||||
when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
|
||||
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.reply ) );
|
||||
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
|
||||
}
|
||||
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
|
||||
addActor.send( clusterRecruitFromConfiguration( &self, req ) );
|
||||
|
|
|
@ -868,7 +868,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
|
|||
return processMap;
|
||||
}
|
||||
|
||||
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
|
||||
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap, std::map<NetworkAddress, std::string> traceLogGroupMap) {
|
||||
StatusObject clientStatus;
|
||||
|
||||
clientStatus["count"] = (int64_t)clientVersionMap.size();
|
||||
|
@ -890,10 +890,13 @@ static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
|
|||
|
||||
StatusArray clients = StatusArray();
|
||||
for(auto client : cv.second) {
|
||||
clients.push_back(client.toString());
|
||||
StatusObject cli;
|
||||
cli["address"] = client.toString();
|
||||
cli["log_group"] = traceLogGroupMap[client];
|
||||
clients.push_back(cli);
|
||||
}
|
||||
|
||||
ver["clients"] = clients;
|
||||
ver["connected_clients"] = clients;
|
||||
versionsArray.push_back(ver);
|
||||
}
|
||||
|
||||
|
@ -1688,6 +1691,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
ProcessIssuesMap workerIssues,
|
||||
ProcessIssuesMap clientIssues,
|
||||
ClientVersionMap clientVersionMap,
|
||||
std::map<NetworkAddress, std::string> traceLogGroupMap,
|
||||
ServerCoordinators coordinators,
|
||||
std::vector<NetworkAddress> incompatibleConnections )
|
||||
{
|
||||
|
@ -1866,7 +1870,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
|
||||
StatusObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, cx, configuration, &status_incomplete_reasons));
|
||||
statusObj["processes"] = processStatus;
|
||||
statusObj["clients"] = clientStatusFetcher(clientVersionMap);
|
||||
statusObj["clients"] = clientStatusFetcher(clientVersionMap, traceLogGroupMap);
|
||||
|
||||
StatusArray incompatibleConnectionsArray;
|
||||
for(auto it : incompatibleConnections) {
|
||||
|
|
|
@ -32,6 +32,6 @@ typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > Clie
|
|||
|
||||
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract );
|
||||
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
|
||||
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
|
||||
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
|
||||
|
||||
#endif
|
||||
|
|
|
@ -51,7 +51,7 @@ using namespace boost::asio::ip;
|
|||
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
|
||||
// xyzdev
|
||||
// vvvv
|
||||
uint64_t currentProtocolVersion = 0x0FDB00A551020001LL;
|
||||
uint64_t currentProtocolVersion = 0x0FDB00A551030001LL;
|
||||
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
|
||||
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
|
||||
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue