Merge branch 'main' into fix-tenant-list-infinite-loop

This commit is contained in:
A.J. Beamon 2023-03-20 14:11:16 -07:00
commit 6becf12ecd
7 changed files with 431 additions and 341 deletions

View File

@ -81,6 +81,17 @@ Actors are created to monitor the reasons of key movement:
(3) `serverTeamRemover` and `machineTeamRemover` actors periodically evaluate if the number of server teams and machine teams is larger than the desired number. If so, they respectively pick a server team or a machine team to remove based on predefined criteria;
(4) `teamTracker` actor monitors a team's healthiness. When a server in the team becomes unhealthy, it issues the `RelocateShard` request to repair the replication factor. The fewer servers a team has, the higher the priority of the `RelocateShard` request.
#### Movement Priority
There are roughly 4 classes of movement priorities:
* Healthy priority. The movement maintains the cluster's healthy status, and its priority depends on the health status of the source team.
* Load balance priority. The movement balances the cluster workload.
* Boundary change priority. The movement changes the current shard boundaries.
* Others. For example, resuming an in-flight movement.
Each shard movement has a priority associated with the move attempt. The explanation of each priority knob (`PRIORITY_<XXX>`) is in `ServerKnobs.h`.
In `status json` output, look at the field `.data.team_tracker.state` for the team priority state.
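As a rough, illustrative sketch of how that field relates to the priorities (the authoritative mapping lives in `Status.actor.cpp`, which this commit extends with the `healthy_perpetual_wiggle` state; the cutoffs and state names below are assumptions for illustration, and boundary-change priorities are ignored):

```cpp
#include <string>
#include "fdbserver/Knobs.h"

// Illustrative only: translate a team's highest outstanding relocation
// priority into a status-json .data.team_tracker.state name. The real
// logic (and the complete set of states) is in Status.actor.cpp.
std::string teamTrackerStateName(int highestPriority) {
	if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_0_LEFT)
		return "missing_data";
	if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY)
		return "healing";
	if (highestPriority == SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE)
		return "healthy_perpetual_wiggle"; // state added by this commit
	if (highestPriority >= SERVER_KNOBS->PRIORITY_MERGE_SHARD)
		return "healthy_repartitioning";
	if (highestPriority >= SERVER_KNOBS->PRIORITY_RECOVER_MOVE)
		return "healthy_rebalancing";
	return "healthy";
}
```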
### How to move keys?
A key range is a shard. A shard is the minimum unit of moving data. The storage servers' ownership of a shard -- which SS owns which shard -- is stored in the system keyspaces *serverKeys* (`\xff/serverKeys/`) and *keyServers* (`\xff/keyServers/`). To simplify the explanation, we refer to the storage servers' ownership of a shard as a shard's ownership.
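As a minimal, hypothetical sketch (not part of this commit), the *keyServers* mapping can be scanned with the native API to list shard boundaries; this assumes an open `Database cx`, and the key literals simply spell out the `\xff/keyServers/` prefix described above:

```cpp
#include "fdbclient/NativeAPI.actor.h"
#include "flow/actorcompiler.h" // has to be last include

// Hypothetical helper: print the begin boundary of each shard by scanning the
// keyServers mapping (first 1000 shards only, for brevity). Each key in this
// keyspace is a shard boundary; the value encodes the source (and, during a
// move, destination) storage servers that own the shard.
ACTOR Future<Void> printShardBoundaries(Database cx) {
	state Transaction tr(cx);
	loop {
		try {
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			RangeResult shards = wait(
			    tr.getRange(KeyRangeRef("\xff/keyServers/"_sr, "\xff/keyServers0"_sr), 1000));
			for (auto& kv : shards) {
				printf("shard begins at %s\n",
				       kv.key.removePrefix("\xff/keyServers/"_sr).printable().c_str());
			}
			return Void();
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}
```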
@ -152,11 +163,11 @@ CPU utilization. This metric is in a positive relationship with “FinishedQueri
* Read-aware DD will balance the read workload under the read-skew scenario. Starting from an imbalance `STD(FinishedQueries per minute)=16k`, the best result it can achieve is `STD(FinishedQueries per minute) = 2k`.
* The typical movement size under a read-skew scenario is 100M ~ 600M under the default knob values `READ_REBALANCE_MAX_SHARD_FRAC=0.2, READ_REBALANCE_SRC_PARALLELISM = 20`. Increasing those knobs may accelerate the convergence speed, at the risk of data movement churn that overwhelms the destination and over-cools the source.
* The upper bound of `READ_REBALANCE_MAX_SHARD_FRAC` is 0.5. Any value larger than 0.5 can result in hot server switching (a rough illustration follows this list).
* When a deeper diagnosis of the read-aware DD is needed, the `BgDDMountainChopper_New` and `BgDDValleyFiller_New` trace events are where to go.
* When a deeper diagnosis of the read-aware DD is needed, the `BgDDMountainChopper` and `BgDDValleyFiller` trace events are where to go.
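To see why 0.5 is the cap (a rough argument for illustration, not taken from the code): suppose a candidate shard carries a fraction `f` of the source team's read load `L_src`, and the destination team currently serves `L_dst`. After the move, the source serves `(1-f)*L_src` and the destination serves `L_dst + f*L_src`. Even with a completely cold destination (`L_dst ~ 0`), the two teams keep their relative order only if `(1-f)*L_src >= f*L_src`, i.e. `f <= 0.5`; a larger fraction can turn the destination into the new hottest team, and the next rebalance round would move data straight back.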
## Data Distribution Diagnosis Q&A
* Why hasn't Read-aware DD been triggered when there's a read imbalance?
* Check the `SkipReason` field in the `BgDDMountainChopper_New` and `BgDDValleyFiller_New` trace events.
* Check the `SkipReason` field in the `BgDDMountainChopper` and `BgDDValleyFiller` trace events.
* The Read-aware DD is triggered, and some data movement happened, but it doesn't help the read balance. Why?
* Need to figure out which server is selected as the source and destination. The information is in the `DestTeam` and `SourceTeam` fields of the `BgDDMountainChopper*` and `BgDDValleyFiller*` events.
* Also, the `DDQueueServerCounter` event tells how many times a server has been a source or destination (defined in

View File

@ -898,6 +898,7 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy_perpetual_wiggle",
"healthy"
]
},
@ -933,6 +934,7 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy_perpetual_wiggle",
"healthy"
]
},

View File

@ -142,24 +142,50 @@ public:
// is possible within but not between priority groups; fewer priority groups
// mean better worst case time bounds
// Maximum allowable priority is 999.
// Update the status json .data.team_tracker.state field when necessary
//
// Priority for resuming a previously unfinished in-flight movement when a new DD
// starts
int PRIORITY_RECOVER_MOVE;
// A load-balance priority for the disk valley filler
int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
// A load-balance priority for the disk mountain chopper
int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
// A load-balance priority for the read mountain chopper
int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
// A load-balance priority for the read valley filler
int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
// A team healthy priority for wiggling a storage server
int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
// A team healthy priority when all servers in a team are healthy. When a team changes from any unhealthy state to
// healthy, the unfinished relocations will be overridden to healthy priority
int PRIORITY_TEAM_HEALTHY;
// A team healthy priority when there are undesired servers in the team (e.g. same IP
// address as another SS process, or the SS is lagging too far ...)
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
// A team healthy priority for removing a redundant team to keep the team count within a good range
int PRIORITY_TEAM_REDUNDANT;
// A shard boundary priority for merging small and write-cold shards.
int PRIORITY_MERGE_SHARD;
// A team healthy priority for populating a remote region
int PRIORITY_POPULATE_REGION;
// A team healthy priority when the replica count is > 3 and there's at least one unhealthy server in a team,
// or when the team contains a server with a wrong configuration (e.g. storage engine,
// locality, excluded ...)
int PRIORITY_TEAM_UNHEALTHY;
// A team healthy priority when there should be >= 3 replicas and there are 2 healthy servers in a team
int PRIORITY_TEAM_2_LEFT;
// A team healthy priority when there should be >= 2 replicas and there's 1 healthy server in a team
int PRIORITY_TEAM_1_LEFT;
int PRIORITY_TEAM_FAILED; // Priority when a server in the team is excluded as failed
// A team healthy priority when a server in the team is excluded as failed
int PRIORITY_TEAM_FAILED;
// A team healthy priority when there's no healthy server in a team
int PRIORITY_TEAM_0_LEFT;
// A shard boundary priority for splitting a large or write-hot shard.
int PRIORITY_SPLIT_SHARD;
int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD; // Priority when a physical shard is oversize or anonymous
// Priority when a physical shard is oversized or anonymous. When DD enables physical shards, the shards created
// before that are 'anonymous' by default, for compatibility.
int PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD;
// Data distribution
bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.

View File

@ -22,6 +22,7 @@
#include "fdbclient/BlobCipher.h"
#include "fdbclient/BlobGranuleFiles.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupContainerFileSystem.h"
@ -36,6 +37,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbserver/BlobWorker.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbserver/Knobs.h"
@ -67,357 +69,180 @@
#define BW_HISTORY_DEBUG false
#define BW_REQUEST_DEBUG false
void GranuleMetadata::resume() {
if (resumeSnapshot.canBeSet()) {
resumeSnapshot.send(Void());
}
}
void GranuleMetadata::resetReadStats() {
rdcCandidate = false;
readStats.reset();
runRDC.reset();
}
double GranuleMetadata::weightRDC() {
// ratio of read amp to write amp that would be incurred by re-snapshotting now
int64_t lastSnapshotSize = (files.snapshotFiles.empty()) ? 0 : files.snapshotFiles.back().length;
int64_t minSnapshotSize = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2;
lastSnapshotSize = std::max(minSnapshotSize, lastSnapshotSize);
int64_t writeAmp = lastSnapshotSize + bufferedDeltaBytes + bytesInNewDeltaFiles;
// read amp is deltaBytesRead. Read amp must be READ_FACTOR times larger than write amp
return (1.0 * readStats.deltaBytesRead) / (writeAmp * SERVER_KNOBS->BG_RDC_READ_FACTOR);
}
bool GranuleMetadata::isEligibleRDC() const {
// granule should be reasonably read-hot to be eligible
int64_t bytesWritten = bufferedDeltaBytes + bytesInNewDeltaFiles;
return bytesWritten * SERVER_KNOBS->BG_RDC_READ_FACTOR < readStats.deltaBytesRead;
}
bool GranuleMetadata::updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk) {
// Only update stats for re-compacting for at-latest reads that have to do snapshot + delta merge
if (!SERVER_KNOBS->BG_ENABLE_READ_DRIVEN_COMPACTION || !chunk.snapshotFile.present() ||
pendingSnapshotVersion != durableSnapshotVersion.get() || readVersion <= pendingSnapshotVersion) {
return false;
}
if (chunk.newDeltas.empty() && chunk.deltaFiles.empty()) {
return false;
}
readStats.deltaBytesRead += chunk.newDeltas.expectedSize();
for (auto& it : chunk.deltaFiles) {
readStats.deltaBytesRead += it.length;
}
if (rdcCandidate) {
return false;
}
if (isEligibleRDC() && weightRDC() > 1.0) {
rdcCandidate = true;
CODE_PROBE(true, "Granule read triggering read-driven compaction");
if (BW_DEBUG) {
fmt::print("Triggering read-driven compaction of [{0} - {1})\n",
keyRange.begin.printable(),
keyRange.end.printable());
}
return true;
}
return false;
}
void GranuleRangeMetadata::cancel() {
if (activeMetadata->cancelled.canBeSet()) {
activeMetadata->cancelled.send(Void());
}
activeMetadata.clear();
assignFuture.cancel();
historyLoaderFuture.cancel();
fileUpdaterFuture.cancel();
}
/*
* The Blob Worker is a stateless role assigned a set of granules by the Blob Manager.
* It is responsible for managing the change feeds for those granules, and for consuming the mutations from
* those change feeds and writing them out as files to blob storage.
*/
struct GranuleStartState {
UID granuleID;
Version changeFeedStartVersion;
Version previousDurableVersion;
Optional<std::pair<KeyRange, UID>> splitParentGranule;
bool doSnapshot;
std::vector<GranuleFiles> blobFilesToSnapshot;
Optional<GranuleFiles> existingFiles;
Optional<GranuleHistory> history;
};
// TODO: add more (blob file request cost, in-memory mutations vs blob delta file, etc...)
struct GranuleReadStats {
int64_t deltaBytesRead;
void reset() { deltaBytesRead = 0; }
GranuleReadStats() { reset(); }
};
struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
KeyRange keyRange;
GranuleFiles files;
Standalone<GranuleDeltas>
currentDeltas; // only contain deltas in pendingDeltaVersion + 1 through bufferedDeltaVersion
uint64_t bytesInNewDeltaFiles = 0;
uint64_t bufferedDeltaBytes = 0;
// for client to know when it is safe to read a certain version and from where (check waitForVersion)
Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions)
Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb
NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Version initialSnapshotVersion = invalidVersion;
Version historyVersion = invalidVersion;
Version knownCommittedVersion;
NotifiedVersion forceFlushVersion; // Version to force a flush at, if necessary
Version forceCompactVersion = invalidVersion;
int64_t originalEpoch;
int64_t originalSeqno;
int64_t continueEpoch;
int64_t continueSeqno;
Promise<Void> cancelled;
Promise<Void> readable;
Promise<Void> historyLoaded;
Promise<Void> resumeSnapshot;
AsyncVar<Reference<ChangeFeedData>> activeCFData;
AssignBlobRangeRequest originalReq;
GranuleReadStats readStats;
bool rdcCandidate;
Promise<Void> runRDC;
void resume() {
if (resumeSnapshot.canBeSet()) {
resumeSnapshot.send(Void());
}
}
void resetReadStats() {
rdcCandidate = false;
readStats.reset();
runRDC.reset();
}
// determine eligibility (>1) and priority for re-snapshotting this granule
double weightRDC() {
// ratio of read amp to write amp that would be incurred by re-snapshotting now
int64_t lastSnapshotSize = (files.snapshotFiles.empty()) ? 0 : files.snapshotFiles.back().length;
int64_t minSnapshotSize = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2;
lastSnapshotSize = std::max(minSnapshotSize, lastSnapshotSize);
int64_t writeAmp = lastSnapshotSize + bufferedDeltaBytes + bytesInNewDeltaFiles;
// read amp is deltaBytesRead. Read amp must be READ_FACTOR times larger than write amp
return (1.0 * readStats.deltaBytesRead) / (writeAmp * SERVER_KNOBS->BG_RDC_READ_FACTOR);
}
bool isEligibleRDC() const {
// granule should be reasonably read-hot to be eligible
int64_t bytesWritten = bufferedDeltaBytes + bytesInNewDeltaFiles;
return bytesWritten * SERVER_KNOBS->BG_RDC_READ_FACTOR < readStats.deltaBytesRead;
}
bool updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk) {
// Only update stats for re-compacting for at-latest reads that have to do snapshot + delta merge
if (!SERVER_KNOBS->BG_ENABLE_READ_DRIVEN_COMPACTION || !chunk.snapshotFile.present() ||
pendingSnapshotVersion != durableSnapshotVersion.get() || readVersion <= pendingSnapshotVersion) {
return false;
}
if (chunk.newDeltas.empty() && chunk.deltaFiles.empty()) {
return false;
}
readStats.deltaBytesRead += chunk.newDeltas.expectedSize();
for (auto& it : chunk.deltaFiles) {
readStats.deltaBytesRead += it.length;
}
if (rdcCandidate) {
return false;
}
if (isEligibleRDC() && weightRDC() > 1.0) {
rdcCandidate = true;
CODE_PROBE(true, "Granule read triggering read-driven compaction");
if (BW_DEBUG) {
fmt::print("Triggering read-driven compaction of [{0} - {1})\n",
keyRange.begin.printable(),
keyRange.end.printable());
}
return true;
bool BlobWorkerData::managerEpochOk(int64_t epoch) {
if (epoch < currentManagerEpoch) {
if (BW_DEBUG) {
fmt::print(
"BW {0} got request from old epoch {1}, notifying them they are out of date\n", id.toString(), epoch);
}
return false;
}
inline bool doEarlyReSnapshot() {
return runRDC.isSet() ||
(forceCompactVersion <= pendingDeltaVersion && forceCompactVersion > pendingSnapshotVersion);
}
};
struct GranuleRangeMetadata {
int64_t lastEpoch;
int64_t lastSeqno;
Reference<GranuleMetadata> activeMetadata;
Future<GranuleStartState> assignFuture;
Future<Void> fileUpdaterFuture;
Future<Void> historyLoaderFuture;
void cancel() {
if (activeMetadata->cancelled.canBeSet()) {
activeMetadata->cancelled.send(Void());
}
activeMetadata.clear();
assignFuture.cancel();
historyLoaderFuture.cancel();
fileUpdaterFuture.cancel();
}
GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {}
GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference<GranuleMetadata> activeMetadata)
: lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {}
};
// represents a previous version of a granule, and optionally the files that compose it.
struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry> {
KeyRange range;
UID granuleID;
Version startVersion; // version of the first snapshot
Version endVersion; // version of the last delta file
// load files lazily, and allows for clearing old cold-queried files to save memory
// FIXME: add memory limit and evictor for old cached files
Future<GranuleFiles> files;
// FIXME: do skip pointers with single back-pointer and neighbor pointers
std::vector<Reference<GranuleHistoryEntry>> parentGranules;
GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {}
GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion)
: range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {}
};
struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
IKeyValueStore* storage;
PromiseStream<Future<Void>> addActor;
LocalityData locality;
int64_t currentManagerEpoch = -1;
AsyncVar<ReplyPromiseStream<GranuleStatusReply>> currentManagerStatusStream;
bool statusStreamInitialized = false;
// FIXME: refactor out the parts of this that are just for interacting with blob stores from the backup business
// logic
Reference<BlobConnectionProvider> bstore;
KeyRangeMap<GranuleRangeMetadata> granuleMetadata;
BGTenantMap tenantData;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
// contains the history of completed granules before the existing ones. Maps to the latest one, and has
// back-pointers to earlier granules
// FIXME: expire from map after a delay when granule is revoked and the history is no longer needed
KeyRangeMap<Reference<GranuleHistoryEntry>> granuleHistory;
PromiseStream<AssignBlobRangeRequest> granuleUpdateErrors;
Promise<Void> doGRVCheck;
NotifiedVersion grvVersion;
std::deque<Version> prevGRVVersions;
Promise<Void> fatalError;
Promise<Void> simInjectFailure;
Promise<Void> doReadDrivenCompaction;
Reference<FlowLock> initialSnapshotLock;
Reference<FlowLock> resnapshotBudget;
Reference<FlowLock> deltaWritesBudget;
BlobWorkerStats stats;
bool shuttingDown = false;
// FIXME: have cap on this independent of delta file size for larger granules
int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 4;
EncryptionAtRestMode encryptMode;
bool buggifyFull = false;
int64_t memoryFullThreshold =
(int64_t)(SERVER_KNOBS->BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD * SERVER_KNOBS->SERVER_MEM_LIMIT);
int64_t lastResidentMemory = 0;
double lastResidentMemoryCheckTime = -100.0;
bool isFullRestoreMode = false;
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, IKeyValueStore* storage)
: id(id), db(db), storage(storage), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
resnapshotBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_BUDGET_BYTES)),
deltaWritesBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES)),
stats(id,
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
initialSnapshotLock,
resnapshotBudget,
deltaWritesBudget,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
encryptMode(EncryptionAtRestMode::DISABLED) {}
bool managerEpochOk(int64_t epoch) {
if (epoch < currentManagerEpoch) {
} else {
if (epoch > currentManagerEpoch) {
currentManagerEpoch = epoch;
if (BW_DEBUG) {
fmt::print("BW {0} got request from old epoch {1}, notifying them they are out of date\n",
id.toString(),
epoch);
fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch);
}
return false;
} else {
if (epoch > currentManagerEpoch) {
currentManagerEpoch = epoch;
if (BW_DEBUG) {
fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch);
}
TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch);
}
return true;
TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch);
}
return true;
}
}
bool isFull() {
if (!SERVER_KNOBS->BLOB_WORKER_DO_REJECT_WHEN_FULL) {
return false;
}
if (g_network->isSimulated()) {
if (g_simulator->speedUpSimulation) {
return false;
}
return buggifyFull;
}
// TODO knob?
if (now() >= 1.0 + lastResidentMemoryCheckTime) {
// fdb as of 7.1 limits on resident memory instead of virtual memory
stats.lastResidentMemory = getResidentMemoryUsage();
lastResidentMemoryCheckTime = now();
}
// if we are already over threshold, no need to estimate extra memory
if (stats.lastResidentMemory >= memoryFullThreshold) {
return true;
}
// FIXME: since this isn't tested in simulation, could unit test this
// Try to model how much memory we *could* use given the already existing assignments and workload on this blob
// worker, before agreeing to take on a new assignment, given that several large sources of memory can grow and
// change post-assignment
// FIXME: change these to be byte counts
// FIXME: buggify an extra multiplication factor for short periods of time to hopefully trigger this logic more
// often? estimate slack in bytes buffered as max(0, assignments * (delta file size / 2) - bytesBuffered)
// FIXME: this doesn't take increased delta file size for heavy write amp cases into account
int64_t expectedExtraBytesBuffered = std::max<int64_t>(
0, stats.numRangesAssigned * (SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2) - stats.mutationBytesBuffered);
// estimate slack in potential pending resnapshot
int64_t totalExtra =
expectedExtraBytesBuffered + deltaWritesBudget->available() + resnapshotBudget->available();
// assumes initial snapshot parallelism is small enough and uncommon enough to not add it to this computation
stats.estimatedMaxResidentMemory = stats.lastResidentMemory + totalExtra;
return stats.estimatedMaxResidentMemory >= memoryFullThreshold;
}
void triggerReadDrivenCompaction() {
Promise<Void> doRDC = doReadDrivenCompaction;
if (doRDC.canBeSet()) {
doRDC.send(Void());
}
}
void addGRVHistory(Version readVersion) {
if (grvVersion.get() < readVersion) {
// We use GRVs from grv checker loop, plus other common BW transactions. To prevent the deque size from
// exploding or the effective version window from getting too small, only put GRVs in the deque if they are
// at least some small distance apart
if (prevGRVVersions.empty() ||
prevGRVVersions.back() + SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MIN_VERSION_GRANULARITY <= readVersion) {
prevGRVVersions.push_back(readVersion);
while (prevGRVVersions.size() > SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MAX_SIZE) {
prevGRVVersions.pop_front();
}
}
// set notified version last, so that all triggered waiters have prevGRVVersions populated too
grvVersion.set(readVersion);
}
}
bool maybeInjectTargetedRestart() {
// inject a BW restart at most once per test
if (g_network->isSimulated() && !g_simulator->speedUpSimulation &&
now() > g_simulator->injectTargetedBWRestartTime) {
CODE_PROBE(true, "Injecting BW targeted restart");
TraceEvent("SimBWInjectTargetedRestart", id);
g_simulator->injectTargetedBWRestartTime = std::numeric_limits<double>::max();
simInjectFailure.send(Void());
return true;
}
bool BlobWorkerData::isFull() {
if (!SERVER_KNOBS->BLOB_WORKER_DO_REJECT_WHEN_FULL) {
return false;
}
};
if (g_network->isSimulated()) {
if (g_simulator->speedUpSimulation) {
return false;
}
return buggifyFull;
}
// TODO knob?
if (now() >= 1.0 + lastResidentMemoryCheckTime) {
// fdb as of 7.1 limits on resident memory instead of virtual memory
stats.lastResidentMemory = getResidentMemoryUsage();
lastResidentMemoryCheckTime = now();
}
// if we are already over threshold, no need to estimate extra memory
if (stats.lastResidentMemory >= memoryFullThreshold) {
return true;
}
// FIXME: since this isn't tested in simulation, could unit test this
// Try to model how much memory we *could* use given the already existing assignments and workload on this blob
// worker, before agreeing to take on a new assignment, given that several large sources of memory can grow and
// change post-assignment
// FIXME: change these to be byte counts
// FIXME: buggify an extra multiplication factor for short periods of time to hopefully trigger this logic more
// often? estimate slack in bytes buffered as max(0, assignments * (delta file size / 2) - bytesBuffered)
// FIXME: this doesn't take increased delta file size for heavy write amp cases into account
int64_t expectedExtraBytesBuffered = std::max<int64_t>(
0, stats.numRangesAssigned * (SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2) - stats.mutationBytesBuffered);
// estimate slack in potential pending resnapshot
int64_t totalExtra = expectedExtraBytesBuffered + deltaWritesBudget->available() + resnapshotBudget->available();
// assumes initial snapshot parallelism is small enough and uncommon enough to not add it to this computation
stats.estimatedMaxResidentMemory = stats.lastResidentMemory + totalExtra;
return stats.estimatedMaxResidentMemory >= memoryFullThreshold;
}
void BlobWorkerData::triggerReadDrivenCompaction() {
Promise<Void> doRDC = doReadDrivenCompaction;
if (doRDC.canBeSet()) {
doRDC.send(Void());
}
}
void BlobWorkerData::addGRVHistory(Version readVersion) {
if (grvVersion.get() < readVersion) {
// We use GRVs from grv checker loop, plus other common BW transactions. To prevent the deque size from
// exploding or the effective version window from getting too small, only put GRVs in the deque if they are
// at least some small distance apart
if (prevGRVVersions.empty() ||
prevGRVVersions.back() + SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MIN_VERSION_GRANULARITY <= readVersion) {
prevGRVVersions.push_back(readVersion);
while (prevGRVVersions.size() > SERVER_KNOBS->BLOB_WORKER_GRV_HISTORY_MAX_SIZE) {
prevGRVVersions.pop_front();
}
}
// set notified version last, so that all triggered waiters have prevGRVVersions populated too
grvVersion.set(readVersion);
}
}
bool BlobWorkerData::maybeInjectTargetedRestart() {
// inject a BW restart at most once per test
if (g_network->isSimulated() && !g_simulator->speedUpSimulation &&
now() > g_simulator->injectTargetedBWRestartTime) {
CODE_PROBE(true, "Injecting BW targeted restart");
TraceEvent("SimBWInjectTargetedRestart", id);
g_simulator->injectTargetedBWRestartTime = std::numeric_limits<double>::max();
simInjectFailure.send(Void());
return true;
}
return false;
}
namespace {

View File

@ -1893,6 +1893,10 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
} else if (highestPriority == SERVER_KNOBS->PRIORITY_TEAM_HEALTHY) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy";
} else if (highestPriority == SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_perpetual_wiggle";
stateSectionObj["description"] = "Wiggling storage server";
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_RECOVER_MOVE) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_rebalancing";

View File

@ -0,0 +1,221 @@
/*
* BlobWorker.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_BLOBWORKER_H
#define FDBSERVER_BLOBWORKER_H
#include "fdbclient/BlobWorkerCommon.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/Knobs.h"
#include <vector>
#include "flow/actorcompiler.h" // has to be last include
struct GranuleStartState {
UID granuleID;
Version changeFeedStartVersion;
Version previousDurableVersion;
Optional<std::pair<KeyRange, UID>> splitParentGranule;
bool doSnapshot;
std::vector<GranuleFiles> blobFilesToSnapshot;
Optional<GranuleFiles> existingFiles;
Optional<GranuleHistory> history;
};
// TODO: add more (blob file request cost, in-memory mutations vs blob delta file, etc...)
struct GranuleReadStats {
int64_t deltaBytesRead;
void reset() { deltaBytesRead = 0; }
GranuleReadStats() { reset(); }
};
struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
KeyRange keyRange;
GranuleFiles files;
Standalone<GranuleDeltas>
currentDeltas; // only contain deltas in pendingDeltaVersion + 1 through bufferedDeltaVersion
uint64_t bytesInNewDeltaFiles = 0;
uint64_t bufferedDeltaBytes = 0;
// for client to know when it is safe to read a certain version and from where (check waitForVersion)
Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions)
Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb
NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Version initialSnapshotVersion = invalidVersion;
Version historyVersion = invalidVersion;
Version knownCommittedVersion;
NotifiedVersion forceFlushVersion; // Version to force a flush at, if necessary
Version forceCompactVersion = invalidVersion;
int64_t originalEpoch;
int64_t originalSeqno;
int64_t continueEpoch;
int64_t continueSeqno;
Promise<Void> cancelled;
Promise<Void> readable;
Promise<Void> historyLoaded;
Promise<Void> resumeSnapshot;
AsyncVar<Reference<ChangeFeedData>> activeCFData;
AssignBlobRangeRequest originalReq;
GranuleReadStats readStats;
bool rdcCandidate;
Promise<Void> runRDC;
void resume();
void resetReadStats();
// determine eligibility (>1) and priority for re-snapshotting this granule
double weightRDC();
bool isEligibleRDC() const;
bool updateReadStats(Version readVersion, const BlobGranuleChunkRef& chunk);
inline bool doEarlyReSnapshot() {
return runRDC.isSet() ||
(forceCompactVersion <= pendingDeltaVersion && forceCompactVersion > pendingSnapshotVersion);
}
};
struct GranuleRangeMetadata {
int64_t lastEpoch;
int64_t lastSeqno;
Reference<GranuleMetadata> activeMetadata;
Future<GranuleStartState> assignFuture;
Future<Void> fileUpdaterFuture;
Future<Void> historyLoaderFuture;
void cancel();
GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {}
GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference<GranuleMetadata> activeMetadata)
: lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {}
};
// represents a previous version of a granule, and optionally the files that compose it.
struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry> {
KeyRange range;
UID granuleID;
Version startVersion; // version of the first snapshot
Version endVersion; // version of the last delta file
// load files lazily, and allows for clearing old cold-queried files to save memory
// FIXME: add memory limit and evictor for old cached files
Future<GranuleFiles> files;
// FIXME: do skip pointers with single back-pointer and neighbor pointers
std::vector<Reference<GranuleHistoryEntry>> parentGranules;
GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {}
GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion)
: range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {}
};
struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
IKeyValueStore* storage;
PromiseStream<Future<Void>> addActor;
LocalityData locality;
int64_t currentManagerEpoch = -1;
AsyncVar<ReplyPromiseStream<GranuleStatusReply>> currentManagerStatusStream;
bool statusStreamInitialized = false;
// FIXME: refactor out the parts of this that are just for interacting with blob stores from the backup business
// logic
Reference<BlobConnectionProvider> bstore;
KeyRangeMap<GranuleRangeMetadata> granuleMetadata;
BGTenantMap tenantData;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
// contains the history of completed granules before the existing ones. Maps to the latest one, and has
// back-pointers to earlier granules
// FIXME: expire from map after a delay when granule is revoked and the history is no longer needed
KeyRangeMap<Reference<GranuleHistoryEntry>> granuleHistory;
PromiseStream<AssignBlobRangeRequest> granuleUpdateErrors;
Promise<Void> doGRVCheck;
NotifiedVersion grvVersion;
std::deque<Version> prevGRVVersions;
Promise<Void> fatalError;
Promise<Void> simInjectFailure;
Promise<Void> doReadDrivenCompaction;
Reference<FlowLock> initialSnapshotLock;
Reference<FlowLock> resnapshotBudget;
Reference<FlowLock> deltaWritesBudget;
BlobWorkerStats stats;
bool shuttingDown = false;
// FIXME: have cap on this independent of delta file size for larger granules
int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 4;
EncryptionAtRestMode encryptMode;
bool buggifyFull = false;
int64_t memoryFullThreshold =
(int64_t)(SERVER_KNOBS->BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD * SERVER_KNOBS->SERVER_MEM_LIMIT);
int64_t lastResidentMemory = 0;
double lastResidentMemoryCheckTime = -100.0;
bool isFullRestoreMode = false;
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, IKeyValueStore* storage)
: id(id), db(db), storage(storage), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
resnapshotBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_BUDGET_BYTES)),
deltaWritesBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES)),
stats(id,
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
initialSnapshotLock,
resnapshotBudget,
deltaWritesBudget,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
encryptMode(EncryptionAtRestMode::DISABLED) {}
bool managerEpochOk(int64_t epoch);
bool isFull();
void triggerReadDrivenCompaction();
void addGRVHistory(Version readVersion);
bool maybeInjectTargetedRestart();
};
#endif

View File

@ -158,6 +158,7 @@ struct PerpetualWiggleStatsWorkload : public TestWorkload {
MoveKeysLock lock = wait(takeMoveKeysLock(cx, UID())); // force current DD to quit
bool success = wait(IssueConfigurationChange(cx, "storage_migration_type=disabled", true));
ASSERT(success);
wait(delay(30.0)); // make sure the DD has already quit before the test starts
return Void();
}