Add a new blob restore state to fix a race after data copy (#10854)
parent ae11557fc2 · commit 4d2a7d507d

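In brief, the diff below inserts a new COPIED_DATA phase between COPYING_DATA and APPLYING_MLOGS, and the blob migrator only advances to APPLYING_MLOGS once no data movement is still pending, so a relocation started during the copy can no longer read from blob while mutation logs are being applied. The following is an illustrative sketch of that ordering only (plain standalone C++, not FDB flow code; Phase and hasPendingDataMovement are stand-in names):

// Illustrative sketch (plain C++, not FDB flow code): the phase ordering this
// commit establishes. The migrator parks in COPIED_DATA until every data
// movement started during the copy has drained, and only then starts applying
// mutation logs.
#include <cstdio>

enum class Phase { CopyingData, CopiedData, ApplyingMlogs, Done };

// Stand-in for the real checkPendingDataMovements(): true while any shard
// still has a non-empty destination team.
bool hasPendingDataMovement() {
	return false; // placeholder for the sketch
}

Phase advance(Phase p) {
	switch (p) {
	case Phase::CopyingData:
		return Phase::CopiedData; // copy finished; in-flight movements may still read from blob
	case Phase::CopiedData:
		// Do not start applying mutation logs until pending movements drain.
		return hasPendingDataMovement() ? Phase::CopiedData : Phase::ApplyingMlogs;
	case Phase::ApplyingMlogs:
		return Phase::Done;
	default:
		return p;
	}
}

int main() {
	Phase p = Phase::CopyingData;
	while (p != Phase::Done) {
		p = advance(p);
	}
	std::puts("restore finished");
	return 0;
}
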
@@ -1199,6 +1199,9 @@ void printStatus(StatusObjectReader statusObj,
 			}
 		}
 		break;
+	case BlobRestorePhase::COPIED_DATA:
+		statusStr = fmt::format("Copied successfully at {}", tsShortStr);
+		break;
 	case BlobRestorePhase::APPLYING_MLOGS:
 		statusStr = fmt::format("Applying mutation logs. Started at {}", tsShortStr);
 		break;

@@ -1200,7 +1200,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( BLOB_RESTORE_MANIFEST_RETENTION_MAX, 10 );
 	init( BLOB_RESTORE_MLOGS_RETENTION_SECS, isSimulated ? 180 : 3600 * 24 * 14 );
 	init( BLOB_RESTORE_LOAD_KEY_VERSION_MAP_STEP_SIZE, 10000 );
-	init( BLOB_RESTORE_SKIP_EMPTY_RANGES, false );
+	init( BLOB_RESTORE_SKIP_EMPTY_RANGES, false ); if ( randomize && BUGGIFY ) BLOB_RESTORE_SKIP_EMPTY_RANGES = true;

 	init( BLOB_GRANULES_FLUSH_BATCH_SIZE, isSimulated ? 2 : 64 );

@@ -46,10 +46,11 @@ enum BlobRestorePhase {
 	LOADING_MANIFEST = 3,
 	LOADED_MANIFEST = 4,
 	COPYING_DATA = 5,
-	APPLYING_MLOGS = 6,
-	DONE = 7,
-	ERROR = 8,
-	MAX = 9
+	COPIED_DATA = 6,
+	APPLYING_MLOGS = 7,
+	DONE = 8,
+	ERROR = 9,
+	MAX = 10
 };

 struct BlobGranuleRestoreConfig : public KeyBackedClass {

@@ -96,7 +96,7 @@ private:
 			continue;
 		}

-		if (phase > BlobRestorePhase::COPYING_DATA) {
+		if (phase > BlobRestorePhase::COPIED_DATA) {
 			CODE_PROBE(true, "Restart blob migrator after data copy");
 			TraceEvent("BlobMigratorAlreadyCopied", self->interf_.id()).detail("Phase", phase);
 			return Void();

@@ -143,6 +143,9 @@ private:
 				TraceEvent("ReplacedStorageInterfaceError", self->interf_.id()).error(e);
 				throw e;
 			}
+		} else if (phase == BlobRestorePhase::COPIED_DATA) {
+			CODE_PROBE(true, "Restart blob migrator after data copy");
+			self->addActor(logProgress(self));
 		}
 		return Void();
 	}

@@ -213,11 +216,13 @@ private:
 		state Reference<BlobRestoreController> controller = makeReference<BlobRestoreController>(self->db_, normalKeys);
 		loop {
 			BlobRestorePhase phase = wait(BlobRestoreController::currentPhase(controller));
-			if (phase > COPYING_DATA) {
+			if (phase > COPIED_DATA) {
 				return Void();
 			}
 			bool done = wait(checkCopyProgress(self));
 			if (done) {
+				wait(BlobRestoreController::setPhase(controller, COPIED_DATA, self->interf_.id()));
+				wait(waitForPendingDataMovements(self));
 				wait(BlobRestoreController::setPhase(controller, APPLYING_MLOGS, self->interf_.id()));
 				TraceEvent("BlobMigratorCopied", self->interf_.id()).log();
 				return Void();

@@ -226,6 +231,58 @@ private:
 		}
 	}

+	// Wait until all pending data movements are done. Data movements started earlier may still potentially
+	// read data from blob, which may cause a race with applying mutation logs.
+	ACTOR static Future<Void> waitForPendingDataMovements(Reference<BlobMigrator> self) {
+		loop {
+			bool pending = wait(checkPendingDataMovements(self));
+			TraceEvent("BlobMigratorCheckPendingMovement", self->interf_.id()).detail("Pending", pending);
+			if (!pending)
+				return Void();
+			wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
+		}
+	}
+
+	// Check if there is any pending data movement
+	ACTOR static Future<bool> checkPendingDataMovements(Reference<BlobMigrator> self) {
+		state Reference<BlobRestoreController> controller = makeReference<BlobRestoreController>(self->db_, normalKeys);
+		state Transaction tr(self->db_);
+		state Key begin = normalKeys.begin;
+
+		loop {
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+			try {
+				state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
+				state std::vector<UID> src;
+				state std::vector<UID> dest;
+				state UID srcId;
+				state UID destId;
+				ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
+				while (begin < normalKeys.end) {
+					state RangeResult keyServers = wait(krmGetRanges(&tr,
+					                                                 keyServersPrefix,
+					                                                 KeyRangeRef(begin, normalKeys.end),
+					                                                 SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
+					                                                 SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
+					state int i = 0;
+					for (; i < keyServers.size() - 1; ++i) {
+						state KeyValueRef it = keyServers[i];
+						decodeKeyServersValue(UIDtoTagMap, it.value, src, dest, srcId, destId);
+						if (!dest.empty()) {
+							return true;
+						}
+					}
+					begin = keyServers.back().key;
+				}
+				return false;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+	}
+
 	// Check key ranges that are copied. Return true if all ranges are done
 	ACTOR static Future<bool> checkCopyProgress(Reference<BlobMigrator> self) {
 		state Reference<BlobRestoreController> controller = makeReference<BlobRestoreController>(self->db_, normalKeys);

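As a design note, the new check treats a shard as still moving when its keyServers entry carries a non-empty destination team, and the migrator polls that condition every BLOB_MIGRATOR_CHECK_INTERVAL until it clears. Below is a simplified, self-contained model of that predicate (hypothetical ShardAssignment type, not the real keyServers decoding):

// Simplified model of the pending-movement check above (assumptions, not FDB
// APIs): a shard lists the servers that currently own it (src) and, while a
// relocation is in flight, the servers it is being moved to (dest). Any
// non-empty dest means a data movement is still pending.
#include <vector>

struct ShardAssignment {
	std::vector<int> src;  // servers currently serving the shard
	std::vector<int> dest; // non-empty only while the shard is being relocated
};

bool hasPendingDataMovement(const std::vector<ShardAssignment>& shards) {
	for (const auto& s : shards) {
		if (!s.dest.empty()) {
			return true; // at least one relocation has not drained yet
		}
	}
	return false;
}
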
@@ -7458,8 +7458,13 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
 			    .detail("Chunk", chunks[i].keyRange)
 			    .detail("Version", chunks[i].includedVersion);
 			RangeResult rows;
+			if (i == chunks.size() - 1) {
+				rows.more = false;
+			} else {
+				rows.more = true;
+				rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
+			}
 			results.send(rows);
-			rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
 			continue;
 		}
 		try {

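The storage-server change above makes only the last chunk of a blob read terminate the stream; every earlier chunk now carries a continuation marker and a readThrough key set before results.send rather than after it. A minimal sketch of that continuation contract (hypothetical Chunk and Result types, not FDB's RangeResult):

// Minimal sketch of the chunked-streaming contract above (hypothetical types,
// not FDB code): non-final chunks carry more == true plus a readThrough key so
// the consumer knows where the next chunk resumes; the final chunk carries
// more == false.
#include <algorithm>
#include <string>
#include <vector>

struct Chunk {
	std::string begin, end; // key range this chunk covers
};

struct Result {
	bool more = false;
	std::string readThrough; // resume key, meaningful only when more is true
};

std::vector<Result> streamChunks(const std::vector<Chunk>& chunks, const std::string& rangeEnd) {
	std::vector<Result> out;
	for (size_t i = 0; i < chunks.size(); ++i) {
		Result r;
		if (i + 1 == chunks.size()) {
			r.more = false; // last chunk: nothing left to read
		} else {
			r.more = true;
			r.readThrough = std::min(chunks[i].end, rangeEnd); // clamp to the requested range
		}
		out.push_back(r); // populated before it is handed to the consumer
	}
	return out;
}
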
@@ -292,6 +292,7 @@ struct BlobRestoreWorkload : TestWorkload {
 				for (auto& r : rows) {
 					data.push_back_deep(data.arena(), r);
 				}
+				fmt::print("Read trunk {} size {}\n", chunks[i].keyRange.toString(), rows.size());
 			}
 			break;
 		} catch (Error& e) {

@@ -314,7 +315,14 @@ struct BlobRestoreWorkload : TestWorkload {
 					break;
 				}
 			}
+			while (i < src.size()) {
+				fmt::print("  src {} = {}\n", src[i].key.printable(), src[i].value.printable());
+				i++;
+			}
+			while (i < dest.size()) {
+				fmt::print("  dest {} = {}\n", dest[i].key.printable(), dest[i].value.printable());
+				i++;
+			}
 			TraceEvent(SevError, "TestFailure")
 			    .detail("Reason", "Size Mismatch")
 			    .detail("Src", dest.size())