From c73577de7dbe9c3d3663c87db80b59fcb35c6d80 Mon Sep 17 00:00:00 2001
From: Xiaoxi Wang
Date: Fri, 3 Mar 2023 15:37:45 -0800
Subject: [PATCH 1/7] Add team priority comments and document.

---
 design/data-distributor-internals.md      |  9 +++++--
 fdbclient/include/fdbclient/ServerKnobs.h | 33 +++++++++++++----------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/design/data-distributor-internals.md b/design/data-distributor-internals.md
index ccaba537b6..49b04b2cb9 100644
--- a/design/data-distributor-internals.md
+++ b/design/data-distributor-internals.md
@@ -81,6 +81,11 @@ Actors are created to monitor the reasons of key movement:
 (3) `serverTeamRemover` and `machineTeamRemover` actors periodically evaluate if the number of server teams and machine teams is larger than the desired number. If so, they respectively pick a server team or a machine team to remove based on predefined criteria;
 (4) `teamTracker` actor monitors a team’s healthiness. When a server in the team becomes unhealthy, it issues the `RelocateShard` request to repair the replication factor. The less servers a team has, the higher priority the `RelocateShard` request will be.

+#### Movement Priority
+Each shard movement has a priority associated with the move attempt, which depends on the priority of the source team. The explanation of each priority knob (`PRIORITY_`) is in `ServerKnobs.h`.
+
+In `status json` output, please look at the field `.data.team_tracker.state` for the team priority state.
+
 ### How to move keys?

 A key range is a shard. A shard is the minimum unit of moving data. The storage server’s ownership of a shard -- which SS owns which shard -- is stored in the system keyspace *serverKeys* (`\xff/serverKeys/`) and *keyServers* (`\xff/keyServers/`). To simplify the explanation, we refer to the storage server’s ownership of a shard as a shard’s ownership.
@@ -152,11 +157,11 @@ CPU utilization. This metric is in a positive relationship with “FinishedQueries” per storage server.

 * Read-aware DD will balance the read workload under the read-skew scenario. Starting from an imbalance `STD(FinishedQueries per minute)=16k`,the best result it can achieve is `STD(FinishedQueries per minute) = 2k`.
 * The typical movement size under a read-skew scenario is 100M ~ 600M under default KNOB value `READ_REBALANCE_MAX_SHARD_FRAC=0.2, READ_REBALANCE_SRC_PARALLELISM = 20`. Increasing those knobs may accelerate the converge speed with the risk of data movement churn, which overwhelms the destination and over-cold the source.
 * The upper bound of `READ_REBALANCE_MAX_SHARD_FRAC` is 0.5. Any value larger than 0.5 can result in hot server switching.
-* When needing a deeper diagnosis of the read aware DD, `BgDDMountainChopper_New`, and `BgDDValleyFiller_New` trace events are where to go.
+* When needing a deeper diagnosis of the read-aware DD, the `BgDDMountainChopper` and `BgDDValleyFiller` trace events are where to go.

 ## Data Distribution Diagnosis Q&A
 * Why Read-aware DD hasn't been triggered when there's a read imbalance?
-  * Check `BgDDMountainChopper_New`, `BgDDValleyFiller_New` `SkipReason` field.
+  * Check the `SkipReason` field in the `BgDDMountainChopper` and `BgDDValleyFiller` events.
 * The Read-aware DD is triggered, and some data movement happened, but it doesn't help the read balance. Why?
   * Need to figure out which server is selected as the source and destination. The information is in `BgDDMountainChopper*`, `BgDDValleyFiller*` `DestTeam` and `SourceTeam` field.
* Also, the `DDQueueServerCounter` event tells how many times a server being a source or destination (defined in diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 58d66e22d1..157d49887d 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -142,22 +142,27 @@ public: // is possible within but not between priority groups; fewer priority groups // mean better worst case time bounds // Maximum allowable priority is 999. - int PRIORITY_RECOVER_MOVE; - int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM; - int PRIORITY_REBALANCE_OVERUTILIZED_TEAM; - int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM; - int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM; - int PRIORITY_PERPETUAL_STORAGE_WIGGLE; - int PRIORITY_TEAM_HEALTHY; - int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER; - int PRIORITY_TEAM_REDUNDANT; + // Update the status json .data.team_tracker.state field when necessary + int PRIORITY_RECOVER_MOVE; // Priority for movement resume from previous unfinished in-flight movement when a new DD + // start + int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM; // For disk valley filler + int PRIORITY_REBALANCE_OVERUTILIZED_TEAM; // For disk mountain chopper + int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM; // For read valley filler + int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM; // For read mountain chopper + int PRIORITY_PERPETUAL_STORAGE_WIGGLE; // Priority for wiggle a storage server + int PRIORITY_TEAM_HEALTHY; // Priority when all servers in a team are healthy + int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER; // Priority when there's undesired servers in the team. (ex. same ip + // address as other SS process, or SS is lagging too far ...) + int PRIORITY_TEAM_REDUNDANT; // Priority for removing redundant team to make the team count within a good range int PRIORITY_MERGE_SHARD; - int PRIORITY_POPULATE_REGION; - int PRIORITY_TEAM_UNHEALTHY; - int PRIORITY_TEAM_2_LEFT; - int PRIORITY_TEAM_1_LEFT; + int PRIORITY_POPULATE_REGION; // Priority for populate remote region + int PRIORITY_TEAM_UNHEALTHY; // Priority when the replica > 3 and there's at least one unhealthy server in a team. + // Or when the team contains a server with wrong configuration (ex. storage engine, + // locality, excluded ...) + int PRIORITY_TEAM_2_LEFT; // Priority when the replica <= 3 and there's 2 healthy servers in a team + int PRIORITY_TEAM_1_LEFT; // Priority when the replica <= 3 and there's 1 healthy server in a team int PRIORITY_TEAM_FAILED; // Priority when a server in the team is excluded as failed - int PRIORITY_TEAM_0_LEFT; + int PRIORITY_TEAM_0_LEFT; // Priority when the replica <= 3 and there's no healthy server in a team int PRIORITY_SPLIT_SHARD; int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD; // Priority when a physical shard is oversize or anonymous From f89a483f3d3acfd63823116acbce296f255c687c Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Thu, 16 Mar 2023 12:26:46 -0700 Subject: [PATCH 2/7] add informal classification of priority --- design/data-distributor-internals.md | 8 ++- fdbclient/include/fdbclient/ServerKnobs.h | 59 +++++++++++++++-------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/design/data-distributor-internals.md b/design/data-distributor-internals.md index 49b04b2cb9..a7fad7e26f 100644 --- a/design/data-distributor-internals.md +++ b/design/data-distributor-internals.md @@ -82,7 +82,13 @@ Actors are created to monitor the reasons of key movement: (4) `teamTracker` actor monitors a team’s healthiness. 
When a server in the team becomes unhealthy, it issues the `RelocateShard` request to repair the replication factor. The less servers a team has, the higher priority the `RelocateShard` request will be.

 #### Movement Priority
-Each shard movement has a priority associated with the move attempt, which depends on the priority of the source team. The explanation of each priority knob (`PRIORITY_`) is in `ServerKnobs.h`.
+There are roughly four classes of movement priorities:
+* Healthy priority. The movement is for maintaining the cluster's healthy status, and its priority depends on the health of the source team.
+* Load balance priority. The movement is for balancing the cluster workload.
+* Boundary change priority. The movement changes the current shard boundaries.
+* Others, such as resuming an in-flight movement.
+
+Each shard movement has a priority associated with the move attempt. The explanation of each priority knob (`PRIORITY_`) is in `ServerKnobs.h`.

 In `status json` output, please look at the field `.data.team_tracker.state` for the team priority state.

diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 157d49887d..4ec0c1cc11 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -143,28 +143,47 @@ public:
 	// mean better worst case time bounds
 	// Maximum allowable priority is 999.
 	// Update the status json .data.team_tracker.state field when necessary
-	int PRIORITY_RECOVER_MOVE; // Priority for movement resume from previous unfinished in-flight movement when a new DD
-	                           // start
-	int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM; // For disk valley filler
-	int PRIORITY_REBALANCE_OVERUTILIZED_TEAM; // For disk mountain chopper
-	int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM; // For read valley filler
-	int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM; // For read mountain chopper
-	int PRIORITY_PERPETUAL_STORAGE_WIGGLE; // Priority for wiggle a storage server
-	int PRIORITY_TEAM_HEALTHY; // Priority when all servers in a team are healthy
-	int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER; // Priority when there's undesired servers in the team. (ex. same ip
-	                                             // address as other SS process, or SS is lagging too far ...)
-	int PRIORITY_TEAM_REDUNDANT; // Priority for removing redundant team to make the team count within a good range
+	//
+	// Priority for resuming a previous unfinished in-flight movement when a new DD
+	// starts
+	int PRIORITY_RECOVER_MOVE;
+	// A load-balance priority for the disk valley filler
+	int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
+	// A load-balance priority for the disk mountain chopper
+	int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
+	// A load-balance priority for the read valley filler
+	int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
+	// A load-balance priority for the read mountain chopper
+	int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
+	// A team healthy priority for wiggling a storage server
+	int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
+	// A team healthy priority when all servers in a team are healthy
+	int PRIORITY_TEAM_HEALTHY;
+	// A team healthy priority when there are undesired servers in the team (ex. same IP
+	// address as another SS process, or the SS is lagging too far ...)
+	int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
+	// A team healthy priority for removing a redundant team to keep the team count within a good range
+	int PRIORITY_TEAM_REDUNDANT;
+	// A shard boundary priority for merging small and write-cold shards.
 	int PRIORITY_MERGE_SHARD;
-	int PRIORITY_POPULATE_REGION; // Priority for populate remote region
-	int PRIORITY_TEAM_UNHEALTHY; // Priority when the replica > 3 and there's at least one unhealthy server in a team.
-	                             // Or when the team contains a server with wrong configuration (ex. storage engine,
-	                             // locality, excluded ...)
-	int PRIORITY_TEAM_2_LEFT; // Priority when the replica <= 3 and there's 2 healthy servers in a team
-	int PRIORITY_TEAM_1_LEFT; // Priority when the replica <= 3 and there's 1 healthy server in a team
-	int PRIORITY_TEAM_FAILED; // Priority when a server in the team is excluded as failed
-	int PRIORITY_TEAM_0_LEFT; // Priority when the replica <= 3 and there's no healthy server in a team
+	// A team healthy priority for populating a remote region
+	int PRIORITY_POPULATE_REGION;
+	// A team healthy priority when the replication factor is greater than 3 and there's at least one unhealthy server in a team.
+	// Or when the team contains a server with wrong configuration (ex. storage engine,
+	// locality, excluded ...)
+	int PRIORITY_TEAM_UNHEALTHY;
+	// A team healthy priority when the replica <= 3 and there's 2 healthy servers in a team
+	int PRIORITY_TEAM_2_LEFT;
+	// A team healthy priority when the replica <= 3 and there's 1 healthy server in a team
+	int PRIORITY_TEAM_1_LEFT;
+	// A team healthy priority when a server in the team is excluded as failed
+	int PRIORITY_TEAM_FAILED;
+	// A team healthy priority when the replica <= 3 and there's no healthy server in a team
+	int PRIORITY_TEAM_0_LEFT;
+	// A shard boundary priority for splitting a large or write-hot shard.
 	int PRIORITY_SPLIT_SHARD;
-	int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD; // Priority when a physical shard is oversize or anonymous
+	// Priority when a physical shard is oversize or anonymous
+	int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD;

 	// Data distribution
 	bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
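To make the informal classification added in PATCH 2 concrete, the following minimal sketch groups the priority knobs into the four classes described above. The knob names come from `ServerKnobs.h`; the `classifyPriority` helper itself is hypothetical and not part of these patches.

    // Illustrative only: map a relocation priority to the informal class it belongs to.
    #include "fdbclient/ServerKnobs.h"

    enum class MovementPriorityClass { Healthy, LoadBalance, BoundaryChange, Other };

    MovementPriorityClass classifyPriority(const ServerKnobs* knobs, int priority) {
        if (priority == knobs->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
            priority == knobs->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
            priority == knobs->PRIORITY_REBALANCE_READ_OVERUTIL_TEAM ||
            priority == knobs->PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM) {
            return MovementPriorityClass::LoadBalance; // disk/read mountain chopper and valley filler
        }
        if (priority == knobs->PRIORITY_MERGE_SHARD || priority == knobs->PRIORITY_SPLIT_SHARD) {
            return MovementPriorityClass::BoundaryChange; // merging/splitting changes shard boundaries
        }
        if (priority == knobs->PRIORITY_RECOVER_MOVE ||
            priority == knobs->PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD) {
            return MovementPriorityClass::Other; // e.g. resuming an unfinished in-flight movement
        }
        return MovementPriorityClass::Healthy; // team health states, wiggle, redundant-team removal, etc.
    }

For example, `classifyPriority(SERVER_KNOBS, SERVER_KNOBS->PRIORITY_SPLIT_SHARD)` would return `BoundaryChange`, matching the "shard boundary" comments in the patch above.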
From e48fd10d8d2eb3bc8ed8efcc3786e277f734249b Mon Sep 17 00:00:00 2001
From: Xiaoxi Wang
Date: Thu, 16 Mar 2023 14:04:15 -0700
Subject: [PATCH 3/7] add perpetual wiggle to .team_tracker field

---
 fdbclient/Schemas.cpp      | 1 +
 fdbserver/Status.actor.cpp | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp
index 49b762a28e..97646de02d 100644
--- a/fdbclient/Schemas.cpp
+++ b/fdbclient/Schemas.cpp
@@ -933,6 +933,7 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
                                 "healthy_repartitioning",
                                 "healthy_removing_server",
                                 "healthy_rebalancing",
+                                "healthy_perpetual_wiggle",
                                 "healthy"
                             ]
                         },
diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp
index c7df40f1ee..bdf3383f22 100644
--- a/fdbserver/Status.actor.cpp
+++ b/fdbserver/Status.actor.cpp
@@ -1893,6 +1893,10 @@ ACTOR static Future dataStatusFetcher(WorkerDetails ddWorker,
 			} else if (highestPriority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy";
+			} else if (highestPriority == SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE) {
+				stateSectionObj["healthy"] = true;
+				stateSectionObj["name"] = "healthy_perpetual_wiggle";
+				stateSectionObj["description"] = "Wiggling storage server";
 			} else if (highestPriority >= SERVER_KNOBS->PRIORITY_RECOVER_MOVE) {
 				stateSectionObj["healthy"] = true;
 				stateSectionObj["name"] = "healthy_rebalancing";

From ef706e551f4e75a18e023f851fdb7240e45ba9e8 Mon Sep 17 00:00:00 2001
From: Xiaoxi Wang
Date: Fri, 17 Mar 2023 14:54:26 -0700
Subject: [PATCH 4/7] Add more details into priority comments.

---
 fdbclient/include/fdbclient/ServerKnobs.h | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 4ec0c1cc11..2ef3a45704 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -157,7 +157,8 @@ public:
 	int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
 	// A team healthy priority for wiggling a storage server
 	int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
-	// A team healthy priority when all servers in a team are healthy
+	// A team healthy priority when all servers in a team are healthy. When a team changes from any unhealthy state to
+	// healthy, the unfinished relocations will be overridden to the healthy priority
 	int PRIORITY_TEAM_HEALTHY;
 	// A team healthy priority when there are undesired servers in the team (ex. same IP
 	// address as another SS process, or the SS is lagging too far ...)
 	int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
@@ -172,17 +173,18 @@ public:
 	// Or when the team contains a server with wrong configuration (ex. storage engine,
 	// locality, excluded ...)
 	int PRIORITY_TEAM_UNHEALTHY;
-	// A team healthy priority when the replica <= 3 and there's 2 healthy servers in a team
+	// A team healthy priority when there should be >= 3 replicas and there are 2 healthy servers in a team
 	int PRIORITY_TEAM_2_LEFT;
-	// A team healthy priority when the replica <= 3 and there's 1 healthy server in a team
+	// A team healthy priority when there should be >= 2 replicas and there's 1 healthy server in a team
 	int PRIORITY_TEAM_1_LEFT;
 	// A team healthy priority when a server in the team is excluded as failed
 	int PRIORITY_TEAM_FAILED;
-	// A team healthy priority when the replica <= 3 and there's no healthy server in a team
+	// A team healthy priority when there's no healthy server in a team
 	int PRIORITY_TEAM_0_LEFT;
 	// A shard boundary priority for splitting a large or write-hot shard.
 	int PRIORITY_SPLIT_SHARD;
-	// Priority when a physical shard is oversize or anonymous
+	// Priority when a physical shard is oversized or anonymous. When DD enables physical shards, shards created
+	// before that point are 'anonymous' by default, for compatibility.
 	int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD;

 	// Data distribution

From dc1eb1375b9b1358b2586bb1f904fc16623fce4f Mon Sep 17 00:00:00 2001
From: Xiaoxi Wang
Date: Fri, 17 Mar 2023 17:19:30 -0700
Subject: [PATCH 5/7] add a missing healthy_perpetual_wiggle enum

---
 fdbclient/Schemas.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp
index 97646de02d..c3b9e84c63 100644
--- a/fdbclient/Schemas.cpp
+++ b/fdbclient/Schemas.cpp
@@ -898,6 +898,7 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
                                 "healthy_repartitioning",
                                 "healthy_removing_server",
                                 "healthy_rebalancing",
+                                "healthy_perpetual_wiggle",
                                 "healthy"
                             ]
                         },

From 080837cf86e0ee29d94b887a4a8f66ecb1b2b593 Mon Sep 17 00:00:00 2001
From: Chunhao Xu
Date: Mon, 20 Mar 2023 11:46:27 -0700
Subject: [PATCH 6/7] refactor struct BlobWorkerData and struct GranuleMetadata
 (#9659)

* refactor struct BlobWorkerData and struct GranuleMetadata

* undo the change on blobgranuleservercommon

---
 fdbserver/BlobWorker.actor.cpp           | 499 ++++++++---------------
 fdbserver/include/fdbserver/BlobWorker.h | 221 ++++++++++
 2 files changed, 383 insertions(+), 337 deletions(-)
 create mode 100644 fdbserver/include/fdbserver/BlobWorker.h

diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp
index a56836bcab..96563d81f9 100644
--- a/fdbserver/BlobWorker.actor.cpp
+++ b/fdbserver/BlobWorker.actor.cpp
@@ -22,6 +22,7 @@
 #include "fdbclient/BlobCipher.h"
 #include "fdbclient/BlobGranuleFiles.h"
 #include "fdbclient/FDBTypes.h"
+#include "fdbclient/GetEncryptCipherKeys.actor.h"
 #include "fdbclient/KeyRangeMap.h"
 #include "fdbclient/SystemData.h"
 #include "fdbclient/BackupContainerFileSystem.h"
@@ -36,6 +37,7 @@
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/Notified.h"

+#include "fdbserver/BlobWorker.h"
 #include "fdbserver/BlobGranuleServerCommon.actor.h"
 #include "fdbclient/GetEncryptCipherKeys.actor.h"
 #include "fdbserver/Knobs.h"
@@ -67,357 +69,180 @@

 #define BW_HISTORY_DEBUG false
 #define BW_REQUEST_DEBUG false

+void GranuleMetadata::resume() {
+	if (resumeSnapshot.canBeSet()) {
+		resumeSnapshot.send(Void());
+	}
+}
+
+void GranuleMetadata::resetReadStats() {
+	rdcCandidate = false;
+	readStats.reset();
+	runRDC.reset();
+}
+
+double GranuleMetadata::weightRDC() {
+	// ratio of read amp to write amp that would be incurred by re-snapshotting now
+	int64_t lastSnapshotSize = (files.snapshotFiles.empty()) ? 0 : files.snapshotFiles.back().length;
+	int64_t minSnapshotSize = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2;
+	lastSnapshotSize = std::max(minSnapshotSize, lastSnapshotSize);
+
+	int64_t writeAmp = lastSnapshotSize + bufferedDeltaBytes + bytesInNewDeltaFiles;
+	// read amp is deltaBytesRead.
Read amp must be READ_FACTOR times larger than write amp + return (1.0 * readStats.deltaBytesRead) / (writeAmp * SERVER_KNOBS->BG_RDC_READ_FACTOR); +} + +bool GranuleMetadata::isEligibleRDC() const { + // granule should be reasonably read-hot to be eligible + int64_t bytesWritten = bufferedDeltaBytes + bytesInNewDeltaFiles; + return bytesWritten * SERVER_KNOBS->BG_RDC_READ_FACTOR < readStats.deltaBytesRead; +} + +bool GranuleMetadata::updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk) { + // Only update stats for re-compacting for at-latest reads that have to do snapshot + delta merge + if (!SERVER_KNOBS->BG_ENABLE_READ_DRIVEN_COMPACTION || !chunk.snapshotFile.present() || + pendingSnapshotVersion != durableSnapshotVersion.get() || readVersion <= pendingSnapshotVersion) { + return false; + } + + if (chunk.newDeltas.empty() && chunk.deltaFiles.empty()) { + return false; + } + + readStats.deltaBytesRead += chunk.newDeltas.expectedSize(); + for (auto& it : chunk.deltaFiles) { + readStats.deltaBytesRead += it.length; + } + + if (rdcCandidate) { + return false; + } + + if (isEligibleRDC() && weightRDC() > 1.0) { + rdcCandidate = true; + CODE_PROBE(true, "Granule read triggering read-driven compaction"); + if (BW_DEBUG) { + fmt::print("Triggering read-driven compaction of [{0} - {1})\n", + keyRange.begin.printable(), + keyRange.end.printable()); + } + return true; + } + return false; +} + +void GranuleRangeMetadata::cancel() { + if (activeMetadata->cancelled.canBeSet()) { + activeMetadata->cancelled.send(Void()); + } + activeMetadata.clear(); + assignFuture.cancel(); + historyLoaderFuture.cancel(); + fileUpdaterFuture.cancel(); +} /* * The Blob Worker is a stateless role assigned a set of granules by the Blob Manager. * It is responsible for managing the change feeds for those granules, and for consuming the mutations from * those change feeds and writing them out as files to blob storage. */ - -struct GranuleStartState { - UID granuleID; - Version changeFeedStartVersion; - Version previousDurableVersion; - Optional> splitParentGranule; - bool doSnapshot; - std::vector blobFilesToSnapshot; - Optional existingFiles; - Optional history; -}; - -// TODO: add more (blob file request cost, in-memory mutations vs blob delta file, etc...) 
-struct GranuleReadStats { - int64_t deltaBytesRead; - - void reset() { deltaBytesRead = 0; } - - GranuleReadStats() { reset(); } -}; - -struct GranuleMetadata : NonCopyable, ReferenceCounted { - KeyRange keyRange; - - GranuleFiles files; - Standalone - currentDeltas; // only contain deltas in pendingDeltaVersion + 1 through bufferedDeltaVersion - - uint64_t bytesInNewDeltaFiles = 0; - uint64_t bufferedDeltaBytes = 0; - - // for client to know when it is safe to read a certain version and from where (check waitForVersion) - Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions) - Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb - NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb - NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots - Version pendingSnapshotVersion = 0; - Version initialSnapshotVersion = invalidVersion; - Version historyVersion = invalidVersion; - Version knownCommittedVersion; - NotifiedVersion forceFlushVersion; // Version to force a flush at, if necessary - Version forceCompactVersion = invalidVersion; - - int64_t originalEpoch; - int64_t originalSeqno; - int64_t continueEpoch; - int64_t continueSeqno; - - Promise cancelled; - Promise readable; - Promise historyLoaded; - - Promise resumeSnapshot; - - AsyncVar> activeCFData; - - AssignBlobRangeRequest originalReq; - - GranuleReadStats readStats; - bool rdcCandidate; - Promise runRDC; - - void resume() { - if (resumeSnapshot.canBeSet()) { - resumeSnapshot.send(Void()); - } - } - - void resetReadStats() { - rdcCandidate = false; - readStats.reset(); - runRDC.reset(); - } - - // determine eligibility (>1) and priority for re-snapshotting this granule - double weightRDC() { - // ratio of read amp to write amp that would be incurred by re-snapshotting now - int64_t lastSnapshotSize = (files.snapshotFiles.empty()) ? 0 : files.snapshotFiles.back().length; - int64_t minSnapshotSize = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2; - lastSnapshotSize = std::max(minSnapshotSize, lastSnapshotSize); - - int64_t writeAmp = lastSnapshotSize + bufferedDeltaBytes + bytesInNewDeltaFiles; - // read amp is deltaBytesRead. 
Read amp must be READ_FACTOR times larger than write amp - return (1.0 * readStats.deltaBytesRead) / (writeAmp * SERVER_KNOBS->BG_RDC_READ_FACTOR); - } - - bool isEligibleRDC() const { - // granule should be reasonably read-hot to be eligible - int64_t bytesWritten = bufferedDeltaBytes + bytesInNewDeltaFiles; - return bytesWritten * SERVER_KNOBS->BG_RDC_READ_FACTOR < readStats.deltaBytesRead; - } - - bool updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk) { - // Only update stats for re-compacting for at-latest reads that have to do snapshot + delta merge - if (!SERVER_KNOBS->BG_ENABLE_READ_DRIVEN_COMPACTION || !chunk.snapshotFile.present() || - pendingSnapshotVersion != durableSnapshotVersion.get() || readVersion <= pendingSnapshotVersion) { - return false; - } - - if (chunk.newDeltas.empty() && chunk.deltaFiles.empty()) { - return false; - } - - readStats.deltaBytesRead += chunk.newDeltas.expectedSize(); - for (auto& it : chunk.deltaFiles) { - readStats.deltaBytesRead += it.length; - } - - if (rdcCandidate) { - return false; - } - - if (isEligibleRDC() && weightRDC() > 1.0) { - rdcCandidate = true; - CODE_PROBE(true, "Granule read triggering read-driven compaction"); - if (BW_DEBUG) { - fmt::print("Triggering read-driven compaction of [{0} - {1})\n", - keyRange.begin.printable(), - keyRange.end.printable()); - } - return true; +bool BlobWorkerData::managerEpochOk(int64_t epoch) { + if (epoch < currentManagerEpoch) { + if (BW_DEBUG) { + fmt::print( + "BW {0} got request from old epoch {1}, notifying them they are out of date\n", id.toString(), epoch); } return false; - } - - inline bool doEarlyReSnapshot() { - return runRDC.isSet() || - (forceCompactVersion <= pendingDeltaVersion && forceCompactVersion > pendingSnapshotVersion); - } -}; - -struct GranuleRangeMetadata { - int64_t lastEpoch; - int64_t lastSeqno; - Reference activeMetadata; - - Future assignFuture; - Future fileUpdaterFuture; - Future historyLoaderFuture; - - void cancel() { - if (activeMetadata->cancelled.canBeSet()) { - activeMetadata->cancelled.send(Void()); - } - activeMetadata.clear(); - assignFuture.cancel(); - historyLoaderFuture.cancel(); - fileUpdaterFuture.cancel(); - } - - GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} - GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference activeMetadata) - : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} -}; - -// represents a previous version of a granule, and optionally the files that compose it. 
-struct GranuleHistoryEntry : NonCopyable, ReferenceCounted { - KeyRange range; - UID granuleID; - Version startVersion; // version of the first snapshot - Version endVersion; // version of the last delta file - - // load files lazily, and allows for clearing old cold-queried files to save memory - // FIXME: add memory limit and evictor for old cached files - Future files; - - // FIXME: do skip pointers with single back-pointer and neighbor pointers - std::vector> parentGranules; - - GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {} - GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion) - : range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {} -}; - -struct BlobWorkerData : NonCopyable, ReferenceCounted { - UID id; - Database db; - IKeyValueStore* storage; - - PromiseStream> addActor; - - LocalityData locality; - int64_t currentManagerEpoch = -1; - - AsyncVar> currentManagerStatusStream; - bool statusStreamInitialized = false; - - // FIXME: refactor out the parts of this that are just for interacting with blob stores from the backup business - // logic - Reference bstore; - KeyRangeMap granuleMetadata; - BGTenantMap tenantData; - Reference const> dbInfo; - - // contains the history of completed granules before the existing ones. Maps to the latest one, and has - // back-pointers to earlier granules - // FIXME: expire from map after a delay when granule is revoked and the history is no longer needed - KeyRangeMap> granuleHistory; - - PromiseStream granuleUpdateErrors; - - Promise doGRVCheck; - NotifiedVersion grvVersion; - std::deque prevGRVVersions; - Promise fatalError; - Promise simInjectFailure; - Promise doReadDrivenCompaction; - - Reference initialSnapshotLock; - Reference resnapshotBudget; - Reference deltaWritesBudget; - - BlobWorkerStats stats; - - bool shuttingDown = false; - - // FIXME: have cap on this independent of delta file size for larger granules - int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 4; - - EncryptionAtRestMode encryptMode; - bool buggifyFull = false; - - int64_t memoryFullThreshold = - (int64_t)(SERVER_KNOBS->BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD * SERVER_KNOBS->SERVER_MEM_LIMIT); - int64_t lastResidentMemory = 0; - double lastResidentMemoryCheckTime = -100.0; - - bool isFullRestoreMode = false; - - BlobWorkerData(UID id, Reference const> dbInfo, Database db, IKeyValueStore* storage) - : id(id), db(db), storage(storage), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo), - initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)), - resnapshotBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_BUDGET_BYTES)), - deltaWritesBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES)), - stats(id, - SERVER_KNOBS->WORKER_LOGGING_INTERVAL, - initialSnapshotLock, - resnapshotBudget, - deltaWritesBudget, - SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, - SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY, - SERVER_KNOBS->LATENCY_SKETCH_ACCURACY), - encryptMode(EncryptionAtRestMode::DISABLED) {} - - bool managerEpochOk(int64_t epoch) { - if (epoch < currentManagerEpoch) { + } else { + if (epoch > currentManagerEpoch) { + currentManagerEpoch = epoch; if (BW_DEBUG) { - fmt::print("BW {0} got request from old epoch {1}, notifying them they are out of date\n", - id.toString(), - epoch); + fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch); } - return 
false; - } else { - if (epoch > currentManagerEpoch) { - currentManagerEpoch = epoch; - if (BW_DEBUG) { - fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch); - } - TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch); - } - - return true; + TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch); } + + return true; } +} - bool isFull() { - if (!SERVER_KNOBS->BLOB_WORKER_DO_REJECT_WHEN_FULL) { - return false; - } - if (g_network->isSimulated()) { - if (g_simulator->speedUpSimulation) { - return false; - } - return buggifyFull; - } - - // TODO knob? - if (now() >= 1.0 + lastResidentMemoryCheckTime) { - // fdb as of 7.1 limits on resident memory instead of virtual memory - stats.lastResidentMemory = getResidentMemoryUsage(); - lastResidentMemoryCheckTime = now(); - } - - // if we are already over threshold, no need to estimate extra memory - if (stats.lastResidentMemory >= memoryFullThreshold) { - return true; - } - - // FIXME: since this isn't tested in simulation, could unit test this - // Try to model how much memory we *could* use given the already existing assignments and workload on this blob - // worker, before agreeing to take on a new assignment, given that several large sources of memory can grow and - // change post-assignment - - // FIXME: change these to be byte counts - // FIXME: buggify an extra multiplication factor for short periods of time to hopefully trigger this logic more - // often? estimate slack in bytes buffered as max(0, assignments * (delta file size / 2) - bytesBuffered) - // FIXME: this doesn't take increased delta file size for heavy write amp cases into account - int64_t expectedExtraBytesBuffered = std::max( - 0, stats.numRangesAssigned * (SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2) - stats.mutationBytesBuffered); - // estimate slack in potential pending resnapshot - int64_t totalExtra = - expectedExtraBytesBuffered + deltaWritesBudget->available() + resnapshotBudget->available(); - // assumes initial snapshot parallelism is small enough and uncommon enough to not add it to this computation - stats.estimatedMaxResidentMemory = stats.lastResidentMemory + totalExtra; - - return stats.estimatedMaxResidentMemory >= memoryFullThreshold; - } - - void triggerReadDrivenCompaction() { - Promise doRDC = doReadDrivenCompaction; - if (doRDC.canBeSet()) { - doRDC.send(Void()); - } - } - - void addGRVHistory(Version readVersion) { - if (grvVersion.get() < readVersion) { - // We use GRVs from grv checker loop, plus other common BW transactions. 
To prevent the deque size from - // exploding or the effective version window from getting too small, only put GRVs in the deque if they are - // at least some small distance apart - if (prevGRVVersions.empty() || - prevGRVVersions.back() + SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MIN_VERSION_GRANULARITY <= readVersion) { - prevGRVVersions.push_back(readVersion); - while (prevGRVVersions.size() > SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MAX_SIZE) { - prevGRVVersions.pop_front(); - } - } - - // set notified version last, so that all triggered waiters have prevGRVVersions populated too - grvVersion.set(readVersion); - } - } - - bool maybeInjectTargetedRestart() { - // inject a BW restart at most once per test - if (g_network->isSimulated() && !g_simulator->speedUpSimulation && - now() > g_simulator->injectTargetedBWRestartTime) { - CODE_PROBE(true, "Injecting BW targeted restart"); - TraceEvent("SimBWInjectTargetedRestart", id); - g_simulator->injectTargetedBWRestartTime = std::numeric_limits::max(); - simInjectFailure.send(Void()); - return true; - } +bool BlobWorkerData::isFull() { + if (!SERVER_KNOBS->BLOB_WORKER_DO_REJECT_WHEN_FULL) { return false; } -}; + if (g_network->isSimulated()) { + if (g_simulator->speedUpSimulation) { + return false; + } + return buggifyFull; + } + + // TODO knob? + if (now() >= 1.0 + lastResidentMemoryCheckTime) { + // fdb as of 7.1 limits on resident memory instead of virtual memory + stats.lastResidentMemory = getResidentMemoryUsage(); + lastResidentMemoryCheckTime = now(); + } + + // if we are already over threshold, no need to estimate extra memory + if (stats.lastResidentMemory >= memoryFullThreshold) { + return true; + } + + // FIXME: since this isn't tested in simulation, could unit test this + // Try to model how much memory we *could* use given the already existing assignments and workload on this blob + // worker, before agreeing to take on a new assignment, given that several large sources of memory can grow and + // change post-assignment + + // FIXME: change these to be byte counts + // FIXME: buggify an extra multiplication factor for short periods of time to hopefully trigger this logic more + // often? estimate slack in bytes buffered as max(0, assignments * (delta file size / 2) - bytesBuffered) + // FIXME: this doesn't take increased delta file size for heavy write amp cases into account + int64_t expectedExtraBytesBuffered = std::max( + 0, stats.numRangesAssigned * (SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2) - stats.mutationBytesBuffered); + // estimate slack in potential pending resnapshot + int64_t totalExtra = expectedExtraBytesBuffered + deltaWritesBudget->available() + resnapshotBudget->available(); + // assumes initial snapshot parallelism is small enough and uncommon enough to not add it to this computation + stats.estimatedMaxResidentMemory = stats.lastResidentMemory + totalExtra; + + return stats.estimatedMaxResidentMemory >= memoryFullThreshold; +} + +void BlobWorkerData::triggerReadDrivenCompaction() { + Promise doRDC = doReadDrivenCompaction; + if (doRDC.canBeSet()) { + doRDC.send(Void()); + } +} + +void BlobWorkerData::addGRVHistory(Version readVersion) { + if (grvVersion.get() < readVersion) { + // We use GRVs from grv checker loop, plus other common BW transactions. 
To prevent the deque size from + // exploding or the effective version window from getting too small, only put GRVs in the deque if they are + // at least some small distance apart + if (prevGRVVersions.empty() || + prevGRVVersions.back() + SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MIN_VERSION_GRANULARITY <= readVersion) { + prevGRVVersions.push_back(readVersion); + while (prevGRVVersions.size() > SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MAX_SIZE) { + prevGRVVersions.pop_front(); + } + } + + // set notified version last, so that all triggered waiters have prevGRVVersions populated too + grvVersion.set(readVersion); + } +} +bool BlobWorkerData::maybeInjectTargetedRestart() { + // inject a BW restart at most once per test + if (g_network->isSimulated() && !g_simulator->speedUpSimulation && + now() > g_simulator->injectTargetedBWRestartTime) { + CODE_PROBE(true, "Injecting BW targeted restart"); + TraceEvent("SimBWInjectTargetedRestart", id); + g_simulator->injectTargetedBWRestartTime = std::numeric_limits::max(); + simInjectFailure.send(Void()); + return true; + } + return false; +} namespace { diff --git a/fdbserver/include/fdbserver/BlobWorker.h b/fdbserver/include/fdbserver/BlobWorker.h new file mode 100644 index 0000000000..1f748e1042 --- /dev/null +++ b/fdbserver/include/fdbserver/BlobWorker.h @@ -0,0 +1,221 @@ +/* + * BlobWorker.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_BLOBWORKER_H +#define FDBSERVER_BLOBWORKER_H + +#include "fdbclient/BlobWorkerCommon.h" + +#include "fdbserver/BlobGranuleServerCommon.actor.h" +#include "fdbserver/Knobs.h" + +#include + +#include "flow/actorcompiler.h" // has to be last include + +struct GranuleStartState { + UID granuleID; + Version changeFeedStartVersion; + Version previousDurableVersion; + Optional> splitParentGranule; + bool doSnapshot; + std::vector blobFilesToSnapshot; + Optional existingFiles; + Optional history; +}; + +// TODO: add more (blob file request cost, in-memory mutations vs blob delta file, etc...) 
+struct GranuleReadStats { + int64_t deltaBytesRead; + + void reset() { deltaBytesRead = 0; } + + GranuleReadStats() { reset(); } +}; + +struct GranuleMetadata : NonCopyable, ReferenceCounted { + KeyRange keyRange; + + GranuleFiles files; + Standalone + currentDeltas; // only contain deltas in pendingDeltaVersion + 1 through bufferedDeltaVersion + + uint64_t bytesInNewDeltaFiles = 0; + uint64_t bufferedDeltaBytes = 0; + + // for client to know when it is safe to read a certain version and from where (check waitForVersion) + Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions) + Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb + NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb + NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots + Version pendingSnapshotVersion = 0; + Version initialSnapshotVersion = invalidVersion; + Version historyVersion = invalidVersion; + Version knownCommittedVersion; + NotifiedVersion forceFlushVersion; // Version to force a flush at, if necessary + Version forceCompactVersion = invalidVersion; + + int64_t originalEpoch; + int64_t originalSeqno; + int64_t continueEpoch; + int64_t continueSeqno; + + Promise cancelled; + Promise readable; + Promise historyLoaded; + + Promise resumeSnapshot; + + AsyncVar> activeCFData; + + AssignBlobRangeRequest originalReq; + + GranuleReadStats readStats; + bool rdcCandidate; + Promise runRDC; + + void resume(); + void resetReadStats(); + + // determine eligibility (>1) and priority for re-snapshotting this granule + double weightRDC(); + + bool isEligibleRDC() const; + + bool updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk); + + inline bool doEarlyReSnapshot() { + return runRDC.isSet() || + (forceCompactVersion <= pendingDeltaVersion && forceCompactVersion > pendingSnapshotVersion); + } +}; + +struct GranuleRangeMetadata { + int64_t lastEpoch; + int64_t lastSeqno; + Reference activeMetadata; + + Future assignFuture; + Future fileUpdaterFuture; + Future historyLoaderFuture; + + void cancel(); + GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} + GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference activeMetadata) + : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} +}; + +// represents a previous version of a granule, and optionally the files that compose it. 
+struct GranuleHistoryEntry : NonCopyable, ReferenceCounted { + KeyRange range; + UID granuleID; + Version startVersion; // version of the first snapshot + Version endVersion; // version of the last delta file + + // load files lazily, and allows for clearing old cold-queried files to save memory + // FIXME: add memory limit and evictor for old cached files + Future files; + + // FIXME: do skip pointers with single back-pointer and neighbor pointers + std::vector> parentGranules; + + GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {} + GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion) + : range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {} +}; + +struct BlobWorkerData : NonCopyable, ReferenceCounted { + UID id; + Database db; + IKeyValueStore* storage; + + PromiseStream> addActor; + + LocalityData locality; + int64_t currentManagerEpoch = -1; + + AsyncVar> currentManagerStatusStream; + bool statusStreamInitialized = false; + + // FIXME: refactor out the parts of this that are just for interacting with blob stores from the backup business + // logic + Reference bstore; + KeyRangeMap granuleMetadata; + BGTenantMap tenantData; + Reference const> dbInfo; + + // contains the history of completed granules before the existing ones. Maps to the latest one, and has + // back-pointers to earlier granules + // FIXME: expire from map after a delay when granule is revoked and the history is no longer needed + KeyRangeMap> granuleHistory; + + PromiseStream granuleUpdateErrors; + + Promise doGRVCheck; + NotifiedVersion grvVersion; + std::deque prevGRVVersions; + Promise fatalError; + Promise simInjectFailure; + Promise doReadDrivenCompaction; + + Reference initialSnapshotLock; + Reference resnapshotBudget; + Reference deltaWritesBudget; + + BlobWorkerStats stats; + + bool shuttingDown = false; + + // FIXME: have cap on this independent of delta file size for larger granules + int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 4; + + EncryptionAtRestMode encryptMode; + bool buggifyFull = false; + + int64_t memoryFullThreshold = + (int64_t)(SERVER_KNOBS->BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD * SERVER_KNOBS->SERVER_MEM_LIMIT); + int64_t lastResidentMemory = 0; + double lastResidentMemoryCheckTime = -100.0; + + bool isFullRestoreMode = false; + + BlobWorkerData(UID id, Reference const> dbInfo, Database db, IKeyValueStore* storage) + : id(id), db(db), storage(storage), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo), + initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)), + resnapshotBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_BUDGET_BYTES)), + deltaWritesBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES)), + stats(id, + SERVER_KNOBS->WORKER_LOGGING_INTERVAL, + initialSnapshotLock, + resnapshotBudget, + deltaWritesBudget, + SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY, + SERVER_KNOBS->LATENCY_SKETCH_ACCURACY), + encryptMode(EncryptionAtRestMode::DISABLED) {} + + bool managerEpochOk(int64_t epoch); + bool isFull(); + void triggerReadDrivenCompaction(); + void addGRVHistory(Version readVersion); + bool maybeInjectTargetedRestart(); +}; + +#endif \ No newline at end of file From 2b0857f2ee7bc87692ab27f256220aa840d91bbc Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 14 Mar 2023 13:03:07 -0700 Subject: [PATCH 7/7] fix a rare race condition in 
 PerpetualWiggleStatsWorkload.actor.cpp

---
 fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp b/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
index 45d00275d4..f6064eb702 100644
--- a/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
+++ b/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
@@ -158,6 +158,7 @@ struct PerpetualWiggleStatsWorkload : public TestWorkload {
 		MoveKeysLock lock = wait(takeMoveKeysLock(cx, UID())); // force current DD to quit
 		bool success = wait(IssueConfigurationChange(cx, "storage_migration_type=disabled", true));
 		ASSERT(success);
+		wait(delay(30.0)); // make sure the DD has already quit before the test starts
 		return Void();
 	}
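As a worked example of the read-driven compaction weighting that PATCH 6 moves into `BlobWorker.h`, the standalone sketch below mirrors the arithmetic of `GranuleMetadata::weightRDC()` and `isEligibleRDC()`. The knob values here are assumptions chosen for the example, not the production defaults.

    // Illustrative only: standalone arithmetic mirroring weightRDC()/isEligibleRDC().
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
        const int64_t snapshotTargetBytes = 10000000; // assumed BG_SNAPSHOT_FILE_TARGET_BYTES
        const int64_t readFactor = 3;                 // assumed BG_RDC_READ_FACTOR

        int64_t lastSnapshotSize = 8000000;     // length of the last snapshot file
        int64_t bufferedDeltaBytes = 1000000;   // deltas buffered in memory
        int64_t bytesInNewDeltaFiles = 2000000; // deltas already written to new files
        int64_t deltaBytesRead = 50000000;      // accumulated by updateReadStats()

        // weightRDC(): a tiny snapshot must not make the write amp look negligible
        lastSnapshotSize = std::max(snapshotTargetBytes / 2, lastSnapshotSize);
        int64_t writeAmp = lastSnapshotSize + bufferedDeltaBytes + bytesInNewDeltaFiles;
        double weight = (1.0 * deltaBytesRead) / (writeAmp * readFactor);

        // isEligibleRDC(): reads must exceed READ_FACTOR times the bytes written
        bool eligible = (bufferedDeltaBytes + bytesInNewDeltaFiles) * readFactor < deltaBytesRead;

        // a granule becomes a read-driven compaction candidate when both conditions hold
        printf("weight=%.2f eligible=%d candidate=%d\n", weight, (int)eligible, (int)(eligible && weight > 1.0));
        return 0;
    }

With these numbers the weight is about 1.52 and the eligibility check passes (9 MB written versus 50 MB read), so the granule would be flagged for re-snapshotting, matching the `isEligibleRDC() && weightRDC() > 1.0` test in `updateReadStats`.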