From 2438fee519a6ef38400a1d9a56d2520be417755a Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Wed, 5 Jan 2022 14:46:01 -0600 Subject: [PATCH] Made client range watching handle very long/large transactions --- fdbserver/BlobManager.actor.cpp | 64 ++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index d577206513..fd0e6a6d26 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -538,21 +538,30 @@ ACTOR Future writeInitialGranuleMapping(BlobManagerData* bmData, Standalon tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE); tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS); state int j = 0; - while (i + j < boundaries.size() - 1 && j < transactionChunkSize) { - // TODO REMOVE - if (BM_DEBUG) { - printf("Persisting initial mapping for [%s - %s)\n", - boundaries[i + j].printable().c_str(), - boundaries[i + j + 1].printable().c_str()); + loop { + try { + + while (i + j < boundaries.size() - 1 && j < transactionChunkSize) { + // TODO REMOVE + if (BM_DEBUG) { + printf("Persisting initial mapping for [%s - %s)\n", + boundaries[i + j].printable().c_str(), + boundaries[i + j + 1].printable().c_str()); + } + // set to empty UID - no worker assigned yet + wait(krmSetRange(tr, + blobGranuleMappingKeys.begin, + KeyRangeRef(boundaries[i + j], boundaries[i + j + 1]), + blobGranuleMappingValueFor(UID()))); + j++; + } + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + j = 0; } - // set to empty UID - no worker assigned yet - wait(krmSetRange(tr, - blobGranuleMappingKeys.begin, - KeyRangeRef(boundaries[i + j], boundaries[i + j + 1]), - blobGranuleMappingValueFor(UID()))); - j++; } - wait(tr->commit()); i += j; } return Void(); @@ -561,6 +570,7 @@ ACTOR Future writeInitialGranuleMapping(BlobManagerData* bmData, Standalon // FIXME: this does all logic in one transaction. Adding a giant range to an existing database to blobify would // require doing a ton of storage metrics calls, which we should split up across multiple transactions likely. ACTOR Future monitorClientRanges(BlobManagerData* bmData) { + state Optional lastChangeKeyValue; loop { state Reference tr = makeReference(bmData->db); @@ -572,6 +582,9 @@ ACTOR Future monitorClientRanges(BlobManagerData* bmData) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + // read change key at this point along with ranges + state Optional ckvBegin = wait(tr->get(blobRangeChangeKey)); + // TODO probably knobs here? This should always be pretty small though RangeResult results = wait(krmGetRanges( tr, blobRangeKeys.begin, KeyRange(normalKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); @@ -629,13 +642,30 @@ ACTOR Future monitorClientRanges(BlobManagerData* bmData) { ra.assign = RangeAssignmentData(); // type=normal bmData->rangesToAssign.send(ra); } + wait(bmData->rangesToAssign.onEmpty()); } - state Future watchFuture = tr->watch(blobRangeChangeKey); - wait(tr->commit()); - if (BM_DEBUG) { - printf("Blob manager done processing client ranges, awaiting update\n"); + lastChangeKeyValue = + ckvBegin; // the version of the ranges we processed is the one read alongside the ranges + + // do a new transaction, check for change in change key, watch if none + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + state Future watchFuture; + + Optional ckvEnd = wait(tr->get(blobRangeChangeKey)); + + if (ckvEnd == lastChangeKeyValue) { + watchFuture = tr->watch(blobRangeChangeKey); // watch for change in key + wait(tr->commit()); + if (BM_DEBUG) { + printf("Blob manager done processing client ranges, awaiting update\n"); + } + } else { + watchFuture = Future(Void()); // restart immediately } + wait(watchFuture); break; } catch (Error& e) {