Merge branch 'master' of github.com:apple/foundationdb into redwood-io-priority

This commit is contained in:
Steve Atherton 2021-07-21 13:44:48 -07:00
commit 2d3e78df77
32 changed files with 179 additions and 163 deletions

View File

@ -416,14 +416,14 @@ function(add_fdbclient_test)
message(STATUS "Adding Client test ${T_NAME}") message(STATUS "Adding Client test ${T_NAME}")
if (T_PROCESS_NUMBER) if (T_PROCESS_NUMBER)
add_test(NAME "${T_NAME}" add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR} --build-dir ${CMAKE_BINARY_DIR}
--process-number ${T_PROCESS_NUMBER} --process-number ${T_PROCESS_NUMBER}
-- --
${T_COMMAND}) ${T_COMMAND})
else() else()
add_test(NAME "${T_NAME}" add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR} --build-dir ${CMAKE_BINARY_DIR}
-- --
${T_COMMAND}) ${T_COMMAND})
@ -459,7 +459,7 @@ function(add_multi_fdbclient_test)
endif() endif()
message(STATUS "Adding Client test ${T_NAME}") message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}" add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_multi_cluster.py COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_multi_cluster.py
--build-dir ${CMAKE_BINARY_DIR} --build-dir ${CMAKE_BINARY_DIR}
--clusters 3 --clusters 3
-- --

View File

@ -196,7 +196,7 @@ public:
Reference<CommitProxyInfo> getCommitProxies(bool useProvisionalProxies); Reference<CommitProxyInfo> getCommitProxies(bool useProvisionalProxies);
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(bool useProvisionalProxies); Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(bool useProvisionalProxies);
Reference<GrvProxyInfo> getGrvProxies(bool useProvisionalProxies); Reference<GrvProxyInfo> getGrvProxies(bool useProvisionalProxies);
Future<Void> onProxiesChanged(); Future<Void> onProxiesChanged() const;
Future<HealthMetrics> getHealthMetrics(bool detailed); Future<HealthMetrics> getHealthMetrics(bool detailed);
// Returns the protocol version reported by the coordinator this client is connected to // Returns the protocol version reported by the coordinator this client is connected to
@ -255,7 +255,7 @@ public:
// private: // private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator, Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor, Future<Void> clientInfoMonitor,
TaskPriority taskID, TaskPriority taskID,
LocalityData const& clientLocality, LocalityData const& clientLocality,
@ -307,7 +307,7 @@ public:
// trust that the read version (possibly set manually by the application) is actually from the correct cluster. // trust that the read version (possibly set manually by the application) is actually from the correct cluster.
// Updated everytime we get a GRV response // Updated everytime we get a GRV response
Version minAcceptableReadVersion = std::numeric_limits<Version>::max(); Version minAcceptableReadVersion = std::numeric_limits<Version>::max();
void validateVersion(Version); void validateVersion(Version) const;
// Client status updater // Client status updater
struct ClientStatusUpdater { struct ClientStatusUpdater {
@ -399,7 +399,7 @@ public:
Future<Void> connected; Future<Void> connected;
// An AsyncVar that reports the coordinator this DatabaseContext is interacting with // An AsyncVar that reports the coordinator this DatabaseContext is interacting with
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator; Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface; Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon; Future<Void> statusLeaderMon;
@ -428,7 +428,6 @@ public:
static bool debugUseTags; static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices; static const std::vector<std::string> debugTransactionTagChoices;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present). // Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS. // Requests to the storage server will be duplicated to the TSS.
@ -437,6 +436,9 @@ public:
// Removes the storage server and its TSS pair from the TSS mapping (if present). // Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS. // Requests to the storage server will no longer be duplicated to its pair TSS.
void removeTssMapping(StorageServerInterface const& ssi); void removeTssMapping(StorageServerInterface const& ssi);
private:
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
}; };
#endif #endif

View File

