Merge branch 'feature-range-feed' into blob_full

This commit is contained in:
Josh Slocum 2021-08-12 10:22:28 -05:00
commit 1a4e70d22f
6 changed files with 370 additions and 41 deletions

View File

@ -36,6 +36,7 @@
#include "fdbclient/Schemas.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/Tuple.h"
@ -657,7 +658,8 @@ void initHelp() {
CommandHelp("triggerddteaminfolog",
"trigger the data distributor teams logging",
"Trigger the data distributor to log detailed information about its teams.");
helpMap["rangefeed"] = CommandHelp("rangefeed <register|get|pop> <RANGEID> <BEGINKEY> <ENDKEY>", "", "");
helpMap["rangefeed"] =
CommandHelp("rangefeed <register|destroy|get|stream|pop|list> <RANGEID> <BEGIN> <END>", "", "");
helpMap["tssq"] =
CommandHelp("tssq start|stop <StorageUID>",
"start/stop tss quarantine",
@ -1993,6 +1995,31 @@ ACTOR Future<Void> commitTransaction(Reference<ReadYourWritesTransaction> tr) {
return Void();
}
ACTOR Future<Void> rangeFeedList(Database db) {
state ReadYourWritesTransaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult result = wait(tr.getRange(rangeFeedKeys, CLIENT_KNOBS->TOO_MANY));
// shouldn't have many quarantined TSSes
ASSERT(!result.more);
printf("Found %d range feeds%s\n", result.size(), result.size() == 0 ? "." : ":");
for (auto& it : result) {
auto range = decodeRangeFeedValue(it.value);
printf(" %s: %s - %s\n",
it.key.removePrefix(rangeFeedPrefix).toString().c_str(),
range.begin.toString().c_str(),
range.end.toString().c_str());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<bool> configure(Database db,
std::vector<StringRef> tokens,
Reference<ClusterConnectionFile> ccf,
@ -3264,6 +3291,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state Database db;
state Reference<ReadYourWritesTransaction> tr;
state Transaction trx;
// TODO: refactoring work, will replace db, tr when we have all commands through the general fdb interface
state Reference<IDatabase> db2;
state Reference<ITransaction> tr2;
@ -3534,13 +3562,21 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
continue;
}
if (tokencmp(tokens[1], "register")) {
if (tokencmp(tokens[1], "list")) {
if (tokens.size() != 2) {
printUsage(tokens[0]);
is_error = true;
continue;
}
wait(rangeFeedList(db));
continue;
} else if (tokencmp(tokens[1], "register")) {
if (tokens.size() != 5) {
printUsage(tokens[0]);
is_error = true;
continue;
}
state Transaction trx(db);
trx = Transaction(db);
loop {
try {
wait(trx.registerRangeFeed(tokens[2], KeyRangeRef(tokens[3], tokens[4])));
@ -3550,6 +3586,22 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
wait(trx.onError(e));
}
}
} else if (tokencmp(tokens[1], "destroy")) {
if (tokens.size() != 3) {
printUsage(tokens[0]);
is_error = true;
continue;
}
trx = Transaction(db);
loop {
try {
trx.destroyRangeFeed(tokens[2]);
wait(trx.commit());
break;
} catch (Error& e) {
wait(trx.onError(e));
}
}
} else if (tokencmp(tokens[1], "get")) {
if (tokens.size() < 3 || tokens.size() > 5) {
printUsage(tokens[0]);
@ -3577,11 +3629,70 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
Standalone<VectorRef<MutationsAndVersionRef>> res =
wait(db->getRangeFeedMutations(tokens[2], begin, end));
printf("\n");
for (auto& it : res) {
for (auto& it2 : it.mutations) {
printf("%lld %s\n", it.version, it2.toString().c_str());
}
}
} else if (tokencmp(tokens[1], "stream")) {
if (tokens.size() < 3 || tokens.size() > 5) {
printUsage(tokens[0]);
is_error = true;
continue;
}
Version begin = 0;
Version end = std::numeric_limits<Version>::max();
if (tokens.size() > 3) {
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%ld%n", &begin, &n) != 1 ||
n != tokens[3].size()) {
printUsage(tokens[0]);
is_error = true;
continue;
}
}
if (tokens.size() > 4) {
int n = 0;
if (sscanf(tokens[4].toString().c_str(), "%ld%n", &end, &n) != 1 || n != tokens[4].size()) {
printUsage(tokens[0]);
is_error = true;
continue;
}
}
if (warn.isValid()) {
warn.cancel();
}
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> feedResults;
state Future<Void> feed = db->getRangeFeedStream(feedResults, tokens[2], begin, end);
printf("\n");
try {
state Future<Void> feedInterrupt = LineNoise::onKeyboardInterrupt();
loop {
choose {
when(Standalone<VectorRef<MutationsAndVersionRef>> res =
waitNext(feedResults.getFuture())) {
for (auto& it : res) {
for (auto& it2 : it.mutations) {
printf("%lld %s\n", it.version, it2.toString().c_str());
}
}
}
when(wait(feedInterrupt)) {
feedInterrupt = Future<Void>();
feed.cancel();
feedResults = PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>();
break;
}
}
}
continue;
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
continue;
}
throw;
}
} else if (tokencmp(tokens[1], "pop")) {
if (tokens.size() != 4) {
printUsage(tokens[0]);

View File

@ -257,6 +257,13 @@ public:
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
Future<Void> getRangeFeedStream(const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
StringRef rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingRangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popRangeFeedMutations(StringRef rangeID, Version version);

View File

@ -4330,6 +4330,10 @@ Future<Void> Transaction::registerRangeFeed(const Key& rangeID, const KeyRange&
return registerRangeFeedActor(this, rangeID, range);
}
void Transaction::destroyRangeFeed(const Key& rangeID) {
clear(rangeID.withPrefix(rangeFeedPrefix));
}
ACTOR Future<Key> getKeyAndConflictRange(Database cx,
KeySelector k,
Future<Version> version,
@ -6575,6 +6579,126 @@ Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getRangeF
return getRangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, begin, end, range);
}
ACTOR Future<Void> getRangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
StringRef rangeID,
Version begin,
Version end,
KeyRange range) {
state Database cx(db);
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(rangeFeedPrefix);
state Span span("NAPI:GetRangeFeedStream"_loc);
state KeyRange keys;
loop {
try {
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
results.sendError(unsupported_operation());
return Void();
}
keys = decodeRangeFeedValue(val.get());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
loop {
try {
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
keys,
100,
Reverse::False,
&StorageServerInterface::rangeFeed,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() > 1) {
results.sendError(unsupported_operation());
return Void();
}
state int useIdx = -1;
loop {
// FIXME: create a load balance function for this code so future users of reply streams do not have
// to duplicate this code
int count = 0;
for (int i = 0; i < locations[0].second->size(); i++) {
if (!IFailureMonitor::failureMonitor()
.getState(
locations[0].second->get(i, &StorageServerInterface::rangeFeedStream).getEndpoint())
.failed) {
if (deterministicRandom()->random01() <= 1.0 / ++count) {
useIdx = i;
}
}
}
if (useIdx >= 0) {
break;
}
vector<Future<Void>> ok(locations[0].second->size());
for (int i = 0; i < ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual(
locations[0].second->get(i, &StorageServerInterface::rangeFeedStream).getEndpoint(),
FailureStatus(false));
}
// Making this SevWarn means a lot of clutter
if (now() - g_network->networkInfo.newestAlternativesFailure > 1 ||
deterministicRandom()->random01() < 0.01) {
TraceEvent("AllAlternativesFailed").detail("Alternatives", locations[0].second->description());
}
wait(allAlternativesFailedDelay(quorum(ok, 1)));
}
state RangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
state ReplyPromiseStream<RangeFeedStreamReply> replyStream =
locations[0].second->get(useIdx, &StorageServerInterface::rangeFeedStream).getReplyStream(req);
loop {
wait(results.onEmpty());
choose {
when(wait(cx->connectionFileChanged())) { break; }
when(RangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
begin = rep.mutations.back().version + 1;
results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
}
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed) {
cx->invalidateCache(keys);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
results.sendError(e);
return Void();
}
}
}
}
Future<Void> DatabaseContext::getRangeFeedStream(
const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
StringRef rangeID,
Version begin,
Version end,
KeyRange range) {
return getRangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
}
ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingRangeFeedsActor(Reference<DatabaseContext> db,
KeyRangeRef range,
Version minVersion) {
@ -6633,6 +6757,7 @@ ACTOR Future<Void> popRangeFeedMutationsActor(Reference<DatabaseContext> db, Str
throw unsupported_operation();
}
// FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped
state std::vector<Future<Void>> popRequests;
for (int i = 0; i < locations[0].second->size(); i++) {
popRequests.push_back(

View File

@ -327,6 +327,7 @@ public:
[[nodiscard]] Future<Standalone<VectorRef<const char*>>> getAddressesForKey(const Key& key);
Future<Void> registerRangeFeed(const Key& rangeID, const KeyRange& range);
void destroyRangeFeed(const Key& rangeID);
void enableCheckWrites();
void addReadConflictRange(KeyRangeRef const& keys);

View File

@ -77,10 +77,11 @@ struct StorageServerInterface {
RequestStream<struct WatchValueRequest> watchValue;
RequestStream<struct ReadHotSubRangeRequest> getReadHotRanges;
RequestStream<struct SplitRangeRequest> getRangeSplitPoints;
RequestStream<struct GetKeyValuesStreamRequest> getKeyValuesStream;
RequestStream<struct RangeFeedRequest> rangeFeed;
RequestStream<struct RangeFeedStreamRequest> rangeFeedStream;
RequestStream<struct OverlappingRangeFeedsRequest> overlappingRangeFeeds;
RequestStream<struct RangeFeedPopRequest> rangeFeedPop;
RequestStream<struct GetKeyValuesStreamRequest> getKeyValuesStream;
explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}
@ -124,10 +125,12 @@ struct StorageServerInterface {
getKeyValuesStream =
RequestStream<struct GetKeyValuesStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(13));
rangeFeed = RequestStream<struct RangeFeedRequest>(getValue.getEndpoint().getAdjustedEndpoint(14));
rangeFeedStream =
RequestStream<struct RangeFeedStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
overlappingRangeFeeds =
RequestStream<struct OverlappingRangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
RequestStream<struct OverlappingRangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
rangeFeedPop =
RequestStream<struct RangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
RequestStream<struct RangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(17));
}
} else {
ASSERT(Ar::isDeserializing);
@ -171,6 +174,7 @@ struct StorageServerInterface {
streams.push_back(getRangeSplitPoints.getReceiver());
streams.push_back(getKeyValuesStream.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(rangeFeed.getReceiver());
streams.push_back(rangeFeedStream.getReceiver());
streams.push_back(overlappingRangeFeeds.getReceiver());
streams.push_back(rangeFeedPop.getReceiver());
FlowTransport::transport().addEndpoints(streams);
@ -676,6 +680,38 @@ struct RangeFeedRequest {
}
};
struct RangeFeedStreamReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 1783066;
Arena arena;
VectorRef<MutationsAndVersionRef> mutations;
RangeFeedStreamReply() {}
RangeFeedStreamReply(RangeFeedReply r) : arena(r.arena), mutations(r.mutations) {}
int expectedSize() const { return sizeof(RangeFeedStreamReply) + mutations.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, mutations, arena);
}
};
struct RangeFeedStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
Key rangeID;
Version begin = 0;
Version end = 0;
ReplyPromiseStream<RangeFeedStreamReply> reply;
RangeFeedStreamRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, reply, spanContext, arena);
}
};
struct RangeFeedPopRequest {
constexpr static FileIdentifier file_identifier = 10726174;
Key rangeID;

View File

@ -317,6 +317,7 @@ struct RangeFeedInfo : ReferenceCounted<RangeFeedInfo> {
Version emptyVersion = 0;
KeyRange range;
Key id;
AsyncTrigger newMutations;
};
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
@ -1555,7 +1556,11 @@ ACTOR Future<Void> overlappingRangeFeedsQ(StorageServer* data, OverlappingRangeF
ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFeedRequest req) {
state RangeFeedReply reply;
wait(delay(0));
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
wait(delay(0, TaskPriority::DefaultEndpoint));
if (data->version.get() < req.begin) {
wait(data->version.whenAtLeast(req.begin));
}
auto& feedInfo = data->uidRangeFeed[req.rangeID];
/*printf("SS processing range feed req %s for version [%lld - %lld)\n",
req.rangeID.printable().c_str(),
@ -1566,12 +1571,12 @@ ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFee
// printf(" Skipping b/c empty version\n");
} else if (feedInfo->durableVersion == invalidVersion || req.begin > feedInfo->durableVersion) {
for (auto& it : data->uidRangeFeed[req.rangeID]->mutations) {
// TODO could optimize this with binary search
if (it.version >= req.end) {
if (it.version >= req.end || remainingLimitBytes <= 0) {
break;
}
if (it.version >= req.begin) {
reply.mutations.push_back(reply.arena, it);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize();
}
}
// printf(" Found %d in memory mutations\n", reply.mutations.size());
@ -1579,39 +1584,33 @@ ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFee
state std::deque<Standalone<MutationsAndVersionRef>> mutationsDeque =
data->uidRangeFeed[req.rangeID]->mutations;
state Version startingDurableVersion = feedInfo->durableVersion;
Value rangeFeedKeyStart = rangeFeedDurableKey(req.rangeID, req.begin);
Value rangeFeedKeyEnd = rangeFeedDurableKey(req.rangeID, req.end);
/*printf(" Reading range feed versions from disk [%s - %s)\n",
rangeFeedKeyStart.printable().c_str(),
rangeFeedKeyEnd.printable().c_str());
*/
RangeResult res = wait(data->storage.readRange(KeyRangeRef(rangeFeedKeyStart, rangeFeedKeyEnd)));
Version lastVersion = req.begin - 1; // if no results, include all mutations from memory
// TODO REMOVE, for debugging
int diskMutations = 0;
RangeResult res = wait(data->storage.readRange(
KeyRangeRef(rangeFeedDurableKey(req.rangeID, req.begin), rangeFeedDurableKey(req.rangeID, req.end)),
1 << 30,
remainingLimitBytes));
Version lastVersion = req.begin - 1;
for (auto& kv : res) {
Key id;
Version version;
std::tie(id, version) = decodeRangeFeedDurableKey(kv.key);
auto mutations = decodeRangeFeedDurableValue(kv.value);
diskMutations += mutations.size();
reply.mutations.push_back_deep(reply.arena, MutationsAndVersionRef(mutations, version));
remainingLimitBytes -=
sizeof(KeyValueRef) +
kv.expectedSize(); // FIXME: this is currently tracking the size on disk rather than the reply size
// because we cannot add mutaitons from memory if there are potentially more on disk
lastVersion = version;
}
// printf(" Found %d on disk mutations from %d entries\n", diskMutations, res.size());
int memoryMutations = 0;
for (auto& it : mutationsDeque) {
if (it.version >= req.end) {
if (it.version >= req.end || remainingLimitBytes <= 0) {
break;
}
if (it.version > lastVersion) {
memoryMutations += it.mutations.size();
reply.mutations.push_back(reply.arena, it);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize();
}
}
/*printf(
" Found %d in memory mutations from %d entries\n", memoryMutations, reply.mutations.size() - res.size());
*/
if (reply.mutations.empty()) {
auto& feedInfo = data->uidRangeFeed[req.rangeID];
if (startingDurableVersion == feedInfo->storageVersion && req.end > startingDurableVersion) {
@ -1637,6 +1636,10 @@ ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFee
}
}
}
Version finalVersion = std::min(req.end - 1, data->version.get());
if ((reply.mutations.empty() || reply.mutations.back().version) < finalVersion && remainingLimitBytes > 0) {
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(finalVersion));
}
return reply;
}
@ -1646,6 +1649,47 @@ ACTOR Future<Void> rangeFeedQ(StorageServer* data, RangeFeedRequest req) {
return Void();
}
ACTOR Future<Void> rangeFeedStreamQ(StorageServer* data, RangeFeedStreamRequest req)
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getRangeFeedStream"_loc, { req.spanContext });
state Version begin = req.begin;
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
wait(delay(0, TaskPriority::DefaultEndpoint));
try {
loop {
wait(req.reply.onReady());
state RangeFeedRequest feedRequest;
feedRequest.rangeID = req.rangeID;
feedRequest.begin = begin;
feedRequest.end = req.end;
RangeFeedReply feedReply = wait(getRangeFeedMutations(data, feedRequest));
begin = feedReply.mutations.back().version + 1;
req.reply.send(RangeFeedStreamReply(feedReply));
if (feedReply.mutations.back().version == req.end - 1) {
req.reply.sendError(end_of_stream());
return Void();
}
if (feedReply.mutations.back().mutations.empty()) {
choose {
when(wait(delay(5.0, TaskPriority::DefaultEndpoint))) {}
when(wait(data->uidRangeFeed[req.rangeID]->newMutations.onTrigger())) {}
}
}
}
} catch (Error& e) {
if (e.code() != error_code_operation_obsolete) {
if (!canReplyWith(e))
throw;
req.reply.sendError(e);
}
}
return Void();
}
#ifdef NO_INTELLISENSE
size_t WATCH_OVERHEAD_WATCHQ =
sizeof(WatchValueSendReplyActorState<WatchValueSendReplyActor>) + sizeof(WatchValueSendReplyActor);
@ -4244,12 +4288,6 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
}
if (data->currentRangeFeeds.size()) {
data->rangeFeedVersions.push_back(std::make_pair(
std::vector<Key>(data->currentRangeFeeds.begin(), data->currentRangeFeeds.end()), ver));
data->currentRangeFeeds.clear();
}
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->version.set(ver); // Triggers replies to waiting gets for new version(s)
@ -4285,6 +4323,15 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
data->recoveryVersionSkips.pop_front();
}
data->desiredOldestVersion.set(proposedOldestVersion);
if (data->currentRangeFeeds.size()) {
data->rangeFeedVersions.push_back(std::make_pair(
std::vector<Key>(data->currentRangeFeeds.begin(), data->currentRangeFeeds.end()), ver));
for (auto& it : data->currentRangeFeeds) {
data->uidRangeFeed[it]->newMutations.trigger();
}
data->currentRangeFeeds.clear();
}
}
validate(data);
@ -4363,12 +4410,6 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (it.version >= newOldestVersion) {
break;
}
// TODO REMOVE!!
/*printf("Persisting %d range feed %s mutations at %lld\n",
info->mutations.front().mutations.size(),
info->id.printable().c_str(),
info->mutations.front().version);
*/
data->storage.writeKeyValue(
KeyValueRef(rangeFeedDurableKey(info->id, it.version), rangeFeedDurableValue(it.mutations)));
info->storageVersion = it.version;
@ -4413,11 +4454,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
curFeed = 0;
while (curFeed < updatedRangeFeeds.size()) {
auto info = data->uidRangeFeed[updatedRangeFeeds[curFeed]];
while (info->mutations.size() && info->mutations.front().version < newOldestVersion) {
while (!info->mutations.empty() && info->mutations.front().version < newOldestVersion) {
info->mutations.pop_front();
}
info->durableVersion = info->storageVersion;
// printf(" Updating range feed durable version to %lld\n", info->durableVersion);
wait(yield(TaskPriority::UpdateStorage));
curFeed++;
}
@ -5330,6 +5370,14 @@ ACTOR Future<Void> serveRangeFeedRequests(StorageServer* self, FutureStream<Rang
}
}
ACTOR Future<Void> serveRangeFeedStreamRequests(StorageServer* self,
FutureStream<RangeFeedStreamRequest> rangeFeedStream) {
loop {
RangeFeedStreamRequest req = waitNext(rangeFeedStream);
self->actors.add(rangeFeedStreamQ(self, req));
}
}
ACTOR Future<Void> serveOverlappingRangeFeedsRequests(
StorageServer* self,
FutureStream<OverlappingRangeFeedsRequest> overlappingRangeFeeds) {
@ -5396,6 +5444,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
self->actors.add(serveRangeFeedRequests(self, ssi.rangeFeed.getFuture()));
self->actors.add(serveRangeFeedStreamRequests(self, ssi.rangeFeedStream.getFuture()));
self->actors.add(serveOverlappingRangeFeedsRequests(self, ssi.overlappingRangeFeeds.getFuture()));
self->actors.add(serveRangeFeedPopRequests(self, ssi.rangeFeedPop.getFuture()));
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));