Merge pull request #5124 from sfc-gh-tclinkenbeard/const-serverdbinfo

Restrict mutable access to ServerDBInfo
This commit is contained in:
Markus Pilman 2021-07-21 14:41:21 -06:00 committed by GitHub
commit e4e109a520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 176 additions and 160 deletions

View File

@ -196,7 +196,7 @@ public:
Reference<CommitProxyInfo> getCommitProxies(bool useProvisionalProxies);
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(bool useProvisionalProxies);
Reference<GrvProxyInfo> getGrvProxies(bool useProvisionalProxies);
Future<Void> onProxiesChanged();
Future<Void> onProxiesChanged() const;
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Returns the protocol version reported by the coordinator this client is connected to
@ -255,7 +255,7 @@ public:
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -307,7 +307,7 @@ public:
// trust that the read version (possibly set manually by the application) is actually from the correct cluster.
// Updated everytime we get a GRV response
Version minAcceptableReadVersion = std::numeric_limits<Version>::max();
void validateVersion(Version);
void validateVersion(Version) const;
// Client status updater
struct ClientStatusUpdater {
@ -399,7 +399,7 @@ public:
Future<Void> connected;
// An AsyncVar that reports the coordinator this DatabaseContext is interacting with
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator;
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon;
@ -428,7 +428,6 @@ public:
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
@ -437,6 +436,9 @@ public:
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
void removeTssMapping(StorageServerInterface const& ssi);
private:
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
};
#endif

View File

@ -72,7 +72,7 @@ public:
// to allow global configuration to run transactions on the latest
// database.
template <class T>
static void create(Database& cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) {
static void create(Database& cx, Reference<AsyncVar<T> const> db, const ClientDBInfo* dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{ cx };
g_network->setGlobal(INetwork::enGlobalConfig, config);

View File

@ -49,7 +49,7 @@ struct ClientData {
OpenDatabaseRequest getRequest();
ClientData() : clientInfo(new AsyncVar<CachedSerialization<ClientDBInfo>>(CachedSerialization<ClientDBInfo>())) {}
ClientData() : clientInfo(makeReference<AsyncVar<CachedSerialization<ClientDBInfo>>>()) {}
};
struct MonitorLeaderInfo {

View File

@ -285,7 +285,7 @@ std::string unprintable(std::string const& val) {
return s;
}
void DatabaseContext::validateVersion(Version version) {
void DatabaseContext::validateVersion(Version version) const {
// Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any
// reads. We throw client_invalid_operation because the caller didn't directly set the version, so the
// version_invalid error might be confusing.
@ -650,7 +650,7 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
}
}
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* triggerVar) {
state vector<CommitProxyInterface> curCommitProxies;
state vector<GrvProxyInterface> curGrvProxies;
@ -1085,7 +1085,7 @@ Future<RangeResult> HealthMetricsRangeImpl::getRange(ReadYourWritesTransaction*
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -1482,7 +1482,7 @@ void DatabaseContext::invalidateCache(const KeyRangeRef& keys) {
locationCache.insert(KeyRangeRef(begin, end), Reference<LocationInfo>());
}
Future<Void> DatabaseContext::onProxiesChanged() {
Future<Void> DatabaseContext::onProxiesChanged() const {
return this->proxiesChangeTrigger.onTrigger();
}
@ -1759,7 +1759,8 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
}
auto database = Database(db);
GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get()));
GlobalConfig::create(
database, Reference<AsyncVar<ClientDBInfo> const>(clientInfo), std::addressof(clientInfo->get()));
return database;
}
@ -5760,7 +5761,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
NetworkAddress coordinatorAddress,
Optional<ProtocolVersion> expectedVersion) {
state Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion =
state Reference<AsyncVar<Optional<ProtocolVersion>> const> protocolVersion =
FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress);
loop {
@ -5785,7 +5786,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
// Returns the protocol version reported by the given coordinator
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Optional<ProtocolVersion> expectedVersion) {
state bool needToConnect = true;

View File

@ -1698,7 +1698,7 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
Reference<AsyncVar<Optional<ProtocolVersion>> const> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
return self->peers.at(addr)->protocolVersion;
}
@ -1723,4 +1723,4 @@ void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
HealthMonitor* FlowTransport::healthMonitor() {
return &self->healthMonitor;
}
}

View File

