Merge pull request #5842 from sfc-gh-etschannen/feature-range-feed

Add support for change feeds
Evan Tschannen 2021-10-25 11:31:55 -07:00 committed by GitHub
commit 92e0c92fd3
26 changed files with 2117 additions and 63 deletions

View File

@ -16,6 +16,7 @@ set(FDBCLI_SRCS
IncludeCommand.actor.cpp
KillCommand.actor.cpp
LockCommand.actor.cpp
ChangeFeedCommand.actor.cpp
MaintenanceCommand.actor.cpp
ProfileCommand.actor.cpp
SetClassCommand.actor.cpp

View File

@ -0,0 +1,172 @@
/*
* ChangeFeedCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
ACTOR Future<Void> changeFeedList(Database db) {
state ReadYourWritesTransaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult result = wait(tr.getRange(changeFeedKeys, CLIENT_KNOBS->TOO_MANY));
// the set of registered change feeds should be small enough to read in one batch
ASSERT(!result.more);
printf("Found %d range feeds%s\n", result.size(), result.size() == 0 ? "." : ":");
for (auto& it : result) {
auto range = std::get<0>(decodeChangeFeedValue(it.value));
printf(" %s: %s - %s\n",
it.key.removePrefix(changeFeedPrefix).toString().c_str(),
range.begin.toString().c_str(),
range.end.toString().c_str());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
} // namespace
namespace fdb_cli {
ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn) {
if (tokens.size() == 1) {
printUsage(tokens[0]);
return false;
}
if (tokencmp(tokens[1], "list")) {
if (tokens.size() != 2) {
printUsage(tokens[0]);
return false;
}
wait(changeFeedList(localDb));
return true;
} else if (tokencmp(tokens[1], "register")) {
if (tokens.size() != 5) {
printUsage(tokens[0]);
return false;
}
wait(updateChangeFeed(
localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_CREATE, KeyRangeRef(tokens[3], tokens[4])));
} else if (tokencmp(tokens[1], "stop")) {
if (tokens.size() != 3) {
printUsage(tokens[0]);
return false;
}
wait(updateChangeFeed(localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_STOP));
} else if (tokencmp(tokens[1], "destroy")) {
if (tokens.size() != 3) {
printUsage(tokens[0]);
return false;
}
wait(updateChangeFeed(localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_DESTROY));
} else if (tokencmp(tokens[1], "stream")) {
if (tokens.size() < 3 || tokens.size() > 5) {
printUsage(tokens[0]);
return false;
}
Version begin = 0;
Version end = std::numeric_limits<Version>::max();
if (tokens.size() > 3) {
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%ld%n", &begin, &n) != 1 || n != tokens[3].size()) {
printUsage(tokens[0]);
return false;
}
}
if (tokens.size() > 4) {
int n = 0;
if (sscanf(tokens[4].toString().c_str(), "%ld%n", &end, &n) != 1 || n != tokens[4].size()) {
printUsage(tokens[0]);
return false;
}
}
if (warn.isValid()) {
warn.cancel();
}
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> feedResults;
state Future<Void> feed = localDb->getChangeFeedStream(feedResults, tokens[2], begin, end);
printf("\n");
try {
state Future<Void> feedInterrupt = LineNoise::onKeyboardInterrupt();
loop {
choose {
when(Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(feedResults.getFuture())) {
for (auto& it : res) {
for (auto& it2 : it.mutations) {
printf("%lld %s\n", it.version, it2.toString().c_str());
}
}
}
when(wait(feedInterrupt)) {
feedInterrupt = Future<Void>();
feed.cancel();
feedResults = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
break;
}
}
}
return true;
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
return true;
}
throw;
}
} else if (tokencmp(tokens[1], "pop")) {
if (tokens.size() != 4) {
printUsage(tokens[0]);
return false;
}
Version v;
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%ld%n", &v, &n) != 1 || n != tokens[3].size()) {
printUsage(tokens[0]);
return false;
} else {
wait(localDb->popChangeFeedMutations(tokens[2], v));
}
} else {
printUsage(tokens[0]);
return false;
}
return true;
}
CommandFactory changeFeedFactory(
"changefeed",
CommandHelp("changefeed <register|destroy|stop|stream|pop|list> <RANGEID> <BEGIN> <END>", "", ""));
} // namespace fdb_cli
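
For reference, the token checks above accept these invocation forms (illustrative summary of the parsing code; <BEGIN>/<END> are key boundaries for register and optional versions for stream):

changefeed list
changefeed register <RANGEID> <BEGIN_KEY> <END_KEY>
changefeed stop <RANGEID>
changefeed destroy <RANGEID>
changefeed stream <RANGEID> [<BEGIN_VERSION> [<END_VERSION>]]
changefeed pop <RANGEID> <VERSION>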

View File

@ -36,6 +36,7 @@
#include "fdbclient/Schemas.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tuple.h"
@ -1556,6 +1557,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state Database localDb;
state Reference<IDatabase> db;
state Reference<ITransaction> tr;
state Transaction trx;
state bool writeMode = false;
@ -1879,6 +1881,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "changefeed")) {
bool _result = wait(makeInterruptable(changeFeedCommandActor(localDb, tokens, warn)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "unlock")) {
if ((tokens.size() != 2) || (tokens[1].size() != 32) ||
!std::all_of(tokens[1].begin(), tokens[1].end(), &isxdigit)) {

View File

@ -165,6 +165,8 @@ ACTOR Future<bool> killCommandActor(Reference<IDatabase> db,
// lock/unlock command
ACTOR Future<bool> lockCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
ACTOR Future<bool> unlockDatabaseActor(Reference<IDatabase> db, UID uid);
// changefeed command
ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn);
// maintenance command
ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,

View File

@ -75,6 +75,9 @@ void ClientKnobs::initialize(Randomize randomize) {
init( VALUE_SIZE_LIMIT, 1e5 );
init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1;
init( METADATA_VERSION_CACHE_SIZE, 1000 );
init( CHANGE_FEED_LOCATION_LIMIT, 10000 );
init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1;
init( CHANGE_FEED_POP_TIMEOUT, 5.0 );
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;

View File

@ -74,6 +74,9 @@ public:
int64_t VALUE_SIZE_LIMIT;
int64_t SPLIT_KEY_SIZE_LIMIT;
int METADATA_VERSION_CACHE_SIZE;
int64_t CHANGE_FEED_LOCATION_LIMIT;
int64_t CHANGE_FEED_CACHE_SIZE;
double CHANGE_FEED_POP_TIMEOUT;
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;

View File

@ -252,6 +252,15 @@ public:
// Management API, create snapshot
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
Future<Void> getChangeFeedStream(const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
Key rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(StringRef rangeID, Version version);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
@ -327,6 +336,8 @@ public:
std::unordered_map<UID, StorageServerInterface> tssMapping;
// map from tssid -> metrics for that tss pair
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
// map from changeFeedId -> changeFeedRange
std::unordered_map<Key, KeyRange> changeFeedCache;
UID dbId;
IsInternal internal; // Only contexts created through the C client and fdbcli are non-internal
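
The stream API declared above is consumed through a PromiseStream. A minimal sketch of a reader (hypothetical exampleReadFeed actor; assumes feedId is already registered and handles only end_of_stream), mirroring the fdbcli stream subcommand and the ChangeFeeds workload:

ACTOR Future<Void> exampleReadFeed(Database db, Key feedId) {
    state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
    // Request every mutation of the feed from version 0 onward over the default range (allKeys).
    state Future<Void> feed = db->getChangeFeedStream(results, feedId, 0, std::numeric_limits<Version>::max());
    try {
        loop {
            Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results.getFuture());
            for (auto& batch : res) {
                for (auto& m : batch.mutations) {
                    printf("%lld %s\n", batch.version, m.toString().c_str());
                }
            }
        }
    } catch (Error& e) {
        // The stream actors signal completion of the requested version range with end_of_stream.
        if (e.code() != error_code_end_of_stream) {
            throw;
        }
    }
    return Void();
}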

View File

@ -2081,6 +2081,90 @@ ACTOR Future<Void> checkDatabaseLock(Reference<ReadYourWritesTransaction> tr, UI
return Void();
}
ACTOR Future<Void> updateChangeFeed(Transaction* tr, Key rangeID, ChangeFeedStatus status, KeyRange range) {
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> val = wait(tr->get(rangeIDKey));
if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
if (!val.present()) {
tr->set(rangeIDKey, changeFeedValue(range, invalidVersion, status));
} else if (std::get<0>(decodeChangeFeedValue(val.get())) != range) {
throw unsupported_operation();
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_STOP) {
if (val.present()) {
tr->set(rangeIDKey,
changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
std::get<1>(decodeChangeFeedValue(val.get())),
status));
} else {
throw unsupported_operation();
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
if (val.present()) {
tr->set(rangeIDKey,
changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
std::get<1>(decodeChangeFeedValue(val.get())),
status));
tr->clear(rangeIDKey);
} else {
throw unsupported_operation();
}
}
return Void();
}
ACTOR Future<Void> updateChangeFeed(Reference<ReadYourWritesTransaction> tr,
Key rangeID,
ChangeFeedStatus status,
KeyRange range) {
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> val = wait(tr->get(rangeIDKey));
if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
if (!val.present()) {
tr->set(rangeIDKey, changeFeedValue(range, invalidVersion, status));
} else if (std::get<0>(decodeChangeFeedValue(val.get())) != range) {
throw unsupported_operation();
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_STOP) {
if (val.present()) {
tr->set(rangeIDKey,
changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
std::get<1>(decodeChangeFeedValue(val.get())),
status));
} else {
throw unsupported_operation();
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
if (val.present()) {
tr->set(rangeIDKey,
changeFeedValue(std::get<0>(decodeChangeFeedValue(val.get())),
std::get<1>(decodeChangeFeedValue(val.get())),
status));
tr->clear(rangeIDKey);
} else {
throw unsupported_operation();
}
}
return Void();
}
ACTOR Future<Void> updateChangeFeed(Database cx, Key rangeID, ChangeFeedStatus status, KeyRange range) {
state Transaction tr(cx);
loop {
try {
wait(updateChangeFeed(&tr, rangeID, status, range));
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> advanceVersion(Database cx, Version v) {
state Transaction tr(cx);
loop {

View File

@ -207,6 +207,13 @@ ACTOR Future<Void> unlockDatabase(Database cx, UID id);
ACTOR Future<Void> checkDatabaseLock(Transaction* tr, UID id);
ACTOR Future<Void> checkDatabaseLock(Reference<ReadYourWritesTransaction> tr, UID id);
ACTOR Future<Void> updateChangeFeed(Transaction* tr, Key rangeID, ChangeFeedStatus status, KeyRange range = KeyRange());
ACTOR Future<Void> updateChangeFeed(Reference<ReadYourWritesTransaction> tr,
Key rangeID,
ChangeFeedStatus status,
KeyRange range = KeyRange());
ACTOR Future<Void> updateChangeFeed(Database cx, Key rangeID, ChangeFeedStatus status, KeyRange range = KeyRange());
ACTOR Future<Void> advanceVersion(Database cx, Version v);
ACTOR Future<int> setDDMode(Database cx, int mode);
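
Taken together with the Database-level overload, a feed's lifecycle from client code is a short sequence of updateChangeFeed calls; a hedged sketch (hypothetical exampleFeedLifecycle actor, illustrative feed id):

ACTOR Future<Void> exampleFeedLifecycle(Database db) {
    state Key feedId = LiteralStringRef("example-feed"); // hypothetical id
    // Register a feed over the normal key space (the same call the ChangeFeeds workload makes).
    wait(updateChangeFeed(db, feedId, ChangeFeedStatus::CHANGE_FEED_CREATE, normalKeys));
    // ... read mutations via db->getChangeFeedStream(...) and trim with db->popChangeFeedMutations(...) ...
    // Mark the feed stopped, then remove its registration.
    wait(updateChangeFeed(db, feedId, ChangeFeedStatus::CHANGE_FEED_STOP));
    wait(updateChangeFeed(db, feedId, ChangeFeedStatus::CHANGE_FEED_DESTROY));
    return Void();
}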

View File

@ -6662,6 +6662,447 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
}
ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
loop {
try {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
state ReplyPromiseStream<ChangeFeedStreamReply> replyStream = interf.changeFeedStream.getReplyStream(req);
loop {
state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture());
begin = rep.mutations.back().version + 1;
state int resultLoc = 0;
while (resultLoc < rep.mutations.size()) {
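// Forward only versions that carry mutations, plus the last version of the requested range so the consumer sees the end boundary.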
if (rep.mutations[resultLoc].mutations.size() || rep.mutations[resultLoc].version + 1 == end) {
wait(results.onEmpty());
results.send(rep.mutations[resultLoc]);
}
resultLoc++;
}
if (begin == end) {
results.sendError(end_of_stream());
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
results.sendError(e);
return Void();
}
}
}
struct MutationAndVersionStream {
Standalone<MutationsAndVersionRef> next;
PromiseStream<Standalone<MutationsAndVersionRef>> results;
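// Inverted comparison: std::priority_queue is a max-heap, so this makes top() the stream with the lowest pending version.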
bool operator<(MutationAndVersionStream const& rhs) const { return next.version > rhs.next.version; }
};
ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInterface, KeyRange>> interfs,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
Key rangeID,
Version* begin,
Version end) {
state std::priority_queue<MutationAndVersionStream, std::vector<MutationAndVersionStream>> mutations;
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] =
singleChangeFeedStream(interfs[i].first, streams[i].results, rangeID, *begin, end, interfs[i].second);
}
state int interfNum = 0;
while (interfNum < interfs.size()) {
try {
Standalone<MutationsAndVersionRef> res = waitNext(streams[interfNum].results.getFuture());
streams[interfNum].next = res;
mutations.push(streams[interfNum]);
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
throw e;
}
}
interfNum++;
}
state Version checkVersion = invalidVersion;
state Standalone<VectorRef<MutationsAndVersionRef>> nextOut;
while (mutations.size()) {
state MutationAndVersionStream nextStream = mutations.top();
mutations.pop();
ASSERT(nextStream.next.version >= checkVersion);
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
results.send(nextOut);
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
}
checkVersion = nextStream.next.version;
}
if (nextOut.size() && nextStream.next.version == nextOut.back().version) {
if (nextStream.next.mutations.size() &&
nextStream.next.mutations.front().param1 != lastEpochEndPrivateKey) {
nextOut.back().mutations.append_deep(
nextOut.arena(), nextStream.next.mutations.begin(), nextStream.next.mutations.size());
}
} else {
nextOut.push_back_deep(nextOut.arena(), nextStream.next);
}
try {
Standalone<MutationsAndVersionRef> res = waitNext(nextStream.results.getFuture());
nextStream.next = res;
mutations.push(nextStream);
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
throw e;
}
}
}
if (nextOut.size()) {
results.send(nextOut);
}
throw end_of_stream();
}
ACTOR Future<KeyRange> getChangeFeedRange(Reference<DatabaseContext> db, Database cx, Key rangeID, Version begin = 0) {
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
auto cacheLoc = db->changeFeedCache.find(rangeID);
if (cacheLoc != db->changeFeedCache.end()) {
return cacheLoc->second;
}
loop {
try {
Version readVer = wait(tr.getReadVersion());
if (readVer < begin) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
tr.reset();
} else {
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
throw change_feed_not_registered();
}
if (db->changeFeedCache.size() > CLIENT_KNOBS->CHANGE_FEED_CACHE_SIZE) {
db->changeFeedCache.clear();
}
KeyRange range = std::get<0>(decodeChangeFeedValue(val.get()));
db->changeFeedCache[rangeID] = range;
return range;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
state Database cx(db);
state Span span("NAPI:GetChangeFeedStream"_loc);
loop {
state KeyRange keys;
try {
KeyRange fullRange = wait(getChangeFeedRange(db, cx, rangeID, begin));
keys = fullRange & range;
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
keys,
CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT,
Reverse::False,
&StorageServerInterface::changeFeedStream,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
ASSERT_WE_THINK(false);
throw unknown_change_feed();
}
state std::vector<int> chosenLocations(locations.size());
state int loc = 0;
while (loc < locations.size()) {
// FIXME: create a load balance function for this code so future users of reply streams do not have
// to duplicate this code
int count = 0;
int useIdx = -1;
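// Reservoir-sample one non-failed alternative uniformly at random.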
for (int i = 0; i < locations[loc].second->size(); i++) {
if (!IFailureMonitor::failureMonitor()
.getState(
locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint())
.failed) {
if (deterministicRandom()->random01() <= 1.0 / ++count) {
useIdx = i;
}
}
}
if (useIdx >= 0) {
chosenLocations[loc] = useIdx;
loc++;
continue;
}
std::vector<Future<Void>> ok(locations[loc].second->size());
for (int i = 0; i < ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual(
locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(),
FailureStatus(false));
}
// Making this SevWarn means a lot of clutter
if (now() - g_network->networkInfo.newestAlternativesFailure > 1 ||
deterministicRandom()->random01() < 0.01) {
TraceEvent("AllAlternativesFailed").detail("Alternatives", locations[0].second->description());
}
wait(allAlternativesFailedDelay(quorum(ok, 1)));
loc = 0;
}
if (locations.size() > 1) {
std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
for (int i = 0; i < locations.size(); i++) {
interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
locations[i].first & range));
}
wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
} else {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
state ReplyPromiseStream<ChangeFeedStreamReply> replyStream =
locations[0]
.second->get(chosenLocations[0], &StorageServerInterface::changeFeedStream)
.getReplyStream(req);
loop {
wait(results.onEmpty());
choose {
when(wait(cx->connectionFileChanged())) { break; }
when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
begin = rep.mutations.back().version + 1;
results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
}
}
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed ||
e.code() == error_code_broken_promise) {
db->changeFeedCache.erase(rangeID);
cx->invalidateCache(keys);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
results.sendError(e);
return Void();
}
}
}
}
Future<Void> DatabaseContext::getChangeFeedStream(
const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
}
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(
Database cx,
Reference<LocationInfo> location,
KeyRangeRef range,
Version minVersion) {
state OverlappingChangeFeedsRequest req;
req.range = range;
req.minVersion = minVersion;
OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(),
location,
&StorageServerInterface::overlappingChangeFeeds,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
return rep.rangeIds;
}
bool compareChangeFeedResult(const OverlappingChangeFeedEntry& i, const OverlappingChangeFeedEntry& j) {
return i.rangeId < j.rangeId;
}
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
KeyRangeRef range,
Version minVersion) {
state Database cx(db);
state Transaction tr(cx);
state Span span("NAPI:GetOverlappingChangeFeeds"_loc);
loop {
try {
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
range,
CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT,
Reverse::False,
&StorageServerInterface::overlappingChangeFeeds,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
TraceEvent(SevError, "OverlappingRangeTooLarge")
.detail("Range", range)
.detail("Limit", CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT);
wait(delay(1.0));
throw all_alternatives_failed();
}
state std::vector<Future<std::vector<OverlappingChangeFeedEntry>>> allOverlappingRequests;
for (auto& it : locations) {
allOverlappingRequests.push_back(
singleLocationOverlappingChangeFeeds(cx, it.second, it.first & range, minVersion));
}
wait(waitForAll(allOverlappingRequests));
std::vector<OverlappingChangeFeedEntry> result;
for (auto& it : allOverlappingRequests) {
result.insert(result.end(), it.get().begin(), it.get().end());
}
std::sort(result.begin(), result.end(), compareChangeFeedResult);
result.resize(std::unique(result.begin(), result.end()) - result.begin());
return result;
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
cx->invalidateCache(range);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
throw e;
}
}
}
}
Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
Version minVersion) {
return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
}
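// Fallback pop path: durably record the pop version in the feed's metadata key when the storage locations cannot all be popped directly.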
ACTOR static Future<Void> popChangeFeedBackup(Database cx, StringRef rangeID, Version version) {
state Transaction tr(cx);
loop {
try {
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
Optional<Value> val = wait(tr.get(rangeIDKey));
if (val.present()) {
KeyRange range;
Version popVersion;
ChangeFeedStatus status;
std::tie(range, popVersion, status) = decodeChangeFeedValue(val.get());
if (version > popVersion) {
tr.set(rangeIDKey, changeFeedValue(range, version, status));
}
} else {
throw change_feed_not_registered();
}
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, StringRef rangeID, Version version) {
state Database cx(db);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
state Span span("NAPI:PopChangeFeedMutations"_loc);
state KeyRange keys = wait(getChangeFeedRange(db, cx, rangeID));
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
keys,
3,
Reverse::False,
&StorageServerInterface::changeFeedPop,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() > 2) {
wait(popChangeFeedBackup(cx, rangeID, version));
return Void();
}
bool foundFailed = false;
for (int i = 0; i < locations.size() && !foundFailed; i++) {
for (int j = 0; j < locations[i].second->size() && !foundFailed; j++) {
if (IFailureMonitor::failureMonitor()
.getState(locations[i].second->get(j, &StorageServerInterface::changeFeedPop).getEndpoint())
.isFailed()) {
foundFailed = true;
}
}
}
if (foundFailed) {
wait(popChangeFeedBackup(cx, rangeID, version));
return Void();
}
// FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped
state std::vector<Future<Void>> popRequests;
for (int i = 0; i < locations.size(); i++) {
for (int j = 0; j < locations[i].second->size(); j++) {
popRequests.push_back(locations[i].second->getInterface(j).changeFeedPop.getReply(
ChangeFeedPopRequest(rangeID, version, locations[i].first)));
}
}
try {
choose {
when(wait(waitForAll(popRequests))) {}
when(wait(delay(CLIENT_KNOBS->CHANGE_FEED_POP_TIMEOUT))) {
wait(popChangeFeedBackup(cx, rangeID, version));
}
}
} catch (Error& e) {
if (e.code() != error_code_unknown_change_feed && e.code() != error_code_wrong_shard_server &&
e.code() != error_code_all_alternatives_failed) {
throw;
}
db->changeFeedCache.erase(rangeID);
cx->invalidateCache(keys);
wait(popChangeFeedBackup(cx, rangeID, version));
}
return Void();
}
Future<Void> DatabaseContext::popChangeFeedMutations(StringRef rangeID, Version version) {
return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
state ReadYourWritesTransaction tr(cx);
loop {

View File

@ -270,6 +270,27 @@ void TSS_traceMismatch(TraceEvent& event,
ASSERT(false);
}
// change feed
template <>
bool TSS_doCompare(const OverlappingChangeFeedsReply& src, const OverlappingChangeFeedsReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const OverlappingChangeFeedsRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const OverlappingChangeFeedsRequest& req,
const OverlappingChangeFeedsReply& src,
const OverlappingChangeFeedsReply& tss) {
ASSERT(false);
}
// template specializations for metrics replies that should never be called because these requests aren't duplicated
// storage metrics
@ -331,6 +352,9 @@ void TSSMetrics::recordLatency(const SplitRangeRequest& req, double ssLatency, d
template <>
void TSSMetrics::recordLatency(const GetKeyValuesStreamRequest& req, double ssLatency, double tssLatency) {}
template <>
void TSSMetrics::recordLatency(const OverlappingChangeFeedsRequest& req, double ssLatency, double tssLatency) {}
// -------------------
TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {

View File

@ -30,6 +30,7 @@
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbrpc/TSSComparison.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"
#include "flow/UnitTest.h"
@ -77,6 +78,9 @@ struct StorageServerInterface {
RequestStream<struct ReadHotSubRangeRequest> getReadHotRanges;
RequestStream<struct SplitRangeRequest> getRangeSplitPoints;
RequestStream<struct GetKeyValuesStreamRequest> getKeyValuesStream;
RequestStream<struct ChangeFeedStreamRequest> changeFeedStream;
RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}
@ -119,6 +123,12 @@ struct StorageServerInterface {
RequestStream<struct SplitRangeRequest>(getValue.getEndpoint().getAdjustedEndpoint(12));
getKeyValuesStream =
RequestStream<struct GetKeyValuesStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(13));
changeFeedStream =
RequestStream<struct ChangeFeedStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(14));
overlappingChangeFeeds =
RequestStream<struct OverlappingChangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
changeFeedPop =
RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
}
} else {
ASSERT(Ar::isDeserializing);
@ -161,6 +171,9 @@ struct StorageServerInterface {
streams.push_back(getReadHotRanges.getReceiver());
streams.push_back(getRangeSplitPoints.getReceiver());
streams.push_back(getKeyValuesStream.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(changeFeedStream.getReceiver());
streams.push_back(overlappingChangeFeeds.getReceiver());
streams.push_back(changeFeedPop.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};
@ -622,6 +635,133 @@ struct SplitRangeRequest {
}
};
struct MutationsAndVersionRef {
VectorRef<MutationRef> mutations;
Version version = invalidVersion;
Version knownCommittedVersion = invalidVersion;
MutationsAndVersionRef() {}
explicit MutationsAndVersionRef(Version version, Version knownCommittedVersion)
: version(version), knownCommittedVersion(knownCommittedVersion) {}
MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion)
: mutations(mutations), version(version), knownCommittedVersion(knownCommittedVersion) {}
MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion)
: mutations(to, mutations), version(version), knownCommittedVersion(knownCommittedVersion) {}
MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from)
: mutations(to, from.mutations), version(from.version), knownCommittedVersion(from.knownCommittedVersion) {}
int expectedSize() const { return mutations.expectedSize(); }
struct OrderByVersion {
bool operator()(MutationsAndVersionRef const& a, MutationsAndVersionRef const& b) const {
return a.version < b.version;
}
};
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutations, version, knownCommittedVersion);
}
};
struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 1783066;
Arena arena;
VectorRef<MutationsAndVersionRef> mutations;
ChangeFeedStreamReply() {}
int expectedSize() const { return sizeof(ChangeFeedStreamReply) + mutations.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, mutations, arena);
}
};
struct ChangeFeedStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
Key rangeID;
Version begin = 0;
Version end = 0;
KeyRange range;
ReplyPromiseStream<ChangeFeedStreamReply> reply;
ChangeFeedStreamRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, range, reply, spanContext, arena);
}
};
struct ChangeFeedPopRequest {
constexpr static FileIdentifier file_identifier = 10726174;
Key rangeID;
Version version;
KeyRange range;
ReplyPromise<Void> reply;
ChangeFeedPopRequest() {}
ChangeFeedPopRequest(Key const& rangeID, Version version, KeyRange const& range)
: rangeID(rangeID), version(version), range(range) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, version, range, reply);
}
};
struct OverlappingChangeFeedEntry {
Key rangeId;
KeyRange range;
bool stopped = false;
bool operator==(const OverlappingChangeFeedEntry& r) const {
return rangeId == r.rangeId && range == r.range && stopped == r.stopped;
}
OverlappingChangeFeedEntry() {}
OverlappingChangeFeedEntry(Key const& rangeId, KeyRange const& range, bool stopped)
: rangeId(rangeId), range(range), stopped(stopped) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeId, range, stopped);
}
};
struct OverlappingChangeFeedsReply {
constexpr static FileIdentifier file_identifier = 11815134;
std::vector<OverlappingChangeFeedEntry> rangeIds;
bool cached;
Arena arena;
OverlappingChangeFeedsReply() : cached(false) {}
explicit OverlappingChangeFeedsReply(std::vector<OverlappingChangeFeedEntry> const& rangeIds)
: rangeIds(rangeIds), cached(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeIds, arena);
}
};
struct OverlappingChangeFeedsRequest {
constexpr static FileIdentifier file_identifier = 10726174;
KeyRange range;
Version minVersion;
ReplyPromise<OverlappingChangeFeedsReply> reply;
OverlappingChangeFeedsRequest() {}
explicit OverlappingChangeFeedsRequest(KeyRange const& range) : range(range) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, range, minVersion, reply);
}
};
struct GetStorageMetricsReply {
constexpr static FileIdentifier file_identifier = 15491478;
StorageMetrics load;

View File

@ -1032,6 +1032,62 @@ const KeyRef writeRecoveryKey = LiteralStringRef("\xff/writeRecovery");
const ValueRef writeRecoveryKeyTrue = LiteralStringRef("1");
const KeyRef snapshotEndVersionKey = LiteralStringRef("\xff/snapshotEndVersion");
const KeyRangeRef changeFeedKeys(LiteralStringRef("\xff\x02/feed/"), LiteralStringRef("\xff\x02/feed0"));
const KeyRef changeFeedPrefix = changeFeedKeys.begin;
const KeyRef changeFeedPrivatePrefix = LiteralStringRef("\xff\xff\x02/feed/");
const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, ChangeFeedStatus status) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
wr << range;
wr << popVersion;
wr << status;
return wr.toValue();
}
std::tuple<KeyRange, Version, ChangeFeedStatus> decodeChangeFeedValue(ValueRef const& value) {
KeyRange range;
Version version;
ChangeFeedStatus status;
BinaryReader reader(value, IncludeVersion());
reader >> range;
reader >> version;
reader >> status;
return std::make_tuple(range, version, status);
}
const KeyRangeRef changeFeedDurableKeys(LiteralStringRef("\xff\xff/cf/"), LiteralStringRef("\xff\xff/cf0"));
const KeyRef changeFeedDurablePrefix = changeFeedDurableKeys.begin;
const Value changeFeedDurableKey(Key const& feed, Version version) {
BinaryWriter wr(AssumeVersion(ProtocolVersion::withChangeFeed()));
wr.serializeBytes(changeFeedDurablePrefix);
wr << feed;
wr << bigEndian64(version);
return wr.toValue();
}
std::pair<Key, Version> decodeChangeFeedDurableKey(ValueRef const& key) {
Key feed;
Version version;
BinaryReader reader(key.removePrefix(changeFeedDurablePrefix), AssumeVersion(ProtocolVersion::withChangeFeed()));
reader >> feed;
reader >> version;
return std::make_pair(feed, bigEndian64(version));
}
const Value changeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations, Version knownCommittedVersion) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
wr << mutations;
wr << knownCommittedVersion;
return wr.toValue();
}
std::pair<Standalone<VectorRef<MutationRef>>, Version> decodeChangeFeedDurableValue(ValueRef const& value) {
Standalone<VectorRef<MutationRef>> mutations;
Version knownCommittedVersion;
BinaryReader reader(value, IncludeVersion());
reader >> mutations;
reader >> knownCommittedVersion;
return std::make_pair(mutations, knownCommittedVersion);
}
const KeyRef configTransactionDescriptionKey = "\xff\xff/description"_sr;
const KeyRange globalConfigKnobKeys = singleKeyRange("\xff\xff/globalKnobs"_sr);
const KeyRangeRef configKnobKeys("\xff\xff/knobs/"_sr, "\xff\xff/knobs0"_sr);
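
The codecs above round-trip; a quick hedged sketch with illustrative values:

// Encode and decode a feed registration value.
Value v = changeFeedValue(normalKeys, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE);
KeyRange range;
Version popVersion;
ChangeFeedStatus status;
std::tie(range, popVersion, status) = decodeChangeFeedValue(v);
// range == normalKeys, popVersion == invalidVersion, status == CHANGE_FEED_CREATE

// Durable keys store the version big-endian, presumably so keys for a feed sort by version.
Key k = changeFeedDurableKey(LiteralStringRef("example-feed"), 100);
std::pair<Key, Version> decoded = decodeChangeFeedDurableKey(k);
// decoded == { "example-feed", 100 }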

View File

@ -496,6 +496,21 @@ extern const ValueRef writeRecoveryKeyTrue;
// Allows incremental restore to read and set starting version for consistency.
extern const KeyRef snapshotEndVersionKey;
extern const KeyRangeRef changeFeedKeys;
enum class ChangeFeedStatus { CHANGE_FEED_CREATE = 0, CHANGE_FEED_STOP = 1, CHANGE_FEED_DESTROY = 2 };
const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, ChangeFeedStatus status);
std::tuple<KeyRange, Version, ChangeFeedStatus> decodeChangeFeedValue(ValueRef const& value);
extern const KeyRef changeFeedPrefix;
extern const KeyRef changeFeedPrivatePrefix;
extern const KeyRangeRef changeFeedDurableKeys;
extern const KeyRef changeFeedDurablePrefix;
const Value changeFeedDurableKey(Key const& feed, Version version);
std::pair<Key, Version> decodeChangeFeedDurableKey(ValueRef const& key);
const Value changeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations, Version knownCommittedVersion);
std::pair<Standalone<VectorRef<MutationRef>>, Version> decodeChangeFeedDurableValue(ValueRef const& value);
// Configuration database special keys
extern const KeyRef configTransactionDescriptionKey;
extern const KeyRange globalConfigKnobKeys;

View File

@ -157,7 +157,7 @@ public:
private:
std::unordered_map<NetworkAddress, FailureStatus> addressStatus;
YieldedAsyncMap<Endpoint, bool> endpointKnownFailed;
YieldedAsyncMap<NetworkAddress, bool> disconnectTriggers;
AsyncMap<NetworkAddress, bool> disconnectTriggers;
std::unordered_set<Endpoint> failedEndpoints;
friend class OnStateChangedActorActor;

View File

@ -441,10 +441,12 @@ public:
template <class E>
void sendError(const E& exc) const {
if (queue->isRemoteEndpoint() && !queue->sentError) {
queue->sentError = true;
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<EnsureTable<T>>>(exc), getEndpoint(), false);
if (queue->isRemoteEndpoint()) {
if (!queue->sentError && !queue->acknowledgements.failures.isError()) {
queue->sentError = true;
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<EnsureTable<T>>>(exc), getEndpoint(), false);
}
} else {
queue->sendError(exc);
if (errors && errors->canBeSet()) {

View File

@ -308,6 +308,32 @@ private:
txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
void checkSetChangeFeedPrefix(MutationRef m) {
if (!m.param1.startsWith(changeFeedPrefix)) {
return;
}
if (toCommit && keyInfo) {
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
auto ranges = keyInfo->intersectingRanges(r);
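// If the feed's range is contained in a single shard, tag just that shard's servers; otherwise collect tags from every intersecting shard.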
auto firstRange = ranges.begin();
++firstRange;
if (firstRange == ranges.end()) {
ranges.begin().value().populateTags();
toCommit->addTags(ranges.begin().value().tags);
} else {
std::set<Tag> allSources;
for (auto r : ranges) {
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
}
toCommit->addTags(allSources);
}
toCommit->writeTypedMessage(privatized);
}
}
void checkSetServerListPrefix(MutationRef m) {
if (!m.param1.startsWith(serverListPrefix)) {
return;
@ -974,6 +1000,7 @@ public:
checkSetCacheKeysPrefix(m);
checkSetConfigKeys(m);
checkSetServerListPrefix(m);
checkSetChangeFeedPrefix(m);
checkSetTSSMappingKeys(m);
checkSetTSSQuarantineKeys(m);
checkSetApplyMutationsEndRange(m);

View File

@ -169,6 +169,7 @@ set(FDBSERVER_SRCS
workloads/ConsistencyCheck.actor.cpp
workloads/CpuProfiler.actor.cpp
workloads/Cycle.actor.cpp
workloads/ChangeFeeds.actor.cpp
workloads/DataDistributionMetrics.actor.cpp
workloads/DataLossRecovery.actor.cpp
workloads/DDBalance.actor.cpp

View File

@ -1867,10 +1867,13 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()).
// detail("BeginVer", reqBegin).detail("EndVer", reply.end).
// detail("MsgBytes", reply.messages.expectedSize()).
// detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress());
// TraceEvent("TlogPeek", self->dbgid)
// .detail("LogId", logData->logId)
// .detail("Tag", req.tag.toString())
// .detail("BeginVer", req.begin)
// .detail("EndVer", reply.end)
// .detail("MsgBytes", reply.messages.expectedSize())
// .detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];

File diff suppressed because it is too large.

View File

@ -0,0 +1,223 @@
/*
* ChangeFeeds.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/serialize.h"
#include <cstring>
ACTOR Future<std::pair<Standalone<VectorRef<KeyValueRef>>, Version>> readDatabase(Database cx) {
state Transaction tr(cx);
loop {
state Standalone<VectorRef<KeyValueRef>> output;
state Version readVersion;
try {
Version ver = wait(tr.getReadVersion());
readVersion = ver;
state PromiseStream<Standalone<RangeResultRef>> results;
state Future<Void> stream = tr.getRangeStream(results, normalKeys, GetRangeLimits());
loop {
Standalone<RangeResultRef> res = waitNext(results.getFuture());
output.arena().dependsOn(res.arena());
output.append(output.arena(), res.begin(), res.size());
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
return std::make_pair(output, readVersion);
}
wait(tr.onError(e));
}
}
}
ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> readMutations(Database cx,
Key rangeID,
Version begin,
Version end) {
state Standalone<VectorRef<MutationsAndVersionRef>> output;
loop {
try {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
state Future<Void> stream = cx->getChangeFeedStream(results, rangeID, begin, end, normalKeys);
loop {
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results.getFuture());
output.arena().dependsOn(res.arena());
for (auto& it : res) {
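// A lastEpochEnd private mutation marks a recovery rollback; discard buffered versions newer than the rollback version.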
if (it.mutations.size() == 1 && it.mutations.back().param1 == lastEpochEndPrivateKey) {
Version rollbackVersion;
BinaryReader br(it.mutations.back().param2, Unversioned());
br >> rollbackVersion;
TraceEvent("ChangeFeedRollback")
.detail("Ver", it.version)
.detail("RollbackVer", rollbackVersion);
while (output.size() && output.back().version > rollbackVersion) {
TraceEvent("ChangeFeedRollbackVer").detail("Ver", output.back().version);
output.pop_back();
}
} else {
output.push_back(output.arena(), it);
}
}
begin = res.back().version + 1;
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
return output;
}
throw;
}
}
}
Standalone<VectorRef<KeyValueRef>> advanceData(Standalone<VectorRef<KeyValueRef>> source,
Standalone<VectorRef<MutationsAndVersionRef>> mutations) {
StringRef dbgKey = LiteralStringRef("");
std::map<KeyRef, ValueRef> data;
for (auto& kv : source) {
if (kv.key == dbgKey)
TraceEvent("ChangeFeedDbgStart").detail("K", kv.key).detail("V", kv.value);
data[kv.key] = kv.value;
}
for (auto& it : mutations) {
for (auto& m : it.mutations) {
if (m.type == MutationRef::SetValue) {
if (m.param1 == dbgKey)
TraceEvent("ChangeFeedDbgSet")
.detail("Ver", it.version)
.detail("K", m.param1)
.detail("V", m.param2);
data[m.param1] = m.param2;
} else {
ASSERT(m.type == MutationRef::ClearRange);
if (KeyRangeRef(m.param1, m.param2).contains(dbgKey))
TraceEvent("ChangeFeedDbgClear")
.detail("Ver", it.version)
.detail("Begin", m.param1)
.detail("End", m.param2);
data.erase(data.lower_bound(m.param1), data.lower_bound(m.param2));
}
}
}
Standalone<VectorRef<KeyValueRef>> output;
output.arena().dependsOn(source.arena());
output.arena().dependsOn(mutations.arena());
for (auto& kv : data) {
output.push_back(output.arena(), KeyValueRef(kv.first, kv.second));
}
return output;
}
bool compareData(Standalone<VectorRef<KeyValueRef>> source, Standalone<VectorRef<KeyValueRef>> dest) {
if (source.size() != dest.size()) {
TraceEvent(SevError, "ChangeFeedSizeMismatch").detail("SrcSize", source.size()).detail("DestSize", dest.size());
}
for (int i = 0; i < std::min(source.size(), dest.size()); i++) {
if (source[i] != dest[i]) {
TraceEvent("ChangeFeedMutationMismatch")
.detail("Index", i)
.detail("SrcKey", source[i].key)
.detail("DestKey", dest[i].key)
.detail("SrcValue", source[i].value)
.detail("DestValue", dest[i].value);
return false;
}
}
return source.size() == dest.size();
}
struct ChangeFeedsWorkload : TestWorkload {
double testDuration;
Future<Void> client;
ChangeFeedsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
testDuration = getOption(options, "testDuration"_sr, 10.0);
}
std::string description() const override { return "ChangeFeedsWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (clientId == 0) {
client = changeFeedClient(cx->clone(), this);
return delay(testDuration);
}
return Void();
}
Future<bool> check(Database const& cx) override {
client = Future<Void>();
return true;
}
void getMetrics(std::vector<PerfMetric>& m) override {}
ACTOR Future<Void> changeFeedClient(Database cx, ChangeFeedsWorkload* self) {
// Enable change feeds for a key range
state Key rangeID = StringRef(deterministicRandom()->randomUniqueID().toString());
wait(updateChangeFeed(cx, rangeID, ChangeFeedStatus::CHANGE_FEED_CREATE, normalKeys));
loop {
wait(delay(deterministicRandom()->random01()));
state std::pair<Standalone<VectorRef<KeyValueRef>>, Version> firstResults = wait(readDatabase(cx));
TraceEvent("ChangeFeedReadDB").detail("Ver1", firstResults.second);
wait(delay(10 * deterministicRandom()->random01()));
state std::pair<Standalone<VectorRef<KeyValueRef>>, Version> secondResults = wait(readDatabase(cx));
TraceEvent("ChangeFeedReadDB").detail("Ver2", secondResults.second);
state Standalone<VectorRef<MutationsAndVersionRef>> mutations =
wait(readMutations(cx, rangeID, firstResults.second, secondResults.second + 1));
Standalone<VectorRef<KeyValueRef>> advancedResults = advanceData(firstResults.first, mutations);
if (!compareData(secondResults.first, advancedResults)) {
TraceEvent(SevError, "ChangeFeedMismatch")
.detail("FirstVersion", firstResults.second)
.detail("SecondVersion", secondResults.second);
for (int i = 0; i < secondResults.first.size(); i++) {
TraceEvent("ChangeFeedBase")
.detail("Index", i)
.detail("K", secondResults.first[i].key)
.detail("V", secondResults.first[i].value);
}
for (int i = 0; i < advancedResults.size(); i++) {
TraceEvent("ChangeFeedAdvanced")
.detail("Index", i)
.detail("K", advancedResults[i].key)
.detail("V", advancedResults[i].value);
}
ASSERT(false);
}
wait(cx->popChangeFeedMutations(rangeID, secondResults.second));
}
}
};
WorkloadFactory<ChangeFeedsWorkload> ChangeFeedsWorkloadFactory("ChangeFeeds");

View File

@ -640,6 +640,16 @@ struct hash<StringRef> {
};
} // namespace std
namespace std {
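// Allows Standalone<StringRef> (i.e. Key) to be used as an unordered_map key, e.g. DatabaseContext::changeFeedCache.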
template <>
struct hash<Standalone<StringRef>> {
static constexpr std::hash<std::string_view> hashFunc{};
std::size_t operator()(Standalone<StringRef> const& tag) const {
return hashFunc(std::string_view((const char*)tag.begin(), tag.size()));
}
};
} // namespace std
template <>
struct TraceableString<StringRef> {
static const char* begin(StringRef value) { return reinterpret_cast<const char*>(value.begin()); }

View File

@ -138,6 +138,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010000LL, StableInterfaces);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, ChangeFeed);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TSS);
};

View File

@ -79,6 +79,8 @@ ERROR( version_already_compacted, 1055, "The requested changes have been compact
ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" )
ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" )
ERROR( wrong_format_version, 1058, "Format version not recognized." )
ERROR( unknown_change_feed, 1059, "Change feed not found" )
ERROR( change_feed_not_registered, 1060, "Change feed not registered" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -131,6 +131,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/ChangeFeeds.toml)
add_fdb_test(TEST_FILES fast/DataLossRecovery.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)

View File

@ -0,0 +1,35 @@
[[test]]
testTitle = 'ChangeFeed'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'ChangeFeeds'
testDuration = 60.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 60.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 60.0
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0