Update storage metrics functions to use the version at which tenant was read

This commit is contained in:
Ankita Kejriwal 2023-02-01 19:44:31 -08:00
parent 0881c0e4e2
commit 9529f360ee
6 changed files with 61 additions and 91 deletions

View File

@ -1019,9 +1019,7 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
proxiesChangeTrigger->trigger();
}
}
when(wait(actors.getResult())) {
UNSTOPPABLE_ASSERT(false);
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
}
}
}
@ -3389,9 +3387,7 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
std::vector<Error>{ transaction_too_old(), future_version() });
}
choose {
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply = wait(loadBalance(
trState->cx.getPtr(),
locationInfo.locations,
@ -3533,9 +3529,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState, KeySelector k, Use
state GetKeyReply reply;
try {
choose {
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyReply _reply = wait(loadBalance(
trState->cx.getPtr(),
locationInfo.locations,
@ -3694,9 +3688,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) {
wait(Never());
}
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { wait(Never()); }
}
if (watchValueID.present()) {
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After");
@ -4057,9 +4049,7 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
state GetKeyValuesFamilyReply rep;
try {
choose {
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyValuesFamilyReply _rep = wait(loadBalance(
trState->cx.getPtr(),
locations[shard].locations,
@ -4920,9 +4910,7 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
return Void();
}
when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) {
rep = _rep;
}
when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) { rep = _rep; }
}
++trState->cx->transactionPhysicalReadsCompleted;
} catch (Error& e) {
@ -5395,9 +5383,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
loop {
choose {
// NativeAPI watchValue future finishes or errors
when(wait(watch->watchFuture)) {
break;
}
when(wait(watch->watchFuture)) { break; }
when(wait(cx->connectionFileChanged())) {
CODE_PROBE(true, "Recreated a watch after switch");
@ -6964,9 +6950,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpa
state Future<Void> onProxiesChanged = cx->onProxiesChanged();
choose {
when(wait(onProxiesChanged)) {
onProxiesChanged = cx->onProxiesChanged();
}
when(wait(onProxiesChanged)) { onProxiesChanged = cx->onProxiesChanged(); }
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies(
flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)),
@ -7387,9 +7371,7 @@ ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
needToConnect = false;
}
choose {
when(wait(coordinator->onChange())) {
needToConnect = true;
}
when(wait(coordinator->onChange())) { needToConnect = true; }
when(ProtocolVersion pv = wait(protocolVersion)) {
if (!expectedVersion.present() || expectedVersion.get() != pv) {
@ -7494,11 +7476,12 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx,
TenantInfo tenantInfo,
Version version,
KeyRange keys,
Reference<LocationInfo> locationInfo,
Optional<Reference<TransactionState>> trState) {
try {
WaitMetricsRequest req(tenantInfo, keys, StorageMetrics(), StorageMetrics());
WaitMetricsRequest req(tenantInfo, version, keys, StorageMetrics(), StorageMetrics());
req.min.bytes = 0;
req.max.bytes = -1;
StorageMetrics m = wait(loadBalance(
@ -7525,8 +7508,12 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
KeyRange keys,
Optional<Reference<TransactionState>> trState) {
state Span span("NAPI:GetStorageMetricsLargeKeyRange"_loc);
if (trState.present()) {
wait(trState.get()->startTransaction());
}
state TenantInfo tenantInfo =
wait(trState.present() ? populateAndGetTenant(trState.get(), keys.begin) : TenantInfo());
state Version version = trState.present() ? trState.get()->readVersion() : latestVersion;
std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(cx,
tenantInfo,
keys,
@ -7536,7 +7523,7 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
span.context,
Optional<UID>(),
UseProvisionalProxies::False,
latestVersion));
version));
state int nLocs = locations.size();
state std::vector<Future<StorageMetrics>> fx(nLocs);
state StorageMetrics total;
@ -7544,7 +7531,8 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
for (int i = 0; i < nLocs; i++) {
partBegin = (i == 0) ? keys.begin : locations[i].range.begin;
partEnd = (i == nLocs - 1) ? keys.end : locations[i].range.end;
fx[i] = doGetStorageMetrics(cx, tenantInfo, KeyRangeRef(partBegin, partEnd), locations[i].locations, trState);
fx[i] = doGetStorageMetrics(
cx, tenantInfo, version, KeyRangeRef(partBegin, partEnd), locations[i].locations, trState);
}
wait(waitForAll(fx));
for (int i = 0; i < nLocs; i++) {
@ -7554,6 +7542,7 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx,
}
ACTOR Future<Void> trackBoundedStorageMetrics(TenantInfo tenantInfo,
Version version,
KeyRange keys,
Reference<LocationInfo> location,
StorageMetrics x,
@ -7561,7 +7550,7 @@ ACTOR Future<Void> trackBoundedStorageMetrics(TenantInfo tenantInfo,
PromiseStream<StorageMetrics> deltaStream) {
try {
loop {
WaitMetricsRequest req(tenantInfo, keys, x - halfError, x + halfError);
WaitMetricsRequest req(tenantInfo, version, keys, x - halfError, x + halfError);
StorageMetrics nextX = wait(loadBalance(location->locations(), &StorageServerInterface::waitMetrics, req));
deltaStream.send(nextX - x);
x = nextX;
@ -7573,6 +7562,7 @@ ACTOR Future<Void> trackBoundedStorageMetrics(TenantInfo tenantInfo,
}
ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(TenantInfo tenantInfo,
Version version,
std::vector<KeyRangeLocationInfo> locations,
StorageMetrics min,
StorageMetrics max,
@ -7587,7 +7577,7 @@ ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(TenantInfo tena
state StorageMetrics minMinus = min - halfErrorPerMachine * (nLocs - 1);
for (int i = 0; i < nLocs; i++) {
WaitMetricsRequest req(tenantInfo, locations[i].range, StorageMetrics(), StorageMetrics());
WaitMetricsRequest req(tenantInfo, version, locations[i].range, StorageMetrics(), StorageMetrics());
req.min.bytes = 0;
req.max.bytes = -1;
fx[i] = loadBalance(locations[i].locations->locations(),
@ -7608,7 +7598,7 @@ ACTOR Future<StorageMetrics> waitStorageMetricsMultipleLocations(TenantInfo tena
for (int i = 0; i < nLocs; i++)
wx[i] = trackBoundedStorageMetrics(
tenantInfo, locations[i].range, locations[i].locations, fx[i].get(), halfErrorPerMachine, deltas);
tenantInfo, version, locations[i].range, locations[i].locations, fx[i].get(), halfErrorPerMachine, deltas);
loop {
StorageMetrics delta = waitNext(deltas.getFuture());
@ -7694,6 +7684,7 @@ ACTOR Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(Da
}
ACTOR Future<Optional<StorageMetrics>> waitStorageMetricsWithLocation(TenantInfo tenantInfo,
Version version,
KeyRange keys,
std::vector<KeyRangeLocationInfo> locations,
StorageMetrics min,
@ -7701,9 +7692,9 @@ ACTOR Future<Optional<StorageMetrics>> waitStorageMetricsWithLocation(TenantInfo
StorageMetrics permittedError) {
Future<StorageMetrics> fx;
if (locations.size() > 1) {
fx = waitStorageMetricsMultipleLocations(tenantInfo, locations, min, max, permittedError);
fx = waitStorageMetricsMultipleLocations(tenantInfo, version, locations, min, max, permittedError);
} else {
WaitMetricsRequest req(tenantInfo, keys, min, max);
WaitMetricsRequest req(tenantInfo, version, keys, min, max);
fx = loadBalance(locations[0].locations->locations(),
&StorageServerInterface::waitMetrics,
req,
@ -7724,8 +7715,12 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
Optional<Reference<TransactionState>> trState) {
state Span span("NAPI:WaitStorageMetrics"_loc, generateSpanID(cx->transactionTracingSample));
loop {
if (trState.present()) {
wait(trState.get()->startTransaction());
}
state TenantInfo tenantInfo =
wait(trState.present() ? populateAndGetTenant(trState.get(), keys.begin) : TenantInfo());
state Version version = trState.present() ? trState.get()->readVersion() : latestVersion;
state std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(cx,
tenantInfo,
@ -7736,7 +7731,7 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
span.context,
Optional<UID>(),
UseProvisionalProxies::False,
latestVersion));
version));
if (expectedShardCount >= 0 && locations.size() != expectedShardCount) {
return std::make_pair(Optional<StorageMetrics>(), locations.size());
}
@ -7757,7 +7752,7 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
try {
Optional<StorageMetrics> res =
wait(waitStorageMetricsWithLocation(tenantInfo, keys, locations, min, max, permittedError));
wait(waitStorageMetricsWithLocation(tenantInfo, version, keys, locations, min, max, permittedError));
if (res.present()) {
return std::make_pair(res, -1);
}
@ -7795,6 +7790,7 @@ Future<std::pair<Optional<StorageMetrics>, int>> DatabaseContext::waitStorageMet
trState);
}
// TODO(kejriwal): Entry point level A
Future<StorageMetrics> DatabaseContext::getStorageMetrics(KeyRange const& keys,
int shardLimit,
Optional<Reference<TransactionState>> trState) {
@ -8901,12 +8897,8 @@ ACTOR static Future<std::vector<CheckpointMetaData>> getCheckpointMetaDataForRan
}
choose {
when(wait(cx->connectionFileChanged())) {
cx->invalidateCache({}, range);
}
when(wait(waitForAll(futures))) {
break;
}
when(wait(cx->connectionFileChanged())) { cx->invalidateCache({}, range); }
when(wait(waitForAll(futures))) { break; }
when(wait(delay(timeout))) {
TraceEvent(SevWarn, "GetCheckpointTimeout").detail("Range", range).detail("Version", version);
}
@ -9577,12 +9569,8 @@ ACTOR Future<Void> changeFeedWhenAtLatest(Reference<ChangeFeedData> self, Versio
// only allowed to use empty versions if you're caught up
Future<Void> waitEmptyVersion = (self->notAtLatest.get() == 0) ? changeFeedWaitLatest(self, version) : Never();
choose {
when(wait(waitEmptyVersion)) {
break;
}
when(wait(lastReturned)) {
break;
}
when(wait(waitEmptyVersion)) { break; }
when(wait(lastReturned)) { break; }
when(wait(self->refresh.getFuture())) {}
when(wait(self->notAtLatest.onChange())) {}
}

View File

@ -356,24 +356,16 @@ public:
Req req,
Snapshot snapshot) {
choose {
when(typename Req::Result result = wait(readThrough(ryw, req, snapshot))) {
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
when(typename Req::Result result = wait(readThrough(ryw, req, snapshot))) { return result; }
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
}
}
ACTOR template <class Req>
static Future<typename Req::Result> readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) {
state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
choose {
when(typename Req::Result result = wait(read(ryw, req, &it))) {
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
when(typename Req::Result result = wait(read(ryw, req, &it))) { return result; }
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
}
}
ACTOR template <class Req>
@ -389,9 +381,7 @@ public:
addConflictRange(ryw, req, it.extractWriteMapIterator(), result);
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
}
}
template <class Req>
@ -1211,9 +1201,7 @@ public:
addConflictRangeAndMustUnmodified<backwards>(ryw, req, writes, result);
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
}
}
@ -1464,13 +1452,9 @@ public:
ACTOR static Future<Version> getReadVersion(ReadYourWritesTransaction* ryw) {
choose {
when(Version v = wait(ryw->tr.getReadVersion())) {
return v;
}
when(Version v = wait(ryw->tr.getReadVersion())) { return v; }
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
}
}
};

View File

@ -647,6 +647,7 @@ struct KeyRangeLocationInfo;
// Return the aggregated StorageMetrics of range keys to the caller. The locations tell which interface should
// serve the request. The final result is within (min-permittedError/2, max + permittedError/2) if valid.
ACTOR Future<Optional<StorageMetrics>> waitStorageMetricsWithLocation(TenantInfo tenantInfo,
Version version,
KeyRange keys,
std::vector<KeyRangeLocationInfo> locations,
StorageMetrics min,

View File

@ -714,9 +714,11 @@ struct WaitMetricsRequest {
// Waits for any of the given minimum or maximum metrics to be exceeded, and then returns the current values
// Send a reversed range for min, max to receive an immediate report
constexpr static FileIdentifier file_identifier = 1795961;
// Setting the tenantInfo makes the request tenant-aware.
TenantInfo tenantInfo;
Arena arena;
// Setting the tenantInfo makes the request tenant-aware. Need to set `version` to a version where
// the tenant info was read.
TenantInfo tenantInfo;
Version version;
KeyRangeRef keys;
StorageMetrics min, max;
ReplyPromise<StorageMetrics> reply;
@ -725,14 +727,15 @@ struct WaitMetricsRequest {
WaitMetricsRequest() {}
WaitMetricsRequest(TenantInfo tenantInfo,
Version version,
KeyRangeRef const& keys,
StorageMetrics const& min,
StorageMetrics const& max)
: tenantInfo(tenantInfo), keys(arena, keys), min(min), max(max) {}
: tenantInfo(tenantInfo), version(version), keys(arena, keys), min(min), max(max) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keys, min, max, reply, tenantInfo, arena);
serializer(ar, keys, min, max, reply, tenantInfo, version, arena);
}
};

View File

@ -33,6 +33,7 @@ public:
int shardLimit,
int expectedShardCount) {
state TenantInfo tenantInfo;
state Version version = 0;
loop {
auto locations = mgs->getKeyRangeLocations(tenantInfo,
keys,
@ -41,7 +42,7 @@ public:
SpanContext(),
Optional<UID>(),
UseProvisionalProxies::False,
0)
version)
.get();
TraceEvent(SevDebug, "MGSWaitStorageMetrics").detail("Phase", "GetLocation");
// NOTE(xwang): in native API, there's code handling the non-equal situation, but I think in mock world
@ -49,7 +50,7 @@ public:
ASSERT_EQ(expectedShardCount, locations.size());
Optional<StorageMetrics> res =
wait(::waitStorageMetricsWithLocation(tenantInfo, keys, locations, min, max, permittedError));
wait(::waitStorageMetricsWithLocation(tenantInfo, version, keys, locations, min, max, permittedError));
if (res.present()) {
return std::make_pair(res, -1);

View File

@ -2010,9 +2010,7 @@ ACTOR Future<Version> waitForVersionNoTooOld(StorageServer* data, Version versio
if (version <= data->version.get())
return version;
choose {
when(wait(data->version.whenAtLeast(version))) {
return version;
}
when(wait(data->version.whenAtLeast(version))) { return version; }
when(wait(delay(SERVER_KNOBS->FUTURE_VERSION_DELAY))) {
if (deterministicRandom()->random01() < 0.001)
TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
@ -6665,9 +6663,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
when(wait(changeFeedInfo->fetchLock.take())) {
feedFetchReleaser = FlowLock::Releaser(changeFeedInfo->fetchLock);
}
when(wait(changeFeedInfo->durableFetchVersion.whenAtLeast(endVersion))) {
return invalidVersion;
}
when(wait(changeFeedInfo->durableFetchVersion.whenAtLeast(endVersion))) { return invalidVersion; }
}
state Version startVersion = beginVersion;
@ -10702,9 +10698,7 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
}*/
}
when(wait(timeout)) {
timedout = true;
}
when(wait(timeout)) { timedout = true; }
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
@ -10774,10 +10768,9 @@ Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Vo
ACTOR Future<Void> waitMetricsTenantAware_internal(StorageServer* self, WaitMetricsRequest req) {
if (req.tenantInfo.hasTenant()) {
try {
// The call to `waitForVersionNoTooOld()` can throw `future_version()`. Since we're requesting
// `latestVersion`, this can only happen if the version at the storage server is currently `0`.
// The call to `waitForVersionNoTooOld()` can throw `future_version()`.
// It is okay for the caller to retry after a delay to give the server some time to catch up.
state Version version = wait(waitForVersionNoTooOld(self, latestVersion));
state Version version = wait(waitForVersionNoTooOld(self, req.version));
self->checkTenantEntry(version, req.tenantInfo);
} catch (Error& e) {
self->sendErrorWithPenalty(req.reply, e, self->getPenalty());