@ -252,7 +252,7 @@ public:
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> getPeerProtocolAsyncVar(NetworkAddress addr);
Reference<AsyncVar<Optional<ProtocolVersion>> const> getPeerProtocolAsyncVar(NetworkAddress addr);
static FlowTransport& transport() {
return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport));

View File

@ -237,7 +237,7 @@ struct BackupData {
CounterCollection cc;
Future<Void> logger;
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo> const> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
@ -987,7 +987,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch recoveryCount, BackupData* self) {
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db, LogEpoch recoveryCount, BackupData* self) {
loop {
bool isDisplaced =
db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED;
@ -1033,7 +1033,7 @@ ACTOR static Future<Void> monitorWorkerPause(BackupData* self) {
ACTOR Future<Void> backupWorker(BackupInterface interf,
InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state BackupData self(interf.id(), db, req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());

View File

@ -1596,7 +1596,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
}
}
ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo> const> db) {
loop {
choose {
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) {
@ -1754,7 +1754,8 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
return Void();
}
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db, ExclusionSafetyCheckRequest req) {
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo> const> db,
ExclusionSafetyCheckRequest req) {
TraceEvent("SafetyCheckCommitProxyBegin");
state ExclusionSafetyCheckReply reply(false);
if (!db->get().distributor.present()) {
@ -1783,7 +1784,7 @@ ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db,
}
ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
UIDTransactionTagMap<TransactionCommitCostEstimation>* ssTrTagCommitCost) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> nextReply = Never();
@ -1818,7 +1819,7 @@ ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LogEpoch epoch,
Version recoveryTransactionVersion,
bool firstProxy,
@ -2037,7 +2038,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
CommitProxyInterface myInterface) {
loop {
@ -2051,7 +2052,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths) {
try {
state Future<Void> core = commitProxyServerCore(proxy,

View File

@ -126,7 +126,7 @@ class ReadFromLocalConfigEnvironment {
UID id;
std::string dataDir;
LocalConfiguration localConfiguration;
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> cbfi;
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> cbfi;
Future<Void> consumer;
ACTOR static Future<Void> checkEventually(LocalConfiguration const* localConfiguration,
@ -168,7 +168,7 @@ public:
return setup();
}
void connectToBroadcaster(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& cbfi) {
void connectToBroadcaster(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& cbfi) {
ASSERT(!this->cbfi);
this->cbfi = cbfi;
consumer = localConfiguration.consume(cbfi);
@ -228,7 +228,7 @@ class BroadcasterToLocalConfigEnvironment {
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) {
wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void();
}
@ -364,7 +364,7 @@ class TransactionToLocalConfigEnvironment {
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) {
wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void();
}

View File

@ -5218,7 +5218,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
}
ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
const DDEnabledState* ddEnabledState) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
@ -5490,7 +5490,7 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
}
}
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
TraceEvent("DDTrackerStarting");
while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState);
@ -5516,8 +5516,8 @@ ACTOR Future<Void> monitorHealthyTeams(DDTeamCollection* self) {
ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db,
const DDEnabledState* ddEnabledState) {
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -5744,16 +5744,16 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
}
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
UID ddId;
PromiseStream<Future<Void>> addActor;
DDTeamCollection* teamCollection;
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id)
DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), teamCollection(nullptr) {}
};
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> const> db, double* lastLimited) {
loop {
wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE));
@ -6121,7 +6121,7 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ReadYourWritesTransaction tr(cx);
loop {
@ -6265,7 +6265,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) {
state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@ -6459,7 +6459,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
state Future<Void> collection = actorCollection(self->addActor.getFuture());
state PromiseStream<GetMetricsListRequest> getShardMetricsList;

View File

@ -222,7 +222,7 @@ struct GrvProxyData {
Reference<ILogSystem> logSystem;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit;
@ -251,7 +251,7 @@ struct GrvProxyData {
GrvProxyData(UID dbgid,
MasterInterface master,
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Reference<AsyncVar<ServerDBInfo>> db)
Reference<AsyncVar<ServerDBInfo> const> db)
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
@ -275,7 +275,7 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
// Get transaction rate info from RateKeeper.
ACTOR Future<Void> getRate(UID myID,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
int64_t* inTransactionCount,
int64_t* inBatchTransactionCount,
GrvTransactionRateInfo* transactionRateInfo,
@ -375,7 +375,7 @@ void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* st
}
// Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo> const> db,
SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue,
SpannedDeque<GetReadVersionRequest>* batchQueue,
@ -634,7 +634,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
return Void();
}
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo> const> db) {
state Future<Void> nextRequestTimer = Never();
state Future<GetDataDistributorMetricsReply> nextReply = Never();
@ -680,7 +680,7 @@ ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<Asyn
}
ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
PromiseStream<Future<Void>> addActor,
GrvProxyData* grvProxyData,
GetHealthMetricsReply* healthMetricsReply,
@ -898,7 +898,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
state PromiseStream<Future<Void>> addActor;
@ -945,7 +945,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
GrvProxyInterface myInterface) {
loop {
@ -959,7 +959,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
wait(core || checkRemoved(db, req.recoveryCount, proxy));

View File

@ -309,9 +309,8 @@ class LocalConfigurationImpl {
}
}
ACTOR static Future<Void> consume(
LocalConfigurationImpl* self,
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> broadcaster) {
ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> broadcaster) {
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
loop {
choose {
@ -371,7 +370,7 @@ public:
return getKnobs().getTestKnobs();
}
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return consume(this, broadcaster);
}
@ -453,7 +452,7 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const {
}
Future<Void> LocalConfiguration::consume(
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return impl().consume(broadcaster);
}

View File

@ -60,7 +60,7 @@ public:
ClientKnobs const& getClientKnobs() const;
ServerKnobs const& getServerKnobs() const;
TestKnobs const& getTestKnobs() const;
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster);
Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster);
UID getID() const;
public: // Testing

