Prevent dataDistributor from modifying ServerDBInfo object

This commit is contained in:
sfc-gh-tclinkenbeard 2021-07-11 22:04:38 -07:00
parent 6c1d913ab8
commit 8a212862f0
5 changed files with 40 additions and 36 deletions

View File

@ -5202,7 +5202,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
}
ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
const DDEnabledState* ddEnabledState) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
@ -5474,7 +5474,7 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
}
}
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
TraceEvent("DDTrackerStarting");
while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState);
@ -5500,8 +5500,8 @@ ACTOR Future<Void> monitorHealthyTeams(DDTeamCollection* self) {
ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db,
const DDEnabledState* ddEnabledState) {
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -5728,16 +5728,16 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
}
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
UID ddId;
PromiseStream<Future<Void>> addActor;
DDTeamCollection* teamCollection;
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id)
DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), teamCollection(nullptr) {}
};
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> const> db, double* lastLimited) {
loop {
wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE));
@ -6105,7 +6105,7 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::TRUE);
state ReadYourWritesTransaction tr(cx);
loop {
@ -6249,7 +6249,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) {
state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@ -6443,7 +6443,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
state Future<Void> collection = actorCollection(self->addActor.getFuture());
state PromiseStream<GetMetricsListRequest> getShardMetricsList;

View File

@ -35,7 +35,7 @@
#include <boost/lexical_cast.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0) {
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int flags = 0) {
loop {
choose {
when(vector<WorkerDetails> w = wait(brokenPromiseToNever(
@ -48,7 +48,7 @@ ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>>
}
// Gets the WorkerInterface representing the Master server.
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");
loop {
@ -75,7 +75,7 @@ ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<Se
}
// Gets the WorkerInterface representing the data distributor.
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers");
loop {
@ -118,7 +118,7 @@ ACTOR Future<int64_t> getDataInFlight(Database cx, WorkerInterface distributorWo
}
// Gets the number of bytes in flight from the data distributor.
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf));
return dataInFlight;
@ -144,7 +144,7 @@ int64_t getPoppedVersionLag(const TraceEventFields& md) {
return persistentDataDurableVersion - queuePoppedVersion;
}
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
Optional<Value> coordinators =
@ -177,7 +177,8 @@ ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<Asy
}
// This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@ -245,7 +246,7 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers(Database cx, bool
}
ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool localOnly) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap;
@ -335,7 +336,7 @@ ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInter
};
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
@ -399,7 +400,7 @@ ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
// the queue Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool reportInFlight) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight));
@ -516,7 +517,7 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
// Gets if the number of process and machine teams does not exceed the maximum allowed number of teams
// Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo));
bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker));
return valid;
@ -565,7 +566,9 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
}
}
ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context) {
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator.usableRegions > 1) {
bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId);
bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId);
@ -601,7 +604,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDB
ACTOR Future<Void> reconfigureAfter(Database cx,
double time,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
wait(delay(time));
wait(repairDeadDatacenter(cx, dbInfo, context));
@ -611,7 +614,7 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,
@ -747,7 +750,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
}
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string phase,
int64_t dataInFlightGate,
int64_t maxTLogQueueGate,

View File

@ -28,25 +28,26 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h"
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getDataDistributionQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
bool const& reportInFlight);
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<vector<StorageServerInterface>> getStorageServers(Database const& cx, bool const& use_system_priority = false);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
Future<Void> repairDeadDatacenter(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
bool const& localOnly);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
#include "flow/unactorcompiler.h"
#endif

View File

@ -910,7 +910,7 @@ ACTOR Future<Void> resolver(ResolverInterface resolver,
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference<AsyncVar<ServerDBInfo>> db);

View File

@ -223,7 +223,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix);
ACTOR Future<Void> databaseWarmer(Database cx);
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,