@ -72,7 +72,7 @@ public:
// to allow global configuration to run transactions on the latest // to allow global configuration to run transactions on the latest
// database. // database.
template <class T> template <class T>
static void create(Database& cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) { static void create(Database& cx, Reference<AsyncVar<T> const> db, const ClientDBInfo* dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) { if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{ cx }; auto config = new GlobalConfig{ cx };
g_network->setGlobal(INetwork::enGlobalConfig, config); g_network->setGlobal(INetwork::enGlobalConfig, config);

View File

@ -49,7 +49,7 @@ struct ClientData {
OpenDatabaseRequest getRequest(); OpenDatabaseRequest getRequest();
ClientData() : clientInfo(new AsyncVar<CachedSerialization<ClientDBInfo>>(CachedSerialization<ClientDBInfo>())) {} ClientData() : clientInfo(makeReference<AsyncVar<CachedSerialization<ClientDBInfo>>>()) {}
}; };
struct MonitorLeaderInfo { struct MonitorLeaderInfo {

View File

@ -285,7 +285,7 @@ std::string unprintable(std::string const& val) {
return s; return s;
} }
void DatabaseContext::validateVersion(Version version) { void DatabaseContext::validateVersion(Version version) const {
// Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any // Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any
// reads. We throw client_invalid_operation because the caller didn't directly set the version, so the // reads. We throw client_invalid_operation because the caller didn't directly set the version, so the
// version_invalid error might be confusing. // version_invalid error might be confusing.
@ -650,7 +650,7 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
} }
} }
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo, ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* triggerVar) { AsyncTrigger* triggerVar) {
state vector<CommitProxyInterface> curCommitProxies; state vector<CommitProxyInterface> curCommitProxies;
state vector<GrvProxyInterface> curGrvProxies; state vector<GrvProxyInterface> curGrvProxies;
@ -1085,7 +1085,7 @@ Future<RangeResult> HealthMetricsRangeImpl::getRange(ReadYourWritesTransaction*
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo, Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator, Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor, Future<Void> clientInfoMonitor,
TaskPriority taskID, TaskPriority taskID,
LocalityData const& clientLocality, LocalityData const& clientLocality,
@ -1482,7 +1482,7 @@ void DatabaseContext::invalidateCache(const KeyRangeRef& keys) {
locationCache.insert(KeyRangeRef(begin, end), Reference<LocationInfo>()); locationCache.insert(KeyRangeRef(begin, end), Reference<LocationInfo>());
} }
Future<Void> DatabaseContext::onProxiesChanged() { Future<Void> DatabaseContext::onProxiesChanged() const {
return this->proxiesChangeTrigger.onTrigger(); return this->proxiesChangeTrigger.onTrigger();
} }
@ -1759,7 +1759,8 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
} }
auto database = Database(db); auto database = Database(db);
GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get())); GlobalConfig::create(
database, Reference<AsyncVar<ClientDBInfo> const>(clientInfo), std::addressof(clientInfo->get()));
return database; return database;
} }
@ -5760,7 +5761,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
NetworkAddress coordinatorAddress, NetworkAddress coordinatorAddress,
Optional<ProtocolVersion> expectedVersion) { Optional<ProtocolVersion> expectedVersion) {
state Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion = state Reference<AsyncVar<Optional<ProtocolVersion>> const> protocolVersion =
FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress); FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress);
loop { loop {
@ -5785,7 +5786,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
// Returns the protocol version reported by the given coordinator // Returns the protocol version reported by the given coordinator
// If an expected version is given, the future won't return until the protocol version is different than expected // If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocolImpl( ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator, Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Optional<ProtocolVersion> expectedVersion) { Optional<ProtocolVersion> expectedVersion) {
state bool needToConnect = true; state bool needToConnect = true;

View File

@ -1698,7 +1698,7 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
// //
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol // Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer. // version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) { Reference<AsyncVar<Optional<ProtocolVersion>> const> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
return self->peers.at(addr)->protocolVersion; return self->peers.at(addr)->protocolVersion;
} }
@ -1723,4 +1723,4 @@ void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
HealthMonitor* FlowTransport::healthMonitor() { HealthMonitor* FlowTransport::healthMonitor() {
return &self->healthMonitor; return &self->healthMonitor;
} }

View File

@ -252,7 +252,7 @@ public:
// //
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol // Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer. // version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> getPeerProtocolAsyncVar(NetworkAddress addr); Reference<AsyncVar<Optional<ProtocolVersion>> const> getPeerProtocolAsyncVar(NetworkAddress addr);
static FlowTransport& transport() { static FlowTransport& transport() {
return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport)); return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport));

View File

