made range feeds durable on the storage server

This commit is contained in:
Evan Tschannen 2021-07-30 15:23:42 -07:00
parent e9409b02fe
commit 0989c28a6b
7 changed files with 167 additions and 26 deletions

View File

@ -3585,9 +3585,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
continue;
}
Standalone<VectorRef<MutationRefAndVersion>> res = wait(db->getRangeFeedMutations(tokens[2]));
Standalone<VectorRef<MutationsAndVersionRef>> res = wait(db->getRangeFeedMutations(tokens[2]));
for (auto& it : res) {
printf("%lld %s\n", it.version, it.mutation.toString().c_str());
for (auto& it2 : it.mutations) {
printf("%lld %s\n", it.version, it2.toString().c_str());
}
}
} else if (tokencmp(tokens[1], "pop")) {
if (tokens.size() != 4) {

View File

@ -252,7 +252,7 @@ public:
// Management API, create snapshot
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
Future<Standalone<VectorRef<MutationRefAndVersion>>> getRangeFeedMutations(StringRef rangeID);
Future<Standalone<VectorRef<MutationsAndVersionRef>>> getRangeFeedMutations(StringRef rangeID);
Future<Void> popRangeFeedMutations(StringRef rangeID, Version version);
// private:

View File

@ -6517,8 +6517,8 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
}
ACTOR Future<Standalone<VectorRef<MutationRefAndVersion>>> getRangeFeedMutationsActor(Reference<DatabaseContext> db,
StringRef rangeID) {
ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> getRangeFeedMutationsActor(Reference<DatabaseContext> db,
StringRef rangeID) {
state Database cx(db);
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(rangeFeedPrefix);
@ -6550,10 +6550,10 @@ ACTOR Future<Standalone<VectorRef<MutationRefAndVersion>>> getRangeFeedMutations
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
return Standalone<VectorRef<MutationRefAndVersion>>(rep.mutations, rep.arena);
return Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena);
}
Future<Standalone<VectorRef<MutationRefAndVersion>>> DatabaseContext::getRangeFeedMutations(StringRef rangeID) {
Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getRangeFeedMutations(StringRef rangeID) {
return getRangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID);
}

View File

@ -623,27 +623,29 @@ struct SplitRangeRequest {
}
};
struct MutationRefAndVersion {
MutationRef mutation;
struct MutationsAndVersionRef {
VectorRef<MutationRef> mutations;
Version version;
MutationRefAndVersion() {}
MutationRefAndVersion(MutationRef mutation, Version version) : mutation(mutation), version(version) {}
MutationRefAndVersion(Arena& to, MutationRef mutation, Version version)
: mutation(to, mutation), version(version) {}
MutationRefAndVersion(Arena& to, const MutationRefAndVersion& from)
: mutation(to, from.mutation), version(from.version) {}
int expectedSize() const { return mutation.expectedSize(); }
MutationsAndVersionRef() {}
explicit MutationsAndVersionRef(Version version) : version(version) {}
MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version)
: mutations(mutations), version(version) {}
MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version)
: mutations(to, mutations), version(version) {}
MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from)
: mutations(to, from.mutations), version(from.version) {}
int expectedSize() const { return mutations.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutation, version);
serializer(ar, mutations, version);
}
};
struct RangeFeedReply {
constexpr static FileIdentifier file_identifier = 11815134;
VectorRef<MutationRefAndVersion> mutations;
VectorRef<MutationsAndVersionRef> mutations;
bool cached;
Arena arena;

View File

@ -1046,6 +1046,36 @@ KeyRange decodeRangeFeedValue(ValueRef const& value) {
return range;
}
const KeyRangeRef rangeFeedDurableKeys(LiteralStringRef("\xff\xff/rf/"), LiteralStringRef("\xff\xff/rf0"));
const KeyRef rangeFeedDurablePrefix = rangeFeedDurableKeys.begin;
const Value rangeFeedDurableKey(Key const& feed, Version const& version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(rangeFeedDurablePrefix);
wr << feed;
wr << version;
return wr.toValue();
}
std::pair<Key, Version> decodeRangeFeedDurableKey(ValueRef const& key) {
Key feed;
Version version;
BinaryReader reader(key.removePrefix(rangeFeedDurablePrefix), Unversioned());
reader >> feed;
reader >> version;
return std::make_pair(feed, version);
}
const Value rangeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRangeFeed()));
wr << mutations;
return wr.toValue();
}
Standalone<VectorRef<MutationRef>> decodeRangeFeedDurableValue(ValueRef const& value) {
Standalone<VectorRef<MutationRef>> mutations;
BinaryReader reader(value, IncludeVersion());
reader >> mutations;
return mutations;
}
const KeyRef configTransactionDescriptionKey = "\xff\xff/description"_sr;
const KeyRange globalConfigKnobKeys = singleKeyRange("\xff\xff/globalKnobs"_sr);
const KeyRangeRef configKnobKeys("\xff\xff/knobs/"_sr, "\xff\xff/knobs0"_sr);