View File

@ -625,7 +625,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
ACTOR Future<Void> logRouterCore(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state LogRouterData logRouterData(interf.id(), req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());
@ -653,7 +653,7 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
TLogInterface myInterface) {
loop {
@ -670,7 +670,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
TraceEvent("LogRouterStart", interf.id())
.detail("Start", req.startVersion)

View File

@ -291,7 +291,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
NotifiedVersion queueCommitEnd;
Version queueCommitBegin;
@ -321,7 +321,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue),
persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0),
@ -1568,7 +1568,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
UID tlogId,
UID workerID) {

View File

@ -264,7 +264,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -301,7 +301,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -2716,7 +2716,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

View File

@ -327,7 +327,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -364,7 +364,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3205,7 +3205,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

View File

@ -161,7 +161,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
@ -239,7 +239,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Version recoveryTransactionVersion,
RequestStream<CommitTransactionRequest> commit,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(nullptr), txnStateStore(nullptr), popRemoteTxs(false), committedVersion(recoveryTransactionVersion),

View File

@ -35,7 +35,7 @@
#include <boost/lexical_cast.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0) {
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int flags = 0) {
loop {
choose {
when(vector<WorkerDetails> w = wait(brokenPromiseToNever(
@ -48,7 +48,7 @@ ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>>
}
// Gets the WorkerInterface representing the Master server.
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");
loop {
@ -75,7 +75,7 @@ ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<Se
}
// Gets the WorkerInterface representing the data distributor.
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers");
loop {
@ -118,7 +118,7 @@ ACTOR Future<int64_t> getDataInFlight(Database cx, WorkerInterface distributorWo
}
// Gets the number of bytes in flight from the data distributor.
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf));
return dataInFlight;
@ -144,7 +144,7 @@ int64_t getPoppedVersionLag(const TraceEventFields& md) {
return persistentDataDurableVersion - queuePoppedVersion;
}
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
Optional<Value> coordinators =
@ -177,7 +177,8 @@ ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<Asy
}
// This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@ -245,7 +246,7 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers(Database cx, bool
}
ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool localOnly) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap;
@ -335,7 +336,7 @@ ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInter
};
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
@ -399,7 +400,7 @@ ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
// the queue Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool reportInFlight) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight));
@ -516,7 +517,7 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
// Gets if the number of process and machine teams does not exceed the maximum allowed number of teams
// Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo));
bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker));
return valid;
@ -565,7 +566,9 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
}
}
ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context) {
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator.usableRegions > 1) {
bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId);
bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId);
@ -601,7 +604,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDB
ACTOR Future<Void> reconfigureAfter(Database cx,
double time,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
wait(delay(time));
wait(repairDeadDatacenter(cx, dbInfo, context));
@ -611,7 +614,7 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,
@ -747,7 +750,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
}
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string phase,
int64_t dataInFlightGate,
int64_t maxTLogQueueGate,

