diff --git a/fdbrpc/include/fdbrpc/FlowTransport.h b/fdbrpc/include/fdbrpc/FlowTransport.h index d8b4652e20..6741997528 100644 --- a/fdbrpc/include/fdbrpc/FlowTransport.h +++ b/fdbrpc/include/fdbrpc/FlowTransport.h @@ -90,6 +90,7 @@ public: bool operator<(Endpoint const& r) const { return addresses.address < r.addresses.address || (addresses.address == r.addresses.address && token < r.token); } + bool operator>=(Endpoint const& r) const { return !(*this < r); } template void serialize(Ar& ar) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c29636f03b..36bad7083a 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -8743,8 +8743,9 @@ void StorageServer::clearTenants(StringRef startTenant, StringRef endTenant, Ver if (*itr >= startTenant && *itr < endTenant) { // Trigger any watches on the prefix associated with the tenant. Key tenantPrefix = TenantAPI::idToPrefix(itr.key()); - watches.triggerRange(tenantPrefix, strinc(tenantPrefix)); TraceEvent("EraseTenant", thisServerID).detail("Tenant", itr.key()).detail("Version", version); + watches.sendError(tenantPrefix, strinc(tenantPrefix), tenant_removed()); + tenantsToClear.insert(itr.key()); addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, diff --git a/flow/include/flow/genericactors.actor.h b/flow/include/flow/genericactors.actor.h index f2fde0c90e..b969a537e6 100644 --- a/flow/include/flow/genericactors.actor.h +++ b/flow/include/flow/genericactors.actor.h @@ -571,26 +571,26 @@ public: setUnconditional(k, v, i); } void setUnconditional(K const& k, V const& v) { setUnconditional(k, v, items[k]); } + + void sendError(K const& begin, K const& end, Error const& e) { + if (begin >= end) + return; + std::vector> noDestroy = swapRangePromises(items.lower_bound(begin), items.lower_bound(end)); + sendError(noDestroy, e); + } + void triggerAll() { - std::vector> ps; - for (auto it = items.begin(); it != items.end(); ++it) { - ps.resize(ps.size() + 1); - ps.back().swap(it->second.change); - } - std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() - for (auto p = ps.begin(); p != ps.end(); ++p) - p->send(Void()); + std::vector> noDestroy = swapRangePromises(items.begin(), items.end()); + send(noDestroy); } + void triggerRange(K const& begin, K const& end) { - std::vector> ps; - for (auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it) { - ps.resize(ps.size() + 1); - ps.back().swap(it->second.change); - } - std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() - for (auto p = ps.begin(); p != ps.end(); ++p) - p->send(Void()); + if (begin >= end) + return; + std::vector> noDestroy = swapRangePromises(items.lower_bound(begin), items.lower_bound(end)); + send(noDestroy); } + void trigger(K const& key) { if (items.count(key) != 0) { auto& i = items[key]; @@ -649,6 +649,30 @@ protected: const V defaultValue; bool destructing; + template + std::vector> swapRangePromises(Iterator begin, Iterator end) { + std::vector> ps; + for (auto it = begin; it != end; ++it) { + ps.resize(ps.size() + 1); + ps.back().swap(it->second.change); + } + return ps; + } + + // ps can't be a reference. See explanation of noDestroy in setUnconditional() + void send(std::vector> ps) { + for (auto& p : ps) { + p.send(Void()); + } + } + + // ps can't be a reference. See explanation of noDestroy in setUnconditional() + void sendError(std::vector> ps, Error const& e) { + for (auto& p : ps) { + p.sendError(e); + } + } + void setUnconditional(K const& k, V const& v, P& i) { Promise trigger; i.change.swap(trigger);