From 3426fc3c1a86fc8c6a330ee7dd9037e83fcce005 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 21 Jul 2023 17:06:25 -0700 Subject: [PATCH] Add DD Security Mode (#10646) * dd-security-mode * address comments * cleanup * revise tr option set in loadAndUpdateAuditMetadataWithNewDDId * address comments * reset auditStorageInitStarted before DD init * decouple audit resume and audit launch * audit launch new request should wait for resuming existing requests * address comment/clean up/fix * fix * fix initAuditMetadata retry * fix initAuditMetadata retry should reset tr --- fdbclient/AuditUtils.actor.cpp | 206 +++++---- fdbclient/ManagementAPI.actor.cpp | 2 + fdbclient/SpecialKeySpace.actor.cpp | 2 +- .../include/fdbclient/AuditUtils.actor.h | 7 +- fdbserver/DDTxnProcessor.actor.cpp | 7 - fdbserver/DataDistribution.actor.cpp | 393 ++++++++---------- fdbserver/MoveKeys.actor.cpp | 67 +-- fdbserver/storageserver.actor.cpp | 4 + fdbserver/workloads/ValidateStorage.actor.cpp | 64 ++- 9 files changed, 408 insertions(+), 344 deletions(-) diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp index 226182fec7..6769502865 100644 --- a/fdbclient/AuditUtils.actor.cpp +++ b/fdbclient/AuditUtils.actor.cpp @@ -78,46 +78,6 @@ ACTOR Future checkStorageServerRemoved(Database cx, UID ssid) { return res; } -ACTOR Future clearAuditMetadata(Database cx, AuditType auditType, UID auditId, bool clearProgressMetadata) { - try { - state Transaction tr(cx); - TraceEvent(SevDebug, "AuditUtilClearAuditMetadataStart", auditId) - .detail("AuditKey", auditKey(auditType, auditId)); - loop { - try { - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - Optional res_ = wait(tr.get(auditKey(auditType, auditId))); - if (!res_.present()) { // has been cleared - break; // Nothing to clear - } - state AuditStorageState toClearState = decodeAuditStorageState(res_.get()); - ASSERT(toClearState.id == auditId && toClearState.getType() == auditType); - // For a zombie audit, it is in running state - // Clear audit metadata - tr.clear(auditKey(auditType, auditId)); - // clear progress metadata - if (clearProgressMetadata) { - clearAuditProgressMetadata(&tr, auditType, auditId); - } - wait(tr.commit()); - TraceEvent(SevDebug, "AuditUtilClearAuditMetadataEnd", auditId) - .detail("AuditKey", auditKey(auditType, auditId)); - break; - } catch (Error& e) { - TraceEvent(SevDebug, "AuditUtilClearAuditMetadataError", auditId) - .detail("AuditKey", auditKey(auditType, auditId)); - wait(tr.onError(e)); - } - } - } catch (Error& e) { - // We do not want audit cleanup effects DD - // pass - } - return Void(); -} - ACTOR Future cancelAuditMetadata(Database cx, AuditType auditType, UID auditId) { try { state Transaction tr(cx); @@ -351,59 +311,15 @@ ACTOR static Future checkMoveKeysLock(Transaction* tr, return Void(); } else { CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner"); - TraceEvent(SevDebug, "AuditUtilConflictWithNewOwner"); + TraceEvent(SevDebug, "AuditUtilConflictWithNewOwner") + .detail("CurrentOwner", currentOwner.toString()) + .detail("PrevOwner", lock.prevOwner.toString()) + .detail("PrevWrite", lock.prevWrite.toString()) + .detail("MyOwner", lock.myOwner.toString()); throw movekeys_conflict(); // need a new name } } -ACTOR Future updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) { - state Transaction tr(cx); - state bool hasCancelled = false; - - loop { - try { - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - wait(checkMoveKeysLock(&tr, lock, ddEnabled, true)); - // Check existing state - Optional res_ = wait(tr.get(auditKey(auditState.getType(), auditState.id))); - if (!res_.present()) { // has been cancelled - hasCancelled = true; - break; // exit - } else { - const AuditStorageState currentState = decodeAuditStorageState(res_.get()); - ASSERT(currentState.id == auditState.id && currentState.getType() == auditState.getType()); - if (currentState.getPhase() == AuditPhase::Failed) { - hasCancelled = true; - break; // exit - } - } - // Persist audit result - tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState)); - wait(tr.commit()); - break; - } catch (Error& e) { - TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id) - .errorUnsuppressed(e) - .detail("AuditID", auditState.id) - .detail("AuditType", auditState.getType()) - .detail("AuditPhase", auditState.getPhase()) - .detail("AuditKey", auditKey(auditState.getType(), auditState.id)); - wait(tr.onError(e)); - } - } - - TraceEvent(SevDebug, "AuditUtilUpdateAuditStateEnd", auditState.id) - .detail("Cancelled", hasCancelled) - .detail("AuditID", auditState.id) - .detail("AuditType", auditState.getType()) - .detail("AuditPhase", auditState.getPhase()) - .detail("AuditKey", auditKey(auditState.getType(), auditState.id)); - - return Void(); -} - ACTOR Future persistNewAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, @@ -793,3 +709,115 @@ ACTOR Future checkAuditProgressComplete(Database cx, AuditType auditType, } return true; } + +// Load RUNNING audit states to resume, clean up COMPLETE and FAILED audit states +// Update ddId for RUNNING audit states +ACTOR Future> initAuditMetadata(Database cx, + MoveKeyLockInfo lock, + bool ddEnabled, + UID dataDistributorId, + int persistFinishAuditCount) { + state std::unordered_map> existingAuditStates; + state std::vector auditStatesToResume; + state Transaction tr(cx); + state int retryCount = 0; + loop { + try { + // Load existing audit states and update ddId in audit states + existingAuditStates.clear(); + auditStatesToResume.clear(); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + wait(checkMoveKeysLock(&tr, lock, ddEnabled, true)); + RangeResult result = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY)); + if (result.more || result.size() >= CLIENT_KNOBS->TOO_MANY) { + TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, + "AuditUtilLoadMetadataIncomplete", + dataDistributorId) + .detail("ResMore", result.more) + .detail("ResSize", result.size()); + } + for (int i = 0; i < result.size(); ++i) { + auto auditState = decodeAuditStorageState(result[i].value); + TraceEvent(SevVerbose, "AuditUtilLoadMetadataEach", dataDistributorId) + .detail("CurrentDDID", dataDistributorId) + .detail("AuditDDID", auditState.ddId) + .detail("AuditType", auditState.getType()) + .detail("AuditID", auditState.id) + .detail("AuditPhase", auditState.getPhase()); + if (auditState.getPhase() == AuditPhase::Running) { + AuditStorageState toUpdate = auditState; + toUpdate.ddId = dataDistributorId; + tr.set(auditKey(toUpdate.getType(), toUpdate.id), auditStorageStateValue(toUpdate)); + } + existingAuditStates[auditState.getType()].push_back(auditState); + } + // Cleanup Complete/Failed audit metadata for each type separately + for (const auto& [auditType, _] : existingAuditStates) { + int numFinishAudit = 0; // "finish" audits include Complete/Failed audits + for (const auto& auditState : existingAuditStates[auditType]) { + if (auditState.getPhase() == AuditPhase::Complete || auditState.getPhase() == AuditPhase::Failed) { + numFinishAudit++; + } + } + const int numFinishAuditsToClear = numFinishAudit - persistFinishAuditCount; + int numFinishAuditsCleared = 0; + std::sort(existingAuditStates[auditType].begin(), + existingAuditStates[auditType].end(), + [](AuditStorageState a, AuditStorageState b) { + return a.id < b.id; // Inplacement sort in ascending order + }); + for (const auto& auditState : existingAuditStates[auditType]) { + if (auditState.getPhase() == AuditPhase::Failed) { + if (numFinishAuditsCleared < numFinishAuditsToClear) { + // Clear both audit metadata and corresponding progress metadata + tr.clear(auditKey(auditState.getType(), auditState.id)); + clearAuditProgressMetadata(&tr, auditState.getType(), auditState.id); + numFinishAuditsCleared++; + TraceEvent(SevInfo, "AuditUtilMetadataCleared", dataDistributorId) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditRange", auditState.range); + } + } else if (auditState.getPhase() == AuditPhase::Complete) { + if (numFinishAuditsCleared < numFinishAuditsToClear) { + // Clear audit metadata only + // No need to clear the corresponding progress metadata + // since it has been cleared for Complete audits + tr.clear(auditKey(auditState.getType(), auditState.id)); + numFinishAuditsCleared++; + TraceEvent(SevInfo, "AuditUtilMetadataCleared", dataDistributorId) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditRange", auditState.range); + } + } else if (auditState.getPhase() == AuditPhase::Running) { + auditStatesToResume.push_back(auditState); + TraceEvent(SevInfo, "AuditUtilMetadataAddedToResume", dataDistributorId) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditRange", auditState.range); + } + } + } + wait(tr.commit()); + break; + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) { + throw e; + } + if (retryCount > 50) { + TraceEvent(SevWarnAlways, "InitAuditMetadataExceedRetryMax", dataDistributorId).errorUnsuppressed(e); + break; + } + try { + wait(tr.onError(e)); + } catch (Error& e) { + retryCount++; + tr.reset(); + } + } + } + return auditStatesToResume; +} diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 21ae8898f1..51fa821194 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -2234,6 +2234,8 @@ ACTOR Future setDDMode(Database cx, int mode) { tr.set(dataDistributionModeKey, wr.toValue()); if (mode) { // set DDMode to 1 will enable all disabled parts, for instance the SS failure monitors. + // set DDMode to 2 is a security mode which disables data moves but allows auditStorage part + // DDMode=2 is set when shard location metadata inconsistency is detected Optional currentHealthyZoneValue = wait(tr.get(healthyZoneKey)); if (currentHealthyZoneValue.present() && decodeHealthyZoneValue(currentHealthyZoneValue.get()).first == ignoreSSFailuresZoneString) { diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 743c6c8226..3969ff8375 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -2626,7 +2626,7 @@ Future> DataDistributionImpl::commit(ReadYourWritesTransac try { int mode = boost::lexical_cast(iter->value().second.get().toString()); Value modeVal = BinaryWriter::toValue(mode, Unversioned()); - if (mode == 0 || mode == 1) { + if (mode == 0 || mode == 1 || mode == 2) { // Whenever configuration changes or DD related system keyspace is changed, // actor must grab the moveKeysLockOwnerKey and update moveKeysLockWriteKey. // This prevents concurrent write to the same system keyspace. diff --git a/fdbclient/include/fdbclient/AuditUtils.actor.h b/fdbclient/include/fdbclient/AuditUtils.actor.h index c42f62b620..ff38293003 100644 --- a/fdbclient/include/fdbclient/AuditUtils.actor.h +++ b/fdbclient/include/fdbclient/AuditUtils.actor.h @@ -36,7 +36,6 @@ struct MoveKeyLockInfo { UID prevOwner, myOwner, prevWrite; }; -ACTOR Future clearAuditMetadata(Database cx, AuditType auditType, UID auditId, bool clearProgressMetadata); ACTOR Future cancelAuditMetadata(Database cx, AuditType auditType, UID auditId); ACTOR Future persistNewAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled); ACTOR Future persistAuditState(Database cx, @@ -67,8 +66,12 @@ ACTOR Future clearAuditMetadataForType(Database cx, UID maxAuditIdToClear, int numFinishAuditToKeep); ACTOR Future checkStorageServerRemoved(Database cx, UID ssid); -ACTOR Future updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled); AuditPhase stringToAuditPhase(std::string auditPhaseStr); ACTOR Future checkAuditProgressComplete(Database cx, AuditType auditType, UID auditId, KeyRange auditRange); +ACTOR Future> initAuditMetadata(Database cx, + MoveKeyLockInfo lock, + bool ddEnabled, + UID dataDistributorId, + int persistFinishAuditCount); #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp index 6651451044..0e48b870e5 100644 --- a/fdbserver/DDTxnProcessor.actor.cpp +++ b/fdbserver/DDTxnProcessor.actor.cpp @@ -364,13 +364,6 @@ class DDTxnProcessorImpl { ++numDataMoves; } - RangeResult ads = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!ads.more && ads.size() < CLIENT_KNOBS->TOO_MANY); - for (int i = 0; i < ads.size(); ++i) { - auto auditState = decodeAuditStorageState(ads[i].value); - result->auditStates.push_back(auditState); - } - succeeded = true; break; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index a130c26798..cb0b150883 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -82,11 +82,17 @@ ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() { .opsReadPerKSecond = StorageMetrics::infinity } }; } +enum class DDAuditContext : uint8_t { + Invalid = 0, + Resume = 1, + Launch = 2, + Retry = 3, +}; struct DDAudit { DDAudit(AuditStorageState coreState) : coreState(coreState), actors(true), foundError(false), auditStorageAnyChildFailed(false), retryCount(0), cancelled(false), overallCompleteDoAuditCount(0), overallIssuedDoAuditCount(0), - remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX) {} + remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX), context(0) {} AuditStorageState coreState; ActorCollection actors; @@ -98,10 +104,14 @@ struct DDAudit { int64_t overallIssuedDoAuditCount; int64_t overallCompleteDoAuditCount; AsyncVar remainingBudgetForAuditTasks; + uint8_t context; inline void setAuditRunActor(Future actor) { auditActor = actor; } inline Future getAuditRunActor() { return auditActor; } + inline void setDDAuditContext(DDAuditContext context) { this->context = static_cast(context); } + inline DDAuditContext getDDAuditContext() const { return static_cast(this->context); } + // auditActor and actors are guaranteed to deliver a cancel signal void cancel() { auditActor.cancel(); @@ -307,6 +317,33 @@ static std::set const& normalDDQueueErrors() { return s; } +struct DataDistributor; +void runAuditStorage(Reference self, + AuditStorageState auditStates, + int retryCount, + DDAuditContext context); +ACTOR Future auditStorageCore(Reference self, + UID auditID, + AuditType auditType, + int currentRetryCount); +ACTOR Future launchAudit(Reference self, KeyRange auditRange, AuditType auditType); +ACTOR Future auditStorage(Reference self, TriggerAuditRequest req); +void loadAndDispatchAudit(Reference self, std::shared_ptr audit, KeyRange range); +ACTOR Future dispatchAuditStorageServerShard(Reference self, std::shared_ptr audit); +ACTOR Future scheduleAuditStorageShardOnServer(Reference self, + std::shared_ptr audit, + StorageServerInterface ssi); +ACTOR Future dispatchAuditStorage(Reference self, + std::shared_ptr audit, + KeyRange range); +ACTOR Future scheduleAuditOnRange(Reference self, + std::shared_ptr audit, + KeyRange range); +ACTOR Future doAuditOnStorageServer(Reference self, + std::shared_ptr audit, + StorageServerInterface ssi, + AuditStorageRequest req); + struct DataDistributor : NonCopyable, ReferenceCounted { public: Reference const> dbInfo; @@ -339,11 +376,12 @@ public: Promise initialized; std::unordered_map>> audits; - Promise auditInitialized; FlowLock auditStorageHaLaunchingLock; FlowLock auditStorageReplicaLaunchingLock; FlowLock auditStorageLocationMetadataLaunchingLock; FlowLock auditStorageSsShardLaunchingLock; + Promise auditStorageInitialized; + bool auditStorageInitStarted; Optional> ddTenantCache; @@ -358,7 +396,8 @@ public: totalDataInFlightEventHolder(makeReference("TotalDataInFlight")), totalDataInFlightRemoteEventHolder(makeReference("TotalDataInFlightRemote")), teamCollection(nullptr), auditStorageHaLaunchingLock(1), auditStorageReplicaLaunchingLock(1), - auditStorageLocationMetadataLaunchingLock(1), auditStorageSsShardLaunchingLock(1) {} + auditStorageLocationMetadataLaunchingLock(1), auditStorageSsShardLaunchingLock(1), + auditStorageInitStarted(false) {} // bootstrap steps @@ -396,6 +435,72 @@ public: return txnProcessor->waitForDataDistributionEnabled(context->ddEnabledState.get()); } + // Resume in-memory audit instances and issue background audit metadata cleanup + void resumeAuditStorage(Reference self, std::vector auditStates) { + for (const auto& auditState : auditStates) { + if (auditState.getPhase() != AuditPhase::Running) { + TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "WrongAuditStateToResume") + .detail("AuditState", auditState.toString()); + return; + } + if (self->audits.contains(auditState.getType()) && + self->audits[auditState.getType()].contains(auditState.id)) { + // Ignore any RUNNING auditState with an alive audit + // instance in DD audits map + continue; + } + runAuditStorage(self, auditState, 0, DDAuditContext::Resume); + TraceEvent(SevInfo, "AuditStorageResumed", self->ddId) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditState", auditState.toString()); + } + return; + } + + ACTOR static Future initAuditStorage(Reference self) { + self->auditStorageInitStarted = true; + MoveKeyLockInfo lockInfo; + lockInfo.myOwner = self->lock.myOwner; + lockInfo.prevOwner = self->lock.prevOwner; + lockInfo.prevWrite = self->lock.prevWrite; + std::vector auditStatesToResume = + wait(initAuditMetadata(self->txnProcessor->context(), + lockInfo, + self->context->isDDEnabled(), + self->ddId, + SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT)); + self->resumeAuditStorage(self, auditStatesToResume); + self->auditStorageInitialized.send(Void()); + return Void(); + } + + ACTOR static Future waitUntilDataDistributorExitSecurityMode(Reference self) { + state Transaction tr(self->txnProcessor->context()); + loop { + wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution)); + tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + try { + Optional mode = wait(tr.get(dataDistributionModeKey)); + if (!mode.present()) { + return Void(); + } + BinaryReader rd(mode.get(), Unversioned()); + int ddMode = 1; + rd >> ddMode; + if (ddMode != 2) { + return Void(); + } + wait(checkMoveKeysLockReadOnly(&tr, self->context->lock, self->context->ddEnabledState.get())); + tr.reset(); + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + // Initialize the required internal states of DataDistributor from system metadata. It's necessary before // DataDistributor start working. Doesn't include initialization of optional components, like TenantCache, DDQueue, // Tracker, TeamCollection. The components should call its own ::init methods. @@ -408,6 +513,25 @@ public: wait(self->takeMoveKeysLock()); TraceEvent("DDInitTookMoveKeysLock", self->ddId).log(); + // AuditStorage does not rely on DatabaseConfiguration + // AuditStorage read neccessary info purely from system key space + if (!self->auditStorageInitStarted) { + // Avoid multiple initAuditStorages + self->addActor.send(self->initAuditStorage(self)); + } + // It is possible that an audit request arrives and then DDMode + // is set to 2 at this point + // No polling MoveKeyLock is running + // So, we need to check MoveKeyLock when waitUntilDataDistributorExitSecurityMode + wait(waitUntilDataDistributorExitSecurityMode(self)); // Trap DDMode == 2 + // It is possible DDMode begins with 2 and passes + // waitDataDistributorEnabled and then set to 0 before + // waitUntilDataDistributorExitSecurityMode. For this case, + // after waitUntilDataDistributorExitSecurityMode, DDMode is 0. + // The init loop does not break and the loop will stuct at + // waitDataDistributorEnabled in the next iteration. + TraceEvent("DataDistributorExitSecurityMode").log(); + wait(self->loadDatabaseConfiguration()); self->initDcInfo(); TraceEvent("DDInitGotConfiguration", self->ddId) @@ -442,7 +566,7 @@ public: .trackLatest(self->initialDDEventHolder->trackingKey); } - if (self->initData->mode && self->context->isDDEnabled()) { + if (self->initData->mode == 1 && self->context->isDDEnabled()) { // mode may be set true by system operator using fdbcli and isEnabled() set to true break; } @@ -780,155 +904,6 @@ inline std::unordered_map> getAuditsForType(Refere return self->audits[auditType]; } -void runAuditStorage(Reference self, - AuditStorageState auditStates, - int retryCount, - std::string context); -ACTOR Future auditStorageCore(Reference self, - UID auditID, - AuditType auditType, - std::string context, - int currentRetryCount); -ACTOR Future launchAudit(Reference self, KeyRange auditRange, AuditType auditType); -ACTOR Future auditStorage(Reference self, TriggerAuditRequest req); -void loadAndDispatchAudit(Reference self, std::shared_ptr audit, KeyRange range); -ACTOR Future dispatchAuditStorageServerShard(Reference self, std::shared_ptr audit); -ACTOR Future scheduleAuditStorageShardOnServer(Reference self, - std::shared_ptr audit, - StorageServerInterface ssi); -ACTOR Future dispatchAuditStorage(Reference self, - std::shared_ptr audit, - KeyRange range); -ACTOR Future scheduleAuditOnRange(Reference self, - std::shared_ptr audit, - KeyRange range); -ACTOR Future doAuditOnStorageServer(Reference self, - std::shared_ptr audit, - StorageServerInterface ssi, - AuditStorageRequest req); - -void cancelAllAuditsInAuditMap(Reference self) { - TraceEvent(SevDebug, "AuditMapOps", self->ddId).detail("Ops", "cancelAllAuditsInAuditMap"); - for (auto& [auditType, auditMap] : self->audits) { - for (auto& [auditID, audit] : auditMap) { - // Any existing audit should stop running when the context switches out - audit->cancel(); - } - } - self->audits.clear(); - return; -} - -ACTOR Future resumeStorageAudits(Reference self) { - ASSERT(!self->auditInitialized.getFuture().isReady()); - if (self->initData->auditStates.empty()) { - self->auditInitialized.send(Void()); - TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId); - return Void(); - } - - // Update metadata - state int retryCount = 0; - loop { - try { - std::vector> fs; - state MoveKeyLockInfo lockInfo; - lockInfo.myOwner = self->lock.myOwner; - lockInfo.prevOwner = self->lock.prevOwner; - lockInfo.prevWrite = self->lock.prevWrite; - for (const auto& auditState : self->initData->auditStates) { - // Only running audit will be resumed - if (auditState.getPhase() == AuditPhase::Running) { - AuditStorageState toUpdate = auditState; - toUpdate.ddId = self->ddId; - fs.push_back(updateAuditState( - self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled())); - } - } - wait(waitForAll(fs)); - break; - } catch (Error& e) { - if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) { - throw e; - } - if (retryCount > 50) { - TraceEvent(SevWarnAlways, "AuditStorageResumeUnableUpdateMetadata", self->ddId).errorUnsuppressed(e); - return Void(); - } - retryCount++; - } - } - - // Following is atomic - // Cancel existing audits and restore - cancelAllAuditsInAuditMap(self); - std::unordered_map> restoredAudits; - for (const auto& auditState : self->initData->auditStates) { - restoredAudits[auditState.getType()].push_back(auditState); - } - // We clear existing audit state for each auditType separately - for (const auto& [auditType, _] : restoredAudits) { - int numFinishAudit = 0; // "finish" audits include Complete/Failed audits - for (const auto& auditState : restoredAudits[auditType]) { - if (auditState.getPhase() == AuditPhase::Complete || auditState.getPhase() == AuditPhase::Failed) { - numFinishAudit++; - } - } - const int numFinishAuditsToClear = numFinishAudit - SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT; - int numFinishAuditsCleared = 0; - std::sort(restoredAudits[auditType].begin(), - restoredAudits[auditType].end(), - [](AuditStorageState a, AuditStorageState b) { - return a.id < b.id; // Inplacement sort in ascending order - }); - // Cleanup audit metadata for Failed/Complete audits - // Resume RUNNING audits - // Keep Error audits persistent - for (const auto& auditState : restoredAudits[auditType]) { - TraceEvent(SevVerbose, "AuditStorageResumeCheck", self->ddId) - .detail("AuditID", auditState.id) - .detail("AuditType", auditState.getType()); - if (auditState.getPhase() == AuditPhase::Error) { - continue; - } else if (auditState.getPhase() == AuditPhase::Failed) { - if (numFinishAuditsCleared < numFinishAuditsToClear) { - // Clear both audit metadata and corresponding progress metadata - self->addActor.send(clearAuditMetadata(self->txnProcessor->context(), - auditState.getType(), - auditState.id, - /*clearProgressMetadata=*/true)); - numFinishAuditsCleared++; - } - continue; - } else if (auditState.getPhase() == AuditPhase::Complete) { - if (numFinishAuditsCleared < numFinishAuditsToClear) { - // Clear audit metadata only - // No need to clear the corresponding progress metadata - // since it has been cleared for Complete audits - self->addActor.send(clearAuditMetadata(self->txnProcessor->context(), - auditState.getType(), - auditState.id, - /*clearProgressMetadata=*/false)); - numFinishAuditsCleared++; - } - continue; - } - ASSERT(auditState.getPhase() == AuditPhase::Running); - TraceEvent(SevDebug, "AuditStorageResume", self->ddId) - .detail("AuditID", auditState.id) - .detail("AuditType", auditState.getType()) - .detail("AuditState", auditState.toString()) - .detail("NumFinishAuditsCleared", numFinishAuditsCleared) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); - runAuditStorage(self, auditState, 0, "ResumeAudit"); - } - } - - self->auditInitialized.send(Void()); - TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId); - return Void(); -} - // Periodically check and log the physicalShard status; clean up empty physicalShard; ACTOR Future monitorPhysicalShardStatus(Reference self) { ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA); @@ -1023,7 +998,6 @@ ACTOR Future dataDistribution(Reference self, self->context->trackerCancelled = false; // whether all initial shard are tracked self->initialized = Promise(); - self->auditInitialized = Promise(); // Stored outside of data distribution tracker to avoid slow tasks // when tracker is cancelled @@ -1073,8 +1047,6 @@ ACTOR Future dataDistribution(Reference self, anyZeroHealthyTeams = zeroHealthyTeams[0]; } - actors.push_back(resumeStorageAudits(self)); - actors.push_back(self->pollMoveKeysLock()); self->context->tracker = makeReference( @@ -1817,9 +1789,7 @@ ACTOR Future ddGetMetrics(GetDataDistributorMetricsRequest req, ACTOR Future auditStorageCore(Reference self, UID auditID, AuditType auditType, - std::string context, int currentRetryCount) { - // At this point, audit must be launched ASSERT(auditID.isValid()); state std::shared_ptr audit = getAuditFromAuditMap(self, auditType, auditID); @@ -1833,18 +1803,19 @@ ACTOR Future auditStorageCore(Reference self, ASSERT(audit->coreState.ddId == self->ddId); loadAndDispatchAudit(self, audit, audit->coreState.range); TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditID", audit->coreState.id) .detail("Range", audit->coreState.range) .detail("AuditType", audit->coreState.getType()) - .detail("RetryCount", currentRetryCount) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount); wait(audit->actors.getResult()); // goto exception handler if any actor is failed TraceEvent(SevInfo, "DDAuditStorageCoreAllActorsComplete", self->ddId) .detail("AuditID", audit->coreState.id) .detail("Range", audit->coreState.range) .detail("AuditType", audit->coreState.getType()) - .detail("RetryCount", currentRetryCount) + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) .detail("DDDoAuditTasksIssued", audit->overallIssuedDoAuditCount) .detail("DDDoAuditTasksComplete", audit->overallCompleteDoAuditCount); // reset for the usage for future retry @@ -1873,30 +1844,30 @@ ACTOR Future auditStorageCore(Reference self, audit->coreState.setPhase(AuditPhase::Complete); } TraceEvent(SevVerbose, "DDAuditStorageCoreCompleteAudit", self->ddId) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditState", audit->coreState.toString()) - .detail("RetryCount", currentRetryCount) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount); wait(persistAuditState(self->txnProcessor->context(), audit->coreState, "AuditStorageCore", lockInfo, self->context->isDDEnabled())); TraceEvent(SevVerbose, "DDAuditStorageCoreSetResult", self->ddId) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditState", audit->coreState.toString()) - .detail("RetryCount", currentRetryCount) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount); removeAuditFromAuditMap(self, audit->coreState.getType(), audit->coreState.id); // remove audit TraceEvent(SevInfo, "DDAuditStorageCoreEnd", self->ddId) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditID", auditID) .detail("AuditType", auditType) .detail("Range", audit->coreState.range) - .detail("RetryCount", currentRetryCount) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { // If this audit is cancelled, the place where cancelling @@ -1905,16 +1876,16 @@ ACTOR Future auditStorageCore(Reference self, } TraceEvent(SevDebug, "DDAuditStorageCoreError", self->ddId) .errorUnsuppressed(e) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditID", auditID) - .detail("RetryCount", currentRetryCount) + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) .detail("AuditType", auditType) - .detail("Range", audit->coreState.range) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("Range", audit->coreState.range); if (e.code() == error_code_movekeys_conflict) { removeAuditFromAuditMap(self, audit->coreState.getType(), audit->coreState.id); // remove audit - throw e; // throw to DD and DD will restart + // Silently exit } else if (e.code() == error_code_audit_storage_cancelled) { // If this audit is cancelled, the place where cancelling // this audit does removeAuditFromAuditMap @@ -1924,18 +1895,20 @@ ACTOR Future auditStorageCore(Reference self, TraceEvent(SevVerbose, "DDAuditStorageCoreRetry", self->ddId) .detail("AuditID", auditID) .detail("AuditType", auditType) - .detail("RetryCount", currentRetryCount) + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) .detail("Contains", self->audits.contains(auditType) && self->audits[auditType].contains(auditID)); wait(delay(0.1)); TraceEvent(SevVerbose, "DDAuditStorageCoreRetryAfterWait", self->ddId) .detail("AuditID", auditID) .detail("AuditType", auditType) - .detail("RetryCount", currentRetryCount) + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) .detail("Contains", self->audits.contains(auditType) && self->audits[auditType].contains(auditID)); // Erase the old audit from map and spawn a new audit inherit from the old audit removeAuditFromAuditMap(self, audit->coreState.getType(), audit->coreState.id); // remove audit - runAuditStorage(self, audit->coreState, audit->retryCount, "auditStorageCoreRetry"); + runAuditStorage(self, audit->coreState, audit->retryCount, DDAuditContext::Retry); } else { try { audit->coreState.setPhase(AuditPhase::Failed); @@ -1944,22 +1917,22 @@ ACTOR Future auditStorageCore(Reference self, "AuditStorageCoreError", lockInfo, self->context->isDDEnabled())); - TraceEvent(SevInfo, "DDAuditStorageCoreSetFailed", self->ddId) - .detail("Context", context) + TraceEvent(SevWarn, "DDAuditStorageCoreSetAuditFailed", self->ddId) + .detail("Context", audit->getDDAuditContext()) .detail("AuditID", auditID) .detail("AuditType", auditType) - .detail("RetryCount", currentRetryCount) - .detail("AuditState", audit->coreState.toString()) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) + .detail("AuditState", audit->coreState.toString()); } catch (Error& e) { TraceEvent(SevWarn, "DDAuditStorageCoreErrorWhenSetAuditFailed", self->ddId) .errorUnsuppressed(e) - .detail("Context", context) + .detail("Context", audit->getDDAuditContext()) .detail("AuditID", auditID) .detail("AuditType", auditType) - .detail("RetryCount", currentRetryCount) - .detail("AuditState", audit->coreState.toString()) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("AuditStorageCoreGeneration", currentRetryCount) + .detail("RetryCount", audit->retryCount) + .detail("AuditState", audit->coreState.toString()); // unexpected error when persistAuditState // However, we do not want any audit error kills the DD // So, we silently remove audit from auditMap @@ -1989,7 +1962,7 @@ ACTOR Future auditStorageCore(Reference self, void runAuditStorage(Reference self, AuditStorageState auditState, int retryCount, - std::string context) { + DDAuditContext context) { // Validate input auditState if (auditState.getType() != AuditType::ValidateHA && auditState.getType() != AuditType::ValidateReplica && auditState.getType() != AuditType::ValidateLocationMetadata && @@ -2005,14 +1978,13 @@ void runAuditStorage(Reference self, auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD std::shared_ptr audit = std::make_shared(auditState); audit->retryCount = retryCount; + audit->setDDAuditContext(context); addAuditToAuditMap(self, audit); - audit->setAuditRunActor( - auditStorageCore(self, audit->coreState.id, audit->coreState.getType(), context, audit->retryCount)); + audit->setAuditRunActor(auditStorageCore(self, audit->coreState.id, audit->coreState.getType(), audit->retryCount)); return; } -// Create/pick an audit for auditRange and auditType -// Return audit ID if no error happens +// Get audit for auditRange and auditType, if not exist, launch a new one ACTOR Future launchAudit(Reference self, KeyRange auditRange, AuditType auditType) { state MoveKeyLockInfo lockInfo; lockInfo.myOwner = self->lock.myOwner; @@ -2021,21 +1993,10 @@ ACTOR Future launchAudit(Reference self, KeyRange auditRan state UID auditID; try { - TraceEvent(SevInfo, "DDAuditStorageLaunchTriggered", self->ddId) + TraceEvent(SevInfo, "DDAuditStorageLaunchStarts", self->ddId) .detail("AuditType", auditType) - .detail("Range", auditRange) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); - std::vector> fs; - fs.push_back(self->auditInitialized.getFuture()); - fs.push_back(self->initialized.getFuture()); - wait(waitForAll(fs)); - - // Get audit, if not exist, triggers a new one - ASSERT(self->auditInitialized.getFuture().isReady() && self->initialized.getFuture().isReady()); - TraceEvent(SevVerbose, "DDAuditStorageLaunchStart", self->ddId) - .detail("AuditType", auditType) - .detail("Range", auditRange) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("RequestedRange", auditRange); + wait(self->auditStorageInitialized.getFuture()); // Start an audit if no audit exists // If existing an audit for a different purpose, send error to client // aka, we only allow one audit at a time for all purposes @@ -2069,8 +2030,7 @@ ACTOR Future launchAudit(Reference self, KeyRange auditRan .detail("AuditType", auditType) .detail("AuditID", auditID) .detail("RequestedRange", auditRange) - .detail("ExistingState", audit->coreState.toString()) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("ExistingState", audit->coreState.toString()); } else { state AuditStorageState auditState; auditState.setType(auditType); @@ -2104,7 +2064,14 @@ ACTOR Future launchAudit(Reference self, KeyRange auditRan .detail("Range", auditRange); auditState.id = auditID_; auditID = auditID_; - runAuditStorage(self, auditState, 0, "LaunchAudit"); + if (self->audits.contains(auditType) && self->audits[auditType].contains(auditID)) { + // It is possible that the current DD is running this audit + // Suppose DDinit re-runs right after a new audit is persisted + // For this case, auditResume sees the new audit and resumes it + // At this point, the new audit is already in the audit map + return auditID; + } + runAuditStorage(self, auditState, 0, DDAuditContext::Launch); } } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { @@ -2206,8 +2173,7 @@ ACTOR Future auditStorage(Reference self, TriggerAuditReq TraceEvent(SevDebug, "DDAuditStorageStart", self->ddId) .detail("RetryCount", retryCount) .detail("AuditType", req.getType()) - .detail("Range", req.range) - .detail("IsReady", self->auditInitialized.getFuture().isReady()); + .detail("Range", req.range); UID auditID = wait(launchAudit(self, req.range, req.getType())); req.reply.send(auditID); TraceEvent(SevVerbose, "DDAuditStorageReply", self->ddId) @@ -2393,8 +2359,7 @@ ACTOR Future scheduleAuditStorageShardOnServer(Reference .detail("AuditType", auditType) .detail("IssuedDoAuditCount", issueDoAuditCount); - if (e.code() == error_code_not_implemented || e.code() == error_code_audit_storage_exceeded_request_limit || - e.code() == error_code_audit_storage_cancelled) { + if (e.code() == error_code_not_implemented || e.code() == error_code_audit_storage_cancelled) { throw e; } else if (e.code() == error_code_audit_storage_error) { audit->foundError = true; diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index b9421377fd..ffcace1a73 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -306,7 +306,7 @@ ACTOR static Future checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); Optional readVal = wait(tr->get(moveKeysLockOwnerKey)); - UID currentOwner = readVal.present() ? BinaryReader::fromStringRef(readVal.get(), Unversioned()) : UID(); + state UID currentOwner = readVal.present() ? BinaryReader::fromStringRef(readVal.get(), Unversioned()) : UID(); if (currentOwner == lock.prevOwner) { // Check that the previous owner hasn't touched the lock since we took it @@ -314,6 +314,13 @@ ACTOR static Future checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL UID lastWrite = readVal.present() ? BinaryReader::fromStringRef(readVal.get(), Unversioned()) : UID(); if (lastWrite != lock.prevWrite) { CODE_PROBE(true, "checkMoveKeysLock: Conflict with previous owner"); + TraceEvent(SevDebug, "CheckPersistentMoveKeysWritterConflict") + .errorUnsuppressed(movekeys_conflict()) + .detail("PrevOwner", lock.prevOwner.toString()) + .detail("PrevWrite", lock.prevWrite.toString()) + .detail("MyOwner", lock.myOwner.toString()) + .detail("CurrentOwner", currentOwner.toString()) + .detail("Writer", lastWrite.toString()); throw movekeys_conflict(); } @@ -347,6 +354,12 @@ ACTOR static Future checkPersistentMoveKeysLock(Transaction* tr, MoveKeysL return Void(); } else { CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner"); + TraceEvent(SevDebug, "CheckPersistentMoveKeysLockOwnerConflict") + .errorUnsuppressed(movekeys_conflict()) + .detail("PrevOwner", lock.prevOwner.toString()) + .detail("PrevWrite", lock.prevWrite.toString()) + .detail("MyOwner", lock.myOwner.toString()) + .detail("CurrentOwner", currentOwner.toString()); throw movekeys_conflict(); } } @@ -426,8 +439,10 @@ ACTOR Future validateRangeAssignment(Database occ, } } if (!allCorrect) { - try { // If corruption detected, stop using DD - int _ = wait(setDDMode(occ, 0)); + try { + // If corruption detected, enter security mode which + // stops using data moves and only allow auditStorage + int _ = wait(setDDMode(occ, 2)); TraceEvent(SevInfo, "ValidateRangeAssignmentCorruptionDetectedAndDDStopped") .detail("DataMoveID", dataMoveId) .detail("Range", range) @@ -450,19 +465,17 @@ ACTOR Future auditLocationMetadataPreCheck(Database occ, std::string context, UID dataMoveId) { if (range.empty()) { - TraceEvent(SevWarn, "AuditLocationMetadataEmptyInputRange") - .detail("By", "PreCheck") - .detail("AuditRange", range); + TraceEvent(SevWarn, "CheckLocationMetadataEmptyInputRange").detail("By", "PreCheck").detail("Range", range); return Void(); } state std::vector> actors; state std::unordered_map> results; - TraceEvent(SevDebug, "AuditLocationMetadataStart") + TraceEvent(SevVerbose, "CheckLocationMetadataStart") .detail("By", "PreCheck") .detail("DataMoveID", dataMoveId) .detail("Servers", describe(servers)) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); try { actors.clear(); results.clear(); @@ -473,40 +486,40 @@ ACTOR Future auditLocationMetadataPreCheck(Database occ, for (const auto& [ssid, res] : results) { ASSERT(res.present()); if (!res.get()) { // Stop check if corruption detected - TraceEvent(SevError, "AuditLocationMetadataCorruptionDetected") + TraceEvent(SevError, "CheckLocationMetadataCorruptionDetected") .detail("By", "PreCheck") .detail("DataMoveID", dataMoveId) .detail("Servers", describe(servers)) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); throw location_metadata_corruption(); } } - TraceEvent(SevDebug, "AuditLocationMetadataComplete") + TraceEvent(SevVerbose, "CheckLocationMetadataComplete") .detail("By", "PreCheck") .detail("DataMoveID", dataMoveId) .detail("Servers", describe(servers)) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); } catch (Error& e) { if (e.code() == error_code_actor_cancelled || e.code() == error_code_location_metadata_corruption) { throw e; } else { - TraceEvent(SevInfo, "AuditLocationMetadataFailed") + TraceEvent(SevInfo, "CheckLocationMetadataFailed") .errorUnsuppressed(e) .detail("By", "PreCheck") .detail("DataMoveID", dataMoveId) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); // Check any existing result when failure presents for (const auto& [ssid, res] : results) { if (res.present() && !res.get()) { - TraceEvent(SevError, "AuditLocationMetadataCorruptionDetectedWhenFailed") + TraceEvent(SevError, "CheckLocationMetadataCorruptionDetectedWhenFailed") .detail("By", "PreCheck") .detail("DataMoveID", dataMoveId) .detail("Servers", describe(servers)) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); throw location_metadata_corruption(); } } @@ -518,9 +531,7 @@ ACTOR Future auditLocationMetadataPreCheck(Database occ, ACTOR Future auditLocationMetadataPostCheck(Database occ, KeyRange range, std::string context, UID dataMoveId) { if (range.empty()) { - TraceEvent(SevWarn, "AuditLocationMetadataEmptyInputRange") - .detail("By", "PostCheck") - .detail("AuditRange", range); + TraceEvent(SevWarn, "CheckLocationMetadataEmptyInputRange").detail("By", "PostCheck").detail("Range", range); return Void(); } state std::vector> actors; @@ -530,10 +541,10 @@ ACTOR Future auditLocationMetadataPostCheck(Database occ, KeyRange range, state RangeResult UIDtoTagMap; state Transaction tr(occ); state int retryCount = 0; - TraceEvent(SevDebug, "AuditLocationMetadataStart") + TraceEvent(SevVerbose, "CheckLocationMetadataStart") .detail("By", "PostCheck") .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); loop { try { loop { @@ -554,7 +565,7 @@ ACTOR Future auditLocationMetadataPostCheck(Database occ, KeyRange range, actors.push_back(store(UIDtoTagMap, tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY))); wait(waitForAll(actors)); ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); - TraceEvent(SevVerbose, "AuditLocationMetadataReadDone") + TraceEvent(SevVerbose, "CheckLocationMetadataReadDone") .detail("By", "PostCheck") .detail("ResultSize", readResultKS.size()); // Read serverKeys @@ -591,11 +602,11 @@ ACTOR Future auditLocationMetadataPostCheck(Database occ, KeyRange range, rangeToReadBegin = readResultKS.back().key; continue; } else { - TraceEvent(SevDebug, "AuditLocationMetadataComplete") + TraceEvent(SevVerbose, "CheckLocationMetadataComplete") .detail("By", "PostCheck") .detail("DataMoveID", dataMoveId) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); break; } } catch (Error& e) { @@ -611,21 +622,21 @@ ACTOR Future auditLocationMetadataPostCheck(Database occ, KeyRange range, // Check corruptions for the current (failed) round for (const auto& [idx, res] : results) { if (res.present() && !res.get()) { - TraceEvent(SevError, "AuditLocationMetadataCorruptionDetectedWhenFailed") + TraceEvent(SevError, "CheckLocationMetadataCorruptionDetectedWhenFailed") .detail("By", "PostCheck") .detail("DataMoveID", dataMoveId) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); throw location_metadata_corruption(); } } if (retryCount > SERVER_KNOBS->AUDIT_DATAMOVE_POST_CHECK_RETRY_COUNT_MAX) { - TraceEvent(SevInfo, "AuditLocationMetadataFailed") + TraceEvent(SevInfo, "CheckLocationMetadataFailed") .errorUnsuppressed(e) .detail("By", "PostCheck") .detail("DataMoveID", dataMoveId) .detail("Context", context) - .detail("AuditRange", range); + .detail("Range", range); // If no corruption detected, exit silently } else { wait(delay(0.5)); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index eb308f7e61..c5760901ea 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -5684,6 +5684,10 @@ ACTOR Future auditStorageServerShardQ(StorageServer* data, AuditStorageReq .detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum) .detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum); + // Make sure the history collection is not open due to this audit + data->stopTrackShardAssignment(); + TraceEvent(SevVerbose, "SSShardAssignmentHistoryRecordStopWhenExit", data->thisServerID).detail("AuditID", req.id); + return Void(); } diff --git a/fdbserver/workloads/ValidateStorage.actor.cpp b/fdbserver/workloads/ValidateStorage.actor.cpp index a67c320a6f..b58f274d6f 100644 --- a/fdbserver/workloads/ValidateStorage.actor.cpp +++ b/fdbserver/workloads/ValidateStorage.actor.cpp @@ -23,6 +23,7 @@ #include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbserver/Knobs.h" +#include "fdbserver/QuietDatabase.h" #include "fdbrpc/simulator.h" #include "fdbserver/workloads/workloads.actor.h" #include "flow/Error.h" @@ -71,7 +72,17 @@ struct ValidateStorage : TestWorkload { // We disable failure injection because there is an irrelevant issue: // Remote tLog is failed to rejoin to CC // Once this issue is fixed, we should be able to enable the failure injection - void disableFailureInjectionWorkloads(std::set& out) const override { out.emplace("Attrition"); } + // This workload is not compatible with following workload because they will race in changing the DD mode + void disableFailureInjectionWorkloads(std::set& out) const override { + out.insert({ "RandomMoveKeys", + "DataLossRecovery", + "IDDTxnProcessorApiCorrectness", + "PerpetualWiggleStatsWorkload", + "PhysicalShardMove", + "StorageCorruption", + "StorageServerCheckpointRestoreTest", + "Attrition" }); + } void validationFailed(ErrorOr> expectedValue, ErrorOr> actualValue) { TraceEvent(SevError, "TestFailed") @@ -173,7 +184,6 @@ struct ValidateStorage : TestWorkload { } ACTOR Future checkAuditStorageInternalState(Database cx, AuditType type, UID auditId, std::string context) { - // Check no audit is in Running or Error phase // Check the number of existing persisted audits is no more than PERSIST_FINISH_AUDIT_COUNT state Transaction tr(cx); loop { @@ -206,7 +216,11 @@ struct ValidateStorage : TestWorkload { } } } - if (res.size() > SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT + 1) { + if (res.size() > SERVER_KNOBS->PERSIST_FINISH_AUDIT_COUNT + 5) { + // Note that 5 is the sum of 4 + 1 + // In the test, we issue at most 4 concurrent audits at the same time + // The 4 concurrent audits may not be complete in time + // So, the cleanup does not precisely guarantee PERSIST_FINISH_AUDIT_COUNT TraceEvent("TestAuditStorageCheckPersistStateWaitClean") .detail("ExistCount", res.size()) .detail("Context", context) @@ -294,6 +308,12 @@ struct ValidateStorage : TestWorkload { wait(self->testAuditStorageProgress(self, cx)); TraceEvent("TestAuditStorageProgressDone"); + wait(self->testAuditStorageWhenDDSecurityMode(self, cx)); + TraceEvent("TestAuditStorageWhenDDSecurityModeDone"); + + wait(self->testAuditStorageWhenDDBackToNormalMode(self, cx)); + TraceEvent("TestAuditStorageWhenDDBackToNormalModeDone"); + return Void(); } @@ -710,6 +730,44 @@ struct ValidateStorage : TestWorkload { return Void(); } + ACTOR Future testAuditStorageWhenDDSecurityMode(ValidateStorage* self, Database cx) { + TraceEvent("TestAuditStorageWhenDDSecurityModeBegin"); + int _ = wait(setDDMode(cx, 2)); + UID auditIdA = + wait(self->auditStorageForType(self, cx, AuditType::ValidateHA, "TestAuditStorageWhenDDSecurityMode")); + TraceEvent("TestFunctionalityHADoneWhenDDSecurityMode", auditIdA); + UID auditIdB = + wait(self->auditStorageForType(self, cx, AuditType::ValidateReplica, "TestAuditStorageWhenDDSecurityMode")); + TraceEvent("TestFunctionalityReplicaDoneWhenDDSecurityMode", auditIdB); + UID auditIdC = wait(self->auditStorageForType( + self, cx, AuditType::ValidateLocationMetadata, "TestAuditStorageWhenDDSecurityMode")); + TraceEvent("TestFunctionalityShardLocationMetadataDoneWhenDDSecurityMode", auditIdC); + UID auditIdD = wait(self->auditStorageForType( + self, cx, AuditType::ValidateStorageServerShard, "TestAuditStorageWhenDDSecurityMode")); + TraceEvent("TestFunctionalitySSShardInfoDoneWhenDDSecurityMode", auditIdD); + TraceEvent("TestAuditStorageWhenDDSecurityModeEnd"); + return Void(); + } + + ACTOR Future testAuditStorageWhenDDBackToNormalMode(ValidateStorage* self, Database cx) { + TraceEvent("TestAuditStorageWhenDDBackToNormalModeBegin"); + int _ = wait(setDDMode(cx, 1)); + UID auditIdA = + wait(self->auditStorageForType(self, cx, AuditType::ValidateHA, "TestAuditStorageWhenDDBackToNormalMode")); + TraceEvent("TestFunctionalityHADoneWhenDDBackToNormalMode", auditIdA); + UID auditIdB = wait( + self->auditStorageForType(self, cx, AuditType::ValidateReplica, "TestAuditStorageWhenDDBackToNormalMode")); + TraceEvent("TestFunctionalityReplicaDoneWhenDDBackToNormalMode", auditIdB); + UID auditIdC = wait(self->auditStorageForType( + self, cx, AuditType::ValidateLocationMetadata, "TestAuditStorageWhenDDBackToNormalMode")); + TraceEvent("TestFunctionalityShardLocationMetadataDoneWhenDDBackToNormalMode", auditIdC); + UID auditIdD = wait(self->auditStorageForType( + self, cx, AuditType::ValidateStorageServerShard, "TestAuditStorageWhenDDBackToNormalMode")); + TraceEvent("TestFunctionalitySSShardInfoDoneWhenDDBackToNormalMode", auditIdD); + TraceEvent("TestAuditStorageWhenDDBackToNormalModeEnd"); + return Void(); + } + Future check(Database const& cx) override { return true; } void getMetrics(std::vector& m) override {}