Add tryGetReplyFromHostname() and retryGetReplyFromHostname(). (#6761)

* Add hostname to coordination interfaces.

* Add tryGetReplyFromHostname() and retryGetReplyFromHostname().

* Change tryGetReplyFromHostname() to call hostname.resolve().

* Add throw for actor_cancelled.
This commit is contained in:
Renxuan Wang 2022-04-06 10:47:00 -07:00 committed by GitHub
parent 38ec7624b4
commit 267c4deaee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 144 additions and 3 deletions

View File

@ -36,10 +36,12 @@ struct ClientLeaderRegInterface {
RequestStream<struct GetLeaderRequest> getLeader;
RequestStream<struct OpenDatabaseCoordRequest> openDatabase;
RequestStream<struct CheckDescriptorMutableRequest> checkDescriptorMutable;
Optional<Hostname> hostname;
ClientLeaderRegInterface() {}
ClientLeaderRegInterface(NetworkAddress remote);
ClientLeaderRegInterface(INetwork* local);
ClientLeaderRegInterface(Hostname hostname) : hostname(hostname) {}
bool operator==(const ClientLeaderRegInterface& rhs) const {
return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase;

View File

@ -28,6 +28,8 @@
#include "flow/genericactors.actor.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "flow/Hostname.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR template <class Req>
@ -70,6 +72,100 @@ Future<REPLY_TYPE(Req)> retryBrokenPromise(RequestStream<Req> to, Req request, T
}
}
ACTOR template <class Req>
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token) {
// A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname.
// If resolving fails, return lookup_failed().
// Otherwise, return tryGetReply(request).
try {
wait(hostname.resolve());
} catch (...) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
Optional<NetworkAddress> address = hostname.resolvedAddress;
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
return to->tryGetReply(request);
}
ACTOR template <class Req>
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token,
TaskPriority taskID) {
// A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname.
// If resolving fails, return lookup_failed().
// Otherwise, return tryGetReply(request).
try {
wait(hostname.resolve());
} catch (...) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
Optional<NetworkAddress> address = hostname.resolvedAddress;
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
return to->tryGetReply(request, taskID);
}
ACTOR template <class Req>
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token) {
// Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname.
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
loop {
wait(hostname.resolveWithRetry());
state Optional<NetworkAddress> address = hostname.resolvedAddress;
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
} else {
throw reply.getError();
}
} else {
return reply.get();
}
}
}
ACTOR template <class Req>
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token,
TaskPriority taskID) {
// Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname.
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
loop {
wait(hostname.resolveWithRetry());
state Optional<NetworkAddress> address = hostname.resolvedAddress;
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
} else {
throw reply.getError();
}
} else {
return reply.get();
}
}
}
ACTOR template <class T>
Future<T> timeoutWarning(Future<T> what, double time, PromiseStream<Void> output) {
state Future<Void> end = delay(time);

View File

@ -22,6 +22,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/ConfigKnobs.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
@ -215,10 +216,12 @@ public:
RequestStream<ConfigFollowerCompactRequest> compact;
RequestStream<ConfigFollowerRollforwardRequest> rollforward;
RequestStream<ConfigFollowerGetCommittedVersionRequest> getCommittedVersion;
Optional<Hostname> hostname;
ConfigFollowerInterface();
void setupWellKnownEndpoints();
ConfigFollowerInterface(NetworkAddress const& remote);
ConfigFollowerInterface(Hostname hostname) : hostname(hostname) {}
bool operator==(ConfigFollowerInterface const& rhs) const;
bool operator!=(ConfigFollowerInterface const& rhs) const;
UID id() const { return _id; }
@ -226,6 +229,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion);
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion, hostname);
}
};

View File

@ -31,6 +31,7 @@ struct GenerationRegInterface {
constexpr static FileIdentifier file_identifier = 16726744;
RequestStream<struct GenerationRegReadRequest> read;
RequestStream<struct GenerationRegWriteRequest> write;
Optional<Hostname> hostname;
// read(key,gen2) returns (value,gen,rgen).
// If there was no prior write(_,_,0) or a data loss fault,
@ -54,6 +55,7 @@ struct GenerationRegInterface {
GenerationRegInterface() {}
GenerationRegInterface(NetworkAddress remote);
GenerationRegInterface(INetwork* local);
GenerationRegInterface(Hostname hostname) : hostname(hostname){};
};
struct UniqueGeneration {
@ -128,6 +130,7 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface {
LeaderElectionRegInterface() {}
LeaderElectionRegInterface(NetworkAddress remote);
LeaderElectionRegInterface(INetwork* local);
LeaderElectionRegInterface(Hostname hostname) : ClientLeaderRegInterface(hostname) {}
};
struct CandidacyRequest {

View File

@ -66,7 +66,7 @@ ACTOR Future<Void> resolveImpl(Hostname* self) {
self->status = Hostname::UNRESOLVED;
self->resolveFinish.trigger();
self->resolvedAddress = Optional<NetworkAddress>();
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
throw lookup_failed();
}
} else if (self->status == Hostname::RESOLVING) {
wait(self->resolveFinish.onTrigger());
@ -83,10 +83,28 @@ ACTOR Future<Void> resolveImpl(Hostname* self) {
return Void();
}
ACTOR Future<Void> resolveWithRetryImpl(Hostname* self) {
loop {
try {
wait(resolveImpl(self));
return Void();
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
}
}
}
Future<Void> Hostname::resolve() {
return resolveImpl(this);
}
Future<Void> Hostname::resolveWithRetry() {
return resolveWithRetryImpl(this);
}
void Hostname::resolveBlocking() {
if (status != RESOLVED) {
try {
@ -164,7 +182,14 @@ TEST_CASE("/flow/Hostname/hostname") {
ASSERT(hn4.status == Hostname::UNRESOLVED && !hn4.resolvedAddress.present());
try {
wait(timeoutError(hn2.resolve(), 1));
wait(hn2.resolve());
} catch (Error& e) {
ASSERT(e.code() == error_code_lookup_failed);
}
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
try {
wait(timeoutError(hn2.resolveWithRetry(), 1));
} catch (Error& e) {
ASSERT(e.code() == error_code_timed_out);
}
@ -179,10 +204,21 @@ TEST_CASE("/flow/Hostname/hostname") {
state NetworkAddress address = NetworkAddress::parse("127.0.0.0:1234");
INetworkConnections::net()->addMockTCPEndpoint("host-name", "1234", { address });
// Test resolve.
wait(hn2.resolve());
ASSERT(hn2.status == Hostname::RESOLVED);
ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address);
// Test resolveWithRetry.
hn2.resetToUnresolved();
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
wait(hn2.resolveWithRetry());
ASSERT(hn2.status == Hostname::RESOLVED);
ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address);
// Test resolveBlocking.
hn2.resetToUnresolved();
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());

View File

@ -75,6 +75,7 @@ struct Hostname {
Optional<NetworkAddress> resolvedAddress;
enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED };
Future<Void> resolve();
Future<Void> resolveWithRetry();
void resolveBlocking(); // This one should only be used when resolving asynchronously is impossible.
// For all other cases, resolve() should be preferred.
void resetToUnresolved();