@ -237,7 +237,7 @@ struct BackupData {
CounterCollection cc; CounterCollection cc;
Future<Void> logger; Future<Void> logger;
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req) explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo> const> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1), minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
@ -987,7 +987,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
} }
} }
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch recoveryCount, BackupData* self) { ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db, LogEpoch recoveryCount, BackupData* self) {
loop { loop {
bool isDisplaced = bool isDisplaced =
db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED; db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED;
@ -1033,7 +1033,7 @@ ACTOR static Future<Void> monitorWorkerPause(BackupData* self) {
ACTOR Future<Void> backupWorker(BackupInterface interf, ACTOR Future<Void> backupWorker(BackupInterface interf,
InitializeBackupRequest req, InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
state BackupData self(interf.id(), db, req); state BackupData self(interf.id(), db, req);
state PromiseStream<Future<Void>> addActor; state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture()); state Future<Void> error = actorCollection(addActor.getFuture());

View File

@ -1596,7 +1596,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
} }
} }
ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db) { ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo> const> db) {
loop { loop {
choose { choose {
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) { when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) {
@ -1754,7 +1754,8 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
return Void(); return Void();
} }
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db, ExclusionSafetyCheckRequest req) { ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo> const> db,
ExclusionSafetyCheckRequest req) {
TraceEvent("SafetyCheckCommitProxyBegin"); TraceEvent("SafetyCheckCommitProxyBegin");
state ExclusionSafetyCheckReply reply(false); state ExclusionSafetyCheckReply reply(false);
if (!db->get().distributor.present()) { if (!db->get().distributor.present()) {
@ -1783,7 +1784,7 @@ ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db,
} }
ACTOR Future<Void> reportTxnTagCommitCost(UID myID, ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
UIDTransactionTagMap<TransactionCommitCostEstimation>* ssTrTagCommitCost) { UIDTransactionTagMap<TransactionCommitCostEstimation>* ssTrTagCommitCost) {
state Future<Void> nextRequestTimer = Never(); state Future<Void> nextRequestTimer = Never();
state Future<Void> nextReply = Never(); state Future<Void> nextReply = Never();
@ -1818,7 +1819,7 @@ ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy, ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master, MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LogEpoch epoch, LogEpoch epoch,
Version recoveryTransactionVersion, Version recoveryTransactionVersion,
bool firstProxy, bool firstProxy,
@ -2037,7 +2038,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
} }
} }
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount, uint64_t recoveryCount,
CommitProxyInterface myInterface) { CommitProxyInterface myInterface) {
loop { loop {
@ -2051,7 +2052,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy, ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req, InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths) { std::string whitelistBinPaths) {
try { try {
state Future<Void> core = commitProxyServerCore(proxy, state Future<Void> core = commitProxyServerCore(proxy,

View File

@ -126,7 +126,7 @@ class ReadFromLocalConfigEnvironment {
UID id; UID id;
std::string dataDir; std::string dataDir;
LocalConfiguration localConfiguration; LocalConfiguration localConfiguration;
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> cbfi; Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> cbfi;
Future<Void> consumer; Future<Void> consumer;
ACTOR static Future<Void> checkEventually(LocalConfiguration const* localConfiguration, ACTOR static Future<Void> checkEventually(LocalConfiguration const* localConfiguration,
@ -168,7 +168,7 @@ public:
return setup(); return setup();
} }
void connectToBroadcaster(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& cbfi) { void connectToBroadcaster(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& cbfi) {
ASSERT(!this->cbfi); ASSERT(!this->cbfi);
this->cbfi = cbfi; this->cbfi = cbfi;
consumer = localConfiguration.consume(cbfi); consumer = localConfiguration.consume(cbfi);
@ -228,7 +228,7 @@ class BroadcasterToLocalConfigEnvironment {
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) { ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) {
wait(self->readFrom.setup()); wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi)); self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get()); self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void(); return Void();
} }
@ -364,7 +364,7 @@ class TransactionToLocalConfigEnvironment {
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) { ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) {
wait(self->readFrom.setup()); wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi)); self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get()); self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void(); return Void();
} }

View File

@ -5218,7 +5218,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
} }
ACTOR Future<Void> storageRecruiter(DDTeamCollection* self, ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
Reference<AsyncVar<struct ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
const DDEnabledState* ddEnabledState) { const DDEnabledState* ddEnabledState) {
state Future<RecruitStorageReply> fCandidateWorker; state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest; state RecruitStorageRequest lastRequest;
@ -5490,7 +5490,7 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
} }
} }
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<struct ServerDBInfo>> db) { ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
TraceEvent("DDTrackerStarting"); TraceEvent("DDTrackerStarting");
while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) { while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState); TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState);
@ -5516,8 +5516,8 @@ ACTOR Future<Void> monitorHealthyTeams(DDTeamCollection* self) {
ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection, ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData, Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci, TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
const DDEnabledState* ddEnabledState) { DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr(); state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void(); state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved; state PromiseStream<Void> serverRemoved;
@ -5744,16 +5744,16 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
} }
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> { struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
Reference<AsyncVar<struct ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
UID ddId; UID ddId;
PromiseStream<Future<Void>> addActor; PromiseStream<Future<Void>> addActor;
DDTeamCollection* teamCollection; DDTeamCollection* teamCollection;
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), teamCollection(nullptr) {} : dbInfo(db), ddId(id), teamCollection(nullptr) {}
}; };
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) { ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> const> db, double* lastLimited) {
loop { loop {
wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE)); wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE));
@ -6121,7 +6121,7 @@ static std::set<int> const& normalDataDistributorErrors() {
return s; return s;
} }
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) { ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True); state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ReadYourWritesTransaction tr(cx); state ReadYourWritesTransaction tr(cx);
loop { loop {
@ -6265,7 +6265,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
} }
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<struct ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) { DDEnabledState* ddEnabledState) {
state Future<Void> dbInfoChange = db->onChange(); state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) { if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@ -6459,7 +6459,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void(); return Void();
} }
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db) { ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
state Reference<DataDistributorData> self(new DataDistributorData(db, di.id())); state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
state Future<Void> collection = actorCollection(self->addActor.getFuture()); state Future<Void> collection = actorCollection(self->addActor.getFuture());
state PromiseStream<GetMetricsListRequest> getShardMetricsList; state PromiseStream<GetMetricsListRequest> getShardMetricsList;

View File

@ -222,7 +222,7 @@ struct GrvProxyData {
Reference<ILogSystem> logSystem; Reference<ILogSystem> logSystem;
Database cx; Database cx;
Reference<AsyncVar<ServerDBInfo>> db; Reference<AsyncVar<ServerDBInfo> const> db;
Optional<LatencyBandConfig> latencyBandConfig; Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit; double lastStartCommit;
@ -251,7 +251,7 @@ struct GrvProxyData {
GrvProxyData(UID dbgid, GrvProxyData(UID dbgid,
MasterInterface master, MasterInterface master,
RequestStream<GetReadVersionRequest> getConsistentReadVersion, RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Reference<AsyncVar<ServerDBInfo>> db) Reference<AsyncVar<ServerDBInfo> const> db)
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion), : dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db), lastStartCommit(0), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
@ -275,7 +275,7 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
// Get transaction rate info from RateKeeper. // Get transaction rate info from RateKeeper.
ACTOR Future<Void> getRate(UID myID, ACTOR Future<Void> getRate(UID myID,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
int64_t* inTransactionCount, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, int64_t* inBatchTransactionCount,
GrvTransactionRateInfo* transactionRateInfo, GrvTransactionRateInfo* transactionRateInfo,
@ -375,7 +375,7 @@ void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* st
} }
// Put a GetReadVersion request into the queue corresponding to its priority. // Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>> db, ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo> const> db,
SpannedDeque<GetReadVersionRequest>* systemQueue, SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue, SpannedDeque<GetReadVersionRequest>* defaultQueue,
SpannedDeque<GetReadVersionRequest>* batchQueue, SpannedDeque<GetReadVersionRequest>* batchQueue,
@ -634,7 +634,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
return Void(); return Void();
} }
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) { ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo> const> db) {
state Future<Void> nextRequestTimer = Never(); state Future<Void> nextRequestTimer = Never();
state Future<GetDataDistributorMetricsReply> nextReply = Never(); state Future<GetDataDistributorMetricsReply> nextReply = Never();
@ -680,7 +680,7 @@ ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<Asyn
} }
ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
PromiseStream<Future<Void>> addActor, PromiseStream<Future<Void>> addActor,
GrvProxyData* grvProxyData, GrvProxyData* grvProxyData,
GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* healthMetricsReply,
@ -898,7 +898,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy, ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
MasterInterface master, MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db); state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
state PromiseStream<Future<Void>> addActor; state PromiseStream<Future<Void>> addActor;
@ -945,7 +945,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
} }
} }
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount, uint64_t recoveryCount,
GrvProxyInterface myInterface) { GrvProxyInterface myInterface) {
loop { loop {
@ -959,7 +959,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy, ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req, InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
try { try {
state Future<Void> core = grvProxyServerCore(proxy, req.master, db); state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
wait(core || checkRemoved(db, req.recoveryCount, proxy)); wait(core || checkRemoved(db, req.recoveryCount, proxy));

View File

@ -309,9 +309,8 @@ class LocalConfigurationImpl {
} }
} }
ACTOR static Future<Void> consume( ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
LocalConfigurationImpl* self, Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> broadcaster) {
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> broadcaster) {
ASSERT(self->initFuture.isValid() && self->initFuture.isReady()); ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
loop { loop {
choose { choose {
@ -371,7 +370,7 @@ public:
return getKnobs().getTestKnobs(); return getKnobs().getTestKnobs();
} }
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) { Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return consume(this, broadcaster); return consume(this, broadcaster);
} }
@ -453,7 +452,7 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const {
} }
Future<Void> LocalConfiguration::consume( Future<Void> LocalConfiguration::consume(
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) { Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return impl().consume(broadcaster); return impl().consume(broadcaster);
} }

View File

@ -60,7 +60,7 @@ public:
ClientKnobs const& getClientKnobs() const; ClientKnobs const& getClientKnobs() const;
ServerKnobs const& getServerKnobs() const; ServerKnobs const& getServerKnobs() const;
TestKnobs const& getTestKnobs() const; TestKnobs const& getTestKnobs() const;
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster); Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster);
UID getID() const; UID getID() const;
public: // Testing public: // Testing

