Make ConfigFollowerInterface copyable

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-25 16:07:32 -07:00
parent 807695ff1e
commit c63cf5da26
4 changed files with 28 additions and 29 deletions

View File

@ -138,7 +138,7 @@ struct ConfigFollowerCompactRequest {
} }
}; };
struct ConfigFollowerInterface final : ReferenceCounted<ConfigFollowerInterface> { struct ConfigFollowerInterface {
static constexpr FileIdentifier file_identifier = 7721102; static constexpr FileIdentifier file_identifier = 7721102;
RequestStream<ConfigFollowerGetVersionRequest> getVersion; RequestStream<ConfigFollowerGetVersionRequest> getVersion;
RequestStream<ConfigFollowerGetFullDatabaseRequest> getFullDatabase; RequestStream<ConfigFollowerGetFullDatabaseRequest> getFullDatabase;

View File

@ -23,12 +23,11 @@
#include "fdbclient/CoordinationInterface.h" #include "fdbclient/CoordinationInterface.h"
#include "fdbserver/ConfigFollowerInterface.h" #include "fdbserver/ConfigFollowerInterface.h"
#include "flow/flow.h" #include "flow/flow.h"
#include "flow/FastRef.h"
#include <memory> #include <memory>
class IConfigBroadcaster { class IConfigBroadcaster {
public: public:
virtual Future<Void> serve(Reference<ConfigFollowerInterface>) = 0; virtual Future<Void> serve(ConfigFollowerInterface&) = 0;
}; };
class SimpleConfigBroadcaster : public IConfigBroadcaster { class SimpleConfigBroadcaster : public IConfigBroadcaster {
@ -37,5 +36,5 @@ class SimpleConfigBroadcaster : public IConfigBroadcaster {
public: public:
SimpleConfigBroadcaster(ClusterConnectionString const&); SimpleConfigBroadcaster(ClusterConnectionString const&);
~SimpleConfigBroadcaster(); ~SimpleConfigBroadcaster();
Future<Void> serve(Reference<ConfigFollowerInterface>) override; Future<Void> serve(ConfigFollowerInterface&) override;
}; };

View File

@ -21,7 +21,7 @@
#include "fdbserver/IConfigBroadcaster.h" #include "fdbserver/IConfigBroadcaster.h"
class SimpleConfigBroadcasterImpl { class SimpleConfigBroadcasterImpl {
Reference<ConfigFollowerInterface> subscriber; ConfigFollowerInterface subscriber;
std::map<Key, Value> database; std::map<Key, Value> database;
// TODO: Should create fewer arenas // TODO: Should create fewer arenas
std::deque<Standalone<VersionedMutationRef>> versionedMutations; std::deque<Standalone<VersionedMutationRef>> versionedMutations;
@ -46,7 +46,7 @@ class SimpleConfigBroadcasterImpl {
ACTOR static Future<Void> fetchUpdates(SimpleConfigBroadcasterImpl *self) { ACTOR static Future<Void> fetchUpdates(SimpleConfigBroadcasterImpl *self) {
loop { loop {
try { try {
ConfigFollowerGetChangesReply reply = wait(self->subscriber->getChanges.getReply( ConfigFollowerGetChangesReply reply = wait(self->subscriber.getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} })); ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
++self->successfulChangeRequestOut; ++self->successfulChangeRequestOut;
for (const auto& versionedMutation : reply.versionedMutations) { for (const auto& versionedMutation : reply.versionedMutations) {
@ -63,10 +63,10 @@ class SimpleConfigBroadcasterImpl {
++self->failedChangeRequestOut; ++self->failedChangeRequestOut;
if (e.code() == error_code_version_already_compacted) { if (e.code() == error_code_version_already_compacted) {
ConfigFollowerGetVersionReply versionReply = ConfigFollowerGetVersionReply versionReply =
wait(self->subscriber->getVersion.getReply(ConfigFollowerGetVersionRequest{})); wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
ASSERT(versionReply.version > self->mostRecentVersion); ASSERT(versionReply.version > self->mostRecentVersion);
self->mostRecentVersion = versionReply.version; self->mostRecentVersion = versionReply.version;
ConfigFollowerGetFullDatabaseReply dbReply = wait(self->subscriber->getFullDatabase.getReply( ConfigFollowerGetFullDatabaseReply dbReply = wait(self->subscriber.getFullDatabase.getReply(
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} })); ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} }));
self->database = dbReply.database; self->database = dbReply.database;
++self->fullDBRequestOut; ++self->fullDBRequestOut;
@ -81,7 +81,7 @@ class SimpleConfigBroadcasterImpl {
loop { loop {
wait(delayJittered(COMPACTION_INTERVAL)); wait(delayJittered(COMPACTION_INTERVAL));
// TODO: Enable compaction once bugs are fixed // TODO: Enable compaction once bugs are fixed
// wait(self->subscriber->compact.getReply(ConfigFollowerCompactRequest{ self->mostRecentVersion })); // wait(self->subscriber.compact.getReply(ConfigFollowerCompactRequest{ self->mostRecentVersion }));
//++self->compactRequestOut; //++self->compactRequestOut;
} }
} }
@ -106,11 +106,11 @@ class SimpleConfigBroadcasterImpl {
database.erase(b, e); database.erase(b, e);
} }
ACTOR static Future<Void> serve(SimpleConfigBroadcasterImpl* self, Reference<ConfigFollowerInterface> publisher) { ACTOR static Future<Void> serve(SimpleConfigBroadcasterImpl* self, ConfigFollowerInterface publisher) {
ConfigFollowerGetVersionReply versionReply = ConfigFollowerGetVersionReply versionReply =
wait(self->subscriber->getVersion.getReply(ConfigFollowerGetVersionRequest{})); wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
self->mostRecentVersion = versionReply.version; self->mostRecentVersion = versionReply.version;
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber->getFullDatabase.getReply( ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber.getFullDatabase.getReply(
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} })); ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} }));
TraceEvent(SevDebug, "BroadcasterGotInitialFullDB").detail("Version", self->mostRecentVersion); TraceEvent(SevDebug, "BroadcasterGotInitialFullDB").detail("Version", self->mostRecentVersion);
self->database = reply.database; self->database = reply.database;
@ -119,10 +119,10 @@ class SimpleConfigBroadcasterImpl {
loop { loop {
//self->traceQueuedMutations(); //self->traceQueuedMutations();
choose { choose {
when(ConfigFollowerGetVersionRequest req = waitNext(publisher->getVersion.getFuture())) { when(ConfigFollowerGetVersionRequest req = waitNext(publisher.getVersion.getFuture())) {
req.reply.send(self->mostRecentVersion); req.reply.send(self->mostRecentVersion);
} }
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher->getFullDatabase.getFuture())) { when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher.getFullDatabase.getFuture())) {
++self->fullDBRequestIn; ++self->fullDBRequestIn;
ConfigFollowerGetFullDatabaseReply reply; ConfigFollowerGetFullDatabaseReply reply;
reply.database = self->database; reply.database = self->database;
@ -148,7 +148,7 @@ class SimpleConfigBroadcasterImpl {
} }
req.reply.send(reply); req.reply.send(reply);
} }
when(ConfigFollowerGetChangesRequest req = waitNext(publisher->getChanges.getFuture())) { when(ConfigFollowerGetChangesRequest req = waitNext(publisher.getChanges.getFuture())) {
if (req.lastSeenVersion < self->lastCompactedVersion) { if (req.lastSeenVersion < self->lastCompactedVersion) {
req.reply.sendError(version_already_compacted()); req.reply.sendError(version_already_compacted());
++self->failedChangeRequestIn; ++self->failedChangeRequestIn;
@ -171,7 +171,7 @@ class SimpleConfigBroadcasterImpl {
req.reply.send(reply); req.reply.send(reply);
++self->successfulChangeRequestIn; ++self->successfulChangeRequestIn;
} }
when(ConfigFollowerCompactRequest req = waitNext(publisher->compact.getFuture())) { when(ConfigFollowerCompactRequest req = waitNext(publisher.compact.getFuture())) {
++self->compactRequestIn; ++self->compactRequestIn;
while (!self->versionedMutations.empty()) { while (!self->versionedMutations.empty()) {
const auto& versionedMutation = self->versionedMutations.front(); const auto& versionedMutation = self->versionedMutations.front();
@ -214,12 +214,12 @@ public:
failedChangeRequestOut("FailedChangeRequestOut", cc), fullDBRequestOut("FullDBRequestOut", cc) { failedChangeRequestOut("FailedChangeRequestOut", cc), fullDBRequestOut("FullDBRequestOut", cc) {
auto coordinators = ccs.coordinators(); auto coordinators = ccs.coordinators();
std::sort(coordinators.begin(), coordinators.end()); std::sort(coordinators.begin(), coordinators.end());
subscriber = makeReference<ConfigFollowerInterface>(coordinators[0]); subscriber = ConfigFollowerInterface(coordinators[0]);
logger = traceCounters( logger = traceCounters(
"ConfigBroadcasterMetrics", UID{}, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics"); "ConfigBroadcasterMetrics", UID{}, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics");
} }
Future<Void> serve(Reference<ConfigFollowerInterface> publisher) { return serve(this, publisher); } Future<Void> serve(ConfigFollowerInterface& publisher) { return serve(this, publisher); }
}; };
const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5; const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5;
@ -230,6 +230,6 @@ SimpleConfigBroadcaster::SimpleConfigBroadcaster(ClusterConnectionString const&
SimpleConfigBroadcaster::~SimpleConfigBroadcaster() = default; SimpleConfigBroadcaster::~SimpleConfigBroadcaster() = default;
Future<Void> SimpleConfigBroadcaster::serve(Reference<ConfigFollowerInterface> publisher) { Future<Void> SimpleConfigBroadcaster::serve(ConfigFollowerInterface& publisher) {
return impl->serve(publisher); return impl->serve(publisher);
} }

View File

@ -48,6 +48,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
int getFullDatabaseCount{ 0 }; int getFullDatabaseCount{ 0 };
int compactionCount{ 0 }; int compactionCount{ 0 };
Promise<std::map<uint32_t, uint32_t>> finalSnapshot; // when clients finish, publish final snapshot here Promise<std::map<uint32_t, uint32_t>> finalSnapshot; // when clients finish, publish final snapshot here
std::vector<ConfigFollowerInterface> followerInterfaces;
ACTOR static Future<std::map<uint32_t, uint32_t>> getSnapshot(ConfigurationDatabaseWorkload* self, Database cx) { ACTOR static Future<std::map<uint32_t, uint32_t>> getSnapshot(ConfigurationDatabaseWorkload* self, Database cx) {
state std::map<uint32_t, uint32_t> result; state std::map<uint32_t, uint32_t> result;
@ -214,17 +215,17 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
return Void(); return Void();
} }
ACTOR static Future<Version> getCurrentVersion(Reference<ConfigFollowerInterface> cfi) { ACTOR static Future<Version> getCurrentVersion(ConfigFollowerInterface cfi) {
ConfigFollowerGetVersionReply versionReply = wait(cfi->getVersion.getReply(ConfigFollowerGetVersionRequest{})); ConfigFollowerGetVersionReply versionReply = wait(cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
return versionReply.version; return versionReply.version;
} }
ACTOR static Future<Void> runConsumer(ConfigurationDatabaseWorkload* self, Reference<ConfigFollowerInterface> cfi) { ACTOR static Future<Void> runConsumer(ConfigurationDatabaseWorkload* self, ConfigFollowerInterface cfi) {
state std::map<uint32_t, uint32_t> database; state std::map<uint32_t, uint32_t> database;
state Version mostRecentVersion = wait(getCurrentVersion(cfi)); state Version mostRecentVersion = wait(getCurrentVersion(cfi));
TraceEvent(SevDebug, "ConfigDatabaseConsumerGotInitialDB").detail("Version", mostRecentVersion); TraceEvent(SevDebug, "ConfigDatabaseConsumerGotInitialDB").detail("Version", mostRecentVersion);
ConfigFollowerGetFullDatabaseReply reply = ConfigFollowerGetFullDatabaseReply reply =
wait(cfi->getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ mostRecentVersion, {} })); wait(cfi.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ mostRecentVersion, {} }));
for (const auto& [k, v] : reply.database) { for (const auto& [k, v] : reply.database) {
database[self->fromKey(k)] = self->fromKey(v); database[self->fromKey(k)] = self->fromKey(v);
} }
@ -233,7 +234,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
loop { loop {
try { try {
state ConfigFollowerGetChangesReply changesReply = state ConfigFollowerGetChangesReply changesReply =
wait(cfi->getChanges.getReply(ConfigFollowerGetChangesRequest{ mostRecentVersion, {} })); wait(cfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ mostRecentVersion, {} }));
mostRecentVersion = changesReply.mostRecentVersion; mostRecentVersion = changesReply.mostRecentVersion;
for (const auto& versionedMutation : changesReply.versionedMutations) { for (const auto& versionedMutation : changesReply.versionedMutations) {
const auto& mutation = versionedMutation.mutation; const auto& mutation = versionedMutation.mutation;
@ -254,7 +255,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
Version version = wait(getCurrentVersion(cfi)); Version version = wait(getCurrentVersion(cfi));
mostRecentVersion = version; mostRecentVersion = version;
ConfigFollowerGetFullDatabaseReply reply = wait( ConfigFollowerGetFullDatabaseReply reply = wait(
cfi->getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ mostRecentVersion, {} })); cfi.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ mostRecentVersion, {} }));
TraceEvent(SevDebug, "ConfigDatabaseConsumerGotFullDB").detail("Version", mostRecentVersion); TraceEvent(SevDebug, "ConfigDatabaseConsumerGotFullDB").detail("Version", mostRecentVersion);
database.clear(); database.clear();
for (const auto& [k, v] : reply.database) { for (const auto& [k, v] : reply.database) {
@ -273,12 +274,11 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
} }
} }
ACTOR static Future<Void> runCompactor(ConfigurationDatabaseWorkload* self, ACTOR static Future<Void> runCompactor(ConfigurationDatabaseWorkload* self, ConfigFollowerInterface cfi) {
Reference<ConfigFollowerInterface> cfi) {
loop { loop {
wait(delay(2 * deterministicRandom()->random01() * self->meanCompactionInterval)); wait(delay(2 * deterministicRandom()->random01() * self->meanCompactionInterval));
Version version = wait(getCurrentVersion(cfi)); Version version = wait(getCurrentVersion(cfi));
wait(cfi->compact.getReply(ConfigFollowerCompactRequest{ version })); wait(cfi.compact.getReply(ConfigFollowerCompactRequest{ version }));
++self->compactionCount; ++self->compactionCount;
} }
} }
@ -286,7 +286,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
ACTOR static Future<Void> runBroadcasterAndConsumers(ConfigurationDatabaseWorkload* self, Database cx) { ACTOR static Future<Void> runBroadcasterAndConsumers(ConfigurationDatabaseWorkload* self, Database cx) {
state SimpleConfigBroadcaster broadcaster(cx->getConnectionFile()->getConnectionString()); state SimpleConfigBroadcaster broadcaster(cx->getConnectionFile()->getConnectionString());
state std::vector<Future<Void>> consumers; state std::vector<Future<Void>> consumers;
state Reference<ConfigFollowerInterface> cfi = makeReference<ConfigFollowerInterface>(); state ConfigFollowerInterface cfi = self->followerInterfaces.emplace_back();
for (int i = 0; i < self->numConsumersPerBroadcaster; ++i) { for (int i = 0; i < self->numConsumersPerBroadcaster; ++i) {
consumers.push_back(runConsumer(self, cfi)); consumers.push_back(runConsumer(self, cfi));
} }