move DataDistributionEnabled to DDTxnProcessor

This commit is contained in:
Xiaoxi Wang 2022-08-18 10:23:33 -07:00
parent 98096349e3
commit 52e7825179
3 changed files with 53 additions and 41 deletions

View File

@ -396,6 +396,46 @@ class DDTxnProcessorImpl {
}
}
ACTOR static Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
try {
Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
if (!mode.present() && ddEnabledState->isDDEnabled())
return true;
if (mode.present()) {
BinaryReader rd(mode.get(), Unversioned());
int m;
rd >> m;
if (m && ddEnabledState->isDDEnabled()) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("Mode", m)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return true;
}
}
// SOMEDAY: Write a wrapper in MoveKeys.actor.h
Optional<Value> readVal = wait(tr.get(moveKeysLockOwnerKey));
UID currentOwner =
readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
if (ddEnabledState->isDDEnabled() && (currentOwner != dataDistributionModeLock)) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return true;
}
TraceEvent(SevDebug, "IsDDEnabledFailed")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return false;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnabledState* ddEnabledState) {
loop {
wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
@ -447,6 +487,10 @@ Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState
return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
}
Future<bool> DDTxnProcessor::isDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
return DDTxnProcessorImpl::isDataDistributionEnabled(cx, ddEnabledState);
}
Future<Void> DDTxnProcessor::pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const {
return DDTxnProcessorImpl::pollMoveKeysLock(cx, lock, ddEnabledState);
}

View File

@ -202,46 +202,6 @@ ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
return Void();
}
ACTOR Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
try {
Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
if (!mode.present() && ddEnabledState->isDDEnabled())
return true;
if (mode.present()) {
BinaryReader rd(mode.get(), Unversioned());
int m;
rd >> m;
if (m && ddEnabledState->isDDEnabled()) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("Mode", m)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return true;
}
}
// SOMEDAY: Write a wrapper in MoveKeys.actor.h
Optional<Value> readVal = wait(tr.get(moveKeysLockOwnerKey));
UID currentOwner =
readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
if (ddEnabledState->isDDEnabled() && (currentOwner != dataDistributionModeLock)) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return true;
}
TraceEvent(SevDebug, "IsDDEnabledFailed")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
return false;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Ensures that the serverKeys key space is properly coalesced
// This method is only used for testing and is not implemented in a manner that is safe for large databases
ACTOR Future<Void> debugCheckCoalescing(Database cx) {
@ -532,6 +492,10 @@ public:
}
Future<Void> pollMoveKeysLock() { return txnProcessor->pollMoveKeysLock(lock, context->ddEnabledState.get()); }
Future<bool> isDataDistributionEnabled() const {
return txnProcessor->isDataDistributionEnabled(context->ddEnabledState.get());
}
};
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@ -759,7 +723,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
throw err;
}
bool ddEnabled = wait(isDataDistributionEnabled(cx, self->context->ddEnabledState.get()));
bool ddEnabled = wait(self->isDataDistributionEnabled());
TraceEvent("DataDistributionMoveKeysConflict").error(err).detail("DataDistributionEnabled", ddEnabled);
if (ddEnabled) {
throw err;

View File

@ -62,6 +62,8 @@ public:
virtual Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const = 0;
virtual Future<bool> isDataDistributionEnabled(const DDEnabledState* ddEnabledState) const = 0;
virtual Future<Void> pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const = 0;
};
@ -97,6 +99,8 @@ public:
Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const override;
Future<bool> isDataDistributionEnabled(const DDEnabledState* ddEnabledState) const override;
Future<Void> pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const override;
};