Add processID, networkAddress, and locality to layer status JSON for Backup Agents. (#9736)
* Add processID, networkAddress, and locality to layer status JSON for Backup Agents. * The backup/DR agent determines the network address to report in Layer Status only once, when the status updater loop begins, because doing so is a blocking call that connects to the cluster. This change also includes a substantial amount of code cleanup.
This commit is contained in:
parent
a685a01c07
commit
216d0be2cf
|
@ -1509,6 +1509,7 @@ DBType getDBType(std::string dbType) {
|
|||
}
|
||||
|
||||
ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr,
|
||||
IPAddress localIP,
|
||||
std::string name,
|
||||
std::string id,
|
||||
ProgramExe exe,
|
||||
|
@ -1544,6 +1545,9 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
o.create("main_thread_cpu_seconds") = getProcessorTimeThread();
|
||||
o.create("process_cpu_seconds") = getProcessorTimeProcess();
|
||||
o.create("configured_workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
o.create("processID") = ::getpid();
|
||||
o.create("locality") = tr->getDatabase()->clientLocality.toJSON<json_spirit::mObject>();
|
||||
o.create("networkAddress") = localIP.toString();
|
||||
|
||||
if (exe == ProgramExe::AGENT) {
|
||||
static S3BlobStoreEndpoint::Stats last_stats;
|
||||
|
@ -1789,6 +1793,21 @@ ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest,
|
|||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(statusUpdateDest));
|
||||
state Future<Void> pollRateUpdater;
|
||||
|
||||
// In order to report a useful networkAddress to the cluster's layer status JSON object, determine which local
|
||||
// network interface IP will be used to talk to the cluster. This is a blocking call, so it is only done once,
|
||||
// and in a retry loop because if we can't connect to the cluster we can't do any work anyway.
|
||||
state IPAddress localIP;
|
||||
|
||||
loop {
|
||||
try {
|
||||
localIP = statusUpdateDest->getConnectionRecord()->getConnectionString().determineLocalSourceIP();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "AgentCouldNotDetermineLocalIP").error(e);
|
||||
wait(delay(1.0));
|
||||
}
|
||||
}
|
||||
|
||||
// Register the existence of this layer in the meta key space
|
||||
loop {
|
||||
try {
|
||||
|
@ -1811,7 +1830,7 @@ ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest,
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state Future<std::string> futureStatusDoc =
|
||||
getLayerStatus(tr, name, id, exe, taskDest, Snapshot::True);
|
||||
getLayerStatus(tr, localIP, name, id, exe, taskDest, Snapshot::True);
|
||||
wait(cleanupStatus(tr, rootKey, name, id));
|
||||
std::string statusdoc = wait(futureStatusDoc);
|
||||
tr->set(instanceKey, statusdoc);
|
||||
|
|
|
@ -34,10 +34,8 @@
|
|||
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
|
||||
// Determine public IP address by calling the first available coordinator.
|
||||
// If fail connecting all coordinators, throw bind_failed().
|
||||
IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
|
||||
int size = ccs.coords.size() + ccs.hostnames.size();
|
||||
IPAddress ClusterConnectionString::determineLocalSourceIP() const {
|
||||
int size = coords.size() + hostnames.size();
|
||||
int index = 0;
|
||||
loop {
|
||||
try {
|
||||
|
@ -48,10 +46,10 @@ IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
|
|||
|
||||
NetworkAddress coordAddr;
|
||||
// Try coords first, because they don't need to be resolved.
|
||||
if (index < ccs.coords.size()) {
|
||||
coordAddr = ccs.coords[index];
|
||||
if (index < coords.size()) {
|
||||
coordAddr = coords[index];
|
||||
} else {
|
||||
Hostname& h = ccs.hostnames[index - ccs.coords.size()];
|
||||
const Hostname& h = hostnames[index - coords.size()];
|
||||
Optional<NetworkAddress> resolvedAddr = h.resolveBlocking();
|
||||
if (!resolvedAddr.present()) {
|
||||
throw lookup_failed();
|
|
@ -2223,8 +2223,6 @@ void DatabaseContext::expireThrottles() {
|
|||
}
|
||||
}
|
||||
|
||||
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs);
|
||||
|
||||
// Initialize tracing for FDB client
|
||||
//
|
||||
// connRecord is necessary for determining the local IP, which is then included in the trace
|
||||
|
@ -2256,7 +2254,7 @@ void initializeClientTracing(Reference<IClusterConnectionRecord> connRecord, Opt
|
|||
|
||||
Optional<NetworkAddress> localAddress;
|
||||
if (connRecord) {
|
||||
auto publicIP = determinePublicIPAutomatically(connRecord->getConnectionString());
|
||||
auto publicIP = connRecord->getConnectionString().determineLocalSourceIP();
|
||||
localAddress = NetworkAddress(publicIP, ::getpid());
|
||||
}
|
||||
platform::ImageInfo imageInfo = platform::getImageInfo();
|
||||
|
|
|
@ -88,6 +88,11 @@ public:
|
|||
|
||||
size_t getNumberOfCoordinators() const { return coords.size() + hostnames.size(); }
|
||||
|
||||
// Determine the local source IP used to connect to the cluster by connecting to the first available coordinator.
|
||||
// Throw bind_failed() if no connection attempts were successful.
|
||||
// This function blocks on connection attempts.
|
||||
IPAddress determineLocalSourceIP() const;
|
||||
|
||||
bool operator==(const ClusterConnectionString& other) const noexcept {
|
||||
return key == other.key && keyDesc == other.keyDesc && coords == other.coords && hostnames == other.hostnames;
|
||||
}
|
||||
|
|
|
@ -14,8 +14,10 @@ typedef JsonBuilder JsonString;
|
|||
template <typename T>
|
||||
class JsonBuilderObjectSetter;
|
||||
|
||||
// Class for building JSON string values.
|
||||
// Default value is null, as in the JSON type
|
||||
// Class for building JSON strings linearly.
|
||||
// The JSON data structure is append-only. No key deduplication is done in JSON objects, and the contents cannot be read
|
||||
// back other than by obtaining the complete JSON string of what has been written to the builder. The default value is
|
||||
// null, as in the JSON type.
|
||||
class JsonBuilder {
|
||||
protected:
|
||||
enum EType { NULLVALUE, OBJECT, ARRAY };
|
||||
|
|
|
@ -322,6 +322,23 @@ public:
|
|||
return infoString;
|
||||
}
|
||||
|
||||
// Convert locality fields to a JSON object. This is a template because it works with JsonBuilderObject, StatusObject,
|
||||
// and json_spirit::mObject, and none of these types are in the fdbrpc/ project.
// JSONType must be default-constructible and support `obj[key] = value`, where key is the result of the field
// name's toString() and value is either the field value's toString() result or nullptr.
|
||||
template <typename JSONType>
|
||||
JSONType toJSON() const {
|
||||
JSONType obj;
|
||||

||||
// Emit one entry per element of _data, keyed by the field name.
for (auto it = _data.begin(); it != _data.end(); it++) {
|
||||
if (it->second.present()) {
|
||||
obj[it->first.toString()] = it->second.get().toString();
|
||||
} else {
|
||||
// Field exists but has no value: assign nullptr.
// NOTE(review): presumably each JSONType renders this as JSON null -- confirm per type.
obj[it->first.toString()] = nullptr;
|
||||
}
|
||||
}
|
||||

||||
return obj;
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
// Locality is persisted in the database inside StorageServerInterface, so changes here have to be
|
||||
|
|
|
@ -240,20 +240,6 @@ protected:
|
|||
int64_t counter;
|
||||
};
|
||||
|
||||
static JsonBuilderObject getLocalityInfo(const LocalityData& locality) {
|
||||
JsonBuilderObject localityObj;
|
||||
|
||||
for (auto it = locality._data.begin(); it != locality._data.end(); it++) {
|
||||
if (it->second.present()) {
|
||||
localityObj[it->first] = it->second.get();
|
||||
} else {
|
||||
localityObj[it->first] = JsonBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
return localityObj;
|
||||
}
|
||||
|
||||
static JsonBuilderObject getError(const TraceEventFields& errorFields) {
|
||||
JsonBuilderObject statusObj;
|
||||
try {
|
||||
|
@ -352,7 +338,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
|
|||
}
|
||||
|
||||
if (locality.count(it->first)) {
|
||||
statusObj["locality"] = getLocalityInfo(locality[it->first]);
|
||||
statusObj["locality"] = locality[it->first].toJSON<JsonBuilderObject>();
|
||||
}
|
||||
|
||||
statusObj["address"] = address;
|
||||
|
@ -949,7 +935,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
|
|||
std::string MachineID = processMetrics.getValue("MachineID");
|
||||
statusObj["machine_id"] = MachineID;
|
||||
|
||||
statusObj["locality"] = getLocalityInfo(workerItr->interf.locality);
|
||||
statusObj["locality"] = workerItr->interf.locality.toJSON<JsonBuilderObject>();
|
||||
|
||||
statusObj.setKeyRawNumber("uptime_seconds", processMetrics.getValue("UptimeSeconds"));
|
||||
|
||||
|
|
|
@ -234,8 +234,6 @@ extern void pingtest();
|
|||
extern void copyTest();
|
||||
extern void versionedMapTest();
|
||||
extern void createTemplateDatabase();
|
||||
// FIXME: this really belongs in a header somewhere since it is actually used.
|
||||
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs);
|
||||
|
||||
extern const char* getSourceVersion();
|
||||
|
||||
|
@ -897,7 +895,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
|
|||
if (autoPublicAddress) {
|
||||
try {
|
||||
const NetworkAddress& parsedAddress = NetworkAddress::parse("0.0.0.0:" + publicAddressStr.substr(5));
|
||||
const IPAddress publicIP = determinePublicIPAutomatically(connectionRecord.getConnectionString());
|
||||
const IPAddress publicIP = connectionRecord.getConnectionString().determineLocalSourceIP();
|
||||
currentPublicAddress = NetworkAddress(publicIP, parsedAddress.port, true, parsedAddress.isTLS());
|
||||
} catch (Error& e) {
|
||||
fprintf(stderr,
|
||||
|
@ -1114,7 +1112,7 @@ struct CLIOptions {
|
|||
printHelpTeaser(name);
|
||||
flushAndExit(FDB_EXIT_ERROR);
|
||||
}
|
||||
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
|
||||
auto publicIP = connectionFile->getConnectionString().determineLocalSourceIP();
|
||||
publicAddresses.address = NetworkAddress(publicIP, ::getpid());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ Hostname Hostname::parse(const std::string& s) {
|
|||
return Hostname(f.substr(0, colonPos), f.substr(colonPos + 1), isTLS);
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<NetworkAddress>> resolveImpl(Hostname* self) {
|
||||
ACTOR Future<Optional<NetworkAddress>> resolveImpl(const Hostname* self) {
|
||||
try {
|
||||
std::vector<NetworkAddress> addresses =
|
||||
wait(INetworkConnections::net()->resolveTCPEndpointWithDNSCache(self->host, self->service));
|
||||
|
@ -76,7 +76,7 @@ ACTOR Future<Optional<NetworkAddress>> resolveImpl(Hostname* self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<NetworkAddress> resolveWithRetryImpl(Hostname* self) {
|
||||
ACTOR Future<NetworkAddress> resolveWithRetryImpl(const Hostname* self) {
|
||||
state double resolveInterval = FLOW_KNOBS->HOSTNAME_RESOLVE_INIT_INTERVAL;
|
||||
loop {
|
||||
try {
|
||||
|
@ -97,11 +97,11 @@ Future<Optional<NetworkAddress>> Hostname::resolve() {
|
|||
return resolveImpl(this);
|
||||
}
|
||||
|
||||
Future<NetworkAddress> Hostname::resolveWithRetry() {
|
||||
Future<NetworkAddress> Hostname::resolveWithRetry() const {
|
||||
return resolveWithRetryImpl(this);
|
||||
}
|
||||
|
||||
Optional<NetworkAddress> Hostname::resolveBlocking() {
|
||||
Optional<NetworkAddress> Hostname::resolveBlocking() const {
|
||||
try {
|
||||
std::vector<NetworkAddress> addresses =
|
||||
INetworkConnections::net()->resolveTCPEndpointBlockingWithDNSCache(host, service);
|
||||
|
|
|
@ -60,9 +60,9 @@ struct Hostname {
|
|||
|
||||
// The resolve functions below use DNS cache.
|
||||
Future<Optional<NetworkAddress>> resolve();
|
||||
Future<NetworkAddress> resolveWithRetry();
|
||||
Optional<NetworkAddress> resolveBlocking(); // This one should only be used when resolving asynchronously is
|
||||
// impossible. For all other cases, resolve() should be preferred.
|
||||
Future<NetworkAddress> resolveWithRetry() const;
|
||||
Optional<NetworkAddress> resolveBlocking() const; // This one should only be used when resolving asynchronously is
|
||||
// impossible. For all other cases, resolve() should be preferred.
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
Loading…
Reference in New Issue