Add worker kill unit test

This commit is contained in:
Lukas Joswiak 2021-08-19 17:07:21 -07:00
parent 61a8bb6443
commit d004703cc8
3 changed files with 44 additions and 4 deletions

View File

@ -81,6 +81,7 @@ class ConfigBroadcasterImpl {
Future<Void> consumerFuture;
ActorCollection actors{ false };
std::map<UID, BroadcastClientDetails> clients;
std::map<UID, Future<Void>> clientFailures;
UID id;
CounterCollection cc;
@ -203,6 +204,7 @@ class ConfigBroadcasterImpl {
wait(watcher);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id).detail("ClientID", clientUID);
self->clients.erase(clientUID);
// TODO: Erase clientUID from clientFailures at some point
return Void();
}
@ -230,7 +232,7 @@ class ConfigBroadcasterImpl {
// Push full snapshot to worker if it isn't up to date.
wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client;
impl->actors.add(waitForFailure(impl, watcher, broadcastInterface.id()));
impl->clientFailures[broadcastInterface.id()] = waitForFailure(impl, watcher, broadcastInterface.id());
return Void();
}
@ -345,6 +347,8 @@ public:
Future<Void> getError() const { return consumerFuture || actors.getResult(); }
Future<Void> getClientFailure(UID clientUID) const { return clientFailures.find(clientUID)->second; }
UID getID() const { return id; }
static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches);
@ -397,6 +401,10 @@ Future<Void> ConfigBroadcaster::getError() const {
return impl->getError();
}
Future<Void> ConfigBroadcaster::getClientFailure(UID clientUID) const {
return impl->getClientFailure(clientUID);
}
UID ConfigBroadcaster::getID() const {
return impl->getID();
}

View File

@ -67,4 +67,5 @@ public:
public: // Testing
explicit ConfigBroadcaster(ConfigFollowerInterface const&);
Future<Void> getClientFailure(UID clientUID) const;
};

View File

@ -246,12 +246,14 @@ class BroadcasterToLocalConfigEnvironment {
ConfigBroadcaster broadcaster;
Version lastWrittenVersion{ 0 };
Future<Void> broadcastServer;
Promise<Void> workerFailure;
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self, ConfigClassSet configClassSet) {
wait(self->readFrom.setup());
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer = self->broadcaster.registerWorker(0, configClassSet, Never(), self->cbi->get());
self->broadcastServer =
self->broadcaster.registerWorker(0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
return Void();
}
@ -284,14 +286,18 @@ public:
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
readFrom.connectToBroadcaster(cbi);
broadcastServer =
broadcaster.registerWorker(readFrom.lastSeenVersion(), readFrom.configClassSet(), Never(), cbi->get());
broadcastServer = broadcaster.registerWorker(
readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), cbi->get());
}
Future<Void> restartLocalConfig(std::string const& newConfigPath) {
return readFrom.restartLocalConfig(newConfigPath);
}
void killLocalConfig() { workerFailure.send(Void()); }
Future<Void> workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); }
void compact() { broadcaster.compact(lastWrittenVersion); }
Future<Void> getError() const { return readFrom.getError() || broadcaster.getError(); }
@ -389,6 +395,7 @@ class TransactionToLocalConfigEnvironment {
Reference<AsyncVar<ConfigBroadcastInterface>> cbi;
ConfigBroadcaster broadcaster;
Future<Void> broadcastServer;
Promise<Void> workerFailure;
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self, ConfigClassSet configClassSet) {
wait(self->readFrom.setup());
@ -419,6 +426,10 @@ public:
return readFrom.restartLocalConfig(newConfigPath);
}
void killLocalConfig() { workerFailure.send(Void()); }
Future<Void> workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); }
Future<Void> compact() { return writeTo.compact(); }
template <class T>
@ -516,6 +527,16 @@ Future<Void> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
return Void();
}
ACTOR template <class Env>
Future<Void> testKillWorker(UnitTestParameters params) {
state Env env(params.getDataDir(), "class-A");
wait(env.setup(ConfigClassSet({ "class-A"_sr })));
env.killLocalConfig();
// Make sure broadcaster detects worker death in a timely manner.
wait(timeoutError(env.workerFailed(), 3));
return Void();
}
ACTOR template <class Env>
Future<Void> testSet(UnitTestParameters params) {
state Env env(params.getDataDir(), "class-A");
@ -760,6 +781,11 @@ TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/RestartLocalConfiguratio
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/KillWorker") {
wait(testKillWorker<BroadcasterToLocalConfigEnvironment>(params));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/Set") {
wait(testSet<TransactionToLocalConfigEnvironment>(params));
return Void();
@ -804,6 +830,11 @@ TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/RestartLocalConfiguratio
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalconfig/KillWorker") {
wait(testKillWorker<TransactionToLocalConfigEnvironment>(params));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Set") {
state TransactionEnvironment env(params.getDataDir());
wait(env.setup());