Fixing merge convergence bugs

This commit is contained in:
Josh Slocum 2022-07-19 18:44:40 -05:00
parent bfab550435
commit 166de4b704
2 changed files with 30 additions and 9 deletions

View File

@ -170,6 +170,8 @@ bool compareFDBAndBlob(RangeResult fdb,
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) { ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
// clear key range and check whether it is merged or not, repeatedly // clear key range and check whether it is merged or not, repeatedly
state Transaction tr(cx); state Transaction tr(cx);
state int reClearCount = 1;
state int reClearInterval = 1; // do quadratic backoff on clear rate, b/c large keys can keep it not write-cold
loop { loop {
try { try {
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(range)); Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(range));
@ -177,13 +179,21 @@ ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
return Void(); return Void();
} }
TEST(true); // clearAndAwaitMerge doing clear TEST(true); // clearAndAwaitMerge doing clear
tr.clear(range); reClearCount--;
wait(tr.commit()); if (reClearCount <= 0) {
tr.clear(range);
wait(tr.commit());
fmt::print("ClearAndAwaitMerge cleared [{0} - {1}) @ {2}\n",
range.begin.printable(),
range.end.printable(),
tr.getCommittedVersion());
reClearCount = reClearInterval;
reClearInterval++;
}
wait(delay(30.0)); // sleep a bit before checking on merge again wait(delay(30.0)); // sleep a bit before checking on merge again
tr.reset(); tr.reset();
} catch (Error& e) { } catch (Error& e) {
wait(tr.onError(e)); wait(tr.onError(e));
} }
} }
} }

View File

@ -1088,11 +1088,9 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
metadata->originalEpoch, metadata->originalEpoch,
metadata->originalSeqno)); metadata->originalSeqno));
// if a new manager appears, also tell it about this granule being mergeable // if a new manager appears, also tell it about this granule being mergeable
state int64_t lastSendEpoch = bwData->currentManagerEpoch; // or if a new stream from the existing manager, it may have missed the message due to a network issue
while (lastSendEpoch == bwData->currentManagerEpoch) { wait(bwData->currentManagerStatusStream.onChange());
wait(bwData->currentManagerStatusStream.onChange()); wait(delay(0));
wait(delay(0));
}
TEST(true); // Blob worker re-sending merge candidate to new manager TEST(true); // Blob worker re-sending merge candidate to new manager
} catch (Error& e) { } catch (Error& e) {
if (e.code() == error_code_operation_cancelled) { if (e.code() == error_code_operation_cancelled) {
@ -1769,6 +1767,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
.detail("RollbackVersion", rollbackVersion); .detail("RollbackVersion", rollbackVersion);
} }
Version oldPendingSnapshot = metadata->pendingSnapshotVersion;
Version cfRollbackVersion = doGranuleRollback(metadata, Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version, deltas.version,
rollbackVersion, rollbackVersion,
@ -1776,6 +1775,18 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
rollbacksInProgress, rollbacksInProgress,
rollbacksCompleted); rollbacksCompleted);
if (oldPendingSnapshot > metadata->pendingSnapshotVersion) {
// If rollback cancelled in-flight snapshot, merge candidate checker also got
// cancelled. Restart it
TEST(true); // Restarting merge candidate checker after rolling back snapshot
checkMergeCandidate = granuleCheckMergeCandidate(
bwData,
metadata,
startState.granuleID,
inFlightFiles.empty() ? Future<Void>(Void())
: success(inFlightFiles.back().future));
}
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>(); Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>();
if (!readOldChangeFeed && cfRollbackVersion < startState.changeFeedStartVersion) { if (!readOldChangeFeed && cfRollbackVersion < startState.changeFeedStartVersion) {