Improving change feed performance and fixing durability issue

This commit is contained in:
Josh Slocum 2022-02-02 11:10:32 -06:00
parent c90c356c2c
commit f187d01b71
1 changed files with 131 additions and 72 deletions

View File

@ -338,18 +338,16 @@ struct FetchInjectionInfo {
struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
std::deque<Standalone<MutationsAndVersionRef>> mutations;
Version fetchVersion =
invalidVersion; // The version that commits from a fetch have been written to storage, but have not yet been
// committed as part of updateStorage. Because a fetch can merge mutations with incoming data
// before updateStorage updates the storage version, updateStorage must know to skip mutations
// that have already been written to storage by change feed fetch.
Version fetchVersion = invalidVersion; // The version that commits from a fetch have been written to storage, but
// have not yet been committed as part of updateStorage.
Version storageVersion = invalidVersion; // The version between the storage version and the durable version are
// being written to disk as part of the current commit in updateStorage
// being written to disk as part of the current commit in updateStorage.
Version durableVersion = invalidVersion; // All versions before the durable version are durable on disk
Version emptyVersion = 0; // The change feed does not have any mutations before emptyVersion
KeyRange range;
Key id;
AsyncTrigger newMutations;
NotifiedVersion durableFetchVersion;
Promise<Void> onMove;
bool stopped = false; // A stopped change feed no longer adds new mutations, but is still queriable
bool removing = false;
@ -1713,7 +1711,7 @@ MutationsAndVersionRef filterMutations(Arena& arena,
#define DEBUG_SS_CF_BEGIN_VERSION invalidVersion
#define DEBUG_SS_CFM(ssId, cfId, v) \
ssId.toString().substr(0, 4) == DEBUG_SS_ID&& cfId.printable().substr(0, 6) == DEBUG_SS_CF_ID && \
(v == DEBUG_SS_CF_BEGIN_VERSION || latestVersion == DEBUG_SS_CF_BEGIN_VERSION)
(v >= DEBUG_SS_CF_BEGIN_VERSION || latestVersion == DEBUG_SS_CF_BEGIN_VERSION)
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
ChangeFeedStreamRequest req,
@ -1823,17 +1821,15 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
bool readDurable = feedInfo->durableVersion != invalidVersion && req.begin <= feedInfo->durableVersion;
bool readFetched = feedInfo->durableVersion < feedInfo->fetchVersion && req.begin <= feedInfo->fetchVersion;
if (req.end > emptyVersion + 1 && (readDurable || readFetched)) {
if (readFetched && feedInfo->durableVersion == invalidVersion) {
// To not block fetchKeys on making change feed data written to storage, we wait in here instead for all
// fetched data to become readable from the storage engine.
while (feedInfo->durableVersion == invalidVersion) {
TEST(true); // getChangeFeedMutations before any fetched data durable
// wait for next commit
wait(data->durableVersion.whenAtLeast(data->durableVersion.get() + 1));
// TODO it may be safer to always just wait for durableVersion whenAtLeast feedVersion?
// To return control back to updateStorage
wait(delay(0));
}
if (readFetched) {
// To avoid blocking fetchKeys (which makes normal SS data readable) on change feed data being written to
// storage, we wait here instead for all fetched data to become readable from the storage engine.
TEST(true); // getChangeFeedMutations before fetched data durable
// wait for next commit to write feed data to storage
wait(feedInfo->durableFetchVersion.whenAtLeast(
std::min(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get() + 1)));
// To return control back to updateStorage
wait(delay(0));
}
RangeResult res = wait(
data->storage.readRange(KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, emptyVersion)),
@ -4075,33 +4071,32 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
}
// TODO handle being popped in the middle by removing after fetch
ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
Reference<ChangeFeedInfo> changeFeedInfo,
Key rangeId,
KeyRange range,
Version emptyVersion,
Version fetchVersion,
bool existing) {
// Returns max version fetched
ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
Reference<ChangeFeedInfo> changeFeedInfo,
Key rangeId,
KeyRange range,
Version emptyVersion,
Version beginVersion,
Version endVersion,
bool existing) {
Version startVersion = emptyVersion + 1;
if (startVersion < 0) {
startVersion = 0;
}
state Version startVersion = std::max(beginVersion, emptyVersion + 1);
ASSERT(startVersion >= 0);
if (startVersion >= fetchVersion) {
if (startVersion >= endVersion) {
TEST(true); // Change Feed popped before fetch
TraceEvent(SevDebug, "FetchChangeFeedNoOp", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("StartVersion", startVersion)
.detail("FetchVersion", fetchVersion);
return Void();
.detail("EndVersion", endVersion);
return invalidVersion;
}
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
// TODO somewhere this is initialized to -2 instead of -1 but it's fine
state Future<Void> feed =
data->cx->getChangeFeedStream(feedResults, rangeId, startVersion, fetchVersion, range, true);
data->cx->getChangeFeedStream(feedResults, rangeId, startVersion, endVersion, range, true);
// TODO remove debugging eventually?
state Version firstVersion = invalidVersion;
@ -4151,21 +4146,22 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
TraceEvent(SevDebug, "FetchChangeFeedDone", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("FetchVersion", fetchVersion)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
.detail("FirstFetchedVersion", firstVersion)
.detail("LastFetchedVersion", lastVersion)
.detail("VersionsFetched", versionsFetched)
.detail("Existing", existing);
return Void();
return lastVersion;
}
}
state PromiseStream<Standalone<MutationsAndVersionRef>> localResults;
// Add 2 to fetch version to make sure the local stream will have more versions in the stream than the remote stream
// Add 1 to fetch version to make sure the local stream will have more versions in the stream than the remote stream
// to avoid edge cases in the merge logic
state Future<Void> localStream =
localChangeFeedStream(data, localResults, rangeId, emptyVersion + 1, fetchVersion + 2, range);
localChangeFeedStream(data, localResults, rangeId, startVersion, endVersion + 1, range);
state Standalone<MutationsAndVersionRef> localResult;
Standalone<MutationsAndVersionRef> _localResult = waitNext(localResults.getFuture());
@ -4255,23 +4251,27 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
TraceEvent(SevDebug, "FetchChangeFeedDone", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("FetchVersion", fetchVersion)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
.detail("FirstFetchedVersion", firstVersion)
.detail("LastFetchedVersion", lastVersion)
.detail("VersionsFetched", versionsFetched)
.detail("Existing", existing);
return Void();
return lastVersion;
}
ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
Reference<ChangeFeedInfo> changeFeedInfo,
Version transferredVersion) {
// returns largest version fetched
ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
Reference<ChangeFeedInfo> changeFeedInfo,
Version beginVersion,
Version endVersion) {
wait(delay(0)); // allow this actor to be cancelled by removals
TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())
.detail("FetchVersion", transferredVersion);
.detail("BeginVersion", beginVersion)
.detail("EndVersion", endVersion);
auto cleanupPending = data->changeFeedCleanupDurable.find(changeFeedInfo->id);
if (cleanupPending != data->changeFeedCleanupDurable.end()) {
@ -4280,7 +4280,8 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
.detail("Range", changeFeedInfo->range.toString())
.detail("CleanupVersion", cleanupPending->second)
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
.detail("FetchVersion", transferredVersion);
.detail("BeginVersion", beginVersion)
.detail("EndVersion", endVersion);
wait(data->durableVersion.whenAtLeast(cleanupPending->second + 1));
ASSERT(!data->changeFeedCleanupDurable.count(changeFeedInfo->id));
}
@ -4288,15 +4289,16 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
loop {
try {
// TODO clean up existing param for !existing
wait(fetchChangeFeedApplier(data,
changeFeedInfo,
changeFeedInfo->id,
changeFeedInfo->range,
changeFeedInfo->emptyVersion,
transferredVersion,
false));
Version maxFetched = wait(fetchChangeFeedApplier(data,
changeFeedInfo,
changeFeedInfo->id,
changeFeedInfo->range,
changeFeedInfo->emptyVersion,
beginVersion,
endVersion,
false));
data->fetchingChangeFeeds.insert(changeFeedInfo->id);
return Void();
return maxFetched;
} catch (Error& e) {
if (e.code() != error_code_change_feed_not_registered) {
throw;
@ -4376,33 +4378,38 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
return feedIds;
}
ACTOR Future<Void> dispatchChangeFeeds(StorageServer* data,
UID fetchKeysID,
KeyRange keys,
Version transferredVersion,
std::vector<Key> feedIds) {
// returns max version fetched for each feed
ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer* data,
UID fetchKeysID,
KeyRange keys,
Version beginVersion,
Version endVersion,
std::vector<Key> feedIds) {
// find overlapping range feeds
state std::map<Key, Future<Void>> feedFetches;
state std::unordered_map<Key, Version> feedMaxFetched;
state std::map<Key, Future<Version>> feedFetches;
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
try {
// TODO add trace events for some of these
for (auto& feedId : feedIds) {
auto feedIt = data->uidChangeFeed.find(feedId);
// TODO REMOVE this assert once we enable change feed deletion
ASSERT(feedIt != data->uidChangeFeed.end());
Reference<ChangeFeedInfo> feed = feedIt->second;
feedFetches[feed->id] = fetchChangeFeed(data, feed, transferredVersion);
feedFetches[feed->id] = fetchChangeFeed(data, feed, beginVersion, endVersion);
}
loop {
Future<Void> nextFeed = Never();
Future<Version> nextFeed = Never();
if (!removals.getFuture().isReady()) {
bool done = true;
while (!feedFetches.empty()) {
if (feedFetches.begin()->second.isReady()) {
Version maxFetched = feedFetches.begin()->second.get();
if (maxFetched != invalidVersion) {
feedFetches[feedFetches.begin()->first] = maxFetched;
}
feedFetches.erase(feedFetches.begin());
} else {
nextFeed = feedFetches.begin()->second;
@ -4412,12 +4419,12 @@ ACTOR Future<Void> dispatchChangeFeeds(StorageServer* data,
}
if (done) {
data->changeFeedRemovals.erase(fetchKeysID);
return Void();
return feedMaxFetched;
}
}
choose {
when(Key remove = waitNext(removals.getFuture())) { feedFetches.erase(remove); }
when(wait(nextFeed)) {}
when(wait(success(nextFeed))) {}
}
}
@ -4662,11 +4669,16 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what
// we have written)
state Future<std::unordered_map<Key, Version>> feedFetchMain =
dispatchChangeFeeds(data, fetchKeysID, keys, 0, fetchVersion + 1, changeFeedsToFetch);
state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
holdingFKPL.release();
wait(fetchDurable);
state std::unordered_map<Key, Version> feedFetchedVersions = wait(feedFetchMain);
TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID)
.detail("FKID", interval.pairID)
.detail("SV", data->storageVersion())
@ -4697,8 +4709,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
ASSERT(shard->transferredVersion > data->storageVersion());
ASSERT(shard->transferredVersion == data->data().getLatestVersion());
state Future<Void> cfFetch =
dispatchChangeFeeds(data, fetchKeysID, keys, shard->transferredVersion, changeFeedsToFetch);
// This is split into two fetches to reduce tail latency. Fetch [0, fetchVersion+1)
// once fetchVersion is finalized, and [fetchVersion+1, transferredVersion) here once transferredVersion is
// finalized.
Future<std::unordered_map<Key, Version>> feedFetchTransferred = dispatchChangeFeeds(
data, fetchKeysID, keys, fetchVersion + 1, shard->transferredVersion, changeFeedsToFetch);
TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID)
.detail("FKID", interval.pairID)
@ -4736,8 +4751,16 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
shard->updates.clear();
// wait on change feed fetch to complete before marking data as available
wait(cfFetch);
// wait on change feed fetch to complete writing to storage before marking data as available
std::unordered_map<Key, Version> feedFetchedVersions2 = wait(feedFetchTransferred);
for (auto& newFetch : feedFetchedVersions2) {
auto prevFetch = feedFetchedVersions.find(newFetch.first);
if (prevFetch != feedFetchedVersions.end()) {
prevFetch->second = std::max(prevFetch->second, newFetch.second);
} else {
feedFetchedVersions[newFetch.first] = newFetch.second;
}
}
setAvailableStatus(data,
keys,
@ -4749,6 +4772,21 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(shard->transferredVersion));
// Also wait on all fetched change feed data to become committed and durable
while (!feedFetchedVersions.empty()) {
auto feed = feedFetchedVersions.begin();
state Key feedId = feed->first;
Version maxFetched = feed->second;
feedFetchedVersions.erase(feed);
auto feedIt = data->uidChangeFeed.find(feedId);
if (feedIt != data->uidChangeFeed.end() && feedIt->second->durableFetchVersion.get() < maxFetched) {
wait(feedIt->second->durableFetchVersion.whenAtLeast(maxFetched));
// return to updateStorage
wait(delay(0));
}
}
ASSERT(data->shards[shard->keys.begin]->assigned() &&
data->shards[shard->keys.begin]->keys ==
shard->keys); // We aren't changing whether the shard is assigned
@ -5091,7 +5129,7 @@ void StorageServer::addMutation(Version version,
if (!fromFetch) {
// have to do change feed before applyMutation because nonExpanded wasn't copied into the mutation log arena,
// and thus would go out of scope
// and thus would go out of scope if it wasn't copied into the change feed arena
applyChangeFeedMutation(this, expanded.type == MutationRef::ClearRange ? nonExpanded : expanded, version);
}
applyMutation(this, expanded, mLog.arena(), mutableData(), version);
@ -5913,14 +5951,17 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
data->changeFeedVersions.pop_front();
}
state std::vector<std::pair<Key, Version>> feedFetchVersions;
state std::vector<Key> updatedChangeFeeds(modifiedChangeFeeds.begin(), modifiedChangeFeeds.end());
state int curFeed = 0;
while (curFeed < updatedChangeFeeds.size()) {
auto info = data->uidChangeFeed.find(updatedChangeFeeds[curFeed]);
if (info != data->uidChangeFeed.end()) {
// Cannot yield in mutation updating loop because of race between fetchVersion and storageVersion
// Cannot yield in mutation updating loop because of race with fetchVersion
Version alreadyFetched = std::max(info->second->fetchVersion, info->second->durableFetchVersion.get());
for (auto& it : info->second->mutations) {
if (it.version <= info->second->fetchVersion) {
if (it.version <= alreadyFetched) {
continue;
} else if (it.version > newOldestVersion) {
break;
@ -5933,6 +5974,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
ASSERT(it.version >= info->second->storageVersion);
info->second->storageVersion = it.version;
}
if (info->second->fetchVersion != invalidVersion) {
feedFetchVersions.push_back(std::pair(info->second->id, info->second->fetchVersion));
}
// handle case where fetch had version ahead of last in-memory mutation
if (info->second->fetchVersion > info->second->storageVersion) {
info->second->storageVersion = std::min(info->second->fetchVersion, newOldestVersion);
@ -5942,6 +5987,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
// to add it back to fetchingChangeFeeds
data->fetchingChangeFeeds.insert(info->first);
}
} else {
info->second->fetchVersion = invalidVersion;
}
wait(yield(TaskPriority::UpdateStorage));
}
@ -5995,6 +6042,18 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
curFeed++;
}
// if commit included fetched data from this change feed, update the fetched durable version
curFeed = 0;
while (curFeed < feedFetchVersions.size()) {
auto info = data->uidChangeFeed.find(feedFetchVersions[curFeed].first);
if (info != data->uidChangeFeed.end()) {
if (feedFetchVersions[curFeed].second > info->second->durableFetchVersion.get()) {
info->second->durableFetchVersion.set(feedFetchVersions[curFeed].second);
}
}
curFeed++;
}
// remove any entries from changeFeedCleanupPending that were persisted
auto cfCleanup = data->changeFeedCleanupDurable.begin();
while (cfCleanup != data->changeFeedCleanupDurable.end()) {