diff --git a/fdbcli/CMakeLists.txt b/fdbcli/CMakeLists.txt index 32323fb788..628f917436 100644 --- a/fdbcli/CMakeLists.txt +++ b/fdbcli/CMakeLists.txt @@ -16,6 +16,7 @@ set(FDBCLI_SRCS IncludeCommand.actor.cpp KillCommand.actor.cpp LockCommand.actor.cpp + ChangeFeedCommand.actor.cpp MaintenanceCommand.actor.cpp ProfileCommand.actor.cpp SetClassCommand.actor.cpp diff --git a/fdbcli/ChangeFeedCommand.actor.cpp b/fdbcli/ChangeFeedCommand.actor.cpp new file mode 100644 index 0000000000..d28d96c367 --- /dev/null +++ b/fdbcli/ChangeFeedCommand.actor.cpp @@ -0,0 +1,172 @@ +/* + * ChangeFeedCommand.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbcli/fdbcli.actor.h" + +#include "fdbclient/FDBOptions.g.h" +#include "fdbclient/IClientApi.h" +#include "fdbclient/Knobs.h" +#include "fdbclient/Schemas.h" +#include "fdbclient/ManagementAPI.actor.h" + +#include "flow/Arena.h" +#include "flow/FastRef.h" +#include "flow/ThreadHelper.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. 
+
+namespace {
+
+// Parses `token` as a base-10 Version. Returns true and stores the value in `*out` only when the whole token was
+// consumed; `*out` is left untouched on failure. The value is scanned through a `long long` so that the conversion
+// specifier always matches its argument, whichever integer type Version aliases on the target platform.
+bool parseVersion(StringRef token, Version* out) {
+	long long parsed = 0;
+	int consumed = 0;
+	if (sscanf(token.toString().c_str(), "%lld%n", &parsed, &consumed) != 1 || consumed != token.size()) {
+		return false;
+	}
+	*out = parsed;
+	return true;
+}
+
+// Prints the id and key range of every change feed stored under changeFeedKeys.
+ACTOR Future<Void> changeFeedList(Database db) {
+	state ReadYourWritesTransaction tr(db);
+	loop {
+		try {
+			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+
+			RangeResult result = wait(tr.getRange(changeFeedKeys, CLIENT_KNOBS->TOO_MANY));
+			// the listing is read in one request, so all registered feeds must fit within TOO_MANY entries
+			ASSERT(!result.more);
+			printf("Found %d range feeds%s\n", result.size(), result.size() == 0 ? "." : ":");
+			for (auto& it : result) {
+				auto range = std::get<0>(decodeChangeFeedValue(it.value));
+				printf("  %s: %s - %s\n",
+				       it.key.removePrefix(changeFeedPrefix).toString().c_str(),
+				       range.begin.toString().c_str(),
+				       range.end.toString().c_str());
+			}
+			return Void();
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
+} // namespace
+
+namespace fdb_cli {
+
+// Implements the `changefeed` command. tokens[1] selects the sub-command:
+//   list                          - print all registered feeds
+//   register <id> <begin> <end>   - create a feed over [begin, end)
+//   stop <id> / destroy <id>      - update the feed's status
+//   stream <id> [begin [end]]     - print mutations until the stream ends or the user interrupts
+//   pop <id> <version>            - pop the feed's mutations up to <version>
+// Returns false (after printing usage) when the arguments are malformed, true otherwise.
+ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn) {
+	if (tokens.size() == 1) {
+		printUsage(tokens[0]);
+		return false;
+	}
+	if (tokencmp(tokens[1], "list")) {
+		if (tokens.size() != 2) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		wait(changeFeedList(localDb));
+		return true;
+	} else if (tokencmp(tokens[1], "register")) {
+		if (tokens.size() != 5) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		wait(updateChangeFeed(
+		    localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_CREATE, KeyRangeRef(tokens[3], tokens[4])));
+	} else if (tokencmp(tokens[1], "stop")) {
+		if (tokens.size() != 3) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		wait(updateChangeFeed(localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_STOP));
+	} else if (tokencmp(tokens[1], "destroy")) {
+		if (tokens.size() != 3) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		wait(updateChangeFeed(localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_DESTROY));
+	} else if (tokencmp(tokens[1], "stream")) {
+		if (tokens.size() < 3 || tokens.size() > 5) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		Version begin = 0;
+		Version end = std::numeric_limits<Version>::max();
+		if (tokens.size() > 3 && !parseVersion(tokens[3], &begin)) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		if (tokens.size() > 4 && !parseVersion(tokens[4], &end)) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		// the pending warning future is cancelled before the stream starts printing
+		if (warn.isValid()) {
+			warn.cancel();
+		}
+		state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> feedResults;
+		state Future<Void> feed = localDb->getChangeFeedStream(feedResults, tokens[2], begin, end);
+		printf("\n");
+		try {
+			state Future<Void> feedInterrupt = LineNoise::onKeyboardInterrupt();
+			loop {
+				choose {
+					when(Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(feedResults.getFuture())) {
+						for (auto& it : res) {
+							for (auto& it2 : it.mutations) {
+								// cast so the argument always matches %lld
+								printf("%lld %s\n", static_cast<long long>(it.version), it2.toString().c_str());
+							}
+						}
+					}
+					when(wait(feedInterrupt)) {
+						// keyboard interrupt: drop the feed and its result stream, then leave the loop
+						feedInterrupt = Future<Void>();
+						feed.cancel();
+						feedResults = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
+						break;
+					}
+				}
+			}
+			return true;
+		} catch (Error& e) {
+			// end_of_stream is the normal termination of a bounded stream
+			if (e.code() == error_code_end_of_stream) {
+				return true;
+			}
+			throw;
+		}
+	} else if (tokencmp(tokens[1], "pop")) {
+		if (tokens.size() != 4) {
+			printUsage(tokens[0]);
+			return false;
+		}
+		Version v = 0;
+		if (!parseVersion(tokens[3], &v)) {
+			printUsage(tokens[0]);
+			return false;
+		} else {
+			wait(localDb->popChangeFeedMutations(tokens[2], v));
+		}
+	} else {
+		printUsage(tokens[0]);
+		return false;
+	}
+	return true;
+}
+
+CommandFactory changeFeedFactory(
+    "changefeed",
+    CommandHelp("changefeed <register|destroy|stop|stream|pop|list> <RANGEID> <BEGIN> <END>", "", ""));
+} // namespace fdb_cli
diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp
index 6995beab30..e10be14ca8 100644
--- a/fdbcli/fdbcli.actor.cpp
+++ b/fdbcli/fdbcli.actor.cpp
@@ -36,6 +36,7 @@
 #include "fdbclient/Schemas.h"
 #include "fdbclient/CoordinationInterface.h"
 #include "fdbclient/FDBOptions.g.h"
+#include "fdbclient/SystemData.h"
 #include
"fdbclient/TagThrottle.actor.h" #include "fdbclient/Tuple.h" @@ -1556,6 +1557,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { state Database localDb; state Reference db; state Reference tr; + state Transaction trx; state bool writeMode = false; @@ -1879,6 +1881,13 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { continue; } + if (tokencmp(tokens[0], "changefeed")) { + bool _result = wait(makeInterruptable(changeFeedCommandActor(localDb, tokens, warn))); + if (!_result) + is_error = true; + continue; + } + if (tokencmp(tokens[0], "unlock")) { if ((tokens.size() != 2) || (tokens[1].size() != 32) || !std::all_of(tokens[1].begin(), tokens[1].end(), &isxdigit)) { diff --git a/fdbcli/fdbcli.actor.h b/fdbcli/fdbcli.actor.h index 7340b3b682..7645f9beb1 100644 --- a/fdbcli/fdbcli.actor.h +++ b/fdbcli/fdbcli.actor.h @@ -165,6 +165,8 @@ ACTOR Future killCommandActor(Reference db, // lock/unlock command ACTOR Future lockCommandActor(Reference db, std::vector tokens); ACTOR Future unlockDatabaseActor(Reference db, UID uid); +// changefeed command +ACTOR Future changeFeedCommandActor(Database localDb, std::vector tokens, Future warn); // maintenance command ACTOR Future setHealthyZone(Reference db, StringRef zoneId, double seconds, bool printWarning = false); ACTOR Future clearHealthyZone(Reference db, diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 0b42148de8..98ccf2ea33 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -75,6 +75,9 @@ void ClientKnobs::initialize(Randomize randomize) { init( VALUE_SIZE_LIMIT, 1e5 ); init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1; init( METADATA_VERSION_CACHE_SIZE, 1000 ); + init( CHANGE_FEED_LOCATION_LIMIT, 10000 ); + init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1; + init( CHANGE_FEED_POP_TIMEOUT, 5.0 ); init( 
MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
 	init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
diff --git a/fdbclient/ClientKnobs.h b/fdbclient/ClientKnobs.h
index dd52bd98c5..6f947d3f07 100644
--- a/fdbclient/ClientKnobs.h
+++ b/fdbclient/ClientKnobs.h
@@ -74,6 +74,9 @@ public:
 	int64_t VALUE_SIZE_LIMIT;
 	int64_t SPLIT_KEY_SIZE_LIMIT;
 	int METADATA_VERSION_CACHE_SIZE;
+	int64_t CHANGE_FEED_LOCATION_LIMIT;
+	int64_t CHANGE_FEED_CACHE_SIZE;
+	double CHANGE_FEED_POP_TIMEOUT;
 
 	int MAX_BATCH_SIZE;
 	double GRV_BATCH_TIMEOUT;
diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h
index 0db71aa895..073364d730 100644
--- a/fdbclient/DatabaseContext.h
+++ b/fdbclient/DatabaseContext.h
@@ -252,6 +252,15 @@ public:
 	// Management API, create snapshot
 	Future createSnapshot(StringRef uid, StringRef snapshot_command);
 
+	Future<Void> getChangeFeedStream(const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
+	                                 Key rangeID,
+	                                 Version begin = 0,
+	                                 Version end = std::numeric_limits<Version>::max(),
+	                                 KeyRange range = allKeys);
+
+	Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
+	Future<Void> popChangeFeedMutations(StringRef rangeID, Version version);
+
 	// private:
 	explicit DatabaseContext(Reference>> connectionRecord,
 	                         Reference> clientDBInfo,
@@ -327,6 +336,8 @@ public:
 	std::unordered_map tssMapping;
 	// map from tssid -> metrics for that tss pair
 	std::unordered_map> tssMetrics;
+	// map from changeFeedId -> changeFeedRange
+	std::unordered_map<Key, KeyRange> changeFeedCache;
 
 	UID dbId;
 	IsInternal internal; // Only contexts created through the C client and fdbcli are non-internal
diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp
index 8e7e873bcf..53e9dba1a8 100644
--- a/fdbclient/ManagementAPI.actor.cpp
+++ b/fdbclient/ManagementAPI.actor.cpp
@@ -2081,6 +2081,90 @@ ACTOR Future checkDatabaseLock(Reference tr, UI
 	return Void();
 }
 
+// Applies `status` to the change feed `rangeID` inside the caller's transaction (the caller commits).
+//   CREATE:  writes a new entry for `range`; if an entry already exists with a different range, throws
+//            unsupported_operation.
+//   STOP:    rewrites the existing entry with the new status; throws unsupported_operation if it is missing.
+//   DESTROY: rewrites the existing entry with the new status and then clears it; throws unsupported_operation if
+//            it is missing.
+ACTOR Future<Void> updateChangeFeed(Transaction* tr, Key rangeID, ChangeFeedStatus status, KeyRange range) {
+	state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
+	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+
+	Optional<Value> val = wait(tr->get(rangeIDKey));
+	if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
+		if (!val.present()) {
+			tr->set(rangeIDKey, changeFeedValue(range, invalidVersion, status));
+		} else if (std::get<0>(decodeChangeFeedValue(val.get())) != range) {
+			throw unsupported_operation();
+		}
+	} else if (status == ChangeFeedStatus::CHANGE_FEED_STOP) {
+		if (val.present()) {
+			tr->set(rangeIDKey,
+			        changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
+			                        std::get<1>(decodeChangeFeedValue(val.get())),
+			                        status));
+		} else {
+			throw unsupported_operation();
+		}
+	} else if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
+		if (val.present()) {
+			tr->set(rangeIDKey,
+			        changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
+			                        std::get<1>(decodeChangeFeedValue(val.get())),
+			                        status));
+			tr->clear(rangeIDKey);
+		} else {
+			throw unsupported_operation();
+		}
+	}
+	return Void();
+}
+
+// Same contract as the Transaction* overload, for a reference-counted transaction.
+ACTOR Future<Void> updateChangeFeed(Reference<ReadYourWritesTransaction> tr,
+                                    Key rangeID,
+                                    ChangeFeedStatus status,
+                                    KeyRange range) {
+	state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
+	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+
+	Optional<Value> val = wait(tr->get(rangeIDKey));
+	if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
+		if (!val.present()) {
+			tr->set(rangeIDKey, changeFeedValue(range, invalidVersion, status));
+		} else if (std::get<0>(decodeChangeFeedValue(val.get())) != range) {
+			throw unsupported_operation();
+		}
+	} else if (status == ChangeFeedStatus::CHANGE_FEED_STOP) {
+		if (val.present()) {
+			tr->set(rangeIDKey,
+			        changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
+			                        std::get<1>(decodeChangeFeedValue(val.get())),
+			                        status));
+		} else {
+			throw unsupported_operation();
+		}
+	} else if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
+		if (val.present()) {
+			tr->set(rangeIDKey,
+			        changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
+			                        std::get<1>(decodeChangeFeedValue(val.get())),
+			                        status));
+			tr->clear(rangeIDKey);
+		} else {
+			throw unsupported_operation();
+		}
+	}
+	return Void();
+}
+
+// Runs updateChangeFeed in its own transaction, retrying through onError until the commit succeeds.
+ACTOR Future<Void> updateChangeFeed(Database cx, Key rangeID, ChangeFeedStatus status, KeyRange range) {
+	state Transaction tr(cx);
+	loop {
+		try {
+			wait(updateChangeFeed(&tr, rangeID, status, range));
+			wait(tr.commit());
+			return Void();
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
 ACTOR Future advanceVersion(Database cx, Version v) {
 	state Transaction tr(cx);
 	loop {
diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h
index 3b5dc5beda..64e1e9c6b9 100644
--- a/fdbclient/ManagementAPI.actor.h
+++ b/fdbclient/ManagementAPI.actor.h
@@ -207,6 +207,13 @@ ACTOR Future unlockDatabase(Database cx, UID id);
 ACTOR Future checkDatabaseLock(Transaction* tr, UID id);
 ACTOR Future checkDatabaseLock(Reference tr, UID id);
 
+ACTOR Future<Void> updateChangeFeed(Transaction* tr, Key rangeID, ChangeFeedStatus status, KeyRange range = KeyRange());
+ACTOR Future<Void> updateChangeFeed(Reference<ReadYourWritesTransaction> tr,
+                                    Key rangeID,
+                                    ChangeFeedStatus status,
+                                    KeyRange range = KeyRange());
+ACTOR Future<Void> updateChangeFeed(Database cx, Key rangeID, ChangeFeedStatus status, KeyRange range = KeyRange());
+
 ACTOR Future advanceVersion(Database cx, Version v);
 ACTOR Future setDDMode(Database cx, int mode);
 
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index c07675c180..ec757f3e97 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -6662,6 +6662,447 @@ Future DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
 	return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
 }
 
+// Streams one storage server's change feed replies into `results`, one version at a time. Versions without
+// mutations are skipped unless they are the last version before `end`. Sends end_of_stream once `end` is reached;
+// any other error (except cancellation) is forwarded to `results`.
+ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
+                                          PromiseStream<Standalone<MutationsAndVersionRef>> results,
+                                          Key rangeID,
+                                          Version begin,
+                                          Version end,
+                                          KeyRange range) {
+	loop {
+		try {
+			state ChangeFeedStreamRequest req;
+			req.rangeID = rangeID;
+			req.begin = begin;
+			req.end = end;
+			req.range = range;
+
+			state ReplyPromiseStream<ChangeFeedStreamReply> replyStream = interf.changeFeedStream.getReplyStream(req);
+
+			loop {
+				state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture());
+				begin = rep.mutations.back().version + 1;
+				state int resultLoc = 0;
+				while (resultLoc < rep.mutations.size()) {
+					if (rep.mutations[resultLoc].mutations.size() || rep.mutations[resultLoc].version + 1 == end) {
+						// wait for the consumer to drain the stream before sending the next version
+						wait(results.onEmpty());
+						results.send(rep.mutations[resultLoc]);
+					}
+					resultLoc++;
+				}
+				if (begin == end) {
+					results.sendError(end_of_stream());
+					return Void();
+				}
+			}
+		} catch (Error& e) {
+			if (e.code() == error_code_actor_cancelled) {
+				throw;
+			}
+			results.sendError(e);
+			return Void();
+		}
+	}
+}
+
+// One per-server input of the merge: the next pending version and the stream it came from.
+struct MutationAndVersionStream {
+	Standalone<MutationsAndVersionRef> next;
+	PromiseStream<Standalone<MutationsAndVersionRef>> results;
+
+	// inverted comparison so that std::priority_queue pops the lowest version first
+	bool operator<(MutationAndVersionStream const& rhs) const { return next.version > rhs.next.version; }
+};
+
+// Merges the per-server streams for `interfs` into `results` in version order, combining the mutations of entries
+// that share a version. `*begin` is advanced as versions are emitted. Always finishes by throwing end_of_stream
+// (or another error raised by one of the inputs).
+ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInterface, KeyRange>> interfs,
+                                         PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
+                                         Key rangeID,
+                                         Version* begin,
+                                         Version end) {
+	state std::priority_queue<MutationAndVersionStream, std::vector<MutationAndVersionStream>> mutations;
+	state std::vector<Future<Void>> fetchers(interfs.size());
+	state std::vector<MutationAndVersionStream> streams(interfs.size());
+	for (int i = 0; i < interfs.size(); i++) {
+		fetchers[i] =
+		    singleChangeFeedStream(interfs[i].first, streams[i].results, rangeID, *begin, end, interfs[i].second);
+	}
+	// prime the queue with the first entry of every stream that has one
+	state int interfNum = 0;
+	while (interfNum < interfs.size()) {
+		try {
+			Standalone<MutationsAndVersionRef> res = waitNext(streams[interfNum].results.getFuture());
+			streams[interfNum].next = res;
+			mutations.push(streams[interfNum]);
+		} catch (Error& e) {
+			if (e.code() != error_code_end_of_stream) {
+				throw e;
+			}
+		}
+		interfNum++;
+	}
+	state Version checkVersion = invalidVersion;
+	state Standalone<VectorRef<MutationsAndVersionRef>> nextOut;
+	while (mutations.size()) {
+		state MutationAndVersionStream nextStream = mutations.top();
+		mutations.pop();
+		ASSERT(nextStream.next.version >= checkVersion);
+		if (nextStream.next.version != checkVersion) {
+			// a new version starts: flush everything accumulated for the previous one
+			if (nextOut.size()) {
+				*begin = checkVersion + 1;
+				results.send(nextOut);
+				nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
+			}
+			checkVersion = nextStream.next.version;
+		}
+		if (nextOut.size() && nextStream.next.version == nextOut.back().version) {
+			// same version as the pending output entry: append, except when the entry starts with
+			// lastEpochEndPrivateKey
+			if (nextStream.next.mutations.size() &&
+			    nextStream.next.mutations.front().param1 != lastEpochEndPrivateKey) {
+				nextOut.back().mutations.append_deep(
+				    nextOut.arena(), nextStream.next.mutations.begin(), nextStream.next.mutations.size());
+			}
+		} else {
+			nextOut.push_back_deep(nextOut.arena(), nextStream.next);
+		}
+		try {
+			Standalone<MutationsAndVersionRef> res = waitNext(nextStream.results.getFuture());
+			nextStream.next = res;
+			mutations.push(nextStream);
+		} catch (Error& e) {
+			if (e.code() != error_code_end_of_stream) {
+				throw e;
+			}
+		}
+	}
+	if (nextOut.size()) {
+		results.send(nextOut);
+	}
+	throw end_of_stream();
+}
+
+// Returns the key range registered for change feed `rangeID`, from db->changeFeedCache when present and otherwise
+// from the database (at a read version >= `begin`). Throws change_feed_not_registered if the feed has no entry.
+ACTOR Future<KeyRange> getChangeFeedRange(Reference<DatabaseContext> db, Database cx, Key rangeID, Version begin = 0) {
+	state Transaction tr(cx);
+	state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
+
+	auto cacheLoc = db->changeFeedCache.find(rangeID);
+	if (cacheLoc != db->changeFeedCache.end()) {
+		return cacheLoc->second;
+	}
+
+	loop {
+		try {
+			Version readVer = wait(tr.getReadVersion());
+			if (readVer < begin) {
+				// the read version has not caught up with `begin` yet; retry shortly with a fresh transaction
+				wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
+				tr.reset();
+			} else {
+				Optional<Value> val = wait(tr.get(rangeIDKey));
+				if (!val.present()) {
+					throw change_feed_not_registered();
+				}
+				// the cache is simply emptied once it grows past the knob
+				if (db->changeFeedCache.size() > CLIENT_KNOBS->CHANGE_FEED_CACHE_SIZE) {
+					db->changeFeedCache.clear();
+				}
+				KeyRange range = std::get<0>(decodeChangeFeedValue(val.get()));
+				db->changeFeedCache[rangeID] = range;
+				return range;
+			}
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
+// Streams the mutations of change feed `rangeID` for versions [begin, end), restricted to `range`, into `results`.
+// Picks one non-failed storage server per shard, merges the per-shard streams when there is more than one shard,
+// and restarts from the current `begin` after location/connection errors. Other errors are sent to `results`.
+ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
+                                            PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
+                                            Key rangeID,
+                                            Version begin,
+                                            Version end,
+                                            KeyRange range) {
+	state Database cx(db);
+	state Span span("NAPI:GetChangeFeedStream"_loc);
+
+	loop {
+		state KeyRange keys;
+		try {
+			KeyRange fullRange = wait(getChangeFeedRange(db, cx, rangeID, begin));
+			keys = fullRange & range;
+			state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
+			    wait(getKeyRangeLocations(cx,
+			                              keys,
+			                              CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT,
+			                              Reverse::False,
+			                              &StorageServerInterface::changeFeedStream,
+			                              TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
+
+			if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
+				ASSERT_WE_THINK(false);
+				throw unknown_change_feed();
+			}
+
+			state std::vector<int> chosenLocations(locations.size());
+			state int loc = 0;
+			while (loc < locations.size()) {
+				// FIXME: create a load balance function for this code so future users of reply streams do not have
+				// to duplicate this code
+				int count = 0;
+				int useIdx = -1;
+				// pick uniformly at random among the alternatives that are not marked failed
+				for (int i = 0; i < locations[loc].second->size(); i++) {
+					if (!IFailureMonitor::failureMonitor()
+					         .getState(
+					             locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint())
+					         .failed) {
+						if (deterministicRandom()->random01() <= 1.0 / ++count) {
+							useIdx = i;
+						}
+					}
+				}
+
+				if (useIdx >= 0) {
+					chosenLocations[loc] = useIdx;
+					loc++;
+					continue;
+				}
+
+				std::vector<Future<Void>> ok(locations[loc].second->size());
+				for (int i = 0; i < ok.size(); i++) {
+					ok[i] = IFailureMonitor::failureMonitor().onStateEqual(
+					    locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(),
+					    FailureStatus(false));
+				}
+
+				// Making this SevWarn means a lot of clutter
+				if (now() - g_network->networkInfo.newestAlternativesFailure > 1 ||
+				    deterministicRandom()->random01() < 0.01) {
+					// report the location that actually has no healthy alternative
+					TraceEvent("AllAlternativesFailed").detail("Alternatives", locations[loc].second->description());
+				}
+
+				wait(allAlternativesFailedDelay(quorum(ok, 1)));
+				loc = 0;
+			}
+
+			if (locations.size() > 1) {
+				std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
+				for (int i = 0; i < locations.size(); i++) {
+					interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
+					                                 locations[i].first & range));
+				}
+				wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
+			} else {
+				state ChangeFeedStreamRequest req;
+				req.rangeID = rangeID;
+				req.begin = begin;
+				req.end = end;
+				req.range = range;
+
+				state ReplyPromiseStream<ChangeFeedStreamReply> replyStream =
+				    locations[0]
+				        .second->get(chosenLocations[0], &StorageServerInterface::changeFeedStream)
+				        .getReplyStream(req);
+
+				loop {
+					wait(results.onEmpty());
+					choose {
+						when(wait(cx->connectionFileChanged())) { break; }
+						when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
+							begin = rep.mutations.back().version + 1;
+							results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
+						}
+					}
+				}
+			}
+		} catch (Error& e) {
+			if (e.code() == error_code_actor_cancelled) {
+				throw;
+			}
+			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
+			    e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed ||
+			    e.code() == error_code_broken_promise) {
+				// stale location or feed information: drop both caches and retry after a short delay
+				db->changeFeedCache.erase(rangeID);
+				cx->invalidateCache(keys);
+				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
+			} else {
+				results.sendError(e);
+				return Void();
+			}
+		}
+	}
+}
+
+Future<Void> DatabaseContext::getChangeFeedStream(
+    const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
+    Key rangeID,
+    Version begin,
+    Version end,
+    KeyRange range) {
+	return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
+}
+
+// Asks one shard (load balanced over `location`) for the change feeds overlapping `range`.
+// NOTE(review): `range` is a non-owning reference that is read before the first wait(); callers appear to keep the
+// backing memory alive - confirm before reusing this actor elsewhere.
+ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(
+    Database cx,
+    Reference<LocationInfo> location,
+    KeyRangeRef range,
+    Version minVersion) {
+	state OverlappingChangeFeedsRequest req;
+	req.range = range;
+	req.minVersion = minVersion;
+
+	OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(),
+	                                                   location,
+	                                                   &StorageServerInterface::overlappingChangeFeeds,
+	                                                   req,
+	                                                   TaskPriority::DefaultPromiseEndpoint,
+	                                                   AtMostOnce::False,
+	                                                   cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
+	return rep.rangeIds;
+}
+
+// Orders overlapping-feed entries by feed id.
+bool compareChangeFeedResult(const OverlappingChangeFeedEntry& i, const OverlappingChangeFeedEntry& j) {
+	return i.rangeId < j.rangeId;
+}
+
+// Collects the change feeds overlapping `range` from every shard covering it, sorted by feed id with adjacent
+// identical entries removed. Retries after wrong_shard_server / all_alternatives_failed.
+// NOTE(review): `range` is a non-owning reference used across wait(); the caller must keep its memory alive.
+ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
+                                                                                     KeyRangeRef range,
+                                                                                     Version minVersion) {
+	state Database cx(db);
+	state Span span("NAPI:GetOverlappingChangeFeeds"_loc);
+
+	loop {
+		try {
+			state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
+			    wait(getKeyRangeLocations(cx,
+			                              range,
+			                              CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT,
+			                              Reverse::False,
+			                              &StorageServerInterface::overlappingChangeFeeds,
+			                              TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
+
+			if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
+				TraceEvent(SevError, "OverlappingRangeTooLarge")
+				    .detail("Range", range)
+				    .detail("Limit", CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT);
+				wait(delay(1.0));
+				throw all_alternatives_failed();
+			}
+
+			state std::vector<Future<std::vector<OverlappingChangeFeedEntry>>> allOverlappingRequests;
+			for (auto& it : locations) {
+				allOverlappingRequests.push_back(
+				    singleLocationOverlappingChangeFeeds(cx, it.second, it.first & range, minVersion));
+			}
+			wait(waitForAll(allOverlappingRequests));
+
+			std::vector<OverlappingChangeFeedEntry> result;
+			for (auto& it : allOverlappingRequests) {
+				result.insert(result.end(), it.get().begin(), it.get().end());
+			}
+			std::sort(result.begin(), result.end(), compareChangeFeedResult);
+			result.resize(std::unique(result.begin(), result.end()) - result.begin());
+			return result;
+		} catch (Error& e) {
+			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
+				cx->invalidateCache(range);
+				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
+			} else {
+				throw e;
+			}
+		}
+	}
+}
+
+Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
+                                                                                            Version minVersion) {
+	return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
+}
+
+// Fallback pop: raises the pop version stored in the feed's database entry to `version` (never lowers it).
+// Throws change_feed_not_registered if the feed has no entry. `rangeID` is taken as an owning Key because it is
+// used after wait().
+ACTOR static Future<Void> popChangeFeedBackup(Database cx, Key rangeID, Version version) {
+	state Transaction tr(cx);
+	loop {
+		try {
+			state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
+			Optional<Value> val = wait(tr.get(rangeIDKey));
+			if (val.present()) {
+				KeyRange range;
+				Version popVersion;
+				ChangeFeedStatus status;
+				std::tie(range, popVersion, status) = decodeChangeFeedValue(val.get());
+				if (version > popVersion) {
+					tr.set(rangeIDKey, changeFeedValue(range, version, status));
+				}
+			} else {
+				throw change_feed_not_registered();
+			}
+			wait(tr.commit());
+			return Void();
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
+// Pops change feed `rangeID` up to `version`. Sends the pop directly to every replica when the feed spans at most
+// two shards and no replica is marked failed; otherwise, or when the direct pops time out or hit a location error,
+// falls back to popChangeFeedBackup. `rangeID` is taken as an owning Key because it is used after wait().
+ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Key rangeID, Version version) {
+	state Database cx(db);
+	state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
+	state Span span("NAPI:PopChangeFeedMutations"_loc);
+
+	state KeyRange keys = wait(getChangeFeedRange(db, cx, rangeID));
+
+	state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
+	    wait(getKeyRangeLocations(cx,
+	                              keys,
+	                              3,
+	                              Reverse::False,
+	                              &StorageServerInterface::changeFeedPop,
+	                              TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
+
+	if (locations.size() > 2) {
+		wait(popChangeFeedBackup(cx, rangeID, version));
+		return Void();
+	}
+
+	bool foundFailed = false;
+	for (int i = 0; i < locations.size() && !foundFailed; i++) {
+		for (int j = 0; j < locations[i].second->size() && !foundFailed; j++) {
+			if (IFailureMonitor::failureMonitor()
+			        .getState(locations[i].second->get(j, &StorageServerInterface::changeFeedPop).getEndpoint())
+			        .isFailed()) {
+				foundFailed = true;
+			}
+		}
+	}
+
+	if (foundFailed) {
+		wait(popChangeFeedBackup(cx, rangeID, version));
+		return Void();
+	}
+
+	// FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped
+	state std::vector<Future<Void>> popRequests;
+	for (int i = 0; i < locations.size(); i++) {
+		for (int j = 0; j < locations[i].second->size(); j++) {
+			popRequests.push_back(locations[i].second->getInterface(j).changeFeedPop.getReply(
+			    ChangeFeedPopRequest(rangeID, version, locations[i].first)));
+		}
+	}
+
+	try {
+		choose {
+			when(wait(waitForAll(popRequests))) {}
+			when(wait(delay(CLIENT_KNOBS->CHANGE_FEED_POP_TIMEOUT))) {
+				wait(popChangeFeedBackup(cx, rangeID, version));
+			}
+		}
+	} catch (Error& e) {
+		if (e.code() != error_code_unknown_change_feed && e.code() != error_code_wrong_shard_server &&
+		    e.code() != error_code_all_alternatives_failed) {
+			throw;
+		}
+		db->changeFeedCache.erase(rangeID);
+		cx->invalidateCache(keys);
+		wait(popChangeFeedBackup(cx, rangeID, version));
+	}
+	return Void();
+}
+
+Future<Void> DatabaseContext::popChangeFeedMutations(StringRef rangeID, Version version) {
+	// copy the id into an owning Key so the actor never reads the caller's memory after it suspends
+	return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), Key(rangeID), version);
+}
 ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
 	state ReadYourWritesTransaction tr(cx);
 	loop {
diff --git a/fdbclient/StorageServerInterface.cpp b/fdbclient/StorageServerInterface.cpp
index d67df9cf95..efca15289c 100644
--- a/fdbclient/StorageServerInterface.cpp
+++ b/fdbclient/StorageServerInterface.cpp
@@ -270,6 +270,27 @@ void TSS_traceMismatch(TraceEvent& event,
 	ASSERT(false);
 }
 
+// change feed
+template <>
+bool TSS_doCompare(const OverlappingChangeFeedsReply& src, const OverlappingChangeFeedsReply& tss) {
+	ASSERT(false);
+	return true;
+}
+
+template <>
+const char* TSS_mismatchTraceName(const OverlappingChangeFeedsRequest& req) {
+	ASSERT(false);
+	return "";
+}
+
+template <>
+void TSS_traceMismatch(TraceEvent& event,
+                       const OverlappingChangeFeedsRequest& req,
+                       const OverlappingChangeFeedsReply& src,
+                       const OverlappingChangeFeedsReply& tss) {
+	ASSERT(false);
+}
+
 // template specializations for metrics replies that should never be called because these requests aren't duplicated
 
 // storage metrics
@@ -331,6 +352,9 @@ void TSSMetrics::recordLatency(const SplitRangeRequest& req, double ssLatency,
d
 template <>
 void TSSMetrics::recordLatency(const GetKeyValuesStreamRequest& req, double ssLatency, double tssLatency) {}
 
+template <>
+void TSSMetrics::recordLatency(const OverlappingChangeFeedsRequest& req, double ssLatency, double tssLatency) {}
+
 // -------------------
 
 TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h
index fdc37287a5..ac8e9c344f 100644
--- a/fdbclient/StorageServerInterface.h
+++ b/fdbclient/StorageServerInterface.h
@@ -30,6 +30,7 @@
 #include "fdbrpc/Stats.h"
 #include "fdbrpc/TimedRequest.h"
 #include "fdbrpc/TSSComparison.h"
+#include "fdbclient/CommitTransaction.h"
 #include "fdbclient/TagThrottle.actor.h"
 #include "flow/UnitTest.h"
 
@@ -77,6 +78,9 @@ struct StorageServerInterface {
 	RequestStream getReadHotRanges;
 	RequestStream getRangeSplitPoints;
 	RequestStream getKeyValuesStream;
+	RequestStream<struct ChangeFeedStreamRequest> changeFeedStream;
+	RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
+	RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
 
 	explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
 	StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}
@@ -119,6 +123,12 @@ struct StorageServerInterface {
 				    RequestStream(getValue.getEndpoint().getAdjustedEndpoint(12));
 				getKeyValuesStream =
 				    RequestStream(getValue.getEndpoint().getAdjustedEndpoint(13));
+				changeFeedStream =
+				    RequestStream<struct ChangeFeedStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(14));
+				overlappingChangeFeeds =
+				    RequestStream<struct OverlappingChangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
+				changeFeedPop =
+				    RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
 			}
 		} else {
 			ASSERT(Ar::isDeserializing);
@@ -161,6 +171,9 @@ struct StorageServerInterface {
 		streams.push_back(getReadHotRanges.getReceiver());
 		streams.push_back(getRangeSplitPoints.getReceiver());
 		streams.push_back(getKeyValuesStream.getReceiver(TaskPriority::LoadBalancedEndpoint));
+		streams.push_back(changeFeedStream.getReceiver());
+		streams.push_back(overlappingChangeFeeds.getReceiver());
+		streams.push_back(changeFeedPop.getReceiver());
 		FlowTransport::transport().addEndpoints(streams);
 	}
 };
@@ -622,6 +635,133 @@ struct SplitRangeRequest {
 	}
 };
 
+// The mutations of one version of a change feed, together with the known committed version.
+struct MutationsAndVersionRef {
+	VectorRef<MutationRef> mutations;
+	Version version = invalidVersion;
+	Version knownCommittedVersion = invalidVersion;
+
+	MutationsAndVersionRef() {}
+	explicit MutationsAndVersionRef(Version version, Version knownCommittedVersion)
+	  : version(version), knownCommittedVersion(knownCommittedVersion) {}
+	MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion)
+	  : mutations(mutations), version(version), knownCommittedVersion(knownCommittedVersion) {}
+	// the two constructors below copy the mutations into arena `to`
+	MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion)
+	  : mutations(to, mutations), version(version), knownCommittedVersion(knownCommittedVersion) {}
+	MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from)
+	  : mutations(to, from.mutations), version(from.version), knownCommittedVersion(from.knownCommittedVersion) {}
+	int expectedSize() const { return mutations.expectedSize(); }
+
+	struct OrderByVersion {
+		bool operator()(MutationsAndVersionRef const& a, MutationsAndVersionRef const& b) const {
+			return a.version < b.version;
+		}
+	};
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, mutations, version, knownCommittedVersion);
+	}
+};
+
+// One batch of a change feed reply stream; `mutations` is serialized together with `arena`.
+struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
+	constexpr static FileIdentifier file_identifier = 1783066;
+	Arena arena;
+	VectorRef<MutationsAndVersionRef> mutations;
+
+	ChangeFeedStreamReply() {}
+
+	int expectedSize() const { return sizeof(ChangeFeedStreamReply) + mutations.expectedSize(); }
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, mutations, arena);
+	}
+};
+
+// Request to stream feed `rangeID` between versions `begin` and `end`, restricted to `range`.
+struct ChangeFeedStreamRequest {
+	constexpr static FileIdentifier file_identifier = 6795746;
+	SpanID spanContext;
+	Arena arena;
+	Key rangeID;
+	Version begin = 0;
+	Version end = 0;
+	KeyRange range;
+	ReplyPromiseStream<ChangeFeedStreamReply> reply;
+
+	ChangeFeedStreamRequest() {}
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, rangeID, begin, end, range, reply, spanContext, arena);
+	}
+};
+
+// Request to pop feed `rangeID` up to `version` for the keys in `range`.
+struct ChangeFeedPopRequest {
+	constexpr static FileIdentifier file_identifier = 10726174;
+	Key rangeID;
+	Version version;
+	KeyRange range;
+	ReplyPromise<Void> reply;
+
+	ChangeFeedPopRequest() {}
+	ChangeFeedPopRequest(Key const& rangeID, Version version, KeyRange const& range)
+	  : rangeID(rangeID), version(version), range(range) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, rangeID, version, range, reply);
+	}
+};
+
+// A change feed overlapping a queried range: its id, its key range and whether it has been stopped.
+struct OverlappingChangeFeedEntry {
+	Key rangeId;
+	KeyRange range;
+	bool stopped = false;
+
+	bool operator==(const OverlappingChangeFeedEntry& r) const {
+		return rangeId == r.rangeId && range == r.range && stopped == r.stopped;
+	}
+
+	OverlappingChangeFeedEntry() {}
+	OverlappingChangeFeedEntry(Key const& rangeId, KeyRange const& range, bool stopped)
+	  : rangeId(rangeId), range(range), stopped(stopped) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, rangeId, range, stopped);
+	}
+};
+
+// Reply listing the overlapping feeds. `cached` is not part of the serialized form.
+struct OverlappingChangeFeedsReply {
+	constexpr static FileIdentifier file_identifier = 11815134;
+	std::vector<OverlappingChangeFeedEntry> rangeIds;
+	bool cached;
+	Arena arena;
+
+	OverlappingChangeFeedsReply() : cached(false) {}
+	explicit OverlappingChangeFeedsReply(std::vector<OverlappingChangeFeedEntry> const& rangeIds)
+	  : rangeIds(rangeIds), cached(false) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, rangeIds, arena);
+	}
+};
+
+// Request for the change feeds overlapping `range`, with `minVersion` as the version bound sent to the server.
+struct OverlappingChangeFeedsRequest {
+	// must not repeat another message's identifier (this previously duplicated ChangeFeedPopRequest's)
+	constexpr static FileIdentifier file_identifier = 7228462;
+	KeyRange range;
+	// defaulted so a request built from a range alone never serializes an indeterminate value
+	Version minVersion = 0;
+	ReplyPromise<OverlappingChangeFeedsReply> reply;
+
+	OverlappingChangeFeedsRequest() {}
+	explicit OverlappingChangeFeedsRequest(KeyRange const& range) : range(range) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, range, minVersion, reply);
+	}
+};
+
 struct GetStorageMetricsReply {
 	constexpr static FileIdentifier file_identifier = 15491478;
 	StorageMetrics load;
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 3545defccb..334d0b4b7f 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -1032,6 +1032,62 @@ const KeyRef writeRecoveryKey = LiteralStringRef("\xff/writeRecovery");
 const ValueRef writeRecoveryKeyTrue = LiteralStringRef("1");
 const KeyRef snapshotEndVersionKey = LiteralStringRef("\xff/snapshotEndVersion");
 
+const KeyRangeRef changeFeedKeys(LiteralStringRef("\xff\x02/feed/"), LiteralStringRef("\xff\x02/feed0"));
+const KeyRef changeFeedPrefix = changeFeedKeys.begin;
+const KeyRef changeFeedPrivatePrefix = LiteralStringRef("\xff\xff\x02/feed/");
+
+// Encodes a change feed entry as (range, popVersion, status), prefixed with the protocol version.
+const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, ChangeFeedStatus status) {
+	BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
+	wr << range;
+	wr << popVersion;
+	wr << status;
+	return wr.toValue();
+}
+
+// Inverse of changeFeedValue: returns (range, popVersion, status).
+std::tuple<KeyRange, Version, ChangeFeedStatus> decodeChangeFeedValue(ValueRef const& value) {
+	KeyRange range;
+	Version version;
+	ChangeFeedStatus status;
+	BinaryReader reader(value, IncludeVersion());
+	reader >> range;
+	reader >> version;
+	reader >> status;
+	return std::make_tuple(range, version, status);
+}
+
+const KeyRangeRef changeFeedDurableKeys(LiteralStringRef("\xff\xff/cf/"), LiteralStringRef("\xff\xff/cf0"));
+const KeyRef changeFeedDurablePrefix = changeFeedDurableKeys.begin;
+
+// Builds the durable key for (feed, version): the prefix, the serialized feed id, then the version in big-endian
+// byte order.
+const Value changeFeedDurableKey(Key const& feed, Version version) {
+	BinaryWriter wr(AssumeVersion(ProtocolVersion::withChangeFeed()));
+	wr.serializeBytes(changeFeedDurablePrefix);
+	wr << feed;
+	wr << bigEndian64(version);
+	return wr.toValue();
+}
+// Inverse of changeFeedDurableKey: returns (feed, version).
+std::pair<Key, Version> decodeChangeFeedDurableKey(ValueRef const& key) {
+	Key feed;
+	Version version;
+	BinaryReader reader(key.removePrefix(changeFeedDurablePrefix), AssumeVersion(ProtocolVersion::withChangeFeed()));
+	reader >> feed;
+	reader >> version;
+	return std::make_pair(feed, bigEndian64(version));
+}
+// Encodes the mutations stored under a durable key together with the known committed version.
+const Value changeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations, Version knownCommittedVersion) {
+	BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
+	wr << mutations;
+	wr << knownCommittedVersion;
+	return wr.toValue();
+}
+// Inverse of changeFeedDurableValue: returns (mutations, knownCommittedVersion).
+std::pair<Standalone<VectorRef<MutationRef>>, Version> decodeChangeFeedDurableValue(ValueRef const& value) {
+	Standalone<VectorRef<MutationRef>> mutations;
+	Version knownCommittedVersion;
+	BinaryReader reader(value, IncludeVersion());
+	reader >> mutations;
+	reader >> knownCommittedVersion;
+	return std::make_pair(mutations, knownCommittedVersion);
+}
+
 const KeyRef configTransactionDescriptionKey = "\xff\xff/description"_sr;
 const KeyRange globalConfigKnobKeys = singleKeyRange("\xff\xff/globalKnobs"_sr);
 const KeyRangeRef configKnobKeys("\xff\xff/knobs/"_sr, "\xff\xff/knobs0"_sr);
diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h
index 64eb82c400..73ac1a39c9 100644
--- a/fdbclient/SystemData.h
+++ b/fdbclient/SystemData.h
@@ -496,6 +496,21 @@ extern const ValueRef writeRecoveryKeyTrue;
 // Allows incremental restore to read and set starting version for consistency.
extern const KeyRef snapshotEndVersionKey; +extern const KeyRangeRef changeFeedKeys; +enum class ChangeFeedStatus { CHANGE_FEED_CREATE = 0, CHANGE_FEED_STOP = 1, CHANGE_FEED_DESTROY = 2 }; +const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, ChangeFeedStatus status); +std::tuple decodeChangeFeedValue(ValueRef const& value); +extern const KeyRef changeFeedPrefix; +extern const KeyRef changeFeedPrivatePrefix; + +extern const KeyRangeRef changeFeedDurableKeys; +extern const KeyRef changeFeedDurablePrefix; + +const Value changeFeedDurableKey(Key const& feed, Version version); +std::pair decodeChangeFeedDurableKey(ValueRef const& key); +const Value changeFeedDurableValue(Standalone> const& mutations, Version knownCommittedVersion); +std::pair>, Version> decodeChangeFeedDurableValue(ValueRef const& value); + // Configuration database special keys extern const KeyRef configTransactionDescriptionKey; extern const KeyRange globalConfigKnobKeys; diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index 04154fa103..795a6e9953 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -157,7 +157,7 @@ public: private: std::unordered_map addressStatus; YieldedAsyncMap endpointKnownFailed; - YieldedAsyncMap disconnectTriggers; + AsyncMap disconnectTriggers; std::unordered_set failedEndpoints; friend class OnStateChangedActorActor; diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index bb23b9f21a..652c34b676 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -441,10 +441,12 @@ public: template void sendError(const E& exc) const { - if (queue->isRemoteEndpoint() && !queue->sentError) { - queue->sentError = true; - FlowTransport::transport().sendUnreliable( - SerializeSource>>(exc), getEndpoint(), false); + if (queue->isRemoteEndpoint()) { + if (!queue->sentError && !queue->acknowledgements.failures.isError()) { + queue->sentError = true; + FlowTransport::transport().sendUnreliable( + SerializeSource>>(exc), getEndpoint(), false); + } } 
else { queue->sendError(exc); if (errors && errors->canBeSet()) { diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 40ba33c2d2..5a720ca594 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -308,6 +308,32 @@ private: txnStateStore->set(KeyValueRef(m.param1, m.param2)); } + void checkSetChangeFeedPrefix(MutationRef m) { + if (!m.param1.startsWith(changeFeedPrefix)) { + return; + } + if (toCommit && keyInfo) { + KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2)); + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + auto ranges = keyInfo->intersectingRanges(r); + auto firstRange = ranges.begin(); + ++firstRange; + if (firstRange == ranges.end()) { + ranges.begin().value().populateTags(); + toCommit->addTags(ranges.begin().value().tags); + } else { + std::set allSources; + for (auto r : ranges) { + r.value().populateTags(); + allSources.insert(r.value().tags.begin(), r.value().tags.end()); + } + toCommit->addTags(allSources); + } + toCommit->writeTypedMessage(privatized); + } + } + void checkSetServerListPrefix(MutationRef m) { if (!m.param1.startsWith(serverListPrefix)) { return; @@ -974,6 +1000,7 @@ public: checkSetCacheKeysPrefix(m); checkSetConfigKeys(m); checkSetServerListPrefix(m); + checkSetChangeFeedPrefix(m); checkSetTSSMappingKeys(m); checkSetTSSQuarantineKeys(m); checkSetApplyMutationsEndRange(m); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 3f30d0e246..9e2a4599d8 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -169,6 +169,7 @@ set(FDBSERVER_SRCS workloads/ConsistencyCheck.actor.cpp workloads/CpuProfiler.actor.cpp workloads/Cycle.actor.cpp + workloads/ChangeFeeds.actor.cpp workloads/DataDistributionMetrics.actor.cpp workloads/DataLossRecovery.actor.cpp workloads/DDBalance.actor.cpp diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 
b761ce44e0..fe9076a08b 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1867,10 +1867,13 @@ Future tLogPeekMessages(PromiseType replyPromise, reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()). - // detail("BeginVer", reqBegin).detail("EndVer", reply.end). - // detail("MsgBytes", reply.messages.expectedSize()). - // detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); + // TraceEvent("TlogPeek", self->dbgid) + // .detail("LogId", logData->logId) + // .detail("Tag", req.tag.toString()) + // .detail("BeginVer", req.begin) + // .detail("EndVer", reply.end) + // .detail("MsgBytes", reply.messages.expectedSize()) + // .detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 752c6c410d..bacbbb165d 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -82,6 +82,7 @@ bool canReplyWith(Error e) { case error_code_wrong_shard_server: case error_code_process_behind: case error_code_watch_cancelled: + case error_code_unknown_change_feed: // case error_code_all_alternatives_failed: return true; default: @@ -95,6 +96,7 @@ struct AddingShard : NonCopyable { Future fetchClient; // holds FetchKeys() actor Promise fetchComplete; Promise readWrite; + PromiseStream changeFeedRemovals; // During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's // fetchVersion, while the shard is still busy catching up with fetchClient. 
It applies these updates after fetching @@ -131,7 +133,7 @@ struct AddingShard : NonCopyable { readWrite.send(Void()); } - void addMutation(Version version, MutationRef const& mutation); + void addMutation(Version version, bool fromFetch, MutationRef const& mutation); bool isTransferred() const { return phase == Waiting; } }; @@ -160,7 +162,7 @@ public: bool notAssigned() const { return !readWrite && !adding; } bool assigned() const { return readWrite || adding; } bool isInVersionedData() const { return readWrite || (adding && adding->isTransferred()); } - void addMutation(Version version, MutationRef const& mutation); + void addMutation(Version version, bool fromFetch, MutationRef const& mutation); bool isFetched() const { return readWrite || (adding && adding->fetchComplete.isSet()); } const char* debugDescribeState() const { @@ -324,6 +326,18 @@ struct FetchInjectionInfo { std::vector changes; }; +struct ChangeFeedInfo : ReferenceCounted { + std::deque> mutations; + Version storageVersion = invalidVersion; // The version between the storage version and the durable version are + // currently being written to disk + Version durableVersion = invalidVersion; // All versions before the durable version are durable on disk + Version emptyVersion = 0; // The change feed does not have any mutations before emptyVersion + KeyRange range; + Key id; + AsyncTrigger newMutations; + bool stopped = false; // A stopped change feed no longer adds new mutations, but is still queriable +}; + class ServerWatchMetadata : public ReferenceCounted { public: Key key; @@ -588,6 +602,12 @@ public: KeyRangeMap cachedRangeMap; // indicates if a key-range is being cached + KeyRangeMap>> keyChangeFeed; + std::map> uidChangeFeed; + Deque, Version>> changeFeedVersions; + std::map> changeFeedRemovals; + std::set currentChangeFeeds; + // newestAvailableVersion[k] // == invalidVersion -> k is unavailable at all versions // <= storageVersion -> k is unavailable at all versions (but might be read 
anyway from storage if we are in the @@ -610,6 +630,7 @@ public: NotifiedVersion durableVersion; // At least this version will be readable from storage after a power failure Version rebootAfterDurableVersion; int8_t primaryLocality; + Version knownCommittedVersion; Deque> recoveryVersionSkips; int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage @@ -864,9 +885,9 @@ public: tag(invalidTag), poppedAllAfter(std::numeric_limits::max()), cpuUsage(0.0), diskUsage(0.0), storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0), rebootAfterDurableVersion(std::numeric_limits::max()), primaryLocality(tagLocalityInvalid), - versionLag(0), logProtocol(0), thisServerID(ssi.id()), tssInQuarantine(false), db(db), actors(false), - byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), durableInProgress(Void()), watchBytes(0), - numWatches(0), noRecentUpdates(false), lastUpdate(now()), + knownCommittedVersion(0), versionLag(0), logProtocol(0), thisServerID(ssi.id()), tssInQuarantine(false), db(db), + actors(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), durableInProgress(Void()), + watchBytes(0), numWatches(0), noRecentUpdates(false), lastUpdate(now()), readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), updateEagerReads(nullptr), fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM), fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false), @@ -902,6 +923,7 @@ public: shards.insert(newShard->keys, Reference(newShard)); } void addMutation(Version version, + bool fromFetch, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads); @@ -1537,6 +1559,319 @@ ACTOR Future watchValueSendReply(StorageServer* data, } } +ACTOR Future changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) { + wait(delay(0)); + + TraceEvent(SevDebug, "ChangeFeedPopQuery", 
self->thisServerID) + .detail("RangeID", req.rangeID.printable()) + .detail("Version", req.version) + .detail("Range", req.range.toString()); + + if (!self->isReadable(req.range)) { + req.reply.sendError(wrong_shard_server()); + return Void(); + } + auto feed = self->uidChangeFeed.find(req.rangeID); + if (feed == self->uidChangeFeed.end()) { + req.reply.sendError(unknown_change_feed()); + return Void(); + } + if (req.version - 1 > feed->second->emptyVersion) { + feed->second->emptyVersion = req.version - 1; + while (!feed->second->mutations.empty() && feed->second->mutations.front().version < req.version) { + feed->second->mutations.pop_front(); + } + if (feed->second->storageVersion != invalidVersion) { + self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0), + changeFeedDurableKey(feed->second->id, req.version))); + if (req.version > feed->second->storageVersion) { + feed->second->storageVersion = invalidVersion; + feed->second->durableVersion = invalidVersion; + } + wait(self->durableVersion.whenAtLeast(self->storageVersion() + 1)); + } + } + req.reply.send(Void()); + return Void(); +} + +ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) { + wait(delay(0)); + wait(data->version.whenAtLeast(req.minVersion)); + + if (!data->isReadable(req.range)) { + req.reply.sendError(wrong_shard_server()); + return Void(); + } + + auto ranges = data->keyChangeFeed.intersectingRanges(req.range); + std::map> rangeIds; + for (auto r : ranges) { + for (auto& it : r.value()) { + rangeIds[it->id] = std::make_pair(it->range, it->stopped); + } + } + OverlappingChangeFeedsReply reply; + for (auto& it : rangeIds) { + reply.rangeIds.push_back(OverlappingChangeFeedEntry(it.first, it.second.first, it.second.second)); + } + req.reply.send(reply); + return Void(); +} + +MutationsAndVersionRef filterMutationsInverted(Arena& arena, MutationsAndVersionRef const& m, KeyRange const& range) { + Optional> modifiedMutations; + for 
(int i = 0; i < m.mutations.size(); i++) { + if (m.mutations[i].type == MutationRef::SetValue) { + if (modifiedMutations.present() && !range.contains(m.mutations[i].param1)) { + modifiedMutations.get().push_back(arena, m.mutations[i]); + } + if (!modifiedMutations.present() && range.contains(m.mutations[i].param1)) { + modifiedMutations = m.mutations.slice(0, i); + arena.dependsOn(range.arena()); + } + } else { + ASSERT(m.mutations[i].type == MutationRef::ClearRange); + if (!modifiedMutations.present() && + ((m.mutations[i].param1 < range.begin && m.mutations[i].param2 > range.begin) || + (m.mutations[i].param2 > range.end && m.mutations[i].param1 < range.end))) { + modifiedMutations = m.mutations.slice(0, i); + arena.dependsOn(range.arena()); + } + if (modifiedMutations.present()) { + if (m.mutations[i].param1 < range.begin) { + modifiedMutations.get().push_back(arena, + MutationRef(MutationRef::ClearRange, + m.mutations[i].param1, + std::min(range.begin, m.mutations[i].param2))); + } + if (m.mutations[i].param2 > range.end) { + modifiedMutations.get().push_back(arena, + MutationRef(MutationRef::ClearRange, + std::max(range.end, m.mutations[i].param1), + m.mutations[i].param2)); + } + } + } + } + if (modifiedMutations.present()) { + return MutationsAndVersionRef(modifiedMutations.get(), m.version, m.knownCommittedVersion); + } + return m; +} + +MutationsAndVersionRef filterMutations(Arena& arena, + MutationsAndVersionRef const& m, + KeyRange const& range, + bool inverted) { + if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) { + return m; + } + + if (inverted) { + return filterMutationsInverted(arena, m, range); + } + + Optional> modifiedMutations; + for (int i = 0; i < m.mutations.size(); i++) { + if (m.mutations[i].type == MutationRef::SetValue) { + if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) { + modifiedMutations.get().push_back(arena, m.mutations[i]); + } + if (!modifiedMutations.present() && 
!range.contains(m.mutations[i].param1)) { + modifiedMutations = m.mutations.slice(0, i); + arena.dependsOn(range.arena()); + } + } else { + ASSERT(m.mutations[i].type == MutationRef::ClearRange); + if (!modifiedMutations.present() && + (m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) { + modifiedMutations = m.mutations.slice(0, i); + arena.dependsOn(range.arena()); + } + if (modifiedMutations.present()) { + if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) { + modifiedMutations.get().push_back(arena, + MutationRef(MutationRef::ClearRange, + std::max(range.begin, m.mutations[i].param1), + std::min(range.end, m.mutations[i].param2))); + } + } + } + } + if (modifiedMutations.present()) { + return MutationsAndVersionRef(modifiedMutations.get(), m.version, m.knownCommittedVersion); + } + return m; +} + +ACTOR Future getChangeFeedMutations(StorageServer* data, + ChangeFeedStreamRequest req, + bool inverted) { + state ChangeFeedStreamReply reply; + state ChangeFeedStreamReply memoryReply; + state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; + state int remainingDurableBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; + wait(delay(0, TaskPriority::DefaultEndpoint)); + if (data->version.get() < req.begin) { + wait(data->version.whenAtLeast(req.begin)); + } + state uint64_t changeCounter = data->shardChangeCounter; + if (!inverted && !data->isReadable(req.range)) { + throw wrong_shard_server(); + } + + auto feed = data->uidChangeFeed.find(req.rangeID); + if (feed == data->uidChangeFeed.end()) { + throw unknown_change_feed(); + } + + // We must copy the mutationDeque when fetching the durable bytes in case mutations are popped from memory while + // waiting for the results + state Version dequeVersion = data->version.get(); + state Version dequeKnownCommit = data->knownCommittedVersion; + + if (req.end > feed->second->emptyVersion + 1) { + for (auto& it : feed->second->mutations) { + if (it.version >= req.end || 
remainingLimitBytes <= 0) { + break; + } + if (it.version >= req.begin) { + memoryReply.arena.dependsOn(it.arena()); + auto m = filterMutations(memoryReply.arena, it, req.range, inverted); + memoryReply.mutations.push_back(memoryReply.arena, m); + remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize(); + } + } + } + + if (req.end > feed->second->emptyVersion + 1 && feed->second->durableVersion != invalidVersion && + req.begin <= feed->second->durableVersion) { + RangeResult res = wait(data->storage.readRange( + KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, feed->second->emptyVersion)), + changeFeedDurableKey(req.rangeID, req.end)), + 1 << 30, + remainingDurableBytes)); + + if (!req.range.empty()) { + data->checkChangeCounter(changeCounter, req.range); + } + + Version lastVersion = req.begin - 1; + for (auto& kv : res) { + Key id; + Version version, knownCommittedVersion; + Standalone> mutations; + std::tie(id, version) = decodeChangeFeedDurableKey(kv.key); + std::tie(mutations, knownCommittedVersion) = decodeChangeFeedDurableValue(kv.value); + reply.arena.dependsOn(mutations.arena()); + auto m = filterMutations( + reply.arena, MutationsAndVersionRef(mutations, version, knownCommittedVersion), req.range, inverted); + reply.mutations.push_back(reply.arena, m); + remainingDurableBytes -= + sizeof(KeyValueRef) + + kv.expectedSize(); // This is tracking the size on disk rather than the reply size + // because we cannot add mutations from memory if there are potentially more on disk + lastVersion = version; + } + if (remainingDurableBytes > 0) { + reply.arena.dependsOn(memoryReply.arena); + auto it = memoryReply.mutations.begin(); + int totalCount = memoryReply.mutations.size(); + while (it != memoryReply.mutations.end() && it->version <= lastVersion) { + ++it; + --totalCount; + } + reply.mutations.append(reply.arena, it, totalCount); + } + } else { + reply = memoryReply; + } + + Version finalVersion = std::min(req.end - 1, 
dequeVersion); + if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0 && + remainingDurableBytes > 0) { + reply.mutations.push_back( + reply.arena, MutationsAndVersionRef(finalVersion, finalVersion == dequeVersion ? dequeKnownCommit : 0)); + } + return reply; +} + +ACTOR Future localChangeFeedStream(StorageServer* data, + PromiseStream> results, + Key rangeID, + Version begin, + Version end, + KeyRange range) { + try { + loop { + state ChangeFeedStreamRequest feedRequest; + feedRequest.rangeID = rangeID; + feedRequest.begin = begin; + feedRequest.end = end; + feedRequest.range = range; + state ChangeFeedStreamReply feedReply = wait(getChangeFeedMutations(data, feedRequest, true)); + begin = feedReply.mutations.back().version + 1; + state int resultLoc = 0; + while (resultLoc < feedReply.mutations.size()) { + if (feedReply.mutations[resultLoc].mutations.size() || + feedReply.mutations[resultLoc].version == end - 1) { + wait(results.onEmpty()); + results.send(feedReply.mutations[resultLoc]); + } + resultLoc++; + } + + if (begin == end) { + return Void(); + } + } + } catch (Error& e) { + TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID).error(e); + throw; + } +} + +ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) { + state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); + req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES); + + wait(delay(0, TaskPriority::DefaultEndpoint)); + + try { + loop { + wait(req.reply.onReady()); + ChangeFeedStreamReply _feedReply = wait(getChangeFeedMutations(data, req, false)); + ChangeFeedStreamReply feedReply = _feedReply; + + req.begin = feedReply.mutations.back().version + 1; + req.reply.send(feedReply); + if (feedReply.mutations.back().version == req.end - 1) { + req.reply.sendError(end_of_stream()); + return Void(); + } + if (feedReply.mutations.back().mutations.empty()) { + auto feed = 
data->uidChangeFeed.find(req.rangeID); + if (feed == data->uidChangeFeed.end()) { + req.reply.sendError(unknown_change_feed()); + return Void(); + } + choose { + when(wait(delay(5.0, TaskPriority::DefaultEndpoint))) {} + when(wait(feed->second->newMutations.onTrigger())) {} + } + } + } + } catch (Error& e) { + if (e.code() != error_code_operation_obsolete) { + if (!canReplyWith(e)) + throw; + req.reply.sendError(e); + } + } + return Void(); +} + #ifdef NO_INTELLISENSE size_t WATCH_OVERHEAD_WATCHQ = sizeof(WatchValueSendReplyActorState) + sizeof(WatchValueSendReplyActor); @@ -2624,7 +2959,12 @@ bool expandMutation(MutationRef& m, return true; } -void applyMutation(StorageServer* self, MutationRef const& m, Arena& arena, StorageServer::VersionedData& data) { +void applyMutation(StorageServer* self, + MutationRef const& m, + Arena& arena, + StorageServer::VersionedData& data, + Version version, + bool fromFetch) { // m is expected to be in arena already // Clear split keys are added to arena StorageMetrics metrics; @@ -2657,12 +2997,39 @@ void applyMutation(StorageServer* self, MutationRef const& m, Arena& arena, Stor } data.insert(m.param1, ValueOrClearToRef::value(m.param2)); self->watches.trigger(m.param1); + + if (!fromFetch) { + for (auto& it : self->keyChangeFeed[m.param1]) { + if (!it->stopped) { + if (it->mutations.empty() || it->mutations.back().version != version) { + it->mutations.push_back(MutationsAndVersionRef(version, self->knownCommittedVersion)); + } + it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m); + self->currentChangeFeeds.insert(it->id); + } + } + } } else if (m.type == MutationRef::ClearRange) { data.erase(m.param1, m.param2); ASSERT(m.param2 > m.param1); ASSERT(!data.isClearContaining(data.atLatest(), m.param1)); data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2)); self->watches.triggerRange(m.param1, m.param2); + + if (!fromFetch) { + auto ranges = 
self->keyChangeFeed.intersectingRanges(KeyRangeRef(m.param1, m.param2)); + for (auto& r : ranges) { + for (auto& it : r.value()) { + if (!it->stopped) { + if (it->mutations.empty() || it->mutations.back().version != version) { + it->mutations.push_back(MutationsAndVersionRef(version, self->knownCommittedVersion)); + } + it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m); + self->currentChangeFeeds.insert(it->id); + } + } + } + } } } @@ -2743,34 +3110,34 @@ void coalesceShards(StorageServer* data, KeyRangeRef keys) { } template -void addMutation(T& target, Version version, MutationRef const& mutation) { - target.addMutation(version, mutation); +void addMutation(T& target, Version version, bool fromFetch, MutationRef const& mutation) { + target.addMutation(version, fromFetch, mutation); } template -void addMutation(Reference& target, Version version, MutationRef const& mutation) { - addMutation(*target, version, mutation); +void addMutation(Reference& target, Version version, bool fromFetch, MutationRef const& mutation) { + addMutation(*target, version, fromFetch, mutation); } template void splitMutations(StorageServer* data, KeyRangeMap& map, VerUpdateRef const& update) { for (int i = 0; i < update.mutations.size(); i++) { - splitMutation(data, map, update.mutations[i], update.version); + splitMutation(data, map, update.mutations[i], update.version, update.version); } } template -void splitMutation(StorageServer* data, KeyRangeMap& map, MutationRef const& m, Version ver) { +void splitMutation(StorageServer* data, KeyRangeMap& map, MutationRef const& m, Version ver, bool fromFetch) { if (isSingleKeyMutation((MutationRef::Type)m.type)) { if (!SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(m.param1)) - addMutation(map.rangeContaining(m.param1)->value(), ver, m); + addMutation(map.rangeContaining(m.param1)->value(), ver, fromFetch, m); } else if (m.type == MutationRef::ClearRange) { KeyRangeRef mKeys(m.param1, m.param2); if 
(!SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(mKeys)) { auto r = map.intersectingRanges(mKeys); for (auto i = r.begin(); i != r.end(); ++i) { KeyRangeRef k = mKeys & i->range(); - addMutation(i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end)); + addMutation(i->value(), ver, fromFetch, MutationRef((MutationRef::Type)m.type, k.begin, k.end)); } } } else @@ -2873,12 +3240,222 @@ ACTOR Future tryGetRange(PromiseStream results, Transaction* } } +#define PERSIST_PREFIX "\xff\xff" + +// Immutable +static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"), + LiteralStringRef("FoundationDB/StorageServer/1/4")); +static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"), + LiteralStringRef("FoundationDB/StorageServer/1/5")); +static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID"); +static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID"); +static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ"); + +// (Potentially) change with the durable version or when fetchKeys completes +static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version"); +static const KeyRangeRef persistShardAssignedKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0")); +static const KeyRangeRef persistShardAvailableKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0")); +static const KeyRangeRef persistByteSampleKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0")); +static const KeyRangeRef persistByteSampleSampleKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"), + LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0")); +static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX 
"LogProtocol"); +static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality"); +static const KeyRangeRef persistChangeFeedKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX "RF0")); +// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys) + +ACTOR Future fetchChangeFeedApplier(StorageServer* data, + Reference changeFeedInfo, + Key rangeId, + KeyRange range, + Version fetchVersion, + bool existing) { + state PromiseStream>> feedResults; + state Future feed = data->cx->getChangeFeedStream( + feedResults, rangeId, 0, existing ? fetchVersion + 1 : data->version.get() + 1, range); + + if (!existing) { + try { + loop { + Standalone> res = waitNext(feedResults.getFuture()); + for (auto& it : res) { + if (it.mutations.size()) { + data->storage.writeKeyValue( + KeyValueRef(changeFeedDurableKey(rangeId, it.version), + changeFeedDurableValue(it.mutations, it.knownCommittedVersion))); + changeFeedInfo->storageVersion = std::max(changeFeedInfo->durableVersion, it.version); + changeFeedInfo->durableVersion = changeFeedInfo->storageVersion; + } + } + wait(yield()); + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw; + } + return Void(); + } + } + + state PromiseStream> localResults; + + // Add 2 to fetch version to make sure the local stream will have more versions in the stream than the remote stream + // to avoid edge cases in the merge logic + state Future localStream = localChangeFeedStream(data, localResults, rangeId, 0, fetchVersion + 2, range); + state Standalone localResult; + + Standalone _localResult = waitNext(localResults.getFuture()); + localResult = _localResult; + try { + loop { + state Standalone> remoteResult = waitNext(feedResults.getFuture()); + state int remoteLoc = 0; + while (remoteLoc < remoteResult.size()) { + if (remoteResult[remoteLoc].version < localResult.version) { + if 
(remoteResult[remoteLoc].mutations.size()) { + data->storage.writeKeyValue( + KeyValueRef(changeFeedDurableKey(rangeId, remoteResult[remoteLoc].version), + changeFeedDurableValue(remoteResult[remoteLoc].mutations, + remoteResult[remoteLoc].knownCommittedVersion))); + changeFeedInfo->storageVersion = + std::max(changeFeedInfo->durableVersion, remoteResult[remoteLoc].version); + changeFeedInfo->durableVersion = changeFeedInfo->storageVersion; + } + remoteLoc++; + } else if (remoteResult[remoteLoc].version == localResult.version) { + if (remoteResult[remoteLoc].mutations.size()) { + ASSERT(localResult.mutations.size()); + remoteResult[remoteLoc].mutations.append( + remoteResult.arena(), localResult.mutations.begin(), localResult.mutations.size()); + data->storage.writeKeyValue( + KeyValueRef(changeFeedDurableKey(rangeId, remoteResult[remoteLoc].version), + changeFeedDurableValue(remoteResult[remoteLoc].mutations, + remoteResult[remoteLoc].knownCommittedVersion))); + changeFeedInfo->storageVersion = + std::max(changeFeedInfo->durableVersion, remoteResult[remoteLoc].version); + changeFeedInfo->durableVersion = changeFeedInfo->storageVersion; + } + remoteLoc++; + Standalone _localResult = waitNext(localResults.getFuture()); + localResult = _localResult; + } else { + Standalone _localResult = waitNext(localResults.getFuture()); + localResult = _localResult; + } + } + wait(yield()); + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw; + } + } + return Void(); +} + +ACTOR Future fetchChangeFeed(StorageServer* data, + Key rangeId, + KeyRange range, + bool stopped, + Version fetchVersion) { + state Reference changeFeedInfo; + wait(delay(0)); // allow this actor to be cancelled by removals + state bool existing = data->uidChangeFeed.count(rangeId); + + TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID) + .detail("RangeID", rangeId.printable()) + .detail("Range", range.toString()) + .detail("Existing", existing); + + if (!existing) { + 
changeFeedInfo = Reference(new ChangeFeedInfo()); + changeFeedInfo->range = range; + changeFeedInfo->id = rangeId; + changeFeedInfo->stopped = stopped; + data->uidChangeFeed[rangeId] = changeFeedInfo; + auto rs = data->keyChangeFeed.modify(range); + for (auto r = rs.begin(); r != rs.end(); ++r) { + r->value().push_back(changeFeedInfo); + } + data->keyChangeFeed.coalesce(range.contents()); + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + data->addMutationToMutationLog( + mLV, + MutationRef(MutationRef::SetValue, + persistChangeFeedKeys.begin.toString() + rangeId.toString(), + changeFeedValue(range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE))); + } else { + changeFeedInfo = data->uidChangeFeed[rangeId]; + } + + loop { + try { + wait(fetchChangeFeedApplier(data, changeFeedInfo, rangeId, range, fetchVersion, existing)); + return Void(); + } catch (Error& e) { + if (e.code() != error_code_change_feed_not_registered) { + throw; + } + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } +} + +ACTOR Future dispatchChangeFeeds(StorageServer* data, UID fetchKeysID, KeyRange keys, Version fetchVersion) { + // find overlapping range feeds + state std::map> feedFetches; + state PromiseStream removals; + data->changeFeedRemovals[fetchKeysID] = removals; + try { + state std::vector feeds = + wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1)); + for (auto& feed : feeds) { + feedFetches[feed.rangeId] = fetchChangeFeed(data, feed.rangeId, feed.range, feed.stopped, fetchVersion); + } + + loop { + Future nextFeed = Never(); + if (!removals.getFuture().isReady()) { + bool done = true; + while (!feedFetches.empty()) { + if (feedFetches.begin()->second.isReady()) { + feedFetches.erase(feedFetches.begin()); + } else { + nextFeed = feedFetches.begin()->second; + done = false; + break; + } + } + if (done) { + data->changeFeedRemovals.erase(fetchKeysID); + return Void(); + } + } + choose { + when(Key remove = 
waitNext(removals.getFuture())) { feedFetches.erase(remove); } + when(wait(nextFeed)) {} + } + } + + } catch (Error& e) { + if (!data->shuttingDown) { + data->changeFeedRemovals.erase(fetchKeysID); + } + throw; + } +} + ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { state const UID fetchKeysID = deterministicRandom()->randomUniqueID(); state TraceInterval interval("FetchKeys"); state KeyRange keys = shard->keys; state Future warningLogger = logFetchKeysWarning(shard); state const double startTime = now(); + state Version fetchVersion = invalidVersion; state FetchKeysMetricReporter metricReporter(fetchKeysID, startTime, keys, @@ -2947,7 +3524,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { loop { state Transaction tr(data->cx); - state Version fetchVersion = data->version.get(); + fetchVersion = data->version.get(); TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID) .detail("FKID", interval.pairID) @@ -3096,8 +3673,11 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what // we have written) + state Future fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1); + wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion)); + holdingFKPL.release(); - wait(data->durableVersion.whenAtLeast(data->storageVersion() + 1)); + wait(fetchDurable); TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID) .detail("FKID", interval.pairID) @@ -3139,6 +3719,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // Eager reads will be done for them by update(), and the mutations will come back through // AddingShard::addMutations and be applied to versionedMap and mutationLog as normal. 
The lie about their // version is acceptable because this shard will never be read at versions < transferredVersion + for (auto i = shard->updates.begin(); i != shard->updates.end(); ++i) { i->version = shard->transferredVersion; batch->arena.dependsOn(i->arena()); @@ -3224,7 +3805,7 @@ AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys) fetchClient = fetchKeys(server, this); } -void AddingShard::addMutation(Version version, MutationRef const& mutation) { +void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const& mutation) { if (mutation.type == mutation.ClearRange) { ASSERT(keys.begin <= mutation.param1 && mutation.param2 <= keys.end); } else if (isSingleKeyMutation((MutationRef::Type)mutation.type)) { @@ -3247,19 +3828,45 @@ void AddingShard::addMutation(Version version, MutationRef const& mutation) { } // Add the mutation to the version. updates.back().mutations.push_back_deep(updates.back().arena(), mutation); + if (!fromFetch) { + if (mutation.type == MutationRef::SetValue) { + for (auto& it : server->keyChangeFeed[mutation.param1]) { + if (!it->stopped) { + if (it->mutations.empty() || it->mutations.back().version != version) { + it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion)); + } + it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation); + server->currentChangeFeeds.insert(it->id); + } + } + } else if (mutation.type == MutationRef::ClearRange) { + auto ranges = server->keyChangeFeed.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2)); + for (auto& r : ranges) { + for (auto& it : r.value()) { + if (!it->stopped) { + if (it->mutations.empty() || it->mutations.back().version != version) { + it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion)); + } + it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation); + server->currentChangeFeeds.insert(it->id); + } + } + } + } + } 
} else if (phase == Waiting) { - server->addMutation(version, mutation, keys, server->updateEagerReads); + server->addMutation(version, fromFetch, mutation, keys, server->updateEagerReads); } else ASSERT(false); } -void ShardInfo::addMutation(Version version, MutationRef const& mutation) { +void ShardInfo::addMutation(Version version, bool fromFetch, MutationRef const& mutation) { ASSERT((void*)this); ASSERT(keys.contains(mutation.param1)); if (adding) - adding->addMutation(version, mutation); + adding->addMutation(version, fromFetch, mutation); else if (readWrite) - readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads); + readWrite->addMutation(version, fromFetch, mutation, this->keys, readWrite->updateEagerReads); else if (mutation.type != MutationRef::ClearRange) { TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation); ASSERT(false); // Mutation delivered to notAssigned shard! @@ -3403,7 +4010,7 @@ void changeServerKeys(StorageServer* data, // Clear the moving-in empty range, and set it available at the latestVersion. 
for (const auto& range : newEmptyRanges) { MutationRef clearRange(MutationRef::ClearRange, range.begin, range.end); - data->addMutation(data->data().getLatestVersion(), clearRange, range, data->updateEagerReads); + data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads); data->newestAvailableVersion.insert(range, latestVersion); setAvailableStatus(data, range, true); } @@ -3427,6 +4034,7 @@ void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion) } void StorageServer::addMutation(Version version, + bool fromFetch, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads) { @@ -3440,7 +4048,7 @@ void StorageServer::addMutation(Version version, DEBUG_MUTATION("applyMutation", version, expanded, thisServerID) .detail("ShardBegin", shard.begin) .detail("ShardEnd", shard.end); - applyMutation(this, expanded, mLog.arena(), mutableData()); + applyMutation(this, expanded, mLog.arena(), mutableData(), version, fromFetch); // printf("\nSSUpdate: Printing versioned tree after applying mutation\n"); // mutableData().printTree(version); } @@ -3455,32 +4063,6 @@ struct OrderByVersion { } }; -#define PERSIST_PREFIX "\xff\xff" - -// Immutable -static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"), - LiteralStringRef("FoundationDB/StorageServer/1/4")); -static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"), - LiteralStringRef("FoundationDB/StorageServer/1/5")); -static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID"); -static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID"); -static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ"); - -// (Potentially) change with the durable version or when fetchKeys completes -static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version"); -static const KeyRangeRef persistShardAssignedKeys = 
- KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0")); -static const KeyRangeRef persistShardAvailableKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0")); -static const KeyRangeRef persistByteSampleKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0")); -static const KeyRangeRef persistByteSampleSampleKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"), - LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0")); -static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol"); -static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality"); -// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys) - class StorageUpdater { public: StorageUpdater() @@ -3490,7 +4072,7 @@ public: : currentVersion(fromVersion), fromVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false), processedCacheStartKey(false) {} - void applyMutation(StorageServer* data, MutationRef const& m, Version ver) { + void applyMutation(StorageServer* data, MutationRef const& m, Version ver, bool fromFetch) { //TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver); if (currentVersion != ver) { @@ -3511,7 +4093,7 @@ public: // DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID); //} - splitMutation(data, data->shards, m, ver); + splitMutation(data, data->shards, m, ver, fromFetch); } if (data->otherError.getFuture().isReady()) @@ -3583,6 +4165,11 @@ private: ASSERT(rollbackVersion >= data->storageVersion()); rollback(data, rollbackVersion, currentVersion); } + for (auto& it : data->uidChangeFeed) { + it.second->mutations.push_back(MutationsAndVersionRef(currentVersion, 
rollbackVersion)); + it.second->mutations.back().mutations.push_back_deep(it.second->mutations.back().arena(), m); + data->currentChangeFeeds.insert(it.first); + } data->recoveryVersionSkips.emplace_back(rollbackVersion, currentVersion - rollbackVersion); } else if (m.type == MutationRef::SetValue && m.param1 == killStoragePrivateKey) { @@ -3615,6 +4202,82 @@ private: data->primaryLocality = BinaryReader::fromStringRef(m.param2, Unversioned()); auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); data->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2)); + } else if (m.type == MutationRef::SetValue && m.param1.startsWith(changeFeedPrivatePrefix)) { + Key changeFeedId = m.param1.removePrefix(changeFeedPrivatePrefix); + KeyRange changeFeedRange; + Version popVersion; + ChangeFeedStatus status; + std::tie(changeFeedRange, popVersion, status) = decodeChangeFeedValue(m.param2); + auto feed = data->uidChangeFeed.find(changeFeedId); + if (feed == data->uidChangeFeed.end()) { + if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) { + TraceEvent(SevDebug, "AddingChangeFeed", data->thisServerID) + .detail("RangeID", changeFeedId.printable()) + .detail("Range", changeFeedRange.toString()) + .detail("Version", currentVersion); + Reference changeFeedInfo(new ChangeFeedInfo()); + changeFeedInfo->range = changeFeedRange; + changeFeedInfo->id = changeFeedId; + changeFeedInfo->emptyVersion = currentVersion - 1; + data->uidChangeFeed[changeFeedId] = changeFeedInfo; + + auto rs = data->keyChangeFeed.modify(changeFeedRange); + for (auto r = rs.begin(); r != rs.end(); ++r) { + r->value().push_back(changeFeedInfo); + } + data->keyChangeFeed.coalesce(changeFeedRange.contents()); + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + data->addMutationToMutationLog( + mLV, + MutationRef(MutationRef::SetValue, + persistChangeFeedKeys.begin.toString() + changeFeedId.toString(), + m.param2)); + } + 
} else { + if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) { + Key beginClearKey = changeFeedId.withPrefix(persistChangeFeedKeys.begin); + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + data->addMutationToMutationLog( + mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey))); + data->addMutationToMutationLog(mLV, + MutationRef(MutationRef::ClearRange, + changeFeedDurableKey(feed->second->id, 0), + changeFeedDurableKey(feed->second->id, currentVersion))); + auto rs = data->keyChangeFeed.modify(feed->second->range); + for (auto r = rs.begin(); r != rs.end(); ++r) { + auto& feedList = r->value(); + for (int i = 0; i < feedList.size(); i++) { + if (feedList[i] == feed->second) { + swapAndPop(&feedList, i--); + } + } + } + data->uidChangeFeed.erase(feed); + } else { + if (popVersion != invalidVersion && popVersion - 1 > feed->second->emptyVersion) { + feed->second->emptyVersion = popVersion - 1; + while (!feed->second->mutations.empty() && + feed->second->mutations.front().version < popVersion) { + feed->second->mutations.pop_front(); + } + if (feed->second->storageVersion != invalidVersion) { + data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0), + changeFeedDurableKey(feed->second->id, popVersion))); + if (popVersion > feed->second->storageVersion) { + feed->second->storageVersion = invalidVersion; + feed->second->durableVersion = invalidVersion; + } + } + } + feed->second->stopped = (status == ChangeFeedStatus::CHANGE_FEED_STOP); + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + data->addMutationToMutationLog( + mLV, + MutationRef(MutationRef::SetValue, + persistChangeFeedKeys.begin.toString() + changeFeedId.toString(), + m.param2)); + } + } } else if (m.param1.substr(1).startsWith(tssMappingKeys.begin) && (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) { if (!data->isTss()) { @@ -3763,6 +4426,7 @@ ACTOR Future 
update(StorageServer* data, bool* pReceivedUpdate) { ++data->counters.updateBatches; data->lastTLogVersion = cursor->getMaxKnownVersion(); + data->knownCommittedVersion = cursor->getMinKnownCommittedVersion(); data->versionLag = std::max(0, data->lastTLogVersion - data->version.get()); ASSERT(*pReceivedUpdate == false); @@ -3874,7 +4538,7 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { state int mutationNum = 0; state VerUpdateRef* pUpdate = &fii.changes[changeNum]; for (; mutationNum < pUpdate->mutations.size(); mutationNum++) { - updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version); + updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version, true); mutationBytes += pUpdate->mutations[mutationNum].totalSize(); // data->counters.mutationBytes or data->counters.mutations should not be updated because they should // have counted when the mutations arrive from cursor initially. @@ -3906,6 +4570,17 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) { ++data->counters.updateVersions; + if (data->currentChangeFeeds.size()) { + data->changeFeedVersions.push_back(std::make_pair( + std::vector(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver)); + for (auto& it : data->currentChangeFeeds) { + auto feed = data->uidChangeFeed.find(it); + if (feed != data->uidChangeFeed.end()) { + feed->second->newMutations.trigger(); + } + } + data->currentChangeFeeds.clear(); + } ver = cloneCursor2->version().version; } @@ -3953,7 +4628,7 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { .detail("Version", cloneCursor2->version().toString()); } - updater.applyMutation(data, msg, ver); + updater.applyMutation(data, msg, ver, false); mutationBytes += msg.totalSize(); data->counters.mutationBytes += msg.totalSize(); ++data->counters.mutations; @@ -3986,6 +4661,17 @@ ACTOR 
Future update(StorageServer* data, bool* pReceivedUpdate) { } } data->tLogMsgsPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeTLogMsgsUpdates); + if (data->currentChangeFeeds.size()) { + data->changeFeedVersions.push_back(std::make_pair( + std::vector(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver)); + for (auto& it : data->currentChangeFeeds) { + auto feed = data->uidChangeFeed.find(it); + if (feed != data->uidChangeFeed.end()) { + feed->second->newMutations.trigger(); + } + } + data->currentChangeFeeds.clear(); + } if (ver != invalidVersion) { data->lastVersionWithData = ver; @@ -4121,6 +4807,32 @@ ACTOR Future updateStorage(StorageServer* data) { break; } + std::set modifiedChangeFeeds; + while (!data->changeFeedVersions.empty() && data->changeFeedVersions.front().second <= newOldestVersion) { + modifiedChangeFeeds.insert(data->changeFeedVersions.front().first.begin(), + data->changeFeedVersions.front().first.end()); + data->changeFeedVersions.pop_front(); + } + + state std::vector updatedChangeFeeds(modifiedChangeFeeds.begin(), modifiedChangeFeeds.end()); + state int curFeed = 0; + while (curFeed < updatedChangeFeeds.size()) { + auto info = data->uidChangeFeed.find(updatedChangeFeeds[curFeed]); + if (info != data->uidChangeFeed.end()) { + for (auto& it : info->second->mutations) { + if (it.version > newOldestVersion) { + break; + } + data->storage.writeKeyValue( + KeyValueRef(changeFeedDurableKey(info->second->id, it.version), + changeFeedDurableValue(it.mutations, it.knownCommittedVersion))); + info->second->storageVersion = it.version; + } + wait(yield(TaskPriority::UpdateStorage)); + } + curFeed++; + } + // Set the new durable version as part of the outstanding change set, before commit if (startOldestVersion != newOldestVersion) data->storage.makeVersionDurable(newOldestVersion); @@ -4154,6 +4866,19 @@ ACTOR Future updateStorage(StorageServer* data) { throw please_reboot(); } + curFeed = 0; + while (curFeed < 
updatedChangeFeeds.size()) { + auto info = data->uidChangeFeed.find(updatedChangeFeeds[curFeed]); + if (info != data->uidChangeFeed.end()) { + while (!info->second->mutations.empty() && info->second->mutations.front().version < newOldestVersion) { + info->second->mutations.pop_front(); + } + info->second->durableVersion = info->second->storageVersion; + wait(yield(TaskPriority::UpdateStorage)); + } + curFeed++; + } + durableInProgress.send(Void()); wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to shut // down, so delay to check for cancellation @@ -4439,6 +5164,7 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor state Future> fPrimaryLocality = storage->readValue(persistPrimaryLocality); state Future fShardAssigned = storage->readRange(persistShardAssignedKeys); state Future fShardAvailable = storage->readRange(persistShardAvailableKeys); + state Future fChangeFeeds = storage->readRange(persistChangeFeedKeys); state Promise byteSampleSampleRecovered; state Promise startByteSampleRestore; @@ -4447,7 +5173,7 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor TraceEvent("ReadingDurableState", data->thisServerID).log(); wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality })); - wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable })); + wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds })); wait(byteSampleSampleRecovered.getFuture()); TraceEvent("RestoringDurableState", data->thisServerID).log(); @@ -4525,6 +5251,34 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor wait(yield()); } + state RangeResult changeFeeds = fChangeFeeds.get(); + state int feedLoc; + for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) { + Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin); + KeyRange changeFeedRange; + 
Version popVersion; + ChangeFeedStatus status; + std::tie(changeFeedRange, popVersion, status) = decodeChangeFeedValue(changeFeeds[feedLoc].value); + TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID) + .detail("RangeID", changeFeedId.printable()) + .detail("Range", changeFeedRange.toString()) + .detail("Status", status) + .detail("PopVer", popVersion); + Reference changeFeedInfo(new ChangeFeedInfo()); + changeFeedInfo->range = changeFeedRange; + changeFeedInfo->id = changeFeedId; + changeFeedInfo->durableVersion = version; + changeFeedInfo->storageVersion = version; + changeFeedInfo->emptyVersion = popVersion - 1; + changeFeedInfo->stopped = status == ChangeFeedStatus::CHANGE_FEED_STOP; + data->uidChangeFeed[changeFeedId] = changeFeedInfo; + auto rs = data->keyChangeFeed.modify(changeFeedRange); + for (auto r = rs.begin(); r != rs.end(); ++r) { + r->value().push_back(changeFeedInfo); + } + wait(yield()); + } + data->keyChangeFeed.coalesce(allKeys); // TODO: why is this seemingly random delay here? 
wait(delay(0.0001)); @@ -5039,6 +5793,30 @@ ACTOR Future serveWatchValueRequests(StorageServer* self, FutureStream serveChangeFeedStreamRequests(StorageServer* self, + FutureStream changeFeedStream) { + loop { + ChangeFeedStreamRequest req = waitNext(changeFeedStream); + self->actors.add(changeFeedStreamQ(self, req)); + } +} + +ACTOR Future serveOverlappingChangeFeedsRequests( + StorageServer* self, + FutureStream overlappingChangeFeeds) { + loop { + OverlappingChangeFeedsRequest req = waitNext(overlappingChangeFeeds); + self->actors.add(self->readGuard(req, overlappingChangeFeedsQ)); + } +} + +ACTOR Future serveChangeFeedPopRequests(StorageServer* self, FutureStream changeFeedPops) { + loop { + ChangeFeedPopRequest req = waitNext(changeFeedPops); + self->actors.add(self->readGuard(req, changeFeedPopQ)); + } +} + ACTOR Future reportStorageServerState(StorageServer* self) { if (!SERVER_KNOBS->REPORT_DD_METRICS) { return Void(); @@ -5088,6 +5866,9 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface self->actors.add(serveGetKeyValuesStreamRequests(self, ssi.getKeyValuesStream.getFuture())); self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture())); self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture())); + self->actors.add(serveChangeFeedStreamRequests(self, ssi.changeFeedStream.getFuture())); + self->actors.add(serveOverlappingChangeFeedsRequests(self, ssi.overlappingChangeFeeds.getFuture())); + self->actors.add(serveChangeFeedPopRequests(self, ssi.changeFeedPop.getFuture())); self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id())); self->actors.add(reportStorageServerState(self)); diff --git a/fdbserver/workloads/ChangeFeeds.actor.cpp b/fdbserver/workloads/ChangeFeeds.actor.cpp new file mode 100644 index 0000000000..06415d0a81 --- /dev/null +++ b/fdbserver/workloads/ChangeFeeds.actor.cpp @@ -0,0 +1,223 @@ +/* + * ChangeFeeds.actor.cpp + * + * This source file is part of the FoundationDB open source project 
+ * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/FDBOptions.g.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/SystemData.h" +#include "fdbserver/TesterInterface.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "flow/Arena.h" +#include "flow/IRandom.h" +#include "flow/Trace.h" +#include "flow/actorcompiler.h" // This must be the last #include. 
+#include "flow/serialize.h" +#include + +ACTOR Future>, Version>> readDatabase(Database cx) { + state Transaction tr(cx); + loop { + state Standalone> output; + state Version readVersion; + try { + Version ver = wait(tr.getReadVersion()); + readVersion = ver; + + state PromiseStream> results; + state Future stream = tr.getRangeStream(results, normalKeys, GetRangeLimits()); + + loop { + Standalone res = waitNext(results.getFuture()); + output.arena().dependsOn(res.arena()); + output.append(output.arena(), res.begin(), res.size()); + } + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + return std::make_pair(output, readVersion); + } + wait(tr.onError(e)); + } + } +} + +ACTOR Future>> readMutations(Database cx, + Key rangeID, + Version begin, + Version end) { + state Standalone> output; + loop { + try { + state PromiseStream>> results; + state Future stream = cx->getChangeFeedStream(results, rangeID, begin, end, normalKeys); + loop { + Standalone> res = waitNext(results.getFuture()); + output.arena().dependsOn(res.arena()); + for (auto& it : res) { + if (it.mutations.size() == 1 && it.mutations.back().param1 == lastEpochEndPrivateKey) { + Version rollbackVersion; + BinaryReader br(it.mutations.back().param2, Unversioned()); + br >> rollbackVersion; + TraceEvent("ChangeFeedRollback") + .detail("Ver", it.version) + .detail("RollbackVer", rollbackVersion); + while (output.size() && output.back().version > rollbackVersion) { + TraceEvent("ChangeFeedRollbackVer").detail("Ver", output.back().version); + output.pop_back(); + } + } else { + output.push_back(output.arena(), it); + } + } + begin = res.back().version + 1; + } + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + return output; + } + throw; + } + } +} + +Standalone> advanceData(Standalone> source, + Standalone> mutations) { + StringRef dbgKey = LiteralStringRef(""); + std::map data; + for (auto& kv : source) { + if (kv.key == dbgKey) + 
TraceEvent("ChangeFeedDbgStart").detail("K", kv.key).detail("V", kv.value); + data[kv.key] = kv.value; + } + for (auto& it : mutations) { + for (auto& m : it.mutations) { + if (m.type == MutationRef::SetValue) { + if (m.param1 == dbgKey) + TraceEvent("ChangeFeedDbgSet") + .detail("Ver", it.version) + .detail("K", m.param1) + .detail("V", m.param2); + data[m.param1] = m.param2; + } else { + ASSERT(m.type == MutationRef::ClearRange); + if (KeyRangeRef(m.param1, m.param2).contains(dbgKey)) + TraceEvent("ChangeFeedDbgClear") + .detail("Ver", it.version) + .detail("Begin", m.param1) + .detail("End", m.param2); + data.erase(data.lower_bound(m.param1), data.lower_bound(m.param2)); + } + } + } + Standalone> output; + output.arena().dependsOn(source.arena()); + output.arena().dependsOn(mutations.arena()); + for (auto& kv : data) { + output.push_back(output.arena(), KeyValueRef(kv.first, kv.second)); + } + return output; +} + +bool compareData(Standalone> source, Standalone> dest) { + if (source.size() != dest.size()) { + TraceEvent(SevError, "ChangeFeedSizeMismatch").detail("SrcSize", source.size()).detail("DestSize", dest.size()); + } + for (int i = 0; i < std::min(source.size(), dest.size()); i++) { + if (source[i] != dest[i]) { + TraceEvent("ChangeFeedMutationMismatch") + .detail("Index", i) + .detail("SrcKey", source[i].key) + .detail("DestKey", dest[i].key) + .detail("SrcValue", source[i].value) + .detail("DestValue", dest[i].value); + return false; + } + } + return source.size() == dest.size(); +} + +struct ChangeFeedsWorkload : TestWorkload { + double testDuration; + Future client; + + ChangeFeedsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + testDuration = getOption(options, "testDuration"_sr, 10.0); + } + + std::string description() const override { return "ChangeFeedsWorkload"; } + Future setup(Database const& cx) override { return Void(); } + Future start(Database const& cx) override { + if (clientId == 0) { + client = changeFeedClient(cx->clone(), 
this); + return delay(testDuration); + } + return Void(); + } + Future check(Database const& cx) override { + client = Future(); + return true; + } + void getMetrics(std::vector& m) override {} + + ACTOR Future changeFeedClient(Database cx, ChangeFeedsWorkload* self) { + // Enable change feeds for a key range + state Key rangeID = StringRef(deterministicRandom()->randomUniqueID().toString()); + wait(updateChangeFeed(cx, rangeID, ChangeFeedStatus::CHANGE_FEED_CREATE, normalKeys)); + + loop { + wait(delay(deterministicRandom()->random01())); + + state std::pair>, Version> firstResults = wait(readDatabase(cx)); + TraceEvent("ChangeFeedReadDB").detail("Ver1", firstResults.second); + + wait(delay(10 * deterministicRandom()->random01())); + + state std::pair>, Version> secondResults = wait(readDatabase(cx)); + TraceEvent("ChangeFeedReadDB").detail("Ver2", secondResults.second); + state Standalone> mutations = + wait(readMutations(cx, rangeID, firstResults.second, secondResults.second + 1)); + + Standalone> advancedResults = advanceData(firstResults.first, mutations); + + if (!compareData(secondResults.first, advancedResults)) { + TraceEvent(SevError, "ChangeFeedMismatch") + .detail("FirstVersion", firstResults.second) + .detail("SecondVersion", secondResults.second); + for (int i = 0; i < secondResults.first.size(); i++) { + TraceEvent("ChangeFeedBase") + .detail("Index", i) + .detail("K", secondResults.first[i].key) + .detail("V", secondResults.first[i].value); + } + for (int i = 0; i < advancedResults.size(); i++) { + TraceEvent("ChangeFeedAdvanced") + .detail("Index", i) + .detail("K", advancedResults[i].key) + .detail("V", advancedResults[i].value); + } + ASSERT(false); + } + + wait(cx->popChangeFeedMutations(rangeID, secondResults.second)); + } + } +}; + +WorkloadFactory ChangeFeedsWorkloadFactory("ChangeFeeds"); diff --git a/flow/Arena.h b/flow/Arena.h index d2c448f886..fab4b5e12c 100644 --- a/flow/Arena.h +++ b/flow/Arena.h @@ -640,6 +640,16 @@ struct hash { }; } 
// namespace std +namespace std { /* Hash support for the Standalone specialization declared below: operator() hashes the tag.size() bytes starting at tag.begin(), viewed as a std::string_view, using a shared constexpr string_view hasher. */ +template <> +struct hash> { + static constexpr std::hash hashFunc{}; + std::size_t operator()(Standalone const& tag) const { + return hashFunc(std::string_view((const char*)tag.begin(), tag.size())); + } +}; +} // namespace std + template <> struct TraceableString { static const char* begin(StringRef value) { return reinterpret_cast(value.begin()); } diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index af7c6f1108..ebf8df96bf 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -138,6 +138,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B070010000LL, StableInterfaces); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext); + /* ChangeFeed uses the same version value (0x0FDB00B070010001LL) as the adjacent TagThrottleValueReason, SpanContext and TSS entries. */ PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, ChangeFeed); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TSS); }; diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 9be30980c7..3d60b81ebf 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -79,6 +79,8 @@ ERROR( version_already_compacted, 1055, "The requested changes have been compact ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" ) ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" ) ERROR( wrong_format_version, 1058, "Format version not recognize." 
) +ERROR( unknown_change_feed, 1059, "Change feed not found" ) /* 1059 and 1060 are the change-feed-specific error codes in this list. */ +ERROR( change_feed_not_registered, 1060, "Change feed not registered" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" ) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dfa22ab5e3..f8f8dfd8b4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -131,6 +131,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml) add_fdb_test(TEST_FILES fast/CycleAndLock.toml) add_fdb_test(TEST_FILES fast/CycleTest.toml) + add_fdb_test(TEST_FILES fast/ChangeFeeds.toml) add_fdb_test(TEST_FILES fast/DataLossRecovery.toml) add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml) add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml) diff --git a/tests/fast/ChangeFeeds.toml b/tests/fast/ChangeFeeds.toml new file mode 100644 index 0000000000..d88341be6c --- /dev/null +++ b/tests/fast/ChangeFeeds.toml @@ -0,0 +1,35 @@ +[[test]] +testTitle = 'ChangeFeed' + + [[test.workload]] + testName = 'Cycle' + transactionsPerSecond = 250.0 + testDuration = 60.0 + expectedRate = 0 + + [[test.workload]] + testName = 'ChangeFeeds' + testDuration = 60.0 + + [[test.workload]] + testName = 'RandomClogging' + testDuration = 60.0 + + [[test.workload]] + testName = 'Rollback' + meanDelay = 60.0 + testDuration = 60.0 + + [[test.workload]] + testName = 'Attrition' + machinesToKill = 10 + machinesToLeave = 3 + reboot = true + testDuration = 60.0 + + [[test.workload]] + testName = 'Attrition' + machinesToKill = 10 + machinesToLeave = 3 + reboot = true + testDuration = 60.0