diff --git a/fdbclient/include/fdbclient/StorageServerInterface.h b/fdbclient/include/fdbclient/StorageServerInterface.h index 8e1d82d6a0..0206dd6340 100644 --- a/fdbclient/include/fdbclient/StorageServerInterface.h +++ b/fdbclient/include/fdbclient/StorageServerInterface.h @@ -22,7 +22,6 @@ #define FDBCLIENT_STORAGESERVERINTERFACE_H #pragma once -#include #include "fdbclient/FDBTypes.h" #include "fdbclient/StorageCheckpoint.h" #include "fdbclient/StorageServerShard.h" @@ -55,6 +54,34 @@ struct VersionReply { } }; +// This struct is used by RK to forward the commit cost to SS, see discussion in #7258 +struct UpdateCommitCostRequest { + constexpr static FileIdentifier file_identifier = 4159439; + + // Ratekeeper ID, it is only reasonable to compare postTime from the same Ratekeeper + UID ratekeeperID; + + // The time the request being posted + double postTime; + + double elapsed; + TransactionTag busiestTag; + + // Properties that are defined in TransactionCommitCostEstimation + int opsSum; + uint64_t costSum; + + uint64_t totalWriteCosts; + bool reported; + + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, ratekeeperID, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply); + } +}; + struct StorageServerInterface { constexpr static FileIdentifier file_identifier = 15302073; enum { BUSY_ALLOWED = 0, BUSY_FORCE = 1, BUSY_LOCAL = 2 }; @@ -94,6 +121,8 @@ struct StorageServerInterface { RequestStream fetchCheckpoint; RequestStream fetchCheckpointKeyValues; + RequestStream updateCommitCostRequest; + private: bool acceptingRequests; @@ -163,6 +192,8 @@ public: RequestStream(getValue.getEndpoint().getAdjustedEndpoint(20)); fetchCheckpointKeyValues = RequestStream( getValue.getEndpoint().getAdjustedEndpoint(21)); + updateCommitCostRequest = + RequestStream(getValue.getEndpoint().getAdjustedEndpoint(22)); } } else { ASSERT(Ar::isDeserializing); @@ -213,6 +244,7 @@ public: streams.push_back(checkpoint.getReceiver()); streams.push_back(fetchCheckpoint.getReceiver()); streams.push_back(fetchCheckpointKeyValues.getReceiver()); + streams.push_back(updateCommitCostRequest.getReceiver()); FlowTransport::transport().addEndpoints(streams); } }; diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index a5383bc1f6..13a7f61ae3 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -22,6 +22,7 @@ #include "fdbserver/Ratekeeper.h" #include "fdbserver/TagThrottler.h" #include "fdbserver/WaitFailure.h" +#include "flow/OwningResource.h" #include "flow/actorcompiler.h" // must be last include @@ -147,9 +148,9 @@ public: } } - ACTOR static Future trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) { - self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality))); - state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); + ACTOR static Future trackStorageServerQueueInfo(ActorWeakSelfRef self, + StorageServerInterface ssi) { + self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(self->id, ssi.id(), ssi.locality))); TraceEvent("RkTracking", self->id) .detail("StorageServer", ssi.id()) .detail("Locality", ssi.locality.toString()); @@ -157,6 +158,7 @@ public: loop { ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? 
+ Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); if (reply.present()) { myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes); myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests(); @@ -173,7 +175,8 @@ public: } } catch (...) { // including cancellation - self->storageQueueInfo.erase(myQueueInfo); + self->storageQueueInfo.erase(ssi.id()); + self->storageServerInterfaces.erase(ssi.id()); throw; } } @@ -207,28 +210,40 @@ public: } ACTOR static Future trackEachStorageServer( - Ratekeeper* self, + ActorWeakSelfRef self, FutureStream>> serverChanges) { - state Map> actors; + + state std::unordered_map> storageServerTrackers; state Promise err; + loop choose { when(state std::pair> change = waitNext(serverChanges)) { wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack + + const UID& id = change.first; if (change.second.present()) { if (!change.second.get().isTss()) { - auto& a = actors[change.first]; + + auto& a = storageServerTrackers[change.first]; a = Future(); a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err); + + self->storageServerInterfaces[id] = change.second.get(); } - } else - actors.erase(change.first); + } else { + storageServerTrackers.erase(id); + + self->storageServerInterfaces.erase(id); + } } when(wait(err.getFuture())) {} } } ACTOR static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo) { - state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); + state ActorOwningSelfRef pSelf( + new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True))); + state Ratekeeper& self = *pSelf; state Future timeout = Void(); state std::vector> tlogTrackers; state std::vector tlogInterfs; @@ -241,7 +256,7 @@ public: PromiseStream>> serverChanges; self.addActor.send(self.monitorServerListChange(serverChanges)); - self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture())); + self.addActor.send(RatekeeperImpl::trackEachStorageServer(pSelf, serverChanges.getFuture())); self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(self.monitorThrottlingChanges()); @@ -367,17 +382,23 @@ public: ACTOR static Future refreshStorageServerCommitCosts(Ratekeeper* self) { state double lastBusiestCommitTagPick; + state std::vector> replies; loop { lastBusiestCommitTagPick = now(); wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + + replies.clear(); + double elapsed = now() - lastBusiestCommitTagPick; // for each SS, select the busiest commit tag from ssTrTagCommitCost for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) { - ssQueueInfo.refreshCommitCost(elapsed); + // NOTE: In some cases, for unknown reason SS will not respond to the updateCommitCostRequest. Since the + // information is not time-sensitive, we do not wait for the replies. 
+ replies.push_back(self->storageServerInterfaces[ssId].updateCommitCostRequest.getReply( + ssQueueInfo.refreshCommitCost(elapsed))); } } } - }; // class RatekeeperImpl Future Ratekeeper::configurationMonitor() { @@ -389,15 +410,6 @@ Future Ratekeeper::monitorServerListChange( return RatekeeperImpl::monitorServerListChange(this, serverChanges); } -Future Ratekeeper::trackEachStorageServer( - FutureStream>> serverChanges) { - return RatekeeperImpl::trackEachStorageServer(this, serverChanges); -} - -Future Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) { - return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi); -} - Future Ratekeeper::trackTLogQueueInfo(TLogInterface tli) { return RatekeeperImpl::trackTLogQueueInfo(this, tli); } @@ -940,17 +952,19 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference(id.toString() + "/BusiestWriteTag")), valid(false), - id(id), locality(locality), acceptingRequests(false), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), - limitReason(limitReason_t::unlimited) { +StorageQueueInfo::StorageQueueInfo(const UID& ratekeeperID_, const UID& id_, const LocalityData& locality_) + : valid(false), ratekeeperID(ratekeeperID_), id(id_), locality(locality_), acceptingRequests(false), + smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) { // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo lastReply.instanceID = -1; } +StorageQueueInfo::StorageQueueInfo(const UID& id_, const LocalityData& locality_) + : StorageQueueInfo(UID(), id_, locality_) {} + void StorageQueueInfo::addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost) { tagCostEst[tagName] += cost; totalWriteCosts += cost.getCostSum(); @@ -983,7 +997,7 @@ void StorageQueueInfo::update(StorageQueuingMetricsReply const& reply, Smoother& busiestReadTags = reply.busiestTags; } -void StorageQueueInfo::refreshCommitCost(double elapsed) { +UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) { busiestWriteTags.clear(); TransactionTag busiestTag; TransactionCommitCostEstimation maxCost; @@ -1003,19 +1017,22 @@ void StorageQueueInfo::refreshCommitCost(double elapsed) { busiestWriteTags.emplace_back(busiestTag, maxRate, maxBusyness); } - TraceEvent("BusiestWriteTag", id) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagOps", maxCost.getOpsSum()) - .detail("TagCost", maxCost.getCostSum()) - .detail("TotalCost", totalWriteCosts) - .detail("Reported", !busiestWriteTags.empty()) - .trackLatest(busiestWriteTagEventHolder->trackingKey); + UpdateCommitCostRequest updateCommitCostRequest{ ratekeeperID, + now(), + elapsed, + busiestTag, + maxCost.getOpsSum(), + maxCost.getCostSum(), + totalWriteCosts, + !busiestWriteTags.empty(), + ReplyPromise() }; // reset statistics 
tagCostEst.clear(); totalWriteOps = 0; totalWriteCosts = 0; + + return updateCommitCostRequest; } Optional StorageQueueInfo::getThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const { diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 58122537d8..fa6d7eeb43 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1900,27 +1900,6 @@ static Future>> getServerMetrics( return results; } -ACTOR template -static Future> getServerBusiestWriteTags( - std::vector servers, - std::unordered_map address_workers, - WorkerDetails rkWorker) { - state std::vector>> futures; - futures.reserve(servers.size()); - for (const auto& s : servers) { - futures.push_back(latestEventOnWorker(rkWorker.interf, s.id().toString() + "/BusiestWriteTag")); - } - wait(waitForAll(futures)); - - std::vector result(servers.size()); - for (int i = 0; i < servers.size(); ++i) { - if (futures[i].get().present()) { - result[i] = futures[i].get().get(); - } - } - return result; -} - ACTOR static Future> readStorageInterfaceAndMetadata(Database cx, bool use_system_priority) { @@ -1958,6 +1937,16 @@ static Future> readStorageInterfaceAndMetad return servers; } +namespace { + +const std::vector STORAGE_SERVER_METRICS_LIST{ "StorageMetrics", + "ReadLatencyMetrics", + "ReadLatencyBands", + "BusiestReadTag", + "BusiestWriteTag" }; + +} // namespace + ACTOR static Future> getStorageServerStatusInfos( Database cx, std::unordered_map address_workers, @@ -1965,18 +1954,9 @@ ACTOR static Future> getStorageServerStatus state std::vector servers = wait(timeoutError(readStorageInterfaceAndMetadata(cx, true), 5.0)); state std::vector> results; - state std::vector busiestWriteTags; - wait(store(results, - getServerMetrics(servers, - address_workers, - std::vector{ - "StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" })) && - store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker))); - - ASSERT(busiestWriteTags.size() == results.size()); + wait(store(results, getServerMetrics(servers, address_workers, STORAGE_SERVER_METRICS_LIST))); for (int i = 0; i < results.size(); ++i) { servers[i].eventMap = std::move(results[i].second); - servers[i].eventMap.emplace("BusiestWriteTag", busiestWriteTags[i]); } return servers; } diff --git a/fdbserver/include/fdbserver/Ratekeeper.h b/fdbserver/include/fdbserver/Ratekeeper.h index 1b7a3f990a..2c46533e45 100644 --- a/fdbserver/include/fdbserver/Ratekeeper.h +++ b/fdbserver/include/fdbserver/Ratekeeper.h @@ -18,6 +18,9 @@ * limitations under the License. */ +#ifndef FDBSERVER_RATEKEEPER_H +#define FDBSERVER_RATEKEEPER_H + #pragma once #include "fdbclient/DatabaseConfiguration.h" @@ -49,13 +52,13 @@ enum limitReason_t { class StorageQueueInfo { uint64_t totalWriteCosts{ 0 }; int totalWriteOps{ 0 }; - Reference busiestWriteTagEventHolder; // refresh periodically TransactionTagMap tagCostEst; public: bool valid; + UID ratekeeperID; UID id; LocalityData locality; StorageQueuingMetricsReply lastReply; @@ -67,8 +70,10 @@ public: limitReason_t limitReason; std::vector busiestReadTags, busiestWriteTags; - StorageQueueInfo(UID id, LocalityData locality); - void refreshCommitCost(double elapsed); + StorageQueueInfo(const UID& id, const LocalityData& locality); + StorageQueueInfo(const UID& rateKeeperID, const UID& id, const LocalityData& locality); + // Summarizes up the commit cost per storage server. Returns the UpdateCommitCostRequest for corresponding SS. 
+ UpdateCommitCostRequest refreshCommitCost(double elapsed); int64_t getStorageQueueBytes() const { return lastReply.bytesInput - smoothDurableBytes.smoothTotal(); } int64_t getDurabilityLag() const { return smoothLatestVersion.smoothTotal() - smoothDurableVersion.smoothTotal(); } void update(StorageQueuingMetricsReply const&, Smoother& smoothTotalDurableBytes); @@ -153,6 +158,9 @@ class Ratekeeper { std::unique_ptr tagThrottler; + // Maps storage server ID to storage server interface + std::unordered_map storageServerInterfaces; + RatekeeperLimits normalLimits; RatekeeperLimits batchLimits; @@ -166,7 +174,6 @@ class Ratekeeper { void updateRate(RatekeeperLimits* limits); Future refreshStorageServerCommitCosts(); Future monitorServerListChange(PromiseStream>> serverChanges); - Future trackEachStorageServer(FutureStream>> serverChanges); // SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function Future trackStorageServerQueueInfo(StorageServerInterface); @@ -179,3 +186,5 @@ class Ratekeeper { public: static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo); }; + +#endif // FDBSERVER_RATEKEEPER_H \ No newline at end of file diff --git a/fdbserver/include/fdbserver/RatekeeperInterface.h b/fdbserver/include/fdbserver/RatekeeperInterface.h index 9d90a980ef..b361b87a80 100644 --- a/fdbserver/include/fdbserver/RatekeeperInterface.h +++ b/fdbserver/include/fdbserver/RatekeeperInterface.h @@ -51,6 +51,8 @@ struct RatekeeperInterface { }; struct TransactionCommitCostEstimation { + // NOTE: If class variables are changed, counterparts in StorageServerInterface.h:UpdateCommitCostRequest should be + // updated too. int opsSum = 0; uint64_t costSum = 0; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 68137c8d6a..42403a1a36 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -69,6 +69,7 @@ #include "fdbserver/MoveKeys.actor.h" #include "fdbserver/MutationTracking.h" #include "fdbserver/OTELSpanContextMessage.h" +#include "fdbserver/Ratekeeper.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/RocksDBCheckpointUtils.actor.h" #include "fdbserver/ServerCheckpoint.actor.h" @@ -635,6 +636,17 @@ public: : key(key), value(value), version(version), tags(tags), debugID(debugID) {} }; +struct BusiestWriteTagContext { + const std::string busiestWriteTagTrackingKey; + UID ratekeeperID; + Reference busiestWriteTagEventHolder; + double lastUpdateTime; + + BusiestWriteTagContext(const UID& thisServerID) + : busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"), ratekeeperID(UID()), + busiestWriteTagEventHolder(makeReference(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {} +}; + struct StorageServer { typedef VersionedMap VersionedData; @@ -1033,6 +1045,7 @@ public: } TransactionTagCounter transactionTagCounter; + BusiestWriteTagContext busiestWriteTagContext; Optional latencyBandConfig; @@ -1241,7 +1254,8 @@ public: serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM), instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false), versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0), - lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this), + lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), + busiestWriteTagContext(ssi.id()), counters(this), storageServerSourceTLogIDEventHolder( 
makeReference(ssi.id().toString() + "/StorageServerSourceTLogID")) { version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id); @@ -10090,6 +10104,35 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) { self->actors.add(fetchCheckpointQ(self, req)); } + when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) { + // Ratekeeper might change with a new ID. In this case, always accept the data. + if (req.ratekeeperID != self->busiestWriteTagContext.ratekeeperID) { + TraceEvent("RatekeeperIDChange") + .detail("OldID", self->busiestWriteTagContext.ratekeeperID) + .detail("OldLastUpdateTime", self->busiestWriteTagContext.lastUpdateTime) + .detail("NewID", req.ratekeeperID) + .detail("LastUpdateTime", req.postTime); + self->busiestWriteTagContext.ratekeeperID = req.ratekeeperID; + self->busiestWriteTagContext.lastUpdateTime = -1; + } + // In case we received an old request/duplicate request, due to, e.g. network problem + ASSERT(req.postTime > 0); + if (req.postTime < self->busiestWriteTagContext.lastUpdateTime) { + continue; + } + + self->busiestWriteTagContext.lastUpdateTime = req.postTime; + TraceEvent("BusiestWriteTag", self->thisServerID) + .detail("Elapsed", req.elapsed) + .detail("Tag", printable(req.busiestTag)) + .detail("TagOps", req.opsSum) + .detail("TagCost", req.costSum) + .detail("TotalCost", req.totalWriteCosts) + .detail("Reported", req.reported) + .trackLatest(self->busiestWriteTagContext.busiestWriteTagTrackingKey); + + req.reply.send(Void()); + } when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) { self->actors.add(fetchCheckpointKeyValuesQ(self, req)); } diff --git a/flow/TLSTest.cpp b/flow/TLSTest.cpp index edf78128e5..ebbbdacd3a 100644 --- a/flow/TLSTest.cpp +++ b/flow/TLSTest.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include #include diff --git a/flow/include/flow/OwningResource.h b/flow/include/flow/OwningResource.h new file mode 100644 index 0000000000..9635129733 --- /dev/null +++ b/flow/include/flow/OwningResource.h @@ -0,0 +1,155 @@ +/* + * OwningResource.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_OWNING_REOSURCE_H +#define FLOW_OWNING_REOSURCE_H + +#include "flow/FastRef.h" + +// Consider the following situation: +// +// 1. An ACTOR A0 allocates an object O +// 2. A0 spawns another ACTOR A1, which depends on O +// 3. A0 triggers A1 and then terminates, destroying O +// 4. Since A1 is triggered by A0 while not knowing A0 is terminated and O is released, it would cause a SEGV error. +// +// In this header file, two classes +// +// * ResourceOwningRef +// * ResourceWeakRef +// +// are provided. 
+// ResourceOwningRef is the reference that "holds" the resource: when it is destructed, the resource is released as
+// well. ResourceWeakRef is the reference that "weakly holds" the resource; before each access, it is the user's
+// responsibility to verify that the resource is still available, via the available() method.
+//
+// With the two classes, the issue above can be solved by:
+//
+// 1. A0 allocates the object O via ResourceOwningRef
+// 2. A0 forwards O to A1 via ResourceWeakRef
+// 3. Every time A1 accesses O, it first verifies that the resource is still available.
+// 4. When A0 terminates, O is released and all ResourceWeakRef available() calls report that the resource is no
+//    longer available, preventing the SEGV error from being raised.
+
+namespace details {
+
+// The class holding the pointer to the resource.
+// SOMEDAY: Consider using std::unique_ptr
+template <typename T>
+struct Resource : public ReferenceCounted<Resource<T>>, NonCopyable {
+    T* resource;
+
+    Resource(T* resource_) : resource(resource_) {}
+    ~Resource() { delete resource; }
+
+    void reset(T* resource_) {
+        delete resource;
+        resource = resource_;
+    }
+};
+
+template <typename T>
+class ResourceRef {
+protected:
+    Reference<Resource<T>> resourceRef;
+
+    ResourceRef(const Reference<Resource<T>>& ref) : resourceRef(ref) {}
+    ResourceRef(Reference<Resource<T>>&& ref) : resourceRef(std::move(ref)) {}
+    ResourceRef& operator=(const Reference<Resource<T>>& ref) {
+        resourceRef = ref;
+        return *this;
+    }
+    ResourceRef& operator=(Reference<Resource<T>>&& ref) {
+        resourceRef = std::move(ref);
+        return *this;
+    }
+
+    virtual ~ResourceRef() {}
+
+public:
+    // Retrieves the resource as a pointer
+    T* operator->() const noexcept { return resourceRef->resource; }
+
+    // Retrieves the resource as a reference
+    T& operator*() const {
+        if (!available()) {
+            throw internal_error();
+        } else {
+            return *(resourceRef->resource);
+        }
+    }
+
+    // Returns true if the resource is available, i.e. not nullptr
+    bool available() const noexcept { return resourceRef->resource != nullptr; }
+};
+
+} // namespace details
+
+// The class that holds a Reference to the details::Resource which holds the real object. When the instance is
+// destroyed, the object is destroyed, too.
+template <typename T>
+class ResourceOwningRef : public details::ResourceRef<T>, NonCopyable {
+    template <typename U>
+    friend class ResourceWeakRef;
+
+    template <typename U>
+    friend class ActorWeakSelfRef;
+
+public:
+    ResourceOwningRef(T* resource) : details::ResourceRef<T>(makeReference<details::Resource<T>>(resource)) {}
+    virtual ~ResourceOwningRef() { details::ResourceRef<T>::resourceRef->reset(nullptr); }
+};
+
+// The class that weakly holds a Reference to the details::Resource. Destroying the reference has no impact on the
+// real object. On the other hand, each access to the object requires verifying that the object is still alive.
+template <typename T>
+class ResourceWeakRef : public details::ResourceRef<T> {
+public:
+    ResourceWeakRef(const ResourceOwningRef<T>& ref) : details::ResourceRef<T>(ref.resourceRef) {}
+    ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef<T>(ref.resourceRef) {}
+};
+
+// A unique reference that takes ownership of the self object. The self object is widely used as the "global"
+// context of each role.
+template <typename T>
+using ActorOwningSelfRef = ResourceOwningRef<T>;
+
+// A wrapper around ResourceWeakRef, used to forward the widely used `self` pointer from the core ACTOR to other
+// ACTORs. It checks the resource before returning it; if the resource is not available, an operation_cancelled error
+// is thrown to terminate the current ACTOR.
+template <typename T>
+class ActorWeakSelfRef : public ResourceWeakRef<T> {
+public:
+    ActorWeakSelfRef(const ResourceOwningRef<T>& ref) : ResourceWeakRef<T>(ref) {}
+    ActorWeakSelfRef(const ResourceWeakRef<T>& ref) : ResourceWeakRef<T>(ref) {}
+    ActorWeakSelfRef(const ActorWeakSelfRef& ref)
+      : ResourceWeakRef<T>(static_cast<const ResourceWeakRef<T>&>(ref)) {}
+
+    // Retrieves the resource as a pointer; throws operation_cancelled if the resource is not available
+    T* operator->() const {
+        if (!ResourceWeakRef<T>::available())
+            throw operation_cancelled();
+        return ResourceWeakRef<T>::resourceRef->resource;
+    }
+
+    // Gets a reference to the resource; throws operation_cancelled if the resource is not available
+    T& operator*() const { return *(this->operator->()); }
+};
+
+#endif // FLOW_OWNING_REOSURCE_H
\ No newline at end of file
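// Usage sketch (illustration only, not part of the patch above). It shows the ownership pattern that
// OwningResource.h introduces and that Ratekeeper::run() adopts: the core actor owns its context through
// ResourceOwningRef / ActorOwningSelfRef, while spawned actors hold weak references and re-check availability
// before every access. `ExampleContext` and `exampleUsage()` are hypothetical names used only for this sketch.

#include "flow/OwningResource.h"

struct ExampleContext {
    int counter = 0;
};

void exampleUsage() {
    // The owning reference controls the lifetime of the context object.
    ResourceOwningRef<ExampleContext> owner(new ExampleContext());

    // A spawned actor would capture a weak reference instead of a raw pointer.
    ResourceWeakRef<ExampleContext> weak(owner);

    if (weak.available()) {
        weak->counter++; // safe: the owner is still alive
    }

    // Once `owner` is destroyed, available() returns false and
    // ActorWeakSelfRef<ExampleContext>::operator->() throws operation_cancelled()
    // instead of dereferencing freed memory.
}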