ratekeeper and ser-des fixes

This commit is contained in:
Bharadwaj V.R 2022-03-24 17:25:07 -07:00
parent abce71c146
commit 961e4ae7fd
6 changed files with 355 additions and 105 deletions

View File

@ -51,6 +51,151 @@ struct VersionReply {
}
};
struct StorageServerInterfaceOld {
constexpr static FileIdentifier file_identifier = 15302073;
enum { BUSY_ALLOWED = 0, BUSY_FORCE = 1, BUSY_LOCAL = 2 };
enum { LocationAwareLoadBalance = 1 };
enum { AlwaysFresh = 0 };
LocalityData locality;
UID uniqueID;
Optional<UID> tssPairID;
RequestStream<struct GetValueRequest> getValue;
RequestStream<struct GetKeyRequest> getKey;
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
RequestStream<struct GetKeyValuesRequest> getKeyValues;
RequestStream<struct GetMappedKeyValuesRequest> getMappedKeyValues;
RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
RequestStream<struct SplitMetricsRequest> splitMetrics;
RequestStream<struct GetStorageMetricsRequest> getStorageMetrics;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct StorageQueuingMetricsRequest> getQueuingMetrics;
RequestStream<ReplyPromise<KeyValueStoreType>> getKeyValueStoreType;
RequestStream<struct WatchValueRequest> watchValue;
RequestStream<struct ReadHotSubRangeRequest> getReadHotRanges;
RequestStream<struct SplitRangeRequest> getRangeSplitPoints;
RequestStream<struct GetKeyValuesStreamRequest> getKeyValuesStream;
RequestStream<struct ChangeFeedStreamRequest> changeFeedStream;
RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
RequestStream<struct ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate;
RequestStream<struct GetCheckpointRequest> checkpoint;
RequestStream<struct FetchCheckpointRequest> fetchCheckpoint;
explicit StorageServerInterfaceOld(UID uid) : uniqueID(uid) {}
StorageServerInterfaceOld() : uniqueID(deterministicRandom()->randomUniqueID()) {}
NetworkAddress address() const { return getValue.getEndpoint().getPrimaryAddress(); }
NetworkAddress stableAddress() const { return getValue.getEndpoint().getStableAddress(); }
Optional<NetworkAddress> secondaryAddress() const { return getValue.getEndpoint().addresses.secondaryAddress; }
UID id() const { return uniqueID; }
bool isTss() const { return tssPairID.present(); }
std::string toString() const { return id().shortString(); }
template <class Ar>
void serialize(Ar& ar) {
// StorageServerInterface is persisted in the database, so changes here have to be versioned carefully!
// To change this serialization, ProtocolVersion::ServerListValue must be updated, and downgrades need to be
// considered
if (ar.protocolVersion().hasSmallEndpoints()) {
if (ar.protocolVersion().hasTSS()) {
serializer(ar, uniqueID, locality, getValue, tssPairID);
} else {
serializer(ar, uniqueID, locality, getValue);
}
if (Ar::isDeserializing) {
getKey = RequestStream<struct GetKeyRequest>(getValue.getEndpoint().getAdjustedEndpoint(1));
getKeyValues = RequestStream<struct GetKeyValuesRequest>(getValue.getEndpoint().getAdjustedEndpoint(2));
getShardState =
RequestStream<struct GetShardStateRequest>(getValue.getEndpoint().getAdjustedEndpoint(3));
waitMetrics = RequestStream<struct WaitMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(4));
splitMetrics = RequestStream<struct SplitMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(5));
getStorageMetrics =
RequestStream<struct GetStorageMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(6));
waitFailure = RequestStream<ReplyPromise<Void>>(getValue.getEndpoint().getAdjustedEndpoint(7));
getQueuingMetrics =
RequestStream<struct StorageQueuingMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(8));
getKeyValueStoreType =
RequestStream<ReplyPromise<KeyValueStoreType>>(getValue.getEndpoint().getAdjustedEndpoint(9));
watchValue = RequestStream<struct WatchValueRequest>(getValue.getEndpoint().getAdjustedEndpoint(10));
getReadHotRanges =
RequestStream<struct ReadHotSubRangeRequest>(getValue.getEndpoint().getAdjustedEndpoint(11));
getRangeSplitPoints =
RequestStream<struct SplitRangeRequest>(getValue.getEndpoint().getAdjustedEndpoint(12));
getKeyValuesStream =
RequestStream<struct GetKeyValuesStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(13));
getMappedKeyValues =
RequestStream<struct GetMappedKeyValuesRequest>(getValue.getEndpoint().getAdjustedEndpoint(14));
changeFeedStream =
RequestStream<struct ChangeFeedStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
overlappingChangeFeeds =
RequestStream<struct OverlappingChangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
changeFeedPop =
RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(17));
changeFeedVersionUpdate = RequestStream<struct ChangeFeedVersionUpdateRequest>(
getValue.getEndpoint().getAdjustedEndpoint(18));
checkpoint = RequestStream<struct GetCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(19));
fetchCheckpoint =
RequestStream<struct FetchCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(20));
}
} else {
ASSERT(Ar::isDeserializing);
if constexpr (is_fb_function<Ar>) {
ASSERT(false);
}
serializer(ar,
uniqueID,
locality,
getValue,
getKey,
getKeyValues,
getShardState,
waitMetrics,
splitMetrics,
getStorageMetrics,
waitFailure,
getQueuingMetrics,
getKeyValueStoreType);
if (ar.protocolVersion().hasWatches()) {
serializer(ar, watchValue);
}
}
}
bool operator==(StorageServerInterfaceOld const& s) const { return uniqueID == s.uniqueID; }
bool operator<(StorageServerInterfaceOld const& s) const { return uniqueID < s.uniqueID; }
void initEndpoints() {
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(getValue.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(getKey.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(getKeyValues.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(getShardState.getReceiver());
streams.push_back(waitMetrics.getReceiver());
streams.push_back(splitMetrics.getReceiver());
streams.push_back(getStorageMetrics.getReceiver());
streams.push_back(waitFailure.getReceiver());
streams.push_back(getQueuingMetrics.getReceiver());
streams.push_back(getKeyValueStoreType.getReceiver());
streams.push_back(watchValue.getReceiver());
streams.push_back(getReadHotRanges.getReceiver());
streams.push_back(getRangeSplitPoints.getReceiver());
streams.push_back(getKeyValuesStream.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(getMappedKeyValues.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(changeFeedStream.getReceiver());
streams.push_back(overlappingChangeFeeds.getReceiver());
streams.push_back(changeFeedPop.getReceiver());
streams.push_back(changeFeedVersionUpdate.getReceiver());
streams.push_back(checkpoint.getReceiver());
streams.push_back(fetchCheckpoint.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};
struct StorageServerInterface {
constexpr static FileIdentifier file_identifier = 15302073;
enum { BUSY_ALLOWED = 0, BUSY_FORCE = 1, BUSY_LOCAL = 2 };

View File

@ -587,29 +587,30 @@ const Key serverListKeyFor(UID serverID) {
return wr.toValue();
}
// TODO use flatbuffers depending on version
const Value serverListValue(StorageServerInterface const& server) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withServerListValue()));
const Value serverListValueOld(StorageServerInterfaceOld const& server) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTSS()));
wr << server;
return wr.toValue();
}
const Value serverListValue(StorageServerInterface const& server) {
return serverListValueFB(server);
}
UID decodeServerListKey(KeyRef const& key) {
UID serverID;
BinaryReader rd(key.removePrefix(serverListKeys.begin), Unversioned());
rd >> serverID;
return serverID;
}
StorageServerInterface decodeServerListValue(ValueRef const& value) {
StorageServerInterface s;
BinaryReader reader(value, IncludeVersion());
StorageServerInterfaceOld decodeServerListValueOld(ValueRef const& value) {
StorageServerInterfaceOld s;
BinaryReader reader(value, IncludeVersion(ProtocolVersion::withTSS()));
reader >> s;
return s;
}
const Value serverListValueFB(StorageServerInterface const& server) {
return ObjectWriter::toValue(server, IncludeVersion());
}
StorageServerInterface decodeServerListValueFB(ValueRef const& value) {
StorageServerInterface s;
ObjectReader reader(value.begin(), IncludeVersion());
@ -617,6 +618,24 @@ StorageServerInterface decodeServerListValueFB(ValueRef const& value) {
return s;
}
StorageServerInterface decodeServerListValue(ValueRef const& value) {
StorageServerInterface s;
BinaryReader reader(value, IncludeVersion());
if (!reader.protocolVersion().hasStorageInterfaceReadiness()) {
reader >> s;
return s;
}
return decodeServerListValueFB(value);
}
const Value serverListValueFB(StorageServerInterface const& server) {
auto protocolVersion = currentProtocolVersion;
protocolVersion.addObjectSerializerFlag();
return ObjectWriter::toValue(server, IncludeVersion(protocolVersion));
}
// processClassKeys.contains(k) iff k.startsWith( processClassKeys.begin ) because '/'+1 == '0'
const KeyRangeRef processClassKeys(LiteralStringRef("\xff/processClass/"), LiteralStringRef("\xff/processClass0"));
const KeyRef processClassPrefix = processClassKeys.begin;
@ -1401,7 +1420,7 @@ void testSSISerdes(StorageServerInterface const& ssi, bool useFB) {
}
// unit test for serialization since tss stuff had bugs
TEST_CASE("/SystemData/SerDes/SSI") {
TEST_CASE("/SystemData/SSI/SerDes") {
printf("testing ssi serdes\n");
LocalityData localityData(Optional<Standalone<StringRef>>(),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
@ -1425,3 +1444,80 @@ TEST_CASE("/SystemData/SerDes/SSI") {
return Void();
}
TEST_CASE("/SystemData/SSI/Downgrade") {
std::vector<StorageServerInterface> newssis;
constexpr int num_ssis = 10;
LocalityData localityData(Optional<Standalone<StringRef>>(),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Optional<Standalone<StringRef>>());
for (int i = 0; i < num_ssis; i++) {
StorageServerInterface ssi;
ssi.locality = localityData;
ssi.uniqueID = UID(0x1234123412341234 + i, 0x5678567856785678 + i);
ssi.acceptingRequests = i % 2;
ssi.initEndpoints();
newssis.push_back(ssi);
}
for (int i = 0; i < num_ssis; i++) {
StorageServerInterfaceOld oldssi;
StorageServerInterface newssi;
auto value = serverListValueFB(newssis[i]);
oldssi = decodeServerListValueOld(value);
newssi = decodeServerListValue(value);
ASSERT(oldssi.locality == newssis[i].locality);
ASSERT(oldssi.id() == newssis[i].id());
ASSERT(oldssi.getValue.getEndpoint().token == newssis[i].getValue.getEndpoint().token);
ASSERT(newssi.locality == newssis[i].locality);
ASSERT(newssi.id() == newssis[i].id());
ASSERT(newssi.isAcceptingRequests() == newssis[i].isAcceptingRequests());
ASSERT(newssi.getValue.getEndpoint().token == newssis[i].getValue.getEndpoint().token);
}
return Void();
}
TEST_CASE("/SystemData/SSI/Upgrade") {
std::vector<StorageServerInterfaceOld> oldssis;
constexpr int num_ssis = 10;
LocalityData localityData(Optional<Standalone<StringRef>>(),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Standalone<StringRef>(deterministicRandom()->randomUniqueID().toString()),
Optional<Standalone<StringRef>>());
for (int i = 0; i < num_ssis; i++) {
StorageServerInterfaceOld ssi;
ssi.locality = localityData;
ssi.uniqueID = UID(0x1234123412341234 + i, 0x5678567856785678 + i);
ssi.initEndpoints();
oldssis.push_back(ssi);
}
for (int i = 0; i < num_ssis; i++) {
StorageServerInterfaceOld oldssi;
StorageServerInterface newssi;
auto value = serverListValueOld(oldssis[i]);
oldssi = decodeServerListValueOld(value);
newssi = decodeServerListValue(value);
ASSERT(oldssi.locality == oldssis[i].locality);
ASSERT(oldssi.id() == oldssis[i].id());
ASSERT(oldssi.getValue.getEndpoint().token == oldssis[i].getValue.getEndpoint().token);
ASSERT(newssi.locality == oldssis[i].locality);
ASSERT(newssi.id() == oldssis[i].id());
ASSERT(newssi.isAcceptingRequests() == 0);
ASSERT(newssi.getValue.getEndpoint().token == oldssis[i].getValue.getEndpoint().token);
}
return Void();
}

View File

@ -202,6 +202,7 @@ extern const KeyRangeRef serverListKeys;
extern const KeyRef serverListPrefix;
const Key serverListKeyFor(UID serverID);
const Value serverListValue(StorageServerInterface const&);
const Value serverListValueFB(StorageServerInterface const&);
UID decodeServerListKey(KeyRef const&);
StorageServerInterface decodeServerListValue(ValueRef const&);

View File

@ -121,7 +121,8 @@ public:
newServers[serverId] = ssi;
if (oldServers.count(serverId)) {
if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint() ||
ssi.isAcceptingRequests() != oldServers[serverId].isAcceptingRequests()) {
serverChanges.send(std::make_pair(serverId, Optional<StorageServerInterface>(ssi)));
}
oldServers.erase(serverId);
@ -183,6 +184,7 @@ public:
myQueueInfo->value.busiestReadTag = reply.get().busiestTag;
myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate;
myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests();
} else {
if (myQueueInfo->value.valid) {
TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id());
@ -255,7 +257,7 @@ public:
when(state std::pair<UID, Optional<StorageServerInterface>> change = waitNext(serverChanges)) {
wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack
if (change.second.present()) {
if (!change.second.get().isTss() && change.second.get().isAcceptingRequests()) {
if (!change.second.get().isTss()) {
auto& a = actors[change.first];
a = Future<Void>();
a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);
@ -523,7 +525,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
for (auto i = storageQueueInfo.begin(); i != storageQueueInfo.end(); ++i) {
auto const& ss = i->value;
if (!ss.valid || (remoteDC.present() && ss.locality.dcId() == remoteDC))
if (!ss.valid || !ss.acceptingRequests || (remoteDC.present() && ss.locality.dcId() == remoteDC))
continue;
++sscount;

View File

@ -52,6 +52,7 @@ struct StorageQueueInfo {
LocalityData locality;
StorageQueuingMetricsReply lastReply;
StorageQueuingMetricsReply prevReply;
bool acceptingRequests;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
@ -70,8 +71,9 @@ struct StorageQueueInfo {
int totalWriteOps = 0;
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
: valid(false), id(id), locality(locality), acceptingRequests(false),
smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
limitReason(limitReason_t::unlimited),

View File

@ -7382,10 +7382,10 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->coreStarted.send(Void());
// if (self->registerInterfaceAcceptingRequests.canBeSet()) {
// self->registerInterfaceAcceptingRequests.send(true);
// wait(self->interfaceRegistered);
// }
if (self->registerInterfaceAcceptingRequests.canBeSet()) {
self->registerInterfaceAcceptingRequests.send(true);
wait(self->interfaceRegistered);
}
loop {
++self->counters.loops;
@ -7576,91 +7576,6 @@ ACTOR Future<Void> initTenantMap(StorageServer* self) {
return Void();
}
// for creating a new storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.clusterId.send(clusterId);
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
ASSERT(self.isTss());
}
self.sk = serverKeysPrefixFor(self.tssPairID.present() ? self.tssPairID.get() : self.thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
self.folder = folder;
self.registerInterfaceAcceptingRequests.send(false);
try {
wait(self.storage.init());
wait(self.storage.commit());
++self.counters.kvCommits;
ssi.startAcceptingRequests();
if (seedTag == invalidTag) {
// Might throw recruitment_failed in case of simultaneous master failure
std::pair<Version, Tag> verAndTag = wait(addStorageServer(self.cx, ssi));
self.tag = verAndTag.second;
if (ssi.isTss()) {
self.setInitialVersion(tssSeedVersion);
} else {
self.setInitialVersion(verAndTag.first - 1);
}
wait(initTenantMap(&self));
} else {
self.tag = seedTag;
}
self.storage.makeNewStorageServerDurable();
wait(self.storage.commit());
++self.counters.kvCommits;
TraceEvent("StorageServerInit", ssi.id())
.detail("Version", self.version.get())
.detail("SeedTag", seedTag.toString())
.detail("TssPair", ssi.isTss() ? ssi.tssPairID.get().toString() : "");
InitializeStorageReply rep;
rep.interf = ssi;
rep.addedVersion = self.version.get();
recruitReply.send(rep);
self.byteSampleRecovery = Void();
ssCore = storageServerCore(&self, ssi);
wait(ssCore);
throw internal_error();
} catch (Error& e) {
// If we die with an error before replying to the recruitment request, send the error to the recruiter
// (ClusterController, and from there to the DataDistributionTeamCollection)
if (!recruitReply.isSet())
recruitReply.sendError(recruitment_failed());
// If the storage server dies while something that uses self is still on the stack,
// we want that actor to complete before we terminate and that memory goes out of scope
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
ssCore.cancel();
self.actors.clear(true);
wait(delay(0));
return Void();
}
ssCore.cancel();
self.actors.clear(true);
wait(delay(0));
throw err;
}
}
ACTOR Future<Void> replaceInterface(StorageServer* self, StorageServerInterface ssi) {
ASSERT(!ssi.isTss());
state Transaction tr(self->cx);
@ -7838,6 +7753,95 @@ ACTOR Future<Void> storageInterfaceRegistration(StorageServer* self,
return Void();
}
// for creating a new storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.clusterId.send(clusterId);
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
ASSERT(self.isTss());
}
self.sk = serverKeysPrefixFor(self.tssPairID.present() ? self.tssPairID.get() : self.thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
self.folder = folder;
try {
wait(self.storage.init());
wait(self.storage.commit());
++self.counters.kvCommits;
if (seedTag == invalidTag) {
ssi.startAcceptingRequests();
self.registerInterfaceAcceptingRequests.send(false);
// Might throw recruitment_failed in case of simultaneous master failure
std::pair<Version, Tag> verAndTag = wait(addStorageServer(self.cx, ssi));
self.tag = verAndTag.second;
if (ssi.isTss()) {
self.setInitialVersion(tssSeedVersion);
} else {
self.setInitialVersion(verAndTag.first - 1);
}
wait(initTenantMap(&self));
} else {
self.tag = seedTag;
}
self.interfaceRegistered =
storageInterfaceRegistration(&self, ssi, self.registerInterfaceAcceptingRequests.getFuture());
wait(delay(0));
self.storage.makeNewStorageServerDurable();
wait(self.storage.commit());
++self.counters.kvCommits;
TraceEvent("StorageServerInit", ssi.id())
.detail("Version", self.version.get())
.detail("SeedTag", seedTag.toString())
.detail("TssPair", ssi.isTss() ? ssi.tssPairID.get().toString() : "");
InitializeStorageReply rep;
rep.interf = ssi;
rep.addedVersion = self.version.get();
recruitReply.send(rep);
self.byteSampleRecovery = Void();
ssCore = storageServerCore(&self, ssi);
wait(ssCore);
throw internal_error();
} catch (Error& e) {
// If we die with an error before replying to the recruitment request, send the error to the recruiter
// (ClusterController, and from there to the DataDistributionTeamCollection)
if (!recruitReply.isSet())
recruitReply.sendError(recruitment_failed());
// If the storage server dies while something that uses self is still on the stack,
// we want that actor to complete before we terminate and that memory goes out of scope
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
ssCore.cancel();
self.actors.clear(true);
wait(delay(0));
return Void();
}
ssCore.cancel();
self.actors.clear(true);
wait(delay(0));
throw err;
}
}
// for recovering an existing storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,