Add DD Security Mode (#10646)

* dd-security-mode

* address comments

* cleanup

* revise tr option set in loadAndUpdateAuditMetadataWithNewDDId

* address comments

* reset auditStorageInitStarted before DD init

* decouple audit resume and audit launch

* audit launch new request should wait for resuming existing requests

* address comment/clean up/fix

* fix

* fix initAuditMetadata retry

* fix initAuditMetadata retry should reset tr
This commit is contained in:
Zhe Wang 2023-07-21 17:06:25 -07:00 committed by GitHub
parent 8785840e52
commit 3426fc3c1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 408 additions and 344 deletions

View File

@ -78,46 +78,6 @@ ACTOR Future<bool> checkStorageServerRemoved(Database cx, UID ssid) {
return res;
}
ACTOR Future<Void> clearAuditMetadata(Database cx, AuditType auditType, UID auditId, bool clearProgressMetadata) {
try {
state Transaction tr(cx);
TraceEvent(SevDebug, "AuditUtilClearAuditMetadataStart", auditId)
.detail("AuditKey", auditKey(auditType, auditId));
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> res_ = wait(tr.get(auditKey(auditType, auditId)));
if (!res_.present()) { // has been cleared
break; // Nothing to clear
}
state AuditStorageState toClearState = decodeAuditStorageState(res_.get());
ASSERT(toClearState.id == auditId && toClearState.getType() == auditType);
// For a zombie audit, it is in running state
// Clear audit metadata
tr.clear(auditKey(auditType, auditId));
// clear progress metadata
if (clearProgressMetadata) {
clearAuditProgressMetadata(&tr, auditType, auditId);
}
wait(tr.commit());
TraceEvent(SevDebug, "AuditUtilClearAuditMetadataEnd", auditId)
.detail("AuditKey", auditKey(auditType, auditId));
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilClearAuditMetadataError", auditId)
.detail("AuditKey", auditKey(auditType, auditId));
wait(tr.onError(e));
}
}
} catch (Error& e) {
// We do not want audit cleanup effects DD
// pass
}
return Void();
}
ACTOR Future<Void> cancelAuditMetadata(Database cx, AuditType auditType, UID auditId) {
try {
state Transaction tr(cx);
@ -351,59 +311,15 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
return Void();
} else {
CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner");
TraceEvent(SevDebug, "AuditUtilConflictWithNewOwner");
TraceEvent(SevDebug, "AuditUtilConflictWithNewOwner")
.detail("CurrentOwner", currentOwner.toString())
.detail("PrevOwner", lock.prevOwner.toString())
.detail("PrevWrite", lock.prevWrite.toString())
.detail("MyOwner", lock.myOwner.toString());
throw movekeys_conflict(); // need a new name
}
}
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) {
state Transaction tr(cx);
state bool hasCancelled = false;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkMoveKeysLock(&tr, lock, ddEnabled, true));
// Check existing state
Optional<Value> res_ = wait(tr.get(auditKey(auditState.getType(), auditState.id)));
if (!res_.present()) { // has been cancelled
hasCancelled = true;
break; // exit
} else {
const AuditStorageState currentState = decodeAuditStorageState(res_.get());
ASSERT(currentState.id == auditState.id && currentState.getType() == auditState.getType());
if (currentState.getPhase() == AuditPhase::Failed) {
hasCancelled = true;
break; // exit
}
}
// Persist audit result
tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState));
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id)
.errorUnsuppressed(e)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditPhase", auditState.getPhase())
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
wait(tr.onError(e));
}
}
TraceEvent(SevDebug, "AuditUtilUpdateAuditStateEnd", auditState.id)
.detail("Cancelled", hasCancelled)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditPhase", auditState.getPhase())
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
return Void();
}
ACTOR Future<UID> persistNewAuditState(Database cx,
AuditStorageState auditState,
MoveKeyLockInfo lock,
@ -793,3 +709,115 @@ ACTOR Future<bool> checkAuditProgressComplete(Database cx, AuditType auditType,
}
return true;
}
// Load RUNNING audit states to resume, clean up COMPLETE and FAILED audit states
// Update ddId for RUNNING audit states
ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
MoveKeyLockInfo lock,
bool ddEnabled,
UID dataDistributorId,
int persistFinishAuditCount) {
state std::unordered_map<AuditType, std::vector<AuditStorageState>> existingAuditStates;
state std::vector<AuditStorageState> auditStatesToResume;
state Transaction tr(cx);
state int retryCount = 0;
loop {
try {
// Load existing audit states and update ddId in audit states
existingAuditStates.clear();
auditStatesToResume.clear();
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkMoveKeysLock(&tr, lock, ddEnabled, true));
RangeResult result = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY));
if (result.more || result.size() >= CLIENT_KNOBS->TOO_MANY) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"AuditUtilLoadMetadataIncomplete",
dataDistributorId)
.detail("ResMore", result.more)
.detail("ResSize", result.size());
}
for (int i = 0; i < result.size(); ++i) {
auto auditState = decodeAuditStorageState(result[i].value);
TraceEvent(SevVerbose, "AuditUtilLoadMetadataEach", dataDistributorId)
.detail("CurrentDDID", dataDistributorId)
.detail("AuditDDID", auditState.ddId)
.detail("AuditType", auditState.getType())
.detail("AuditID", auditState.id)
.detail("AuditPhase", auditState.getPhase());
if (auditState.getPhase() == AuditPhase::Running) {
AuditStorageState toUpdate = auditState;
toUpdate.ddId = dataDistributorId;
tr.set(auditKey(toUpdate.getType(), toUpdate.id), auditStorageStateValue(toUpdate));
}
existingAuditStates[auditState.getType()].push_back(auditState);
}
// Cleanup Complete/Failed audit metadata for each type separately
for (const auto& [auditType, _] : existingAuditStates) {
int numFinishAudit = 0; // "finish" audits include Complete/Failed audits
for (const auto& auditState : existingAuditStates[auditType]) {
if (auditState.getPhase() == AuditPhase::Complete || auditState.getPhase() == AuditPhase::Failed) {
numFinishAudit++;
}
}
const int numFinishAuditsToClear = numFinishAudit - persistFinishAuditCount;
int numFinishAuditsCleared = 0;
std::sort(existingAuditStates[auditType].begin(),
existingAuditStates[auditType].end(),
[](AuditStorageState a, AuditStorageState b) {
return a.id < b.id; // Inplacement sort in ascending order
});
for (const auto& auditState : existingAuditStates[auditType]) {
if (auditState.getPhase() == AuditPhase::Failed) {
if (numFinishAuditsCleared < numFinishAuditsToClear) {
// Clear both audit metadata and corresponding progress metadata
tr.clear(auditKey(auditState.getType(), auditState.id));
clearAuditProgressMetadata(&tr, auditState.getType(), auditState.id);
numFinishAuditsCleared++;
TraceEvent(SevInfo, "AuditUtilMetadataCleared", dataDistributorId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditRange", auditState.range);
}
} else if (auditState.getPhase() == AuditPhase::Complete) {
if (numFinishAuditsCleared < numFinishAuditsToClear) {
// Clear audit metadata only
// No need to clear the corresponding progress metadata
// since it has been cleared for Complete audits
tr.clear(auditKey(auditState.getType(), auditState.id));
numFinishAuditsCleared++;
TraceEvent(SevInfo, "AuditUtilMetadataCleared", dataDistributorId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditRange", auditState.range);
}
} else if (auditState.getPhase() == AuditPhase::Running) {
auditStatesToResume.push_back(auditState);
TraceEvent(SevInfo, "AuditUtilMetadataAddedToResume", dataDistributorId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditRange", auditState.range);
}
}
}
wait(tr.commit());
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) {
throw e;
}
if (retryCount > 50) {
TraceEvent(SevWarnAlways, "InitAuditMetadataExceedRetryMax", dataDistributorId).errorUnsuppressed(e);
break;
}
try {
wait(tr.onError(e));
} catch (Error& e) {
retryCount++;
tr.reset();
}
}
}
return auditStatesToResume;
}

