Remove linear time loop

This commit is contained in:
Lukas Joswiak 2021-08-23 14:02:41 -07:00
parent f037ac7b19
commit 7e6bc27863
2 changed files with 19 additions and 16 deletions

View File

@ -201,21 +201,10 @@ class ConfigBroadcasterImpl {
}
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self, Future<Void> watcher, UID clientUID) {
// Clean up clientFailures futures. The values in this map should only
// be used for testing, and cleaning them up here ensures tests still
// have enough time to read the future while making sure the map
// doesn't grow unbounded.
for (auto it = self->clientFailures.begin(); it != self->clientFailures.end();) {
if (it->second.isReady()) {
it = self->clientFailures.erase(it);
} else {
++it;
}
}
wait(watcher);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id).detail("ClientID", clientUID);
self->clients.erase(clientUID);
self->clientFailures.erase(clientUID);
return Void();
}

View File

@ -247,6 +247,7 @@ class BroadcasterToLocalConfigEnvironment {
Version lastWrittenVersion{ 0 };
Future<Void> broadcastServer;
Promise<Void> workerFailure;
Future<Void> workerFailed_;
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self, ConfigClassSet configClassSet) {
wait(self->readFrom.setup());
@ -294,9 +295,15 @@ public:
return readFrom.restartLocalConfig(newConfigPath);
}
void killLocalConfig() { workerFailure.send(Void()); }
void killLocalConfig() {
workerFailed_ = broadcaster.getClientFailure(cbi->get().id());
workerFailure.send(Void());
}
Future<Void> workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); }
Future<Void> workerFailed() {
ASSERT(workerFailed_.isValid());
return workerFailed_;
}
void compact() { broadcaster.compact(lastWrittenVersion); }
@ -396,6 +403,7 @@ class TransactionToLocalConfigEnvironment {
ConfigBroadcaster broadcaster;
Future<Void> broadcastServer;
Promise<Void> workerFailure;
Future<Void> workerFailed_;
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self, ConfigClassSet configClassSet) {
wait(self->readFrom.setup());
@ -427,9 +435,15 @@ public:
return readFrom.restartLocalConfig(newConfigPath);
}
void killLocalConfig() { workerFailure.send(Void()); }
void killLocalConfig() {
workerFailed_ = broadcaster.getClientFailure(cbi->get().id());
workerFailure.send(Void());
}
Future<Void> workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); }
Future<Void> workerFailed() {
ASSERT(workerFailed_.isValid());
return workerFailed_;
}
Future<Void> compact() { return writeTo.compact(); }