From 267c4deaee8604d25429099148b990e28603894e Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Wed, 6 Apr 2022 10:47:00 -0700 Subject: [PATCH] Add tryGetReplyFromHostname() and retryGetReplyFromHostname(). (#6761) * Add hostname to coordination interfaces. * Add tryGetReplyFromHostname() and retryGetReplyFromHostname(). * Change tryGetReplyFromHostname() to call hostname.resolve(). * Add throw for actor_cancelled. --- fdbclient/CoordinationInterface.h | 2 + fdbrpc/genericactors.actor.h | 96 +++++++++++++++++++++++++++++ fdbserver/ConfigFollowerInterface.h | 5 +- fdbserver/CoordinationInterface.h | 3 + flow/Hostname.actor.cpp | 40 +++++++++++- flow/Hostname.h | 1 + 6 files changed, 144 insertions(+), 3 deletions(-) diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index 13c6850604..cc28dd5e25 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -36,10 +36,12 @@ struct ClientLeaderRegInterface { RequestStream getLeader; RequestStream openDatabase; RequestStream checkDescriptorMutable; + Optional hostname; ClientLeaderRegInterface() {} ClientLeaderRegInterface(NetworkAddress remote); ClientLeaderRegInterface(INetwork* local); + ClientLeaderRegInterface(Hostname hostname) : hostname(hostname) {} bool operator==(const ClientLeaderRegInterface& rhs) const { return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase; diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index e2fd1885fd..739b34c34b 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -28,6 +28,8 @@ #include "flow/genericactors.actor.h" #include "fdbrpc/fdbrpc.h" +#include "fdbclient/WellKnownEndpoints.h" +#include "flow/Hostname.h" #include "flow/actorcompiler.h" // This must be the last #include. ACTOR template @@ -70,6 +72,100 @@ Future retryBrokenPromise(RequestStream to, Req request, T } } +ACTOR template +Future> tryGetReplyFromHostname(RequestStream* to, + Req request, + Hostname hostname, + WellKnownEndpoints token) { + // A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname. + // If resolving fails, return lookup_failed(). + // Otherwise, return tryGetReply(request). + try { + wait(hostname.resolve()); + } catch (...) { + return ErrorOr(lookup_failed()); + } + Optional address = hostname.resolvedAddress; + *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); + return to->tryGetReply(request); +} + +ACTOR template +Future> tryGetReplyFromHostname(RequestStream* to, + Req request, + Hostname hostname, + WellKnownEndpoints token, + TaskPriority taskID) { + // A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname. + // If resolving fails, return lookup_failed(). + // Otherwise, return tryGetReply(request). + try { + wait(hostname.resolve()); + } catch (...) { + return ErrorOr(lookup_failed()); + } + Optional address = hostname.resolvedAddress; + *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); + return to->tryGetReply(request, taskID); +} + +ACTOR template +Future retryGetReplyFromHostname(RequestStream* to, + Req request, + Hostname hostname, + WellKnownEndpoints token) { + // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. + // Suitable for use with hostname, where RequestStream is NOT initialized yet. + // Not normally useful for endpoints initialized with NetworkAddress. + loop { + wait(hostname.resolveWithRetry()); + state Optional address = hostname.resolvedAddress; + *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); + ErrorOr reply = wait(to->tryGetReply(request)); + if (reply.isError()) { + resetReply(request); + if (reply.getError().code() == error_code_request_maybe_delivered) { + // Connection failure. + hostname.resetToUnresolved(); + INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); + } else { + throw reply.getError(); + } + } else { + return reply.get(); + } + } +} + +ACTOR template +Future retryGetReplyFromHostname(RequestStream* to, + Req request, + Hostname hostname, + WellKnownEndpoints token, + TaskPriority taskID) { + // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. + // Suitable for use with hostname, where RequestStream is NOT initialized yet. + // Not normally useful for endpoints initialized with NetworkAddress. + loop { + wait(hostname.resolveWithRetry()); + state Optional address = hostname.resolvedAddress; + *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); + ErrorOr reply = wait(to->tryGetReply(request, taskID)); + if (reply.isError()) { + resetReply(request); + if (reply.getError().code() == error_code_request_maybe_delivered) { + // Connection failure. + hostname.resetToUnresolved(); + INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); + } else { + throw reply.getError(); + } + } else { + return reply.get(); + } + } +} + ACTOR template Future timeoutWarning(Future what, double time, PromiseStream output) { state Future end = delay(time); diff --git a/fdbserver/ConfigFollowerInterface.h b/fdbserver/ConfigFollowerInterface.h index 9b901874f7..9191e1aac0 100644 --- a/fdbserver/ConfigFollowerInterface.h +++ b/fdbserver/ConfigFollowerInterface.h @@ -22,6 +22,7 @@ #include "fdbclient/CommitTransaction.h" #include "fdbclient/ConfigKnobs.h" +#include "fdbclient/CoordinationInterface.h" #include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" @@ -215,10 +216,12 @@ public: RequestStream compact; RequestStream rollforward; RequestStream getCommittedVersion; + Optional hostname; ConfigFollowerInterface(); void setupWellKnownEndpoints(); ConfigFollowerInterface(NetworkAddress const& remote); + ConfigFollowerInterface(Hostname hostname) : hostname(hostname) {} bool operator==(ConfigFollowerInterface const& rhs) const; bool operator!=(ConfigFollowerInterface const& rhs) const; UID id() const { return _id; } @@ -226,6 +229,6 @@ public: template void serialize(Ar& ar) { - serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion); + serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion, hostname); } }; diff --git a/fdbserver/CoordinationInterface.h b/fdbserver/CoordinationInterface.h index 87998e20bb..461904787c 100644 --- a/fdbserver/CoordinationInterface.h +++ b/fdbserver/CoordinationInterface.h @@ -31,6 +31,7 @@ struct GenerationRegInterface { constexpr static FileIdentifier file_identifier = 16726744; RequestStream read; RequestStream write; + Optional hostname; // read(key,gen2) returns (value,gen,rgen). // If there was no prior write(_,_,0) or a data loss fault, @@ -54,6 +55,7 @@ struct GenerationRegInterface { GenerationRegInterface() {} GenerationRegInterface(NetworkAddress remote); GenerationRegInterface(INetwork* local); + GenerationRegInterface(Hostname hostname) : hostname(hostname){}; }; struct UniqueGeneration { @@ -128,6 +130,7 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface { LeaderElectionRegInterface() {} LeaderElectionRegInterface(NetworkAddress remote); LeaderElectionRegInterface(INetwork* local); + LeaderElectionRegInterface(Hostname hostname) : ClientLeaderRegInterface(hostname) {} }; struct CandidacyRequest { diff --git a/flow/Hostname.actor.cpp b/flow/Hostname.actor.cpp index 3af5edff65..d7ee5c2bc6 100644 --- a/flow/Hostname.actor.cpp +++ b/flow/Hostname.actor.cpp @@ -66,7 +66,7 @@ ACTOR Future resolveImpl(Hostname* self) { self->status = Hostname::UNRESOLVED; self->resolveFinish.trigger(); self->resolvedAddress = Optional(); - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + throw lookup_failed(); } } else if (self->status == Hostname::RESOLVING) { wait(self->resolveFinish.onTrigger()); @@ -83,10 +83,28 @@ ACTOR Future resolveImpl(Hostname* self) { return Void(); } +ACTOR Future resolveWithRetryImpl(Hostname* self) { + loop { + try { + wait(resolveImpl(self)); + return Void(); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + } + } +} + Future Hostname::resolve() { return resolveImpl(this); } +Future Hostname::resolveWithRetry() { + return resolveWithRetryImpl(this); +} + void Hostname::resolveBlocking() { if (status != RESOLVED) { try { @@ -164,7 +182,14 @@ TEST_CASE("/flow/Hostname/hostname") { ASSERT(hn4.status == Hostname::UNRESOLVED && !hn4.resolvedAddress.present()); try { - wait(timeoutError(hn2.resolve(), 1)); + wait(hn2.resolve()); + } catch (Error& e) { + ASSERT(e.code() == error_code_lookup_failed); + } + ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present()); + + try { + wait(timeoutError(hn2.resolveWithRetry(), 1)); } catch (Error& e) { ASSERT(e.code() == error_code_timed_out); } @@ -179,10 +204,21 @@ TEST_CASE("/flow/Hostname/hostname") { state NetworkAddress address = NetworkAddress::parse("127.0.0.0:1234"); INetworkConnections::net()->addMockTCPEndpoint("host-name", "1234", { address }); + + // Test resolve. wait(hn2.resolve()); ASSERT(hn2.status == Hostname::RESOLVED); ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address); + // Test resolveWithRetry. + hn2.resetToUnresolved(); + ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present()); + + wait(hn2.resolveWithRetry()); + ASSERT(hn2.status == Hostname::RESOLVED); + ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address); + + // Test resolveBlocking. hn2.resetToUnresolved(); ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present()); diff --git a/flow/Hostname.h b/flow/Hostname.h index 61f8351e5b..abfa6e288f 100644 --- a/flow/Hostname.h +++ b/flow/Hostname.h @@ -75,6 +75,7 @@ struct Hostname { Optional resolvedAddress; enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED }; Future resolve(); + Future resolveWithRetry(); void resolveBlocking(); // This one should only be used when resolving asynchronously is impossible. // For all other cases, resolve() should be preferred. void resetToUnresolved();