View File

@ -625,7 +625,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
ACTOR Future<Void> logRouterCore(TLogInterface interf, ACTOR Future<Void> logRouterCore(TLogInterface interf,
InitializeLogRouterRequest req, InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
state LogRouterData logRouterData(interf.id(), req); state LogRouterData logRouterData(interf.id(), req);
state PromiseStream<Future<Void>> addActor; state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture()); state Future<Void> error = actorCollection(addActor.getFuture());
@ -653,7 +653,7 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf,
} }
} }
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount, uint64_t recoveryCount,
TLogInterface myInterface) { TLogInterface myInterface) {
loop { loop {
@ -670,7 +670,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> logRouter(TLogInterface interf, ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req, InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
try { try {
TraceEvent("LogRouterStart", interf.id()) TraceEvent("LogRouterStart", interf.id())
.detail("Start", req.startVersion) .detail("Start", req.startVersion)

View File

@ -291,7 +291,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool> AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
NotifiedVersion queueCommitEnd; NotifiedVersion queueCommitEnd;
Version queueCommitBegin; Version queueCommitBegin;
@ -321,7 +321,7 @@ struct TLogData : NonCopyable {
UID workerID, UID workerID,
IKeyValueStore* persistentData, IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo) Reference<AsyncVar<ServerDBInfo> const> const& dbInfo)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentData(persistentData), rawPersistentQueue(persistentQueue),
persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0),
@ -1568,7 +1568,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
UID tlogId, UID tlogId,
UID workerID) { UID workerID) {

View File

@ -264,7 +264,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool> AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx; Database cx;
NotifiedVersion queueCommitEnd; NotifiedVersion queueCommitEnd;
@ -301,7 +301,7 @@ struct TLogData : NonCopyable {
UID workerID, UID workerID,
IKeyValueStore* persistentData, IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<bool>> degraded,
std::string folder) std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -2716,7 +2716,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network // New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,

View File

@ -327,7 +327,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool> AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx; Database cx;
NotifiedVersion queueCommitEnd; NotifiedVersion queueCommitEnd;
@ -364,7 +364,7 @@ struct TLogData : NonCopyable {
UID workerID, UID workerID,
IKeyValueStore* persistentData, IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<bool>> degraded,
std::string folder) std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3205,7 +3205,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network // New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,

View File

@ -161,7 +161,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion; RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit; RequestStream<CommitTransactionRequest> commit;
Database cx; Database cx;
Reference<AsyncVar<ServerDBInfo>> db; Reference<AsyncVar<ServerDBInfo> const> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent; EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache; std::map<UID, Reference<StorageInfo>> storageCache;
@ -239,7 +239,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion, RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Version recoveryTransactionVersion, Version recoveryTransactionVersion,
RequestStream<CommitTransactionRequest> commit, RequestStream<CommitTransactionRequest> commit,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
bool firstProxy) bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master), : dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(nullptr), txnStateStore(nullptr), popRemoteTxs(false), committedVersion(recoveryTransactionVersion), logAdapter(nullptr), txnStateStore(nullptr), popRemoteTxs(false), committedVersion(recoveryTransactionVersion),

View File

@ -35,7 +35,7 @@
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include "flow/actorcompiler.h" // This must be the last #include. #include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0) { ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int flags = 0) {
loop { loop {
choose { choose {
when(vector<WorkerDetails> w = wait(brokenPromiseToNever( when(vector<WorkerDetails> w = wait(brokenPromiseToNever(
@ -48,7 +48,7 @@ ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>>
} }
// Gets the WorkerInterface representing the Master server. // Gets the WorkerInterface representing the Master server.
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers"); TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");
loop { loop {
@ -75,7 +75,7 @@ ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<Se
} }
// Gets the WorkerInterface representing the data distributor. // Gets the WorkerInterface representing the data distributor.
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers"); TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers");
loop { loop {
@ -118,7 +118,7 @@ ACTOR Future<int64_t> getDataInFlight(Database cx, WorkerInterface distributorWo
} }
// Gets the number of bytes in flight from the data distributor. // Gets the number of bytes in flight from the data distributor.
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo)); WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf)); int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf));
return dataInFlight; return dataInFlight;
@ -144,7 +144,7 @@ int64_t getPoppedVersionLag(const TraceEventFields& md) {
return persistentDataDurableVersion - queuePoppedVersion; return persistentDataDurableVersion - queuePoppedVersion;
} }
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo)); state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
Optional<Value> coordinators = Optional<Value> coordinators =
@ -177,7 +177,8 @@ ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<Asy
} }
// This is not robust in the face of a TLog failure // This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs"); TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo)); state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@ -245,7 +246,7 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers(Database cx, bool
} }
ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx, ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool localOnly) { bool localOnly) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx)); state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap; state std::map<NetworkAddress, WorkerInterface> workersMap;
@ -335,7 +336,7 @@ ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInter
}; };
// Gets the maximum size of all the storage server queues // Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers"); TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx); Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
@ -399,7 +400,7 @@ ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of // Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
// the queue Convenience method that first finds the master worker from a zookeeper interface // the queue Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx, ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool reportInFlight) { bool reportInFlight) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo)); WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight)); int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight));
@ -516,7 +517,7 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
// Gets if the number of process and machine teams does not exceed the maximum allowed number of teams // Gets if the number of process and machine teams does not exceed the maximum allowed number of teams
// Convenience method that first finds the master worker from a zookeeper interface // Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo)); WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo));
bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker)); bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker));
return valid; return valid;
@ -565,7 +566,9 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
} }
} }
ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context) { ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator.usableRegions > 1) { if (g_network->isSimulated() && g_simulator.usableRegions > 1) {
bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId); bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId);
bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId); bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId);
@ -601,7 +604,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDB
ACTOR Future<Void> reconfigureAfter(Database cx, ACTOR Future<Void> reconfigureAfter(Database cx,
double time, double time,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) { std::string context) {
wait(delay(time)); wait(delay(time));
wait(repairDeadDatacenter(cx, dbInfo, context)); wait(repairDeadDatacenter(cx, dbInfo, context));
@ -611,7 +614,7 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This // Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed. // requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx, ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string phase, std::string phase,
int64_t dataInFlightGate = 2e6, int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6, int64_t maxTLogQueueGate = 5e6,
@ -747,7 +750,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
} }
Future<Void> quietDatabase(Database const& cx, Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string phase, std::string phase,
int64_t dataInFlightGate, int64_t dataInFlightGate,
int64_t maxTLogQueueGate, int64_t maxTLogQueueGate,

View File

@ -28,25 +28,26 @@
#include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h" #include "flow/actorcompiler.h"
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&); Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx, Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&); Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&); Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getDataDistributionQueueSize(Database const& cx, Future<int64_t> getDataDistributionQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&, Reference<AsyncVar<struct ServerDBInfo> const> const&,
bool const& reportInFlight); bool const& reportInFlight);
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&); Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&); Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<vector<StorageServerInterface>> getStorageServers(Database const& cx, bool const& use_system_priority = false); Future<vector<StorageServerInterface>> getStorageServers(Database const& cx, bool const& use_system_priority = false);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0); Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo); Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
Future<Void> repairDeadDatacenter(Database const& cx, Future<Void> repairDeadDatacenter(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string const& context); std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers(Database const& cx, Future<vector<WorkerInterface>> getStorageWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
bool const& localOnly); bool const& localOnly);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo); Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
#include "flow/unactorcompiler.h" #include "flow/unactorcompiler.h"
#endif #endif