View File

@ -501,6 +501,14 @@ KeyRange decodeRangeFeedValue(ValueRef const& value);
extern const KeyRef rangeFeedPrefix;
extern const KeyRef rangeFeedPrivatePrefix;
extern const KeyRangeRef rangeFeedDurableKeys;
extern const KeyRef rangeFeedDurablePrefix;
const Value rangeFeedDurableKey(Key const& feed, Version const& version);
std::pair<Key, Version> decodeRangeFeedDurableKey(ValueRef const& key);
const Value rangeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations);
Standalone<VectorRef<MutationRef>> decodeRangeFeedDurableValue(ValueRef const& value);
// Configuration database special keys
extern const KeyRef configTransactionDescriptionKey;
extern const KeyRange globalConfigKnobKeys;

View File

@ -310,7 +310,8 @@ struct FetchInjectionInfo {
};
struct RangeFeedInfo : ReferenceCounted<RangeFeedInfo> {
std::deque<Standalone<MutationRefAndVersion>> mutations;
std::deque<Standalone<MutationsAndVersionRef>> mutations;
Version durableVersion = invalidVersion;
KeyRange range;
Key id;
};
@ -581,6 +582,8 @@ public:
KeyRangeMap<std::vector<Reference<RangeFeedInfo>>> keyRangeFeed;
std::map<Key, Reference<RangeFeedInfo>> uidRangeFeed;
Deque<std::pair<std::vector<Key>, Version>> rangeFeedVersions;
std::set<Key> currentRangeFeeds;
// newestAvailableVersion[k]
// == invalidVersion -> k is unavailable at all versions
@ -1504,11 +1507,37 @@ ACTOR Future<Void> watchValueSendReply(StorageServer* data,
}
ACTOR Future<Void> rangeFeedQ(StorageServer* data, RangeFeedRequest req) {
state RangeFeedReply reply;
wait(delay(0));
RangeFeedReply reply;
for (auto& it : data->uidRangeFeed[req.rangeID]->mutations) {
reply.mutations.push_back(reply.arena, it);
auto& feedInfo = data->uidRangeFeed[req.rangeID];
if (feedInfo->durableVersion == invalidVersion) {
for (auto& it : data->uidRangeFeed[req.rangeID]->mutations) {
reply.mutations.push_back(reply.arena, it);
}
} else {
state std::deque<Standalone<MutationsAndVersionRef>> mutationsDeque =
data->uidRangeFeed[req.rangeID]->mutations;
RangeResult res = wait(data->storage.readRange(
KeyRangeRef(rangeFeedDurableKey(req.rangeID, 0), rangeFeedDurableKey(req.rangeID, data->version.get()))));
if (res.empty()) {
data->uidRangeFeed[req.rangeID]->durableVersion = invalidVersion;
}
Version lastVersion = invalidVersion;
for (auto& kv : res) {
Key id;
Version version;
std::tie(id, version) = decodeRangeFeedDurableKey(kv.key);
auto mutations = decodeRangeFeedDurableValue(kv.value);
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(mutations, version));
lastVersion = version;
}
for (auto& it : mutationsDeque) {
if (it.version > lastVersion) {
reply.mutations.push_back(reply.arena, it);
}
}
}
TraceEvent("RangeFeedQuery", data->thisServerID)
.detail("RangeID", req.rangeID.printable())
.detail("Mutations", reply.mutations.size());
@ -2628,7 +2657,11 @@ void applyMutation(StorageServer* self,
self->watches.trigger(m.param1);
for (auto& it : self->keyRangeFeed[m.param1]) {
it->mutations.push_back(MutationRefAndVersion(m, version));
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version));
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m);
self->currentRangeFeeds.insert(it->id);
}
} else if (m.type == MutationRef::ClearRange) {
data.erase(m.param1, m.param2);
@ -2640,7 +2673,11 @@ void applyMutation(StorageServer* self,
auto ranges = self->keyRangeFeed.intersectingRanges(KeyRangeRef(m.param1, m.param2));
for (auto& r : ranges) {
for (auto& it : r.value()) {
it->mutations.push_back(MutationRefAndVersion(m, version));
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version));
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m);
self->currentRangeFeeds.insert(it->id);
}
}
}
@ -3434,6 +3471,8 @@ static const KeyRangeRef persistByteSampleSampleKeys =
LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0"));
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality");
static const KeyRangeRef persistRangeFeedKeys =
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX "RF0"));
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
class StorageUpdater {
@ -3575,6 +3614,11 @@ private:
r->value().push_back(rangeFeedInfo);
}
data->keyRangeFeed.coalesce(rangeFeedRange.contents());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::SetValue,
persistRangeFeedKeys.begin.toString() + rangeFeedId.toString(),
m.param2));
} else if (m.param1.substr(1).startsWith(tssMappingKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (!data->isTss()) {
@ -3962,6 +4006,12 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
}
if (data->currentRangeFeeds.size()) {
data->rangeFeedVersions.push_back(std::make_pair(
std::vector<Key>(data->currentRangeFeeds.begin(), data->currentRangeFeeds.end()), ver));
data->currentRangeFeeds.clear();
}
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->version.set(ver); // Triggers replies to waiting gets for new version(s)
@ -4060,6 +4110,26 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
break;
}
std::set<Key> modifiedRangeFeeds;
while (data->rangeFeedVersions.front().second < newOldestVersion) {
modifiedRangeFeeds.insert(data->rangeFeedVersions.front().first.begin(),
data->rangeFeedVersions.front().first.end());
data->rangeFeedVersions.pop_front();
}
state std::vector<Key> updatedRangeFeeds(modifiedRangeFeeds.begin(), modifiedRangeFeeds.end());
state int curFeed = 0;
while (curFeed < updatedRangeFeeds.size()) {
auto info = data->uidRangeFeed[updatedRangeFeeds[curFeed]];
while (info->mutations.front().version < newOldestVersion) {
data->storage.writeKeyValue(KeyValueRef(rangeFeedDurableKey(info->id, info->mutations.front().version),
rangeFeedDurableValue(info->mutations.front().mutations)));
info->durableVersion = info->mutations.front().version;
info->mutations.pop_front();
}
wait(yield(TaskPriority::UpdateStorage));
}
// Set the new durable version as part of the outstanding change set, before commit
if (startOldestVersion != newOldestVersion)
data->storage.makeVersionDurable(newOldestVersion);
@ -4379,6 +4449,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
state Future<RangeResult> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<RangeResult> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Future<RangeResult> fRangeFeeds = storage->readRange(persistRangeFeedKeys);
state Promise<Void> byteSampleSampleRecovered;
state Promise<Void> startByteSampleRestore;
@ -4387,7 +4458,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
TraceEvent("ReadingDurableState", data->thisServerID).log();
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable, fRangeFeeds }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID).log();
@ -4465,6 +4536,26 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
wait(yield());
}
state RangeResult rangeFeeds = fRangeFeeds.get();
state int feedLoc;
for (feedLoc = 0; feedLoc < rangeFeeds.size(); feedLoc++) {
Key rangeFeedId = rangeFeeds[feedLoc].key.removePrefix(persistRangeFeedKeys.begin);
KeyRange rangeFeedRange = decodeRangeFeedValue(rangeFeeds[feedLoc].value);
TraceEvent("RestoringRangeFeed", data->thisServerID)
.detail("RangeID", rangeFeedId.printable())
.detail("Range", rangeFeedRange.toString());
Reference<RangeFeedInfo> rangeFeedInfo(new RangeFeedInfo());
rangeFeedInfo->range = rangeFeedRange;
rangeFeedInfo->id = rangeFeedId;
rangeFeedInfo->durableVersion = version;
data->uidRangeFeed[rangeFeedId] = rangeFeedInfo;
auto rs = data->keyRangeFeed.modify(rangeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(rangeFeedInfo);
}
wait(yield());
}
data->keyRangeFeed.coalesce(allKeys);
// TODO: why is this seemingly random delay here?
wait(delay(0.0001));
@ -4982,9 +5073,17 @@ ACTOR Future<Void> serveRangeFeedRequests(StorageServer* self, FutureStream<Rang
ACTOR Future<Void> serveRangeFeedPopRequests(StorageServer* self, FutureStream<RangeFeedPopRequest> rangeFeedPops) {
loop {
RangeFeedPopRequest req = waitNext(rangeFeedPops);
while (self->uidRangeFeed[req.rangeID]->mutations.front().version < req.version) {
auto& feed = self->uidRangeFeed[req.rangeID];
while (feed->mutations.front().version < req.version) {
self->uidRangeFeed[req.rangeID]->mutations.pop_front();
}
if (feed->durableVersion != invalidVersion) {
self->storage.clearRange(
KeyRangeRef(rangeFeedDurableKey(feed->id, 0), rangeFeedDurableKey(feed->id, req.version)));
if (req.version > feed->durableVersion) {
feed->durableVersion = invalidVersion;
}
}
TraceEvent("RangeFeedPopQuery", self->thisServerID)
.detail("RangeID", req.rangeID.printable())
.detail("Version", req.version);