Make the storage metrics function tenant aware [WIP]

This change makes the getEstimatedRangeSizeBytes function tenant aware.
Previously, this function would return the size of the requested
keyspace even if the tenant in the Transaction or DatabaseContext did
not match the tenant corresponding to the keyspace.

Also make some improvements to the new workload.
This commit is contained in:
Ankita Kejriwal 2022-09-01 18:28:37 -07:00
parent 85ba47f4c8
commit f63934117d
7 changed files with 100 additions and 53 deletions

View File

@ -32,6 +32,7 @@
#include <vector>
#include "boost/algorithm/string.hpp"
#include "fdbrpc/TenantInfo.h"
#include "fmt/format.h"
#include "fdbclient/FDBOptions.g.h"
@ -6036,7 +6037,7 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
trCommitCosts.opsCount++;
keyRange = KeyRangeRef(it->param1, it->param2);
if (trState->options.expensiveClearCostEstimation) {
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY));
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY, trState));
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
trCommitCosts.writeCosts += getWriteOperationCost(m.bytes);
++trCommitCosts.expensiveCostEstCount;
@ -7241,12 +7242,18 @@ Future<Void> Transaction::onError(Error const& e) {
return e;
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys);
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
KeyRange keys,
Future<TenantInfo> tenantInfoFuture);
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRange keys, Reference<LocationInfo> locationInfo) {
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx,
KeyRange keys,
Reference<LocationInfo> locationInfo,
Future<TenantInfo> tenantInfoFuture) {
state TenantInfo tenantInfo = wait(tenantInfoFuture);
loop {
try {
WaitMetricsRequest req(keys, StorageMetrics(), StorageMetrics());
WaitMetricsRequest req(tenantInfo, keys, StorageMetrics(), StorageMetrics());
req.min.bytes = 0;
req.max.bytes = -1;
StorageMetrics m = wait(loadBalance(
@ -7259,16 +7266,19 @@ ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRange keys, Ref
}
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
cx->invalidateCache(Key(), keys);
StorageMetrics m = wait(getStorageMetricsLargeKeyRange(cx, keys));
StorageMetrics m = wait(getStorageMetricsLargeKeyRange(cx, keys, tenantInfo));
return m;
}
}
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys) {
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
KeyRange keys,
Future<TenantInfo> tenantInfoFuture) {
state Span span("NAPI:GetStorageMetricsLargeKeyRange"_loc);
state TenantInfo tenantInfo = wait(tenantInfoFuture);
std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(cx,
TenantInfo(),
tenantInfo,
keys,
std::numeric_limits<int>::max(),
Reverse::False,
@ -7284,7 +7294,7 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRang
for (int i = 0; i < nLocs; i++) {
partBegin = (i == 0) ? keys.begin : locations[i].range.begin;
partEnd = (i == nLocs - 1) ? keys.end : locations[i].range.end;
fx[i] = doGetStorageMetrics(cx, KeyRangeRef(partBegin, partEnd), locations[i].locations);
fx[i] = doGetStorageMetrics(cx, KeyRangeRef(partBegin, partEnd), locations[i].locations, tenantInfo);
}
wait(waitForAll(fx));
for (int i = 0; i < nLocs; i++) {
@ -7293,14 +7303,15 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRang
return total;
}
ACTOR Future<Void> trackBoundedStorageMetrics(KeyRange keys,
ACTOR Future<Void> trackBoundedStorageMetrics(TenantInfo tenantInfo,
KeyRange keys,
Reference<LocationInfo> location,
StorageMetrics x,
StorageMetrics halfError,
PromiseStream<StorageMetrics> deltaStream) {
try {
loop {
WaitMetricsRequest req(keys, x - halfError, x + halfError);
WaitMetricsRequest req(tenantInfo, keys, x - halfError, x + halfError);
StorageMetrics nextX = wait(loadBalance(location->locations(), &StorageServerInterface::waitMetrics, req));
deltaStream.send(nextX - x);
x = nextX;
@ -7311,7 +7322,8 @@ ACTOR Future<Void> trackBoundedStorageMetrics(KeyRange keys,
}
}
ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(std::vector<KeyRangeLocationInfo> locations,
ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(TenantInfo tenantInfo,
std::vector<KeyRangeLocationInfo> locations,
StorageMetrics min,
StorageMetrics max,
StorageMetrics permittedError) {
@ -7325,7 +7337,7 @@ ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(std::vector<Key
state StorageMetrics minMinus = min - halfErrorPerMachine * (nLocs - 1);
for (int i = 0; i < nLocs; i++) {
WaitMetricsRequest req(locations[i].range, StorageMetrics(), StorageMetrics());
WaitMetricsRequest req(tenantInfo, locations[i].range, StorageMetrics(), StorageMetrics());
req.min.bytes = 0;
req.max.bytes = -1;
fx[i] = loadBalance(locations[i].locations->locations(),
@ -7346,7 +7358,7 @@ ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(std::vector<Key
for (int i = 0; i < nLocs; i++)
wx[i] = trackBoundedStorageMetrics(
locations[i].range, locations[i].locations, fx[i].get(), halfErrorPerMachine, deltas);
tenantInfo, locations[i].range, locations[i].locations, fx[i].get(), halfErrorPerMachine, deltas);
loop {
StorageMetrics delta = waitNext(deltas.getFuture());
@ -7437,11 +7449,13 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(Databa
StorageMetrics max,
StorageMetrics permittedError,
int shardLimit,
int expectedShardCount) {
int expectedShardCount,
Future<TenantInfo> tenantInfoFuture) {
state Span span("NAPI:WaitStorageMetrics"_loc, generateSpanID(cx->transactionTracingSample));
state TenantInfo tenantInfo = wait(tenantInfoFuture);
loop {
std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(cx,
TenantInfo(),
tenantInfo,
keys,
shardLimit,
Reverse::False,
@ -7460,9 +7474,9 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(Databa
try {
Future<StorageMetrics> fx;
if (locations.size() > 1) {
fx = waitStorageMetricsMultipleLocations(locations, min, max, permittedError);
fx = waitStorageMetricsMultipleLocations(tenantInfo, locations, min, max, permittedError);
} else {
WaitMetricsRequest req(keys, min, max);
WaitMetricsRequest req(tenantInfo, keys, min, max);
fx = loadBalance(locations[0].locations->locations(),
&StorageServerInterface::waitMetrics,
req,
@ -7496,17 +7510,26 @@ Future<std::pair<Optional<StorageMetrics>, int>> DatabaseContext::waitStorageMet
StorageMetrics const& max,
StorageMetrics const& permittedError,
int shardLimit,
int expectedShardCount) {
int expectedShardCount,
Optional<Reference<TransactionState>> trState) {
Future<TenantInfo> tenantInfoFuture =
trState.present() ? populateAndGetTenant(trState.get(), keys.begin, latestVersion) : TenantInfo();
return ::waitStorageMetrics(Database(Reference<DatabaseContext>::addRef(this)),
keys,
min,
max,
permittedError,
shardLimit,
expectedShardCount);
expectedShardCount,
tenantInfoFuture);
}
Future<StorageMetrics> DatabaseContext::getStorageMetrics(KeyRange const& keys, int shardLimit) {
Future<StorageMetrics> DatabaseContext::getStorageMetrics(KeyRange const& keys,
int shardLimit,
Optional<Reference<TransactionState>> trState) {
Future<TenantInfo> tenantInfoFuture =
trState.present() ? populateAndGetTenant(trState.get(), keys.begin, latestVersion) : TenantInfo();
if (shardLimit > 0) {
StorageMetrics m;
m.bytes = -1;
@ -7516,9 +7539,11 @@ Future<StorageMetrics> DatabaseContext::getStorageMetrics(KeyRange const& keys,
m,
StorageMetrics(),
shardLimit,
-1));
-1,
tenantInfoFuture));
} else {
return ::getStorageMetricsLargeKeyRange(Database(Reference<DatabaseContext>::addRef(this)), keys);
return ::getStorageMetricsLargeKeyRange(
Database(Reference<DatabaseContext>::addRef(this)), keys, tenantInfoFuture);
}
}

View File

@ -1764,7 +1764,13 @@ Future<int64_t> ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyR
if (resetPromise.isSet())
return resetPromise.getFuture().getError();
return map(waitOrError(tr.getDatabase()->getStorageMetrics(keys, -1), resetPromise.getFuture()),
TraceEvent(SevWarnAlways, "AKDebug")
.detail("Status", "getEstimatedRangeSizeBytes")
.detail("Tenant", tr.trState->hasTenant() ? tr.trState->tenant().get().toString() : "not present")
.detail("TenantIdValid", tr.trState->tenantId() != TenantInfo::INVALID_TENANT)
.detail("AuthPresent", tr.trState->authToken.present());
return map(waitOrError(tr.getDatabase()->getStorageMetrics(keys, -1, tr.trState), resetPromise.getFuture()),
[](const StorageMetrics& m) { return m.bytes; });
}
@ -2492,6 +2498,7 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti
validateOptionValueNotPresent(value);
options.bypassUnreadable = true;
break;
// TODO(kejriwal): Check if this is needed
case FDBTransactionOptions::AUTHORIZATION_TOKEN:
tr.setOption(option, value);
default:

View File

@ -295,13 +295,19 @@ public:
Future<Void> onProxiesChanged() const;
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
Future<StorageMetrics> getStorageMetrics(KeyRange const& keys, int shardLimit);
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
StorageMetrics const& min,
StorageMetrics const& max,
StorageMetrics const& permittedError,
int shardLimit,
int expectedShardCount);
// Pass a valid `trState` with `hasTenant() == true` to make the function tenant-aware.
Future<StorageMetrics> getStorageMetrics(
KeyRange const& keys,
int shardLimit,
Optional<Reference<TransactionState>> trState = Optional<Reference<TransactionState>>());
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
KeyRange const& keys,
StorageMetrics const& min,
StorageMetrics const& max,
StorageMetrics const& permittedError,
int shardLimit,
int expectedShardCount,
Optional<Reference<TransactionState>> trState = Optional<Reference<TransactionState>>());
Future<Void> splitStorageMetricsStream(PromiseStream<Key> const& resultsStream,
KeyRange const& keys,
StorageMetrics const& limit,

View File

@ -102,7 +102,7 @@ struct StorageServerInterface {
PublicRequestStream<struct GetMappedKeyValuesRequest> getMappedKeyValues;
RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
PublicRequestStream<struct WaitMetricsRequest> waitMetrics;
RequestStream<struct SplitMetricsRequest> splitMetrics;
RequestStream<struct GetStorageMetricsRequest> getStorageMetrics;
RequestStream<ReplyPromise<Void>> waitFailure;
@ -160,7 +160,8 @@ public:
PublicRequestStream<struct GetKeyValuesRequest>(getValue.getEndpoint().getAdjustedEndpoint(2));
getShardState =
RequestStream<struct GetShardStateRequest>(getValue.getEndpoint().getAdjustedEndpoint(3));
waitMetrics = RequestStream<struct WaitMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(4));
waitMetrics =
PublicRequestStream<struct WaitMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(4));
splitMetrics = RequestStream<struct SplitMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(5));
getStorageMetrics =
RequestStream<struct GetStorageMetricsRequest>(getValue.getEndpoint().getAdjustedEndpoint(6));
@ -715,18 +716,24 @@ struct WaitMetricsRequest {
// Waits for any of the given minimum or maximum metrics to be exceeded, and then returns the current values
// Send a reversed range for min, max to receive an immediate report
constexpr static FileIdentifier file_identifier = 1795961;
TenantInfo tenantInfo;
Arena arena;
KeyRangeRef keys;
StorageMetrics min, max;
ReplyPromise<StorageMetrics> reply;
bool verify() const { return tenantInfo.isAuthorized(); }
WaitMetricsRequest() {}
WaitMetricsRequest(KeyRangeRef const& keys, StorageMetrics const& min, StorageMetrics const& max)
: keys(arena, keys), min(min), max(max) {}
WaitMetricsRequest(TenantInfo tenantInfo,
KeyRangeRef const& keys,
StorageMetrics const& min,
StorageMetrics const& max)
: tenantInfo(tenantInfo), keys(arena, keys), min(min), max(max) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keys, min, max, reply, arena);
serializer(ar, keys, min, max, reply, tenantInfo, arena);
}
};
@ -831,6 +838,7 @@ struct SplitRangeReply {
}
};
// TODO(kejriwal): ref
struct SplitRangeRequest {
constexpr static FileIdentifier file_identifier = 10725174;
Arena arena;

View File

@ -465,7 +465,7 @@ struct StorageServerMetrics {
req.reply.send(rep);
}
Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay);
Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay, Optional<Key> tenantPrefix);
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests

View File

@ -9509,8 +9509,14 @@ void StorageServer::byteSampleApplyClear(KeyRangeRef range, Version ver) {
}
}
ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest req, Future<Void> timeout) {
ACTOR Future<Void> waitMetrics(StorageServerMetrics* self,
WaitMetricsRequest req,
Future<Void> timeout,
Optional<Key> tenantPrefix) {
state PromiseStream<StorageMetrics> change;
if (tenantPrefix.present()) {
req.keys = req.keys.withPrefix(tenantPrefix.get());
}
state StorageMetrics metrics = self->getMetrics(req.keys);
state Error error = success();
state bool timedout = false;
@ -9600,8 +9606,8 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
return Void();
}
Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Void> delay) {
return ::waitMetrics(this, req, delay);
Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Void> delay, Optional<Key> tenantPrefix) {
return ::waitMetrics(this, req, delay, tenantPrefix);
}
#ifndef __INTEL_COMPILER
@ -9649,13 +9655,16 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
loop {
choose {
when(WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
when(state WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
if (!self->isReadable(req.keys)) {
CODE_PROBE(true, "waitMetrics immediate wrong_shard_server()");
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
} else {
self->actors.add(
self->metrics.waitMetrics(req, delayJittered(SERVER_KNOBS->STORAGE_METRIC_TIMEOUT)));
wait(success(waitForVersionNoTooOld(self, latestVersion)));
Optional<TenantMapEntry> entry = self->getTenantEntry(latestVersion, req.tenantInfo);
Optional<Key> tenantPrefix = entry.map<Key>([](TenantMapEntry e) { return e.prefix; });
self->actors.add(self->metrics.waitMetrics(
req, delayJittered(SERVER_KNOBS->STORAGE_METRIC_TIMEOUT), tenantPrefix));
}
}
when(SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {

View File

@ -101,7 +101,6 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload, Members {
void setAuthToken(ReadYourWritesTransaction& tr) {
tr.setOption(FDBTransactionOptions::AUTHORIZATION_TOKEN, this->signedToken);
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
}
Key keyForIndex(int n) { return key(n); }
@ -111,30 +110,23 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload, Members {
Standalone<KeyValueRef> operator()(int n) { return KeyValueRef(key(n), value((n + 1) % nodeCount)); }
ACTOR static Future<Void> checkSize(GetEstimatedRangeSizeWorkload* self, Database cx) {
// TraceEvent(SevWarnAlways, "AKDebug").detail("Status", "checkSize-1").detail("Tenant", cx->defaultTenant.get());
int64_t size = wait(getSize(self, cx));
// TraceEvent(SevWarnAlways, "AKDebug").detail("Status", "checkSize-2").detail("Tenant", cx->defaultTenant.get());
TraceEvent(SevWarnAlways, "AKGetEstimatedRangeSizeResults")
TraceEvent(SevDebug, "GetEstimatedRangeSizeResults")
.detail("Tenant", cx->defaultTenant.get())
.detail("TenantSize", size);
ASSERT_LT(size, 0);
ASSERT_GT(size, 0);
return Void();
}
ACTOR static Future<int64_t> getSize(GetEstimatedRangeSizeWorkload* self, Database cx) {
// TraceEvent(SevWarnAlways, "AKDebug").detail("Status", "getSize-1").detail("Tenant", cx->defaultTenant.get());
state ReadYourWritesTransaction tr(cx);
state ReadYourWritesTransaction tr(cx, cx->defaultTenant);
loop {
try {
self->setAuthToken(tr);
state int64_t size = wait(tr.getEstimatedRangeSizeBytes(normalKeys));
// TraceEvent(SevWarnAlways, "AKDebug")
// .detail("Status", "getSize-2")
// .detail("Tenant", cx->defaultTenant.get());
tr.reset();
return size;
} catch (Error& e) {
// TraceEvent(SevWarnAlways, "AKDebugError").detail("Status", "getSize-3").detail("Error", e.name());
wait(tr.onError(e));
}
}