View File

@ -1408,7 +1408,7 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
} }
} }
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void(); state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers; state std::vector<Future<Void>> tlogTrackers;

View File

@ -354,7 +354,7 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver, InitializeResolverRe
} }
} }
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount, uint64_t recoveryCount,
ResolverInterface myInterface) { ResolverInterface myInterface) {
loop { loop {
@ -367,7 +367,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> resolver(ResolverInterface resolver, ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db) { Reference<AsyncVar<ServerDBInfo> const> db) {
try { try {
state Future<Void> core = resolverCore(resolver, initReq); state Future<Void> core = resolverCore(resolver, initReq);
loop choose { loop choose {

View File

@ -162,7 +162,7 @@ public:
ProtocolVersion logProtocol; ProtocolVersion logProtocol;
Reference<ILogSystem> logSystem; Reference<ILogSystem> logSystem;
Key ck; // cacheKey Key ck; // cacheKey
Reference<AsyncVar<ServerDBInfo>> const& db; Reference<AsyncVar<ServerDBInfo> const> db;
Database cx; Database cx;
StorageCacheUpdater* updater; StorageCacheUpdater* updater;
@ -238,7 +238,7 @@ public:
} }
} counters; } counters;
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo>> const& db) explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo> const> const& db)
: /*versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}), */ : /*versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}), */
thisServerID(thisServerID), index(index), logProtocol(0), db(db), cacheRangeChangeCounter(0), thisServerID(thisServerID), index(index), logProtocol(0), db(db), cacheRangeChangeCounter(0),
lastTLogVersion(0), lastVersionWithData(0), peekVersion(0), compactionInProgress(Void()), lastTLogVersion(0), lastVersionWithData(0), peekVersion(0), compactionInProgress(Void()),
@ -2165,7 +2165,9 @@ ACTOR Future<Void> watchInterface(StorageCacheData* self, StorageServerInterface
} }
} }
ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db) { ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db) {
state StorageCacheData self(ssi.id(), id, db); state StorageCacheData self(ssi.id(), id, db);
state ActorCollection actors(false); state ActorCollection actors(false);
state Future<Void> dbInfoChange = Void(); state Future<Void> dbInfoChange = Void();

View File

@ -329,7 +329,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool> AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx; Database cx;
NotifiedVersion queueCommitEnd; NotifiedVersion queueCommitEnd;
@ -372,7 +372,7 @@ struct TLogData : NonCopyable {
UID workerID, UID workerID,
IKeyValueStore* persistentData, IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<bool>> degraded,
std::string folder) std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3280,7 +3280,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network // New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,

View File

@ -831,7 +831,7 @@ ACTOR Future<Void> traceRole(Role role, UID roleId);
struct ServerDBInfo; struct ServerDBInfo;
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db, class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID = TaskPriority::DefaultEndpoint, TaskPriority taskID = TaskPriority::DefaultEndpoint,
LockAware = LockAware::False, LockAware = LockAware::False,
EnableLocalityLoadBalance = EnableLocalityLoadBalance::True); EnableLocalityLoadBalance = EnableLocalityLoadBalance::True);
@ -868,32 +868,32 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag, Tag seedTag,
Version tssSeedVersion, Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder); std::string folder);
ACTOR Future<Void> storageServer( ACTOR Future<Void> storageServer(
IKeyValueStore* persistentData, IKeyValueStore* persistentData,
StorageServerInterface ssi, StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder, std::string folder,
Promise<Void> recovered, Promise<Void> recovered,
Reference<ClusterConnectionFile> Reference<ClusterConnectionFile>
connFile); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID connFile); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi, ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators serverCoordinators, ServerCoordinators serverCoordinators,
LifetimeToken lifetime, LifetimeToken lifetime,
bool forceRecovery); bool forceRecovery);
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy, ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req, InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths); std::string whitelistBinPaths);
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy, ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req, InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db); Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,
@ -906,14 +906,18 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
Reference<AsyncVar<UID>> activeSharedTLog); Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> resolver(ResolverInterface resolver, ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db); Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> logRouter(TLogInterface interf, ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req, InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db); Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo>> db); ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo>> db); ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db); ACTOR Future<Void> storageCacheServer(StorageServerInterface interf,
ACTOR Future<Void> backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference<AsyncVar<ServerDBInfo>> db); uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> backupWorker(BackupInterface bi,
InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo> const> db);
void registerThreadForProfiling(); void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req); void updateCpuProfiler(ProfilerRequest req);
@ -921,7 +925,7 @@ void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog_4_6 { namespace oldTLog_4_6 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
UID tlogId, UID tlogId,
UID workerID); UID workerID);
@ -929,7 +933,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_0 { namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,
@ -944,7 +948,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_2 { namespace oldTLog_6_2 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId, UID tlogId,

View File

@ -228,7 +228,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
ReusableCoordinatedState cstate; ReusableCoordinatedState cstate;
Promise<Void> cstateUpdated; Promise<Void> cstateUpdated;
Reference<AsyncVar<ServerDBInfo>> dbInfo; Reference<AsyncVar<ServerDBInfo> const> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
RecoveryState recoveryState; RecoveryState recoveryState;
@ -255,7 +255,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Future<Void> logger; Future<Void> logger;
MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
MasterInterface const& myInterface, MasterInterface const& myInterface,
ServerCoordinators const& coordinators, ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController, ClusterControllerFullInterface const& clusterController,
@ -1978,7 +1978,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
} }
ACTOR Future<Void> masterServer(MasterInterface mi, ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators coordinators, ServerCoordinators coordinators,
LifetimeToken lifetime, LifetimeToken lifetime,

View File

@ -614,7 +614,7 @@ public:
bool tssInQuarantine; bool tssInQuarantine;
Key sk; Key sk;
Reference<AsyncVar<ServerDBInfo>> db; Reference<AsyncVar<ServerDBInfo> const> db;
Database cx; Database cx;
ActorCollection actors; ActorCollection actors;
@ -806,7 +806,7 @@ public:
} counters; } counters;
StorageServer(IKeyValueStore* storage, StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo>> const& db, Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi) StorageServerInterface const& ssi)
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage), : fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0), db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
@ -5134,7 +5134,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag, Tag seedTag,
Version tssSeedVersion, Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) { std::string folder) {
state StorageServer self(persistentData, db, ssi); state StorageServer self(persistentData, db, ssi);
if (ssi.isTss()) { if (ssi.isTss()) {
@ -5328,7 +5328,7 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
// for recovering an existing storage server // for recovering an existing storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi, StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder, std::string folder,
Promise<Void> recovered, Promise<Void> recovered,
Reference<ClusterConnectionFile> connFile) { Reference<ClusterConnectionFile> connFile) {

View File

@ -122,7 +122,7 @@ ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoReq
return notUpdated; return notUpdated;
} }
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db, ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<ClientDBInfo>> info) { Reference<AsyncVar<ClientDBInfo>> info) {
state std::vector<UID> lastCommitProxyUIDs; state std::vector<UID> lastCommitProxyUIDs;
state std::vector<CommitProxyInterface> lastCommitProxies; state std::vector<CommitProxyInterface> lastCommitProxies;
@ -136,7 +136,7 @@ ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db
} }
} }
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db, Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID, TaskPriority taskID,
LockAware lockAware, LockAware lockAware,
EnableLocalityLoadBalance enableLocalityLoadBalance) { EnableLocalityLoadBalance enableLocalityLoadBalance) {
@ -502,15 +502,15 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
// Register the worker interf to cluster controller (cc) and // Register the worker interf to cluster controller (cc) and
// re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change. // re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf, WorkerInterface interf,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass, ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf, Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf, Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile, Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>>> issues) { Reference<AsyncVar<std::set<std::string>> const> issues) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists. // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@ -2303,10 +2303,9 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(); auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
if (useConfigDB != UseConfigDB::DISABLED) { if (useConfigDB != UseConfigDB::DISABLED) {
actors.push_back( actors.push_back(reportErrors(localConfig.consume(IAsyncListener<ConfigBroadcastFollowerInterface>::create(
reportErrors(localConfig.consume(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create( dbInfo, [](auto const& info) { return info.configBroadcaster; })),
dbInfo, [](auto const& info) { return info.configBroadcaster; })), "LocalConfiguration"));
"LocalConfiguration"));
} }
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo")); "MonitorAndWriteCCPriorityInfo"));

View File

@ -222,7 +222,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix);
ACTOR Future<Void> databaseWarmer(Database cx); ACTOR Future<Void> databaseWarmer(Database cx);
Future<Void> quietDatabase(Database const& cx, Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&, Reference<AsyncVar<struct ServerDBInfo> const> const&,
std::string phase, std::string phase,
int64_t dataInFlightGate = 2e6, int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6, int64_t maxTLogQueueGate = 5e6,

View File

@ -158,7 +158,7 @@ ACTOR Future<Void> testPublisher(Reference<AsyncVar<DummyState>> input) {
return Void(); return Void();
} }
ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Optional<int> expected) { ACTOR Future<Void> testSubscriber(Reference<IAsyncListener<int>> output, Optional<int> expected) {
loop { loop {
wait(output->onChange()); wait(output->onChange());
ASSERT(expected.present()); ASSERT(expected.present());
@ -170,12 +170,12 @@ ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Opt
} // namespace } // namespace
TEST_CASE("/flow/genericactors/DependentAsyncVar") { TEST_CASE("/flow/genericactors/AsyncListener") {
auto input = makeReference<AsyncVar<DummyState>>(); auto input = makeReference<AsyncVar<DummyState>>();
state Future<Void> subscriber1 = state Future<Void> subscriber1 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.changed; }), 100); testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.changed; }), 100);
state Future<Void> subscriber2 = state Future<Void> subscriber2 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.unchanged; }), {}); testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
wait(subscriber1 && testPublisher(input)); wait(subscriber1 && testPublisher(input));
ASSERT(!subscriber2.isReady()); ASSERT(!subscriber2.isReady());
return Void(); return Void();

