Fix blob restore stuck issue (#10574)
parent cf3b5d8c57 · commit 66a7acd960
@@ -705,8 +705,8 @@ public:
                 TraceEvent("BlobRestoreMissingData").detail("KeyRange", granuleRange.toString());
             } else {
                 TraceEvent("BlobManifestError").error(e).detail("KeyRange", granuleRange.toString());
-                throw;
             }
+            throw;
         }
     }
     return results;
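Note on the hunk above: previously only the BlobManifestError branch rethrew, so a BlobRestoreMissingData error was logged and then swallowed. Hoisting throw; out of the else makes both branches rethrow, which appears to be one of the paths that left the restore stuck.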
@@ -965,7 +965,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
         }
         // if we got transaction_too_old naturally, have lower threshold for re-evaluating (2xlimit)
         if (initialSnapshot && snapshot.size() > 1 && e.code() == error_code_transaction_too_old &&
-            (injectTooBig || bytesRead >= 2 * SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES)) {
+            (injectTooBig || bytesRead >= 2 * SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) && snapshot.size() > 3) {
             // idle this actor, while we tell the manager this is too big and to re-evaluate granules and revoke us
             if (BW_DEBUG) {
                 fmt::print("Granule [{0} - {1}) re-evaluating snapshot after {2} bytes ({3} limit) {4}\n",
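For reference, a standalone C++ model of the post-change condition; kSnapshotTargetBytes and shouldReevaluate are invented stand-ins for the knob and the inline check, not FDB API:

// A standalone model of the new re-evaluation predicate (names hypothetical).
#include <cstdint>
#include <cstdio>

// Stand-in for SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES (value made up).
constexpr int64_t kSnapshotTargetBytes = 20'000'000;

// Mirrors the post-change condition: besides being 2x over the byte target,
// the snapshot must now also hold more than 3 rows before the worker asks the
// manager to re-evaluate the granule and revoke it.
bool shouldReevaluate(bool initialSnapshot, size_t snapshotRows, bool tooOld,
                      bool injectTooBig, int64_t bytesRead) {
    return initialSnapshot && snapshotRows > 1 && tooOld &&
           (injectTooBig || bytesRead >= 2 * kSnapshotTargetBytes) &&
           snapshotRows > 3;
}

int main() {
    // A 2-row snapshot no longer triggers re-evaluation...
    std::printf("%d\n", shouldReevaluate(true, 2, true, true, 0)); // 0
    // ...but a larger over-limit snapshot still does.
    std::printf("%d\n", shouldReevaluate(true, 8, true, true, 0)); // 1
}

The extra snapshot.size() > 3 guard plausibly keeps a near-empty granule, such as one freshly created during restore, from bouncing through re-evaluation repeatedly.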
@@ -2231,7 +2231,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
         fmt::print("  blobFilesToSnapshot={}\n", startState.blobFilesToSnapshot.size());
     }
 
-    state Version startVersion;
+    state Version startVersion = invalidVersion;
     state BlobFileIndex newSnapshotFile;
 
     // if this is a reassign, calculate how close to a snapshot the previous owner was
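The explicit = invalidVersion works together with the next hunk: once the initial snapshot dump can be skipped in restore mode, startVersion would otherwise be read uninitialized when it is assigned to initialSnapshotVersion and pendingSnapshotVersion.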
@@ -2277,16 +2277,20 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
 
             metadata->durableSnapshotVersion.set(minDurableSnapshotV);
         } else {
-            ASSERT(startState.previousDurableVersion == invalidVersion);
-            BlobFileIndex fromFDB = wait(
-                dumpInitialSnapshotFromFDB(bwData, bstore, metadata, startState.granuleID, cfKey, &inFlightPops));
-            newSnapshotFile = fromFDB;
-            ASSERT(startState.changeFeedStartVersion <= fromFDB.version);
-            startVersion = newSnapshotFile.version;
-            metadata->files.snapshotFiles.push_back(newSnapshotFile);
-            metadata->durableSnapshotVersion.set(startVersion);
+            // In restore mode, we couldn't dump snapshot because storage server doesn't have
+            // data yet
+            if (!bwData->isFullRestoreMode) {
+                ASSERT(startState.previousDurableVersion == invalidVersion);
+                BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(
+                    bwData, bstore, metadata, startState.granuleID, cfKey, &inFlightPops));
+                newSnapshotFile = fromFDB;
+                ASSERT(startState.changeFeedStartVersion <= fromFDB.version);
+                startVersion = newSnapshotFile.version;
+                metadata->files.snapshotFiles.push_back(newSnapshotFile);
+                metadata->durableSnapshotVersion.set(startVersion);
 
-            wait(yield(TaskPriority::BlobWorkerUpdateStorage));
+                wait(yield(TaskPriority::BlobWorkerUpdateStorage));
+            }
         }
         metadata->initialSnapshotVersion = startVersion;
         metadata->pendingSnapshotVersion = startVersion;
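A minimal sketch of the initialization flow the last two hunks combine to produce; Worker and dumpSnapshot are hypothetical simplifications of BlobWorkerData and dumpInitialSnapshotFromFDB:

// Sketch only: plain stand-ins for the FDB types, not the real actor code.
#include <cstdint>
#include <cstdio>

using Version = int64_t;
constexpr Version invalidVersion = -1;

struct Worker {
    bool isFullRestoreMode = false;
};

// Placeholder for dumpInitialSnapshotFromFDB: reads the granule from the
// storage servers and returns the resulting snapshot version.
Version dumpSnapshot() {
    return 100;
}

Version initialStartVersion(const Worker& w) {
    Version startVersion = invalidVersion; // explicit init from the earlier hunk
    if (!w.isFullRestoreMode) {
        startVersion = dumpSnapshot(); // normal path: storage servers have data
    }
    // Restore path: storage servers are still empty, so the dump is skipped
    // and startVersion deliberately stays invalidVersion.
    return startVersion;
}

int main() {
    Worker normal;
    Worker restore;
    restore.isFullRestoreMode = true;
    std::printf("%lld %lld\n",
                (long long)initialStartVersion(normal),   // 100
                (long long)initialStartVersion(restore)); // -1
}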
@@ -3984,6 +3988,17 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
                 chunk.tenantPrefix = Optional<StringRef>(tenantPrefix.get());
             }
 
+            if (bwData->isFullRestoreMode && item.second.snapshotFiles.empty()) {
+                CODE_PROBE(true, "empty snapshot file during restore");
+                chunk.includedVersion = req.readVersion;
+                rep.chunks.push_back(rep.arena, chunk);
+                readThrough = chunk.keyRange.end;
+                TraceEvent(SevDebug, "EmptyGranuleSnapshotFile", bwData->id)
+                    .detail("Chunk", chunk.keyRange)
+                    .detail("Version", req.readVersion);
+                continue;
+            }
+
             int64_t deltaBytes = 0;
             item.second.getFiles(granuleBeginVersion,
                                  req.readVersion,
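With this hunk, a granule that has no snapshot files yet (possible only during a full restore) is answered with an empty chunk stamped includedVersion = req.readVersion instead of falling through to getFiles, and readThrough is advanced past it. The storage-server hunk below is the matching consumer of these empty chunks.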
@@ -7786,6 +7786,16 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
     state int i;
     for (i = 0; i < chunks.size(); ++i) {
         state KeyRangeRef chunkRange = chunks[i].keyRange;
+        // Chunk is empty if no snapshot file. Skip it
+        if (!chunks[i].snapshotFile.present()) {
+            TraceEvent("SkipBlobChunk")
+                .detail("Chunk", chunks[i].keyRange)
+                .detail("Version", chunks[i].includedVersion);
+            RangeResult rows;
+            rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
+            results.send(rows);
+            continue;
+        }
         state Reference<BlobConnectionProvider> blobConn = wait(loadBStoreForTenant(tenantData, chunkRange));
         state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
 
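A self-contained model of the skip logic above, with std::string keys and a plain vector standing in for KeyRef and PromiseStream<RangeResult>; every name in it is illustrative rather than FDB API:

// Sketch only: simplified stand-ins for the FDB types in the hunk above.
#include <algorithm>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

struct Chunk {
    std::string begin, end;
    bool hasSnapshotFile;
};

struct RangeResult {
    std::vector<std::pair<std::string, std::string>> rows;
    std::string readThrough; // how far the range is considered covered
};

// Models the loop body: an empty chunk still sends a result whose readThrough
// is advanced to min(chunk end, requested end), so the consumer's cursor moves
// past the chunk instead of waiting for data that will never arrive.
void sendChunks(const std::vector<Chunk>& chunks,
                const std::string& keysEnd,
                std::vector<RangeResult>& results) {
    for (const Chunk& c : chunks) {
        if (!c.hasSnapshotFile) {
            RangeResult rows;
            rows.readThrough = std::min(c.end, keysEnd); // set BEFORE sending
            results.push_back(rows);
            continue;
        }
        // Real path would load the blob store and read the granule here.
    }
}

int main() {
    std::vector<RangeResult> out;
    sendChunks({ { "a", "m", false }, { "m", "z", true } }, "z", out);
    std::printf("readThrough=%s\n", out[0].readThrough.c_str()); // readThrough=m
}

Setting readThrough before the send matters because the stream takes the result by value; anything assigned afterwards never reaches the consumer.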
@@ -200,7 +200,13 @@ struct BlobRestoreWorkload : TestWorkload {
         wait(verify(cx, self));
 
+        // Check if we can flush ranges after restore
+        wait(killBlobWorkers(self->extraDb_));
+        state ISimulator::KillType kt = ISimulator::KillType::RebootProcessAndSwitch;
+        g_simulator->killAll(kt, true);
+        g_simulator->toggleGlobalSwitchCluster();
+        wait(delay(2));
 
         wait(flushBlobRanges(self->extraDb_, self, {}));
         return Void();
     }
 
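The workload addition exercises the fix end to end: it kills the blob workers, reboots every simulated process into the other cluster via RebootProcessAndSwitch plus toggleGlobalSwitchCluster(), waits briefly, and then checks that flushBlobRanges against the restored cluster completes instead of hanging.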