add sendError method to AsyncMap

This commit is contained in:
Xiaoxi Wang 2023-01-05 12:06:51 -08:00
parent 8266f52dea
commit c6c34ba1d2
3 changed files with 43 additions and 17 deletions

View File

@ -90,6 +90,7 @@ public:
bool operator<(Endpoint const& r) const {
return addresses.address < r.addresses.address || (addresses.address == r.addresses.address && token < r.token);
}
bool operator>=(Endpoint const& r) const { return !(*this < r); }
template <class Ar>
void serialize(Ar& ar) {

View File

@ -8743,8 +8743,9 @@ void StorageServer::clearTenants(StringRef startTenant, StringRef endTenant, Ver
if (*itr >= startTenant && *itr < endTenant) {
// Trigger any watches on the prefix associated with the tenant.
Key tenantPrefix = TenantAPI::idToPrefix(itr.key());
watches.triggerRange(tenantPrefix, strinc(tenantPrefix));
TraceEvent("EraseTenant", thisServerID).detail("Tenant", itr.key()).detail("Version", version);
watches.sendError(tenantPrefix, strinc(tenantPrefix), tenant_removed());
tenantsToClear.insert(itr.key());
addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,

View File

@ -571,26 +571,26 @@ public:
setUnconditional(k, v, i);
}
void setUnconditional(K const& k, V const& v) { setUnconditional(k, v, items[k]); }
void sendError(K const& begin, K const& end, Error const& e) {
if (begin >= end)
return;
std::vector<Promise<Void>> noDestroy = swapRangePromises(items.lower_bound(begin), items.lower_bound(end));
sendError(noDestroy, e);
}
void triggerAll() {
std::vector<Promise<Void>> ps;
for (auto it = items.begin(); it != items.end(); ++it) {
ps.resize(ps.size() + 1);
ps.back().swap(it->second.change);
}
std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
for (auto p = ps.begin(); p != ps.end(); ++p)
p->send(Void());
std::vector<Promise<Void>> noDestroy = swapRangePromises(items.begin(), items.end());
send(noDestroy);
}
void triggerRange(K const& begin, K const& end) {
std::vector<Promise<Void>> ps;
for (auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it) {
ps.resize(ps.size() + 1);
ps.back().swap(it->second.change);
}
std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
for (auto p = ps.begin(); p != ps.end(); ++p)
p->send(Void());
if (begin >= end)
return;
std::vector<Promise<Void>> noDestroy = swapRangePromises(items.lower_bound(begin), items.lower_bound(end));
send(noDestroy);
}
void trigger(K const& key) {
if (items.count(key) != 0) {
auto& i = items[key];
@ -649,6 +649,30 @@ protected:
const V defaultValue;
bool destructing;
template <typename Iterator>
std::vector<Promise<Void>> swapRangePromises(Iterator begin, Iterator end) {
std::vector<Promise<Void>> ps;
for (auto it = begin; it != end; ++it) {
ps.resize(ps.size() + 1);
ps.back().swap(it->second.change);
}
return ps;
}
// ps can't be a reference. See explanation of noDestroy in setUnconditional()
void send(std::vector<Promise<Void>> ps) {
for (auto& p : ps) {
p.send(Void());
}
}
// ps can't be a reference. See explanation of noDestroy in setUnconditional()
void sendError(std::vector<Promise<Void>> ps, Error const& e) {
for (auto& p : ps) {
p.sendError(e);
}
}
void setUnconditional(K const& k, V const& v, P& i) {
Promise<Void> trigger;
i.change.swap(trigger);