Fixing new race in change feed cleanup while still working for fetch+cleanup race
This commit is contained in:
parent
cfbb3f5b2b
commit
10c3cc870f
|
@ -633,6 +633,7 @@ public:
|
|||
std::set<Key> currentChangeFeeds;
|
||||
std::set<Key> fetchingChangeFeeds;
|
||||
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
|
||||
std::unordered_map<Key, Version> changeFeedCleanupDurable;
|
||||
|
||||
// newestAvailableVersion[k]
|
||||
// == invalidVersion -> k is unavailable at all versions
|
||||
|
@ -4154,6 +4155,18 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
|
|||
.detail("RangeID", changeFeedInfo->id.printable())
|
||||
.detail("Range", changeFeedInfo->range.toString())
|
||||
.detail("FetchVersion", fetchVersion);
|
||||
|
||||
auto cleanupPending = data->changeFeedCleanupDurable.find(changeFeedInfo->id);
|
||||
if (cleanupPending != data->changeFeedCleanupDurable.end()) {
|
||||
TraceEvent(SevDebug, "FetchChangeFeedWaitCleanup", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id.printable())
|
||||
.detail("Range", changeFeedInfo->range.toString())
|
||||
.detail("CleanupVersion", cleanupPending->second)
|
||||
.detail("FetchVersion", fetchVersion);
|
||||
wait(data->durableVersion.whenAtLeast(cleanupPending->second + 1));
|
||||
ASSERT(!data->changeFeedCleanupDurable.count(changeFeedInfo->id));
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
// TODO clean up existing param for !existing
|
||||
|
@ -4176,14 +4189,18 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
|
|||
feedIds.reserve(feeds.size());
|
||||
// create change feed metadata if it does not exist
|
||||
for (auto& cfEntry : feeds) {
|
||||
bool cleanupPending = data->changeFeedCleanupDurable.count(cfEntry.rangeId);
|
||||
feedIds.push_back(cfEntry.rangeId);
|
||||
bool existing = data->uidChangeFeed.count(cfEntry.rangeId);
|
||||
if (!existing) {
|
||||
TEST(cleanupPending); // Fetch change feed which is cleanup pending. This means there was a move away and a
|
||||
// move back, this will remake the metadata
|
||||
TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID)
|
||||
.detail("RangeID", cfEntry.rangeId.printable())
|
||||
.detail("Range", cfEntry.range.toString())
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("Stopped", cfEntry.stopped);
|
||||
.detail("Stopped", cfEntry.stopped)
|
||||
.detail("CleanupPending", cleanupPending);
|
||||
|
||||
Reference<ChangeFeedInfo> changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
|
||||
changeFeedInfo->range = cfEntry.range;
|
||||
|
@ -4202,6 +4219,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
|
|||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(),
|
||||
changeFeedValue(cfEntry.range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE)));
|
||||
} else {
|
||||
ASSERT(!cleanupPending);
|
||||
}
|
||||
}
|
||||
return feedIds;
|
||||
|
@ -4865,9 +4884,23 @@ void changeServerKeys(StorageServer* data,
|
|||
|
||||
if (!foundAssigned) {
|
||||
// TODO REMOVE
|
||||
|
||||
Version durableVersion = data->data().getLatestVersion();
|
||||
TraceEvent(SevDebug, "ChangeFeedCleanup", data->thisServerID)
|
||||
.detail("FeedID", f.first)
|
||||
.detail("Version", version);
|
||||
.detail("Version", version)
|
||||
.detail("DurableVersion", durableVersion);
|
||||
|
||||
data->changeFeedCleanupDurable[f.first] = durableVersion;
|
||||
|
||||
Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin);
|
||||
auto& mLV = data->addVersionToMutationLog(durableVersion);
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(f.first, 0),
|
||||
changeFeedDurableKey(f.first, version)));
|
||||
|
||||
auto rs = data->keyChangeFeed.modify(f.second);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
|
@ -4886,14 +4919,6 @@ void changeServerKeys(StorageServer* data,
|
|||
feed->second->newMutations.trigger();
|
||||
data->uidChangeFeed.erase(feed);
|
||||
}
|
||||
|
||||
Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin);
|
||||
|
||||
// all fetching actors should be cancelled by now because removing=true and moved(), so it's safe to
|
||||
// clear storage directly
|
||||
data->storage.clearRange(KeyRangeRef(beginClearKey, keyAfter(beginClearKey)));
|
||||
data->storage.clearRange(
|
||||
KeyRangeRef(changeFeedDurableKey(f.first, 0), changeFeedDurableKey(f.first, version)));
|
||||
} else {
|
||||
// if just part of feed's range is moved away
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
|
@ -5196,7 +5221,8 @@ private:
|
|||
.detail("Range", changeFeedRange.toString())
|
||||
.detail("Version", currentVersion);
|
||||
Key beginClearKey = changeFeedId.withPrefix(persistChangeFeedKeys.begin);
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
Version cleanupVersion = data->data().getLatestVersion();
|
||||
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
data->addMutationToMutationLog(mLV,
|
||||
|
@ -5214,6 +5240,7 @@ private:
|
|||
}
|
||||
data->keyChangeFeed.coalesce(feed->second->range.contents());
|
||||
data->uidChangeFeed.erase(feed);
|
||||
data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
|
||||
}
|
||||
|
||||
if (addMutationToLog) {
|
||||
|
@ -5845,6 +5872,16 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
curFeed++;
|
||||
}
|
||||
|
||||
// remove any entries from changeFeedCleanupPending that were persisted
|
||||
auto cfCleanup = data->changeFeedCleanupDurable.begin();
|
||||
while (cfCleanup != data->changeFeedCleanupDurable.end()) {
|
||||
if (cfCleanup->second <= newOldestVersion) {
|
||||
cfCleanup = data->changeFeedCleanupDurable.erase(cfCleanup);
|
||||
} else {
|
||||
cfCleanup++;
|
||||
}
|
||||
}
|
||||
|
||||
durableInProgress.send(Void());
|
||||
wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to shut
|
||||
// down, so delay to check for cancellation
|
||||
|
|
Loading…
Reference in New Issue