From 6c1d913ab8614cfafed4c687576bc1bb3dc08208 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:11:21 -0700 Subject: [PATCH 01/15] Prevent masterServer from modifying db --- fdbclient/GlobalConfig.actor.h | 2 +- fdbclient/NativeAPI.actor.cpp | 3 ++- fdbserver/WorkerInterface.actor.h | 4 ++-- fdbserver/masterserver.actor.cpp | 6 +++--- fdbserver/worker.actor.cpp | 4 ++-- flow/genericactors.actor.h | 2 +- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/fdbclient/GlobalConfig.actor.h b/fdbclient/GlobalConfig.actor.h index 2d63d8de60..444f1ab697 100644 --- a/fdbclient/GlobalConfig.actor.h +++ b/fdbclient/GlobalConfig.actor.h @@ -72,7 +72,7 @@ public: // to allow global configuration to run transactions on the latest // database. template - static void create(Database& cx, Reference> db, const ClientDBInfo* dbInfo) { + static void create(Database& cx, Reference const> db, const ClientDBInfo* dbInfo) { if (g_network->global(INetwork::enGlobalConfig) == nullptr) { auto config = new GlobalConfig{ cx }; g_network->setGlobal(INetwork::enGlobalConfig, config); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 5419ae825e..84faf6ffec 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1756,7 +1756,8 @@ Database Database::createDatabase(Reference connFile, } auto database = Database(db); - GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get())); + GlobalConfig::create( + database, Reference const>(clientInfo), std::addressof(clientInfo->get())); return database; } diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index e642d015dd..3fedfe1c8f 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -831,7 +831,7 @@ ACTOR Future traceRole(Role role, UID roleId); struct ServerDBInfo; -class Database openDBOnServer(Reference> const& db, +class Database openDBOnServer(Reference const> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, LockAware = LockAware::FALSE, EnableLocalityLoadBalance = EnableLocalityLoadBalance::TRUE); @@ -879,7 +879,7 @@ ACTOR Future storageServer( Reference connFile); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID ACTOR Future masterServer(MasterInterface mi, - Reference> db, + Reference const> db, Reference>> ccInterface, ServerCoordinators serverCoordinators, LifetimeToken lifetime, diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index ea93b35f7f..b6bf046991 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -228,7 +228,7 @@ struct MasterData : NonCopyable, ReferenceCounted { ReusableCoordinatedState cstate; Promise cstateUpdated; - Reference> dbInfo; + Reference const> dbInfo; int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController RecoveryState recoveryState; @@ -255,7 +255,7 @@ struct MasterData : NonCopyable, ReferenceCounted { Future logger; - MasterData(Reference> const& dbInfo, + MasterData(Reference const> const& dbInfo, MasterInterface const& myInterface, ServerCoordinators const& coordinators, ClusterControllerFullInterface const& clusterController, @@ -1978,7 +1978,7 @@ ACTOR Future masterCore(Reference self) { } ACTOR Future masterServer(MasterInterface mi, - Reference> db, + Reference const> db, Reference>> ccInterface, ServerCoordinators coordinators, LifetimeToken lifetime, diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index dafcfba2e3..2ff5252320 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -122,7 +122,7 @@ ACTOR Future> broadcastDBInfoRequest(UpdateServerDBInfoReq return notUpdated; } -ACTOR static Future extractClientInfo(Reference> db, +ACTOR static Future extractClientInfo(Reference const> db, Reference> info) { state std::vector lastCommitProxyUIDs; state std::vector lastCommitProxies; @@ -136,7 +136,7 @@ ACTOR static Future extractClientInfo(Reference> db } } -Database openDBOnServer(Reference> const& db, +Database openDBOnServer(Reference const> const& db, TaskPriority taskID, LockAware lockAware, EnableLocalityLoadBalance enableLocalityLoadBalance) { diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index d04a0478f8..5cb0344ffb 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -700,7 +700,7 @@ private: // Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes // the AsyncTrigger is triggered. ACTOR template -void forward(Reference> from, AsyncTrigger* to) { +void forward(Reference const> from, AsyncTrigger* to) { loop { wait(from->onChange()); to->trigger(); From 8a212862f0fb11705d7d11731962f81f5256f498 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 22:04:38 -0700 Subject: [PATCH 02/15] Prevent dataDistributor from modifying ServerDBInfo object --- fdbserver/DataDistribution.actor.cpp | 20 ++++++++--------- fdbserver/QuietDatabase.actor.cpp | 31 +++++++++++++++------------ fdbserver/QuietDatabase.h | 21 +++++++++--------- fdbserver/WorkerInterface.actor.h | 2 +- fdbserver/workloads/workloads.actor.h | 2 +- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 3829d11d2f..b0e579ff84 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -5202,7 +5202,7 @@ ACTOR Future initializeStorage(DDTeamCollection* self, } ACTOR Future storageRecruiter(DDTeamCollection* self, - Reference> db, + Reference const> db, const DDEnabledState* ddEnabledState) { state Future fCandidateWorker; state RecruitStorageRequest lastRequest; @@ -5474,7 +5474,7 @@ ACTOR Future serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl } } -ACTOR Future remoteRecovered(Reference> db) { +ACTOR Future remoteRecovered(Reference const> db) { TraceEvent("DDTrackerStarting"); while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) { TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState); @@ -5500,8 +5500,8 @@ ACTOR Future monitorHealthyTeams(DDTeamCollection* self) { ACTOR Future dataDistributionTeamCollection(Reference teamCollection, Reference initData, TeamCollectionInterface tci, - Reference> db, - const DDEnabledState* ddEnabledState) { + Reference const> db, + DDEnabledState const* ddEnabledState) { state DDTeamCollection* self = teamCollection.getPtr(); state Future loggingTrigger = Void(); state PromiseStream serverRemoved; @@ -5728,16 +5728,16 @@ ACTOR Future pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab } struct DataDistributorData : NonCopyable, ReferenceCounted { - Reference> dbInfo; + Reference const> dbInfo; UID ddId; PromiseStream> addActor; DDTeamCollection* teamCollection; - DataDistributorData(Reference> const& db, UID id) + DataDistributorData(Reference const> const& db, UID id) : dbInfo(db), ddId(id), teamCollection(nullptr) {} }; -ACTOR Future monitorBatchLimitedTime(Reference> db, double* lastLimited) { +ACTOR Future monitorBatchLimitedTime(Reference const> db, double* lastLimited) { loop { wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE)); @@ -6105,7 +6105,7 @@ static std::set const& normalDataDistributorErrors() { return s; } -ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference> db) { +ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference const> db) { state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::TRUE); state ReadYourWritesTransaction tr(cx); loop { @@ -6249,7 +6249,7 @@ ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference ddSnapCreate(DistributorSnapRequest snapReq, - Reference> db, + Reference const> db, DDEnabledState* ddEnabledState) { state Future dbInfoChange = db->onChange(); if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) { @@ -6443,7 +6443,7 @@ ACTOR Future ddGetMetrics(GetDataDistributorMetricsRequest req, return Void(); } -ACTOR Future dataDistributor(DataDistributorInterface di, Reference> db) { +ACTOR Future dataDistributor(DataDistributorInterface di, Reference const> db) { state Reference self(new DataDistributorData(db, di.id())); state Future collection = actorCollection(self->addActor.getFuture()); state PromiseStream getShardMetricsList; diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 47b9a9f2f3..e0915c3366 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -35,7 +35,7 @@ #include #include "flow/actorcompiler.h" // This must be the last #include. -ACTOR Future> getWorkers(Reference> dbInfo, int flags = 0) { +ACTOR Future> getWorkers(Reference const> dbInfo, int flags = 0) { loop { choose { when(vector w = wait(brokenPromiseToNever( @@ -48,7 +48,7 @@ ACTOR Future> getWorkers(Reference> } // Gets the WorkerInterface representing the Master server. -ACTOR Future getMasterWorker(Database cx, Reference> dbInfo) { +ACTOR Future getMasterWorker(Database cx, Reference const> dbInfo) { TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers"); loop { @@ -75,7 +75,7 @@ ACTOR Future getMasterWorker(Database cx, Reference getDataDistributorWorker(Database cx, Reference> dbInfo) { +ACTOR Future getDataDistributorWorker(Database cx, Reference const> dbInfo) { TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers"); loop { @@ -118,7 +118,7 @@ ACTOR Future getDataInFlight(Database cx, WorkerInterface distributorWo } // Gets the number of bytes in flight from the data distributor. -ACTOR Future getDataInFlight(Database cx, Reference> dbInfo) { +ACTOR Future getDataInFlight(Database cx, Reference const> dbInfo) { WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo)); int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf)); return dataInFlight; @@ -144,7 +144,7 @@ int64_t getPoppedVersionLag(const TraceEventFields& md) { return persistentDataDurableVersion - queuePoppedVersion; } -ACTOR Future> getCoordWorkers(Database cx, Reference> dbInfo) { +ACTOR Future> getCoordWorkers(Database cx, Reference const> dbInfo) { state std::vector workers = wait(getWorkers(dbInfo)); Optional coordinators = @@ -177,7 +177,8 @@ ACTOR Future> getCoordWorkers(Database cx, Reference> getTLogQueueInfo(Database cx, Reference> dbInfo) { +ACTOR Future> getTLogQueueInfo(Database cx, + Reference const> dbInfo) { TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs"); state std::vector workers = wait(getWorkers(dbInfo)); @@ -245,7 +246,7 @@ ACTOR Future> getStorageServers(Database cx, bool } ACTOR Future> getStorageWorkers(Database cx, - Reference> dbInfo, + Reference const> dbInfo, bool localOnly) { state std::vector servers = wait(getStorageServers(cx)); state std::map workersMap; @@ -335,7 +336,7 @@ ACTOR Future getStorageMetricsTimeout(UID storage, WorkerInter }; // Gets the maximum size of all the storage server queues -ACTOR Future getMaxStorageServerQueueSize(Database cx, Reference> dbInfo) { +ACTOR Future getMaxStorageServerQueueSize(Database cx, Reference const> dbInfo) { TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers"); Future> serversFuture = getStorageServers(cx); @@ -399,7 +400,7 @@ ACTOR Future getDataDistributionQueueSize(Database cx, // Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of // the queue Convenience method that first finds the master worker from a zookeeper interface ACTOR Future getDataDistributionQueueSize(Database cx, - Reference> dbInfo, + Reference const> dbInfo, bool reportInFlight) { WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo)); int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight)); @@ -516,7 +517,7 @@ ACTOR Future getTeamCollectionValid(Database cx, WorkerInterface dataDistr // Gets if the number of process and machine teams does not exceed the maximum allowed number of teams // Convenience method that first finds the master worker from a zookeeper interface -ACTOR Future getTeamCollectionValid(Database cx, Reference> dbInfo) { +ACTOR Future getTeamCollectionValid(Database cx, Reference const> dbInfo) { WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo)); bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker)); return valid; @@ -565,7 +566,9 @@ ACTOR Future getStorageServersRecruiting(Database cx, WorkerInterface dist } } -ACTOR Future repairDeadDatacenter(Database cx, Reference> dbInfo, std::string context) { +ACTOR Future repairDeadDatacenter(Database cx, + Reference const> dbInfo, + std::string context) { if (g_network->isSimulated() && g_simulator.usableRegions > 1) { bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId); bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId); @@ -601,7 +604,7 @@ ACTOR Future repairDeadDatacenter(Database cx, Reference reconfigureAfter(Database cx, double time, - Reference> dbInfo, + Reference const> dbInfo, std::string context) { wait(delay(time)); wait(repairDeadDatacenter(cx, dbInfo, context)); @@ -611,7 +614,7 @@ ACTOR Future reconfigureAfter(Database cx, // Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This // requires the database to be available and healthy in order to succeed. ACTOR Future waitForQuietDatabase(Database cx, - Reference> dbInfo, + Reference const> dbInfo, std::string phase, int64_t dataInFlightGate = 2e6, int64_t maxTLogQueueGate = 5e6, @@ -747,7 +750,7 @@ ACTOR Future waitForQuietDatabase(Database cx, } Future quietDatabase(Database const& cx, - Reference> const& dbInfo, + Reference const> const& dbInfo, std::string phase, int64_t dataInFlightGate, int64_t maxTLogQueueGate, diff --git a/fdbserver/QuietDatabase.h b/fdbserver/QuietDatabase.h index 37897e63fe..6a7ddc6d5e 100644 --- a/fdbserver/QuietDatabase.h +++ b/fdbserver/QuietDatabase.h @@ -28,25 +28,26 @@ #include "fdbserver/WorkerInterface.actor.h" #include "flow/actorcompiler.h" -Future getDataInFlight(Database const& cx, Reference> const&); +Future getDataInFlight(Database const& cx, Reference const> const&); Future> getTLogQueueInfo(Database const& cx, - Reference> const&); -Future getMaxStorageServerQueueSize(Database const& cx, Reference> const&); + Reference const> const&); +Future getMaxStorageServerQueueSize(Database const& cx, Reference const> const&); Future getDataDistributionQueueSize(Database const& cx, - Reference> const&, + Reference const> const&, bool const& reportInFlight); Future getTeamCollectionValid(Database const& cx, WorkerInterface const&); -Future getTeamCollectionValid(Database const& cx, Reference> const&); +Future getTeamCollectionValid(Database const& cx, Reference const> const&); Future> getStorageServers(Database const& cx, bool const& use_system_priority = false); -Future> getWorkers(Reference> const& dbInfo, int const& flags = 0); -Future getMasterWorker(Database const& cx, Reference> const& dbInfo); +Future> getWorkers(Reference const> const& dbInfo, int const& flags = 0); +Future getMasterWorker(Database const& cx, Reference const> const& dbInfo); Future repairDeadDatacenter(Database const& cx, - Reference> const& dbInfo, + Reference const> const& dbInfo, std::string const& context); Future> getStorageWorkers(Database const& cx, - Reference> const& dbInfo, + Reference const> const& dbInfo, bool const& localOnly); -Future> getCoordWorkers(Database const& cx, Reference> const& dbInfo); +Future> getCoordWorkers(Database const& cx, + Reference const> const& dbInfo); #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 3fedfe1c8f..5937855574 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -910,7 +910,7 @@ ACTOR Future resolver(ResolverInterface resolver, ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, Reference> db); -ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference> db); +ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); ACTOR Future ratekeeper(RatekeeperInterface rki, Reference> db); ACTOR Future storageCacheServer(StorageServerInterface interf, uint16_t id, Reference> db); ACTOR Future backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference> db); diff --git a/fdbserver/workloads/workloads.actor.h b/fdbserver/workloads/workloads.actor.h index fa3fb62571..ad89b1cce5 100644 --- a/fdbserver/workloads/workloads.actor.h +++ b/fdbserver/workloads/workloads.actor.h @@ -223,7 +223,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix); ACTOR Future databaseWarmer(Database cx); Future quietDatabase(Database const& cx, - Reference> const&, + Reference const> const&, std::string phase, int64_t dataInFlightGate = 2e6, int64_t maxTLogQueueGate = 5e6, From edbac4a26a569b41f077e8a49dbceb003750ec79 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:14:38 -0700 Subject: [PATCH 03/15] Prevent storageServer from modifying ServerDBInfo object --- fdbserver/WorkerInterface.actor.h | 4 ++-- fdbserver/storageserver.actor.cpp | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 5937855574..20ac496c2a 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -868,12 +868,12 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, Tag seedTag, Version tssSeedVersion, ReplyPromise recruitReply, - Reference> db, + Reference const> db, std::string folder); ACTOR Future storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, - Reference> db, + Reference const> db, std::string folder, Promise recovered, Reference diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index e9a23ab309..078b2b1a66 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -614,7 +614,7 @@ public: bool tssInQuarantine; Key sk; - Reference> db; + Reference const> db; Database cx; ActorCollection actors; @@ -806,7 +806,7 @@ public: } counters; StorageServer(IKeyValueStore* storage, - Reference> const& db, + Reference const> const& db, StorageServerInterface const& ssi) : fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage), db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0), @@ -5134,7 +5134,7 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, Tag seedTag, Version tssSeedVersion, ReplyPromise recruitReply, - Reference> db, + Reference const> db, std::string folder) { state StorageServer self(persistentData, db, ssi); if (ssi.isTss()) { @@ -5328,7 +5328,7 @@ ACTOR Future replaceTSSInterface(StorageServer* self, StorageServerInterfa // for recovering an existing storage server ACTOR Future storageServer(IKeyValueStore* persistentData, StorageServerInterface ssi, - Reference> db, + Reference const> db, std::string folder, Promise recovered, Reference connFile) { From 7cfa37a731d535ba9e883355acf5381f5f036cd4 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:18:09 -0700 Subject: [PATCH 04/15] Prevent storageCacheServer from modifying ServerDBInfo object --- fdbserver/StorageCache.actor.cpp | 8 +++++--- fdbserver/WorkerInterface.actor.h | 4 +++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 888c94c3b3..e62e80e5ed 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -162,7 +162,7 @@ public: ProtocolVersion logProtocol; Reference logSystem; Key ck; // cacheKey - Reference> const& db; + Reference const> db; Database cx; StorageCacheUpdater* updater; @@ -238,7 +238,7 @@ public: } } counters; - explicit StorageCacheData(UID thisServerID, uint16_t index, Reference> const& db) + explicit StorageCacheData(UID thisServerID, uint16_t index, Reference const> const& db) : /*versionedData(FastAllocPTree{std::make_shared(0)}), */ thisServerID(thisServerID), index(index), logProtocol(0), db(db), cacheRangeChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), peekVersion(0), compactionInProgress(Void()), @@ -2165,7 +2165,9 @@ ACTOR Future watchInterface(StorageCacheData* self, StorageServerInterface } } -ACTOR Future storageCacheServer(StorageServerInterface ssi, uint16_t id, Reference> db) { +ACTOR Future storageCacheServer(StorageServerInterface ssi, + uint16_t id, + Reference const> db) { state StorageCacheData self(ssi.id(), id, db); state ActorCollection actors(false); state Future dbInfoChange = Void(); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 20ac496c2a..45332ac0b4 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -912,7 +912,9 @@ ACTOR Future logRouter(TLogInterface interf, Reference> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); ACTOR Future ratekeeper(RatekeeperInterface rki, Reference> db); -ACTOR Future storageCacheServer(StorageServerInterface interf, uint16_t id, Reference> db); +ACTOR Future storageCacheServer(StorageServerInterface interf, + uint16_t id, + Reference const> db); ACTOR Future backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference> db); void registerThreadForProfiling(); From fe03cead964e33e3e48b3db0fb1d573e612aa8bb Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:26:47 -0700 Subject: [PATCH 05/15] Prevent resolver from modifying ServerDBInfo object --- fdbserver/Resolver.actor.cpp | 4 ++-- fdbserver/WorkerInterface.actor.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 351439a947..ff09179bf1 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -354,7 +354,7 @@ ACTOR Future resolverCore(ResolverInterface resolver, InitializeResolverRe } } -ACTOR Future checkRemoved(Reference> db, +ACTOR Future checkRemoved(Reference const> db, uint64_t recoveryCount, ResolverInterface myInterface) { loop { @@ -367,7 +367,7 @@ ACTOR Future checkRemoved(Reference> db, ACTOR Future resolver(ResolverInterface resolver, InitializeResolverRequest initReq, - Reference> db) { + Reference const> db) { try { state Future core = resolverCore(resolver, initReq); loop choose { diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 45332ac0b4..f7c71cddc9 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -906,7 +906,7 @@ ACTOR Future tLog(IKeyValueStore* persistentData, Reference> activeSharedTLog); ACTOR Future resolver(ResolverInterface resolver, InitializeResolverRequest initReq, - Reference> db); + Reference const> db); ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, Reference> db); From a106d40012bb667bbb1c977e732b804f6cb5e29f Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 22:05:26 -0700 Subject: [PATCH 06/15] Prevent logRouter from modifying ServerDBInfo object --- fdbserver/LogRouter.actor.cpp | 6 +++--- fdbserver/WorkerInterface.actor.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index ec0ec6a416..5b0aa75ad7 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -625,7 +625,7 @@ ACTOR Future logRouterPop(LogRouterData* self, TLogPopRequest req) { ACTOR Future logRouterCore(TLogInterface interf, InitializeLogRouterRequest req, - Reference> db) { + Reference const> db) { state LogRouterData logRouterData(interf.id(), req); state PromiseStream> addActor; state Future error = actorCollection(addActor.getFuture()); @@ -653,7 +653,7 @@ ACTOR Future logRouterCore(TLogInterface interf, } } -ACTOR Future checkRemoved(Reference> db, +ACTOR Future checkRemoved(Reference const> db, uint64_t recoveryCount, TLogInterface myInterface) { loop { @@ -670,7 +670,7 @@ ACTOR Future checkRemoved(Reference> db, ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, - Reference> db) { + Reference const> db) { try { TraceEvent("LogRouterStart", interf.id()) .detail("Start", req.startVersion) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index f7c71cddc9..4d1877268a 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -909,7 +909,7 @@ ACTOR Future resolver(ResolverInterface resolver, Reference const> db); ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, - Reference> db); + Reference const> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); ACTOR Future ratekeeper(RatekeeperInterface rki, Reference> db); ACTOR Future storageCacheServer(StorageServerInterface interf, From 1a20cf9579081c3dd63a05fd65699464f13f7d71 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 22:17:09 -0700 Subject: [PATCH 07/15] Prevent commitProxyServer from modifying ServerDBInfo object --- fdbserver/CommitProxyServer.actor.cpp | 13 +++++++------ fdbserver/ProxyCommitData.actor.h | 4 ++-- fdbserver/WorkerInterface.actor.h | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 337a24e956..71316dcdb7 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -1596,7 +1596,7 @@ ACTOR static Future rejoinServer(CommitProxyInterface proxy, ProxyCommitDa } } -ACTOR Future ddMetricsRequestServer(CommitProxyInterface proxy, Reference> db) { +ACTOR Future ddMetricsRequestServer(CommitProxyInterface proxy, Reference const> db) { loop { choose { when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) { @@ -1754,7 +1754,8 @@ ACTOR Future proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co return Void(); } -ACTOR Future proxyCheckSafeExclusion(Reference> db, ExclusionSafetyCheckRequest req) { +ACTOR Future proxyCheckSafeExclusion(Reference const> db, + ExclusionSafetyCheckRequest req) { TraceEvent("SafetyCheckCommitProxyBegin"); state ExclusionSafetyCheckReply reply(false); if (!db->get().distributor.present()) { @@ -1783,7 +1784,7 @@ ACTOR Future proxyCheckSafeExclusion(Reference> db, } ACTOR Future reportTxnTagCommitCost(UID myID, - Reference> db, + Reference const> db, UIDTransactionTagMap* ssTrTagCommitCost) { state Future nextRequestTimer = Never(); state Future nextReply = Never(); @@ -1818,7 +1819,7 @@ ACTOR Future reportTxnTagCommitCost(UID myID, ACTOR Future commitProxyServerCore(CommitProxyInterface proxy, MasterInterface master, - Reference> db, + Reference const> db, LogEpoch epoch, Version recoveryTransactionVersion, bool firstProxy, @@ -2037,7 +2038,7 @@ ACTOR Future commitProxyServerCore(CommitProxyInterface proxy, } } -ACTOR Future checkRemoved(Reference> db, +ACTOR Future checkRemoved(Reference const> db, uint64_t recoveryCount, CommitProxyInterface myInterface) { loop { @@ -2051,7 +2052,7 @@ ACTOR Future checkRemoved(Reference> db, ACTOR Future commitProxyServer(CommitProxyInterface proxy, InitializeCommitProxyRequest req, - Reference> db, + Reference const> db, std::string whitelistBinPaths) { try { state Future core = commitProxyServerCore(proxy, diff --git a/fdbserver/ProxyCommitData.actor.h b/fdbserver/ProxyCommitData.actor.h index 7a2960022e..a8896b768c 100644 --- a/fdbserver/ProxyCommitData.actor.h +++ b/fdbserver/ProxyCommitData.actor.h @@ -161,7 +161,7 @@ struct ProxyCommitData { RequestStream getConsistentReadVersion; RequestStream commit; Database cx; - Reference> db; + Reference const> db; EventMetricHandle singleKeyMutationEvent; std::map> storageCache; @@ -239,7 +239,7 @@ struct ProxyCommitData { RequestStream getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream commit, - Reference> db, + Reference const> db, bool firstProxy) : dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master), logAdapter(nullptr), txnStateStore(nullptr), popRemoteTxs(false), committedVersion(recoveryTransactionVersion), diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 4d1877268a..e77c79d341 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -886,7 +886,7 @@ ACTOR Future masterServer(MasterInterface mi, bool forceRecovery); ACTOR Future commitProxyServer(CommitProxyInterface proxy, InitializeCommitProxyRequest req, - Reference> db, + Reference const> db, std::string whitelistBinPaths); ACTOR Future grvProxyServer(GrvProxyInterface proxy, InitializeGrvProxyRequest req, From b2bbdf0d7f9cfca42b16e4b054a527065b438491 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 22:19:18 -0700 Subject: [PATCH 08/15] Prevent grvProxyServer from modifying ServerDBInfo object --- fdbserver/GrvProxyServer.actor.cpp | 18 +++++++++--------- fdbserver/WorkerInterface.actor.h | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 0a7614a52c..56251976ab 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -222,7 +222,7 @@ struct GrvProxyData { Reference logSystem; Database cx; - Reference> db; + Reference const> db; Optional latencyBandConfig; double lastStartCommit; @@ -251,7 +251,7 @@ struct GrvProxyData { GrvProxyData(UID dbgid, MasterInterface master, RequestStream getConsistentReadVersion, - Reference> db) + Reference const> db) : dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE)), db(db), lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0), @@ -275,7 +275,7 @@ ACTOR Future healthMetricsRequestServer(GrvProxyInterface grvProxy, // Get transaction rate info from RateKeeper. ACTOR Future getRate(UID myID, - Reference> db, + Reference const> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, GrvTransactionRateInfo* transactionRateInfo, @@ -375,7 +375,7 @@ void dropRequestFromQueue(Deque* queue, GrvProxyStats* st } // Put a GetReadVersion request into the queue corresponding to its priority. -ACTOR Future queueGetReadVersionRequests(Reference> db, +ACTOR Future queueGetReadVersionRequests(Reference const> db, SpannedDeque* systemQueue, SpannedDeque* defaultQueue, SpannedDeque* batchQueue, @@ -634,7 +634,7 @@ ACTOR Future sendGrvReplies(Future replyFuture, return Void(); } -ACTOR Future monitorDDMetricsChanges(int64_t* midShardSize, Reference> db) { +ACTOR Future monitorDDMetricsChanges(int64_t* midShardSize, Reference const> db) { state Future nextRequestTimer = Never(); state Future nextReply = Never(); @@ -680,7 +680,7 @@ ACTOR Future monitorDDMetricsChanges(int64_t* midShardSize, Reference transactionStarter(GrvProxyInterface proxy, - Reference> db, + Reference const> db, PromiseStream> addActor, GrvProxyData* grvProxyData, GetHealthMetricsReply* healthMetricsReply, @@ -898,7 +898,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, ACTOR Future grvProxyServerCore(GrvProxyInterface proxy, MasterInterface master, - Reference> db) { + Reference const> db) { state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db); state PromiseStream> addActor; @@ -945,7 +945,7 @@ ACTOR Future grvProxyServerCore(GrvProxyInterface proxy, } } -ACTOR Future checkRemoved(Reference> db, +ACTOR Future checkRemoved(Reference const> db, uint64_t recoveryCount, GrvProxyInterface myInterface) { loop { @@ -959,7 +959,7 @@ ACTOR Future checkRemoved(Reference> db, ACTOR Future grvProxyServer(GrvProxyInterface proxy, InitializeGrvProxyRequest req, - Reference> db) { + Reference const> db) { try { state Future core = grvProxyServerCore(proxy, req.master, db); wait(core || checkRemoved(db, req.recoveryCount, proxy)); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index e77c79d341..a9a7a6dcb8 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -890,7 +890,7 @@ ACTOR Future commitProxyServer(CommitProxyInterface proxy, std::string whitelistBinPaths); ACTOR Future grvProxyServer(GrvProxyInterface proxy, InitializeGrvProxyRequest req, - Reference> db); + Reference const> db); ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference> db, From 84f6b55e6c3368ca685fed48d6f73aeb17bda9f8 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 22:25:40 -0700 Subject: [PATCH 09/15] Prevent tLog from modifying ServerDBInfo object --- fdbserver/OldTLogServer_4_6.actor.cpp | 6 +++--- fdbserver/OldTLogServer_6_0.actor.cpp | 6 +++--- fdbserver/OldTLogServer_6_2.actor.cpp | 6 +++--- fdbserver/TLogServer.actor.cpp | 6 +++--- fdbserver/WorkerInterface.actor.h | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index d8d6755910..ce291e644c 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -291,7 +291,7 @@ struct TLogData : NonCopyable { AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES - Reference> dbInfo; + Reference const> dbInfo; NotifiedVersion queueCommitEnd; Version queueCommitBegin; @@ -321,7 +321,7 @@ struct TLogData : NonCopyable { UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> const& dbInfo) + Reference const> const& dbInfo) : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), @@ -1568,7 +1568,7 @@ ACTOR Future restorePersistentState(TLogData* self, LocalityData locality) ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, UID tlogId, UID workerID) { diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 24c97f741c..dfa3a94a34 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -264,7 +264,7 @@ struct TLogData : NonCopyable { AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES - Reference> dbInfo; + Reference const> dbInfo; Database cx; NotifiedVersion queueCommitEnd; @@ -301,7 +301,7 @@ struct TLogData : NonCopyable { UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> dbInfo, + Reference const> dbInfo, Reference> degraded, std::string folder) : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), @@ -2716,7 +2716,7 @@ ACTOR Future startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen // New tLog (if !recoverFrom.size()) or restore from network ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 68c125858f..4fa793c74b 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -327,7 +327,7 @@ struct TLogData : NonCopyable { AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES - Reference> dbInfo; + Reference const> dbInfo; Database cx; NotifiedVersion queueCommitEnd; @@ -364,7 +364,7 @@ struct TLogData : NonCopyable { UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> dbInfo, + Reference const> dbInfo, Reference> degraded, std::string folder) : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), @@ -3205,7 +3205,7 @@ ACTOR Future startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen // New tLog (if !recoverFrom.size()) or restore from network ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index a948ecefb2..15074905da 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -329,7 +329,7 @@ struct TLogData : NonCopyable { AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES - Reference> dbInfo; + Reference const> dbInfo; Database cx; NotifiedVersion queueCommitEnd; @@ -372,7 +372,7 @@ struct TLogData : NonCopyable { UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> dbInfo, + Reference const> dbInfo, Reference> degraded, std::string folder) : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), @@ -3280,7 +3280,7 @@ ACTOR Future startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen // New tLog (if !recoverFrom.size()) or restore from network ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index a9a7a6dcb8..1e32cc55db 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -893,7 +893,7 @@ ACTOR Future grvProxyServer(GrvProxyInterface proxy, Reference const> db); ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, @@ -923,7 +923,7 @@ void updateCpuProfiler(ProfilerRequest req); namespace oldTLog_4_6 { ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, UID tlogId, UID workerID); @@ -931,7 +931,7 @@ ACTOR Future tLog(IKeyValueStore* persistentData, namespace oldTLog_6_0 { ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, @@ -946,7 +946,7 @@ ACTOR Future tLog(IKeyValueStore* persistentData, namespace oldTLog_6_2 { ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, - Reference> db, + Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, From ca3f0152724fb5ee6af5e5835cfb6c86e00931b0 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:52:55 -0700 Subject: [PATCH 10/15] Prevent ratekeeper from modifying ServerDBInfo object --- fdbserver/Ratekeeper.actor.cpp | 2 +- fdbserver/WorkerInterface.actor.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 77bda2577b..83f25160cf 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -1408,7 +1408,7 @@ ACTOR Future configurationMonitor(RatekeeperData* self) { } } -ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference> dbInfo) { +ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE)); state Future timeout = Void(); state std::vector> tlogTrackers; diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 1e32cc55db..d63c22c586 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -911,7 +911,7 @@ ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, Reference const> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); -ACTOR Future ratekeeper(RatekeeperInterface rki, Reference> db); +ACTOR Future ratekeeper(RatekeeperInterface rki, Reference const> db); ACTOR Future storageCacheServer(StorageServerInterface interf, uint16_t id, Reference const> db); From 0e1d5c34e6dbb57ba2cbc920fa309dafcf999c63 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 21:54:36 -0700 Subject: [PATCH 11/15] Prevent backupWorker from modifying ServerDBInfo object --- fdbserver/BackupWorker.actor.cpp | 6 +++--- fdbserver/WorkerInterface.actor.h | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 8a1f2c952a..4e40ea1c47 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -237,7 +237,7 @@ struct BackupData { CounterCollection cc; Future logger; - explicit BackupData(UID id, Reference> db, const InitializeBackupRequest& req) + explicit BackupData(UID id, Reference const> db, const InitializeBackupRequest& req) : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1), @@ -987,7 +987,7 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) } } -ACTOR Future checkRemoved(Reference> db, LogEpoch recoveryCount, BackupData* self) { +ACTOR Future checkRemoved(Reference const> db, LogEpoch recoveryCount, BackupData* self) { loop { bool isDisplaced = db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED; @@ -1033,7 +1033,7 @@ ACTOR static Future monitorWorkerPause(BackupData* self) { ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest req, - Reference> db) { + Reference const> db) { state BackupData self(interf.id(), db, req); state PromiseStream> addActor; state Future error = actorCollection(addActor.getFuture()); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index d63c22c586..9e155756f0 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -915,7 +915,9 @@ ACTOR Future ratekeeper(RatekeeperInterface rki, Reference storageCacheServer(StorageServerInterface interf, uint16_t id, Reference const> db); -ACTOR Future backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference> db); +ACTOR Future backupWorker(BackupInterface bi, + InitializeBackupRequest req, + Reference const> db); void registerThreadForProfiling(); void updateCpuProfiler(ProfilerRequest req); From 77cbc1aa81bd4fb561cd74b76f6bfb3b9970dca3 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 11 Jul 2021 11:54:04 -0700 Subject: [PATCH 12/15] s/IDependentAsyncVar/IAsyncListener --- fdbclient/MonitorLeader.h | 2 +- fdbserver/ConfigDatabaseUnitTests.actor.cpp | 8 +++---- fdbserver/LocalConfiguration.actor.cpp | 9 ++++---- fdbserver/LocalConfiguration.h | 2 +- fdbserver/worker.actor.cpp | 7 +++---- flow/genericactors.actor.cpp | 8 +++---- flow/genericactors.actor.h | 23 ++++++++++----------- 7 files changed, 28 insertions(+), 31 deletions(-) diff --git a/fdbclient/MonitorLeader.h b/fdbclient/MonitorLeader.h index b9b195a9da..22ef1a5300 100644 --- a/fdbclient/MonitorLeader.h +++ b/fdbclient/MonitorLeader.h @@ -49,7 +49,7 @@ struct ClientData { OpenDatabaseRequest getRequest(); - ClientData() : clientInfo(new AsyncVar>(CachedSerialization())) {} + ClientData() : clientInfo(makeReference>>()) {} }; struct MonitorLeaderInfo { diff --git a/fdbserver/ConfigDatabaseUnitTests.actor.cpp b/fdbserver/ConfigDatabaseUnitTests.actor.cpp index e315156da0..aa57ae9db3 100644 --- a/fdbserver/ConfigDatabaseUnitTests.actor.cpp +++ b/fdbserver/ConfigDatabaseUnitTests.actor.cpp @@ -126,7 +126,7 @@ class ReadFromLocalConfigEnvironment { UID id; std::string dataDir; LocalConfiguration localConfiguration; - Reference const> cbfi; + Reference const> cbfi; Future consumer; ACTOR static Future checkEventually(LocalConfiguration const* localConfiguration, @@ -168,7 +168,7 @@ public: return setup(); } - void connectToBroadcaster(Reference const> const& cbfi) { + void connectToBroadcaster(Reference const> const& cbfi) { ASSERT(!this->cbfi); this->cbfi = cbfi; consumer = localConfiguration.consume(cbfi); @@ -228,7 +228,7 @@ class BroadcasterToLocalConfigEnvironment { ACTOR static Future setup(BroadcasterToLocalConfigEnvironment* self) { wait(self->readFrom.setup()); - self->readFrom.connectToBroadcaster(IDependentAsyncVar::create(self->cbfi)); + self->readFrom.connectToBroadcaster(IAsyncListener::create(self->cbfi)); self->broadcastServer = self->broadcaster.serve(self->cbfi->get()); return Void(); } @@ -364,7 +364,7 @@ class TransactionToLocalConfigEnvironment { ACTOR static Future setup(TransactionToLocalConfigEnvironment* self) { wait(self->readFrom.setup()); - self->readFrom.connectToBroadcaster(IDependentAsyncVar::create(self->cbfi)); + self->readFrom.connectToBroadcaster(IAsyncListener::create(self->cbfi)); self->broadcastServer = self->broadcaster.serve(self->cbfi->get()); return Void(); } diff --git a/fdbserver/LocalConfiguration.actor.cpp b/fdbserver/LocalConfiguration.actor.cpp index 30974a2d19..3fd55141a3 100644 --- a/fdbserver/LocalConfiguration.actor.cpp +++ b/fdbserver/LocalConfiguration.actor.cpp @@ -309,9 +309,8 @@ class LocalConfigurationImpl { } } - ACTOR static Future consume( - LocalConfigurationImpl* self, - Reference const> broadcaster) { + ACTOR static Future consume(LocalConfigurationImpl* self, + Reference const> broadcaster) { ASSERT(self->initFuture.isValid() && self->initFuture.isReady()); loop { choose { @@ -371,7 +370,7 @@ public: return getKnobs().getTestKnobs(); } - Future consume(Reference const> const& broadcaster) { + Future consume(Reference const> const& broadcaster) { return consume(this, broadcaster); } @@ -453,7 +452,7 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const { } Future LocalConfiguration::consume( - Reference const> const& broadcaster) { + Reference const> const& broadcaster) { return impl().consume(broadcaster); } diff --git a/fdbserver/LocalConfiguration.h b/fdbserver/LocalConfiguration.h index 6f9ecabc8f..e5e212b83e 100644 --- a/fdbserver/LocalConfiguration.h +++ b/fdbserver/LocalConfiguration.h @@ -60,7 +60,7 @@ public: ClientKnobs const& getClientKnobs() const; ServerKnobs const& getServerKnobs() const; TestKnobs const& getTestKnobs() const; - Future consume(Reference const> const& broadcaster); + Future consume(Reference const> const& broadcaster); UID getID() const; public: // Testing diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 2ff5252320..d14e1b21e1 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -2303,10 +2303,9 @@ ACTOR Future fdbd(Reference connFile, auto dbInfo = makeReference>(); if (useConfigDB != UseConfigDB::DISABLED) { - actors.push_back( - reportErrors(localConfig.consume(IDependentAsyncVar::create( - dbInfo, [](auto const& info) { return info.configBroadcaster; })), - "LocalConfiguration")); + actors.push_back(reportErrors(localConfig.consume(IAsyncListener::create( + dbInfo, [](auto const& info) { return info.configBroadcaster; })), + "LocalConfiguration")); } actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo")); diff --git a/flow/genericactors.actor.cpp b/flow/genericactors.actor.cpp index b199175af7..9b7f906713 100644 --- a/flow/genericactors.actor.cpp +++ b/flow/genericactors.actor.cpp @@ -158,7 +158,7 @@ ACTOR Future testPublisher(Reference> input) { return Void(); } -ACTOR Future testSubscriber(Reference> output, Optional expected) { +ACTOR Future testSubscriber(Reference> output, Optional expected) { loop { wait(output->onChange()); ASSERT(expected.present()); @@ -170,12 +170,12 @@ ACTOR Future testSubscriber(Reference> output, Opt } // namespace -TEST_CASE("/flow/genericactors/DependentAsyncVar") { +TEST_CASE("/flow/genericactors/AsyncListener") { auto input = makeReference>(); state Future subscriber1 = - testSubscriber(IDependentAsyncVar::create(input, [](auto const& var) { return var.changed; }), 100); + testSubscriber(IAsyncListener::create(input, [](auto const& var) { return var.changed; }), 100); state Future subscriber2 = - testSubscriber(IDependentAsyncVar::create(input, [](auto const& var) { return var.unchanged; }), {}); + testSubscriber(IAsyncListener::create(input, [](auto const& var) { return var.unchanged; }), {}); wait(subscriber1 && testPublisher(input)); ASSERT(!subscriber2.isReady()); return Void(); diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 5cb0344ffb..bffee8bdfa 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1957,22 +1957,22 @@ Future operator>>(Future const& lhs, Future const& rhs) { } /* - * IDependentAsyncVar is similar to AsyncVar, but it decouples the input and output, so the translation unit + * IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit * responsible for handling the output does not need to have knowledge of how the output is generated */ template -class IDependentAsyncVar : public ReferenceCounted> { +class IAsyncListener : public ReferenceCounted> { public: - virtual ~IDependentAsyncVar() = default; + virtual ~IAsyncListener() = default; virtual Output const& get() const = 0; virtual Future onChange() const = 0; template - static Reference create(Reference> const& input, F const& f); - static Reference create(Reference> const& output); + static Reference create(Reference> const& input, F const& f); + static Reference create(Reference> const& output); }; template -class DependentAsyncVar final : public IDependentAsyncVar { +class AsyncListener final : public IAsyncListener { Reference> output; Future monitorActor; ACTOR static Future monitor(Reference> input, Reference> output, F f) { @@ -1983,7 +1983,7 @@ class DependentAsyncVar final : public IDependentAsyncVar { } public: - DependentAsyncVar(Reference> const& input, F const& f) + AsyncListener(Reference> const& input, F const& f) : output(makeReference>(f(input->get()))), monitorActor(monitor(input, output, f)) {} Output const& get() const override { return output->get(); } Future onChange() const override { return output->onChange(); } @@ -1991,15 +1991,14 @@ public: template template -Reference> IDependentAsyncVar::create(Reference> const& input, - F const& f) { - return makeReference>(input, f); +Reference> IAsyncListener::create(Reference> const& input, F const& f) { + return makeReference>(input, f); } template -Reference> IDependentAsyncVar::create(Reference> const& input) { +Reference> IAsyncListener::create(Reference> const& input) { auto identity = [](const auto& x) { return x; }; - return makeReference>(input, identity); + return makeReference>(input, identity); } // A weak reference type to wrap a future Reference object. From 95d86a1d1e13531ce32de4f75ae6c61e22c227d3 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Mon, 12 Jul 2021 16:30:27 -0700 Subject: [PATCH 13/15] Put IAsyncListener implementation in its own namespace --- flow/genericactors.actor.h | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index bffee8bdfa..b83c3f90be 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1971,11 +1971,14 @@ public: static Reference create(Reference> const& output); }; +namespace IAsyncListenerImpl { + template class AsyncListener final : public IAsyncListener { - Reference> output; + // Order matters here, output must outlive monitorActor + AsyncVar output; Future monitorActor; - ACTOR static Future monitor(Reference> input, Reference> output, F f) { + ACTOR static Future monitor(Reference const> input, AsyncVar* output, F f) { loop { wait(input->onChange()); output->set(f(input->get())); @@ -1983,22 +1986,24 @@ class AsyncListener final : public IAsyncListener { } public: - AsyncListener(Reference> const& input, F const& f) - : output(makeReference>(f(input->get()))), monitorActor(monitor(input, output, f)) {} - Output const& get() const override { return output->get(); } - Future onChange() const override { return output->onChange(); } + AsyncListener(Reference const> const& input, F const& f) + : output(f(input->get())), monitorActor(monitor(input, &output, f)) {} + Output const& get() const override { return output.get(); } + Future onChange() const override { return output.onChange(); } }; +} // namespace IAsyncListenerImpl + template template Reference> IAsyncListener::create(Reference> const& input, F const& f) { - return makeReference>(input, f); + return makeReference>(input, f); } template Reference> IAsyncListener::create(Reference> const& input) { auto identity = [](const auto& x) { return x; }; - return makeReference>(input, identity); + return makeReference>(input, identity); } // A weak reference type to wrap a future Reference object. From 03949f2bf9583aef9fbab4469836341e5f292766 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Mon, 12 Jul 2021 16:53:52 -0700 Subject: [PATCH 14/15] Improve const-correctness of registrationClient arguments --- fdbserver/worker.actor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index d14e1b21e1..bdbb2ac7e7 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -502,15 +502,15 @@ std::vector getDiskStores(std::string folder) { // Register the worker interf to cluster controller (cc) and // re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change. -ACTOR Future registrationClient(Reference>> ccInterface, +ACTOR Future registrationClient(Reference> const> ccInterface, WorkerInterface interf, Reference> asyncPriorityInfo, ProcessClass initialClass, - Reference>> ddInterf, - Reference>> rkInterf, - Reference> degraded, + Reference> const> ddInterf, + Reference> const> rkInterf, + Reference const> degraded, Reference connFile, - Reference>> issues) { + Reference> const> issues) { // Keeps the cluster controller (as it may be re-elected) informed that this worker exists // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists. From 4f853b19a6f1387ee2e6a59a5815ec576d16065b Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Mon, 12 Jul 2021 21:28:38 -0700 Subject: [PATCH 15/15] More const-correctness improvements for Reference> objects --- fdbclient/DatabaseContext.h | 12 +++++++----- fdbclient/NativeAPI.actor.cpp | 12 ++++++------ fdbrpc/FlowTransport.actor.cpp | 4 ++-- fdbrpc/FlowTransport.h | 2 +- flow/genericactors.actor.h | 2 +- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index d95ca71c32..a1d33d6781 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -196,7 +196,7 @@ public: Reference getCommitProxies(bool useProvisionalProxies); Future> getCommitProxiesFuture(bool useProvisionalProxies); Reference getGrvProxies(bool useProvisionalProxies); - Future onProxiesChanged(); + Future onProxiesChanged() const; Future getHealthMetrics(bool detailed); // Returns the protocol version reported by the coordinator this client is connected to @@ -255,7 +255,7 @@ public: // private: explicit DatabaseContext(Reference>> connectionFile, Reference> clientDBInfo, - Reference>> coordinator, + Reference> const> coordinator, Future clientInfoMonitor, TaskPriority taskID, LocalityData const& clientLocality, @@ -307,7 +307,7 @@ public: // trust that the read version (possibly set manually by the application) is actually from the correct cluster. // Updated everytime we get a GRV response Version minAcceptableReadVersion = std::numeric_limits::max(); - void validateVersion(Version); + void validateVersion(Version) const; // Client status updater struct ClientStatusUpdater { @@ -399,7 +399,7 @@ public: Future connected; // An AsyncVar that reports the coordinator this DatabaseContext is interacting with - Reference>> coordinator; + Reference> const> coordinator; Reference>> statusClusterInterface; Future statusLeaderMon; @@ -428,7 +428,6 @@ public: static bool debugUseTags; static const std::vector debugTransactionTagChoices; - std::unordered_map> watchMap; // Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present). // Requests to the storage server will be duplicated to the TSS. @@ -437,6 +436,9 @@ public: // Removes the storage server and its TSS pair from the TSS mapping (if present). // Requests to the storage server will no longer be duplicated to its pair TSS. void removeTssMapping(StorageServerInterface const& ssi); + +private: + std::unordered_map> watchMap; }; #endif diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8fa167694e..dcae7caa9c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -285,7 +285,7 @@ std::string unprintable(std::string const& val) { return s; } -void DatabaseContext::validateVersion(Version version) { +void DatabaseContext::validateVersion(Version version) const { // Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any // reads. We throw client_invalid_operation because the caller didn't directly set the version, so the // version_invalid error might be confusing. @@ -650,7 +650,7 @@ ACTOR static Future clientStatusUpdateActor(DatabaseContext* cx) { } } -ACTOR static Future monitorProxiesChange(Reference> clientDBInfo, +ACTOR static Future monitorProxiesChange(Reference const> clientDBInfo, AsyncTrigger* triggerVar) { state vector curCommitProxies; state vector curGrvProxies; @@ -1085,7 +1085,7 @@ Future HealthMetricsRangeImpl::getRange(ReadYourWritesTransaction* DatabaseContext::DatabaseContext(Reference>> connectionFile, Reference> clientInfo, - Reference>> coordinator, + Reference> const> coordinator, Future clientInfoMonitor, TaskPriority taskID, LocalityData const& clientLocality, @@ -1482,7 +1482,7 @@ void DatabaseContext::invalidateCache(const KeyRangeRef& keys) { locationCache.insert(KeyRangeRef(begin, end), Reference()); } -Future DatabaseContext::onProxiesChanged() { +Future DatabaseContext::onProxiesChanged() const { return this->proxiesChangeTrigger.onTrigger(); } @@ -5761,7 +5761,7 @@ ACTOR Future> getCoordinatorProtocolFromConnectPacket( NetworkAddress coordinatorAddress, Optional expectedVersion) { - state Reference>> protocolVersion = + state Reference> const> protocolVersion = FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress); loop { @@ -5786,7 +5786,7 @@ ACTOR Future> getCoordinatorProtocolFromConnectPacket( // Returns the protocol version reported by the given coordinator // If an expected version is given, the future won't return until the protocol version is different than expected ACTOR Future getClusterProtocolImpl( - Reference>> coordinator, + Reference> const> coordinator, Optional expectedVersion) { state bool needToConnect = true; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index e16264591f..5b08069fc7 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -1698,7 +1698,7 @@ Reference> FlowTransport::getDegraded() { // // Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol // version, some other mechanism should be used to connect to that peer. -Reference>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) { +Reference> const> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) { return self->peers.at(addr)->protocolVersion; } @@ -1723,4 +1723,4 @@ void FlowTransport::createInstance(bool isClient, uint64_t transportId) { HealthMonitor* FlowTransport::healthMonitor() { return &self->healthMonitor; -} \ No newline at end of file +} diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 7ae82b8ef7..0ba5a605aa 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -252,7 +252,7 @@ public: // // Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol // version, some other mechanism should be used to connect to that peer. - Reference>> getPeerProtocolAsyncVar(NetworkAddress addr); + Reference> const> getPeerProtocolAsyncVar(NetworkAddress addr); static FlowTransport& transport() { return *static_cast((void*)g_network->global(INetwork::enFlowTransport)); diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index b83c3f90be..30794d9791 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -690,7 +690,7 @@ public: AsyncTrigger() {} AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {} void operator=(AsyncTrigger&& at) { v = std::move(at.v); } - Future onTrigger() { return v.onChange(); } + Future onTrigger() const { return v.onChange(); } void trigger() { v.trigger(); } private: