Made client range watching handle very long/large transactions
This commit is contained in:
parent
0f66cca8e0
commit
2438fee519
|
@ -538,6 +538,9 @@ ACTOR Future<Void> writeInitialGranuleMapping(BlobManagerData* bmData, Standalon
|
|||
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
|
||||
state int j = 0;
|
||||
loop {
|
||||
try {
|
||||
|
||||
while (i + j < boundaries.size() - 1 && j < transactionChunkSize) {
|
||||
// TODO REMOVE
|
||||
if (BM_DEBUG) {
|
||||
|
@ -553,6 +556,12 @@ ACTOR Future<Void> writeInitialGranuleMapping(BlobManagerData* bmData, Standalon
|
|||
j++;
|
||||
}
|
||||
wait(tr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
j = 0;
|
||||
}
|
||||
}
|
||||
i += j;
|
||||
}
|
||||
return Void();
|
||||
|
@ -561,6 +570,7 @@ ACTOR Future<Void> writeInitialGranuleMapping(BlobManagerData* bmData, Standalon
|
|||
// FIXME: this does all logic in one transaction. Adding a giant range to an existing database to blobify would
|
||||
// require doing a ton of storage metrics calls, which we should split up across multiple transactions likely.
|
||||
ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
|
||||
state Optional<Value> lastChangeKeyValue;
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
|
||||
|
@ -572,6 +582,9 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
// read change key at this point along with ranges
|
||||
state Optional<Value> ckvBegin = wait(tr->get(blobRangeChangeKey));
|
||||
|
||||
// TODO probably knobs here? This should always be pretty small though
|
||||
RangeResult results = wait(krmGetRanges(
|
||||
tr, blobRangeKeys.begin, KeyRange(normalKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
|
@ -629,13 +642,30 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
|
|||
ra.assign = RangeAssignmentData(); // type=normal
|
||||
bmData->rangesToAssign.send(ra);
|
||||
}
|
||||
wait(bmData->rangesToAssign.onEmpty());
|
||||
}
|
||||
|
||||
state Future<Void> watchFuture = tr->watch(blobRangeChangeKey);
|
||||
lastChangeKeyValue =
|
||||
ckvBegin; // the version of the ranges we processed is the one read alongside the ranges
|
||||
|
||||
// do a new transaction, check for change in change key, watch if none
|
||||
tr->reset();
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
state Future<Void> watchFuture;
|
||||
|
||||
Optional<Value> ckvEnd = wait(tr->get(blobRangeChangeKey));
|
||||
|
||||
if (ckvEnd == lastChangeKeyValue) {
|
||||
watchFuture = tr->watch(blobRangeChangeKey); // watch for change in key
|
||||
wait(tr->commit());
|
||||
if (BM_DEBUG) {
|
||||
printf("Blob manager done processing client ranges, awaiting update\n");
|
||||
}
|
||||
} else {
|
||||
watchFuture = Future<Void>(Void()); // restart immediately
|
||||
}
|
||||
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
|
Loading…
Reference in New Issue