Merge branch 'main' into improved_cf_testing
This commit is contained in:
commit
1ea43aa878
|
@ -9409,11 +9409,20 @@ Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> resu
|
|||
Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range, replyBufferSize, canReadPopped);
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(
|
||||
Database cx,
|
||||
Reference<LocationInfo> location,
|
||||
KeyRangeRef range,
|
||||
Version minVersion) {
|
||||
Version OverlappingChangeFeedsInfo::getFeedMetadataVersion(const KeyRangeRef& range) const {
|
||||
Version v = invalidVersion;
|
||||
for (auto& it : feedMetadataVersions) {
|
||||
if (it.second > v && it.first.intersects(range)) {
|
||||
v = it.second;
|
||||
}
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
ACTOR Future<OverlappingChangeFeedsReply> singleLocationOverlappingChangeFeeds(Database cx,
|
||||
Reference<LocationInfo> location,
|
||||
KeyRangeRef range,
|
||||
Version minVersion) {
|
||||
state OverlappingChangeFeedsRequest req;
|
||||
req.range = range;
|
||||
req.minVersion = minVersion;
|
||||
|
@ -9425,16 +9434,16 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingC
|
|||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::False,
|
||||
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
|
||||
return rep.rangeIds;
|
||||
return rep;
|
||||
}
|
||||
|
||||
bool compareChangeFeedResult(const OverlappingChangeFeedEntry& i, const OverlappingChangeFeedEntry& j) {
|
||||
return i.rangeId < j.rangeId;
|
||||
return i.feedId < j.feedId;
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
|
||||
KeyRangeRef range,
|
||||
Version minVersion) {
|
||||
ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
|
||||
KeyRangeRef range,
|
||||
Version minVersion) {
|
||||
state Database cx(db);
|
||||
state Span span("NAPI:GetOverlappingChangeFeeds"_loc);
|
||||
|
||||
|
@ -9460,19 +9469,33 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsA
|
|||
throw all_alternatives_failed();
|
||||
}
|
||||
|
||||
state std::vector<Future<std::vector<OverlappingChangeFeedEntry>>> allOverlappingRequests;
|
||||
state std::vector<Future<OverlappingChangeFeedsReply>> allOverlappingRequests;
|
||||
for (auto& it : locations) {
|
||||
allOverlappingRequests.push_back(
|
||||
singleLocationOverlappingChangeFeeds(cx, it.locations, it.range & range, minVersion));
|
||||
}
|
||||
wait(waitForAll(allOverlappingRequests));
|
||||
|
||||
std::vector<OverlappingChangeFeedEntry> result;
|
||||
for (auto& it : allOverlappingRequests) {
|
||||
result.insert(result.end(), it.get().begin(), it.get().end());
|
||||
OverlappingChangeFeedsInfo result;
|
||||
std::unordered_map<KeyRef, OverlappingChangeFeedEntry> latestFeedMetadata;
|
||||
for (int i = 0; i < locations.size(); i++) {
|
||||
result.arena.dependsOn(allOverlappingRequests[i].get().arena);
|
||||
result.arena.dependsOn(locations[i].range.arena());
|
||||
result.feedMetadataVersions.push_back(
|
||||
{ locations[i].range, allOverlappingRequests[i].get().feedMetadataVersion });
|
||||
for (auto& it : allOverlappingRequests[i].get().feeds) {
|
||||
auto res = latestFeedMetadata.insert({ it.feedId, it });
|
||||
if (!res.second) {
|
||||
CODE_PROBE(true, "deduping fetched overlapping feed by higher metadata version");
|
||||
if (res.first->second.feedMetadataVersion < it.feedMetadataVersion) {
|
||||
res.first->second = it;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto& it : latestFeedMetadata) {
|
||||
result.feeds.push_back(result.arena, it.second);
|
||||
}
|
||||
std::sort(result.begin(), result.end(), compareChangeFeedResult);
|
||||
result.resize(std::unique(result.begin(), result.end()) - result.begin());
|
||||
return result;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
|
||||
|
@ -9485,8 +9508,7 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsA
|
|||
}
|
||||
}
|
||||
|
||||
Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
|
||||
Version minVersion) {
|
||||
Future<OverlappingChangeFeedsInfo> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range, Version minVersion) {
|
||||
return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
|
||||
}
|
||||
|
||||
|
@ -9610,7 +9632,7 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
|||
state bool loadedTenantPrefix = false;
|
||||
|
||||
// FIXME: implement force
|
||||
if (!force) {
|
||||
if (force) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
|
||||
|
|
|
@ -207,6 +207,16 @@ struct KeyRangeLocationInfo {
|
|||
: tenantEntry(tenantEntry), range(range), locations(locations) {}
|
||||
};
|
||||
|
||||
struct OverlappingChangeFeedsInfo {
|
||||
Arena arena;
|
||||
VectorRef<OverlappingChangeFeedEntry> feeds;
|
||||
// would prefer to use key range map but it complicates copy/move constructors
|
||||
std::vector<std::pair<KeyRangeRef, Version>> feedMetadataVersions;
|
||||
|
||||
// for a feed that wasn't present, returns the metadata version it would have been fetched at.
|
||||
Version getFeedMetadataVersion(const KeyRangeRef& feedRange) const;
|
||||
};
|
||||
|
||||
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
|
||||
public:
|
||||
static DatabaseContext* allocateOnForeignThread() {
|
||||
|
@ -361,7 +371,7 @@ public:
|
|||
int replyBufferSize = -1,
|
||||
bool canReadPopped = true);
|
||||
|
||||
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
|
||||
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
|
||||
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
|
||||
|
||||
Future<Key> purgeBlobGranules(KeyRange keyRange,
|
||||
|
|
|
@ -970,39 +970,51 @@ struct FetchCheckpointKeyValuesRequest {
|
|||
};
|
||||
|
||||
struct OverlappingChangeFeedEntry {
|
||||
Key rangeId;
|
||||
KeyRange range;
|
||||
KeyRef feedId;
|
||||
KeyRangeRef range;
|
||||
Version emptyVersion;
|
||||
Version stopVersion;
|
||||
Version feedMetadataVersion;
|
||||
|
||||
bool operator==(const OverlappingChangeFeedEntry& r) const {
|
||||
return rangeId == r.rangeId && range == r.range && emptyVersion == r.emptyVersion &&
|
||||
stopVersion == r.stopVersion;
|
||||
return feedId == r.feedId && range == r.range && emptyVersion == r.emptyVersion &&
|
||||
stopVersion == r.stopVersion && feedMetadataVersion == r.feedMetadataVersion;
|
||||
}
|
||||
|
||||
OverlappingChangeFeedEntry() {}
|
||||
OverlappingChangeFeedEntry(Key const& rangeId, KeyRange const& range, Version emptyVersion, Version stopVersion)
|
||||
: rangeId(rangeId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion) {}
|
||||
OverlappingChangeFeedEntry(KeyRef const& feedId,
|
||||
KeyRangeRef const& range,
|
||||
Version emptyVersion,
|
||||
Version stopVersion,
|
||||
Version feedMetadataVersion)
|
||||
: feedId(feedId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion),
|
||||
feedMetadataVersion(feedMetadataVersion) {}
|
||||
|
||||
OverlappingChangeFeedEntry(Arena& arena, const OverlappingChangeFeedEntry& rhs)
|
||||
: feedId(arena, rhs.feedId), range(arena, rhs.range), emptyVersion(rhs.emptyVersion),
|
||||
stopVersion(rhs.stopVersion), feedMetadataVersion(rhs.feedMetadataVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, rangeId, range, emptyVersion, stopVersion);
|
||||
serializer(ar, feedId, range, emptyVersion, stopVersion, feedMetadataVersion);
|
||||
}
|
||||
};
|
||||
|
||||
struct OverlappingChangeFeedsReply {
|
||||
constexpr static FileIdentifier file_identifier = 11815134;
|
||||
std::vector<OverlappingChangeFeedEntry> rangeIds;
|
||||
VectorRef<OverlappingChangeFeedEntry> feeds;
|
||||
bool cached;
|
||||
Arena arena;
|
||||
Version feedMetadataVersion;
|
||||
|
||||
OverlappingChangeFeedsReply() : cached(false) {}
|
||||
explicit OverlappingChangeFeedsReply(std::vector<OverlappingChangeFeedEntry> const& rangeIds)
|
||||
: rangeIds(rangeIds), cached(false) {}
|
||||
OverlappingChangeFeedsReply() : cached(false), feedMetadataVersion(invalidVersion) {}
|
||||
explicit OverlappingChangeFeedsReply(VectorRef<OverlappingChangeFeedEntry> const& feeds,
|
||||
Version feedMetadataVersion)
|
||||
: feeds(feeds), cached(false), feedMetadataVersion(feedMetadataVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, rangeIds, arena);
|
||||
serializer(ar, feeds, arena, feedMetadataVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "flow/ProtocolVersion.h"
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <limits>
|
||||
#pragma once
|
||||
|
||||
#include "flow/flow.h"
|
||||
|
@ -469,6 +470,8 @@ public:
|
|||
bool setDiffProtocol; // true if a process with a different protocol version has been started
|
||||
|
||||
bool allowStorageMigrationTypeChange = false;
|
||||
double injectTargetedSSRestartTime = std::numeric_limits<double>::max();
|
||||
double injectSSDelayTime = std::numeric_limits<double>::max();
|
||||
|
||||
flowGlobalType global(int id) const final { return getCurrentProcess()->global(id); };
|
||||
void setGlobal(size_t id, flowGlobalType v) final { getCurrentProcess()->setGlobal(id, v); };
|
||||
|
|
|
@ -143,30 +143,34 @@ bool compareFDBAndBlob(RangeResult fdb,
|
|||
}
|
||||
}
|
||||
|
||||
printf("Chunks:\n");
|
||||
for (auto& chunk : blob.second) {
|
||||
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
|
||||
|
||||
printf(" SnapshotFile:\n %s\n",
|
||||
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
|
||||
printf(" DeltaFiles:\n");
|
||||
for (auto& df : chunk.deltaFiles) {
|
||||
printf(" %s\n", df.toString().c_str());
|
||||
}
|
||||
printf(" Deltas: (%d)", chunk.newDeltas.size());
|
||||
if (chunk.newDeltas.size() > 0) {
|
||||
fmt::print(" with version [{0} - {1}]",
|
||||
chunk.newDeltas[0].version,
|
||||
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
|
||||
}
|
||||
fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
|
||||
}
|
||||
printf("\n");
|
||||
printGranuleChunks(blob.second);
|
||||
}
|
||||
}
|
||||
return correct;
|
||||
}
|
||||
|
||||
void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
|
||||
printf("Chunks:\n");
|
||||
for (auto& chunk : chunks) {
|
||||
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
|
||||
|
||||
printf(" SnapshotFile:\n %s\n",
|
||||
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
|
||||
printf(" DeltaFiles:\n");
|
||||
for (auto& df : chunk.deltaFiles) {
|
||||
printf(" %s\n", df.toString().c_str());
|
||||
}
|
||||
printf(" Deltas: (%d)", chunk.newDeltas.size());
|
||||
if (chunk.newDeltas.size() > 0) {
|
||||
fmt::print(" with version [{0} - {1}]",
|
||||
chunk.newDeltas[0].version,
|
||||
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
|
||||
}
|
||||
fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
|
||||
// clear key range and check whether it is merged or not, repeatedly
|
||||
state Transaction tr(cx);
|
||||
|
|
|
@ -52,6 +52,7 @@
|
|||
*/
|
||||
|
||||
#define BM_DEBUG false
|
||||
#define BM_PURGE_DEBUG false
|
||||
|
||||
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
|
||||
Arena& ar,
|
||||
|
@ -1649,7 +1650,9 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
|
|||
state Key lockKey = blobGranuleLockKeyFor(parentRange);
|
||||
state Future<Optional<Value>> oldLockFuture = tr->get(lockKey);
|
||||
|
||||
wait(updateChangeFeed(tr,
|
||||
// This has to be
|
||||
// TODO: fix this better! (privatize change feed key clear)
|
||||
wait(updateChangeFeed(&tr->getTransaction(),
|
||||
granuleIDToCFKey(parentGranuleIDs[parentIdx]),
|
||||
ChangeFeedStatus::CHANGE_FEED_DESTROY,
|
||||
parentRange));
|
||||
|
@ -3168,8 +3171,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
|
|||
Key historyKey,
|
||||
Version purgeVersion,
|
||||
KeyRange granuleRange) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Fully deleting granule {1}: init\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
|
||||
// if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
|
||||
|
@ -3195,8 +3198,11 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
|
|||
filesToDelete.emplace_back(fname);
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Fully deleting granule {1}: deleting {2} files\n",
|
||||
self->epoch,
|
||||
granuleId.toString(),
|
||||
filesToDelete.size());
|
||||
for (auto filename : filesToDelete) {
|
||||
fmt::print(" - {}\n", filename.c_str());
|
||||
}
|
||||
|
@ -3209,8 +3215,9 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
|
|||
wait(waitForAll(deletions));
|
||||
|
||||
// delete metadata in FDB (history entry and file keys)
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fully deleting granule {0}: deleting history and file keys\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print(
|
||||
"BM {0} Fully deleting granule {1}: deleting history and file keys\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
|
||||
state Transaction tr(self->db);
|
||||
|
@ -3229,8 +3236,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
|
|||
}
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fully deleting granule {0}: success\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Fully deleting granule {1}: success\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
|
||||
TraceEvent("GranuleFullPurge", self->id)
|
||||
|
@ -3242,6 +3249,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
|
|||
++self->stats.granulesFullyPurged;
|
||||
self->stats.filesPurged += filesToDelete.size();
|
||||
|
||||
CODE_PROBE(true, "full granule purged");
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3257,8 +3266,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
UID granuleId,
|
||||
Version purgeVersion,
|
||||
KeyRange granuleRange) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Partially deleting granule {1}: init\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
|
||||
state Reference<BlobConnectionProvider> bstore = wait(getBStoreForGranule(self, granuleRange));
|
||||
|
@ -3307,8 +3316,11 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
filesToDelete.emplace_back(fname);
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Partially deleting granule {1}: deleting {2} files\n",
|
||||
self->epoch,
|
||||
granuleId.toString(),
|
||||
filesToDelete.size());
|
||||
for (auto filename : filesToDelete) {
|
||||
fmt::print(" - {0}\n", filename);
|
||||
}
|
||||
|
@ -3325,8 +3337,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
wait(waitForAll(deletions));
|
||||
|
||||
// delete metadata in FDB (deleted file keys)
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Partially deleting granule {0}: deleting file keys\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Partially deleting granule {1}: deleting file keys\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
|
||||
state Transaction tr(self->db);
|
||||
|
@ -3345,8 +3357,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
}
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Partially deleting granule {0}: success\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Partially deleting granule {1}: success\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
TraceEvent("GranulePartialPurge", self->id)
|
||||
.detail("Epoch", self->epoch)
|
||||
|
@ -3357,6 +3369,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
++self->stats.granulesPartiallyPurged;
|
||||
self->stats.filesPurged += filesToDelete.size();
|
||||
|
||||
CODE_PROBE(true, " partial granule purged");
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3369,8 +3383,9 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
|
|||
* processing this purge intent.
|
||||
*/
|
||||
ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range, Version purgeVersion, bool force) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} purgeRange starting for range [{1} - {2}) @ purgeVersion={3}, force={4}\n",
|
||||
self->epoch,
|
||||
range.begin.printable(),
|
||||
range.end.printable(),
|
||||
purgeVersion,
|
||||
|
@ -3392,8 +3407,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
|
||||
// track which granules we have already added to traversal
|
||||
// note: (startKey, startVersion) uniquely identifies a granule
|
||||
state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>>
|
||||
visited;
|
||||
state std::unordered_set<std::pair<std::string, Version>, boost::hash<std::pair<std::string, Version>>> visited;
|
||||
|
||||
// find all active granules (that comprise the range) and add to the queue
|
||||
state KeyRangeMap<UID>::Ranges activeRanges = self->workerAssignments.intersectingRanges(range);
|
||||
|
@ -3404,8 +3418,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
|
||||
state KeyRangeMap<UID>::iterator activeRange;
|
||||
for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Checking if active range [{1} - {2}), owned by BW {3}, should be purged\n",
|
||||
self->epoch,
|
||||
activeRange.begin().printable(),
|
||||
activeRange.end().printable(),
|
||||
activeRange.value().toString());
|
||||
|
@ -3413,6 +3428,10 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
|
||||
// assumption: purge boundaries must respect granule boundaries
|
||||
if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
|
||||
TraceEvent(SevWarn, "GranulePurgeRangesUnaligned", self->id)
|
||||
.detail("Epoch", self->epoch)
|
||||
.detail("PurgeRange", range)
|
||||
.detail("GranuleRange", activeRange.range());
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3422,20 +3441,29 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
|
||||
loop {
|
||||
try {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fetching latest history entry for range [{0} - {1})\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Fetching latest history entry for range [{1} - {2})\n",
|
||||
self->epoch,
|
||||
activeRange.begin().printable(),
|
||||
activeRange.end().printable());
|
||||
}
|
||||
// FIXME: doing this serially will likely be too slow for large purges
|
||||
Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
|
||||
// TODO: can we tell from the krm that this range is not valid, so that we don't need to do a
|
||||
// get
|
||||
if (history.present()) {
|
||||
if (BM_DEBUG) {
|
||||
printf("Adding range to history queue\n");
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Adding range to history queue: [{1} - {2}) @ {3} ({4})\n",
|
||||
self->epoch,
|
||||
activeRange.begin().printable(),
|
||||
activeRange.end().printable(),
|
||||
history.get().version,
|
||||
(void*)(activeRange.range().begin.begin()));
|
||||
}
|
||||
visited.insert({ activeRange.range().begin.begin(), history.get().version });
|
||||
visited.insert({ activeRange.range().begin.toString(), history.get().version });
|
||||
historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
|
||||
} else if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} No history for range, ignoring\n", self->epoch);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -3444,8 +3472,12 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
}
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
printf("Beginning BFS traversal of history\n");
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Beginning BFS traversal of {1} history items for range [{2} - {3}) \n",
|
||||
self->epoch,
|
||||
historyEntryQueue.size(),
|
||||
range.begin.printable(),
|
||||
range.end.printable());
|
||||
}
|
||||
while (!historyEntryQueue.empty()) {
|
||||
// process the node at the front of the queue and remove it
|
||||
|
@ -3455,8 +3487,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
std::tie(currRange, startVersion, endVersion) = historyEntryQueue.front();
|
||||
historyEntryQueue.pop();
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Processing history node [{0} - {1}) with versions [{2}, {3})\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Processing history node [{1} - {2}) with versions [{3}, {4})\n",
|
||||
self->epoch,
|
||||
currRange.begin.printable(),
|
||||
currRange.end.printable(),
|
||||
startVersion,
|
||||
|
@ -3481,11 +3514,15 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
}
|
||||
|
||||
if (!foundHistory) {
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} No history for this node, skipping\n", self->epoch);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Found history entry for this node. It's granuleID is {0}\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Found history entry for this node. It's granuleID is {1}\n",
|
||||
self->epoch,
|
||||
currHistoryNode.granuleID.toString());
|
||||
}
|
||||
|
||||
|
@ -3496,33 +3533,45 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
// and so this granule should be partially deleted
|
||||
// - otherwise, this granule is active, so don't schedule it for deletion
|
||||
if (force || endVersion <= purgeVersion) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print(
|
||||
"BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString());
|
||||
}
|
||||
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange });
|
||||
} else if (startVersion < purgeVersion) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Granule {1} will be partially deleted\n",
|
||||
self->epoch,
|
||||
currHistoryNode.granuleID.toString());
|
||||
}
|
||||
toPartiallyDelete.push_back({ currHistoryNode.granuleID, currRange });
|
||||
}
|
||||
|
||||
// add all of the node's parents to the queue
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Checking {1} parents\n", self->epoch, currHistoryNode.parentVersions.size());
|
||||
}
|
||||
for (int i = 0; i < currHistoryNode.parentVersions.size(); i++) {
|
||||
// for (auto& parent : currHistoryNode.parentVersions.size()) {
|
||||
// if we already added this node to queue, skip it; otherwise, mark it as visited
|
||||
KeyRangeRef parentRange(currHistoryNode.parentBoundaries[i], currHistoryNode.parentBoundaries[i + 1]);
|
||||
Version parentVersion = currHistoryNode.parentVersions[i];
|
||||
if (visited.count({ parentRange.begin.begin(), parentVersion })) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Already added {0} to queue, so skipping it\n", currHistoryNode.granuleID.toString());
|
||||
std::string beginStr = parentRange.begin.toString();
|
||||
if (!visited.insert({ beginStr, parentVersion }).second) {
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Already added [{1} - {2}) @ {3} - {4} to queue, so skipping it\n",
|
||||
self->epoch,
|
||||
parentRange.begin.printable(),
|
||||
parentRange.end.printable(),
|
||||
parentVersion,
|
||||
startVersion);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
visited.insert({ parentRange.begin.begin(), parentVersion });
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Adding parent [{0} - {1}) with versions [{2} - {3}) to queue\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Adding parent [{1} - {2}) @ {3} - {4} to queue\n",
|
||||
self->epoch,
|
||||
parentRange.begin.printable(),
|
||||
parentRange.end.printable(),
|
||||
parentVersion,
|
||||
|
@ -3550,10 +3599,19 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
// we won't run into any issues with trying to "re-delete" a blob file since deleting
|
||||
// a file that doesn't exist is considered successful
|
||||
|
||||
TraceEvent("PurgeGranulesTraversalComplete", self->id)
|
||||
.detail("Epoch", self->epoch)
|
||||
.detail("Range", range)
|
||||
.detail("PurgeVersion", purgeVersion)
|
||||
.detail("Force", force)
|
||||
.detail("VisitedCount", visited.size())
|
||||
.detail("DeletingFullyCount", toFullyDelete.size())
|
||||
.detail("DeletingPartiallyCount", toPartiallyDelete.size());
|
||||
|
||||
state std::vector<Future<Void>> partialDeletions;
|
||||
state int i;
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
|
||||
}
|
||||
for (i = toFullyDelete.size() - 1; i >= 0; --i) {
|
||||
state UID granuleId;
|
||||
|
@ -3561,22 +3619,22 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
KeyRange keyRange;
|
||||
std::tie(granuleId, historyKey, keyRange) = toFullyDelete[i];
|
||||
// FIXME: consider batching into a single txn (need to take care of txn size limit)
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("About to fully delete granule {0}\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, range));
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size());
|
||||
}
|
||||
|
||||
for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
|
||||
UID granuleId;
|
||||
KeyRange range;
|
||||
std::tie(granuleId, range) = toPartiallyDelete[i];
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("About to partially delete granule {0}\n", granuleId.toString());
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0}: About to partially delete granule {1}\n", self->epoch, granuleId.toString());
|
||||
}
|
||||
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion, range));
|
||||
}
|
||||
|
@ -3588,8 +3646,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
// another purgeIntent that got written for this table while we were processing this one.
|
||||
// If that is the case, we should not clear the key. Otherwise, we can just clear the key.
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0}: Successfully purged range [{1} - {2}) at purgeVersion={3}\n",
|
||||
self->epoch,
|
||||
range.begin.printable(),
|
||||
range.end.printable(),
|
||||
purgeVersion);
|
||||
|
@ -3601,6 +3660,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
|
|||
.detail("PurgeVersion", purgeVersion)
|
||||
.detail("Force", force);
|
||||
|
||||
CODE_PROBE(true, "range purge complete");
|
||||
|
||||
++self->stats.purgesProcessed;
|
||||
return Void();
|
||||
}
|
||||
|
@ -3651,6 +3712,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
|
|||
// TODO: replace 10000 with a knob
|
||||
state RangeResult purgeIntents = wait(tr->getRange(blobGranulePurgeKeys, BUGGIFY ? 1 : 10000));
|
||||
if (purgeIntents.size()) {
|
||||
CODE_PROBE(true, "BM found purges to process");
|
||||
int rangeIdx = 0;
|
||||
for (; rangeIdx < purgeIntents.size(); ++rangeIdx) {
|
||||
Version purgeVersion;
|
||||
|
@ -3672,8 +3734,9 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
|
|||
}
|
||||
purgeMap.insert(range, std::make_pair(purgeVersion, force));
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} about to purge range [{1} - {2}) @ {3}, force={4}\n",
|
||||
self->epoch,
|
||||
range.begin.printable(),
|
||||
range.end.printable(),
|
||||
purgeVersion,
|
||||
|
@ -3725,9 +3788,11 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
printf("Done clearing current set of purge intents.\n");
|
||||
if (BM_PURGE_DEBUG) {
|
||||
fmt::print("BM {0} Done clearing current set of purge intents.\n", self->epoch);
|
||||
}
|
||||
|
||||
CODE_PROBE(true, "BM finished processing purge intents");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -280,6 +280,13 @@ class TestConfig {
|
|||
if (attrib == "blobGranulesEnabled") {
|
||||
blobGranulesEnabled = strcmp(value.c_str(), "true") == 0;
|
||||
}
|
||||
if (attrib == "injectSSTargetedRestart") {
|
||||
injectTargetedSSRestart = strcmp(value.c_str(), "true") == 0;
|
||||
}
|
||||
|
||||
if (attrib == "injectSSDelay") {
|
||||
injectSSDelay = strcmp(value.c_str(), "true") == 0;
|
||||
}
|
||||
}
|
||||
|
||||
ifs.close();
|
||||
|
@ -327,6 +334,8 @@ public:
|
|||
|
||||
bool allowDefaultTenant = true;
|
||||
bool allowDisablingTenants = true;
|
||||
bool injectTargetedSSRestart = false;
|
||||
bool injectSSDelay = false;
|
||||
|
||||
ConfigDBType getConfigDBType() const { return configDBType; }
|
||||
|
||||
|
@ -384,7 +393,9 @@ public:
|
|||
.add("blobGranulesEnabled", &blobGranulesEnabled)
|
||||
.add("allowDefaultTenant", &allowDefaultTenant)
|
||||
.add("allowDisablingTenants", &allowDisablingTenants)
|
||||
.add("randomlyRenameZoneId", &randomlyRenameZoneId);
|
||||
.add("randomlyRenameZoneId", &randomlyRenameZoneId)
|
||||
.add("injectTargetedSSRestart", &injectTargetedSSRestart)
|
||||
.add("injectSSDelay", &injectSSDelay);
|
||||
try {
|
||||
auto file = toml::parse(testFile);
|
||||
if (file.contains("configuration") && toml::find(file, "configuration").is_table()) {
|
||||
|
@ -2364,6 +2375,13 @@ ACTOR void setupAndRun(std::string dataFolder,
|
|||
testConfig.readFromConfig(testFile);
|
||||
g_simulator.hasDiffProtocolProcess = testConfig.startIncompatibleProcess;
|
||||
g_simulator.setDiffProtocol = false;
|
||||
if (testConfig.injectTargetedSSRestart && deterministicRandom()->random01() < 0.25) {
|
||||
g_simulator.injectTargetedSSRestartTime = 60.0 + 340.0 * deterministicRandom()->random01();
|
||||
}
|
||||
|
||||
if (testConfig.injectSSDelay && deterministicRandom()->random01() < 0.25) {
|
||||
g_simulator.injectSSDelayTime = 60.0 + 240.0 * deterministicRandom()->random01();
|
||||
}
|
||||
|
||||
// Build simulator allow list
|
||||
allowList.addTrustedSubnet("0.0.0.0/2"sv);
|
||||
|
|
|
@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
|
|||
Version v,
|
||||
bool debug);
|
||||
|
||||
void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks);
|
||||
|
||||
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -536,6 +536,9 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
|
|||
Version storageVersion = invalidVersion; // The version between the storage version and the durable version are
|
||||
// being written to disk as part of the current commit in updateStorage.
|
||||
Version durableVersion = invalidVersion; // All versions before the durable version are durable on disk
|
||||
// FIXME: this needs to get persisted to disk to still fix same races across restart!
|
||||
Version metadataVersion = invalidVersion; // Last update to the change feed metadata. Used for reasoning about
|
||||
// fetched metadata vs local metadata
|
||||
Version emptyVersion = 0; // The change feed does not have any mutations before emptyVersion
|
||||
KeyRange range;
|
||||
Key id;
|
||||
|
@ -551,8 +554,6 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
|
|||
|
||||
bool removing = false;
|
||||
bool destroyed = false;
|
||||
bool possiblyDestroyed = false;
|
||||
bool refreshInProgress = false;
|
||||
|
||||
KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
|
||||
|
||||
|
@ -587,12 +588,21 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
|
|||
}
|
||||
|
||||
void destroy(Version destroyVersion) {
|
||||
updateMetadataVersion(destroyVersion);
|
||||
removing = true;
|
||||
destroyed = true;
|
||||
refreshInProgress = false;
|
||||
moved(range);
|
||||
newMutations.trigger();
|
||||
}
|
||||
|
||||
bool updateMetadataVersion(Version version) {
|
||||
// don't update metadata version if removing, so that metadata version remains the moved away version
|
||||
if (!removing && version > metadataVersion) {
|
||||
metadataVersion = version;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
|
||||
|
@ -895,7 +905,7 @@ public:
|
|||
KeyRangeMap<std::vector<Reference<ChangeFeedInfo>>> keyChangeFeed;
|
||||
std::map<Key, Reference<ChangeFeedInfo>> uidChangeFeed;
|
||||
Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
|
||||
std::map<UID, PromiseStream<Key>> changeFeedRemovals;
|
||||
std::map<UID, PromiseStream<Key>> changeFeedDestroys;
|
||||
std::set<Key> currentChangeFeeds;
|
||||
std::set<Key> fetchingChangeFeeds;
|
||||
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
|
||||
|
@ -1400,6 +1410,28 @@ public:
|
|||
req.reply.sendError(e);
|
||||
}
|
||||
}
|
||||
|
||||
void maybeInjectTargetedRestart(Version v) {
|
||||
// inject an SS restart at most once per test
|
||||
if (g_network->isSimulated() && !g_simulator.speedUpSimulation &&
|
||||
now() > g_simulator.injectTargetedSSRestartTime &&
|
||||
rebootAfterDurableVersion == std::numeric_limits<Version>::max()) {
|
||||
CODE_PROBE(true, "Injecting SS targeted restart");
|
||||
TraceEvent("SimSSInjectTargetedRestart", thisServerID).detail("Version", v);
|
||||
rebootAfterDurableVersion = v;
|
||||
g_simulator.injectTargetedSSRestartTime = std::numeric_limits<double>::max();
|
||||
}
|
||||
}
|
||||
|
||||
bool maybeInjectDelay() {
|
||||
if (g_network->isSimulated() && !g_simulator.speedUpSimulation && now() > g_simulator.injectSSDelayTime) {
|
||||
CODE_PROBE(true, "Injecting SS targeted delay");
|
||||
TraceEvent("SimSSInjectDelay", thisServerID);
|
||||
g_simulator.injectSSDelayTime = std::numeric_limits<double>::max();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
|
||||
|
@ -2212,46 +2244,54 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
|||
return Void();
|
||||
}
|
||||
|
||||
Version metadataVersion = invalidVersion;
|
||||
Version metadataWaitVersion = invalidVersion;
|
||||
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
|
||||
std::map<Key, std::tuple<KeyRange, Version, Version>> rangeIds;
|
||||
std::map<Key, std::tuple<KeyRange, Version, Version, Version>> rangeIds;
|
||||
for (auto r : ranges) {
|
||||
for (auto& it : r.value()) {
|
||||
if (!it->removing) {
|
||||
// Can't tell other SS about a change feed create or stopVersion that may get rolled back, and we only
|
||||
// need to tell it about the metadata if req.minVersion > metadataVersion, since it will get the
|
||||
// information from its own private mutations if it hasn't processed up that version yet
|
||||
metadataVersion = std::max(metadataVersion, it->metadataCreateVersion);
|
||||
metadataWaitVersion = std::max(metadataWaitVersion, it->metadataCreateVersion);
|
||||
|
||||
// don't wait for all it->metadataVersion updates, if metadata was fetched from elsewhere it's already
|
||||
// durable, and some updates are unecessary to wait for
|
||||
Version stopVersion;
|
||||
if (it->stopVersion != MAX_VERSION && req.minVersion > it->stopVersion) {
|
||||
stopVersion = it->stopVersion;
|
||||
metadataVersion = std::max(metadataVersion, stopVersion);
|
||||
metadataWaitVersion = std::max(metadataWaitVersion, stopVersion);
|
||||
} else {
|
||||
stopVersion = MAX_VERSION;
|
||||
}
|
||||
|
||||
rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion);
|
||||
rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion, it->metadataVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
state OverlappingChangeFeedsReply reply;
|
||||
reply.feedMetadataVersion = data->version.get();
|
||||
for (auto& it : rangeIds) {
|
||||
reply.rangeIds.push_back(OverlappingChangeFeedEntry(
|
||||
it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
|
||||
reply.feeds.push_back_deep(reply.arena,
|
||||
OverlappingChangeFeedEntry(it.first,
|
||||
std::get<0>(it.second),
|
||||
std::get<1>(it.second),
|
||||
std::get<2>(it.second),
|
||||
std::get<3>(it.second)));
|
||||
TraceEvent(SevDebug, "OverlappingChangeFeedEntry", data->thisServerID)
|
||||
.detail("MinVersion", req.minVersion)
|
||||
.detail("FeedID", it.first)
|
||||
.detail("Range", std::get<0>(it.second))
|
||||
.detail("EmptyVersion", std::get<1>(it.second))
|
||||
.detail("StopVersion", std::get<2>(it.second));
|
||||
.detail("StopVersion", std::get<2>(it.second))
|
||||
.detail("FeedMetadataVersion", std::get<3>(it.second));
|
||||
}
|
||||
|
||||
// Make sure all of the metadata we are sending won't get rolled back
|
||||
if (metadataVersion != invalidVersion && metadataVersion > data->knownCommittedVersion.get()) {
|
||||
if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->knownCommittedVersion.get()) {
|
||||
CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be committed");
|
||||
wait(data->desiredOldestVersion.whenAtLeast(metadataVersion));
|
||||
wait(data->desiredOldestVersion.whenAtLeast(metadataWaitVersion));
|
||||
}
|
||||
req.reply.send(reply);
|
||||
return Void();
|
||||
|
@ -2582,21 +2622,37 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
}
|
||||
} else if (memoryVerifyIdx < memoryReply.mutations.size() &&
|
||||
version == memoryReply.mutations[memoryVerifyIdx].version) {
|
||||
fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
|
||||
data->thisServerID.toString().substr(0, 4),
|
||||
req.rangeID.printable().substr(0, 6),
|
||||
streamUID.toString().substr(0, 8),
|
||||
version);
|
||||
if (version > feedInfo->storageVersion && version > feedInfo->fetchVersion) {
|
||||
// Another validation case - feed was popped, data was fetched, fetched data was persisted but pop
|
||||
// wasn't yet, then SS restarted. Now SS has the data without the popped version. This looks wrong
|
||||
// here but is fine.
|
||||
memoryVerifyIdx++;
|
||||
} else {
|
||||
fmt::print(
|
||||
"ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
|
||||
data->thisServerID.toString().substr(0, 4),
|
||||
req.rangeID.printable().substr(0, 6),
|
||||
streamUID.toString().substr(0, 8),
|
||||
version);
|
||||
|
||||
fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
|
||||
for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
|
||||
if (it.type == MutationRef::SetValue) {
|
||||
fmt::print(" {}=\n", it.param1.printable().c_str());
|
||||
} else {
|
||||
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
|
||||
fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
|
||||
for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
|
||||
if (it.type == MutationRef::SetValue) {
|
||||
fmt::print(" {}=\n", it.param1.printable().c_str());
|
||||
} else {
|
||||
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
|
||||
}
|
||||
}
|
||||
fmt::print(" Disk(pre-filter): ({})\n", mutations.size());
|
||||
for (auto& it : mutations) {
|
||||
if (it.type == MutationRef::SetValue) {
|
||||
fmt::print(" {}=\n", it.param1.printable().c_str());
|
||||
} else {
|
||||
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
|
||||
}
|
||||
}
|
||||
ASSERT(false);
|
||||
}
|
||||
ASSERT(false);
|
||||
}
|
||||
remainingDurableBytes -=
|
||||
sizeof(KeyValueRef) +
|
||||
|
@ -5369,22 +5425,27 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
|
|||
// We have to store the version the change feed was stopped at in the SS instead of just the stopped status
|
||||
// In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
|
||||
// from other SS correctly
|
||||
const Value changeFeedSSValue(KeyRangeRef const& range, Version popVersion, Version stopVersion) {
|
||||
const Value changeFeedSSValue(KeyRangeRef const& range,
|
||||
Version popVersion,
|
||||
Version stopVersion,
|
||||
Version metadataVersion) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
|
||||
wr << range;
|
||||
wr << popVersion;
|
||||
wr << stopVersion;
|
||||
wr << metadataVersion;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
std::tuple<KeyRange, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
|
||||
std::tuple<KeyRange, Version, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
|
||||
KeyRange range;
|
||||
Version popVersion, stopVersion;
|
||||
Version popVersion, stopVersion, metadataVersion;
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> range;
|
||||
reader >> popVersion;
|
||||
reader >> stopVersion;
|
||||
return std::make_tuple(range, popVersion, stopVersion);
|
||||
reader >> metadataVersion;
|
||||
return std::make_tuple(range, popVersion, stopVersion, metadataVersion);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) {
|
||||
|
@ -5418,10 +5479,12 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
|||
auto& mLV = self->addVersionToMutationLog(durableVersion);
|
||||
self->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(
|
||||
MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
|
||||
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
|
||||
changeFeedSSValue(feed->second->range,
|
||||
feed->second->emptyVersion + 1,
|
||||
feed->second->stopVersion,
|
||||
feed->second->metadataVersion)));
|
||||
if (feed->second->storageVersion != invalidVersion) {
|
||||
++self->counters.kvSystemClearRanges;
|
||||
self->addMutationToMutationLog(mLV,
|
||||
|
@ -5513,7 +5576,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
|
|||
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
|
||||
changeFeedSSValue(changeFeedInfo->range,
|
||||
changeFeedInfo->emptyVersion + 1,
|
||||
changeFeedInfo->stopVersion)));
|
||||
changeFeedInfo->stopVersion,
|
||||
changeFeedInfo->metadataVersion)));
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
|
@ -5632,8 +5696,10 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
|
|||
mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
|
||||
changeFeedSSValue(
|
||||
changeFeedInfo->range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
|
||||
changeFeedSSValue(changeFeedInfo->range,
|
||||
changeFeedInfo->emptyVersion + 1,
|
||||
changeFeedInfo->stopVersion,
|
||||
changeFeedInfo->metadataVersion)));
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(changeFeedInfo->id, 0),
|
||||
|
@ -5730,13 +5796,6 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
}
|
||||
}
|
||||
|
||||
/*fmt::print("DBG: SS {} Feed {} possibly destroyed {}, {} metadata create, {} desired committed\n",
|
||||
data->thisServerID.toString().substr(0, 4),
|
||||
changeFeedInfo->id.printable(),
|
||||
changeFeedInfo->possiblyDestroyed,
|
||||
changeFeedInfo->metadataCreateVersion,
|
||||
data->desiredOldestVersion.get());*/
|
||||
|
||||
// There are two reasons for change_feed_not_registered:
|
||||
// 1. The feed was just created, but the ss mutation stream is ahead of the GRV that fetchChangeFeedApplier
|
||||
// uses to read the change feed data from the database. In this case we need to wait and retry
|
||||
|
@ -5775,7 +5834,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
data->changeFeedCleanupDurable[changeFeedInfo->id] = cleanupVersion;
|
||||
}
|
||||
|
||||
for (auto& it : data->changeFeedRemovals) {
|
||||
for (auto& it : data->changeFeedDestroys) {
|
||||
it.second.send(changeFeedInfo->id);
|
||||
}
|
||||
|
||||
|
@ -5791,7 +5850,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
|
||||
ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
||||
KeyRange keys,
|
||||
PromiseStream<Key> removals,
|
||||
PromiseStream<Key> destroyedFeeds,
|
||||
UID fetchKeysID) {
|
||||
|
||||
// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
|
||||
|
@ -5805,82 +5864,55 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
state std::set<Key> refreshedFeedIds;
|
||||
state std::set<Key> destroyedFeedIds;
|
||||
// before fetching feeds from other SS's, refresh any feeds we already have that are being marked as removed
|
||||
state OverlappingChangeFeedsInfo feedMetadata = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
|
||||
// rest of this actor needs to happen without waits that might yield to scheduler, to avoid races in feed metadata.
|
||||
|
||||
// Find set of feeds we currently have that were not present in fetch, to infer that they may have been destroyed.
|
||||
state std::unordered_map<Key, Version> missingFeeds;
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
|
||||
for (auto& r : ranges) {
|
||||
for (auto& cfInfo : r.value()) {
|
||||
auto feedCleanup = data->changeFeedCleanupDurable.find(cfInfo->id);
|
||||
if (feedCleanup != data->changeFeedCleanupDurable.end() && cfInfo->removing && !cfInfo->destroyed) {
|
||||
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
|
||||
destroyedFeedIds.insert(cfInfo->id);
|
||||
|
||||
cfInfo->removing = false;
|
||||
// because we now have a gap in the metadata, it's possible this feed was destroyed
|
||||
cfInfo->possiblyDestroyed = true;
|
||||
// Set refreshInProgress, so that if this actor is replaced by an expanded move actor, the new actor
|
||||
// picks up the refresh
|
||||
cfInfo->refreshInProgress = true;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
cfInfo->fetchVersion = invalidVersion;
|
||||
|
||||
cfInfo->durableFetchVersion = NotifiedVersion();
|
||||
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", cfInfo->id)
|
||||
.detail("Range", cfInfo->range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", cfInfo->emptyVersion)
|
||||
.detail("StopVersion", cfInfo->stopVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
} else if (cfInfo->refreshInProgress) {
|
||||
CODE_PROBE(true, "Racing refreshes for same change feed in fetch");
|
||||
destroyedFeedIds.insert(cfInfo->id);
|
||||
if (cfInfo->removing && !cfInfo->destroyed) {
|
||||
missingFeeds.insert({ cfInfo->id, cfInfo->metadataVersion });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state std::vector<OverlappingChangeFeedEntry> feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
|
||||
// handle change feeds removed while fetching overlapping
|
||||
while (removals.getFuture().isReady()) {
|
||||
Key remove = waitNext(removals.getFuture());
|
||||
for (int i = 0; i < feeds.size(); i++) {
|
||||
if (feeds[i].rangeId == remove) {
|
||||
swapAndPop(&feeds, i--);
|
||||
// handle change feeds destroyed while fetching overlapping info
|
||||
while (destroyedFeeds.getFuture().isReady()) {
|
||||
Key destroyed = waitNext(destroyedFeeds.getFuture());
|
||||
for (int i = 0; i < feedMetadata.feeds.size(); i++) {
|
||||
if (feedMetadata.feeds[i].feedId == destroyed) {
|
||||
missingFeeds.erase(destroyed); // feed definitely destroyed, no need to infer
|
||||
swapAndPop(&feedMetadata.feeds, i--);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<Key> feedIds;
|
||||
feedIds.reserve(feeds.size());
|
||||
feedIds.reserve(feedMetadata.feeds.size());
|
||||
// create change feed metadata if it does not exist
|
||||
for (auto& cfEntry : feeds) {
|
||||
auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
|
||||
for (auto& cfEntry : feedMetadata.feeds) {
|
||||
auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.feedId);
|
||||
bool cleanupPending = cleanupEntry != data->changeFeedCleanupDurable.end();
|
||||
feedIds.push_back(cfEntry.rangeId);
|
||||
auto existingEntry = data->uidChangeFeed.find(cfEntry.rangeId);
|
||||
auto existingEntry = data->uidChangeFeed.find(cfEntry.feedId);
|
||||
bool existing = existingEntry != data->uidChangeFeed.end();
|
||||
|
||||
TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", cfEntry.rangeId)
|
||||
.detail("RangeID", cfEntry.feedId)
|
||||
.detail("Range", cfEntry.range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", cfEntry.emptyVersion)
|
||||
.detail("StopVersion", cfEntry.stopVersion)
|
||||
.detail("FeedMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("Existing", existing)
|
||||
.detail("ExistingMetadataVersion", existing ? existingEntry->second->metadataVersion : invalidVersion)
|
||||
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
bool addMutationToLog = false;
|
||||
Reference<ChangeFeedInfo> changeFeedInfo;
|
||||
|
||||
auto fid = destroyedFeedIds.find(cfEntry.rangeId);
|
||||
if (fid != destroyedFeedIds.end()) {
|
||||
refreshedFeedIds.insert(cfEntry.rangeId);
|
||||
destroyedFeedIds.erase(fid);
|
||||
}
|
||||
|
||||
if (!existing) {
|
||||
CODE_PROBE(cleanupPending,
|
||||
"Fetch change feed which is cleanup pending. This means there was a move away and a move back, "
|
||||
|
@ -5888,24 +5920,51 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
|
||||
changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
|
||||
changeFeedInfo->range = cfEntry.range;
|
||||
changeFeedInfo->id = cfEntry.rangeId;
|
||||
changeFeedInfo->id = cfEntry.feedId;
|
||||
|
||||
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
|
||||
changeFeedInfo->stopVersion = cfEntry.stopVersion;
|
||||
data->uidChangeFeed[cfEntry.rangeId] = changeFeedInfo;
|
||||
data->uidChangeFeed[cfEntry.feedId] = changeFeedInfo;
|
||||
auto rs = data->keyChangeFeed.modify(cfEntry.range);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
r->value().push_back(changeFeedInfo);
|
||||
}
|
||||
data->keyChangeFeed.coalesce(cfEntry.range.contents());
|
||||
data->keyChangeFeed.coalesce(cfEntry.range);
|
||||
|
||||
addMutationToLog = true;
|
||||
} else {
|
||||
changeFeedInfo = existingEntry->second;
|
||||
|
||||
CODE_PROBE(cfEntry.feedMetadataVersion > data->version.get(),
|
||||
"Change Feed fetched future metadata version");
|
||||
|
||||
auto fid = missingFeeds.find(cfEntry.feedId);
|
||||
if (fid != missingFeeds.end()) {
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id.printable())
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
|
||||
.detail("StopVersion", changeFeedInfo->stopVersion)
|
||||
.detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
|
||||
.detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
missingFeeds.erase(fid);
|
||||
ASSERT(!changeFeedInfo->destroyed);
|
||||
ASSERT(changeFeedInfo->removing);
|
||||
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
|
||||
|
||||
changeFeedInfo->removing = false;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
changeFeedInfo->fetchVersion = invalidVersion;
|
||||
changeFeedInfo->durableFetchVersion = NotifiedVersion();
|
||||
|
||||
addMutationToLog = true;
|
||||
}
|
||||
|
||||
if (changeFeedInfo->destroyed) {
|
||||
// race where multiple feeds fetched overlapping change feed, one realized feed was missing and marked
|
||||
// it removed+destroyed, then this one fetched the same info
|
||||
CODE_PROBE(true, "Change feed fetched and destroyed by other fetch while fetching metadata");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -5925,82 +5984,63 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
addMutationToLog = true;
|
||||
}
|
||||
}
|
||||
feedIds.push_back(cfEntry.feedId);
|
||||
addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.feedMetadataVersion);
|
||||
if (addMutationToLog) {
|
||||
ASSERT(changeFeedInfo.isValid());
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
Version logV = data->data().getLatestVersion();
|
||||
auto& mLV = data->addVersionToMutationLog(logV);
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(
|
||||
MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(),
|
||||
changeFeedSSValue(cfEntry.range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + cfEntry.feedId.toString(),
|
||||
changeFeedSSValue(cfEntry.range,
|
||||
changeFeedInfo->emptyVersion + 1,
|
||||
changeFeedInfo->stopVersion,
|
||||
changeFeedInfo->metadataVersion)));
|
||||
// if we updated pop version, remove mutations
|
||||
while (!changeFeedInfo->mutations.empty() &&
|
||||
changeFeedInfo->mutations.front().version <= changeFeedInfo->emptyVersion) {
|
||||
changeFeedInfo->mutations.pop_front();
|
||||
}
|
||||
if (BUGGIFY) {
|
||||
data->maybeInjectTargetedRestart(logV);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CODE_PROBE(!refreshedFeedIds.empty(), "Feed refreshed between move away and move back");
|
||||
CODE_PROBE(!destroyedFeedIds.empty(), "Feed destroyed between move away and move back");
|
||||
for (auto& feedId : refreshedFeedIds) {
|
||||
auto existingEntry = data->uidChangeFeed.find(feedId);
|
||||
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed ||
|
||||
!existingEntry->second->refreshInProgress) {
|
||||
CODE_PROBE(true, "feed refreshed");
|
||||
for (auto& feed : missingFeeds) {
|
||||
auto existingEntry = data->uidChangeFeed.find(feed.first);
|
||||
ASSERT(existingEntry != data->uidChangeFeed.end());
|
||||
ASSERT(existingEntry->second->removing);
|
||||
ASSERT(!existingEntry->second->destroyed);
|
||||
|
||||
Version fetchedMetadataVersion = feedMetadata.getFeedMetadataVersion(existingEntry->second->range);
|
||||
Version lastMetadataVersion = feed.second;
|
||||
// Look for case where feed's range was moved away, feed was destroyed, and then feed's range was moved back.
|
||||
// This happens where feed is removing, the fetch metadata is higher than the moved away version, and the feed
|
||||
// isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion
|
||||
// and fetchedMetadataVersion
|
||||
if (lastMetadataVersion >= fetchedMetadataVersion) {
|
||||
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
|
||||
// it
|
||||
// We may just want to refactor this so updateStorage does explicit deletes based on
|
||||
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
|
||||
// Then we wouldn't have to reset anything here or above
|
||||
// Do the mutation log update here instead of above to ensure we only add it back to the mutation log if we're
|
||||
// sure it wasn't deleted in the metadata gap
|
||||
Version metadataVersion = data->data().getLatestVersion();
|
||||
auto& mLV = data->addVersionToMutationLog(metadataVersion);
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + existingEntry->second->id.toString(),
|
||||
changeFeedSSValue(existingEntry->second->range,
|
||||
existingEntry->second->emptyVersion + 1,
|
||||
existingEntry->second->stopVersion)));
|
||||
TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", existingEntry->second->id)
|
||||
.detail("Range", existingEntry->second->range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", existingEntry->second->emptyVersion)
|
||||
.detail("StopVersion", existingEntry->second->stopVersion)
|
||||
.detail("FKID", fetchKeysID)
|
||||
.detail("MetadataVersion", metadataVersion);
|
||||
existingEntry->second->refreshInProgress = false;
|
||||
}
|
||||
for (auto& feedId : destroyedFeedIds) {
|
||||
auto existingEntry = data->uidChangeFeed.find(feedId);
|
||||
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
|
||||
CODE_PROBE(true, "feed refreshed but then destroyed elsewhere");
|
||||
continue;
|
||||
}
|
||||
|
||||
/*fmt::print("DBG: SS {} fetching feed {} was refreshed but not present!! assuming destroyed\n",
|
||||
data->thisServerID.toString().substr(0, 4),
|
||||
feedId.printable());*/
|
||||
Version cleanupVersion = data->data().getLatestVersion();
|
||||
|
||||
CODE_PROBE(true, "Destroying change feed from fetch metadata"); //
|
||||
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
|
||||
.detail("RangeID", feedId)
|
||||
.detail("RangeID", feed.first)
|
||||
.detail("Range", existingEntry->second->range)
|
||||
.detail("Version", cleanupVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
if (g_network->isSimulated()) {
|
||||
ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feedId.toString()));
|
||||
// verify that the feed was actually destroyed and it's not an error in this inference logic
|
||||
ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feed.first.toString()));
|
||||
}
|
||||
|
||||
Key beginClearKey = feedId.withPrefix(persistChangeFeedKeys.begin);
|
||||
Key beginClearKey = feed.first.withPrefix(persistChangeFeedKeys.begin);
|
||||
|
||||
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
|
||||
data->addMutationToMutationLog(mLV,
|
||||
|
@ -6008,15 +6048,18 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
++data->counters.kvSystemClearRanges;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(feedId, 0),
|
||||
changeFeedDurableKey(feedId, cleanupVersion)));
|
||||
changeFeedDurableKey(feed.first, 0),
|
||||
changeFeedDurableKey(feed.first, cleanupVersion)));
|
||||
++data->counters.kvSystemClearRanges;
|
||||
|
||||
existingEntry->second->destroy(cleanupVersion);
|
||||
data->changeFeedCleanupDurable[feedId] = cleanupVersion;
|
||||
data->changeFeedCleanupDurable[feed.first] = cleanupVersion;
|
||||
|
||||
for (auto& it : data->changeFeedRemovals) {
|
||||
it.second.send(feedId);
|
||||
for (auto& it : data->changeFeedDestroys) {
|
||||
it.second.send(feed.first);
|
||||
}
|
||||
if (BUGGIFY) {
|
||||
data->maybeInjectTargetedRestart(cleanupVersion);
|
||||
}
|
||||
}
|
||||
return feedIds;
|
||||
|
@ -6029,7 +6072,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
|
|||
KeyRange keys,
|
||||
Version beginVersion,
|
||||
Version endVersion,
|
||||
PromiseStream<Key> removals,
|
||||
PromiseStream<Key> destroyedFeeds,
|
||||
std::vector<Key>* feedIds,
|
||||
std::unordered_set<Key> newFeedIds) {
|
||||
state std::unordered_map<Key, Version> feedMaxFetched;
|
||||
|
@ -6058,7 +6101,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
|
|||
|
||||
loop {
|
||||
Future<Version> nextFeed = Never();
|
||||
if (!removals.getFuture().isReady()) {
|
||||
if (!destroyedFeeds.getFuture().isReady()) {
|
||||
bool done = true;
|
||||
while (!feedFetches.empty()) {
|
||||
if (feedFetches.begin()->second.isReady()) {
|
||||
|
@ -6078,11 +6121,11 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
|
|||
}
|
||||
}
|
||||
choose {
|
||||
when(state Key remove = waitNext(removals.getFuture())) {
|
||||
when(state Key destroyed = waitNext(destroyedFeeds.getFuture())) {
|
||||
wait(delay(0));
|
||||
feedFetches.erase(remove);
|
||||
feedFetches.erase(destroyed);
|
||||
for (int i = 0; i < feedIds->size(); i++) {
|
||||
if ((*feedIds)[i] == remove) {
|
||||
if ((*feedIds)[i] == destroyed) {
|
||||
swapAndPop(feedIds, i--);
|
||||
}
|
||||
}
|
||||
|
@ -6093,7 +6136,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
|
|||
|
||||
} catch (Error& e) {
|
||||
if (!data->shuttingDown) {
|
||||
data->changeFeedRemovals.erase(fetchKeysID);
|
||||
data->changeFeedDestroys.erase(fetchKeysID);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
@ -6106,6 +6149,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
state Future<Void> warningLogger = logFetchKeysWarning(shard);
|
||||
state const double startTime = now();
|
||||
state Version fetchVersion = invalidVersion;
|
||||
|
||||
state PromiseStream<Key> destroyedFeeds;
|
||||
state FetchKeysMetricReporter metricReporter(fetchKeysID,
|
||||
startTime,
|
||||
keys,
|
||||
|
@ -6114,17 +6159,27 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
data->counters.bytesFetched,
|
||||
data->counters.kvFetched);
|
||||
|
||||
// need to set this at the very start of the fetch, to handle any private change feed destroy mutations we get for
|
||||
// this key range, that apply to change feeds we don't know about yet because their metadata hasn't been fetched yet
|
||||
data->changeFeedDestroys[fetchKeysID] = destroyedFeeds;
|
||||
|
||||
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
|
||||
// This allows adding->start() to be called inline with CSK.
|
||||
wait(data->coreStarted.getFuture() && delay(0));
|
||||
try {
|
||||
wait(data->coreStarted.getFuture() && delay(0));
|
||||
|
||||
// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped if
|
||||
// added before latest version advances.
|
||||
// To ensure this doesn't happen, we wait for version to increase by one if this fetchKeys was initiated by a
|
||||
// changeServerKeys from restoreDurableState
|
||||
if (data->version.get() == data->durableVersion.get()) {
|
||||
wait(data->version.whenAtLeast(data->version.get() + 1));
|
||||
wait(delay(0));
|
||||
// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped
|
||||
// if added before latest version advances. To ensure this doesn't happen, we wait for version to increase by
|
||||
// one if this fetchKeys was initiated by a changeServerKeys from restoreDurableState
|
||||
if (data->version.get() == data->durableVersion.get()) {
|
||||
wait(data->version.whenAtLeast(data->version.get() + 1));
|
||||
wait(delay(0));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (!data->shuttingDown) {
|
||||
data->changeFeedDestroys.erase(fetchKeysID);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -6136,9 +6191,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
.detail("Version", data->version.get())
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
state PromiseStream<Key> removals;
|
||||
data->changeFeedRemovals[fetchKeysID] = removals;
|
||||
state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, removals, fetchKeysID);
|
||||
state Future<std::vector<Key>> fetchCFMetadata =
|
||||
fetchChangeFeedMetadata(data, keys, destroyedFeeds, fetchKeysID);
|
||||
|
||||
validate(data);
|
||||
|
||||
|
@ -6395,8 +6449,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
// being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what
|
||||
// we have written)
|
||||
|
||||
state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(
|
||||
data, fetchKeysID, keys, 0, fetchVersion + 1, removals, &changeFeedsToFetch, std::unordered_set<Key>());
|
||||
state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(data,
|
||||
fetchKeysID,
|
||||
keys,
|
||||
0,
|
||||
fetchVersion + 1,
|
||||
destroyedFeeds,
|
||||
&changeFeedsToFetch,
|
||||
std::unordered_set<Key>());
|
||||
|
||||
state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
|
||||
state Future<Void> dataArrive = data->version.whenAtLeast(fetchVersion);
|
||||
|
@ -6459,7 +6519,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
keys,
|
||||
fetchVersion + 1,
|
||||
shard->transferredVersion,
|
||||
removals,
|
||||
destroyedFeeds,
|
||||
&changeFeedsToFetch,
|
||||
newChangeFeeds);
|
||||
|
||||
|
@ -6513,7 +6573,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
}
|
||||
}
|
||||
|
||||
data->changeFeedRemovals.erase(fetchKeysID);
|
||||
data->changeFeedDestroys.erase(fetchKeysID);
|
||||
|
||||
shard->phase = AddingShard::Waiting;
|
||||
|
||||
|
@ -6569,7 +6629,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
.errorUnsuppressed(e)
|
||||
.detail("Version", data->version.get());
|
||||
if (!data->shuttingDown) {
|
||||
data->changeFeedRemovals.erase(fetchKeysID);
|
||||
data->changeFeedDestroys.erase(fetchKeysID);
|
||||
}
|
||||
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
|
||||
if (shard->phase < AddingShard::FetchingCF) {
|
||||
|
@ -6822,11 +6882,15 @@ void cleanUpChangeFeeds(StorageServer* data, const KeyRangeRef& keys, Version ve
|
|||
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->updateMetadataVersion(version);
|
||||
feed->second->removing = true;
|
||||
feed->second->refreshInProgress = false;
|
||||
feed->second->moved(feed->second->range);
|
||||
feed->second->newMutations.trigger();
|
||||
}
|
||||
|
||||
if (BUGGIFY) {
|
||||
data->maybeInjectTargetedRestart(durableVersion);
|
||||
}
|
||||
} else {
|
||||
// if just part of feed's range is moved away
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
|
@ -7447,7 +7511,7 @@ private:
|
|||
.detail("Status", status);
|
||||
|
||||
// Because of data moves, we can get mutations operating on a change feed we don't yet know about, because
|
||||
// the fetch hasn't started yet
|
||||
// the metadata fetch hasn't started yet
|
||||
bool createdFeed = false;
|
||||
if (feed == data->uidChangeFeed.end() && status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
|
||||
createdFeed = true;
|
||||
|
@ -7479,6 +7543,9 @@ private:
|
|||
}
|
||||
data->keyChangeFeed.coalesce(changeFeedRange.contents());
|
||||
}
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->updateMetadataVersion(currentVersion);
|
||||
}
|
||||
|
||||
bool popMutationLog = false;
|
||||
bool addMutationToLog = false;
|
||||
|
@ -7540,22 +7607,29 @@ private:
|
|||
|
||||
feed->second->destroy(currentVersion);
|
||||
data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
|
||||
|
||||
if (BUGGIFY) {
|
||||
data->maybeInjectTargetedRestart(cleanupVersion);
|
||||
}
|
||||
}
|
||||
|
||||
if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
|
||||
for (auto& it : data->changeFeedRemovals) {
|
||||
for (auto& it : data->changeFeedDestroys) {
|
||||
it.second.send(changeFeedId);
|
||||
}
|
||||
}
|
||||
|
||||
if (addMutationToLog) {
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
Version logV = data->data().getLatestVersion();
|
||||
auto& mLV = data->addVersionToMutationLog(logV);
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + changeFeedId.toString(),
|
||||
changeFeedSSValue(
|
||||
feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
|
||||
changeFeedSSValue(feed->second->range,
|
||||
feed->second->emptyVersion + 1,
|
||||
feed->second->stopVersion,
|
||||
feed->second->metadataVersion)));
|
||||
if (popMutationLog) {
|
||||
++data->counters.kvSystemClearRanges;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
|
@ -7563,6 +7637,9 @@ private:
|
|||
changeFeedDurableKey(feed->second->id, 0),
|
||||
changeFeedDurableKey(feed->second->id, popVersion)));
|
||||
}
|
||||
if (BUGGIFY) {
|
||||
data->maybeInjectTargetedRestart(logV);
|
||||
}
|
||||
}
|
||||
} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) &&
|
||||
m.param1.startsWith(TenantMetadata::tenantMapPrivatePrefix)) {
|
||||
|
@ -7775,6 +7852,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
}
|
||||
}
|
||||
|
||||
if (data->maybeInjectDelay()) {
|
||||
wait(delay(deterministicRandom()->random01() * 10.0));
|
||||
}
|
||||
|
||||
while (data->byteSampleClearsTooLarge.get()) {
|
||||
wait(data->byteSampleClearsTooLarge.onChange());
|
||||
}
|
||||
|
@ -8524,6 +8605,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
|
||||
.detail("NewOldestVersion", newOldestVersion)
|
||||
.detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
||||
CODE_PROBE(true, "SS rebooting after durable");
|
||||
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this
|
||||
// process) never sets durableInProgress, we should set durableInProgress before send the
|
||||
// please_reboot() error. Otherwise, in the race situation when storage server receives both reboot and
|
||||
|
@ -8672,7 +8754,8 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
|||
// ASSERT( self->debug_inApplyUpdate );
|
||||
ASSERT(!keys.empty());
|
||||
|
||||
auto& mLV = self->addVersionToMutationLog(self->data().getLatestVersion());
|
||||
Version logV = self->data().getLatestVersion();
|
||||
auto& mLV = self->addVersionToMutationLog(logV);
|
||||
|
||||
KeyRange availableKeys = KeyRangeRef(persistShardAvailableKeys.begin.toString() + keys.begin.toString(),
|
||||
persistShardAvailableKeys.begin.toString() + keys.end.toString());
|
||||
|
@ -8708,6 +8791,10 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
|||
.detail("DeleteVersion", mLV.version + 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (BUGGIFY) {
|
||||
self->maybeInjectTargetedRestart(logV);
|
||||
}
|
||||
}
|
||||
|
||||
void updateStorageShard(StorageServer* data, StorageServerShard shard) {
|
||||
|
@ -8744,7 +8831,8 @@ void updateStorageShard(StorageServer* data, StorageServerShard shard) {
|
|||
|
||||
void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
|
||||
ASSERT(!keys.empty());
|
||||
auto& mLV = self->addVersionToMutationLog(self->data().getLatestVersion());
|
||||
Version logV = self->data().getLatestVersion();
|
||||
auto& mLV = self->addVersionToMutationLog(logV);
|
||||
KeyRange assignedKeys = KeyRangeRef(persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
|
||||
persistShardAssignedKeys.begin.toString() + keys.end.toString());
|
||||
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
|
||||
|
@ -8761,6 +8849,10 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
|
|||
assignedKeys.end,
|
||||
endAssigned ? LiteralStringRef("1") : LiteralStringRef("0")));
|
||||
}
|
||||
|
||||
if (BUGGIFY) {
|
||||
self->maybeInjectTargetedRestart(logV);
|
||||
}
|
||||
}
|
||||
|
||||
void StorageServerDisk::clearRange(KeyRangeRef keys) {
|
||||
|
@ -9164,13 +9256,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
|
||||
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
|
||||
KeyRange changeFeedRange;
|
||||
Version popVersion, stopVersion;
|
||||
std::tie(changeFeedRange, popVersion, stopVersion) = decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
|
||||
Version popVersion, stopVersion, metadataVersion;
|
||||
std::tie(changeFeedRange, popVersion, stopVersion, metadataVersion) =
|
||||
decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
|
||||
TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("StopVersion", stopVersion)
|
||||
.detail("PopVer", popVersion);
|
||||
.detail("PopVer", popVersion)
|
||||
.detail("MetadataVersion", metadataVersion);
|
||||
Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());
|
||||
changeFeedInfo->range = changeFeedRange;
|
||||
changeFeedInfo->id = changeFeedId;
|
||||
|
@ -9178,6 +9272,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||
changeFeedInfo->storageVersion = version;
|
||||
changeFeedInfo->emptyVersion = popVersion - 1;
|
||||
changeFeedInfo->stopVersion = stopVersion;
|
||||
changeFeedInfo->metadataVersion = metadataVersion;
|
||||
data->uidChangeFeed[changeFeedId] = changeFeedInfo;
|
||||
auto rs = data->keyChangeFeed.modify(changeFeedRange);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
|
|
|
@ -237,57 +237,64 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
while (timeTravelIt != timeTravelChecks.end() && currentTime >= timeTravelIt->first) {
|
||||
state OldRead oldRead = timeTravelIt->second;
|
||||
timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
|
||||
// advance iterator before doing read, so if it gets error we don't retry it
|
||||
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
|
||||
if (prevPurgeVersion == -1) {
|
||||
prevPurgeVersion = oldRead.v;
|
||||
}
|
||||
// advance iterator before doing read, so if it gets error we don't retry it
|
||||
|
||||
try {
|
||||
state Version newPurgeVersion = 0;
|
||||
state bool doPurging = allowPurging && deterministicRandom()->random01() < 0.5;
|
||||
if (doPurging) {
|
||||
Version maxPurgeVersion = oldRead.v;
|
||||
for (auto& it : timeTravelChecks) {
|
||||
maxPurgeVersion = std::min(it.second.v, maxPurgeVersion);
|
||||
}
|
||||
if (prevPurgeVersion < maxPurgeVersion) {
|
||||
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
|
||||
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, {}, false));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
self->purges++;
|
||||
} else {
|
||||
doPurging = false;
|
||||
}
|
||||
// before doing read, purge just before read version
|
||||
state Version newPurgeVersion = 0;
|
||||
state bool doPurging = allowPurging && deterministicRandom()->random01() < 0.5;
|
||||
if (doPurging) {
|
||||
CODE_PROBE(true, "BGV considering purge");
|
||||
Version maxPurgeVersion = oldRead.v;
|
||||
for (auto& it : timeTravelChecks) {
|
||||
maxPurgeVersion = std::min(it.second.v, maxPurgeVersion);
|
||||
}
|
||||
if (prevPurgeVersion < maxPurgeVersion) {
|
||||
CODE_PROBE(true, "BGV doing purge");
|
||||
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
|
||||
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Purging @ {0}\n", newPurgeVersion);
|
||||
}
|
||||
try {
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, {}, false));
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Purged @ {0}, waiting\n", newPurgeVersion);
|
||||
}
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
// purging shouldn't error, it should retry.
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("Unexpected error {0} purging @ {1}!\n", e.name(), newPurgeVersion);
|
||||
}
|
||||
ASSERT(false);
|
||||
}
|
||||
CODE_PROBE(true, "BGV purge complete");
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Purge complete @ {0}\n", newPurgeVersion);
|
||||
}
|
||||
self->purges++;
|
||||
} else {
|
||||
doPurging = false;
|
||||
}
|
||||
}
|
||||
|
||||
// do time travel read
|
||||
try {
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
|
||||
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, oldRead.v));
|
||||
if (!compareFDBAndBlob(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, BGV_DEBUG)) {
|
||||
self->mismatches++;
|
||||
}
|
||||
self->timeTravelReads++;
|
||||
|
||||
if (doPurging) {
|
||||
wait(self->killBlobWorkers(cx, self));
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
|
||||
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
|
||||
try {
|
||||
Version minSnapshotVersion = newPurgeVersion;
|
||||
for (auto& it : versionRead.second) {
|
||||
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
|
||||
}
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
|
||||
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw;
|
||||
}
|
||||
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
fmt::print("Error TT: {0}\n", e.name());
|
||||
if (e.code() == error_code_blob_granule_transaction_too_old) {
|
||||
self->timeTravelTooOld++;
|
||||
// TODO: add debugging info for when this is a failure
|
||||
|
@ -297,6 +304,51 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
oldRead.v);
|
||||
}
|
||||
}
|
||||
|
||||
// if purged just before read, verify that purge cleaned up data by restarting blob workers and
|
||||
// reading older than the purge version
|
||||
if (doPurging) {
|
||||
wait(self->killBlobWorkers(cx, self));
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Reading post-purge [{0} - {1}) @ {2}\n",
|
||||
oldRead.range.begin.printable(),
|
||||
oldRead.range.end.printable(),
|
||||
prevPurgeVersion);
|
||||
}
|
||||
// ensure purge version exactly is still readable
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead1 =
|
||||
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Post-purge first read:\n");
|
||||
printGranuleChunks(versionRead1.second);
|
||||
}
|
||||
try {
|
||||
// read at purgeVersion - 1, should NOT be readable
|
||||
Version minSnapshotVersion = newPurgeVersion;
|
||||
for (auto& it : versionRead1.second) {
|
||||
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
|
||||
}
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV Reading post-purge again [{0} - {1}) @ {2}\n",
|
||||
oldRead.range.begin.printable(),
|
||||
oldRead.range.end.printable(),
|
||||
minSnapshotVersion - 1);
|
||||
}
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead2 =
|
||||
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
|
||||
if (BGV_DEBUG) {
|
||||
fmt::print("BGV ERROR: data not purged! Read successful!!\n");
|
||||
printGranuleChunks(versionRead2.second);
|
||||
}
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw;
|
||||
}
|
||||
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
|
||||
CODE_PROBE(true, "BGV verified too old after purge");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pick a random range
|
||||
|
@ -471,6 +523,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
// For some reason simulation is still passing when this fails?.. so assert for now
|
||||
ASSERT(result);
|
||||
|
||||
// FIXME: if doPurging was set, possibly do one last purge here, and verify it succeeds with no errors
|
||||
|
||||
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
|
||||
CODE_PROBE(true, "BGV clearing database and awaiting merge");
|
||||
wait(clearAndAwaitMerge(cx, normalKeys));
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [3, 4, 5]
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
allowDisablingTenants = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
blobGranulesEnabled = true
|
||||
allowDefaultTenant = false
|
||||
injectTargetedSSRestart = true
|
||||
injectSSDelay = true
|
||||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
|
|
Loading…
Reference in New Issue