View File

@ -28,25 +28,26 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h"
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getDataDistributionQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
bool const& reportInFlight);
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<vector<StorageServerInterface>> getStorageServers(Database const& cx, bool const& use_system_priority = false);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
Future<Void> repairDeadDatacenter(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
bool const& localOnly);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
#include "flow/unactorcompiler.h"
#endif

View File

@ -1408,7 +1408,7 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;

View File

@ -354,7 +354,7 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver, InitializeResolverRe
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
ResolverInterface myInterface) {
loop {
@ -367,7 +367,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
state Future<Void> core = resolverCore(resolver, initReq);
loop choose {

View File

@ -162,7 +162,7 @@ public:
ProtocolVersion logProtocol;
Reference<ILogSystem> logSystem;
Key ck; // cacheKey
Reference<AsyncVar<ServerDBInfo>> const& db;
Reference<AsyncVar<ServerDBInfo> const> db;
Database cx;
StorageCacheUpdater* updater;
@ -238,7 +238,7 @@ public:
}
} counters;
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo>> const& db)
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo> const> const& db)
: /*versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}), */
thisServerID(thisServerID), index(index), logProtocol(0), db(db), cacheRangeChangeCounter(0),
lastTLogVersion(0), lastVersionWithData(0), peekVersion(0), compactionInProgress(Void()),
@ -2165,7 +2165,9 @@ ACTOR Future<Void> watchInterface(StorageCacheData* self, StorageServerInterface
}
}
ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db) {
state StorageCacheData self(ssi.id(), id, db);
state ActorCollection actors(false);
state Future<Void> dbInfoChange = Void();

View File

@ -329,7 +329,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -372,7 +372,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3280,7 +3280,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

View File

@ -831,7 +831,7 @@ ACTOR Future<Void> traceRole(Role role, UID roleId);
struct ServerDBInfo;
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID = TaskPriority::DefaultEndpoint,
LockAware = LockAware::False,
EnableLocalityLoadBalance = EnableLocalityLoadBalance::True);
@ -868,32 +868,32 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder);
ACTOR Future<Void> storageServer(
IKeyValueStore* persistentData,
StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile>
connFile); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators serverCoordinators,
LifetimeToken lifetime,
bool forceRecovery);
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths);
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,
@ -906,14 +906,18 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> backupWorker(BackupInterface bi,
InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo> const> db);
void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);
@ -921,7 +925,7 @@ void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog_4_6 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
UID tlogId,
UID workerID);
@ -929,7 +933,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,
@ -944,7 +948,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_2 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

View File

@ -228,7 +228,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
ReusableCoordinatedState cstate;
Promise<Void> cstateUpdated;
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
RecoveryState recoveryState;
@ -255,7 +255,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Future<Void> logger;
MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
MasterInterface const& myInterface,
ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController,
@ -1978,7 +1978,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
}
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators coordinators,
LifetimeToken lifetime,

View File

@ -614,7 +614,7 @@ public:
bool tssInQuarantine;
Key sk;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
Database cx;
ActorCollection actors;
@ -806,7 +806,7 @@ public:
} counters;
StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo>> const& db,
Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi)
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
@ -5134,7 +5134,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
if (ssi.isTss()) {
@ -5328,7 +5328,7 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
// for recovering an existing storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile> connFile) {

View File

@ -122,7 +122,7 @@ ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoReq
return notUpdated;
}
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<ClientDBInfo>> info) {
state std::vector<UID> lastCommitProxyUIDs;
state std::vector<CommitProxyInterface> lastCommitProxies;
@ -136,7 +136,7 @@ ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db
}
}
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID,
LockAware lockAware,
EnableLocalityLoadBalance enableLocalityLoadBalance) {
@ -502,15 +502,15 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
// Register the worker interf to cluster controller (cc) and
// re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf,
Reference<AsyncVar<bool>> degraded,
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>>> issues) {
Reference<AsyncVar<std::set<std::string>> const> issues) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@ -2303,10 +2303,9 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
if (useConfigDB != UseConfigDB::DISABLED) {
actors.push_back(
reportErrors(localConfig.consume(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(
dbInfo, [](auto const& info) { return info.configBroadcaster; })),
"LocalConfiguration"));
actors.push_back(reportErrors(localConfig.consume(IAsyncListener<ConfigBroadcastFollowerInterface>::create(
dbInfo, [](auto const& info) { return info.configBroadcaster; })),
"LocalConfiguration"));
}
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo"));

