rollback fixes

This commit is contained in:
Josh Slocum 2021-09-25 09:19:00 -05:00
parent fa1fe5f08b
commit 3b23d6aba5
1 changed file with 58 additions and 40 deletions

View File

@ -972,14 +972,19 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
if (metadata->currentDeltas[mIdx].version <= rollbackVersion) {
break;
}
for (auto& m : metadata->currentDeltas[mIdx].mutations) {
metadata->bufferedDeltaBytes -= m.totalSize();
}
mIdx--;
}
mIdx++;
if (BW_DEBUG) {
printf("[%s - %s) rollback discarding only %d in-memory mutations\n",
printf("[%s - %s) rollback discarding %d in-memory mutations, %d mutations and %lld bytes left\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltas.size() - mIdx);
metadata->currentDeltas.size() - mIdx,
mIdx,
metadata->bufferedDeltaBytes);
}
metadata->currentDeltas.resize(metadata->deltaArena, mIdx);
@ -1029,7 +1034,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
state std::deque<std::pair<Version, Version>> rollbacksInProgress;
state std::deque<std::pair<Version, Version>> rollbacksCompleted;
state Optional<std::pair<Version, Version>> currentRollback;
state bool snapshotEligible; // just wrote a delta file or just took granule over from another worker
state bool justDidRollback = false;
@ -1201,17 +1205,22 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
if (metadata->bufferedDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
deltas.version > metadata->bufferedDeltaVersion.get()) {
if (BW_DEBUG) {
printf("Granule [%s - %s) flushing delta file after %d bytes @ %lld%s\n",
printf("Granule [%s - %s) flushing delta file after %d bytes @ %lld %lld%s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->bufferedDeltaBytes,
metadata->bufferedDeltaVersion.get(),
deltas.version,
oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
}
TraceEvent("BlobGranuleDeltaFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", metadata->bufferedDeltaVersion.get());
// sanity check for version order
ASSERT(metadata->bufferedDeltaVersion.get() >= metadata->currentDeltas.back().version);
ASSERT(metadata->pendingDeltaVersion < metadata->currentDeltas.front().version);
// launch pipelined, but wait for previous operation to complete before persisting to FDB
Future<BlobFileIndex> previousDeltaFileFuture;
if (inFlightBlobSnapshot.isValid() && inFlightDeltaFiles.empty()) {
@ -1236,6 +1245,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
oldChangeFeedDataComplete.reset();
// add new pending delta file
if (metadata->pendingDeltaVersion >= metadata->bufferedDeltaVersion.get()) {
printf("%lld >= %lld\n", metadata->pendingDeltaVersion, metadata->bufferedDeltaVersion.get());
}
ASSERT(metadata->pendingDeltaVersion < metadata->bufferedDeltaVersion.get());
metadata->pendingDeltaVersion = metadata->bufferedDeltaVersion.get();
metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes;
@ -1392,45 +1404,50 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->currentDeltas.back().version <= rollbackVersion)) {
if (BW_DEBUG) {
printf("BW could skip rollback completely\n");
printf("BW skipping rollback %lld -> %lld completely\n",
deltas.version,
rollbackVersion);
}
}
if (BW_DEBUG) {
printf("BW [%s - %s) ROLLBACK @ %lld -> %lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
}
Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version,
rollbackVersion,
inFlightDeltaFiles,
rollbacksInProgress,
rollbacksCompleted);
// reset change feeds
if (readOldChangeFeed) {
oldChangeFeedStream = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
oldChangeFeedFuture = bwData->db->getChangeFeedStream(
oldChangeFeedStream,
oldCFKey.get(),
cfRollbackVersion + 1,
MAX_VERSION,
changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/);
} else {
changeFeedStream = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
changeFeedFuture = bwData->db->getChangeFeedStream(
changeFeedStream, cfKey, cfRollbackVersion + 1, MAX_VERSION, metadata->keyRange);
if (BW_DEBUG) {
printf("BW [%s - %s) ROLLBACK @ %lld -> %lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
}
Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version,
rollbackVersion,
inFlightDeltaFiles,
rollbacksInProgress,
rollbacksCompleted);
// reset change feeds to cfRollbackVersion
if (readOldChangeFeed) {
oldChangeFeedStream =
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
oldChangeFeedFuture = bwData->db->getChangeFeedStream(
oldChangeFeedStream,
oldCFKey.get(),
cfRollbackVersion + 1,
MAX_VERSION,
changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/);
} else {
changeFeedStream = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream,
cfKey,
cfRollbackVersion + 1,
MAX_VERSION,
metadata->keyRange);
}
justDidRollback = true;
break;
}
justDidRollback = true;
currentRollback.reset();
break;
}
} else if (!rollbacksInProgress.empty() && rollbacksInProgress.front().first < deltas.version &&
rollbacksInProgress.front().second > deltas.version) {
@ -1474,6 +1491,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->bufferedDeltaVersion.get());
}
}
justDidRollback = false;
}
} catch (Error& e) {