View File

@ -690,7 +690,7 @@ public:
AsyncTrigger() {} AsyncTrigger() {}
AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {} AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
void operator=(AsyncTrigger&& at) { v = std::move(at.v); } void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
Future<Void> onTrigger() { return v.onChange(); } Future<Void> onTrigger() const { return v.onChange(); }
void trigger() { v.trigger(); } void trigger() { v.trigger(); }
private: private:
@ -700,7 +700,7 @@ private:
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes // Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
// the AsyncTrigger is triggered. // the AsyncTrigger is triggered.
ACTOR template <class T> ACTOR template <class T>
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) { void forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
loop { loop {
wait(from->onChange()); wait(from->onChange());
to->trigger(); to->trigger();
@ -1957,25 +1957,28 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
} }
/* /*
* IDependentAsyncVar is similar to AsyncVar, but it decouples the input and output, so the translation unit * IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit
* responsible for handling the output does not need to have knowledge of how the output is generated * responsible for handling the output does not need to have knowledge of how the output is generated
*/ */
template <class Output> template <class Output>
class IDependentAsyncVar : public ReferenceCounted<IDependentAsyncVar<Output>> { class IAsyncListener : public ReferenceCounted<IAsyncListener<Output>> {
public: public:
virtual ~IDependentAsyncVar() = default; virtual ~IAsyncListener() = default;
virtual Output const& get() const = 0; virtual Output const& get() const = 0;
virtual Future<Void> onChange() const = 0; virtual Future<Void> onChange() const = 0;
template <class Input, class F> template <class Input, class F>
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Input>> const& input, F const& f); static Reference<IAsyncListener> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Output>> const& output); static Reference<IAsyncListener> create(Reference<AsyncVar<Output>> const& output);
}; };
namespace IAsyncListenerImpl {
template <class Input, class Output, class F> template <class Input, class Output, class F>
class DependentAsyncVar final : public IDependentAsyncVar<Output> { class AsyncListener final : public IAsyncListener<Output> {
Reference<AsyncVar<Output>> output; // Order matters here, output must outlive monitorActor
AsyncVar<Output> output;
Future<Void> monitorActor; Future<Void> monitorActor;
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input>> input, Reference<AsyncVar<Output>> output, F f) { ACTOR static Future<Void> monitor(Reference<AsyncVar<Input> const> input, AsyncVar<Output>* output, F f) {
loop { loop {
wait(input->onChange()); wait(input->onChange());
output->set(f(input->get())); output->set(f(input->get()));
@ -1983,23 +1986,24 @@ class DependentAsyncVar final : public IDependentAsyncVar<Output> {
} }
public: public:
DependentAsyncVar(Reference<AsyncVar<Input>> const& input, F const& f) AsyncListener(Reference<AsyncVar<Input> const> const& input, F const& f)
: output(makeReference<AsyncVar<Output>>(f(input->get()))), monitorActor(monitor(input, output, f)) {} : output(f(input->get())), monitorActor(monitor(input, &output, f)) {}
Output const& get() const override { return output->get(); } Output const& get() const override { return output.get(); }
Future<Void> onChange() const override { return output->onChange(); } Future<Void> onChange() const override { return output.onChange(); }
}; };
} // namespace IAsyncListenerImpl
template <class Output> template <class Output>
template <class Input, class F> template <class Input, class F>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Input>> const& input, Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
F const& f) { return makeReference<IAsyncListenerImpl::AsyncListener<Input, Output, F>>(input, f);
return makeReference<DependentAsyncVar<Input, Output, F>>(input, f);
} }
template <class Output> template <class Output>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Output>> const& input) { Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Output>> const& input) {
auto identity = [](const auto& x) { return x; }; auto identity = [](const auto& x) { return x; };
return makeReference<DependentAsyncVar<Output, Output, decltype(identity)>>(input, identity); return makeReference<IAsyncListenerImpl::AsyncListener<Output, Output, decltype(identity)>>(input, identity);
} }
// A weak reference type to wrap a future Reference<T> object. // A weak reference type to wrap a future Reference<T> object.