From d004703cc89ad3ef59a50e790b4a354b1904ec83 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 19 Aug 2021 17:07:21 -0700 Subject: [PATCH] Add worker kill unit test --- fdbserver/ConfigBroadcaster.actor.cpp | 10 +++++- fdbserver/ConfigBroadcaster.h | 1 + fdbserver/ConfigDatabaseUnitTests.actor.cpp | 37 +++++++++++++++++++-- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp index c32bd10707..c3000c94d8 100644 --- a/fdbserver/ConfigBroadcaster.actor.cpp +++ b/fdbserver/ConfigBroadcaster.actor.cpp @@ -81,6 +81,7 @@ class ConfigBroadcasterImpl { Future consumerFuture; ActorCollection actors{ false }; std::map clients; + std::map> clientFailures; UID id; CounterCollection cc; @@ -203,6 +204,7 @@ class ConfigBroadcasterImpl { wait(watcher); TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id).detail("ClientID", clientUID); self->clients.erase(clientUID); + // TODO: Erase clientUID from clientFailures at some point return Void(); } @@ -230,7 +232,7 @@ class ConfigBroadcasterImpl { // Push full snapshot to worker if it isn't up to date. wait(impl->pushSnapshot(impl->mostRecentVersion, client)); impl->clients[broadcastInterface.id()] = client; - impl->actors.add(waitForFailure(impl, watcher, broadcastInterface.id())); + impl->clientFailures[broadcastInterface.id()] = waitForFailure(impl, watcher, broadcastInterface.id()); return Void(); } @@ -345,6 +347,8 @@ public: Future getError() const { return consumerFuture || actors.getResult(); } + Future getClientFailure(UID clientUID) const { return clientFailures.find(clientUID)->second; } + UID getID() const { return id; } static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches); @@ -397,6 +401,10 @@ Future ConfigBroadcaster::getError() const { return impl->getError(); } +Future ConfigBroadcaster::getClientFailure(UID clientUID) const { + return impl->getClientFailure(clientUID); +} + UID ConfigBroadcaster::getID() const { return impl->getID(); } diff --git a/fdbserver/ConfigBroadcaster.h b/fdbserver/ConfigBroadcaster.h index 9ef33bbde6..338bc87023 100644 --- a/fdbserver/ConfigBroadcaster.h +++ b/fdbserver/ConfigBroadcaster.h @@ -67,4 +67,5 @@ public: public: // Testing explicit ConfigBroadcaster(ConfigFollowerInterface const&); + Future getClientFailure(UID clientUID) const; }; diff --git a/fdbserver/ConfigDatabaseUnitTests.actor.cpp b/fdbserver/ConfigDatabaseUnitTests.actor.cpp index c20c58ac08..a85309f5af 100644 --- a/fdbserver/ConfigDatabaseUnitTests.actor.cpp +++ b/fdbserver/ConfigDatabaseUnitTests.actor.cpp @@ -246,12 +246,14 @@ class BroadcasterToLocalConfigEnvironment { ConfigBroadcaster broadcaster; Version lastWrittenVersion{ 0 }; Future broadcastServer; + Promise workerFailure; ACTOR static Future setup(BroadcasterToLocalConfigEnvironment* self, ConfigClassSet configClassSet) { wait(self->readFrom.setup()); self->cbi = makeReference>(); self->readFrom.connectToBroadcaster(self->cbi); - self->broadcastServer = self->broadcaster.registerWorker(0, configClassSet, Never(), self->cbi->get()); + self->broadcastServer = + self->broadcaster.registerWorker(0, configClassSet, self->workerFailure.getFuture(), self->cbi->get()); return Void(); } @@ -284,14 +286,18 @@ public: broadcastServer.cancel(); cbi->set(ConfigBroadcastInterface{}); readFrom.connectToBroadcaster(cbi); - broadcastServer = - broadcaster.registerWorker(readFrom.lastSeenVersion(), readFrom.configClassSet(), Never(), cbi->get()); + broadcastServer = broadcaster.registerWorker( + readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), cbi->get()); } Future restartLocalConfig(std::string const& newConfigPath) { return readFrom.restartLocalConfig(newConfigPath); } + void killLocalConfig() { workerFailure.send(Void()); } + + Future workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); } + void compact() { broadcaster.compact(lastWrittenVersion); } Future getError() const { return readFrom.getError() || broadcaster.getError(); } @@ -389,6 +395,7 @@ class TransactionToLocalConfigEnvironment { Reference> cbi; ConfigBroadcaster broadcaster; Future broadcastServer; + Promise workerFailure; ACTOR static Future setup(TransactionToLocalConfigEnvironment* self, ConfigClassSet configClassSet) { wait(self->readFrom.setup()); @@ -419,6 +426,10 @@ public: return readFrom.restartLocalConfig(newConfigPath); } + void killLocalConfig() { workerFailure.send(Void()); } + + Future workerFailed() { return broadcaster.getClientFailure(cbi->get().id()); } + Future compact() { return writeTo.compact(); } template @@ -516,6 +527,16 @@ Future testNewLocalConfigAfterCompaction(UnitTestParameters params) { return Void(); } +ACTOR template +Future testKillWorker(UnitTestParameters params) { + state Env env(params.getDataDir(), "class-A"); + wait(env.setup(ConfigClassSet({ "class-A"_sr }))); + env.killLocalConfig(); + // Make sure broadcaster detects worker death in a timely manner. + wait(timeoutError(env.workerFailed(), 3)); + return Void(); +} + ACTOR template Future testSet(UnitTestParameters params) { state Env env(params.getDataDir(), "class-A"); @@ -760,6 +781,11 @@ TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/RestartLocalConfiguratio return Void(); } +TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/KillWorker") { + wait(testKillWorker(params)); + return Void(); +} + TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/Set") { wait(testSet(params)); return Void(); @@ -804,6 +830,11 @@ TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/RestartLocalConfiguratio return Void(); } +TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalconfig/KillWorker") { + wait(testKillWorker(params)); + return Void(); +} + TEST_CASE("/fdbserver/ConfigDB/Transaction/Set") { state TransactionEnvironment env(params.getDataDir()); wait(env.setup());