View File

@ -2234,6 +2234,8 @@ ACTOR Future<int> setDDMode(Database cx, int mode) {
tr.set(dataDistributionModeKey, wr.toValue());
if (mode) {
// set DDMode to 1 will enable all disabled parts, for instance the SS failure monitors.
// set DDMode to 2 is a security mode which disables data moves but allows auditStorage part
// DDMode=2 is set when shard location metadata inconsistency is detected
Optional<Value> currentHealthyZoneValue = wait(tr.get(healthyZoneKey));
if (currentHealthyZoneValue.present() &&
decodeHealthyZoneValue(currentHealthyZoneValue.get()).first == ignoreSSFailuresZoneString) {

View File

@ -2626,7 +2626,7 @@ Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransac
try {
int mode = boost::lexical_cast<int>(iter->value().second.get().toString());
Value modeVal = BinaryWriter::toValue(mode, Unversioned());
if (mode == 0 || mode == 1) {
if (mode == 0 || mode == 1 || mode == 2) {
// Whenever configuration changes or DD related system keyspace is changed,
// actor must grab the moveKeysLockOwnerKey and update moveKeysLockWriteKey.
// This prevents concurrent write to the same system keyspace.

View File

@ -36,7 +36,6 @@ struct MoveKeyLockInfo {
UID prevOwner, myOwner, prevWrite;
};
ACTOR Future<Void> clearAuditMetadata(Database cx, AuditType auditType, UID auditId, bool clearProgressMetadata);
ACTOR Future<Void> cancelAuditMetadata(Database cx, AuditType auditType, UID auditId);
ACTOR Future<UID> persistNewAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled);
ACTOR Future<Void> persistAuditState(Database cx,
@ -67,8 +66,12 @@ ACTOR Future<Void> clearAuditMetadataForType(Database cx,
UID maxAuditIdToClear,
int numFinishAuditToKeep);
ACTOR Future<bool> checkStorageServerRemoved(Database cx, UID ssid);
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled);
AuditPhase stringToAuditPhase(std::string auditPhaseStr);
ACTOR Future<bool> checkAuditProgressComplete(Database cx, AuditType auditType, UID auditId, KeyRange auditRange);
ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
MoveKeyLockInfo lock,
bool ddEnabled,
UID dataDistributorId,
int persistFinishAuditCount);
#include "flow/unactorcompiler.h"
#endif

View File

@ -364,13 +364,6 @@ class DDTxnProcessorImpl {
++numDataMoves;
}
RangeResult ads = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!ads.more && ads.size() < CLIENT_KNOBS->TOO_MANY);
for (int i = 0; i < ads.size(); ++i) {
auto auditState = decodeAuditStorageState(ads[i].value);
result->auditStates.push_back(auditState);
}
succeeded = true;
break;

View File

@ -82,11 +82,17 @@ ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() {
.opsReadPerKSecond = StorageMetrics::infinity } };
}
enum class DDAuditContext : uint8_t {
Invalid = 0,
Resume = 1,
Launch = 2,
Retry = 3,
};
struct DDAudit {
DDAudit(AuditStorageState coreState)
: coreState(coreState), actors(true), foundError(false), auditStorageAnyChildFailed(false), retryCount(0),
cancelled(false), overallCompleteDoAuditCount(0), overallIssuedDoAuditCount(0),
remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX) {}
remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX), context(0) {}
AuditStorageState coreState;
ActorCollection actors;
@ -98,10 +104,14 @@ struct DDAudit {
int64_t overallIssuedDoAuditCount;
int64_t overallCompleteDoAuditCount;
AsyncVar<int> remainingBudgetForAuditTasks;
uint8_t context;
inline void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
inline Future<Void> getAuditRunActor() { return auditActor; }
inline void setDDAuditContext(DDAuditContext context) { this->context = static_cast<uint8_t>(context); }
inline DDAuditContext getDDAuditContext() const { return static_cast<DDAuditContext>(this->context); }
// auditActor and actors are guaranteed to deliver a cancel signal
void cancel() {
auditActor.cancel();
@ -307,6 +317,33 @@ static std::set<int> const& normalDDQueueErrors() {
return s;
}
struct DataDistributor;
void runAuditStorage(Reference<DataDistributor> self,
AuditStorageState auditStates,
int retryCount,
DDAuditContext context);
ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
UID auditID,
AuditType auditType,
int currentRetryCount);
ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit, KeyRange range);
ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi);
ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req);
struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
public:
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
@ -339,11 +376,12 @@ public:
Promise<Void> initialized;
std::unordered_map<AuditType, std::unordered_map<UID, std::shared_ptr<DDAudit>>> audits;
Promise<Void> auditInitialized;
FlowLock auditStorageHaLaunchingLock;
FlowLock auditStorageReplicaLaunchingLock;
FlowLock auditStorageLocationMetadataLaunchingLock;
FlowLock auditStorageSsShardLaunchingLock;
Promise<Void> auditStorageInitialized;
bool auditStorageInitStarted;
Optional<Reference<TenantCache>> ddTenantCache;
@ -358,7 +396,8 @@ public:
totalDataInFlightEventHolder(makeReference<EventCacheHolder>("TotalDataInFlight")),
totalDataInFlightRemoteEventHolder(makeReference<EventCacheHolder>("TotalDataInFlightRemote")),
teamCollection(nullptr), auditStorageHaLaunchingLock(1), auditStorageReplicaLaunchingLock(1),
auditStorageLocationMetadataLaunchingLock(1), auditStorageSsShardLaunchingLock(1) {}
auditStorageLocationMetadataLaunchingLock(1), auditStorageSsShardLaunchingLock(1),
auditStorageInitStarted(false) {}
// bootstrap steps
@ -396,6 +435,72 @@ public:
return txnProcessor->waitForDataDistributionEnabled(context->ddEnabledState.get());
}
// Resume in-memory audit instances and issue background audit metadata cleanup
void resumeAuditStorage(Reference<DataDistributor> self, std::vector<AuditStorageState> auditStates) {
for (const auto& auditState : auditStates) {
if (auditState.getPhase() != AuditPhase::Running) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "WrongAuditStateToResume")
.detail("AuditState", auditState.toString());
return;
}
if (self->audits.contains(auditState.getType()) &&
self->audits[auditState.getType()].contains(auditState.id)) {
// Ignore any RUNNING auditState with an alive audit
// instance in DD audits map
continue;
}
runAuditStorage(self, auditState, 0, DDAuditContext::Resume);
TraceEvent(SevInfo, "AuditStorageResumed", self->ddId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditState", auditState.toString());
}
return;
}
ACTOR static Future<Void> initAuditStorage(Reference<DataDistributor> self) {
self->auditStorageInitStarted = true;
MoveKeyLockInfo lockInfo;
lockInfo.myOwner = self->lock.myOwner;
lockInfo.prevOwner = self->lock.prevOwner;
lockInfo.prevWrite = self->lock.prevWrite;
std::vector<AuditStorageState> auditStatesToResume =
wait(initAuditMetadata(self->txnProcessor->context(),
lockInfo,
self->context->isDDEnabled(),
self->ddId,
SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT));
self->resumeAuditStorage(self, auditStatesToResume);
self->auditStorageInitialized.send(Void());
return Void();
}
ACTOR static Future<Void> waitUntilDataDistributorExitSecurityMode(Reference<DataDistributor> self) {
state Transaction tr(self->txnProcessor->context());
loop {
wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
if (!mode.present()) {
return Void();
}
BinaryReader rd(mode.get(), Unversioned());
int ddMode = 1;
rd >> ddMode;
if (ddMode != 2) {
return Void();
}
wait(checkMoveKeysLockReadOnly(&tr, self->context->lock, self->context->ddEnabledState.get()));
tr.reset();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Initialize the required internal states of DataDistributor from system metadata. It's necessary before
// DataDistributor start working. Doesn't include initialization of optional components, like TenantCache, DDQueue,
// Tracker, TeamCollection. The components should call its own ::init methods.
@ -408,6 +513,25 @@ public:
wait(self->takeMoveKeysLock());
TraceEvent("DDInitTookMoveKeysLock", self->ddId).log();
// AuditStorage does not rely on DatabaseConfiguration
// AuditStorage read neccessary info purely from system key space
if (!self->auditStorageInitStarted) {
// Avoid multiple initAuditStorages
self->addActor.send(self->initAuditStorage(self));
}
// It is possible that an audit request arrives and then DDMode
// is set to 2 at this point
// No polling MoveKeyLock is running
// So, we need to check MoveKeyLock when waitUntilDataDistributorExitSecurityMode
wait(waitUntilDataDistributorExitSecurityMode(self)); // Trap DDMode == 2
// It is possible DDMode begins with 2 and passes
// waitDataDistributorEnabled and then set to 0 before
// waitUntilDataDistributorExitSecurityMode. For this case,
// after waitUntilDataDistributorExitSecurityMode, DDMode is 0.
// The init loop does not break and the loop will stuct at
// waitDataDistributorEnabled in the next iteration.
TraceEvent("DataDistributorExitSecurityMode").log();
wait(self->loadDatabaseConfiguration());
self->initDcInfo();
TraceEvent("DDInitGotConfiguration", self->ddId)
@ -442,7 +566,7 @@ public:
.trackLatest(self->initialDDEventHolder->trackingKey);
}
if (self->initData->mode && self->context->isDDEnabled()) {
if (self->initData->mode == 1 && self->context->isDDEnabled()) {
// mode may be set true by system operator using fdbcli and isEnabled() set to true
break;
}
@ -780,155 +904,6 @@ inline std::unordered_map<UID, std::shared_ptr<DDAudit>> getAuditsForType(Refere
return self->audits[auditType];
}
void runAuditStorage(Reference<DataDistributor> self,
AuditStorageState auditStates,
int retryCount,
std::string context);
ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
UID auditID,
AuditType auditType,
std::string context,
int currentRetryCount);
ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit, KeyRange range);
ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi);
ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req);
void cancelAllAuditsInAuditMap(Reference<DataDistributor> self) {
TraceEvent(SevDebug, "AuditMapOps", self->ddId).detail("Ops", "cancelAllAuditsInAuditMap");
for (auto& [auditType, auditMap] : self->audits) {
for (auto& [auditID, audit] : auditMap) {
// Any existing audit should stop running when the context switches out
audit->cancel();
}
}
self->audits.clear();
return;
}
ACTOR Future<Void> resumeStorageAudits(Reference<DataDistributor> self) {
ASSERT(!self->auditInitialized.getFuture().isReady());
if (self->initData->auditStates.empty()) {
self->auditInitialized.send(Void());
TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId);
return Void();
}
// Update metadata
state int retryCount = 0;
loop {
try {
std::vector<Future<Void>> fs;
state MoveKeyLockInfo lockInfo;
lockInfo.myOwner = self->lock.myOwner;
lockInfo.prevOwner = self->lock.prevOwner;
lockInfo.prevWrite = self->lock.prevWrite;
for (const auto& auditState : self->initData->auditStates) {
// Only running audit will be resumed
if (auditState.getPhase() == AuditPhase::Running) {
AuditStorageState toUpdate = auditState;
toUpdate.ddId = self->ddId;
fs.push_back(updateAuditState(
self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled()));
}
}
wait(waitForAll(fs));
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) {
throw e;
}
if (retryCount > 50) {
TraceEvent(SevWarnAlways, "AuditStorageResumeUnableUpdateMetadata", self->ddId).errorUnsuppressed(e);
return Void();
}
retryCount++;
}
}
// Following is atomic
// Cancel existing audits and restore
cancelAllAuditsInAuditMap(self);
std::unordered_map<AuditType, std::vector<AuditStorageState>> restoredAudits;
for (const auto& auditState : self->initData->auditStates) {
restoredAudits[auditState.getType()].push_back(auditState);
}
// We clear existing audit state for each auditType separately
for (const auto& [auditType, _] : restoredAudits) {
int numFinishAudit = 0; // "finish" audits include Complete/Failed audits
for (const auto& auditState : restoredAudits[auditType]) {
if (auditState.getPhase() == AuditPhase::Complete || auditState.getPhase() == AuditPhase::Failed) {
numFinishAudit++;
}
}
const int numFinishAuditsToClear = numFinishAudit - SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT;
int numFinishAuditsCleared = 0;
std::sort(restoredAudits[auditType].begin(),
restoredAudits[auditType].end(),
[](AuditStorageState a, AuditStorageState b) {
return a.id < b.id; // Inplacement sort in ascending order
});
// Cleanup audit metadata for Failed/Complete audits
// Resume RUNNING audits
// Keep Error audits persistent
for (const auto& auditState : restoredAudits[auditType]) {
TraceEvent(SevVerbose, "AuditStorageResumeCheck", self->ddId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType());
if (auditState.getPhase() == AuditPhase::Error) {
continue;
} else if (auditState.getPhase() == AuditPhase::Failed) {
if (numFinishAuditsCleared < numFinishAuditsToClear) {
// Clear both audit metadata and corresponding progress metadata
self->addActor.send(clearAuditMetadata(self->txnProcessor->context(),
auditState.getType(),
auditState.id,
/*clearProgressMetadata=*/true));
numFinishAuditsCleared++;
}
continue;
} else if (auditState.getPhase() == AuditPhase::Complete) {
if (numFinishAuditsCleared < numFinishAuditsToClear) {
// Clear audit metadata only
// No need to clear the corresponding progress metadata
// since it has been cleared for Complete audits
self->addActor.send(clearAuditMetadata(self->txnProcessor->context(),
auditState.getType(),
auditState.id,
/*clearProgressMetadata=*/false));
numFinishAuditsCleared++;
}
continue;
}
ASSERT(auditState.getPhase() == AuditPhase::Running);
TraceEvent(SevDebug, "AuditStorageResume", self->ddId)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditState", auditState.toString())
.detail("NumFinishAuditsCleared", numFinishAuditsCleared)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
runAuditStorage(self, auditState, 0, "ResumeAudit");
}
}
self->auditInitialized.send(Void());
TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId);
return Void();
}
// Periodically check and log the physicalShard status; clean up empty physicalShard;
ACTOR Future<Void> monitorPhysicalShardStatus(Reference<PhysicalShardCollection> self) {
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
@ -1023,7 +998,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->context->trackerCancelled = false;
// whether all initial shard are tracked
self->initialized = Promise<Void>();
self->auditInitialized = Promise<Void>();
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
@ -1073,8 +1047,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
actors.push_back(resumeStorageAudits(self));
actors.push_back(self->pollMoveKeysLock());
self->context->tracker = makeReference<DataDistributionTracker>(
@ -1817,9 +1789,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
UID auditID,
AuditType auditType,
std::string context,
int currentRetryCount) {
// At this point, audit must be launched
ASSERT(auditID.isValid());
state std::shared_ptr<DDAudit> audit = getAuditFromAuditMap(self, auditType, auditID);
@ -1833,18 +1803,19 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
ASSERT(audit->coreState.ddId == self->ddId);
loadAndDispatchAudit(self, audit, audit->coreState.range);
TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditID", audit->coreState.id)
.detail("Range", audit->coreState.range)
.detail("AuditType", audit->coreState.getType())
.detail("RetryCount", currentRetryCount)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount);
wait(audit->actors.getResult()); // goto exception handler if any actor is failed
TraceEvent(SevInfo, "DDAuditStorageCoreAllActorsComplete", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("Range", audit->coreState.range)
.detail("AuditType", audit->coreState.getType())
.detail("RetryCount", currentRetryCount)
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("DDDoAuditTasksIssued", audit->overallIssuedDoAuditCount)
.detail("DDDoAuditTasksComplete", audit->overallCompleteDoAuditCount);
// reset for the usage for future retry
@ -1873,30 +1844,30 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
audit->coreState.setPhase(AuditPhase::Complete);
}
TraceEvent(SevVerbose, "DDAuditStorageCoreCompleteAudit", self->ddId)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditState", audit->coreState.toString())
.detail("RetryCount", currentRetryCount)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount);
wait(persistAuditState(self->txnProcessor->context(),
audit->coreState,
"AuditStorageCore",
lockInfo,
self->context->isDDEnabled()));
TraceEvent(SevVerbose, "DDAuditStorageCoreSetResult", self->ddId)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditState", audit->coreState.toString())
.detail("RetryCount", currentRetryCount)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount);
removeAuditFromAuditMap(self, audit->coreState.getType(),
audit->coreState.id); // remove audit
TraceEvent(SevInfo, "DDAuditStorageCoreEnd", self->ddId)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditID", auditID)
.detail("AuditType", auditType)
.detail("Range", audit->coreState.range)
.detail("RetryCount", currentRetryCount)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
// If this audit is cancelled, the place where cancelling
@ -1905,16 +1876,16 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
}
TraceEvent(SevDebug, "DDAuditStorageCoreError", self->ddId)
.errorUnsuppressed(e)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditID", auditID)
.detail("RetryCount", currentRetryCount)
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("AuditType", auditType)
.detail("Range", audit->coreState.range)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("Range", audit->coreState.range);
if (e.code() == error_code_movekeys_conflict) {
removeAuditFromAuditMap(self, audit->coreState.getType(),
audit->coreState.id); // remove audit
throw e; // throw to DD and DD will restart
// Silently exit
} else if (e.code() == error_code_audit_storage_cancelled) {
// If this audit is cancelled, the place where cancelling
// this audit does removeAuditFromAuditMap
@ -1924,18 +1895,20 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
TraceEvent(SevVerbose, "DDAuditStorageCoreRetry", self->ddId)
.detail("AuditID", auditID)
.detail("AuditType", auditType)
.detail("RetryCount", currentRetryCount)
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("Contains", self->audits.contains(auditType) && self->audits[auditType].contains(auditID));
wait(delay(0.1));
TraceEvent(SevVerbose, "DDAuditStorageCoreRetryAfterWait", self->ddId)
.detail("AuditID", auditID)
.detail("AuditType", auditType)
.detail("RetryCount", currentRetryCount)
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("Contains", self->audits.contains(auditType) && self->audits[auditType].contains(auditID));
// Erase the old audit from map and spawn a new audit inherit from the old audit
removeAuditFromAuditMap(self, audit->coreState.getType(),
audit->coreState.id); // remove audit
runAuditStorage(self, audit->coreState, audit->retryCount, "auditStorageCoreRetry");
runAuditStorage(self, audit->coreState, audit->retryCount, DDAuditContext::Retry);
} else {
try {
audit->coreState.setPhase(AuditPhase::Failed);
@ -1944,22 +1917,22 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
"AuditStorageCoreError",
lockInfo,
self->context->isDDEnabled()));
TraceEvent(SevInfo, "DDAuditStorageCoreSetFailed", self->ddId)
.detail("Context", context)
TraceEvent(SevWarn, "DDAuditStorageCoreSetAuditFailed", self->ddId)
.detail("Context", audit->getDDAuditContext())
.detail("AuditID", auditID)
.detail("AuditType", auditType)
.detail("RetryCount", currentRetryCount)
.detail("AuditState", audit->coreState.toString())
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("AuditState", audit->coreState.toString());
} catch (Error& e) {
TraceEvent(SevWarn, "DDAuditStorageCoreErrorWhenSetAuditFailed", self->ddId)
.errorUnsuppressed(e)
.detail("Context", context)
.detail("Context", audit->getDDAuditContext())
.detail("AuditID", auditID)
.detail("AuditType", auditType)
.detail("RetryCount", currentRetryCount)
.detail("AuditState", audit->coreState.toString())
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("AuditStorageCoreGeneration", currentRetryCount)
.detail("RetryCount", audit->retryCount)
.detail("AuditState", audit->coreState.toString());
// unexpected error when persistAuditState
// However, we do not want any audit error kills the DD
// So, we silently remove audit from auditMap
@ -1989,7 +1962,7 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
void runAuditStorage(Reference<DataDistributor> self,
AuditStorageState auditState,
int retryCount,
std::string context) {
DDAuditContext context) {
// Validate input auditState
if (auditState.getType() != AuditType::ValidateHA && auditState.getType() != AuditType::ValidateReplica &&
auditState.getType() != AuditType::ValidateLocationMetadata &&
@ -2005,14 +1978,13 @@ void runAuditStorage(Reference<DataDistributor> self,
auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD
std::shared_ptr<DDAudit> audit = std::make_shared<DDAudit>(auditState);
audit->retryCount = retryCount;
audit->setDDAuditContext(context);
addAuditToAuditMap(self, audit);
audit->setAuditRunActor(
auditStorageCore(self, audit->coreState.id, audit->coreState.getType(), context, audit->retryCount));
audit->setAuditRunActor(auditStorageCore(self, audit->coreState.id, audit->coreState.getType(), audit->retryCount));
return;
}
// Create/pick an audit for auditRange and auditType
// Return audit ID if no error happens
// Get audit for auditRange and auditType, if not exist, launch a new one
ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType) {
state MoveKeyLockInfo lockInfo;
lockInfo.myOwner = self->lock.myOwner;
@ -2021,21 +1993,10 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
state UID auditID;
try {
TraceEvent(SevInfo, "DDAuditStorageLaunchTriggered", self->ddId)
TraceEvent(SevInfo, "DDAuditStorageLaunchStarts", self->ddId)
.detail("AuditType", auditType)
.detail("Range", auditRange)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
std::vector<Future<Void>> fs;
fs.push_back(self->auditInitialized.getFuture());
fs.push_back(self->initialized.getFuture());
wait(waitForAll(fs));
// Get audit, if not exist, triggers a new one
ASSERT(self->auditInitialized.getFuture().isReady() && self->initialized.getFuture().isReady());
TraceEvent(SevVerbose, "DDAuditStorageLaunchStart", self->ddId)
.detail("AuditType", auditType)
.detail("Range", auditRange)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("RequestedRange", auditRange);
wait(self->auditStorageInitialized.getFuture());
// Start an audit if no audit exists
// If existing an audit for a different purpose, send error to client
// aka, we only allow one audit at a time for all purposes
@ -2069,8 +2030,7 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
.detail("AuditType", auditType)
.detail("AuditID", auditID)
.detail("RequestedRange", auditRange)
.detail("ExistingState", audit->coreState.toString())
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("ExistingState", audit->coreState.toString());
} else {
state AuditStorageState auditState;
auditState.setType(auditType);
@ -2104,7 +2064,14 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
.detail("Range", auditRange);
auditState.id = auditID_;
auditID = auditID_;
runAuditStorage(self, auditState, 0, "LaunchAudit");
if (self->audits.contains(auditType) && self->audits[auditType].contains(auditID)) {
// It is possible that the current DD is running this audit
// Suppose DDinit re-runs right after a new audit is persisted
// For this case, auditResume sees the new audit and resumes it
// At this point, the new audit is already in the audit map
return auditID;
}
runAuditStorage(self, auditState, 0, DDAuditContext::Launch);
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
@ -2206,8 +2173,7 @@ ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditReq
TraceEvent(SevDebug, "DDAuditStorageStart", self->ddId)
.detail("RetryCount", retryCount)
.detail("AuditType", req.getType())
.detail("Range", req.range)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
.detail("Range", req.range);
UID auditID = wait(launchAudit(self, req.range, req.getType()));
req.reply.send(auditID);
TraceEvent(SevVerbose, "DDAuditStorageReply", self->ddId)
@ -2393,8 +2359,7 @@ ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor>
.detail("AuditType", auditType)
.detail("IssuedDoAuditCount", issueDoAuditCount);
if (e.code() == error_code_not_implemented || e.code() == error_code_audit_storage_exceeded_request_limit ||
e.code() == error_code_audit_storage_cancelled) {
if (e.code() == error_code_not_implemented || e.code() == error_code_audit_storage_cancelled) {
throw e;
} else if (e.code() == error_code_audit_storage_error) {
audit->foundError = true;

View File

@ -306,7 +306,7 @@ ACTOR static Future<Void> checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> readVal = wait(tr->get(moveKeysLockOwnerKey));
UID currentOwner = readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
state UID currentOwner = readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
if (currentOwner == lock.prevOwner) {
// Check that the previous owner hasn't touched the lock since we took it
@ -314,6 +314,13 @@ ACTOR static Future<Void> checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL
UID lastWrite = readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
if (lastWrite != lock.prevWrite) {
CODE_PROBE(true, "checkMoveKeysLock: Conflict with previous owner");
TraceEvent(SevDebug, "CheckPersistentMoveKeysWritterConflict")
.errorUnsuppressed(movekeys_conflict())
.detail("PrevOwner", lock.prevOwner.toString())
.detail("PrevWrite", lock.prevWrite.toString())
.detail("MyOwner", lock.myOwner.toString())
.detail("CurrentOwner", currentOwner.toString())
.detail("Writer", lastWrite.toString());
throw movekeys_conflict();
}
@ -347,6 +354,12 @@ ACTOR static Future<Void> checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL
return Void();
} else {
CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner");
TraceEvent(SevDebug, "CheckPersistentMoveKeysLockOwnerConflict")
.errorUnsuppressed(movekeys_conflict())
.detail("PrevOwner", lock.prevOwner.toString())
.detail("PrevWrite", lock.prevWrite.toString())
.detail("MyOwner", lock.myOwner.toString())
.detail("CurrentOwner", currentOwner.toString());
throw movekeys_conflict();
}
}
@ -426,8 +439,10 @@ ACTOR Future<bool> validateRangeAssignment(Database occ,
}
}
if (!allCorrect) {
try { // If corruption detected, stop using DD
int _ = wait(setDDMode(occ, 0));
try {
// If corruption detected, enter security mode which
// stops using data moves and only allow auditStorage
int _ = wait(setDDMode(occ, 2));
TraceEvent(SevInfo, "ValidateRangeAssignmentCorruptionDetectedAndDDStopped")
.detail("DataMoveID", dataMoveId)
.detail("Range", range)
@ -450,19 +465,17 @@ ACTOR Future<Void> auditLocationMetadataPreCheck(Database occ,
std::string context,
UID dataMoveId) {
if (range.empty()) {
TraceEvent(SevWarn, "AuditLocationMetadataEmptyInputRange")
.detail("By", "PreCheck")
.detail("AuditRange", range);
TraceEvent(SevWarn, "CheckLocationMetadataEmptyInputRange").detail("By", "PreCheck").detail("Range", range);
return Void();
}
state std::vector<Future<Void>> actors;
state std::unordered_map<UID, Optional<bool>> results;
TraceEvent(SevDebug, "AuditLocationMetadataStart")
TraceEvent(SevVerbose, "CheckLocationMetadataStart")
.detail("By", "PreCheck")
.detail("DataMoveID", dataMoveId)
.detail("Servers", describe(servers))
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
try {
actors.clear();
results.clear();
@ -473,40 +486,40 @@ ACTOR Future<Void> auditLocationMetadataPreCheck(Database occ,
for (const auto& [ssid, res] : results) {
ASSERT(res.present());
if (!res.get()) { // Stop check if corruption detected
TraceEvent(SevError, "AuditLocationMetadataCorruptionDetected")
TraceEvent(SevError, "CheckLocationMetadataCorruptionDetected")
.detail("By", "PreCheck")
.detail("DataMoveID", dataMoveId)
.detail("Servers", describe(servers))
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
throw location_metadata_corruption();
}
}
TraceEvent(SevDebug, "AuditLocationMetadataComplete")
TraceEvent(SevVerbose, "CheckLocationMetadataComplete")
.detail("By", "PreCheck")
.detail("DataMoveID", dataMoveId)
.detail("Servers", describe(servers))
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_location_metadata_corruption) {
throw e;
} else {
TraceEvent(SevInfo, "AuditLocationMetadataFailed")
TraceEvent(SevInfo, "CheckLocationMetadataFailed")
.errorUnsuppressed(e)
.detail("By", "PreCheck")
.detail("DataMoveID", dataMoveId)
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
// Check any existing result when failure presents
for (const auto& [ssid, res] : results) {
if (res.present() && !res.get()) {
TraceEvent(SevError, "AuditLocationMetadataCorruptionDetectedWhenFailed")
TraceEvent(SevError, "CheckLocationMetadataCorruptionDetectedWhenFailed")
.detail("By", "PreCheck")
.detail("DataMoveID", dataMoveId)
.detail("Servers", describe(servers))
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
throw location_metadata_corruption();
}
}
@ -518,9 +531,7 @@ ACTOR Future<Void> auditLocationMetadataPreCheck(Database occ,
ACTOR Future<Void> auditLocationMetadataPostCheck(Database occ, KeyRange range, std::string context, UID dataMoveId) {
if (range.empty()) {
TraceEvent(SevWarn, "AuditLocationMetadataEmptyInputRange")
.detail("By", "PostCheck")
.detail("AuditRange", range);
TraceEvent(SevWarn, "CheckLocationMetadataEmptyInputRange").detail("By", "PostCheck").detail("Range", range);
return Void();
}
state std::vector<Future<Void>> actors;
@ -530,10 +541,10 @@ ACTOR Future<Void> auditLocationMetadataPostCheck(Database occ, KeyRange range,
state RangeResult UIDtoTagMap;
state Transaction tr(occ);
state int retryCount = 0;
TraceEvent(SevDebug, "AuditLocationMetadataStart")
TraceEvent(SevVerbose, "CheckLocationMetadataStart")
.detail("By", "PostCheck")
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
loop {
try {
loop {
@ -554,7 +565,7 @@ ACTOR Future<Void> auditLocationMetadataPostCheck(Database occ, KeyRange range,
actors.push_back(store(UIDtoTagMap, tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)));
wait(waitForAll(actors));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
TraceEvent(SevVerbose, "AuditLocationMetadataReadDone")
TraceEvent(SevVerbose, "CheckLocationMetadataReadDone")
.detail("By", "PostCheck")
.detail("ResultSize", readResultKS.size());
// Read serverKeys
@ -591,11 +602,11 @@ ACTOR Future<Void> auditLocationMetadataPostCheck(Database occ, KeyRange range,
rangeToReadBegin = readResultKS.back().key;
continue;
} else {
TraceEvent(SevDebug, "AuditLocationMetadataComplete")
TraceEvent(SevVerbose, "CheckLocationMetadataComplete")
.detail("By", "PostCheck")
.detail("DataMoveID", dataMoveId)
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
break;
}
} catch (Error& e) {
@ -611,21 +622,21 @@ ACTOR Future<Void> auditLocationMetadataPostCheck(Database occ, KeyRange range,
// Check corruptions for the current (failed) round
for (const auto& [idx, res] : results) {
if (res.present() && !res.get()) {
TraceEvent(SevError, "AuditLocationMetadataCorruptionDetectedWhenFailed")
TraceEvent(SevError, "CheckLocationMetadataCorruptionDetectedWhenFailed")
.detail("By", "PostCheck")
.detail("DataMoveID", dataMoveId)
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
throw location_metadata_corruption();
}
}
if (retryCount > SERVER_KNOBS->AUDIT_DATAMOVE_POST_CHECK_RETRY_COUNT_MAX) {
TraceEvent(SevInfo, "AuditLocationMetadataFailed")
TraceEvent(SevInfo, "CheckLocationMetadataFailed")
.errorUnsuppressed(e)
.detail("By", "PostCheck")
.detail("DataMoveID", dataMoveId)
.detail("Context", context)
.detail("AuditRange", range);
.detail("Range", range);
// If no corruption detected, exit silently
} else {
wait(delay(0.5));

View File

@ -5684,6 +5684,10 @@ ACTOR Future<Void> auditStorageServerShardQ(StorageServer* data, AuditStorageReq
.detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum)
.detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum);
// Make sure the history collection is not open due to this audit
data->stopTrackShardAssignment();
TraceEvent(SevVerbose, "SSShardAssignmentHistoryRecordStopWhenExit", data->thisServerID).detail("AuditID", req.id);
return Void();
}

View File

@ -23,6 +23,7 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
@ -71,7 +72,17 @@ struct ValidateStorage : TestWorkload {
// We disable failure injection because there is an irrelevant issue:
// Remote tLog is failed to rejoin to CC
// Once this issue is fixed, we should be able to enable the failure injection
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.emplace("Attrition"); }
// This workload is not compatible with following workload because they will race in changing the DD mode
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
out.insert({ "RandomMoveKeys",
"DataLossRecovery",
"IDDTxnProcessorApiCorrectness",
"PerpetualWiggleStatsWorkload",
"PhysicalShardMove",
"StorageCorruption",
"StorageServerCheckpointRestoreTest",
"Attrition" });
}
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
TraceEvent(SevError, "TestFailed")
@ -173,7 +184,6 @@ struct ValidateStorage : TestWorkload {
}
ACTOR Future<Void> checkAuditStorageInternalState(Database cx, AuditType type, UID auditId, std::string context) {
// Check no audit is in Running or Error phase
// Check the number of existing persisted audits is no more than PERSIST_FINISH_AUDIT_COUNT
state Transaction tr(cx);
loop {
@ -206,7 +216,11 @@ struct ValidateStorage : TestWorkload {
}
}
}
if (res.size() > SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT + 1) {
if (res.size() > SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT + 5) {
// Note that 5 is the sum of 4 + 1
// In the test, we issue at most 4 concurrent audits at the same time
// The 4 concurrent audits may not be complete in time
// So, the cleanup does not precisely guarantee PERSIST_FINISH_AUDIT_COUNT
TraceEvent("TestAuditStorageCheckPersistStateWaitClean")
.detail("ExistCount", res.size())
.detail("Context", context)
@ -294,6 +308,12 @@ struct ValidateStorage : TestWorkload {
wait(self->testAuditStorageProgress(self, cx));
TraceEvent("TestAuditStorageProgressDone");
wait(self->testAuditStorageWhenDDSecurityMode(self, cx));
TraceEvent("TestAuditStorageWhenDDSecurityModeDone");
wait(self->testAuditStorageWhenDDBackToNormalMode(self, cx));
TraceEvent("TestAuditStorageWhenDDBackToNormalModeDone");
return Void();
}
@ -710,6 +730,44 @@ struct ValidateStorage : TestWorkload {
return Void();
}
ACTOR Future<Void> testAuditStorageWhenDDSecurityMode(ValidateStorage* self, Database cx) {
TraceEvent("TestAuditStorageWhenDDSecurityModeBegin");
int _ = wait(setDDMode(cx, 2));
UID auditIdA =
wait(self->auditStorageForType(self, cx, AuditType::ValidateHA, "TestAuditStorageWhenDDSecurityMode"));
TraceEvent("TestFunctionalityHADoneWhenDDSecurityMode", auditIdA);
UID auditIdB =
wait(self->auditStorageForType(self, cx, AuditType::ValidateReplica, "TestAuditStorageWhenDDSecurityMode"));
TraceEvent("TestFunctionalityReplicaDoneWhenDDSecurityMode", auditIdB);
UID auditIdC = wait(self->auditStorageForType(
self, cx, AuditType::ValidateLocationMetadata, "TestAuditStorageWhenDDSecurityMode"));
TraceEvent("TestFunctionalityShardLocationMetadataDoneWhenDDSecurityMode", auditIdC);
UID auditIdD = wait(self->auditStorageForType(
self, cx, AuditType::ValidateStorageServerShard, "TestAuditStorageWhenDDSecurityMode"));
TraceEvent("TestFunctionalitySSShardInfoDoneWhenDDSecurityMode", auditIdD);
TraceEvent("TestAuditStorageWhenDDSecurityModeEnd");
return Void();
}
ACTOR Future<Void> testAuditStorageWhenDDBackToNormalMode(ValidateStorage* self, Database cx) {
TraceEvent("TestAuditStorageWhenDDBackToNormalModeBegin");
int _ = wait(setDDMode(cx, 1));
UID auditIdA =
wait(self->auditStorageForType(self, cx, AuditType::ValidateHA, "TestAuditStorageWhenDDBackToNormalMode"));
TraceEvent("TestFunctionalityHADoneWhenDDBackToNormalMode", auditIdA);
UID auditIdB = wait(
self->auditStorageForType(self, cx, AuditType::ValidateReplica, "TestAuditStorageWhenDDBackToNormalMode"));
TraceEvent("TestFunctionalityReplicaDoneWhenDDBackToNormalMode", auditIdB);
UID auditIdC = wait(self->auditStorageForType(
self, cx, AuditType::ValidateLocationMetadata, "TestAuditStorageWhenDDBackToNormalMode"));
TraceEvent("TestFunctionalityShardLocationMetadataDoneWhenDDBackToNormalMode", auditIdC);
UID auditIdD = wait(self->auditStorageForType(
self, cx, AuditType::ValidateStorageServerShard, "TestAuditStorageWhenDDBackToNormalMode"));
TraceEvent("TestFunctionalitySSShardInfoDoneWhenDDBackToNormalMode", auditIdD);
TraceEvent("TestAuditStorageWhenDDBackToNormalModeEnd");
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}