View File

@ -222,7 +222,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix);
ACTOR Future<Void> databaseWarmer(Database cx);
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,

View File

@ -158,7 +158,7 @@ ACTOR Future<Void> testPublisher(Reference<AsyncVar<DummyState>> input) {
return Void();
}
ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Optional<int> expected) {
ACTOR Future<Void> testSubscriber(Reference<IAsyncListener<int>> output, Optional<int> expected) {
loop {
wait(output->onChange());
ASSERT(expected.present());
@ -170,12 +170,12 @@ ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Opt
} // namespace
TEST_CASE("/flow/genericactors/DependentAsyncVar") {
TEST_CASE("/flow/genericactors/AsyncListener") {
auto input = makeReference<AsyncVar<DummyState>>();
state Future<Void> subscriber1 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.changed; }), 100);
testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.changed; }), 100);
state Future<Void> subscriber2 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
wait(subscriber1 && testPublisher(input));
ASSERT(!subscriber2.isReady());
return Void();

View File

@ -690,7 +690,7 @@ public:
AsyncTrigger() {}
AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
Future<Void> onTrigger() { return v.onChange(); }
Future<Void> onTrigger() const { return v.onChange(); }
void trigger() { v.trigger(); }
private:
@ -700,7 +700,7 @@ private:
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
// the AsyncTrigger is triggered.
ACTOR template <class T>
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) {
void forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
loop {
wait(from->onChange());
to->trigger();
@ -1957,25 +1957,28 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
}
/*
* IDependentAsyncVar is similar to AsyncVar, but it decouples the input and output, so the translation unit
* IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit
* responsible for handling the output does not need to have knowledge of how the output is generated
*/
template <class Output>
class IDependentAsyncVar : public ReferenceCounted<IDependentAsyncVar<Output>> {
class IAsyncListener : public ReferenceCounted<IAsyncListener<Output>> {
public:
virtual ~IDependentAsyncVar() = default;
virtual ~IAsyncListener() = default;
virtual Output const& get() const = 0;
virtual Future<Void> onChange() const = 0;
template <class Input, class F>
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Output>> const& output);
static Reference<IAsyncListener> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IAsyncListener> create(Reference<AsyncVar<Output>> const& output);
};
namespace IAsyncListenerImpl {
template <class Input, class Output, class F>
class DependentAsyncVar final : public IDependentAsyncVar<Output> {
Reference<AsyncVar<Output>> output;
class AsyncListener final : public IAsyncListener<Output> {
// Order matters here, output must outlive monitorActor
AsyncVar<Output> output;
Future<Void> monitorActor;
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input>> input, Reference<AsyncVar<Output>> output, F f) {
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input> const> input, AsyncVar<Output>* output, F f) {
loop {
wait(input->onChange());
output->set(f(input->get()));
@ -1983,23 +1986,24 @@ class DependentAsyncVar final : public IDependentAsyncVar<Output> {
}
public:
DependentAsyncVar(Reference<AsyncVar<Input>> const& input, F const& f)
: output(makeReference<AsyncVar<Output>>(f(input->get()))), monitorActor(monitor(input, output, f)) {}
Output const& get() const override { return output->get(); }
Future<Void> onChange() const override { return output->onChange(); }
AsyncListener(Reference<AsyncVar<Input> const> const& input, F const& f)
: output(f(input->get())), monitorActor(monitor(input, &output, f)) {}
Output const& get() const override { return output.get(); }
Future<Void> onChange() const override { return output.onChange(); }
};
} // namespace IAsyncListenerImpl
template <class Output>
template <class Input, class F>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Input>> const& input,
F const& f) {
return makeReference<DependentAsyncVar<Input, Output, F>>(input, f);
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
return makeReference<IAsyncListenerImpl::AsyncListener<Input, Output, F>>(input, f);
}
template <class Output>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Output>> const& input) {
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Output>> const& input) {
auto identity = [](const auto& x) { return x; };
return makeReference<DependentAsyncVar<Output, Output, decltype(identity)>>(input, identity);
return makeReference<IAsyncListenerImpl::AsyncListener<Output, Output, decltype(identity)>>(input, identity);
}
// A weak reference type to wrap a future Reference<T> object.