From 5160f91e788b01309a49a273a12ab12e9fc24554 Mon Sep 17 00:00:00 2001 From: He Liu Date: Tue, 23 May 2023 13:39:25 -0700 Subject: [PATCH 01/10] Removed SHARD_ENCODE_LOCATION_METADATA. --- fdbclient/ServerKnobs.cpp | 1 - fdbclient/include/fdbclient/ServerKnobs.h | 1 - fdbserver/MoveKeys.actor.cpp | 8 ++------ 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 475d8ca527..4cd0464839 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -180,7 +180,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true; init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled - init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false ); if( isSimulated ) ENABLE_DD_PHYSICAL_SHARD_MOVE = deterministicRandom()->coinflip(); init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.5 ); init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 832d7812ab..368fcea237 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -204,7 +204,6 @@ public: bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID. bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true. - bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move. double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1]. int64_t MAX_PHYSICAL_SHARD_BYTES; double PHYSICAL_SHARD_METRICS_DELAY; diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 251a13e026..ed8340f5d0 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1953,9 +1953,7 @@ ACTOR static Future finishMoveShards(Database occ, wait(waitForAll(actors)); if (range.end == dataMove.ranges.front().end) { - if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) { - wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); - } + wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); tr.clear(dataMoveKeyFor(dataMoveId)); complete = true; TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", dataMoveId) @@ -2707,9 +2705,7 @@ ACTOR Future cleanUpDataMoveCore(Database occ, } if (range.end == dataMove.ranges.front().end) { - if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) { - wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); - } + wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); tr.clear(dataMoveKeyFor(dataMoveId)); complete = true; TraceEvent(sevDm, "CleanUpDataMoveDeleteMetaData", dataMoveId) From 241907d8ad61e11d6a301b0391c39e8967cc8e59 Mon Sep 17 00:00:00 2001 From: He Liu Date: Wed, 24 May 2023 19:02:30 -0700 Subject: [PATCH 02/10] Clear move-in-shards before terminating SS. 
--- fdbserver/DDRelocationQueue.actor.cpp | 4 ++-- fdbserver/storageserver.actor.cpp | 4 ++++ fdbserver/workloads/PhysicalShardMove.actor.cpp | 1 - 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp index 9c0f957742..819a67c408 100644 --- a/fdbserver/DDRelocationQueue.actor.cpp +++ b/fdbserver/DDRelocationQueue.actor.cpp @@ -1047,7 +1047,7 @@ void DDQueue::launchQueuedWork(std::set rrs.dataMoveId = UID(); } else { const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; + deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::False, EnablePhysicalShardMove(enabled)); @@ -1635,7 +1635,7 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, self->moveCreateNewPhysicalShard++; } const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; + deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; rd.dataMoveId = newDataMoveId( physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled)); TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard") diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 55de7245b4..a894e9411f 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -9431,6 +9431,8 @@ ACTOR Future cleanUpMoveInShard(StorageServer* data, Version version, Move } wait(data->durableVersion.whenAtLeast(mLV.version + 1)); + data->moveInShards.erase(moveInShard->id()); + return Void(); } @@ -13882,6 +13884,8 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, self.ssLock->halt(); + self.moveInShards.clear(); + state Error err = e; if (storageServerTerminated(self, persistentData, err)) { ssCore.cancel(); diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp index 287d542639..05f82a7237 100644 --- a/fdbserver/workloads/PhysicalShardMove.actor.cpp +++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp @@ -101,7 +101,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload { newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::False, EnablePhysicalShardMove::True), - // EnablePhysicalShardMove::False), KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr), teamSize, includes, From 53675db30610df3e02c7e67fde695bf102a220d1 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 26 May 2023 15:56:03 -0700 Subject: [PATCH 03/10] Fix audit storage issue with multiple DDs (#10310) * init * add DDAuditContext * move metadata update before runauditstorage * revert DDAuditContext and replace ddAuditId with ddId * cleanup --- fdbclient/AuditUtils.actor.cpp | 49 +++++++++++++---- fdbclient/ServerKnobs.cpp | 1 - fdbclient/include/fdbclient/Audit.h | 34 +++++++----- .../include/fdbclient/AuditUtils.actor.h | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 - fdbserver/DataDistribution.actor.cpp | 53 ++++++++++++++++--- fdbserver/storageserver.actor.cpp | 46 ++++++++-------- 7 files changed, 132 insertions(+), 53 deletions(-) diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp index 98f0118bc4..ba43a34d5f 100644 --- a/fdbclient/AuditUtils.actor.cpp +++ b/fdbclient/AuditUtils.actor.cpp @@ -251,7 +251,6 @@ ACTOR static Future checkMoveKeysLock(Transaction* tr, TraceEvent(SevDebug, 
"ConflictWithPreviousOwner"); throw movekeys_conflict(); // need a new name } - // Take the lock if (isWrite) { BinaryWriter wrMyOwner(Unversioned()); @@ -267,7 +266,6 @@ ACTOR static Future checkMoveKeysLock(Transaction* tr, .detail("MyOwner", lock.myOwner.toString()) .detail("Writer", lastWriter.toString()); } - return Void(); } else if (currentOwner == lock.myOwner) { if (isWrite) { @@ -278,7 +276,6 @@ ACTOR static Future checkMoveKeysLock(Transaction* tr, // Make this transaction self-conflicting so the database will not execute it twice with the same write key tr->makeSelfConflicting(); } - return Void(); } else { CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner"); @@ -287,6 +284,37 @@ ACTOR static Future checkMoveKeysLock(Transaction* tr, } } +ACTOR Future updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) { + state Transaction tr(cx); + + loop { + try { + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + wait(checkMoveKeysLock(&tr, lock, ddEnabled, true)); + // Persist audit result + tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState)); + wait(tr.commit()); + TraceEvent(SevDebug, "AuditUtilUpdateAuditState", auditState.id) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditPhase", auditState.getPhase()) + .detail("AuditKey", auditKey(auditState.getType(), auditState.id)); + break; + } catch (Error& e) { + TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id) + .errorUnsuppressed(e) + .detail("AuditID", auditState.id) + .detail("AuditType", auditState.getType()) + .detail("AuditPhase", auditState.getPhase()) + .detail("AuditKey", auditKey(auditState.getType(), auditState.id)); + wait(tr.onError(e)); + } + } + + return Void(); +} + ACTOR Future persistNewAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, @@ -333,7 +361,6 @@ ACTOR Future persistNewAuditState(Database cx, TraceEvent(SevDebug, "AuditUtilPersistedNewAuditState", auditId) .detail("AuditKey", auditKey(auditState.getType(), auditId)); break; - } catch (Error& e) { TraceEvent(SevDebug, "AuditUtilPersistedNewAuditStateError", auditId) .errorUnsuppressed(e) @@ -384,7 +411,6 @@ ACTOR Future persistAuditState(Database cx, .detail("AuditKey", auditKey(auditState.getType(), auditState.id)) .detail("Context", context); break; - } catch (Error& e) { TraceEvent(SevDebug, "AuditUtilPersistAuditStateError", auditState.id) .errorUnsuppressed(e) @@ -415,7 +441,6 @@ ACTOR Future getAuditState(Database cx, AuditType type, UID i .detail("AuditType", type) .detail("AuditKey", auditKey(type, id)); break; - } catch (Error& e) { TraceEvent(SevDebug, "AuditUtilReadAuditStateError", id) .errorUnsuppressed(e) @@ -493,12 +518,15 @@ ACTOR Future persistAuditStateByRange(Database cx, AuditStorageState audit if (ddAuditState.getPhase() != AuditPhase::Running) { throw audit_storage_failed(); } + ASSERT(ddAuditState.ddId.isValid()); + if (ddAuditState.ddId != auditState.ddId) { + throw audit_storage_failed(); // a new dd starts and this audit task is outdated + } wait(krmSetRange(&tr, auditRangeBasedProgressPrefixFor(auditState.getType(), auditState.id), auditState.range, auditStorageStateValue(auditState))); break; - } catch (Error& e) { wait(tr.onError(e)); } @@ -525,7 +553,6 @@ ACTOR Future> getAuditStateByRange(Database cx, CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)); auditStates = res_; break; - } 
catch (Error& e) { TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId); wait(tr.onError(e)); @@ -567,13 +594,16 @@ ACTOR Future persistAuditStateByServer(Database cx, AuditStorageState audi if (ddAuditState.getPhase() != AuditPhase::Running) { throw audit_storage_failed(); } + ASSERT(ddAuditState.ddId.isValid()); + if (ddAuditState.ddId != auditState.ddId) { + throw audit_storage_failed(); // a new dd starts and this audit task is outdated + } wait(krmSetRange( &tr, auditServerBasedProgressPrefixFor(auditState.getType(), auditState.id, auditState.auditServerId), auditState.range, auditStorageStateValue(auditState))); break; - } catch (Error& e) { wait(tr.onError(e)); } @@ -601,7 +631,6 @@ ACTOR Future> getAuditStateByServer(Database cx, CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)); auditStates = res_; break; - } catch (Error& e) { TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId); wait(tr.onError(e)); diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 8ebb2e36d1..6a04928254 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -878,7 +878,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 ); init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1; init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10; - init( SS_AUDIT_AUTO_PROCEED_COUNT_MAX, 5 ); init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1); init( BUGGIFY_BLOCK_BYTES, 10000 ); init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS ); diff --git a/fdbclient/include/fdbclient/Audit.h b/fdbclient/include/fdbclient/Audit.h index 1d6c9a999e..ce9ae90a6b 100644 --- a/fdbclient/include/fdbclient/Audit.h +++ b/fdbclient/include/fdbclient/Audit.h @@ -45,24 +45,24 @@ enum class AuditType : uint8_t { struct AuditStorageState { constexpr static FileIdentifier file_identifier = 13804340; - AuditStorageState() : type(0), auditServerId(UID()), phase(0) {} + AuditStorageState() : type(0), auditServerId(UID()), phase(0), ddId(UID()) {} AuditStorageState(UID id, UID auditServerId, AuditType type) - : id(id), auditServerId(auditServerId), type(static_cast(type)), phase(0) {} + : id(id), auditServerId(auditServerId), type(static_cast(type)), phase(0), ddId(UID()) {} AuditStorageState(UID id, KeyRange range, AuditType type) - : id(id), auditServerId(UID()), range(range), type(static_cast(type)), phase(0) {} + : id(id), auditServerId(UID()), range(range), type(static_cast(type)), phase(0), ddId(UID()) {} AuditStorageState(UID id, AuditType type) - : id(id), auditServerId(UID()), type(static_cast(type)), phase(0) {} + : id(id), auditServerId(UID()), type(static_cast(type)), phase(0), ddId(UID()) {} template void serialize(Ar& ar) { - serializer(ar, id, auditServerId, range, type, phase, error); + serializer(ar, id, auditServerId, range, type, phase, error, ddId); } - void setType(AuditType type) { this->type = static_cast(type); } - AuditType getType() const { return static_cast(this->type); } + inline void setType(AuditType type) { this->type = static_cast(type); } + inline AuditType getType() const { return static_cast(this->type); } - void setPhase(AuditPhase phase) { this->phase = static_cast(phase); } - AuditPhase getPhase() 
const { return static_cast(this->phase); } + inline void setPhase(AuditPhase phase) { this->phase = static_cast(phase); } + inline AuditPhase getPhase() const { return static_cast(this->phase); } // for fdbcli get_audit_status std::string toStringForCLI() const { @@ -90,7 +90,14 @@ struct AuditStorageState { } UID id; - UID auditServerId; + UID ddId; // ddId indicates this audit is managed by which dd + // ddId is used to check if dd has changed + // When a new dd starts in the middle of an onging audit, + // The ongoing audit's ddId gets updated + // When SS updates the progress, it checks ddId + // If the ddId is updated, SS Audit actors of the old dd will stop themselves + // New dd will issue new requests to SSes to continue the remaining work + UID auditServerId; // UID of SS who is working on this audit task KeyRange range; uint8_t type; uint8_t phase; @@ -105,15 +112,16 @@ struct AuditStorageRequest { AuditStorageRequest(UID id, KeyRange range, AuditType type) : id(id), range(range), type(static_cast(type)) {} - void setType(AuditType type) { this->type = static_cast(this->type); } - AuditType getType() const { return static_cast(this->type); } + inline void setType(AuditType type) { this->type = static_cast(this->type); } + inline AuditType getType() const { return static_cast(this->type); } template void serialize(Ar& ar) { - serializer(ar, id, range, type, targetServers, reply); + serializer(ar, id, range, type, targetServers, reply, ddId); } UID id; + UID ddId; // UID of DD who claims the audit KeyRange range; uint8_t type; std::vector targetServers; diff --git a/fdbclient/include/fdbclient/AuditUtils.actor.h b/fdbclient/include/fdbclient/AuditUtils.actor.h index 4b63e7389a..f93f60a26b 100644 --- a/fdbclient/include/fdbclient/AuditUtils.actor.h +++ b/fdbclient/include/fdbclient/AuditUtils.actor.h @@ -66,5 +66,6 @@ ACTOR Future clearAuditMetadataForType(Database cx, UID maxAuditIdToClear, int numFinishAuditToKeep); ACTOR Future checkStorageServerRemoved(Database cx, UID ssid); +ACTOR Future updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled); #include "flow/unactorcompiler.h" #endif diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 0c7909eaec..5d4b8af04b 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -865,7 +865,6 @@ public: int SERVE_AUDIT_STORAGE_PARALLELISM; int PERSIST_FINISH_AUDIT_COUNT; // Num of persist complete/failed audits for each type int AUDIT_RETRY_COUNT_MAX; - int SS_AUDIT_AUTO_PROCEED_COUNT_MAX; int CONCURRENT_AUDIT_TASK_COUNT_MAX; int BUGGIFY_BLOCK_BYTES; int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index a433d8d371..9c70181620 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -96,8 +96,8 @@ struct DDAudit { int64_t overallIssuedDoAuditCount; int64_t overallCompleteDoAuditCount; - void setAuditRunActor(Future actor) { auditActor = actor; } - Future getAuditRunActor() { return auditActor; } + inline void setAuditRunActor(Future actor) { auditActor = actor; } + inline Future getAuditRunActor() { return auditActor; } // auditActor and actors are guaranteed to deliver a cancel signal void cancel() { @@ -771,15 +771,49 @@ void cancelAllAuditsInAuditMap(Reference self) { return; } -void resumeStorageAudits(Reference self) { +ACTOR Future resumeStorageAudits(Reference self) { 
ASSERT(!self->auditInitialized.getFuture().isReady()); if (self->initData->auditStates.empty()) { self->auditInitialized.send(Void()); TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId); - return; + return Void(); } - cancelAllAuditsInAuditMap(self); // cancel existing audits + // Update metadata + state int retryCount = 0; + loop { + try { + std::vector> fs; + state MoveKeyLockInfo lockInfo; + lockInfo.myOwner = self->lock.myOwner; + lockInfo.prevOwner = self->lock.prevOwner; + lockInfo.prevWrite = self->lock.prevWrite; + for (const auto& auditState : self->initData->auditStates) { + // Only running audit will be resumed + if (auditState.getPhase() == AuditPhase::Running) { + AuditStorageState toUpdate = auditState; + toUpdate.ddId = self->ddId; + fs.push_back(updateAuditState( + self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled())); + } + } + wait(waitForAll(fs)); + break; + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) { + throw e; + } + if (retryCount > 50) { + TraceEvent(SevWarnAlways, "ResumeAuditStorageUnableUpdateMetadata", self->ddId).errorUnsuppressed(e); + return Void(); + } + retryCount++; + } + } + + // Following is atomic + // Cancel existing audits and restore + cancelAllAuditsInAuditMap(self); std::unordered_map> restoredAudits; for (const auto& auditState : self->initData->auditStates) { restoredAudits[auditState.getType()].push_back(auditState); @@ -843,7 +877,7 @@ void resumeStorageAudits(Reference self) { self->auditInitialized.send(Void()); TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId); - return; + return Void(); } // Periodically check and log the physicalShard status; clean up empty physicalShard; @@ -1001,7 +1035,7 @@ ACTOR Future dataDistribution(Reference self, anyZeroHealthyTeams = zeroHealthyTeams[0]; } - resumeStorageAudits(self); + actors.push_back(resumeStorageAudits(self)); actors.push_back(self->pollMoveKeysLock()); @@ -1753,6 +1787,7 @@ ACTOR Future auditStorageCore(Reference self, try { ASSERT(audit != nullptr); + ASSERT(audit->coreState.ddId == self->ddId); loadAndDispatchAudit(self, audit, audit->coreState.range); TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId) .detail("Context", context) @@ -1931,6 +1966,7 @@ void runAuditStorage(Reference self, ASSERT(auditState.id.isValid()); ASSERT(!auditState.range.empty()); ASSERT(auditState.getPhase() == AuditPhase::Running); + auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD std::shared_ptr audit = std::make_shared(auditState); audit->retryCount = retryCount; TraceEvent(SevDebug, "DDRunAuditStorage", self->ddId) @@ -1997,6 +2033,7 @@ ACTOR Future launchAudit(Reference self, KeyRange auditRan auditState.setType(auditType); auditState.range = auditRange; auditState.setPhase(AuditPhase::Running); + auditState.ddId = self->ddId; // persist ddId to new ddAudit metadata TraceEvent(SevVerbose, "DDAuditStorageLaunchPersistNewAuditIDBefore", self->ddId) .detail("AuditType", auditType) .detail("Range", auditRange); @@ -2227,6 +2264,7 @@ ACTOR Future scheduleAuditStorageShardOnServer(Reference // We always issue exactly one audit task (for the remaining part) when schedule ASSERT(issueDoAuditCount == 0); issueDoAuditCount++; + req.ddId = self->ddId; // send this ddid to SS audit->actors.add(doAuditOnStorageServer(self, audit, ssi, req)); } } @@ -2447,6 +2485,7 @@ ACTOR Future scheduleAuditOnRange(Reference self, 
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX - 1); } issueDoAuditCount++; + req.ddId = self->ddId; // send this ddid to SS audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req)); } } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 2936d4f521..9754bd63c7 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4899,7 +4899,8 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector>& vec, ACTOR Future validateRangeAgainstServer(StorageServer* data, AuditStorageState auditState, Version version, - StorageServerInterface remoteServer) { + StorageServerInterface remoteServer, + UID ddId) { TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID) .detail("AuditID", auditState.id) .detail("Range", auditState.range) @@ -5029,6 +5030,8 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, range = KeyRangeRef(keyAfter(lastKey), range.end); auditState.range = KeyRangeRef(originBegin, range.begin); auditState.setPhase(AuditPhase::Complete); + ASSERT(ddId.isValid()); + auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByRange(data->cx, auditState)); } } catch (Error& e) { @@ -5048,6 +5051,8 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, .detail("ErrorMessage", error) .detail("RemoteServer", remoteServer.toString()); auditState.setPhase(AuditPhase::Error); + ASSERT(ddId.isValid()); + auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByRange(data->cx, auditState)); throw audit_storage_error(); } @@ -5061,7 +5066,10 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, return Void(); } -ACTOR Future validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector candidates) { +ACTOR Future validateRangeShard(StorageServer* data, + AuditStorageState auditState, + std::vector candidates, + UID ddId) { TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID) .detail("Range", auditState.range) .detail("Servers", describe(candidates)); @@ -5122,7 +5130,7 @@ ACTOR Future validateRangeShard(StorageServer* data, AuditStorageState aud } if (remoteServer != nullptr) { - wait(validateRangeAgainstServer(data, auditState, version, *remoteServer)); + wait(validateRangeAgainstServer(data, auditState, version, *remoteServer, ddId)); } else { TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID) .detail("Range", auditState.range) @@ -5135,7 +5143,8 @@ ACTOR Future validateRangeShard(StorageServer* data, AuditStorageState aud ACTOR Future validateRangeAgainstServers(StorageServer* data, AuditStorageState auditState, - std::vector targetServers) { + std::vector targetServers, + UID ddId) { TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID) .detail("AuditID", auditState.id) .detail("Range", auditState.range) @@ -5172,7 +5181,7 @@ ACTOR Future validateRangeAgainstServers(StorageServer* data, .detail("Range", auditState.range); throw audit_storage_failed(); } - fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()))); + fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()), ddId)); } wait(waitForAll(fs)); @@ -5502,9 +5511,6 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto state Key rangeToReadBegin = req.range.begin; state KeyRangeRef rangeToRead; state int retryCount = 0; 
- state int storageAutoProceedCount = 0; - // storageAutoProceedCount is guard to make sure that audit at SS does not run too long - // by itself without being notified by DD state int64_t cumulatedValidatedLocalShardsNum = 0; state int64_t cumulatedValidatedServerKeysNum = 0; @@ -5651,6 +5657,8 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto .detail("AuditServer", data->thisServerID); res.range = claimRange; res.setPhase(AuditPhase::Error); + ASSERT(req.ddId.isValid()); + res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByServer(data->cx, res)); req.reply.sendError(audit_storage_error()); break; @@ -5658,6 +5666,8 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto // Expand persisted complete range res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end)); res.setPhase(AuditPhase::Complete); + ASSERT(req.ddId.isValid()); + res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByServer(data->cx, res)); TraceEvent(SevInfo, "AuditStorageSsShardDone", data->thisServerID) .detail("AuditId", req.id) @@ -5665,11 +5675,7 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto .detail("AuditServer", data->thisServerID) .detail("CompleteRange", res.range); if (claimRange.end < rangeToRead.end) { - if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) { - throw audit_storage_failed(); - } rangeToReadBegin = claimRange.end; - storageAutoProceedCount++; } else { // complete req.reply.send(res); break; @@ -5726,9 +5732,6 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora state Transaction tr(data->cx); state Key rangeToReadBegin = req.range.begin; state KeyRangeRef rangeToRead; - state int storageAutoProceedCount = 0; - // storageAutoProceedCount is guard to make sure that audit at SS does not run too long - // by itself without being notified by DD state int64_t cumulatedValidatedServerKeysNum = 0; state int64_t cumulatedValidatedKeyServersNum = 0; @@ -5887,6 +5890,8 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora .detail("ClaimRange", claimRange); res.range = claimRange; res.setPhase(AuditPhase::Error); + ASSERT(req.ddId.isValid()); + res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByRange(data->cx, res)); req.reply.sendError(audit_storage_error()); break; @@ -5894,6 +5899,8 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora // Expand persisted complete range res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end)); res.setPhase(AuditPhase::Complete); + ASSERT(req.ddId.isValid()); + res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByRange(data->cx, res)); TraceEvent(SevInfo, "AuditStorageShardLocMetadataDone", data->thisServerID) .detail("AuditId", req.id) @@ -5902,11 +5909,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora .detail("AuditServerId", data->thisServerID) .detail("CompleteRange", res.range); if (claimRange.end < rangeToRead.end) { - if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) { - throw audit_storage_failed(); - } rangeToReadBegin = claimRange.end; - storageAutoProceedCount++; } else { // complete req.reply.send(res); break; @@ -5968,7 +5971,8 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { 
fs.push_back(validateRangeShard( data, AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()), - src)); + src, + req.ddId)); begin = shards[i + 1].key; } } catch (Error& e) { @@ -5976,7 +5980,7 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { } } } else { - fs.push_back(validateRangeAgainstServers(data, res, req.targetServers)); + fs.push_back(validateRangeAgainstServers(data, res, req.targetServers, req.ddId)); } wait(waitForAll(fs)); From 2385dd36f38003278f26870d7c90f0aeec74af9e Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 26 May 2023 09:26:26 -0700 Subject: [PATCH 04/10] Update GLOBAL_TAG_THROTTLING_FOLDING_TIME default to 10.0 --- fdbclient/ServerKnobs.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 6a04928254..6bc4543591 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -833,13 +833,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip(); init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING ); init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 ); - // 60 seconds was chosen as a default value to ensure that + // 10 seconds was chosen as a default value to ensure that // the global tag throttler does not react too drastically to // changes in workload. To make the global tag throttler more reactive, // lower this knob. To make global tag throttler more smooth, raise this knob. // Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic // behaviour and is not recommended. - init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 60.0 ); + init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 ); init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 ); init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 ); init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 ); @@ -918,7 +918,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 
1.0 : 0.1 ); init( MIN_TAG_READ_PAGES_RATE, 100 ); if( randomize && BUGGIFY ) MIN_TAG_READ_PAGES_RATE = 0; init( MIN_TAG_WRITE_PAGES_RATE, 100 ); if( randomize && BUGGIFY ) MIN_TAG_WRITE_PAGES_RATE = 0; - init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 4.0; + init( TAG_MEASUREMENT_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 10.0; init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false; init( REPORT_DD_METRICS, true ); init( DD_METRICS_REPORT_INTERVAL, 30.0 ); From ae6167e576297ce57f0a1b4b7b0849c34df42f1e Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 26 May 2023 11:03:37 -0700 Subject: [PATCH 05/10] Update StorageQueueInfo::getTagThrottlingRatio implementation --- fdbserver/Ratekeeper.actor.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index e1b91d8457..ff20e2b869 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -1391,11 +1391,13 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) { Optional StorageQueueInfo::getTagThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const { auto const storageQueue = getStorageQueueBytes(); - if (storageQueue < storageTargetBytes - storageSpringBytes) { - return {}; + // TODO: Remove duplicate calculation from Ratekeeper::updateRate + double inverseResult = std::min( + 2.0, (storageQueue - storageTargetBytes + storageSpringBytes) / static_cast(storageSpringBytes)); + if (inverseResult > 0) { + return 1.0 / inverseResult; } else { - return std::max( - 0.0, static_cast((storageTargetBytes + storageSpringBytes) - storageQueue) / storageSpringBytes); + return {}; } } From 926a7cbb4db16784652d5f1fc855a9f4db9a4494 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 26 May 2023 11:16:36 -0700 Subject: [PATCH 06/10] Add AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER knob --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbserver/GlobalTagThrottler.actor.cpp | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 6bc4543591..91583e11cf 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -755,6 +755,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3; init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3; init( AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES, 800e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES = 2500e3; + init( AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER, 200e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER = 500e3; init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 750e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3; init( SPRING_BYTES_STORAGE_SERVER_BATCH, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3; init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3; diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 5d4b8af04b..fa6ca86d41 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ 
b/fdbclient/include/fdbclient/ServerKnobs.h @@ -711,6 +711,7 @@ public: int64_t TARGET_BYTES_PER_STORAGE_SERVER; int64_t SPRING_BYTES_STORAGE_SERVER; int64_t AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES; + int64_t AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER; int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH; int64_t SPRING_BYTES_STORAGE_SERVER_BATCH; int64_t STORAGE_HARD_LIMIT_BYTES; diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp index c419352243..6e23cf6c0e 100644 --- a/fdbserver/GlobalTagThrottler.actor.cpp +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -534,7 +534,7 @@ public: Future tryUpdateAutoThrottling(StorageQueueInfo const& ss) { auto& ssInfo = ssInfos[ss.id]; ssInfo.throttlingRatio = ss.getTagThrottlingRatio(SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES, - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER); + SERVER_KNOBS->AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER); ssInfo.zoneId = ss.locality.zoneId(); auto& tagToThroughputCounters = throughput[ss.id]; From 61aaca005ea9e4c609ba9ffaad481c438fd34df2 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Mon, 29 May 2023 14:43:47 -0700 Subject: [PATCH 07/10] SS Audit Storage Throttling (#10322) * ss audit storage throttling * add audit manager to ss * reduce CONCURRENT_AUDIT_TASK_COUNT_MAX * revises comments * fix audit cli * fix getAuditStates * remove toStringForCLI --- fdbcli/GetAuditStatusCommand.actor.cpp | 2 +- fdbclient/AuditUtils.actor.cpp | 18 ++-- fdbclient/ServerKnobs.cpp | 2 +- fdbclient/include/fdbclient/Audit.h | 16 +--- fdbclient/include/fdbclient/FDBTypes.h | 2 +- fdbserver/DataDistribution.actor.cpp | 3 +- fdbserver/storageserver.actor.cpp | 93 +++++++++++++++---- fdbserver/workloads/ValidateStorage.actor.cpp | 4 + 8 files changed, 94 insertions(+), 46 deletions(-) diff --git a/fdbcli/GetAuditStatusCommand.actor.cpp b/fdbcli/GetAuditStatusCommand.actor.cpp index 5256f5f64b..30962fe484 100644 --- a/fdbcli/GetAuditStatusCommand.actor.cpp +++ b/fdbcli/GetAuditStatusCommand.actor.cpp @@ -63,7 +63,7 @@ ACTOR Future getAuditStatusCommandActor(Database cx, std::vectorTOO_MANY; if (tokens.size() == 4) { diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp index ba43a34d5f..7a96f2d855 100644 --- a/fdbclient/AuditUtils.actor.cpp +++ b/fdbclient/AuditUtils.actor.cpp @@ -119,26 +119,28 @@ ACTOR Future> getAuditStates(Database cx, state std::vector auditStates; state Key readBegin; state Key readEnd; - state RangeResult res; state Reverse reverse = newFirst ? Reverse::True : Reverse::False; - + if (num.present() && num.get() == 0) { + return auditStates; + } loop { try { readBegin = auditKeyRange(auditType).begin; readEnd = auditKeyRange(auditType).end; auditStates.clear(); while (true) { - res.clear(); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); KeyRangeRef rangeToRead(readBegin, readEnd); - if (num.present()) { - wait(store(res, tr.getRange(rangeToRead, num.get(), Snapshot::False, reverse))); - } else { - wait(store(res, tr.getRange(rangeToRead, GetRangeLimits(), Snapshot::False, reverse))); - } + state RangeResult res = wait(tr.getRange(rangeToRead, + num.present() ? 
GetRangeLimits(num.get()) : GetRangeLimits(), + Snapshot::False, + reverse)); for (int i = 0; i < res.size(); ++i) { auditStates.push_back(decodeAuditStorageState(res[i].value)); + if (num.present() && auditStates.size() == num.get()) { + return auditStates; // since res.more is not reliable when GetRangeLimits is set to 1 + } } if (!res.more) { break; diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 2273a98555..136c6e92d9 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -878,7 +878,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 ); init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1; init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10; - init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1); + init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 10 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1); init( BUGGIFY_BLOCK_BYTES, 10000 ); init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS ); init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000; diff --git a/fdbclient/include/fdbclient/Audit.h b/fdbclient/include/fdbclient/Audit.h index ce9ae90a6b..fc1e591b19 100644 --- a/fdbclient/include/fdbclient/Audit.h +++ b/fdbclient/include/fdbclient/Audit.h @@ -64,24 +64,10 @@ struct AuditStorageState { inline void setPhase(AuditPhase phase) { this->phase = static_cast(phase); } inline AuditPhase getPhase() const { return static_cast(this->phase); } - // for fdbcli get_audit_status - std::string toStringForCLI() const { - std::string res = "AuditStorageState: [ID]: " + id.toString() + - ", [Range]: " + Traceable::toString(range) + - ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase); - if (!error.empty()) { - res += "[Error]: " + error; - } - - return res; - } - - // for traceevent std::string toString() const { std::string res = "AuditStorageState: [ID]: " + id.toString() + ", [Range]: " + Traceable::toString(range) + - ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase) + - ", [AuditServerID]: " + auditServerId.toString(); + ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase); if (!error.empty()) { res += "[Error]: " + error; } diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index e411844c9d..f71f2b833f 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -829,7 +829,7 @@ struct RangeResultRef : VectorRef { serializer(ar, ((VectorRef&)*this), more, readThrough, readToBegin, readThroughEnd); } - int logicalSize() const { + int64_t logicalSize() const { return VectorRef::expectedSize() - VectorRef::size() * sizeof(KeyValueRef); } diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 9c70181620..84af2ba1e4 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2571,7 +2571,8 @@ ACTOR Future doAuditOnStorageServer(Reference self, self->remainingBudgetForAuditTasks[auditType].set(self->remainingBudgetForAuditTasks[auditType].get() + 1); ASSERT(self->remainingBudgetForAuditTasks[auditType].get() <= 
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX); - if (e.code() == error_code_actor_cancelled) { + if (e.code() == error_code_actor_cancelled || e.code() == error_code_not_implemented || + e.code() == error_code_audit_storage_exceeded_request_limit) { throw e; } else if (e.code() == error_code_audit_storage_error) { audit->foundError = true; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 81bcea9c21..021a0be39d 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -997,7 +997,7 @@ public: std::unordered_map> moveInShards; bool shardAware; // True if the storage server is aware of the physical shards. - Future auditSSShardInfoActor; + std::unordered_map> auditTasks; // Histograms struct FetchKeysHistograms { @@ -4912,6 +4912,10 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, state int validatedKeys = 0; state std::string error; state int64_t cumulatedValidatedKeysNum = 0; + state Reference rateLimiter = + Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1)); + state int64_t remoteReadBytes = 0; + loop { try { std::vector>> fs; @@ -4957,6 +4961,7 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, } const GetKeyValuesReply &remote = reps[0].get(), local = reps[1].get(); + remoteReadBytes = remote.data.expectedSize(); Key lastKey = range.begin; auditState.range = range; @@ -5034,6 +5039,9 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId wait(persistAuditStateByRange(data->cx, auditState)); } + + wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping + } catch (Error& e) { TraceEvent(SevWarn, "ValidateRangeAgainstServerFailed", data->thisServerID) .errorUnsuppressed(e) @@ -5341,9 +5349,15 @@ struct AuditGetServerKeysRes { Version readAtVersion; UID serverId; std::vector ownRanges; + int64_t readBytes; AuditGetServerKeysRes() = default; - AuditGetServerKeysRes(KeyRange completeRange, Version readAtVersion, UID serverId, std::vector ownRanges) - : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges) {} + AuditGetServerKeysRes(KeyRange completeRange, + Version readAtVersion, + UID serverId, + std::vector ownRanges, + int64_t readBytes) + : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges), + readBytes(readBytes) {} }; // Given an input server, get ranges within the input range via the input transaction @@ -5391,7 +5405,7 @@ ACTOR Future getThisServerKeysFromServerKeys(UID serverID .detail("ReadAtVersion", readAtVersion) .detail("CompleteRange", completeRange) .detail("ResultSize", ownRanges.size()); - res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges); + res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize()); } catch (Error& e) { TraceEvent(SevDebug, "AuditStorageGetThisServerKeysError", serverID) @@ -5406,12 +5420,15 @@ ACTOR Future getThisServerKeysFromServerKeys(UID serverID struct AuditGetKeyServersRes { KeyRange completeRange; Version readAtVersion; + int64_t readBytes; std::unordered_map> rangeOwnershipMap; AuditGetKeyServersRes() = default; AuditGetKeyServersRes(KeyRange completeRange, Version readAtVersion, - std::unordered_map> rangeOwnershipMap) - : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap) {} + std::unordered_map> 
rangeOwnershipMap, + int64_t readBytes) + : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap), + readBytes(readBytes) {} }; // Given an input server, get ranges within the input range via the input transaction @@ -5471,7 +5488,7 @@ ACTOR Future getShardMapFromKeyServers(UID auditServerId, .detail("AtVersion", readAtVersion) .detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount) .detail("TotalShardsCount", totalShardsCount); - res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges); + res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize()); } catch (Error& e) { TraceEvent(SevDebug, "AuditStorageGetThisServerKeysFromKeyServersError", auditServerId) @@ -5513,6 +5530,9 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto state int retryCount = 0; state int64_t cumulatedValidatedLocalShardsNum = 0; state int64_t cumulatedValidatedServerKeysNum = 0; + state Reference rateLimiter = + Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1)); + state int64_t remoteReadBytes = 0; try { while (true) { @@ -5544,12 +5564,14 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto serverKeyCompleteRange = serverKeyRes.completeRange; serverKeyReadAtVersion = serverKeyRes.readAtVersion; ownRangesSeenByServerKey = serverKeyRes.ownRanges; + remoteReadBytes = serverKeyRes.readBytes; // We want to do transactional read at a version newer than data->version while (serverKeyReadAtVersion < localShardInfoReadAtVersion) { if (retryCount >= SERVER_KNOBS->AUDIT_RETRY_COUNT_MAX) { failureReason = "Read serverKeys retry count exceeds the max"; throw audit_storage_failed(); } + wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping retryCount++; wait(delay(0.5)); tr.reset(); @@ -5558,6 +5580,7 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto serverKeyCompleteRange = serverKeyRes.completeRange; serverKeyReadAtVersion = serverKeyRes.readAtVersion; ownRangesSeenByServerKey = serverKeyRes.ownRanges; + remoteReadBytes = serverKeyRes.readBytes; } // retry until serverKeyReadAtVersion is as larger as localShardInfoReadAtVersion ASSERT(serverKeyReadAtVersion >= localShardInfoReadAtVersion); try { @@ -5681,6 +5704,8 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto break; } } + + wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping } } catch (Error& e) { TraceEvent(SevInfo, "AuditStorageSsShardFailed", data->thisServerID) @@ -5734,6 +5759,9 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora state KeyRangeRef rangeToRead; state int64_t cumulatedValidatedServerKeysNum = 0; state int64_t cumulatedValidatedKeyServersNum = 0; + state Reference rateLimiter = + Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1)); + state int64_t remoteReadBytes = 0; try { while (true) { @@ -5744,6 +5772,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora mapFromKeyServers.clear(); serverKeyResMap.clear(); mapFromKeyServersRaw.clear(); + remoteReadBytes = 0; rangeToRead = KeyRangeRef(rangeToReadBegin, req.range.end); tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); @@ -5752,6 +5781,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora completeRangeByKeyServer = keyServerRes.completeRange; readAtVersion = keyServerRes.readAtVersion; mapFromKeyServersRaw = 
keyServerRes.rangeOwnershipMap; + remoteReadBytes += keyServerRes.readBytes; // Use ssid of mapFromKeyServersRaw to read ServerKeys for (auto& [ssid, _] : mapFromKeyServersRaw) { actors.push_back(store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead))); @@ -5771,6 +5801,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora ASSERT(!overlappingRange.empty()); claimRange = overlappingRange; ASSERT(readAtVersion == serverKeyRes.readAtVersion); + remoteReadBytes += serverKeyRes.readBytes; } // Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare int64_t numValidatedServerKeys = 0; @@ -5915,6 +5946,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora break; } } + wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping } } catch (Error& e) { @@ -13516,20 +13548,43 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface self->actors.add(fetchCheckpointKeyValuesQ(self, req)); } when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) { - // A SS can run one ValidateStorageServerShard at a time - // We do not have this limitation on other audit types - if (req.getType() == AuditType::ValidateStorageServerShard) { - if (self->auditSSShardInfoActor.isValid() && !self->auditSSShardInfoActor.isReady()) { - TraceEvent(SevWarn, "ExistRunningAuditStorageForServerShard") - .detail("NewAuditId", req.id) - .detail("NewAuditType", req.getType()); - self->auditSSShardInfoActor.cancel(); - } // New audit immediately starts and existing one gets cancelled - self->auditSSShardInfoActor = auditStorageStorageServerShardQ(self, req); + // Check existing audit task states + if (self->auditTasks.contains(req.getType())) { + if (req.id != self->auditTasks[req.getType()].first) { + // Any task of past audit must be ready + if (!self->auditTasks[req.getType()].second.getResult().isReady()) { + req.reply.sendError(audit_storage_exceeded_request_limit()); + TraceEvent(SevWarnAlways, "ExistSSAuditWithDifferentId") // unexpected + .detail("NewAuditId", req.id) + .detail("NewAuditType", req.getType()); + continue; + } + } else if (req.getType() == AuditType::ValidateStorageServerShard && + !self->auditTasks[req.getType()].second.getResult().isReady()) { + // Only one ValidateStorageServerShard is allowed to run at a time + TraceEvent(SevWarn, "ExistSSAuditForServerShardWithSameId") + .detail("AuditId", req.id) + .detail("AuditType", req.getType()); + self->auditTasks[req.getType()].second.clear(true); + } + } + // Prepare for the new audit task + if (!self->auditTasks.contains(req.getType()) || + self->auditTasks[req.getType()].second.getResult().isReady()) { + ASSERT(req.id.isValid()); + self->auditTasks[req.getType()] = std::make_pair(req.id, ActorCollection(true)); + } + // Start the new audit task + if (req.getType() == AuditType::ValidateHA) { + self->auditTasks[req.getType()].second.add(auditStorageQ(self, req)); + } else if (req.getType() == AuditType::ValidateReplica) { + self->auditTasks[req.getType()].second.add(auditStorageQ(self, req)); } else if (req.getType() == AuditType::ValidateLocationMetadata) { - self->actors.add(auditStorageLocationMetadataQ(self, req)); + self->auditTasks[req.getType()].second.add(auditStorageLocationMetadataQ(self, req)); + } else if (req.getType() == AuditType::ValidateStorageServerShard) { + self->auditTasks[req.getType()].second.add(auditStorageStorageServerShardQ(self, req)); } else { - self->actors.add(auditStorageQ(self, req)); + 
req.reply.sendError(not_implemented()); } } when(wait(updateProcessStatsTimer)) { diff --git a/fdbserver/workloads/ValidateStorage.actor.cpp b/fdbserver/workloads/ValidateStorage.actor.cpp index 7716b01765..c42ac3d4f5 100644 --- a/fdbserver/workloads/ValidateStorage.actor.cpp +++ b/fdbserver/workloads/ValidateStorage.actor.cpp @@ -207,6 +207,10 @@ struct ValidateStorage : TestWorkload { .detail("AuditIDA", auditIdA) .detail("AuditIDB", auditIdB); } + std::vector res = wait(getAuditStates(cx, type, /*newFirst=*/true, 1)); + if (res.size() != 1) { + TraceEvent(SevError, "TestGetAuditStatesError").detail("ActualResSize", res.size()); + } return Void(); } From f94eada7c066a39495e48205fbb9d3be4afbe832 Mon Sep 17 00:00:00 2001 From: Aaron Molitor Date: Fri, 12 May 2023 11:57:48 -0500 Subject: [PATCH 08/10] add cmake option to include RocksDB Tools with the Rocks DB compile --- cmake/CompileRocksDB.cmake | 4 ++-- cmake/FDBComponents.cmake | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmake/CompileRocksDB.cmake b/cmake/CompileRocksDB.cmake index 2a4413dc83..43ca729823 100644 --- a/cmake/CompileRocksDB.cmake +++ b/cmake/CompileRocksDB.cmake @@ -22,8 +22,8 @@ set(RocksDB_CMAKE_ARGS -DFAIL_ON_WARNINGS=OFF -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF - -DWITH_TOOLS=OFF - -DWITH_CORE_TOOLS=OFF + -DWITH_TOOLS=${ROCKSDB_TOOLS} + -DWITH_CORE_TOOLS=${ROCKSDB_TOOLS} -DWITH_BENCHMARK_TOOLS=OFF -DWITH_BZ2=OFF -DWITH_LZ4=ON diff --git a/cmake/FDBComponents.cmake b/cmake/FDBComponents.cmake index 9a55045f0c..e9594646af 100644 --- a/cmake/FDBComponents.cmake +++ b/cmake/FDBComponents.cmake @@ -157,6 +157,7 @@ set(PORTABLE_ROCKSDB ON CACHE BOOL "Compile RocksDB in portable mode") # Set thi set(ROCKSDB_SSE42 OFF CACHE BOOL "Compile RocksDB with SSE42 enabled") set(ROCKSDB_AVX ${USE_AVX} CACHE BOOL "Compile RocksDB with AVX enabled") set(ROCKSDB_AVX2 OFF CACHE BOOL "Compile RocksDB with AVX2 enabled") +set(ROCKSDB_TOOLS OFF CACHE BOOL "Compile RocksDB tools") set(WITH_LIBURING OFF CACHE BOOL "Build with liburing enabled") # Set this to ON to include liburing # RocksDB is currently enabled by default for GCC but does not build with the latest # Clang. From 0674984ab190dca14ebdf136ae0f2ecc1cc63b8c Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 30 May 2023 09:30:06 -0700 Subject: [PATCH 09/10] Fix a simulation DR stuck issue When buggify is enabled, it's possible the version map has 5 entries, which is larger than BACKUP_MAP_KEY_LOWER_LIMIT, causing the range task to be delayed infinitely: the BackupRangeTaskFunc::_execute() skips the execution and schedules the task to be added back in BackupRangeTaskFunc::_finish(). 
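For illustration, the delay decision described above can be modeled as a small stand-alone C++ sketch. The names and knob values below are simplified, hypothetical stand-ins for the real CLIENT_KNOBS limits and g_simulator state, not the actual BackupRangeTaskFunc code; they only model the condition that lets a buggified 5-entry version map keep re-queuing the range task until speedUpSimulation turns on.

    #include <cstdio>

    // Hypothetical stand-ins for CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT / _UPPER_LIMIT.
    constexpr int kLowerLimit = 4;
    constexpr int kUpperLimit = 64;

    // Returns true if the range insert is delayed, i.e. _execute() skips the work and
    // _finish() re-adds the task.
    bool shouldDelay(int rangeCount, bool prevAdjacent, bool nextAdjacent,
                     bool isSimulated, bool buggifyEnabled, bool speedUpSimulation) {
        int limit = (prevAdjacent || nextAdjacent) ? kUpperLimit : kLowerLimit;
        bool tooLarge = (!prevAdjacent || !nextAdjacent) && rangeCount > limit;
        // The added guard: in simulation, only delay while buggify is on and the run has
        // not yet entered speedUpSimulation, so the task cannot reschedule forever.
        bool simGuard = !isSimulated || (buggifyEnabled && !speedUpSimulation);
        return tooLarge && simGuard;
    }

    int main() {
        // A buggified 5-entry version map exceeds kLowerLimit, so early in simulation the
        // task is delayed; once speedUpSimulation is set, the delay stops and it proceeds.
        std::printf("%d\n", shouldDelay(5, false, false, true, true, false)); // 1: delayed
        std::printf("%d\n", shouldDelay(5, false, false, true, true, true));  // 0: proceeds
        return 0;
    }
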
Reproduction: Seed: -f ./tests/slow/SharedDefaultBackupCorrectness.toml -s 3202874095 -b on -f ./tests/slow/VersionStampBackupToDB.toml -s 1190111003 -b on Commit: 6e5773dd5 at release-7.3 Build: clang --- fdbclient/DatabaseBackupAgent.actor.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 3af962ca09..d0ba76e926 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -26,7 +26,9 @@ #include "fdbclient/NativeAPI.actor.h" #include #include +#include "fdbrpc/simulator.h" #include "flow/IAsyncFile.h" +#include "flow/flow.h" #include "flow/genericactors.actor.h" #include "flow/Hash3.h" #include @@ -361,8 +363,10 @@ struct BackupRangeTaskFunc : TaskFuncBase { if ((!prevAdjacent || !nextAdjacent) && rangeCount > ((prevAdjacent || nextAdjacent) ? CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT - : CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT)) { - CODE_PROBE(true, "range insert delayed because too versionMap is too large"); + : CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT) && + (!g_network->isSimulated() || + (isBuggifyEnabled(BuggifyType::General) && g_simulator->speedUpSimulation))) { + CODE_PROBE(true, "range insert delayed because versionMap is too large"); if (rangeCount > CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT) TraceEvent(SevWarnAlways, "DBA_KeyRangeMapTooLarge").log(); From 43d67d6f986f0e27f4887bba4943cc8e460220a7 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 30 May 2023 10:04:05 -0700 Subject: [PATCH 10/10] Should repeat when speedUpSimulation is false --- fdbclient/DatabaseBackupAgent.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index d0ba76e926..4cc18eef2a 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -365,7 +365,7 @@ struct BackupRangeTaskFunc : TaskFuncBase { rangeCount > ((prevAdjacent || nextAdjacent) ? CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT : CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT) && (!g_network->isSimulated() || - (isBuggifyEnabled(BuggifyType::General) && g_simulator->speedUpSimulation))) { + (isBuggifyEnabled(BuggifyType::General) && !g_simulator->speedUpSimulation))) { CODE_PROBE(true, "range insert delayed because versionMap is too large"); if (rangeCount > CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT)