batch periodic merging in blob manager

This commit is contained in:
Josh Slocum 2022-07-15 15:52:10 -05:00
parent 866dda5763
commit 306610bfcb
4 changed files with 253 additions and 318 deletions

View File

@ -910,7 +910,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BG_ENABLE_MERGING, true ); if (randomize && BUGGIFY) BG_ENABLE_MERGING = false;
init( BG_MERGE_CANDIDATE_THRESHOLD_SECONDS, isSimulated ? 20.0 : 30 * 60 ); if (randomize && BUGGIFY) BG_MERGE_CANDIDATE_THRESHOLD_SECONDS = 5.0;
init( BG_MERGE_CANDIDATE_DELAY_SECONDS, BG_MERGE_CANDIDATE_THRESHOLD_SECONDS / 10.0 );
init( BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM, 8 ); if( randomize && BUGGIFY ) BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM = 1;
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
@ -922,6 +922,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 );
init( BLOB_MANAGER_CONCURRENT_MERGE_CHECKS, 64 ); if( randomize && BUGGIFY ) BLOB_MANAGER_CONCURRENT_MERGE_CHECKS = 1 << deterministicRandom()->randomInt(0, 7);
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

View File

@ -887,6 +887,7 @@ public:
int BG_CONSISTENCY_CHECK_TARGET_SPEED_KB;
bool BG_ENABLE_MERGING;
int BG_MERGE_CANDIDATE_THRESHOLD_SECONDS;
int BG_MERGE_CANDIDATE_DELAY_SECONDS;
int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
@ -897,6 +898,7 @@ public:
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT;
int BLOB_MANAGER_CONCURRENT_MERGE_CHECKS;
double BGCC_TIMEOUT;
double BGCC_MIN_INTERVAL;

View File

@ -25,6 +25,7 @@
#include <vector>
#include <unordered_map>
#include "fdbrpc/simulator.h"
#include "fmt/format.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobGranuleCommon.h"
@ -50,7 +51,7 @@
* The Blob Manager is responsible for managing range granules, and recruiting and monitoring Blob Workers.
*/
#define BM_DEBUG false
// Verbose Blob Manager debug logging. Keep disabled in committed code — this was
// flipped to true in this change, which looks like a local-debugging leftover;
// restore to false so production/simulation runs are not flooded with fmt::print output.
#define BM_DEBUG false
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
Arena& ar,
@ -281,6 +282,32 @@ struct BlobManagerStats {
}
};
// Lifecycle of a granule range with respect to merging. Transitions are driven by
// blob worker status reports (setMergeCandidate / clearMergeCandidate) and by the
// merge checker / merge actors.
enum MergeCandidateState {
MergeCandidateCannotMerge, // explicitly ineligible (e.g. the granule is being split)
MergeCandidateCanMerge, // reported merge-eligible; entry carries granule id + start version
MergeCandidateUnknown, // default: no eligibility information yet
MergeCandidateMerging // a merge covering this range is already in flight
};
// Per-range merge-eligibility record stored in BlobManagerData::mergeCandidates.
// Only a CanMerge entry carries a meaningful granuleID/startVersion. mergeNow is
// flipped by the merge checker on a first pass so that a candidate is only actually
// merged on a later pass, giving its neighbors time to become eligible too.
struct MergeCandidateInfo {
    MergeCandidateState st = MergeCandidateUnknown;
    UID granuleID;
    Version startVersion = invalidVersion;
    bool mergeNow = false;

    MergeCandidateInfo() = default;

    // Non-mergeable states only; a mergeable entry must carry an id + start version.
    MergeCandidateInfo(MergeCandidateState st) : st(st) { ASSERT(st != MergeCandidateCanMerge); }

    // A granule that has been reported as merge-eligible.
    MergeCandidateInfo(UID granuleID, Version startVersion)
      : st(MergeCandidateCanMerge), granuleID(granuleID), startVersion(startVersion) {}

    bool canMerge() const { return st == MergeCandidateCanMerge; }

