Fixed a couple rollback issues and endianness of versionstamp version
This commit is contained in:
parent
0d1d1d7f9e
commit
3b711af061
|
@ -1249,7 +1249,8 @@ std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(const Valu
|
|||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> st;
|
||||
reader >> v;
|
||||
return std::pair(st, v);
|
||||
|
||||
return std::pair(st, bigEndian64(v));
|
||||
}
|
||||
|
||||
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version) {
|
||||
|
|
|
@ -964,7 +964,7 @@ ACTOR Future<Void> handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
BlobFileIndex completedDeltaFile,
|
||||
Key cfKey,
|
||||
Version cfStartVersion,
|
||||
std::deque<std::pair<Version, Version>> rollbacksInProgress) {
|
||||
std::deque<std::pair<Version, Version>> rollbacksCompleted) {
|
||||
metadata->files.deltaFiles.push_back(completedDeltaFile);
|
||||
ASSERT(metadata->durableDeltaVersion.get() < completedDeltaFile.version);
|
||||
metadata->durableDeltaVersion.set(completedDeltaFile.version);
|
||||
|
@ -980,8 +980,8 @@ ACTOR Future<Void> handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
Future<Void> popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version);
|
||||
wait(popFuture);
|
||||
}
|
||||
while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) {
|
||||
rollbacksInProgress.pop_front();
|
||||
while (!rollbacksCompleted.empty() && completedDeltaFile.version >= rollbacksCompleted.front().first) {
|
||||
rollbacksCompleted.pop_front();
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -1077,6 +1077,14 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
|
|||
// the callers pick this new one up for the next waitForVersion
|
||||
metadata->bufferedDeltaVersion = NotifiedVersion(cfRollbackVersion);
|
||||
|
||||
// Track that this rollback happened, since we have to re-read mutations up to the rollback
|
||||
// Add this rollback to in progress, and put all completed ones back in progress
|
||||
rollbacksInProgress.push_back(std::pair(rollbackVersion, mutationVersion));
|
||||
for (int i = rollbacksCompleted.size() - 1; i >= 0; i--) {
|
||||
rollbacksInProgress.push_front(rollbacksCompleted[i]);
|
||||
}
|
||||
rollbacksCompleted.clear();
|
||||
|
||||
} else {
|
||||
// No pending delta files to discard, just in-memory mutations
|
||||
|
||||
|
@ -1105,9 +1113,10 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
|
|||
|
||||
// delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations
|
||||
// directly and immediately pop the rollback out of inProgress
|
||||
// TODO: bufferedDeltaVersion is of type Notified so you must set it to something greater than the previous val
|
||||
// This does not hold true for rollback versions.
|
||||
metadata->bufferedDeltaVersion.set(rollbackVersion);
|
||||
|
||||
// Create new notified version so it doesn't go backwards. Do this before signaling the rollback counter so that
|
||||
// the callers pick this new one up for the next waitForVersion
|
||||
metadata->bufferedDeltaVersion = NotifiedVersion(rollbackVersion);
|
||||
cfRollbackVersion = mutationVersion;
|
||||
}
|
||||
|
||||
|
@ -1120,13 +1129,6 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
|
|||
|
||||
metadata->rollbackCount.set(metadata->rollbackCount.get() + 1);
|
||||
|
||||
// add this rollback to in progress, and put all completed ones back in progress
|
||||
rollbacksInProgress.push_back(std::pair(rollbackVersion, mutationVersion));
|
||||
for (int i = rollbacksCompleted.size() - 1; i >= 0; i--) {
|
||||
rollbacksInProgress.push_front(rollbacksCompleted[i]);
|
||||
}
|
||||
rollbacksCompleted.clear();
|
||||
|
||||
return cfRollbackVersion;
|
||||
}
|
||||
|
||||
|
@ -1346,11 +1348,14 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
br >> rollbackVersion;
|
||||
|
||||
ASSERT(rollbackVersion >= metadata->durableDeltaVersion.get());
|
||||
ASSERT(rollbackVersion >= committedVersion.get());
|
||||
|
||||
if (!rollbacksInProgress.empty()) {
|
||||
ASSERT(rollbacksInProgress.front().first == rollbackVersion);
|
||||
ASSERT(rollbacksInProgress.front().second == deltas.version);
|
||||
printf("Passed rollback %lld -> %lld\n", deltas.version, rollbackVersion);
|
||||
if (BW_DEBUG) {
|
||||
printf("Passed rollback %lld -> %lld\n", deltas.version, rollbackVersion);
|
||||
}
|
||||
rollbacksCompleted.push_back(rollbacksInProgress.front());
|
||||
rollbacksInProgress.pop_front();
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue