changefeeds now have a whenAtLeast function for efficiently learning when the version has updated but no mutations have been committed

This commit is contained in:
Evan Tschannen 2021-11-14 19:08:46 -08:00
parent 86ea63d1da
commit 6909754b21
10 changed files with 360 additions and 66 deletions

View File

@ -62,6 +62,17 @@ ACTOR Future<Void> changeFeedList(Database db) {
namespace fdb_cli {
// Debugging helper for the fdbcli change feed stream command.
// Every 5 seconds: fetch a fresh read version from localDb, print it, wait until
// feedData reports (via whenAtLeast) that the feed has reached that version, then
// print again. Loops forever; the caller stops it by dropping/cancelling the future.
ACTOR Future<Void> requestVersionUpdate(Database localDb, Reference<ChangeFeedData> feedData) {
	loop {
		wait(delay(5.0));
		Transaction tr(localDb);
		state Version ver = wait(tr.getReadVersion());
		// Version is a 64-bit integer; "%d" would read it as an int. Use %lld with an
		// explicit cast, matching the "%lld" used for versions elsewhere in this file.
		printf("Requesting version %lld\n", static_cast<long long>(ver));
		wait(feedData->whenAtLeast(ver));
		printf("Feed at version %lld\n", static_cast<long long>(ver));
	}
}
ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn) {
if (tokens.size() == 1) {
printUsage(tokens[0]);
@ -117,14 +128,16 @@ ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRe
if (warn.isValid()) {
warn.cancel();
}
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> feedResults;
state Future<Void> feed = localDb->getChangeFeedStream(feedResults, tokens[2], begin, end);
state Reference<ChangeFeedData> feedData = makeReference<ChangeFeedData>();
state Future<Void> feed = localDb->getChangeFeedStream(feedData, tokens[2], begin, end);
state Future<Void> versionUpdates = requestVersionUpdate(localDb, feedData);
printf("\n");
try {
state Future<Void> feedInterrupt = LineNoise::onKeyboardInterrupt();
loop {
choose {
when(Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(feedResults.getFuture())) {
when(Standalone<VectorRef<MutationsAndVersionRef>> res =
waitNext(feedData->mutations.getFuture())) {
for (auto& it : res) {
for (auto& it2 : it.mutations) {
printf("%lld %s\n", it.version, it2.toString().c_str());
@ -134,7 +147,7 @@ ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRe
when(wait(feedInterrupt)) {
feedInterrupt = Future<Void>();
feed.cancel();
feedResults = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
feedData = makeReference<ChangeFeedData>();
break;
}
}

View File

@ -100,6 +100,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( RANGESTREAM_FRAGMENT_SIZE, 1e6 );
init( RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT, 20 );
init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed
init( CHANGE_FEED_EMPTY_BATCH_TIME, 0.005 );
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -100,6 +100,7 @@ public:
int64_t RANGESTREAM_FRAGMENT_SIZE;
int RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT;
bool QUARANTINE_TSS_ON_MISMATCH;
double CHANGE_FEED_EMPTY_BATCH_TIME;
// KeyRangeMap
int KRM_GET_RANGE_LIMIT;

View File

@ -20,6 +20,7 @@
#ifndef DatabaseContext_h
#define DatabaseContext_h
#include "fdbclient/Notified.h"
#include "flow/FastAlloc.h"
#include "flow/FastRef.h"
#include "fdbclient/StorageServerInterface.h"
@ -146,6 +147,25 @@ public:
WatchMetadata(Key key, Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
};
// Per-storage-server version tracking state shared by the change feed streams of a
// DatabaseContext. Instances are created and cached by DatabaseContext::getStorageData.
struct ChangeFeedStorageData : ReferenceCounted<ChangeFeedStorageData> {
// Id of the storage server interface this entry was created for.
UID id;
// Holds the storageFeedVersionUpdater actor started for this entry.
Future<Void> updater;
// Latest version learned for this storage server; raised from stream replies
// (minStreamVersion) and from ChangeFeedVersionUpdateReply.
NotifiedVersion version;
// Version that waiters want `version` to reach; raised by changeFeedWhenAtLatest and
// consumed by storageFeedVersionUpdater.
NotifiedVersion desired;
};
// Client-side handle for one change feed stream: carries the mutation stream itself
// plus the state needed to answer "has the feed reached version X?".
struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {
// Batches of mutations delivered by the change feed stream actors.
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> mutations;
// Returns the version the feed is currently known to have reached.
Version getVersion();
// Returns a future produced by changeFeedWhenAtLatest for the given version.
Future<Void> whenAtLeast(Version version);
// Version of the last batch sent on `mutations`.
NotifiedVersion lastReturnedVersion;
// One entry per storage server interface currently feeding this stream.
std::vector<Reference<ChangeFeedStorageData>> storageData;
// Number of underlying streams that have not yet reported atLatestVersion.
AsyncVar<int> notAtLatest;
// Replaced and fired whenever `storageData` is rebuilt; receives
// change_feed_cancelled when the stream actor is cancelled or fails.
Promise<Void> refresh;
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -252,7 +272,7 @@ public:
// Management API, create snapshot
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
Future<Void> getChangeFeedStream(const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
Future<Void> getChangeFeedStream(Reference<ChangeFeedData> results,
Key rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
@ -345,6 +365,9 @@ public:
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
// map from changeFeedId -> changeFeedRange
std::unordered_map<Key, KeyRange> changeFeedCache;
std::unordered_map<UID, Reference<ChangeFeedStorageData>> changeFeedUpdaters;
Reference<ChangeFeedStorageData> getStorageData(StorageServerInterface interf);
UID dbId;
IsInternal internal; // Only contexts created through the C client and fdbcli are non-internal

View File

@ -65,6 +65,7 @@
#include "flow/ActorCollection.h"
#include "flow/DeterministicRandom.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
@ -6663,12 +6664,101 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
}
// Keeps self->version moving towards self->desired for a single storage server.
//
// While nothing newer is desired, sleeps until `desired` exceeds `version`. Otherwise
// it gives the regular stream replies CHANGE_FEED_EMPTY_BATCH_TIME to advance `version`
// on their own; if they do not, it sends a ChangeFeedVersionUpdateRequest for the
// desired version to `interf` and adopts the replied version when it is newer.
ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
	loop {
		if (self->version.get() >= self->desired.get()) {
			// Caught up: wait for someone to ask for a newer version.
			wait(self->desired.whenAtLeast(self->version.get() + 1));
		} else {
			wait(delay(CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME) || self->version.whenAtLeast(self->desired.get()));
			if (self->version.get() < self->desired.get()) {
				// Still behind after the grace period: ask the storage server directly.
				ChangeFeedVersionUpdateReply reply = wait(brokenPromiseToNever(
				    interf.changeFeedVersionUpdate.getReply(ChangeFeedVersionUpdateRequest(self->desired.get()))));
				if (reply.version > self->version.get()) {
					self->version.set(reply.version);
				}
			}
		}
	}
}
// Returns the shared ChangeFeedStorageData for the given storage server interface.
// On first use for an interface id, a new entry is created, its version updater actor
// is started, and it is cached in changeFeedUpdaters.
Reference<ChangeFeedStorageData> DatabaseContext::getStorageData(StorageServerInterface interf) {
	auto cached = changeFeedUpdaters.find(interf.id());
	if (cached != changeFeedUpdaters.end()) {
		return cached->second;
	}

	Reference<ChangeFeedStorageData> created = makeReference<ChangeFeedStorageData>();
	created->id = interf.id();
	created->updater = storageFeedVersionUpdater(interf, created.getPtr());
	changeFeedUpdaters[interf.id()] = created;
	return created;
}
// Returns the version this feed is known to have reached.
//
// When every underlying stream is at its latest version (notAtLatest == 0) and no
// mutations are buffered, the feed is at least as new as the slowest storage server's
// version, so the minimum over storageData is used (never going below
// lastReturnedVersion). Otherwise only lastReturnedVersion can be trusted.
Version ChangeFeedData::getVersion() {
	// storageData is cleared when the stream actor is cancelled or fails, so it may be
	// empty here; indexing storageData[0] in that case would be out of bounds.
	if (notAtLatest.get() == 0 && mutations.isEmpty() && !storageData.empty()) {
		Version v = storageData[0]->version.get();
		for (const auto& sd : storageData) {
			v = std::min(v, sd->version.get());
		}
		return std::max(v, lastReturnedVersion.get());
	}
	return lastReturnedVersion.get();
}
// Completes once the feed described by `self` is known to have reached `version`,
// either because a batch at or past that version was returned, or because every
// storage server feeding the stream reports a version >= `version` and no mutations
// remain buffered.
// NOTE(review): `self` is a raw pointer and nothing in this actor keeps the
// ChangeFeedData alive — confirm callers hold a reference for the future's lifetime.
ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version) {
// Fires as soon as a returned batch reaches `version`; checked in every wait below.
state Future<Void> lastReturned = self->lastReturnedVersion.whenAtLeast(version);
loop {
if (self->notAtLatest.get() == 0) {
// All streams are caught up, so storage server versions can be used directly.
// NOTE(review): if storageData is empty, allAtLeast stays empty; presumably
// waitForAll then completes immediately — confirm this is intended.
std::vector<Future<Void>> allAtLeast;
for (auto& it : self->storageData) {
if (it->version.get() < version) {
// Raise `desired` so the storage server's updater works towards `version`.
if (version > it->desired.get()) {
it->desired.set(version);
}
allAtLeast.push_back(it->version.whenAtLeast(version));
}
}
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(waitForAll(allAtLeast))) {
if (self->mutations.isEmpty()) {
return Void();
}
// Mutations are still buffered: wait for the consumer to drain them.
choose {
when(wait(self->mutations.onEmpty())) {
wait(delay(0));
return Void();
}
when(wait(lastReturned)) { return Void(); }
// storageData was rebuilt; loop around and re-evaluate.
when(wait(self->refresh.getFuture())) {}
}
}
// storageData was rebuilt; loop around and re-evaluate.
when(wait(self->refresh.getFuture())) {}
}
} else {
// Some stream is not yet at its latest version: wait for that to change.
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(self->notAtLatest.onChange())) {}
when(wait(self->refresh.getFuture())) {}
}
}
}
}
// Starts a changeFeedWhenAtLatest actor for this feed and the requested version and
// hands its future back to the caller.
Future<Void> ChangeFeedData::whenAtLeast(Version version) {
	Future<Void> reached = changeFeedWhenAtLatest(this, version);
	return reached;
}
ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
KeyRange range,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData) {
state bool atLatestVersion = false;
loop {
try {
state Version lastEmpty = invalidVersion;
@ -6699,6 +6789,13 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
results.sendError(end_of_stream());
return Void();
}
if (!atLatestVersion && rep.atLatestVersion) {
atLatestVersion = true;
feedData->notAtLatest.set(feedData->notAtLatest.get() - 1);
}
if (rep.minStreamVersion > storageData->version.get()) {
storageData->version.set(rep.minStreamVersion);
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
@ -6716,17 +6813,39 @@ struct MutationAndVersionStream {
bool operator<(MutationAndVersionStream const& rhs) const { return next.version > rhs.next.version; }
};
ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInterface, KeyRange>> interfs,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
std::vector<std::pair<StorageServerInterface, KeyRange>> interfs,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
state std::priority_queue<MutationAndVersionStream, std::vector<MutationAndVersionStream>> mutations;
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->id);
}
}
results->storageData.clear();
Promise<Void> refresh = results->refresh;
results->refresh = Promise<Void>();
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] =
singleChangeFeedStream(interfs[i].first, streams[i].results, rangeID, *begin, end, interfs[i].second);
results->storageData.push_back(db->getStorageData(interfs[i].first));
}
results->notAtLatest.set(interfs.size());
refresh.send(Void());
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] = singleChangeFeedStream(interfs[i].first,
streams[i].results,
rangeID,
*begin,
end,
interfs[i].second,
results,
results->storageData[i]);
}
state int interfNum = 0;
while (interfNum < interfs.size()) {
@ -6750,7 +6869,8 @@ ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInte
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
results.send(nextOut);
results->mutations.send(nextOut);
results->lastReturnedVersion.set(nextOut.back().version);
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
}
checkVersion = nextStream.next.version;
@ -6775,7 +6895,8 @@ ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInte
}
}
if (nextOut.size()) {
results.send(nextOut);
results->mutations.send(nextOut);
results->lastReturnedVersion.set(nextOut.back().version);
}
throw end_of_stream();
}
@ -6814,7 +6935,7 @@ ACTOR Future<KeyRange> getChangeFeedRange(Reference<DatabaseContext> db, Databas
}
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
Reference<ChangeFeedData> results,
Key rangeID,
Version begin,
Version end,
@ -6887,32 +7008,57 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
locations[i].first & range));
}
wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
wait(mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
} else {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
StorageServerInterface interf = locations[0].second->getInterface(chosenLocations[0]);
state ReplyPromiseStream<ChangeFeedStreamReply> replyStream =
locations[0]
.second->get(chosenLocations[0], &StorageServerInterface::changeFeedStream)
.getReplyStream(req);
interf.changeFeedStream.getReplyStream(req);
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->id);
}
}
results->storageData.clear();
results->storageData.push_back(db->getStorageData(interf));
Promise<Void> refresh = results->refresh;
results->refresh = Promise<Void>();
results->notAtLatest.set(1);
refresh.send(Void());
state bool atLatest = false;
loop {
wait(results.onEmpty());
wait(results->mutations.onEmpty());
choose {
when(wait(cx->connectionFileChanged())) { break; }
when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
begin = rep.mutations.back().version + 1;
results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
results->lastReturnedVersion.set(rep.mutations.back().version);
if (!atLatest && rep.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);
}
if (rep.minStreamVersion > results->storageData[0]->version.get()) {
results->storageData[0]->version.set(rep.minStreamVersion);
}
}
}
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->id);
}
}
results->storageData.clear();
results->refresh.sendError(change_feed_cancelled());
throw;
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
@ -6922,19 +7068,25 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
cx->invalidateCache(keys);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
results.sendError(e);
results->mutations.sendError(e);
results->refresh.sendError(change_feed_cancelled());
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->id);
}
}
results->storageData.clear();
return Void();
}
}
}
}
Future<Void> DatabaseContext::getChangeFeedStream(
const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
}

View File

@ -81,6 +81,7 @@ struct StorageServerInterface {
RequestStream<struct ChangeFeedStreamRequest> changeFeedStream;
RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
RequestStream<struct ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate;
explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}
@ -129,6 +130,8 @@ struct StorageServerInterface {
RequestStream<struct OverlappingChangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
changeFeedPop =
RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
changeFeedVersionUpdate = RequestStream<struct ChangeFeedVersionUpdateRequest>(
getValue.getEndpoint().getAdjustedEndpoint(17));
}
} else {
ASSERT(Ar::isDeserializing);
@ -174,6 +177,7 @@ struct StorageServerInterface {
streams.push_back(changeFeedStream.getReceiver());
streams.push_back(overlappingChangeFeeds.getReceiver());
streams.push_back(changeFeedPop.getReceiver());
streams.push_back(changeFeedVersionUpdate.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};
@ -639,6 +643,8 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 1783066;
Arena arena;
VectorRef<MutationsAndVersionRef> mutations;
bool atLatestVersion = false;
Version minStreamVersion = invalidVersion;
ChangeFeedStreamReply() {}
@ -646,7 +652,13 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, mutations, arena);
serializer(ar,
ReplyPromiseStreamReply::acknowledgeToken,
ReplyPromiseStreamReply::sequence,
mutations,
atLatestVersion,
minStreamVersion,
arena);
}
};
@ -734,6 +746,33 @@ struct OverlappingChangeFeedsRequest {
}
};
// Reply to a ChangeFeedVersionUpdateRequest.
struct ChangeFeedVersionUpdateReply {
constexpr static FileIdentifier file_identifier = 11815134;
// Version reported by the storage server: changeFeedVersionUpdateQ sends the minimum
// of the server's version and the versions recorded for the requesting client's streams.
Version version = 0;
ChangeFeedVersionUpdateReply() {}
explicit ChangeFeedVersionUpdateReply(Version version) : version(version) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
}
};
// Request asking a storage server to report its change feed version once it has
// reached at least `minVersion`.
struct ChangeFeedVersionUpdateRequest {
	constexpr static FileIdentifier file_identifier = 6795746;
	// The storage server waits until its version is at least this before replying.
	// Initialized so a default-constructed request never carries an indeterminate
	// value (matches ChangeFeedVersionUpdateReply::version).
	Version minVersion = 0;
	ReplyPromise<ChangeFeedVersionUpdateReply> reply;

	ChangeFeedVersionUpdateRequest() {}
	explicit ChangeFeedVersionUpdateRequest(Version minVersion) : minVersion(minVersion) {}

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, minVersion, reply);
	}
};
struct GetStorageMetricsReply {
constexpr static FileIdentifier file_identifier = 15491478;
StorageMetrics load;

View File

@ -1054,8 +1054,8 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata,
Future<GranuleStartState> assignFuture) {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> oldChangeFeedStream;
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> changeFeedStream;
state Reference<ChangeFeedData> oldChangeFeedStream = makeReference<ChangeFeedData>();
state Reference<ChangeFeedData> changeFeedStream = makeReference<ChangeFeedData>();
state Future<BlobFileIndex> inFlightBlobSnapshot;
state std::deque<InFlightDeltaFile> inFlightDeltaFiles;
state Future<Void> oldChangeFeedFuture;
@ -1220,7 +1220,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
if (readOldChangeFeed) {
Standalone<VectorRef<MutationsAndVersionRef>> oldMutations = waitNext(oldChangeFeedStream.getFuture());
Standalone<VectorRef<MutationsAndVersionRef>> oldMutations =
waitNext(oldChangeFeedStream->mutations.getFuture());
// TODO filter old mutations won't be necessary, SS does it already
if (filterOldMutations(
metadata->keyRange, &oldMutations, &mutations, startState.changeFeedStartVersion)) {
@ -1235,10 +1236,11 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// now that old change feed is cancelled, clear out any mutations still in buffer by replacing
// promise stream
oldChangeFeedStream = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
oldChangeFeedStream = makeReference<ChangeFeedData>();
}
} else {
Standalone<VectorRef<MutationsAndVersionRef>> newMutations = waitNext(changeFeedStream.getFuture());
Standalone<VectorRef<MutationsAndVersionRef>> newMutations =
waitNext(changeFeedStream->mutations.getFuture());
mutations = newMutations;
}
@ -1504,8 +1506,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// reset change feeds to cfRollbackVersion
if (readOldChangeFeed) {
oldChangeFeedStream =
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
oldChangeFeedStream = makeReference<ChangeFeedData>();
oldChangeFeedFuture = bwData->db->getChangeFeedStream(
oldChangeFeedStream,
oldCFKey.get(),
@ -1513,7 +1514,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
MAX_VERSION,
startState.parentGranule.get().first /*metadata->keyRange*/);
} else {
changeFeedStream = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
changeFeedStream = makeReference<ChangeFeedData>();
changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream,
cfKey,
cfRollbackVersion + 1,

View File

@ -607,6 +607,7 @@ public:
Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
std::map<UID, PromiseStream<Key>> changeFeedRemovals;
std::set<Key> currentChangeFeeds;
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
// newestAvailableVersion[k]
// == invalidVersion -> k is unavailable at all versions
@ -622,7 +623,7 @@ public:
// that were only partly available (due to cancelled fetchKeys)
// The following are in rough order from newest to oldest
Version lastTLogVersion, lastVersionWithData, restoredVersion;
Version lastTLogVersion, lastVersionWithData, restoredVersion, prevVersion;
NotifiedVersion version;
NotifiedVersion desiredOldestVersion; // We can increase oldestVersion (and then durableVersion) to this version
// when the disk permits
@ -884,10 +885,11 @@ public:
Histogram::Unit::microseconds)),
tag(invalidTag), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0),
storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()), primaryLocality(tagLocalityInvalid),
knownCommittedVersion(0), versionLag(0), logProtocol(0), thisServerID(ssi.id()), tssInQuarantine(false), db(db),
actors(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), durableInProgress(Void()),
watchBytes(0), numWatches(0), noRecentUpdates(false), lastUpdate(now()),
prevVersion(0), rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
primaryLocality(tagLocalityInvalid), knownCommittedVersion(0), versionLag(0), logProtocol(0),
thisServerID(ssi.id()), tssInQuarantine(false), db(db), actors(false),
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), durableInProgress(Void()), watchBytes(0),
numWatches(0), noRecentUpdates(false), lastUpdate(now()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), updateEagerReads(nullptr),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
@ -1712,7 +1714,7 @@ ACTOR Future<ChangeFeedStreamReply> getChangeFeedMutations(StorageServer* data,
state ChangeFeedStreamReply memoryReply;
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
state int remainingDurableBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
wait(delay(0, TaskPriority::DefaultEndpoint));
if (data->version.get() < req.begin) {
wait(data->version.whenAtLeast(req.begin));
}
@ -1847,17 +1849,47 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) {
state Span span("SS:getChangeFeedStream"_loc, { req.spanContext });
state bool atLatest = false;
state UID streamUID = deterministicRandom()->randomUniqueID();
state bool removeUID = false;
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
wait(delay(0, TaskPriority::DefaultEndpoint));
try {
loop {
wait(req.reply.onReady());
ChangeFeedStreamReply _feedReply = wait(getChangeFeedMutations(data, req, false));
Future<Void> onReady = req.reply.onReady();
if (atLatest && !onReady.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
data->version.get();
removeUID = true;
}
wait(onReady);
state Future<ChangeFeedStreamReply> feedReplyFuture = getChangeFeedMutations(data, req, false);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
data->prevVersion;
removeUID = true;
}
ChangeFeedStreamReply _feedReply = wait(feedReplyFuture);
ChangeFeedStreamReply feedReply = _feedReply;
req.begin = feedReply.mutations.back().version + 1;
if (!atLatest && feedReply.mutations.back().mutations.empty()) {
atLatest = true;
}
auto& clientVersions = data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()];
Version minVersion = removeUID ? data->version.get() : data->prevVersion;
if (removeUID) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(streamUID);
removeUID = false;
}
for (auto& it : clientVersions) {
minVersion = std::min(minVersion, it.second);
}
feedReply.atLatestVersion = atLatest;
feedReply.minStreamVersion = minVersion;
req.reply.send(feedReply);
if (feedReply.mutations.back().version == req.end - 1) {
req.reply.sendError(end_of_stream());
@ -1869,13 +1901,21 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
req.reply.sendError(unknown_change_feed());
return Void();
}
choose {
when(wait(delay(5.0, TaskPriority::DefaultEndpoint))) {}
when(wait(feed->second->newMutations.onTrigger())) {}
}
wait(feed->second->newMutations
.onTrigger()); // FIXME: check that this is triggered when the range is moved to a different
// server, also check that the stream is closed
}
}
} catch (Error& e) {
auto it = data->changeFeedClientVersions.find(req.reply.getEndpoint().getPrimaryAddress());
if (it != data->changeFeedClientVersions.end()) {
if (removeUID) {
it->second.erase(streamUID);
}
if (it->second.empty()) {
data->changeFeedClientVersions.erase(it);
}
}
if (e.code() != error_code_operation_obsolete) {
if (!canReplyWith(e))
throw;
@ -1885,6 +1925,18 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
return Void();
}
// Handles one ChangeFeedVersionUpdateRequest: waits until this storage server has
// reached req.minVersion, then replies with the minimum of the server's current version
// and the versions recorded in changeFeedClientVersions for the requester's address.
ACTOR Future<Void> changeFeedVersionUpdateQ(StorageServer* data, ChangeFeedVersionUpdateRequest req) {
	wait(data->version.whenAtLeast(req.minVersion));
	wait(delay(0));
	Version minVersion = data->version.get();
	// Look the client up with find() rather than operator[]: operator[] would insert an
	// empty map for every address that sends a request, and this actor never removes it.
	auto clientIt = data->changeFeedClientVersions.find(req.reply.getEndpoint().getPrimaryAddress());
	if (clientIt != data->changeFeedClientVersions.end()) {
		for (auto& it : clientIt->second) {
			minVersion = std::min(minVersion, it.second);
		}
	}
	req.reply.send(ChangeFeedVersionUpdateReply(minVersion));
	return Void();
}
#ifdef NO_INTELLISENSE
size_t WATCH_OVERHEAD_WATCHQ =
sizeof(WatchValueSendReplyActorState<WatchValueSendReplyActor>) + sizeof(WatchValueSendReplyActor);
@ -3294,14 +3346,14 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
KeyRange range,
Version fetchVersion,
bool existing) {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> feedResults;
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
state Future<Void> feed = data->cx->getChangeFeedStream(
feedResults, rangeId, 0, existing ? fetchVersion + 1 : data->version.get() + 1, range);
if (!existing) {
try {
loop {
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(feedResults.getFuture());
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(feedResults->mutations.getFuture());
for (auto& it : res) {
if (it.mutations.size()) {
data->storage.writeKeyValue(
@ -3332,7 +3384,8 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
localResult = _localResult;
try {
loop {
state Standalone<VectorRef<MutationsAndVersionRef>> remoteResult = waitNext(feedResults.getFuture());
state Standalone<VectorRef<MutationsAndVersionRef>> remoteResult =
waitNext(feedResults->mutations.getFuture());
state int remoteLoc = 0;
while (remoteLoc < remoteResult.size()) {
if (remoteResult[remoteLoc].version < localResult.version) {
@ -4581,6 +4634,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
cloneCursor2->setProtocolVersion(data->logProtocol);
state SpanID spanContext = SpanID();
state double beforeTLogMsgsUpdates = now();
state std::set<Key> updatedChangeFeeds;
for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
mutationBytes = 0;
@ -4599,12 +4653,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if (data->currentChangeFeeds.size()) {
data->changeFeedVersions.push_back(std::make_pair(
std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver));
for (auto& it : data->currentChangeFeeds) {
auto feed = data->uidChangeFeed.find(it);
if (feed != data->uidChangeFeed.end()) {
feed->second->newMutations.trigger();
}
}
updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
data->currentChangeFeeds.clear();
}
ver = cloneCursor2->version().version;
@ -4690,12 +4739,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if (data->currentChangeFeeds.size()) {
data->changeFeedVersions.push_back(std::make_pair(
std::vector<Key>(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end()), ver));
for (auto& it : data->currentChangeFeeds) {
auto feed = data->uidChangeFeed.find(it);
if (feed != data->uidChangeFeed.end()) {
feed->second->newMutations.trigger();
}
}
updatedChangeFeeds.insert(data->currentChangeFeeds.begin(), data->currentChangeFeeds.end());
data->currentChangeFeeds.clear();
}
@ -4737,8 +4781,17 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->prevVersion = data->version.get();
data->version.set(ver); // Triggers replies to waiting gets for new version(s)
for (auto& it : updatedChangeFeeds) {
auto feed = data->uidChangeFeed.find(it);
if (feed != data->uidChangeFeed.end()) {
feed->second->newMutations.trigger();
}
}
setDataVersion(data->thisServerID, data->version.get());
if (data->otherError.getFuture().isReady())
data->otherError.getFuture().get();
@ -5843,6 +5896,15 @@ ACTOR Future<Void> serveChangeFeedPopRequests(StorageServer* self, FutureStream<
}
}
// Request loop for the changeFeedVersionUpdate endpoint: each incoming request is
// handed to changeFeedVersionUpdateQ through the server's readGuard, and the resulting
// actor is tracked in self->actors.
ACTOR Future<Void> serveChangeFeedVersionUpdateRequests(
    StorageServer* self,
    FutureStream<ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate) {
	loop {
		ChangeFeedVersionUpdateRequest incoming = waitNext(changeFeedVersionUpdate);
		self->actors.add(self->readGuard(incoming, changeFeedVersionUpdateQ));
	}
}
ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
if (!SERVER_KNOBS->REPORT_DD_METRICS) {
return Void();
@ -5895,6 +5957,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->actors.add(serveChangeFeedStreamRequests(self, ssi.changeFeedStream.getFuture()));
self->actors.add(serveOverlappingChangeFeedsRequests(self, ssi.overlappingChangeFeeds.getFuture()));
self->actors.add(serveChangeFeedPopRequests(self, ssi.changeFeedPop.getFuture()));
self->actors.add(serveChangeFeedVersionUpdateRequests(self, ssi.changeFeedVersionUpdate.getFuture()));
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
self->actors.add(reportStorageServerState(self));

View File

@ -65,10 +65,10 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> readMutations(Databa
state Standalone<VectorRef<MutationsAndVersionRef>> output;
loop {
try {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
state Reference<ChangeFeedData> results = makeReference<ChangeFeedData>();
state Future<Void> stream = cx->getChangeFeedStream(results, rangeID, begin, end, normalKeys);
loop {
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results.getFuture());
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results->mutations.getFuture());
output.arena().dependsOn(res.arena());
for (auto& it : res) {
if (it.mutations.size() == 1 && it.mutations.back().param1 == lastEpochEndPrivateKey) {

View File

@ -82,6 +82,7 @@ ERROR( wrong_format_version, 1058, "Format version not recognized" )
ERROR( unknown_change_feed, 1059, "Change feed not found" )
ERROR( change_feed_not_registered, 1060, "Change feed not registered" )
ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob granules" )
ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )