comments and bug fix

This commit is contained in:
Josh Slocum 2022-07-13 08:01:26 -05:00
parent b2a96b64e6
commit 0720b358ff
1 changed files with 66 additions and 52 deletions

View File

@ -1355,6 +1355,8 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
}
// read mapping from db to handle any in flight granules or other issues
// Forces all granules in the specified key range to flush data to blob up to the specified version. This is required
// for executing a merge.
ACTOR Future<Void> forceGranuleFlush(Reference<BlobManagerData> bmData, KeyRange keyRange, Version version) {
state Transaction tr(bmData->db);
state KeyRange currentRange = keyRange;
@ -1483,6 +1485,8 @@ ACTOR Future<Void> forceGranuleFlush(Reference<BlobManagerData> bmData, KeyRange
return Void();
}
// Persist the merge intent for this merge in the database. Once this transaction commits, the merge is in progress. It
// cannot be aborted, and must be completed.
ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobManagerData> bmData,
KeyRange mergeRange,
std::vector<UID> parentGranuleIDs,
@ -1537,6 +1541,8 @@ ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobMa
}
// FIXME: why not just make parentGranuleRanges vector of N+1 keys?
// Persists the merge being complete in the database by clearing the merge intent. Once this transaction commits, the
// merge is considered completed.
ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
UID mergeGranuleID,
KeyRange mergeRange,
@ -1633,6 +1639,7 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
}
}
// This is the idempotent function that executes a granule merge once the initial merge intent has been persisted.
ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
UID mergeGranuleID,
KeyRange mergeRange,
@ -1645,8 +1652,6 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
wait(bmData->doneRecovering.getFuture());
wait(delay(0));
// each step here is idempotent, so it's ok if this stops and gets restarted from the beginning at any point
// force granules to persist state up to mergeVersion
wait(forceGranuleFlush(bmData, mergeRange, mergeVersion));
@ -1683,6 +1688,7 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
return Void();
}
// Make a decision on whether to merge this granule with surrounding ones.
ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
UID granuleID,
KeyRange granuleRange,
@ -1722,6 +1728,7 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
}
}
// look for candidates to right
if (granuleRange.end != normalKeys.end) {
auto rangeAfter = bmData->mergeCandidates.rangeContaining(granuleRange.end);
while (rangeAfter.cvalue().present() && afterCandidates.size() < SERVER_KNOBS->BG_MAX_MERGE_FANIN - 1) {
@ -1768,6 +1775,7 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
afterCandidates.size());
}
// get metrics for current granule to see if it is still mergeable
StorageMetrics targetGranuleMetrics = wait(bmData->db->getStorageMetrics(granuleRange, CLIENT_KNOBS->TOO_MANY));
if (targetGranuleMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC ||
targetGranuleMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
@ -1786,7 +1794,8 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
state int64_t windowBytes = targetGranuleMetrics.bytes;
windowGranules.push_back(std::tuple(granuleID, granuleRange, granuleStartVersion, windowBytes));
// first walk backwards until granule would be too large/we hit a granule that has too high bytesPerKSec
// first walk backwards through before candidates until combined granule would be too large to merge, or we hit a
// granule that has too high bytesPerKSec and isn't mergeable
state int i;
for (i = beforeCandidates.size() - 1; i >= 0; i--) {
if (BM_DEBUG) {
@ -1827,6 +1836,8 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
bestGranuleStartVersions.push_back(std::get<2>(it));
}
// Do moving window algorithm where we add the next afterCandidate to the merge window, and then remove the tail end
// of beforeCandidates until we are down to a mergeable granule
for (i = 0; i < afterCandidates.size(); i++) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking after candidate [{3} - {4})\n",
@ -1916,8 +1927,8 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
TEST(bestGranuleIDs.size() == 1); // Cannot combine merge candidates into mergeable granule
TEST(bestGranuleIDs.size() > 1); // Granule ready for merge!
if (BM_DEBUG) {
if (bestGranuleIDs.size() > 1) {
if (bestGranuleIDs.size() > 1) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Found {3} consecutive granules in range [{4} - {5}):\n",
bmData->epoch,
granuleRange.begin.printable(),
@ -1925,51 +1936,50 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
bestGranuleIDs.size(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable());
// check active merge to avoid races where 2 consecutive granules become merge candidates at the same time
if ((!g_network->isSimulated() || !g_simulator.speedUpSimulation) &&
!bmData->isMergeActive(bestGranuleRange)) {
// check to avoid races where a split eval came in while merge was evaluating
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(bestGranuleRange);
bool mergeStillOk = true;
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().present()) {
TEST(true); // granule no longer merge candidate after checking metrics, because of split eval
mergeStillOk = false;
break;
}
}
if (mergeStillOk) {
fmt::print("BM {0} maybe merge [{1} - {2}): Starting merge of [{3} - {4}) ({5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable(),
bestGranuleIDs.size());
TEST(true); // Doing granule merge!
bmData->activeGranuleMerges.insert(bestGranuleRange, 0);
bmData->mergeCandidates.insert(bestGranuleRange, Optional<std::pair<UID, Version>>());
state std::pair<UID, Version> persistMerge = wait(persistMergeGranulesStart(
bmData, bestGranuleRange, bestGranuleIDs, bestGranuleRanges, bestGranuleStartVersions));
wait(finishMergeGranules(bmData,
persistMerge.first,
bestGranuleRange,
persistMerge.second,
bestGranuleIDs,
bestGranuleRanges,
bestGranuleStartVersions));
}
// This code block must execute withou a wait for the lock checks (isMergeActive, mergeCandidates) to not
// deadlock and to avoid merge-merge races.
if ((!g_network->isSimulated() || !g_simulator.speedUpSimulation) && !bmData->isMergeActive(bestGranuleRange)) {
// check to avoid races where a split eval came in while merge was evaluating
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(bestGranuleRange);
bool mergeStillOk = true;
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().present()) {
TEST(true); // granule no longer merge candidate after checking metrics, because of split eval
mergeStillOk = false;
break;
}
}
} else {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No mergeable granules after checking metrics\n",
if (mergeStillOk) {
fmt::print("BM {0} maybe merge [{1} - {2}): Starting merge of [{3} - {4}) ({5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
granuleRange.end.printable(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable(),
bestGranuleIDs.size());
TEST(true); // Doing granule merge!
bmData->activeGranuleMerges.insert(bestGranuleRange, 0);
bmData->mergeCandidates.insert(bestGranuleRange, Optional<std::pair<UID, Version>>());
state std::pair<UID, Version> persistMerge = wait(persistMergeGranulesStart(
bmData, bestGranuleRange, bestGranuleIDs, bestGranuleRanges, bestGranuleStartVersions));
wait(finishMergeGranules(bmData,
persistMerge.first,
bestGranuleRange,
persistMerge.second,
bestGranuleIDs,
bestGranuleRanges,
bestGranuleStartVersions));
}
}
} else {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No mergeable granules after checking metrics\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
}
return Void();
@ -2218,15 +2228,19 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
bool clearMergeCandidate = !existingInProgress.present() ||
existingInProgress.get().second.type != BoundaryEvalType::MERGE;
// Check for split/merge race
Version inProgressMergeVersion = bmData->activeMergeVersion(rep.granuleRange);
fmt::print(
"BM {0} splt eval [{1} - {2}). existing={3}, inProgressMergeVersion={4}, blockedVersion={5}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
existingInProgress.present() ? "T" : "F",
inProgressMergeVersion,
rep.blockedVersion);
if (BM_DEBUG) {
fmt::print("BM {0} splt eval [{1} - {2}). existing={3}, inProgressMergeVersion={4}, "
"blockedVersion={5}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
existingInProgress.present() ? "T" : "F",
inProgressMergeVersion,
rep.blockedVersion);
}
// If the in progress one is a merge, and the blockedVersion < the mergeVersion, this granule
// needs to continue to flush up to the merge version. If the merge intent is still not