Fixed granule purging bug and improved debugging for purging
Parent: ee1b0cdd43
Commit: af60e2ea32
@@ -143,30 +143,34 @@ bool compareFDBAndBlob(RangeResult fdb,
                 }
             }
 
-            printf("Chunks:\n");
-            for (auto& chunk : blob.second) {
-                printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
-                printf(" SnapshotFile:\n %s\n",
-                       chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
-                printf(" DeltaFiles:\n");
-                for (auto& df : chunk.deltaFiles) {
-                    printf(" %s\n", df.toString().c_str());
-                }
-                printf(" Deltas: (%d)", chunk.newDeltas.size());
-                if (chunk.newDeltas.size() > 0) {
-                    fmt::print(" with version [{0} - {1}]",
-                               chunk.newDeltas[0].version,
-                               chunk.newDeltas[chunk.newDeltas.size() - 1].version);
-                }
-                fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
-            }
-            printf("\n");
+            printGranuleChunks(blob.second);
         }
     }
     return correct;
 }
 
+void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
+    printf("Chunks:\n");
+    for (auto& chunk : chunks) {
+        printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
+        printf(" SnapshotFile:\n %s\n",
+               chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
+        printf(" DeltaFiles:\n");
+        for (auto& df : chunk.deltaFiles) {
+            printf(" %s\n", df.toString().c_str());
+        }
+        printf(" Deltas: (%d)", chunk.newDeltas.size());
+        if (chunk.newDeltas.size() > 0) {
+            fmt::print(" with version [{0} - {1}]",
+                       chunk.newDeltas[0].version,
+                       chunk.newDeltas[chunk.newDeltas.size() - 1].version);
+        }
+        fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
+    }
+    printf("\n");
+}
+
 ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
     // clear key range and check whether it is merged or not, repeatedly
     state Transaction tr(cx);
@@ -52,6 +52,7 @@
  */
 
 #define BM_DEBUG false
+#define BM_PURGE_DEBUG false
 
 void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
                            Arena& ar,
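The new flag gates purge-path logging separately from the existing BM_DEBUG output, so purge traces can be enabled without turning on every BlobManager print. A minimal standalone sketch of the pattern (hypothetical function and values; only the fmt library is assumed):

#include <fmt/format.h>
#include <cstdint>

#define BM_PURGE_DEBUG false // flip to true to see purge-path output

// Hypothetical helper: prints only when the purge debug gate is on.
void debugPurgeStart(int64_t epoch, int64_t purgeVersion) {
    if (BM_PURGE_DEBUG) {
        // Positional fmt arguments, matching the style used in the BlobManager prints.
        fmt::print("BM {0} purgeRange starting @ {1}\n", epoch, purgeVersion);
    }
}

int main() {
    debugPurgeStart(7, 12345);
    return 0;
}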
@@ -3168,8 +3169,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
                                       Key historyKey,
                                       Version purgeVersion,
                                       KeyRange granuleRange) {
-    if (BM_DEBUG) {
-        fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Fully deleting granule {1}: init\n", self->epoch, granuleId.toString());
     }
 
     // if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
@@ -3195,8 +3196,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
         filesToDelete.emplace_back(fname);
     }
 
-    if (BM_DEBUG) {
-        fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Fully deleting granule {1}: deleting {2} files\n", self->epoch, granuleId.toString(), filesToDelete.size());
         for (auto filename : filesToDelete) {
             fmt::print(" - {}\n", filename.c_str());
         }
@@ -3209,8 +3210,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
     wait(waitForAll(deletions));
 
     // delete metadata in FDB (history entry and file keys)
-    if (BM_DEBUG) {
-        fmt::print("Fully deleting granule {0}: deleting history and file keys\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Fully deleting granule {1}: deleting history and file keys\n", self->epoch, granuleId.toString());
     }
 
     state Transaction tr(self->db);
@@ -3229,8 +3230,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
         }
     }
 
-    if (BM_DEBUG) {
-        fmt::print("Fully deleting granule {0}: success\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Fully deleting granule {1}: success\n", self->epoch, granuleId.toString());
     }
 
     TraceEvent("GranuleFullPurge", self->id)
@@ -3259,8 +3260,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
                                           UID granuleId,
                                           Version purgeVersion,
                                           KeyRange granuleRange) {
-    if (BM_DEBUG) {
-        fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Partially deleting granule {1}: init\n", self->epoch, granuleId.toString());
     }
 
     state Reference<BlobConnectionProvider> bstore = wait(getBStoreForGranule(self, granuleRange));
@@ -3309,8 +3310,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
         filesToDelete.emplace_back(fname);
     }
 
-    if (BM_DEBUG) {
-        fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Partially deleting granule {1}: deleting {2} files\n", self->epoch, granuleId.toString(), filesToDelete.size());
         for (auto filename : filesToDelete) {
             fmt::print(" - {0}\n", filename);
         }
@@ -3327,8 +3328,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
     wait(waitForAll(deletions));
 
     // delete metadata in FDB (deleted file keys)
-    if (BM_DEBUG) {
-        fmt::print("Partially deleting granule {0}: deleting file keys\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Partially deleting granule {1}: deleting file keys\n", self->epoch, granuleId.toString());
     }
 
     state Transaction tr(self->db);
@@ -3347,8 +3348,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
         }
     }
 
-    if (BM_DEBUG) {
-        fmt::print("Partially deleting granule {0}: success\n", granuleId.toString());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Partially deleting granule {1}: success\n", self->epoch, granuleId.toString());
     }
     TraceEvent("GranulePartialPurge", self->id)
         .detail("Epoch", self->epoch)
@@ -3373,8 +3374,9 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
  * processing this purge intent.
  */
 ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range, Version purgeVersion, bool force) {
-    if (BM_DEBUG) {
-        fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n",
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} purgeRange starting for range [{1} - {2}) @ purgeVersion={3}, force={4}\n",
+                   self->epoch,
                    range.begin.printable(),
                    range.end.printable(),
                    purgeVersion,
@@ -3396,7 +3398,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 
     // track which granules we have already added to traversal
     // note: (startKey, startVersion) uniquely identifies a granule
-    state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>>
+    state std::unordered_set<std::pair<std::string, Version>, boost::hash<std::pair<std::string, Version>>>
         visited;
 
     // find all active granules (that comprise the range) and add to the queue
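This type change is the purging bug fix: the visited set previously keyed each granule on the raw const uint8_t* pointer to its start key, so the same key bytes read into a different buffer hashed and compared as a different element and deduplication silently failed. Keying on std::string copies and compares the key contents. A minimal standalone sketch of the difference, assuming only standard C++ and Boost's pair hash:

#include <boost/functional/hash.hpp>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <utility>

using Version = int64_t;

int main() {
    const uint8_t bufA[] = { 'k', 'e', 'y', '1' };
    const uint8_t bufB[] = { 'k', 'e', 'y', '1' }; // same bytes, different address

    // Old shape: pointer-keyed. The lookup compares addresses, so bufB is "not visited".
    std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>> byPtr;
    byPtr.insert({ bufA, 1 });
    std::cout << "pointer-keyed match: " << byPtr.count({ bufB, 1 }) << "\n"; // prints 0

    // New shape: value-keyed. The copy of the key bytes compares equal.
    std::unordered_set<std::pair<std::string, Version>, boost::hash<std::pair<std::string, Version>>> byValue;
    byValue.insert({ std::string(reinterpret_cast<const char*>(bufA), sizeof(bufA)), 1 });
    std::cout << "value-keyed match: "
              << byValue.count({ std::string(reinterpret_cast<const char*>(bufB), sizeof(bufB)), 1 }) << "\n"; // prints 1
    return 0;
}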
@@ -3408,8 +3410,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 
     state KeyRangeMap<UID>::iterator activeRange;
     for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
-        if (BM_DEBUG) {
-            fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n",
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0} Checking if active range [{1} - {2}), owned by BW {3}, should be purged\n",
+                       self->epoch,
                        activeRange.begin().printable(),
                        activeRange.end().printable(),
                        activeRange.value().toString());
@@ -3417,6 +3420,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 
         // assumption: purge boundaries must respect granule boundaries
         if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
+            TraceEvent(SevWarn, "GranulePurgeRangesUnaligned", self->id).detail("Epoch", self->epoch).detail("PurgeRange", range).detail("GranuleRange", activeRange.range());
             continue;
         }
 
|
@ -3426,20 +3430,24 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
if (BM_DEBUG) {
|
if (BM_PURGE_DEBUG) {
|
||||||
fmt::print("Fetching latest history entry for range [{0} - {1})\n",
|
fmt::print("BM {0} Fetching latest history entry for range [{1} - {2})\n",
|
||||||
|
self->epoch,
|
||||||
activeRange.begin().printable(),
|
activeRange.begin().printable(),
|
||||||
activeRange.end().printable());
|
activeRange.end().printable());
|
||||||
}
|
}
|
||||||
|
// FIXME: doing this serially will likely be too slow for large purges
|
||||||
Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
|
Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
|
||||||
// TODO: can we tell from the krm that this range is not valid, so that we don't need to do a
|
// TODO: can we tell from the krm that this range is not valid, so that we don't need to do a
|
||||||
// get
|
// get
|
||||||
if (history.present()) {
|
if (history.present()) {
|
||||||
if (BM_DEBUG) {
|
if (BM_PURGE_DEBUG) {
|
||||||
printf("Adding range to history queue\n");
|
fmt::print("BM {0} Adding range to history queue: [{1} - {2}) @ {3} ({4})\n", self->epoch, activeRange.begin().printable(), activeRange.end().printable(), history.get().version, (void*)(activeRange.range().begin.begin()));
|
||||||
}
|
}
|
||||||
visited.insert({ activeRange.range().begin.begin(), history.get().version });
|
visited.insert({ activeRange.range().begin.toString(), history.get().version });
|
||||||
historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
|
historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
|
||||||
|
} else if (BM_PURGE_DEBUG) {
|
||||||
|
fmt::print("BM {0} No history for range, ignoring\n", self->epoch);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
|
@@ -3448,8 +3456,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
         }
     }
 
-    if (BM_DEBUG) {
-        printf("Beginning BFS traversal of history\n");
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0} Beginning BFS traversal of {1} history items for range [{2} - {3}) \n", self->epoch, historyEntryQueue.size(), range.begin.printable(), range.end.printable());
     }
     while (!historyEntryQueue.empty()) {
         // process the node at the front of the queue and remove it
@@ -3459,8 +3467,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
         std::tie(currRange, startVersion, endVersion) = historyEntryQueue.front();
         historyEntryQueue.pop();
 
-        if (BM_DEBUG) {
-            fmt::print("Processing history node [{0} - {1}) with versions [{2}, {3})\n",
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0} Processing history node [{1} - {2}) with versions [{3}, {4})\n",
+                       self->epoch,
                        currRange.begin.printable(),
                        currRange.end.printable(),
                        startVersion,
@@ -3485,12 +3494,15 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
         }
 
         if (!foundHistory) {
+            if (BM_PURGE_DEBUG) {
+                fmt::print("BM {0} No history for this node, skipping\n", self->epoch);
+            }
             continue;
         }
 
-        if (BM_DEBUG) {
-            fmt::print("Found history entry for this node. It's granuleID is {0}\n",
-                       currHistoryNode.granuleID.toString());
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0} Found history entry for this node. It's granuleID is {1}\n",
+                       self->epoch, currHistoryNode.granuleID.toString());
         }
 
         // There are three cases this granule can fall into:
@@ -3500,33 +3512,38 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
         //   and so this granule should be partially deleted
         // - otherwise, this granule is active, so don't schedule it for deletion
         if (force || endVersion <= purgeVersion) {
-            if (BM_DEBUG) {
-                fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
+            if (BM_PURGE_DEBUG) {
+                fmt::print("BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString());
             }
             toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange });
         } else if (startVersion < purgeVersion) {
-            if (BM_DEBUG) {
-                fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
+            if (BM_PURGE_DEBUG) {
+                fmt::print("BM {0} Granule {1} will be partially deleted\n", self->epoch, currHistoryNode.granuleID.toString());
             }
             toPartiallyDelete.push_back({ currHistoryNode.granuleID, currRange });
         }
 
         // add all of the node's parents to the queue
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0} Checking {1} parents\n", self->epoch, currHistoryNode.parentVersions.size());
+        }
         for (int i = 0; i < currHistoryNode.parentVersions.size(); i++) {
             // for (auto& parent : currHistoryNode.parentVersions.size()) {
             // if we already added this node to queue, skip it; otherwise, mark it as visited
             KeyRangeRef parentRange(currHistoryNode.parentBoundaries[i], currHistoryNode.parentBoundaries[i + 1]);
             Version parentVersion = currHistoryNode.parentVersions[i];
-            if (visited.count({ parentRange.begin.begin(), parentVersion })) {
-                if (BM_DEBUG) {
-                    fmt::print("Already added {0} to queue, so skipping it\n", currHistoryNode.granuleID.toString());
+            std::string beginStr = parentRange.begin.toString();
+            if (!visited.insert({ beginStr, parentVersion }).second) {
+                if (BM_PURGE_DEBUG) {
+                    fmt::print("BM {0} Already added [{1} - {2}) @ {3} - {4} to queue, so skipping it\n", self->epoch, parentRange.begin.printable(),
+                               parentRange.end.printable(), parentVersion, startVersion);
                 }
                 continue;
             }
-            visited.insert({ parentRange.begin.begin(), parentVersion });
 
-            if (BM_DEBUG) {
-                fmt::print("Adding parent [{0} - {1}) with versions [{2} - {3}) to queue\n",
+            if (BM_PURGE_DEBUG) {
+                fmt::print("BM {0} Adding parent [{1} - {2}) @ {3} - {4} to queue\n",
+                           self->epoch,
                            parentRange.begin.printable(),
                            parentRange.end.printable(),
                            parentVersion,
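The parent loop above now folds the "already visited" check and the insertion into a single visited.insert(...).second test, keyed by the (start key string, start version) pair. A minimal standalone sketch of that traversal pattern (hypothetical HistoryNode type; only standard C++ and Boost's pair hash are assumed), showing that a parent shared by two child granules is queued exactly once:

#include <boost/functional/hash.hpp>
#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

using Version = int64_t;
using NodeId = std::pair<std::string, Version>; // (start key, start version) identifies a granule

struct HistoryNode {
    NodeId id;
    std::vector<NodeId> parents;
};

int main() {
    // Two sibling granules produced by a split share one parent granule.
    HistoryNode left{ { "a", 200 }, { { "a", 100 } } };
    HistoryNode right{ { "m", 200 }, { { "a", 100 } } };
    HistoryNode parent{ { "a", 100 }, {} };

    std::unordered_set<NodeId, boost::hash<NodeId>> visited;
    std::queue<HistoryNode> historyEntryQueue;

    for (const HistoryNode& n : { left, right }) {
        visited.insert(n.id);
        historyEntryQueue.push(n);
    }

    while (!historyEntryQueue.empty()) {
        HistoryNode curr = historyEntryQueue.front();
        historyEntryQueue.pop();
        std::cout << "processing " << curr.id.first << " @ " << curr.id.second << "\n";

        for (const NodeId& p : curr.parents) {
            // insert() both marks the parent visited and, via .second, says whether it was new.
            if (!visited.insert(p).second) {
                std::cout << "  already added parent @ " << p.second << ", skipping\n";
                continue;
            }
            // The real code looks the parent's history node up from FDB; here it is just `parent`.
            historyEntryQueue.push(parent);
        }
    }
    return 0;
}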
@@ -3554,10 +3571,19 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
     // we won't run into any issues with trying to "re-delete" a blob file since deleting
     // a file that doesn't exist is considered successful
 
+    TraceEvent("PurgeGranulesTraversalComplete", self->id)
+        .detail("Epoch", self->epoch)
+        .detail("Range", range)
+        .detail("PurgeVersion", purgeVersion)
+        .detail("Force", force)
+        .detail("VisitedCount", visited.size())
+        .detail("DeletingFullyCount", toFullyDelete.size())
+        .detail("DeletingPartiallyCount", toPartiallyDelete.size());
+
     state std::vector<Future<Void>> partialDeletions;
     state int i;
-    if (BM_DEBUG) {
-        fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
     }
     for (i = toFullyDelete.size() - 1; i >= 0; --i) {
         state UID granuleId;
@@ -3565,22 +3591,22 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
         KeyRange keyRange;
         std::tie(granuleId, historyKey, keyRange) = toFullyDelete[i];
         // FIXME: consider batching into a single txn (need to take care of txn size limit)
-        if (BM_DEBUG) {
-            fmt::print("About to fully delete granule {0}\n", granuleId.toString());
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString());
         }
         wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, range));
     }
 
-    if (BM_DEBUG) {
-        fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size());
     }
 
     for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
         UID granuleId;
         KeyRange range;
         std::tie(granuleId, range) = toPartiallyDelete[i];
-        if (BM_DEBUG) {
-            fmt::print("About to partially delete granule {0}\n", granuleId.toString());
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0}: About to partially delete granule {1}\n", self->epoch, granuleId.toString());
         }
         partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion, range));
     }
@@ -3592,8 +3618,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
     // another purgeIntent that got written for this table while we were processing this one.
     // If that is the case, we should not clear the key. Otherwise, we can just clear the key.
 
-    if (BM_DEBUG) {
-        fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
+    if (BM_PURGE_DEBUG) {
+        fmt::print("BM {0}: Successfully purged range [{1} - {2}) at purgeVersion={3}\n",
+                   self->epoch,
                    range.begin.printable(),
                    range.end.printable(),
                    purgeVersion);
@@ -3679,8 +3706,9 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
                 }
                 purgeMap.insert(range, std::make_pair(purgeVersion, force));
 
-                if (BM_DEBUG) {
-                    fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
+                if (BM_PURGE_DEBUG) {
+                    fmt::print("BM {0} about to purge range [{1} - {2}) @ {3}, force={4}\n",
+                               self->epoch,
                                range.begin.printable(),
                                range.end.printable(),
                                purgeVersion,
@@ -3732,8 +3760,8 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
             }
         }
 
-        if (BM_DEBUG) {
-            printf("Done clearing current set of purge intents.\n");
+        if (BM_PURGE_DEBUG) {
+            fmt::print("BM {0} Done clearing current set of purge intents.\n", self->epoch);
         }
 
         CODE_PROBE(true, "BM finished processing purge intents");
@@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
                        Version v,
                        bool debug);
 
+void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks);
+
 ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
 
 #include "flow/unactorcompiler.h"
@@ -309,20 +309,30 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
             // reading older than the purge version
             if (doPurging) {
                 wait(self->killBlobWorkers(cx, self));
-                std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
+                if (BGV_DEBUG) {
+                    fmt::print("BGV Reading post-purge [{0} - {1}) @ {2}\n", oldRead.range.begin.printable(), oldRead.range.end.printable(), prevPurgeVersion);
+                }
+                // ensure purge version exactly is still readable
+                std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead1 =
                     wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
+                if (BGV_DEBUG) {
+                    fmt::print("BGV Post-purge first read:\n");
+                    printGranuleChunks(versionRead1.second);
+                }
                 try {
+                    // read at purgeVersion - 1, should NOT be readable
                     Version minSnapshotVersion = newPurgeVersion;
-                    for (auto& it : versionRead.second) {
+                    for (auto& it : versionRead1.second) {
                         minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
                     }
                     if (BGV_DEBUG) {
-                        fmt::print("Reading post-purge @ {0}\n", minSnapshotVersion - 1);
+                        fmt::print("BGV Reading post-purge again [{0} - {1}) @ {2}\n", oldRead.range.begin.printable(), oldRead.range.end.printable(), minSnapshotVersion - 1);
                     }
-                    std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
+                    std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead2 =
                         wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
                     if (BGV_DEBUG) {
-                        fmt::print("ERROR: data not purged! Read successful!!\n");
+                        fmt::print("BGV ERROR: data not purged! Read successful!!\n");
+                        printGranuleChunks(versionRead2.second);
                     }
                     ASSERT(false);
                 } catch (Error& e) {
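The reworked verifier check reads the range twice after a forced purge: once exactly at the previous purge version, which must still succeed, and once just below the smallest snapshot version returned, which must fail. A minimal standalone sketch of that invariant (hypothetical readAt helper and retention rule, not the FDB read path):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <vector>

using Version = int64_t;

struct ChunkInfo {
    Version snapshotVersion;
};

// Hypothetical stand-in for a blob read: anything below the oldest retained snapshot throws.
static std::vector<ChunkInfo> readAt(Version readVersion, Version oldestRetainedSnapshot) {
    if (readVersion < oldestRetainedSnapshot) {
        throw std::runtime_error("read version predates purged data");
    }
    return { { oldestRetainedSnapshot }, { oldestRetainedSnapshot + 10 } };
}

int main() {
    const Version prevPurgeVersion = 100;

    // 1) The purge version itself must still be readable.
    std::vector<ChunkInfo> chunks = readAt(prevPurgeVersion, prevPurgeVersion);

    // 2) The oldest data still readable is the minimum snapshot version across the returned chunks.
    Version minSnapshotVersion = prevPurgeVersion;
    for (const ChunkInfo& c : chunks) {
        minSnapshotVersion = std::min(minSnapshotVersion, c.snapshotVersion);
    }

    // 3) Reading just below that must fail; a successful read would mean data was not purged.
    try {
        readAt(minSnapshotVersion - 1, prevPurgeVersion);
        assert(false && "data not purged, read succeeded");
    } catch (const std::runtime_error&) {
        std::cout << "purge verified: pre-purge version is no longer readable\n";
    }
    return 0;
}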
@@ -507,6 +517,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
         // For some reason simulation is still passing when this fails?.. so assert for now
         ASSERT(result);
 
+        // FIXME: if doPurging was set, possibly do one last purge here, and verify it succeeds with no errors
+
         if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
             CODE_PROBE(true, "BGV clearing database and awaiting merge");
             wait(clearAndAwaitMerge(cx, normalKeys));