Merge pull request #7563 from sfc-gh-xwang/feature/dd-refactor-incremental

[DD Testability] move updateReplicaKey to txnProcessor
This commit is contained in:
Xiaoxi Wang 2022-07-11 17:11:55 -07:00 committed by GitHub
commit cc185c51c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 30 deletions

View File

@ -89,6 +89,41 @@ class DDTxnProcessorImpl {
return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources }; return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources };
} }
// set the system key space
ACTOR static Future<Void> updateReplicaKeys(Database cx,
std::vector<Optional<Key>> primaryDcId,
std::vector<Optional<Key>> remoteDcIds,
DatabaseConfiguration configuration) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));
for (auto& kv : replicaKeys) {
auto dcId = decodeDatacenterReplicasKey(kv.key);
auto replicas = decodeDatacenterReplicasValue(kv.value);
if ((primaryDcId.size() && primaryDcId.at(0) == dcId) ||
(remoteDcIds.size() && remoteDcIds.at(0) == dcId && configuration.usableRegions > 1)) {
if (replicas > configuration.storageTeamSize) {
tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
}
} else {
tr.clear(kv.key);
}
}
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
}; };
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) { Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
@ -106,4 +141,10 @@ Future<MoveKeysLock> DDTxnProcessor::takeMoveKeysLock(UID ddId) const {
Future<DatabaseConfiguration> DDTxnProcessor::getDatabaseConfiguration() const { Future<DatabaseConfiguration> DDTxnProcessor::getDatabaseConfiguration() const {
return ::getDatabaseConfiguration(cx); return ::getDatabaseConfiguration(cx);
}
Future<Void> DDTxnProcessor::updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
const std::vector<Optional<Key>>& remoteIds,
const DatabaseConfiguration& configuration) const {
return DDTxnProcessorImpl::updateReplicaKeys(cx, primaryIds, remoteIds, configuration);
} }

View File

@ -587,6 +587,9 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
Future<Void> loadDatabaseConfiguration() { return store(configuration, txnProcessor->getDatabaseConfiguration()); } Future<Void> loadDatabaseConfiguration() { return store(configuration, txnProcessor->getDatabaseConfiguration()); }
Future<Void> updateReplicaKeys() {
return txnProcessor->updateReplicaKeys(primaryDcId, remoteDcIds, configuration);
}
void initDcInfo() { void initDcInfo() {
primaryDcId.clear(); primaryDcId.clear();
remoteDcIds.clear(); remoteDcIds.clear();
@ -637,36 +640,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->initDcInfo(); self->initDcInfo();
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString()); TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());
state Transaction tr(cx); wait(self->updateReplicaKeys());
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));
for (auto& kv : replicaKeys) {
auto dcId = decodeDatacenterReplicasKey(kv.key);
auto replicas = decodeDatacenterReplicasValue(kv.value);
if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) ||
(self->remoteDcIds.size() && self->remoteDcIds[0] == dcId &&
self->configuration.usableRegions > 1)) {
if (replicas > self->configuration.storageTeamSize) {
tr.set(kv.key, datacenterReplicasValue(self->configuration.storageTeamSize));
}
} else {
tr.clear(kv.key);
}
}
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log(); TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();
Reference<InitialDataDistribution> initData_ = wait(getInitialDataDistribution( Reference<InitialDataDistribution> initData_ = wait(getInitialDataDistribution(
cx, cx,
self->ddId, self->ddId,

View File

@ -45,6 +45,12 @@ public:
[[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); } [[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); }
virtual Future<DatabaseConfiguration> getDatabaseConfiguration() const { return DatabaseConfiguration(); } virtual Future<DatabaseConfiguration> getDatabaseConfiguration() const { return DatabaseConfiguration(); }
virtual Future<Void> updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
const std::vector<Optional<Key>>& remoteIds,
const DatabaseConfiguration& configuration) const {
return Void();
}
}; };
class DDTxnProcessorImpl; class DDTxnProcessorImpl;
@ -66,9 +72,15 @@ public:
Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override; Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override;
Future<DatabaseConfiguration> getDatabaseConfiguration() const override; Future<DatabaseConfiguration> getDatabaseConfiguration() const override;
Future<Void> updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
const std::vector<Optional<Key>>& remoteIds,
const DatabaseConfiguration& configuration) const override;
}; };
// run mock transaction // A mock transaction implementation for test usage.
// Contract: every function involving mock transaction should return immediately to mimic the ACI property of real
// transaction.
class DDMockTxnProcessor : public IDDTxnProcessor {}; class DDMockTxnProcessor : public IDDTxnProcessor {};
#endif // FOUNDATIONDB_DDTXNPROCESSOR_H #endif // FOUNDATIONDB_DDTXNPROCESSOR_H