Got full demo working

Josh Slocum 2021-08-12 10:18:58 -05:00
parent 921a2cfca1
commit 76a3c47e42
4 changed files with 205 additions and 132 deletions

View File

@@ -1053,7 +1053,7 @@ const Value rangeFeedDurableKey(Key const& feed, Version const& version) {
BinaryWriter wr(AssumeVersion(ProtocolVersion::withRangeFeed()));
wr.serializeBytes(rangeFeedDurablePrefix);
wr << feed;
wr << littleEndian64(version);
wr << bigEndian64(version);
return wr.toValue();
}
std::pair<Key, Version> decodeRangeFeedDurableKey(ValueRef const& key) {
@@ -1062,7 +1062,7 @@ std::pair<Key, Version> decodeRangeFeedDurableKey(ValueRef const& key) {
BinaryReader reader(key.removePrefix(rangeFeedDurablePrefix), AssumeVersion(ProtocolVersion::withRangeFeed()));
reader >> feed;
reader >> version;
return std::make_pair(feed, littleEndian64(version));
return std::make_pair(feed, bigEndian64(version));
}
const Value rangeFeedDurableValue(Standalone<VectorRef<MutationRef>> const& mutations) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRangeFeed()));
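A note on the endianness fix above: these versions are embedded in keys, and the storage engine orders keys byte-wise, so only a big-endian encoding makes byte order agree with numeric version order; that is what lets a reader scan [rangeFeedDurableKey(id, begin), rangeFeedDurableKey(id, end)) as an ordinary range. A minimal standalone sketch of the property (plain C++, not FDB's BinaryWriter):

#include <cassert>
#include <cstdint>
#include <cstring>

// Encode a 64-bit version with the most significant byte first.
static void encodeBigEndian64(uint64_t v, uint8_t out[8]) {
    for (int i = 7; i >= 0; i--) {
        out[i] = static_cast<uint8_t>(v & 0xff);
        v >>= 8;
    }
}

int main() {
    uint8_t a[8], b[8];
    encodeBigEndian64(1, a);
    encodeBigEndian64(256, b);
    // Byte-wise comparison agrees with numeric order, so keys containing
    // big-endian versions sort by version.
    assert(std::memcmp(a, b, 8) < 0);
    // Little-endian would break this: 1 encodes as 01 00 ... while 256
    // encodes as 00 01 ..., so 256 would sort before 1.
    return 0;
}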

View File

@@ -2059,7 +2059,8 @@ public:
// disable the determinism check for remote region satellites.
bool remoteDCUsedAsSatellite = false;
if (req.configuration.regions.size() > 1) {
auto [region, remoteRegion] = getPrimaryAndRemoteRegion(req.configuration.regions, req.configuration.regions[0].dcId);
auto [region, remoteRegion] =
getPrimaryAndRemoteRegion(req.configuration.regions, req.configuration.regions[0].dcId);
for (const auto& satellite : region.satellites) {
if (satellite.dcId == remoteRegion.dcId) {
remoteDCUsedAsSatellite = true;
@@ -3097,7 +3098,8 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
state Reference<S3BlobStoreEndpoint> bstore;
// TODO CHANGE BACK
wait(delay(10.0));
// wait(delay(10.0));
wait(delay(70.0));
printf("Initializing CC s3 stuff\n");
try {
@@ -3120,7 +3122,8 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
try {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02")));
// state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02")));
// state KeyRange keyRange = KeyRange(KeyRangeRef());
state Version v = wait(tr->getReadVersion());
if (deterministicRandom()->random01() < 0.3) {
v -= 5000000;
@@ -3128,8 +3131,10 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
v -= 30000000;
}
state RangeResult blobGranuleMapping = wait(
krmGetRanges(tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
// right now just read the whole blob range
state RangeResult blobGranuleMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, normalKeys /*keyRange*/, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
if (blobGranuleMapping.size() == 0) {
@@ -3148,7 +3153,9 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());
// TODO probably new exception type instead
throw transaction_too_old();
// TODO ADD BACK
// throw transaction_too_old();
continue;
}
state UID workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
@@ -3157,12 +3164,14 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
if (i == 0) {
granuleStartKey = keyRange.begin;
// TODO CHANGE BACK!!
// prune first/last granules to requested range
/*if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}
granuleEndKey = keyRange.end;
}*/
if (!workerInterfaceCache.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
@@ -3210,8 +3219,16 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
printf("Waiting for result chunk\n");
RangeResult result = waitNext(results.getFuture());
printf("Result chunk (%d):\n", result.size());
int resultIdx = 0;
for (auto& it : result) {
printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str());
resultIdx++;
if (resultIdx >= 10) {
break;
}
}
if (resultIdx >= 10) {
printf(" ...\n");
}
}
} catch (Error& e) {
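For readers tracing the granule loop above: krmGetRanges returns boundary keys, so adjacent entries [i] and [i+1] delimit one granule and the last granule starts at index size() - 2, which is the pruning logic the commit temporarily comments out. A rough standalone sketch of that interpretation (containers and names hypothetical, not the FDB types):

#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// Boundary-key view of a key-range map: entry i's key starts a range that
// ends at entry i+1's key, so N entries describe N-1 ranges and the final
// entry's value is unused.
int main() {
    std::vector<std::pair<std::string, std::string>> mapping = {
        { "a", "worker1" }, { "m", "worker2" }, { "z", "" }
    };
    for (size_t i = 0; i + 1 < mapping.size(); i++) {
        printf("granule [%s - %s) owned by %s\n",
               mapping[i].first.c_str(),
               mapping[i + 1].first.c_str(),
               mapping[i].second.c_str());
    }
    return 0;
}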

View File

@@ -18,12 +18,15 @@
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h" // TODO REMOVE
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/IndexedSet.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Smoother.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/NativeAPI.actor.h" // TODO REMOVE
#include "fdbclient/Tuple.h" // TODO REMOVE
#include "fdbclient/S3BlobStore.h" // TODO REMOVE
#include "fdbclient/AsyncFileS3BlobStore.actor.h" // TODO REMOVE
@@ -532,7 +535,8 @@ struct RatekeeperLimits {
context(context) {}
};
struct GrvProxyInfo {
// TODO CHANGE BACK ONCE BLOB IS MOVED OUT OF HERE
struct GrvProxyInfoRk {
int64_t totalTransactions;
int64_t batchTransactions;
uint64_t lastThrottledTagChangeId;
@@ -540,7 +544,7 @@ struct GrvProxyInfo {
double lastUpdateTime;
double lastTagPushTime;
GrvProxyInfo()
GrvProxyInfoRk()
: totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), lastTagPushTime(0) {
}
};
@@ -552,7 +556,7 @@ struct RatekeeperData {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, GrvProxyInfo> grvProxyInfo;
std::map<UID, GrvProxyInfoRk> grvProxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
@@ -1413,7 +1417,6 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
}
}
<<<<<<< HEAD
// |-------------------------------------|
// | Blob Granule Stuff |
// |-------------------------------------|
@@ -1429,9 +1432,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
Arena deltaArena;
KeyRange keyRange;
Future<Void> rangeFeedFuture;
Future<Void> fileUpdaterFuture;
PromiseStream<MutationAndVersion> rangeFeed;
PromiseStream<Version> snapshotVersions;
// FIXME: right now there is a dependency because this contains both the actual file/delta data as well as the
@@ -1439,7 +1440,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
// better to have this in 2 separate objects, where the granule metadata map has the futures, but the read
// queries/file updater/range feed only copy the reference to the file/delta data.
void cancel() {
rangeFeedFuture = Never();
// rangeFeedFuture = Never();
fileUpdaterFuture = Never();
}
@@ -1607,62 +1608,146 @@ ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(RatekeeperData
}
}
ACTOR Future<std::pair<Key, Version>> createRangeFeed(RatekeeperData* rkData, KeyRange keyRange) {
state Key rangeFeedID = StringRef(deterministicRandom()->randomUniqueID().toString());
state Transaction tr(rkData->db);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
try {
wait(tr.registerRangeFeed(rangeFeedID, keyRange));
wait(tr.commit());
return std::pair<Key, Version>(rangeFeedID, tr.getCommittedVersion());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
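createRangeFeed above follows FDB's standard transaction retry shape: attempt, commit, and let onError decide whether the error is retryable. A distilled sketch of that shape with hypothetical stand-in types (the real code uses flow actors and Transaction):

#include <exception>

// Hypothetical stand-ins; the real types are FDB's Transaction and flow
// futures, which are not reproduced here.
struct Txn {
    void registerFeed() {}                 // may throw a retryable error
    long commit() { return 42; }           // returns the committed version
    void onError(const std::exception&) {} // in FDB, rethrows fatal errors
};

// The retry shape used by createRangeFeed: loop until commit succeeds,
// letting the transaction classify each error as retryable or fatal.
long createFeed(Txn& tr) {
    for (;;) {
        try {
            tr.registerFeed();
            return tr.commit();
        } catch (const std::exception& e) {
            tr.onError(e); // in FDB this resets the transaction and backs off
        }
    }
}

int main() {
    Txn tr;
    return createFeed(tr) == 42 ? 0 : 1;
}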
// TODO this will have to change when merging multiple range feeds and when using the streaming API
ACTOR Future<Void> rangeFeedReader(RatekeeperData* rkData,
Key rangeFeedID,
KeyRange keyRange,
Version startVersion,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> mutationStream) {
// TODO this is a hack; we basically want an unbounded endVersion, maybe make it optional in the request?
state Version beginVersion = startVersion;
state Version endVersion = beginVersion + 1000000000;
loop {
Standalone<VectorRef<MutationsAndVersionRef>> mutations =
wait(rkData->db->getRangeFeedMutations(rangeFeedID, beginVersion, endVersion, keyRange));
// printf("RF got %d mutations for version [%lld - %lld)\n", mutations.size(), beginVersion, endVersion);
if (mutations.size()) {
// TODO REMOVE sanity check
for (auto& it : mutations) {
if (it.version < beginVersion || it.version >= endVersion) {
printf("RF returned version out of bounds! %lld [%lld - %lld)\n",
it.version,
beginVersion,
endVersion);
}
ASSERT(it.version >= beginVersion && it.version < endVersion);
}
beginVersion = mutations.back().version + 1;
endVersion = beginVersion + 100000000;
mutationStream.send(std::move(mutations));
// printf("RangeFeed beginVersion=%lld\n", beginVersion);
// TODO REMOVE, just for debugging
wait(delay(1.0));
} else {
// TODO this won't be necessary once we use the streaming API
wait(delay(1.0));
endVersion += 1000000;
}
}
}
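rangeFeedReader above is a polling cursor: after a non-empty batch it resumes just past the last version received, and after an empty batch it widens the window instead of advancing. A standalone sketch of that cursor logic (pollMutations is a stub standing in for getRangeFeedMutations; window sizes copied from the diff):

#include <cstdint>
#include <cstdio>
#include <vector>

// Stub standing in for getRangeFeedMutations: returns the versions in
// [begin, end) that have mutations; here it just fabricates a few.
static std::vector<int64_t> pollMutations(int64_t begin, int64_t end) {
    std::vector<int64_t> out;
    for (int64_t v = begin; v < end && v < begin + 3; v++)
        out.push_back(v);
    return out;
}

int main() {
    int64_t begin = 100;
    int64_t end = begin + 1000000000; // oversized window, as in the hack above
    for (int iter = 0; iter < 3; iter++) {
        std::vector<int64_t> versions = pollMutations(begin, end);
        if (!versions.empty()) {
            begin = versions.back() + 1; // resume past the last version seen
            end = begin + 100000000;
        } else {
            end += 1000000; // quiet feed: widen the window, keep begin
        }
        printf("next window [%lld - %lld)\n", (long long)begin, (long long)end);
    }
    return 0;
}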
// updater for a single granule
ACTOR Future<Void> blobGranuleUpdateFiles(RatekeeperData* rkData,
BlobWorkerData* bwData,
Reference<GranuleMetadata> metadata) {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> rangeFeedStream;
state Future<Void> rangeFeedFuture;
try {
// create the range feed first so that the version at which the SS starts recording mutations is <= the snapshot version
state std::pair<Key, Version> rangeFeedData = wait(createRangeFeed(rkData, metadata->keyRange));
printf("Successfully created range feed %s for [%s - %s) @ %lld\n",
rangeFeedData.first.printable().c_str(),
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
rangeFeedData.second);
std::pair<Version, std::string> newSnapshotFile = wait(dumpSnapshotFromFDB(rkData, bwData, metadata->keyRange));
ASSERT(rangeFeedData.second <= newSnapshotFile.first);
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
metadata->snapshotVersions.send(newSnapshotFile.first);
rangeFeedFuture = rangeFeedReader(
rkData, rangeFeedData.first, metadata->keyRange, newSnapshotFile.first + 1, rangeFeedStream);
loop {
MutationAndVersion delta = waitNext(metadata->rangeFeed.getFuture());
metadata->currentDeltas.push_back(metadata->deltaArena, delta);
// 8 for version, 1 for type, 4 for each param length then actual param size
metadata->currentDeltaBytes += 17 + delta.m.param1.size() + delta.m.param2.size();
state Standalone<VectorRef<MutationsAndVersionRef>> mutations = waitNext(rangeFeedStream.getFuture());
// TODO should maybe change mutation buffer to MutationsAndVersionRef instead of MutationAndVersion
for (auto& deltas : mutations) {
for (auto& delta : deltas.mutations) {
// TODO REMOVE!!! Just for initial debugging
/*printf("BlobWorker [%s - %s) Got Mutation @ %lld: %s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
deltas.version,
delta.toString().c_str());*/
if (metadata->currentDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
metadata->currentDeltas.back().v > metadata->lastWriteVersion) {
printf("Granule [%s - %s) flushing delta file after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltaBytes);
std::pair<Version, std::string> newDeltaFile =
wait(writeDeltaFile(rkData, bwData, metadata->keyRange, &metadata->currentDeltas));
metadata->currentDeltas.emplace_back_deep(metadata->deltaArena, delta, deltas.version);
// 8 for version, 1 for type, 4 for each param length then actual param size
metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
}
// add new delta file
metadata->deltaFiles.push_back(newDeltaFile);
metadata->lastWriteVersion = newDeltaFile.first;
metadata->bytesInNewDeltaFiles += metadata->currentDeltaBytes;
// TODO handle version batch barriers
if (metadata->currentDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
metadata->currentDeltas.back().v > metadata->lastWriteVersion) {
printf("Granule [%s - %s) flushing delta file after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltaBytes);
std::pair<Version, std::string> newDeltaFile =
wait(writeDeltaFile(rkData, bwData, metadata->keyRange, &metadata->currentDeltas));
// reset current deltas
metadata->deltaArena = Arena();
metadata->currentDeltas = GranuleDeltas();
metadata->currentDeltaBytes = 0;
}
// add new delta file
metadata->deltaFiles.push_back(newDeltaFile);
metadata->lastWriteVersion = newDeltaFile.first;
metadata->bytesInNewDeltaFiles += metadata->currentDeltaBytes;
if (metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
printf("Granule [%s - %s) compacting after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->bytesInNewDeltaFiles);
// FIXME: instead of just doing new snapshot, it should offer shard back to blob manager and get
// reassigned
// FIXME: this should read previous snapshot + delta files instead, unless it knows it's really small or
// there was a huge clear or something
std::pair<Version, std::string> newSnapshotFile =
wait(dumpSnapshotFromFDB(rkData, bwData, metadata->keyRange));
// reset current deltas
metadata->deltaArena = Arena();
metadata->currentDeltas = GranuleDeltas();
metadata->currentDeltaBytes = 0;
// add new snapshot file
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
metadata->snapshotVersions.send(newSnapshotFile.first);
printf(
"Popping range feed %s at %lld\n", rangeFeedData.first.printable().c_str(), newDeltaFile.first);
wait(rkData->db->popRangeFeedMutations(rangeFeedData.first, newDeltaFile.first));
}
// reset metadata
metadata->bytesInNewDeltaFiles = 0;
if (metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
printf("Granule [%s - %s) compacting after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->bytesInNewDeltaFiles);
// FIXME: instead of just doing new snapshot, it should offer shard back to blob manager and get
// reassigned
// FIXME: this should read previous snapshot + delta files instead, unless it knows it's really
// small or there was a huge clear or something
std::pair<Version, std::string> newSnapshotFile =
wait(dumpSnapshotFromFDB(rkData, bwData, metadata->keyRange));
// add new snapshot file
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
metadata->snapshotVersions.send(newSnapshotFile.first);
// reset metadata
metadata->bytesInNewDeltaFiles = 0;
}
}
}
} catch (Error& e) {
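The interleaved before/after lines in this hunk obscure the control flow; the intent is a two-threshold loop: flush a delta file once the buffered deltas pass BG_DELTA_FILE_TARGET_BYTES, and roll a new snapshot once bytesInNewDeltaFiles passes BG_DELTA_BYTES_BEFORE_COMPACT. A distilled standalone sketch (threshold values illustrative, not the real knobs):

#include <cstdint>
#include <cstdio>

// Two-threshold loop from blobGranuleUpdateFiles: buffer deltas, flush a
// delta file when the buffer passes the file target, and roll a new
// snapshot once enough delta bytes accumulate since the last snapshot.
int main() {
    const int64_t DELTA_FILE_TARGET = 100;
    const int64_t BYTES_BEFORE_COMPACT = 300;
    int64_t bufferedBytes = 0, bytesSinceSnapshot = 0;
    for (int i = 0; i < 50; i++) {
        bufferedBytes += 17; // per-mutation overhead used in the diff
        if (bufferedBytes >= DELTA_FILE_TARGET) {
            printf("flush delta file (%lld bytes)\n", (long long)bufferedBytes);
            bytesSinceSnapshot += bufferedBytes;
            bufferedBytes = 0; // reset current deltas
            if (bytesSinceSnapshot >= BYTES_BEFORE_COMPACT) {
                printf("compact to new snapshot (%lld delta bytes)\n",
                       (long long)bytesSinceSnapshot);
                bytesSinceSnapshot = 0; // reset metadata
            }
        }
    }
    return 0;
}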
@@ -1777,66 +1862,6 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* wkData, const BlobGranu
req.reply.send(rep);
}
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
ACTOR Future<Void> fakeRangeFeed(PromiseStream<MutationAndVersion> mutationStream,
PromiseStream<Version> snapshotVersions,
KeyRange keyRange) {
state Version version = waitNext(snapshotVersions.getFuture());
state uint32_t targetKbPerSec = (uint32_t)(SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES) / 10;
state Arena arena;
printf("Fake range feed got initial version %lld\n", version);
loop {
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
state uint32_t bytesGenerated = 0;
state uint32_t targetKbThisSec =
targetKbPerSec / 2 + (uint32_t)(deterministicRandom()->random01() * targetKbPerSec);
state uint32_t mutationsGenerated = 0;
while (bytesGenerated < targetKbThisSec) {
MutationAndVersion update;
update.v = version;
update.m.param1 = keyRange.begin;
update.m.param2 = keyRange.begin;
if (deterministicRandom()->random01() < 0.5) {
// clear start key
update.m.type = MutationRef::Type::ClearRange;
} else {
// set
update.m.type = MutationRef::Type::SetValue;
}
mutationsGenerated++;
bytesGenerated += 17 + 2 * keyRange.begin.size();
mutationStream.send(update);
// simulate multiple mutations with the same version (TODO: this should be possible, right?)
if (deterministicRandom()->random01() < 0.4) {
version++;
}
if (mutationsGenerated % 1000 == 0) {
wait(yield());
}
}
// printf("Fake range feed generated %d mutations at version %lld\n", mutationsGenerated, version);
choose {
when(wait(delay(1.0))) {
// slightly slower than real versions, to try to ensure it doesn't get ahead
version += 950000;
}
when(Version _v = waitNext(snapshotVersions.getFuture())) {
if (_v > version) {
printf("updating fake range feed from %lld to snapshot version %lld\n", version, _v);
version = _v;
} else {
printf("snapshot version %lld was ahead of fake range feed version %lld, keeping fake version\n",
_v,
version);
}
}
}
}
}
static Reference<GranuleMetadata> constructNewBlobRange(RatekeeperData* rkData,
BlobWorkerData* wkData,
KeyRange keyRange) {
@@ -1845,7 +1870,7 @@ static Reference<GranuleMetadata> constructNewBlobRange(RatekeeperData* rkData,
keyRange.end.printable().c_str());
Reference<GranuleMetadata> newMetadata = makeReference<GranuleMetadata>();
newMetadata->keyRange = keyRange;
newMetadata->rangeFeedFuture = fakeRangeFeed(newMetadata->rangeFeed, newMetadata->snapshotVersions, keyRange);
// newMetadata->rangeFeedFuture = fakeRangeFeed(newMetadata->rangeFeed, newMetadata->snapshotVersions, keyRange);
newMetadata->fileUpdaterFuture = blobGranuleUpdateFiles(rkData, wkData, newMetadata);
return newMetadata;
@@ -2102,6 +2127,7 @@ ACTOR Future<Void> nukeBlobWorkerData(RatekeeperData* rkData) {
try {
tr->clear(blobWorkerListKeys);
tr->clear(blobGranuleMappingKeys);
tr->clear(rangeFeedKeys);
return Void();
} catch (Error& e) {
@@ -2249,10 +2275,7 @@ ACTOR Future<Void> blobManagerPoc(RatekeeperData* rkData, LocalityData locality)
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
=======
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
>>>>>>> feature-range-feed
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;

View File

@@ -1557,34 +1557,61 @@ ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFee
state RangeFeedReply reply;
wait(delay(0));
auto& feedInfo = data->uidRangeFeed[req.rangeID];
/*printf("SS processing range feed req %s for version [%lld - %lld)\n",
req.rangeID.printable().c_str(),
req.begin,
req.end);
*/
if (req.end <= feedInfo->emptyVersion + 1) {
// printf(" Skipping b/c empty version\n");
} else if (feedInfo->durableVersion == invalidVersion || req.begin > feedInfo->durableVersion) {
for (auto& it : data->uidRangeFeed[req.rangeID]->mutations) {
reply.mutations.push_back(reply.arena, it);
// TODO could optimize this with binary search
if (it.version >= req.end) {
break;
}
if (it.version >= req.begin) {
reply.mutations.push_back(reply.arena, it);
}
}
// printf(" Found %d in memory mutations\n", reply.mutations.size());
} else {
state std::deque<Standalone<MutationsAndVersionRef>> mutationsDeque =
data->uidRangeFeed[req.rangeID]->mutations;
state Version startingDurableVersion = feedInfo->durableVersion;
RangeResult res = wait(data->storage.readRange(
KeyRangeRef(rangeFeedDurableKey(req.rangeID, req.begin), rangeFeedDurableKey(req.rangeID, req.end))));
Version lastVersion = invalidVersion;
Value rangeFeedKeyStart = rangeFeedDurableKey(req.rangeID, req.begin);
Value rangeFeedKeyEnd = rangeFeedDurableKey(req.rangeID, req.end);
/*printf(" Reading range feed versions from disk [%s - %s)\n",
rangeFeedKeyStart.printable().c_str(),
rangeFeedKeyEnd.printable().c_str());
*/
RangeResult res = wait(data->storage.readRange(KeyRangeRef(rangeFeedKeyStart, rangeFeedKeyEnd)));
Version lastVersion = req.begin - 1; // if no results, include all mutations from memory
// TODO REMOVE, for debugging
int diskMutations = 0;
for (auto& kv : res) {
Key id;
Version version;
std::tie(id, version) = decodeRangeFeedDurableKey(kv.key);
auto mutations = decodeRangeFeedDurableValue(kv.value);
diskMutations += mutations.size();
reply.mutations.push_back_deep(reply.arena, MutationsAndVersionRef(mutations, version));
lastVersion = version;
}
// printf(" Found %d on disk mutations from %d entries\n", diskMutations, res.size());
int memoryMutations = 0;
for (auto& it : mutationsDeque) {
if (it.version >= req.end) {
break;
}
if (it.version > lastVersion) {
memoryMutations += it.mutations.size();
reply.mutations.push_back(reply.arena, it);
}
}
/*printf(
" Found %d in memory mutations from %d entries\n", memoryMutations, reply.mutations.size() - res.size());
*/
if (reply.mutations.empty()) {
auto& feedInfo = data->uidRangeFeed[req.rangeID];
if (startingDurableVersion == feedInfo->storageVersion && req.end > startingDurableVersion) {
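The key fix in this hunk is seeding lastVersion with req.begin - 1 rather than invalidVersion, so an empty disk read still admits every in-range mutation from the in-memory deque. A distilled sketch of the disk/memory merge, modeling only version numbers:

#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Merge durable (on-disk) versions with the in-memory deque, mirroring the
// fixed getRangeFeedMutations: memory entries are included only if their
// version is strictly greater than the last version read from disk.
std::vector<int64_t> mergeFeed(const std::vector<int64_t>& disk,
                               const std::deque<int64_t>& memory,
                               int64_t reqBegin,
                               int64_t reqEnd) {
    std::vector<int64_t> out(disk);
    // Seeding with reqBegin - 1 (rather than invalidVersion) means an empty
    // disk result still lets every in-range memory mutation through.
    int64_t lastVersion = disk.empty() ? reqBegin - 1 : disk.back();
    for (int64_t v : memory) {
        if (v >= reqEnd) break;
        if (v > lastVersion) out.push_back(v);
    }
    return out;
}

int main() {
    // No disk results: memory versions in [reqBegin, reqEnd) are all kept.
    assert(mergeFeed({}, { 5, 6, 7 }, 5, 8).size() == 3);
    // Disk covered through version 6: only memory versions > 6 are added.
    assert(mergeFeed({ 5, 6 }, { 5, 6, 7 }, 5, 8).size() == 3);
    return 0;
}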
@@ -4336,9 +4363,15 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (it.version >= newOldestVersion) {
break;
}
data->storage.writeKeyValue(KeyValueRef(rangeFeedDurableKey(info->id, info->mutations.front().version),
rangeFeedDurableValue(info->mutations.front().mutations)));
info->storageVersion = info->mutations.front().version;
// TODO REMOVE!!
/*printf("Persisting %d range feed %s mutations at %lld\n",
info->mutations.front().mutations.size(),
info->id.printable().c_str(),
info->mutations.front().version);
*/
data->storage.writeKeyValue(
KeyValueRef(rangeFeedDurableKey(info->id, it.version), rangeFeedDurableValue(it.mutations)));
info->storageVersion = it.version;
}
wait(yield(TaskPriority::UpdateStorage));
curFeed++;
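The rewrite above also appears to fix a loop that persisted info->mutations.front() on every iteration instead of the current element it. Distilled with generic types (not the FDB ones):

#include <deque>
#include <vector>

struct Entry { long version; int payload; };

// Persist every entry older than newOldestVersion. The pre-fix code used
// feed.front() inside the loop, re-persisting the first entry each pass;
// the fixed version uses the loop variable, as sketched here.
std::vector<Entry> persistFeed(const std::deque<Entry>& feed, long newOldestVersion) {
    std::vector<Entry> persisted;
    for (const Entry& it : feed) {
        if (it.version >= newOldestVersion)
            break;
        persisted.push_back(it); // 'it', not feed.front()
    }
    return persisted;
}

int main() {
    std::deque<Entry> feed = { { 1, 10 }, { 2, 20 }, { 3, 30 } };
    return persistFeed(feed, 3).size() == 2 ? 0 : 1;
}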
@@ -4380,12 +4413,11 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
curFeed = 0;
while (curFeed < updatedRangeFeeds.size()) {
auto info = data->uidRangeFeed[updatedRangeFeeds[curFeed]];
while (info->mutations.front().version < newOldestVersion) {
while (info->mutations.size() && info->mutations.front().version < newOldestVersion) {
info->mutations.pop_front();
}
if (info->storageVersion != invalidVersion) {
info->durableVersion = info->mutations.front().version;
}
info->durableVersion = info->storageVersion;
// printf(" Updating range feed durable version to %lld\n", info->durableVersion);
wait(yield(TaskPriority::UpdateStorage));
curFeed++;
}
@@ -5788,6 +5820,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
throw internal_error();
} catch (Error& e) {
printf("SS crashed with error %s\n", e.name());
if (recovered.canBeSet())
recovered.send(Void());
if (storageServerTerminated(self, persistentData, e))