    // Eligible AND already seen by a previous merge-checker pass.
    bool canMergeNow() const { return mergeNow && st == MergeCandidateCanMerge; }
};
struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
UID id;
Database db;
@ -301,11 +328,13 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
KeyRangeMap<BoundaryEvaluation> boundaryEvaluations;
KeyRangeMap<bool> knownBlobRanges;
BGTenantMap tenantData;
KeyRangeMap<Optional<std::pair<UID, Version>>> mergeCandidates; // granule range to granule id + start version.
KeyRangeMap<MergeCandidateInfo> mergeCandidates; // granule range to granule id + start version.
KeyRangeMap<Version> activeGranuleMerges; // range map of active granule merges, because range in boundaryEval
// doesn't correspond to merge range. invalidVersion is no merge,
// 0 is no merge version determined yet
FlowLock concurrentMergeChecks;
AsyncTrigger startRecruiting;
Debouncer restartRecruiting;
std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on
@ -321,9 +350,10 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
BlobManagerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, Optional<Key> dcId)
: id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
knownBlobRanges(false, normalKeys.end), tenantData(BGTenantMap(dbInfo)),
mergeCandidates(Optional<std::pair<UID, Version>>(), normalKeys.end),
activeGranuleMerges(invalidVersion, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
recruitingStream(0) {}
mergeCandidates(MergeCandidateInfo(MergeCandidateUnknown), normalKeys.end),
activeGranuleMerges(invalidVersion, normalKeys.end),
concurrentMergeChecks(SERVER_KNOBS->BLOB_MANAGER_CONCURRENT_MERGE_CHECKS),
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
// only initialize blob store if actually needed
void initBStore() {
@ -347,6 +377,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
}
return false;
}
Version activeMergeVersion(const KeyRangeRef& range) {
auto ranges = activeGranuleMerges.intersectingRanges(range);
Version v = invalidVersion;
@ -355,6 +386,30 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
}
return v;
}
// Mark `range` as a merge candidate identified by (granuleID, startVersion).
// Idempotent: if this exact range was already reported merge-eligible, the existing
// entry is kept so its mergeNow state (set by the merge checker on a previous pass)
// survives a repeated report.
void setMergeCandidate(const KeyRangeRef& range, UID granuleID, Version startVersion) {
    auto it = mergeCandidates.rangeContaining(range.begin);
    // Uniform accessor style: use operator-> for begin()/end()/cvalue() throughout
    // (the original mixed `it->begin()` with `it.end()`; both resolve to the same
    // iterator member, but the file otherwise uses the arrow form).
    if (it->begin() == range.begin && it->end() == range.end) {
        if (it->cvalue().st != MergeCandidateCanMerge) {
            // Same exact range, not yet marked mergeable: record the candidate.
            it->value() = MergeCandidateInfo(granuleID, startVersion);
        } else {
            // Already a candidate for this exact range: no-op, but validate the data.
            ASSERT(granuleID == it->cvalue().granuleID);
            ASSERT(startVersion == it->cvalue().startVersion);
        }
    } else if (it->cvalue().st != MergeCandidateMerging) {
        // Boundaries changed (e.g. after a re-split): replace the overlapping entries,
        // unless a merge is already in flight.
        // NOTE(review): only the entry containing range.begin is checked for Merging
        // before inserting over the whole range — assumes `range` never spans a
        // Merging sub-range; confirm with callers.
        mergeCandidates.insert(range, MergeCandidateInfo(granuleID, startVersion));
    }
}
void clearMergeCandidate(const KeyRangeRef& range, MergeCandidateState st) {
ASSERT(st != MergeCandidateCanMerge);
mergeCandidates.insert(range, MergeCandidateInfo(st));
}
};
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<BlobManagerData> bmData,
@ -1684,307 +1739,193 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
bmData->boundaryEvaluations.insert(mergeRange,
BoundaryEvaluation(bmData->epoch, seqnoForEval, BoundaryEvalType::MERGE, 0, 0));
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
return Void();
}
// Make a decision on whether to merge this granule with surrounding ones.
ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
UID granuleID,
KeyRange granuleRange,
Version granuleStartVersion) {
state std::deque<std::tuple<UID, KeyRange, Version>> beforeCandidates, afterCandidates;
// Persist and execute a merge of `toMerge` (two or more consecutive granules) into the
// single range `mergeRange`. Launched as a detached actor via bmData->addActor by
// attemptStartMerge, so unexpected errors are traced loudly before rethrowing.
ACTOR Future<Void> doMerge(Reference<BlobManagerData> bmData,
KeyRange mergeRange,
std::vector<std::tuple<UID, KeyRange, Version>> toMerge) {
// switch to format persist merge wants: parallel vectors of ids / ranges / start versions
state std::vector<UID> ids;
state std::vector<KeyRange> ranges;
state std::vector<Version> startVersions;
for (auto& it : toMerge) {
ids.push_back(std::get<0>(it));
ranges.push_back(std::get<1>(it));
startVersions.push_back(std::get<2>(it));
}
try {
// durably record the merge intent, then drive the merge to completion
std::pair<UID, Version> persistMerge =
wait(persistMergeGranulesStart(bmData, mergeRange, ids, ranges, startVersions));
wait(finishMergeGranules(
bmData, persistMerge.first, mergeRange, persistMerge.second, ids, ranges, startVersions));
return Void();
} catch (Error& e) {
// cancellation and manager replacement are expected shutdown paths — propagate quietly
if (e.code() == error_code_operation_cancelled || e.code() == error_code_blob_manager_replaced) {
throw;
}
// anything else is unexpected mid-merge; surface it before rethrowing
TraceEvent(SevError, "UnexpectedErrorGranuleMerge").error(e).detail("Range", mergeRange);
throw e;
}
}
// Needs to not be an actor to run synchronously for the race checking.
// Technically this could just be the first part of doMerge, but this guarantees no waits happen for the checks before
// the logic starts
static void attemptStartMerge(Reference<BlobManagerData> bmData,
const std::vector<std::tuple<UID, KeyRange, Version>>& toMerge) {
if (toMerge.size() < 2) {
return;
}
KeyRange mergeRange(KeyRangeRef(std::get<1>(toMerge.front()).begin, std::get<1>(toMerge.back()).end));
// merge/merge races should not be possible because granuleMergeChecker should only start attemptMerges() for
// disjoint ranges, and merge candidate is not updated if it is already in the state MergeCandidateMerging
ASSERT(!bmData->isMergeActive(mergeRange));
// Check to avoid races where a split eval came in while merge was evaluating. This also effectively checks
// boundaryEvals because they're both updated before maybeSplitRange is called. This handles split/merge races.
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(mergeRange);
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().canMergeNow()) {
TEST(true); // granule no longer merge candidate after checking metrics, aborting merge
return;
}
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Start\n",
fmt::print("BM {0} Starting merge of [{1} - {2}) ({3})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
mergeRange.begin.printable(),
mergeRange.end.printable(),
toMerge.size());
}
TEST(true); // Doing granule merge!
bmData->activeGranuleMerges.insert(mergeRange, 0);
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
// Now, after setting activeGranuleMerges, we have committed to doing the merge, so any subsequent split eval for
// any of the ranges will be ignored. This handles merge/split races.
bmData->addActor.send(doMerge(bmData, mergeRange, toMerge));
}
// look for candidates to the left
if (granuleRange.begin != normalKeys.begin) {
auto rangeBefore = bmData->mergeCandidates.rangeContainingKeyBefore(granuleRange.begin);
while (rangeBefore.cvalue().present() && beforeCandidates.size() < SERVER_KNOBS->BG_MAX_MERGE_FANIN - 1) {
// if it is a merge candidate, add it to the list
beforeCandidates.push_front(
std::tuple(rangeBefore.cvalue().get().first, rangeBefore.range(), rangeBefore.cvalue().get().second));
// Greedily merges any consecutive 2+ granules in a row that are mergeable
ACTOR Future<Void> attemptMerges(Reference<BlobManagerData> bmData,
std::vector<std::tuple<UID, KeyRange, Version>> candidates) {
ASSERT(candidates.size() >= 2);
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Before candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
rangeBefore.begin().printable(),
rangeBefore.end().printable());
}
ASSERT(rangeBefore.begin() >= normalKeys.begin);
if (rangeBefore.begin() == normalKeys.begin) {
break;
} else {
--rangeBefore;
}
}
// TODO REMOVE validation eventually
for (int i = 0; i < candidates.size() - 1; i++) {
ASSERT(std::get<1>(candidates[i]).end == std::get<1>(candidates[i + 1]).begin);
}
TEST(true); // Candidate ranges to merge
wait(bmData->concurrentMergeChecks.take());
state FlowLock::Releaser holdingDVL(bmData->concurrentMergeChecks);
// look for candidates to right
if (granuleRange.end != normalKeys.end) {
auto rangeAfter = bmData->mergeCandidates.rangeContaining(granuleRange.end);
while (rangeAfter.cvalue().present() && afterCandidates.size() < SERVER_KNOBS->BG_MAX_MERGE_FANIN - 1) {
// if it is a merge candidate, add it to the list
afterCandidates.push_back(
std::tuple(rangeAfter.cvalue().get().first, rangeAfter.range(), rangeAfter.cvalue().get().second));
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): After candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
rangeAfter.begin().printable(),
rangeAfter.end().printable());
}
ASSERT(rangeAfter.end() <= normalKeys.end);
if (rangeAfter.end() == normalKeys.end) {
break;
} else {
++rangeAfter;
}
}
}
if (beforeCandidates.empty() && afterCandidates.empty()) {
TEST(true); // no consecutive merge candidates
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No merge candidates\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
return Void();
}
TEST(true); // consecutive granule merge candidates
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking metrics for {3} candidates ({4} - {5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
beforeCandidates.size() + afterCandidates.size() + 1,
beforeCandidates.size(),
afterCandidates.size());
}
// get metrics for current granule to see if it is still mergeable
StorageMetrics targetGranuleMetrics = wait(bmData->db->getStorageMetrics(granuleRange, CLIENT_KNOBS->TOO_MANY));
if (targetGranuleMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC ||
targetGranuleMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
TEST(true); // granule merge candidate no longer mergeable
return Void();
}
// best set of granules to merge
state std::vector<UID> bestGranuleIDs;
state std::vector<KeyRange> bestGranuleRanges;
state std::vector<Version> bestGranuleStartVersions;
state KeyRange bestGranuleRange;
// current set of granules being evaluated
state std::deque<std::tuple<UID, KeyRange, Version, int64_t>> windowGranules;
state int64_t windowBytes = targetGranuleMetrics.bytes;
windowGranules.push_back(std::tuple(granuleID, granuleRange, granuleStartVersion, windowBytes));
// first walk backwards through before candidates until combined granule would be too large to merge, or we hit a
// granule that has too high bytesPerKSec and isn't mergeable
// start merging any set of 2+ consecutive granules that can be merged
state int64_t currentBytes = 0;
state std::vector<std::tuple<UID, KeyRange, Version>> currentCandidates;
state int i;
for (i = beforeCandidates.size() - 1; i >= 0; i--) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking before candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(beforeCandidates[i]).begin.printable(),
std::get<1>(beforeCandidates[i]).end.printable());
for (i = 0; i < candidates.size(); i++) {
StorageMetrics metrics =
wait(bmData->db->getStorageMetrics(std::get<1>(candidates[i]), CLIENT_KNOBS->TOO_MANY));
if (metrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
metrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
// This granule cannot be merged with any neighbors.
// If current candidates up to here can be merged, merge them and skip over this one
attemptStartMerge(bmData, currentCandidates);
currentCandidates.clear();
currentBytes = 0;
continue;
}
StorageMetrics beforeMetrics =
wait(bmData->db->getStorageMetrics(std::get<1>(beforeCandidates[i]), CLIENT_KNOBS->TOO_MANY));
if (windowBytes + beforeMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
beforeMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
break;
// if the current window is already at the maximum merge size, or adding this granule would push the window over
// the edge, merge the existing candidates if possible
ASSERT(currentCandidates.size() <= SERVER_KNOBS->BG_MAX_MERGE_FANIN);
if (currentCandidates.size() == SERVER_KNOBS->BG_MAX_MERGE_FANIN ||
currentBytes + metrics.bytes > SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
ASSERT(currentBytes <= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES);
attemptStartMerge(bmData, currentCandidates);
currentCandidates.clear();
currentBytes = 0;
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Before Candidate [{3} - {4}): {5} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(beforeCandidates[i]).begin.printable(),
std::get<1>(beforeCandidates[i]).end.printable(),
beforeMetrics.bytes);
}
windowBytes += beforeMetrics.bytes;
windowGranules.push_front(std::tuple(std::get<0>(beforeCandidates[i]),
std::get<1>(beforeCandidates[i]),
std::get<2>(beforeCandidates[i]),
beforeMetrics.bytes));
// add this granule to the window
currentCandidates.push_back(candidates[i]);
}
// set first window as the best range
bestGranuleRange = KeyRangeRef(std::get<1>(windowGranules.front()).begin, std::get<1>(windowGranules.back()).end);
for (auto& it : windowGranules) {
bestGranuleIDs.push_back(std::get<0>(it));
bestGranuleRanges.push_back(std::get<1>(it));
bestGranuleStartVersions.push_back(std::get<2>(it));
}
// Do moving window algorithm where we add the next afterCandidate to the merge window, and then remove the tail end
// of beforeCandidates until we are down to a mergeable granule
for (i = 0; i < afterCandidates.size(); i++) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking after candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable());
}
// include this granule in the window
StorageMetrics afterMetrics =
wait(bmData->db->getStorageMetrics(std::get<1>(afterCandidates[i]), CLIENT_KNOBS->TOO_MANY));
if (afterMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
afterMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
break;
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): After Candidate [{3} - {4}): {5} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable(),
afterMetrics.bytes);
}
windowBytes += afterMetrics.bytes;
windowGranules.push_back(std::tuple(std::get<0>(afterCandidates[i]),
std::get<1>(afterCandidates[i]),
std::get<2>(afterCandidates[i]),
afterMetrics.bytes));
// slide the window forward back down to mergeable size
while (windowBytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): window bytes {3} >= target {4}\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
windowBytes,
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES);
}
ASSERT(!windowGranules.empty());
if (std::get<0>(windowGranules.front()) == granuleID) {
// merge must include target granule
break;
}
if (BM_DEBUG) {
fmt::print(
"BM {0} maybe merge [{1} - {2}): After Candidate [{3} - {4}) popping [{5} - {6}): {7} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable(),
std::get<1>(windowGranules.front()).begin.printable(),
std::get<1>(windowGranules.front()).end.printable(),
std::get<3>(windowGranules.front()));
}
windowBytes -= std::get<3>(windowGranules.front());
windowGranules.pop_front();
}
// compare this candidate window to previous best
if (windowBytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
break;
} else if (windowGranules.size() > bestGranuleIDs.size()) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): new best granules {3}\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
windowGranules.size());
}
bestGranuleRange =
KeyRangeRef(std::get<1>(windowGranules.front()).begin, std::get<1>(windowGranules.back()).end);
bestGranuleIDs.clear();
bestGranuleRanges.clear();
bestGranuleStartVersions.clear();
for (auto& it : windowGranules) {
bestGranuleIDs.push_back(std::get<0>(it));
bestGranuleRanges.push_back(std::get<1>(it));
bestGranuleStartVersions.push_back(std::get<2>(it));
}
}
}
TEST(bestGranuleIDs.size() == 1); // Cannot combine merge candidates into mergeable granule
TEST(bestGranuleIDs.size() > 1); // Granule ready for merge!
if (bestGranuleIDs.size() > 1) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Found {3} consecutive granules in range [{4} - {5}):\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
bestGranuleIDs.size(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable());
}
// This code block must execute without a wait for the lock checks (isMergeActive, mergeCandidates) to not
// deadlock and to avoid merge-merge races.
if (!bmData->isMergeActive(bestGranuleRange)) {
// check to avoid races where a split eval came in while merge was evaluating
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(bestGranuleRange);
bool mergeStillOk = true;
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().present()) {
TEST(true); // granule no longer merge candidate after checking metrics, because of split eval
mergeStillOk = false;
break;
}
}
if (mergeStillOk) {
fmt::print("BM {0} maybe merge [{1} - {2}): Starting merge of [{3} - {4}) ({5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable(),
bestGranuleIDs.size());
TEST(true); // Doing granule merge!
bmData->activeGranuleMerges.insert(bestGranuleRange, 0);
bmData->mergeCandidates.insert(bestGranuleRange, Optional<std::pair<UID, Version>>());
state std::pair<UID, Version> persistMerge = wait(persistMergeGranulesStart(
bmData, bestGranuleRange, bestGranuleIDs, bestGranuleRanges, bestGranuleStartVersions));
wait(finishMergeGranules(bmData,
persistMerge.first,
bestGranuleRange,
persistMerge.second,
bestGranuleIDs,
bestGranuleRanges,
bestGranuleStartVersions));
}
}
} else {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No mergeable granules after checking metrics\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
}
attemptStartMerge(bmData, currentCandidates);
return Void();
}
// Uses single-pass algorithm to identify mergeable sections of granules.
// To ensure each granule waits to see whether all of its neighbors are merge-eligible before merging it, a newly
// merge-eligible granule will be ignored on the first pass
// Periodically scans mergeCandidates and starts attemptMerges() for each maximal run of
// consecutive merge-eligible granules. A newly merge-eligible granule is only flagged
// (mergeNow) on its first pass and merged on a later one, so its neighbors get a chance
// to become eligible before the merge is committed.
ACTOR Future<Void> granuleMergeChecker(Reference<BlobManagerData> bmData) {
// initial sleep
wait(delayJittered(SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS));
// TODO could optimize to not check if there are no new merge-eligible granules and none in merge pending state
loop {
double sleepTime = SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS;
// Check more frequently if speedUpSimulation is set, so pending merges can still
// complete before the shortened simulated test ends.
if (g_network->isSimulated() && g_simulator.speedUpSimulation) {
sleepTime = std::min(5.0, sleepTime);
}
// start delay at the start of the loop, to account for time spent in calculation
state Future<Void> intervalDelay = delayJittered(sleepTime);
// go over granule states, and start an attemptMerges() for each sub-range of mergeable granules
// FIXME: avoid SlowTask by breaking this up periodically
// Break it up into parallel chunks. This makes it possible to process large ranges, but does mean the merges
// can be slightly suboptimal at boundaries. Use relatively large chunks to minimize the impact of this.
int maxRangeSize = SERVER_KNOBS->BG_MAX_MERGE_FANIN * 10;
state std::vector<Future<Void>> mergeChecks;
auto allRanges = bmData->mergeCandidates.ranges();
std::vector<std::tuple<UID, KeyRange, Version>> currentCandidates;
for (auto& it : allRanges) {
// flush the current run when it is broken by a non-mergeable range or hits the chunk cap
if (!it->cvalue().canMergeNow() || currentCandidates.size() == maxRangeSize) {
if (currentCandidates.size() >= 2) {
mergeChecks.push_back(attemptMerges(bmData, currentCandidates));
}
currentCandidates.clear();
}
if (it->cvalue().canMergeNow()) {
currentCandidates.push_back(std::tuple(it->cvalue().granuleID, it->range(), it->cvalue().startVersion));
} else if (it->cvalue().canMerge()) {
// The current algorithm, skipping just granules that will be merge-eligible on the next pass, but not
// their neighbors, is optimal for guaranteeing merges to make progress where possible, with decently
// optimal but not globally optimal merge behavior.
// Alternative algorithms include not doing a two-pass consideration at all and immediately considering
// all merge candidates, which guarantees the most progress but pretty much guarantees undesirably
// suboptimal merge decisions, because of the time variance of granules becoming merge candidates. Or,
// also skipping adjacent eligible granules in addition to the one that will be eligible next pass,
// which ensures optimally large merges in a future pass, but adds decent delay to doing the merge. Or,
// smarter considering of merge candidates adjacent to the one that will be eligible next pass
// (depending on whether potential future merges with adjacent ones could include this candidate), which
// would be the best of both worlds, but would add a decent amount of code complexity.
// set flag so this can get merged on the next pass
it->value().mergeNow = true;
}
}
// flush the trailing run, if any
if (currentCandidates.size() >= 2) {
mergeChecks.push_back(attemptMerges(bmData, currentCandidates));
}
TEST(mergeChecks.size() > 1); // parallel merge checks
wait(waitForAll(mergeChecks));
// if the calculation took longer than the desired interval, still wait a bit
wait(intervalDelay && delay(5.0));
}
}
ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
loop {
@ -2309,34 +2250,21 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
// clear merge candidates for range, if not already merging
if (clearMergeCandidate) {
bmData->mergeCandidates.insert(rep.granuleRange, Optional<std::pair<UID, Version>>());
bmData->clearMergeCandidate(rep.granuleRange, MergeCandidateCannotMerge);
}
}
if (rep.mergeCandidate && !ignore) {
// mark granule as merge candidate
ASSERT(!rep.doSplit);
// TODO: do we need any sort of validation that this is coming from the worker that currently owns
// the granule?
if (existingInProgress.present()) {
// TODO LOG?
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
if (!bmData->isMergeActive(rep.granuleRange)) {
ASSERT(rep.mergeCandidate);
TEST(true); // Granule merge candidate
bmData->mergeCandidates.insert(rep.granuleRange,
std::pair(rep.granuleID, rep.startVersion));
newEval.inProgress =
maybeMergeRange(bmData, rep.granuleID, rep.granuleRange, rep.startVersion);
// still update epoch/seqno even if not doing a merge eval
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
}
if (BM_DEBUG) {
fmt::print("Manager {0} merge candidate granule [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
bmData->setMergeCandidate(rep.granuleRange, rep.granuleID, rep.startVersion);
}
}
} catch (Error& e) {
@ -2597,15 +2525,16 @@ ACTOR Future<Void> resumeActiveMerges(Reference<BlobManagerData> bmData) {
// report updated status. Start with early (epoch, seqno) to guarantee lower than later status
BoundaryEvaluation eval(1, 0, BoundaryEvalType::MERGE, 1, 0);
ASSERT(!bmData->isMergeActive(mergeRange));
eval.inProgress = finishMergeGranules(bmData,
mergeGranuleID,
mergeRange,
mergeVersion,
parentGranuleIDs,
parentGranuleRanges,
parentGranuleStartVersions);
bmData->addActor.send(finishMergeGranules(bmData,
mergeGranuleID,
mergeRange,
mergeVersion,
parentGranuleIDs,
parentGranuleRanges,
parentGranuleStartVersions));
bmData->boundaryEvaluations.insert(mergeRange, eval);
bmData->activeGranuleMerges.insert(mergeRange, mergeVersion);
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
}
if (result.more) {
@ -3983,6 +3912,9 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
self->addActor.send(bgConsistencyCheck(self));
}
if (SERVER_KNOBS->BG_ENABLE_MERGING) {
self->addActor.send(granuleMergeChecker(self));
}
if (BUGGIFY) {
self->addActor.send(chaosRangeMover(self));

View File

@ -1035,13 +1035,13 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
}
// wait for the last snapshot to finish, so that the delay is from the last snapshot
wait(waitStart);
wait(delayJittered(SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS));
double jitter = deterministicRandom()->random01() * 0.8 * SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS;
wait(delay(SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS + jitter));
loop {
// this actor will be cancelled if a split check happened, or if the granule was moved away, so this
// being here means that granule is cold enough during that period. Now we just need to check if it is
// also small enough to be a merge candidate.
StorageMetrics currentMetrics = wait(bwData->db->getStorageMetrics(metadata->keyRange, CLIENT_KNOBS->TOO_MANY));
state int64_t granuleBytes = currentMetrics.bytes;
// FIXME: maybe separate knob and/or value for write rate?
if (currentMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2 ||