Merge pull request #7431 from xis19/main
Let the storage server report the busiest write tag
commit 84d483605b
@@ -22,7 +22,6 @@
#define FDBCLIENT_STORAGESERVERINTERFACE_H
#pragma once

#include <ostream>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageCheckpoint.h"
#include "fdbclient/StorageServerShard.h"

@@ -55,6 +54,34 @@ struct VersionReply {
    }
};

// This struct is used by RK to forward the commit cost to SS, see discussion in #7258
struct UpdateCommitCostRequest {
    constexpr static FileIdentifier file_identifier = 4159439;

    // Ratekeeper ID; postTime values are only comparable when they come from the same Ratekeeper
    UID ratekeeperID;

    // The time at which the request was posted
    double postTime;

    double elapsed;
    TransactionTag busiestTag;

    // Properties that are defined in TransactionCommitCostEstimation
    int opsSum;
    uint64_t costSum;

    uint64_t totalWriteCosts;
    bool reported;

    ReplyPromise<Void> reply;

    template <typename Ar>
    void serialize(Ar& ar) {
        serializer(ar, ratekeeperID, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply);
    }
};
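The request/reply flow this struct enables is spelled out across the hunks below. As a rough sketch, condensed from the Ratekeeper and storage-server hunks later in this commit (error handling, the choose/when loop, and the staleness checks are omitted here):

    // Ratekeeper side: summarize the per-SS commit cost and fire the request.
    UpdateCommitCostRequest req = ssQueueInfo.refreshCommitCost(elapsed);
    Future<Void> ack = storageServerInterfaces[ssId].updateCommitCostRequest.getReply(req);
    // The reply is deliberately not waited on (see refreshStorageServerCommitCosts below).

    // Storage server side, inside the main request loop: trace the reported tag and acknowledge.
    UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture());
    TraceEvent("BusiestWriteTag", self->thisServerID).detail("Tag", printable(req.busiestTag));
    req.reply.send(Void());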

struct StorageServerInterface {
    constexpr static FileIdentifier file_identifier = 15302073;
    enum { BUSY_ALLOWED = 0, BUSY_FORCE = 1, BUSY_LOCAL = 2 };

@@ -94,6 +121,8 @@ struct StorageServerInterface {
    RequestStream<struct FetchCheckpointRequest> fetchCheckpoint;
    RequestStream<struct FetchCheckpointKeyValuesRequest> fetchCheckpointKeyValues;

    RequestStream<struct UpdateCommitCostRequest> updateCommitCostRequest;

private:
    bool acceptingRequests;

@@ -163,6 +192,8 @@ public:
            RequestStream<struct FetchCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(20));
        fetchCheckpointKeyValues = RequestStream<struct FetchCheckpointKeyValuesRequest>(
            getValue.getEndpoint().getAdjustedEndpoint(21));
        updateCommitCostRequest =
            RequestStream<struct UpdateCommitCostRequest>(getValue.getEndpoint().getAdjustedEndpoint(22));
        }
    } else {
        ASSERT(Ar::isDeserializing);

@@ -213,6 +244,7 @@ public:
        streams.push_back(checkpoint.getReceiver());
        streams.push_back(fetchCheckpoint.getReceiver());
        streams.push_back(fetchCheckpointKeyValues.getReceiver());
        streams.push_back(updateCommitCostRequest.getReceiver());
        FlowTransport::transport().addEndpoints(streams);
    }
};
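One detail worth calling out (an observation about the pattern above, not something stated in the commit): the adjusted-endpoint index chosen in the deserializing branch has to agree with the position at which the corresponding receiver is pushed in the hunk that calls addEndpoints, which is why the new stream gets index 22 on one side and is pushed last on the other. A hypothetical next stream would extend both lists in lockstep:

    // Hypothetical follow-up stream, shown only to illustrate the convention
    // (index 23 and NewRequest are not part of this commit).
    newRequest = RequestStream<struct NewRequest>(getValue.getEndpoint().getAdjustedEndpoint(23));
    ...
    streams.push_back(newRequest.getReceiver());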

@@ -22,6 +22,7 @@
#include "fdbserver/Ratekeeper.h"
#include "fdbserver/TagThrottler.h"
#include "fdbserver/WaitFailure.h"
#include "flow/OwningResource.h"

#include "flow/actorcompiler.h" // must be last include

@@ -147,9 +148,9 @@ public:
        }
    }

    ACTOR static Future<Void> trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) {
        self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
        state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
    ACTOR static Future<Void> trackStorageServerQueueInfo(ActorWeakSelfRef<Ratekeeper> self,
                                                          StorageServerInterface ssi) {
        self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(self->id, ssi.id(), ssi.locality)));
        TraceEvent("RkTracking", self->id)
            .detail("StorageServer", ssi.id())
            .detail("Locality", ssi.locality.toString());

@@ -157,6 +158,7 @@ public:
        loop {
            ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(
                StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
            Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
            if (reply.present()) {
                myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
                myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests();

@@ -173,7 +175,8 @@ public:
            }
        } catch (...) {
            // including cancellation
            self->storageQueueInfo.erase(myQueueInfo);
            self->storageQueueInfo.erase(ssi.id());
            self->storageServerInterfaces.erase(ssi.id());
            throw;
        }
    }

@@ -207,28 +210,40 @@ public:
    }

    ACTOR static Future<Void> trackEachStorageServer(
        Ratekeeper* self,
        ActorWeakSelfRef<Ratekeeper> self,
        FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
        state Map<UID, Future<Void>> actors;

        state std::unordered_map<UID, Future<Void>> storageServerTrackers;
        state Promise<Void> err;

        loop choose {
            when(state std::pair<UID, Optional<StorageServerInterface>> change = waitNext(serverChanges)) {
                wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack

                const UID& id = change.first;
                if (change.second.present()) {
                    if (!change.second.get().isTss()) {
                        auto& a = actors[change.first];

                        auto& a = storageServerTrackers[change.first];
                        a = Future<Void>();
                        a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);

                        self->storageServerInterfaces[id] = change.second.get();
                    }
                } else
                    actors.erase(change.first);
                } else {
                    storageServerTrackers.erase(id);

                    self->storageServerInterfaces.erase(id);
                }
            }
            when(wait(err.getFuture())) {}
        }
    }
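The wait(delay(0)) above deserves a note. In Flow, dropping or reassigning the Future that holds an actor cancels that actor, and, as the code comment says, doing so while the old tracker is still somewhere on the current call stack is unsafe. Yielding once pushes the rest of the handler to a later event-loop iteration, after which the old tracker can be replaced on a clean stack. A sketch of the pattern (the names trackers and makeTracker are hypothetical, not from this commit):

    // trackers[id] = makeTracker(id);   // unsafe here: assigning destroys the old Future, cancelling the
    //                                   // old actor while it may still be executing on this call stack
    wait(delay(0));                      // yield once, so the cancellation happens on a fresh stack frame
    trackers[id] = makeTracker(id);      // now the old tracker can be replaced safely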

    ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
        state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
        state ActorOwningSelfRef<Ratekeeper> pSelf(
            new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
        state Ratekeeper& self = *pSelf;
        state Future<Void> timeout = Void();
        state std::vector<Future<Void>> tlogTrackers;
        state std::vector<TLogInterface> tlogInterfs;

@@ -241,7 +256,7 @@ public:

        PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges;
        self.addActor.send(self.monitorServerListChange(serverChanges));
        self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture()));
        self.addActor.send(RatekeeperImpl::trackEachStorageServer(pSelf, serverChanges.getFuture()));
        self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));

        self.addActor.send(self.monitorThrottlingChanges());

@@ -367,17 +382,23 @@ public:

    ACTOR static Future<Void> refreshStorageServerCommitCosts(Ratekeeper* self) {
        state double lastBusiestCommitTagPick;
        state std::vector<Future<Void>> replies;
        loop {
            lastBusiestCommitTagPick = now();
            wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));

            replies.clear();

            double elapsed = now() - lastBusiestCommitTagPick;
            // for each SS, select the busiest commit tag from ssTrTagCommitCost
            for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) {
                ssQueueInfo.refreshCommitCost(elapsed);
                // NOTE: In some cases, for unknown reasons, the SS will not respond to the updateCommitCostRequest.
                // Since the information is not time-sensitive, we do not wait for the replies.
                replies.push_back(self->storageServerInterfaces[ssId].updateCommitCostRequest.getReply(
                    ssQueueInfo.refreshCommitCost(elapsed)));
            }
        }
    }
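The decision not to wait on the replies is a deliberate design choice: the collected futures are simply dropped by replies.clear() on the next pass through the loop, so a storage server that never answers cannot stall Ratekeeper's measurement interval. If back-pressure were wanted instead, the obvious variant would block the loop on all outstanding acknowledgements:

    // Hypothetical alternative, NOT what this commit does: block the measurement loop until every
    // storage server has acknowledged the update.
    wait(waitForAll(replies));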

}; // class RatekeeperImpl

Future<Void> Ratekeeper::configurationMonitor() {

@@ -389,15 +410,6 @@ Future<Void> Ratekeeper::monitorServerListChange(
    return RatekeeperImpl::monitorServerListChange(this, serverChanges);
}

Future<Void> Ratekeeper::trackEachStorageServer(
    FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
    return RatekeeperImpl::trackEachStorageServer(this, serverChanges);
}

Future<Void> Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) {
    return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi);
}

Future<Void> Ratekeeper::trackTLogQueueInfo(TLogInterface tli) {
    return RatekeeperImpl::trackTLogQueueInfo(this, tli);
}

@@ -940,17 +952,19 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
    return Void();
}

StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality)
  : busiestWriteTagEventHolder(makeReference<EventCacheHolder>(id.toString() + "/BusiestWriteTag")), valid(false),
    id(id), locality(locality), acceptingRequests(false), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
    smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
    smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
    smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
    limitReason(limitReason_t::unlimited) {
StorageQueueInfo::StorageQueueInfo(const UID& ratekeeperID_, const UID& id_, const LocalityData& locality_)
  : valid(false), ratekeeperID(ratekeeperID_), id(id_), locality(locality_), acceptingRequests(false),
    smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
    verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
    smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
    // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
    lastReply.instanceID = -1;
}

StorageQueueInfo::StorageQueueInfo(const UID& id_, const LocalityData& locality_)
  : StorageQueueInfo(UID(), id_, locality_) {}

void StorageQueueInfo::addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost) {
    tagCostEst[tagName] += cost;
    totalWriteCosts += cost.getCostSum();

@@ -983,7 +997,7 @@ void StorageQueueInfo::update(StorageQueuingMetricsReply const& reply, Smoother&
    busiestReadTags = reply.busiestTags;
}

void StorageQueueInfo::refreshCommitCost(double elapsed) {
UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
    busiestWriteTags.clear();
    TransactionTag busiestTag;
    TransactionCommitCostEstimation maxCost;

@@ -1003,19 +1017,22 @@ void StorageQueueInfo::refreshCommitCost(double elapsed) {
        busiestWriteTags.emplace_back(busiestTag, maxRate, maxBusyness);
    }

    TraceEvent("BusiestWriteTag", id)
        .detail("Elapsed", elapsed)
        .detail("Tag", printable(busiestTag))
        .detail("TagOps", maxCost.getOpsSum())
        .detail("TagCost", maxCost.getCostSum())
        .detail("TotalCost", totalWriteCosts)
        .detail("Reported", !busiestWriteTags.empty())
        .trackLatest(busiestWriteTagEventHolder->trackingKey);
    UpdateCommitCostRequest updateCommitCostRequest{ ratekeeperID,
                                                     now(),
                                                     elapsed,
                                                     busiestTag,
                                                     maxCost.getOpsSum(),
                                                     maxCost.getCostSum(),
                                                     totalWriteCosts,
                                                     !busiestWriteTags.empty(),
                                                     ReplyPromise<Void>() };

    // reset statistics
    tagCostEst.clear();
    totalWriteOps = 0;
    totalWriteCosts = 0;

    return updateCommitCostRequest;
}

Optional<double> StorageQueueInfo::getThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const {

@@ -1900,27 +1900,6 @@ static Future<std::vector<std::pair<iface, EventMap>>> getServerMetrics(
    return results;
}

ACTOR template <class iface>
static Future<std::vector<TraceEventFields>> getServerBusiestWriteTags(
    std::vector<iface> servers,
    std::unordered_map<NetworkAddress, WorkerInterface> address_workers,
    WorkerDetails rkWorker) {
    state std::vector<Future<Optional<TraceEventFields>>> futures;
    futures.reserve(servers.size());
    for (const auto& s : servers) {
        futures.push_back(latestEventOnWorker(rkWorker.interf, s.id().toString() + "/BusiestWriteTag"));
    }
    wait(waitForAll(futures));

    std::vector<TraceEventFields> result(servers.size());
    for (int i = 0; i < servers.size(); ++i) {
        if (futures[i].get().present()) {
            result[i] = futures[i].get().get();
        }
    }
    return result;
}

ACTOR
static Future<std::vector<StorageServerStatusInfo>> readStorageInterfaceAndMetadata(Database cx,
                                                                                    bool use_system_priority) {

@@ -1958,6 +1937,16 @@ static Future<std::vector<StorageServerStatusInfo>> readStorageInterfaceAndMetad
    return servers;
}

namespace {

const std::vector<std::string> STORAGE_SERVER_METRICS_LIST{ "StorageMetrics",
                                                            "ReadLatencyMetrics",
                                                            "ReadLatencyBands",
                                                            "BusiestReadTag",
                                                            "BusiestWriteTag" };

} // namespace

ACTOR static Future<std::vector<StorageServerStatusInfo>> getStorageServerStatusInfos(
    Database cx,
    std::unordered_map<NetworkAddress, WorkerInterface> address_workers,

@@ -1965,18 +1954,9 @@ ACTOR static Future<std::vector<StorageServerStatusInfo>> getStorageServerStatus
    state std::vector<StorageServerStatusInfo> servers =
        wait(timeoutError(readStorageInterfaceAndMetadata(cx, true), 5.0));
    state std::vector<std::pair<StorageServerStatusInfo, EventMap>> results;
    state std::vector<TraceEventFields> busiestWriteTags;
    wait(store(results,
               getServerMetrics(servers,
                                address_workers,
                                std::vector<std::string>{
                                    "StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" })) &&
         store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)));

    ASSERT(busiestWriteTags.size() == results.size());
    wait(store(results, getServerMetrics(servers, address_workers, STORAGE_SERVER_METRICS_LIST)));
    for (int i = 0; i < results.size(); ++i) {
        servers[i].eventMap = std::move(results[i].second);
        servers[i].eventMap.emplace("BusiestWriteTag", busiestWriteTags[i]);
    }
    return servers;
}

@@ -18,6 +18,9 @@
 * limitations under the License.
 */

#ifndef FDBSERVER_RATEKEEPER_H
#define FDBSERVER_RATEKEEPER_H

#pragma once

#include "fdbclient/DatabaseConfiguration.h"

@@ -49,13 +52,13 @@ enum limitReason_t {
class StorageQueueInfo {
    uint64_t totalWriteCosts{ 0 };
    int totalWriteOps{ 0 };
    Reference<EventCacheHolder> busiestWriteTagEventHolder;

    // refresh periodically
    TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;

public:
    bool valid;
    UID ratekeeperID;
    UID id;
    LocalityData locality;
    StorageQueuingMetricsReply lastReply;

@@ -67,8 +70,10 @@ public:
    limitReason_t limitReason;
    std::vector<StorageQueuingMetricsReply::TagInfo> busiestReadTags, busiestWriteTags;

    StorageQueueInfo(UID id, LocalityData locality);
    void refreshCommitCost(double elapsed);
    StorageQueueInfo(const UID& id, const LocalityData& locality);
    StorageQueueInfo(const UID& rateKeeperID, const UID& id, const LocalityData& locality);
    // Summarizes the commit cost for this storage server and returns the UpdateCommitCostRequest for the corresponding SS.
    UpdateCommitCostRequest refreshCommitCost(double elapsed);
    int64_t getStorageQueueBytes() const { return lastReply.bytesInput - smoothDurableBytes.smoothTotal(); }
    int64_t getDurabilityLag() const { return smoothLatestVersion.smoothTotal() - smoothDurableVersion.smoothTotal(); }
    void update(StorageQueuingMetricsReply const&, Smoother& smoothTotalDurableBytes);

@@ -153,6 +158,9 @@ class Ratekeeper {

    std::unique_ptr<class ITagThrottler> tagThrottler;

    // Maps storage server ID to storage server interface
    std::unordered_map<UID, StorageServerInterface> storageServerInterfaces;

    RatekeeperLimits normalLimits;
    RatekeeperLimits batchLimits;

@@ -166,7 +174,6 @@ class Ratekeeper {
    void updateRate(RatekeeperLimits* limits);
    Future<Void> refreshStorageServerCommitCosts();
    Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
    Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);

    // SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
    Future<Void> trackStorageServerQueueInfo(StorageServerInterface);

@@ -179,3 +186,5 @@ class Ratekeeper {
public:
    static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
};

#endif // FDBSERVER_RATEKEEPER_H

@@ -51,6 +51,8 @@ struct RatekeeperInterface {
};

struct TransactionCommitCostEstimation {
    // NOTE: If class variables are changed, counterparts in StorageServerInterface.h:UpdateCommitCostRequest should be
    // updated too.
    int opsSum = 0;
    uint64_t costSum = 0;

@@ -69,6 +69,7 @@
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/Ratekeeper.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/RocksDBCheckpointUtils.actor.h"
#include "fdbserver/ServerCheckpoint.actor.h"

@@ -635,6 +636,17 @@ public:
      : key(key), value(value), version(version), tags(tags), debugID(debugID) {}
};

struct BusiestWriteTagContext {
    const std::string busiestWriteTagTrackingKey;
    UID ratekeeperID;
    Reference<EventCacheHolder> busiestWriteTagEventHolder;
    double lastUpdateTime;

    BusiestWriteTagContext(const UID& thisServerID)
      : busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"), ratekeeperID(UID()),
        busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {}
};

struct StorageServer {
    typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;

@@ -1033,6 +1045,7 @@ public:
    }

    TransactionTagCounter transactionTagCounter;
    BusiestWriteTagContext busiestWriteTagContext;

    Optional<LatencyBandConfig> latencyBandConfig;

@@ -1241,7 +1254,8 @@ public:
        serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
        instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
        versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
        lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
        lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
        busiestWriteTagContext(ssi.id()), counters(this),
        storageServerSourceTLogIDEventHolder(
            makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
    version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);

@@ -10090,6 +10104,35 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
        when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) {
            self->actors.add(fetchCheckpointQ(self, req));
        }
        when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
            // Ratekeeper might change with a new ID. In this case, always accept the data.
            if (req.ratekeeperID != self->busiestWriteTagContext.ratekeeperID) {
                TraceEvent("RatekeeperIDChange")
                    .detail("OldID", self->busiestWriteTagContext.ratekeeperID)
                    .detail("OldLastUpdateTime", self->busiestWriteTagContext.lastUpdateTime)
                    .detail("NewID", req.ratekeeperID)
                    .detail("LastUpdateTime", req.postTime);
                self->busiestWriteTagContext.ratekeeperID = req.ratekeeperID;
                self->busiestWriteTagContext.lastUpdateTime = -1;
            }
            // Ignore requests that arrive stale or duplicated, e.g. due to a network problem
            ASSERT(req.postTime > 0);
            if (req.postTime < self->busiestWriteTagContext.lastUpdateTime) {
                continue;
            }

            self->busiestWriteTagContext.lastUpdateTime = req.postTime;
            TraceEvent("BusiestWriteTag", self->thisServerID)
                .detail("Elapsed", req.elapsed)
                .detail("Tag", printable(req.busiestTag))
                .detail("TagOps", req.opsSum)
                .detail("TagCost", req.costSum)
                .detail("TotalCost", req.totalWriteCosts)
                .detail("Reported", req.reported)
                .trackLatest(self->busiestWriteTagContext.busiestWriteTagTrackingKey);

            req.reply.send(Void());
        }
        when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
            self->actors.add(fetchCheckpointKeyValuesQ(self, req));
        }

@@ -20,7 +20,6 @@

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <fmt/format.h>
#include <algorithm>
#include <iostream>

@@ -0,0 +1,155 @@
/*
 * OwningResource.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FLOW_OWNING_RESOURCE_H
#define FLOW_OWNING_RESOURCE_H

#include "flow/FastRef.h"

// Consider the following situation:
//
// 1. An ACTOR A0 allocates an object O
// 2. A0 spawns another ACTOR A1, which depends on O
// 3. A0 triggers A1 and then terminates, destroying O
// 4. Since A1 is triggered by A0 without knowing that A0 has terminated and O has been released, a SEGV error
//    would be raised.
//
// In this header file, two classes
//
// * ResourceOwningRef
// * ResourceWeakRef
//
// are provided. ResourceOwningRef is the reference that "holds" the resource: when it is destructed, the resource
// is also released. ResourceWeakRef is the reference that "weakly holds" the resource: before each access, it is
// the user's responsibility to verify that the resource is still available, via the available() method.
//
// With the two classes, the issue above can be solved by:
//
// 1. A0 allocates the object O via ResourceOwningRef
// 2. A0 forwards O to A1, via ResourceWeakRef
// 3. Every time A1 accesses O, it verifies that the resource is still available.
// 4. When A0 terminates, O is released and all ResourceWeakRef available() calls report that the resource is no
//    longer available, preventing the SEGV error from being raised.
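//
// A minimal usage sketch (illustrative only: Foo, doWork, worker and owner are hypothetical names, not part of
// this commit; the Ratekeeper changes in this diff follow the same pattern via ActorOwningSelfRef/ActorWeakSelfRef):
//
//     ACTOR Future<Void> worker(ResourceWeakRef<Foo> foo) {
//         loop {
//             wait(delay(1.0));
//             if (!foo.available()) {
//                 return Void(); // the owner has released the resource; stop using it
//             }
//             foo->doWork();
//         }
//     }
//
//     ACTOR Future<Void> owner() {
//         state ResourceOwningRef<Foo> foo(new Foo());
//         state Future<Void> w = worker(ResourceWeakRef<Foo>(foo));
//         wait(delay(10.0));
//         return Void(); // ~ResourceOwningRef deletes the Foo; worker later observes available() == false
//     }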

namespace details {

// The class holding the pointer to the resource.
// SOMEDAY: Consider using std::unique_ptr
template <typename T>
struct Resource : public ReferenceCounted<Resource<T>>, NonCopyable {
    T* resource;

    Resource(T* resource_) : resource(resource_) {}
    ~Resource() { delete resource; }

    void reset(T* resource_) {
        delete resource;
        resource = resource_;
    }
};

template <typename T>
class ResourceRef {
protected:
    Reference<Resource<T>> resourceRef;

    ResourceRef(const Reference<Resource<T>>& ref) : resourceRef(ref) {}
    ResourceRef(Reference<Resource<T>>&& ref) : resourceRef(std::move(ref)) {}
    ResourceRef& operator=(const Reference<Resource<T>>& ref) {
        resourceRef = ref;
        return *this;
    }
    ResourceRef& operator=(Reference<Resource<T>>&& ref) {
        resourceRef = std::move(ref);
        return *this;
    }

    virtual ~ResourceRef() {}

public:
    // Retrieves the resource as a pointer
    T* operator->() const noexcept { return resourceRef->resource; }

    // Retrieves the resource as a reference
    T& operator*() const {
        if (!available()) {
            throw internal_error();
        } else {
            return *(resourceRef->resource);
        }
    }

    // Returns true if the resource is available, i.e. not nullptr
    bool available() const noexcept { return resourceRef->resource != nullptr; }
};

} // namespace details

// The class that holds a Reference to the details::Resource which holds the real object. If the instance is destroyed,
// the object is destroyed, too.
template <typename T>
class ResourceOwningRef : public details::ResourceRef<T>, NonCopyable {
    template <typename U>
    friend class ResourceWeakRef;

    template <typename U>
    friend class ActorWeakSelfRef;

public:
    ResourceOwningRef(T* resource) : details::ResourceRef<T>(makeReference<details::Resource<T>>(resource)) {}
    virtual ~ResourceOwningRef() { details::ResourceRef<T>::resourceRef->reset(nullptr); }
};

// The class that weakly holds a Reference to the details::Resource. Destroying the reference has no impact on the
// real object. On the other hand, each access to the object requires verifying that the object is still alive.
template <typename T>
class ResourceWeakRef : public details::ResourceRef<T> {
public:
    ResourceWeakRef(const ResourceOwningRef<T>& ref) : details::ResourceRef<T>(ref.resourceRef) {}
    ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef<T>(ref.resourceRef) {}
};

// A unique reference that takes ownership of the self object. The self object is widely used as the "global"
// context of each role.
template <typename T>
using ActorOwningSelfRef = ResourceOwningRef<T>;

// A wrapper of ResourceWeakRef, used to forward the widely used `self` pointer from the core ACTOR to other ACTORs. It
// will check the resource before returning it. If the resource is not available, an operation_cancelled error will be
// thrown to terminate the current ACTOR.
template <typename T>
class ActorWeakSelfRef : public ResourceWeakRef<T> {
public:
    ActorWeakSelfRef(const ResourceOwningRef<T>& ref) : ResourceWeakRef<T>(ref) {}
    ActorWeakSelfRef(const ResourceWeakRef<T>& ref) : ResourceWeakRef<T>(ref) {}
    ActorWeakSelfRef(const ActorWeakSelfRef<T>& ref)
      : ResourceWeakRef<T>(static_cast<const ResourceWeakRef<T>&>(ref)) {}

    // Retrieves the resource as a pointer; throws operation_cancelled if the resource is not available
    T* operator->() const {
        if (!ResourceWeakRef<T>::available())
            throw operation_cancelled();
        return ResourceWeakRef<T>::resourceRef->resource;
    }

    // Gets the reference to the resource; throws operation_cancelled if the resource is not available
    T& operator*() const { return *(this->operator->()); }
};

#endif // FLOW_OWNING_RESOURCE_H