addressing review comments
This commit is contained in:
parent
e0f6897481
commit
ba31229797
|
@ -9384,7 +9384,7 @@ Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> resu
|
|||
|
||||
Version OverlappingChangeFeedsInfo::getFeedMetadataVersion(const KeyRangeRef& range) const {
|
||||
Version v = invalidVersion;
|
||||
for (auto& it : metadataVersions) {
|
||||
for (auto& it : feedMetadataVersions) {
|
||||
if (it.second > v && it.first.intersects(range)) {
|
||||
v = it.second;
|
||||
}
|
||||
|
@ -9454,13 +9454,13 @@ ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Referenc
|
|||
for (int i = 0; i < locations.size(); i++) {
|
||||
result.arena.dependsOn(allOverlappingRequests[i].get().arena);
|
||||
result.arena.dependsOn(locations[i].range.arena());
|
||||
result.metadataVersions.push_back(
|
||||
{ locations[i].range, allOverlappingRequests[i].get().metadataVersion });
|
||||
result.feedMetadataVersions.push_back(
|
||||
{ locations[i].range, allOverlappingRequests[i].get().feedMetadataVersion });
|
||||
for (auto& it : allOverlappingRequests[i].get().feeds) {
|
||||
auto res = latestFeedMetadata.insert({ it.feedId, it });
|
||||
if (!res.second) {
|
||||
CODE_PROBE(true, "deduping fetched overlapping feed by higher metadata version");
|
||||
if (res.first->second.metadataVersion < it.metadataVersion) {
|
||||
if (res.first->second.feedMetadataVersion < it.feedMetadataVersion) {
|
||||
res.first->second = it;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ struct OverlappingChangeFeedsInfo {
|
|||
Arena arena;
|
||||
VectorRef<OverlappingChangeFeedEntry> feeds;
|
||||
// would prefer to use key range map but it complicates copy/move constructors
|
||||
std::vector<std::pair<KeyRangeRef, Version>> metadataVersions;
|
||||
std::vector<std::pair<KeyRangeRef, Version>> feedMetadataVersions;
|
||||
|
||||
// for a feed that wasn't present, returns the metadata version it would have been fetched at.
|
||||
Version getFeedMetadataVersion(const KeyRangeRef& feedRange) const;
|
||||
|
|
|
@ -970,11 +970,11 @@ struct OverlappingChangeFeedEntry {
|
|||
KeyRangeRef range;
|
||||
Version emptyVersion;
|
||||
Version stopVersion;
|
||||
Version metadataVersion;
|
||||
Version feedMetadataVersion;
|
||||
|
||||
bool operator==(const OverlappingChangeFeedEntry& r) const {
|
||||
return feedId == r.feedId && range == r.range && emptyVersion == r.emptyVersion &&
|
||||
stopVersion == r.stopVersion && metadataVersion == r.metadataVersion;
|
||||
stopVersion == r.stopVersion && feedMetadataVersion == r.feedMetadataVersion;
|
||||
}
|
||||
|
||||
OverlappingChangeFeedEntry() {}
|
||||
|
@ -982,17 +982,17 @@ struct OverlappingChangeFeedEntry {
|
|||
KeyRangeRef const& range,
|
||||
Version emptyVersion,
|
||||
Version stopVersion,
|
||||
Version metadataVersion)
|
||||
Version feedMetadataVersion)
|
||||
: feedId(feedId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion),
|
||||
metadataVersion(metadataVersion) {}
|
||||
feedMetadataVersion(feedMetadataVersion) {}
|
||||
|
||||
OverlappingChangeFeedEntry(Arena& arena, const OverlappingChangeFeedEntry& rhs)
|
||||
: feedId(arena, rhs.feedId), range(arena, rhs.range), emptyVersion(rhs.emptyVersion),
|
||||
stopVersion(rhs.stopVersion), metadataVersion(rhs.metadataVersion) {}
|
||||
stopVersion(rhs.stopVersion), feedMetadataVersion(rhs.feedMetadataVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, feedId, range, emptyVersion, stopVersion, metadataVersion);
|
||||
serializer(ar, feedId, range, emptyVersion, stopVersion, feedMetadataVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1001,15 +1001,15 @@ struct OverlappingChangeFeedsReply {
|
|||
VectorRef<OverlappingChangeFeedEntry> feeds;
|
||||
bool cached;
|
||||
Arena arena;
|
||||
Version metadataVersion;
|
||||
Version feedMetadataVersion;
|
||||
|
||||
OverlappingChangeFeedsReply() : cached(false), metadataVersion(invalidVersion) {}
|
||||
explicit OverlappingChangeFeedsReply(VectorRef<OverlappingChangeFeedEntry> const& feeds, Version metadataVersion)
|
||||
: feeds(feeds), cached(false), metadataVersion(metadataVersion) {}
|
||||
OverlappingChangeFeedsReply() : cached(false), feedMetadataVersion(invalidVersion) {}
|
||||
explicit OverlappingChangeFeedsReply(VectorRef<OverlappingChangeFeedEntry> const& feeds, Version feedMetadataVersion)
|
||||
: feeds(feeds), cached(false), feedMetadataVersion(feedMetadataVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, feeds, arena, metadataVersion);
|
||||
serializer(ar, feeds, arena, feedMetadataVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -2101,7 +2101,7 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
|||
}
|
||||
}
|
||||
state OverlappingChangeFeedsReply reply;
|
||||
reply.metadataVersion = data->version.get();
|
||||
reply.feedMetadataVersion = data->version.get();
|
||||
for (auto& it : rangeIds) {
|
||||
reply.feeds.push_back_deep(reply.arena,
|
||||
OverlappingChangeFeedEntry(it.first,
|
||||
|
@ -2115,7 +2115,7 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
|||
.detail("Range", std::get<0>(it.second))
|
||||
.detail("EmptyVersion", std::get<1>(it.second))
|
||||
.detail("StopVersion", std::get<2>(it.second))
|
||||
.detail("MetadataVersion", std::get<3>(it.second));
|
||||
.detail("FeedMetadataVersion", std::get<3>(it.second));
|
||||
}
|
||||
|
||||
// Make sure all of the metadata we are sending won't get rolled back
|
||||
|
@ -5690,7 +5690,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", cfEntry.emptyVersion)
|
||||
.detail("StopVersion", cfEntry.stopVersion)
|
||||
.detail("MetadataVersion", cfEntry.metadataVersion)
|
||||
.detail("FeedMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("Existing", existing)
|
||||
.detail("ExistingMetadataVersion", existing ? existingEntry->second->metadataVersion : invalidVersion)
|
||||
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
|
||||
|
@ -5721,7 +5721,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
} else {
|
||||
changeFeedInfo = existingEntry->second;
|
||||
|
||||
CODE_PROBE(cfEntry.metadataVersion > data->version.get(), "Change Feed fetched future metadata version");
|
||||
CODE_PROBE(cfEntry.feedMetadataVersion > data->version.get(), "Change Feed fetched future metadata version");
|
||||
|
||||
auto fid = missingFeeds.find(cfEntry.feedId);
|
||||
if (fid != missingFeeds.end()) {
|
||||
|
@ -5732,7 +5732,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
|
||||
.detail("StopVersion", changeFeedInfo->stopVersion)
|
||||
.detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
|
||||
.detail("NewMetadataVersion", cfEntry.metadataVersion)
|
||||
.detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
missingFeeds.erase(fid);
|
||||
|
@ -5770,7 +5770,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
}
|
||||
}
|
||||
feedIds.push_back(cfEntry.feedId);
|
||||
addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.metadataVersion);
|
||||
addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.feedMetadataVersion);
|
||||
if (addMutationToLog) {
|
||||
ASSERT(changeFeedInfo.isValid());
|
||||
Version logV = data->data().getLatestVersion();
|
||||
|
|
Loading…
Reference in New Issue