removed incorrect assert for now

This commit is contained in:
Josh Slocum 2021-12-09 17:06:20 -06:00
parent 8dc5f79dc7
commit c95c93b527
1 changed files with 106 additions and 103 deletions

View File

@ -1391,123 +1391,126 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// same version
ASSERT(deltas.version > metadata->bufferedDeltaVersion);
ASSERT(deltas.version > lastDeltaVersion);
ASSERT(!deltas.mutations.empty());
if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
// Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
// For correctness right now, there can be no waits and yields either in rollback handling
// or in handleBlobGranuleFileRequest once waitForVersion has succeeded, otherwise this will
// race and clobber results
Version rollbackVersion;
BinaryReader br(deltas.mutations[0].param2, Unversioned());
br >> rollbackVersion;
// FIXME: this assert isn't true - why
// ASSERT(!deltas.mutations.empty());
if (!deltas.mutations.empty()) {
if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
// Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
// For correctness right now, there can be no waits and yields either in rollback handling
// or in handleBlobGranuleFileRequest once waitForVersion has succeeded, otherwise this will
// race and clobber results
Version rollbackVersion;
BinaryReader br(deltas.mutations[0].param2, Unversioned());
br >> rollbackVersion;
ASSERT(rollbackVersion >= metadata->durableDeltaVersion.get());
ASSERT(rollbackVersion >= committedVersion.get());
if (!rollbacksInProgress.empty()) {
ASSERT(rollbacksInProgress.front().first == rollbackVersion);
ASSERT(rollbacksInProgress.front().second == deltas.version);
if (BW_DEBUG) {
fmt::print("Passed rollback {0} -> {1}\n", deltas.version, rollbackVersion);
}
rollbacksCompleted.push_back(rollbacksInProgress.front());
rollbacksInProgress.pop_front();
} else {
// FIXME: add counter for granule rollbacks and rollbacks skipped?
// explicitly check last delta in currentDeltas because lastVersion and
// bufferedDeltaVersion include empties
if (metadata->pendingDeltaVersion <= rollbackVersion &&
(metadata->currentDeltas.empty() ||
metadata->currentDeltas.back().version <= rollbackVersion)) {
ASSERT(rollbackVersion >= metadata->durableDeltaVersion.get());
ASSERT(rollbackVersion >= committedVersion.get());
if (!rollbacksInProgress.empty()) {
ASSERT(rollbacksInProgress.front().first == rollbackVersion);
ASSERT(rollbacksInProgress.front().second == deltas.version);
if (BW_DEBUG) {
fmt::print("BW skipping rollback {0} -> {1} completely\n",
deltas.version,
rollbackVersion);
fmt::print("Passed rollback {0} -> {1}\n", deltas.version, rollbackVersion);
}
rollbacksCompleted.push_back(rollbacksInProgress.front());
rollbacksInProgress.pop_front();
} else {
if (BW_DEBUG) {
fmt::print("BW [{0} - {1}) ROLLBACK @ {2} -> {3}\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
}
Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version,
rollbackVersion,
inFlightFiles,
rollbacksInProgress,
rollbacksCompleted);
// Reset change feeds to cfRollbackVersion
metadata->activeCFData.set(newChangeFeedData(cfRollbackVersion));
if (readOldChangeFeed) {
// It shouldn't be possible to roll back across the parent/child feed boundary,
// because the transaction creating the child change feed had to commit before
// we got here.
ASSERT(cfRollbackVersion < startState.changeFeedStartVersion);
oldChangeFeedFuture =
bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
oldCFKey.get(),
cfRollbackVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange);
// FIXME: add counter for granule rollbacks and rollbacks skipped?
// explicitly check last delta in currentDeltas because lastVersion and
// bufferedDeltaVersion include empties
if (metadata->pendingDeltaVersion <= rollbackVersion &&
(metadata->currentDeltas.empty() ||
metadata->currentDeltas.back().version <= rollbackVersion)) {
if (BW_DEBUG) {
fmt::print("BW skipping rollback {0} -> {1} completely\n",
deltas.version,
rollbackVersion);
}
} else {
ASSERT(cfRollbackVersion > startState.changeFeedStartVersion);
if (BW_DEBUG) {
fmt::print("BW [{0} - {1}) ROLLBACK @ {2} -> {3}\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
}
changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
cfKey,
cfRollbackVersion + 1,
MAX_VERSION,
metadata->keyRange);
Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version,
rollbackVersion,
inFlightFiles,
rollbacksInProgress,
rollbacksCompleted);
// Reset change feeds to cfRollbackVersion
metadata->activeCFData.set(newChangeFeedData(cfRollbackVersion));
if (readOldChangeFeed) {
// It shouldn't be possible to roll back across the parent/child feed boundary,
// because the transaction creating the child change feed had to commit before
// we got here.
ASSERT(cfRollbackVersion < startState.changeFeedStartVersion);
oldChangeFeedFuture =
bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
oldCFKey.get(),
cfRollbackVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange);
} else {
ASSERT(cfRollbackVersion > startState.changeFeedStartVersion);
changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
cfKey,
cfRollbackVersion + 1,
MAX_VERSION,
metadata->keyRange);
}
justDidRollback = true;
break;
}
justDidRollback = true;
break;
}
}
} else if (!rollbacksInProgress.empty() && rollbacksInProgress.front().first < deltas.version &&
rollbacksInProgress.front().second > deltas.version) {
if (BW_DEBUG) {
fmt::print("Skipping mutations @ {} b/c prior rollback\n", deltas.version);
}
} else {
for (auto& delta : deltas.mutations) {
metadata->bufferedDeltaBytes += delta.totalSize();
bwData->stats.changeFeedInputBytes += delta.totalSize();
bwData->stats.mutationBytesBuffered += delta.totalSize();
} else if (!rollbacksInProgress.empty() && rollbacksInProgress.front().first < deltas.version &&
rollbacksInProgress.front().second > deltas.version) {
if (BW_DEBUG) {
fmt::print("Skipping mutations @ {} b/c prior rollback\n", deltas.version);
}
} else {
for (auto& delta : deltas.mutations) {
metadata->bufferedDeltaBytes += delta.totalSize();
bwData->stats.changeFeedInputBytes += delta.totalSize();
bwData->stats.mutationBytesBuffered += delta.totalSize();
DEBUG_MUTATION("BlobWorkerBuffer", deltas.version, delta, bwData->id)
.detail("Granule", metadata->keyRange)
.detail("ChangeFeedID", readOldChangeFeed ? oldCFKey.get() : cfKey)
.detail("OldChangeFeed", readOldChangeFeed ? "T" : "F");
}
if (DEBUG_BW_VERSION(deltas.version)) {
fmt::print("BW {0}: ({1}), KCV={2}\n",
deltas.version,
deltas.mutations.size(),
deltas.knownCommittedVersion);
}
metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas);
DEBUG_MUTATION("BlobWorkerBuffer", deltas.version, delta, bwData->id)
.detail("Granule", metadata->keyRange)
.detail("ChangeFeedID", readOldChangeFeed ? oldCFKey.get() : cfKey)
.detail("OldChangeFeed", readOldChangeFeed ? "T" : "F");
}
if (DEBUG_BW_VERSION(deltas.version)) {
fmt::print("BW {0}: ({1}), KCV={2}\n",
deltas.version,
deltas.mutations.size(),
deltas.knownCommittedVersion);
}
metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas);
processedAnyMutations = true;
ASSERT(deltas.version != invalidVersion);
ASSERT(deltas.version > lastDeltaVersion);
lastDeltaVersion = deltas.version;
processedAnyMutations = true;
ASSERT(deltas.version != invalidVersion);
ASSERT(deltas.version > lastDeltaVersion);
lastDeltaVersion = deltas.version;
Version nextKnownNoRollbacksPast = std::min(deltas.version, deltas.knownCommittedVersion);
ASSERT(nextKnownNoRollbacksPast >= knownNoRollbacksPast ||
nextKnownNoRollbacksPast == invalidVersion);
Version nextKnownNoRollbacksPast = std::min(deltas.version, deltas.knownCommittedVersion);
ASSERT(nextKnownNoRollbacksPast >= knownNoRollbacksPast ||
nextKnownNoRollbacksPast == invalidVersion);
if (nextKnownNoRollbacksPast != invalidVersion) {
knownNoRollbacksPast = nextKnownNoRollbacksPast;
if (nextKnownNoRollbacksPast != invalidVersion) {
knownNoRollbacksPast = nextKnownNoRollbacksPast;
}
}
}
if (justDidRollback) {