Refactor the old waitStorageMetrics and finish MockGlobalState::waitStorageMetrics
(no unit test yet)
This commit is contained in:
parent
5d90703dc8
commit
1603926595
|
@ -1915,7 +1915,8 @@ Optional<KeyRangeLocationInfo> DatabaseContext::getCachedLocation(const Optional
|
|||
auto range =
|
||||
isBackward ? locationCache.rangeContainingKeyBefore(resolvedKey) : locationCache.rangeContaining(resolvedKey);
|
||||
if (range->value()) {
|
||||
return KeyRangeLocationInfo(tenantEntry, toPrefixRelativeRange(range->range(), tenantEntry.prefix), range->value());
|
||||
return KeyRangeLocationInfo(
|
||||
tenantEntry, toPrefixRelativeRange(range->range(), tenantEntry.prefix), range->value());
|
||||
}
|
||||
|
||||
return Optional<KeyRangeLocationInfo>();
|
||||
|
@ -1952,7 +1953,8 @@ bool DatabaseContext::getCachedLocations(const Optional<TenantNameRef>& tenantNa
|
|||
result.clear();
|
||||
return false;
|
||||
}
|
||||
result.emplace_back(tenantEntry, toPrefixRelativeRange(r->range() & resolvedRange, tenantEntry.prefix), r->value());
|
||||
result.emplace_back(
|
||||
tenantEntry, toPrefixRelativeRange(r->range() & resolvedRange, tenantEntry.prefix), r->value());
|
||||
if (result.size() == limit || begin == end) {
|
||||
break;
|
||||
}
|
||||
|
@ -7714,6 +7716,34 @@ ACTOR Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(Da
|
|||
}
|
||||
}
|
||||
|
||||
// Waits for the storage metrics of `keys`, using the already-resolved `locations` to pick the servers to ask.
// - More than one location: delegates to waitStorageMetricsMultipleLocations().
// - Otherwise: sends a single WaitMetricsRequest through loadBalance() to the interfaces of locations[0].
// Returns the metrics on success. Returns an empty Optional when the request fails with wrong_shard_server or
// all_alternatives_failed; any other error is traced at SevError and rethrown.
// NOTE(review): locations[0] is read whenever locations.size() <= 1, so this appears to require at least one
// location from the caller - confirm that no caller can pass an empty vector.
ACTOR Future<Optional<StorageMetrics>> waitStorageMetricsWithLocation(TenantInfo tenantInfo,
|
||||
KeyRange keys,
|
||||
std::vector<KeyRangeLocationInfo> locations,
|
||||
StorageMetrics min,
|
||||
StorageMetrics max,
|
||||
StorageMetrics permittedError) {
|
||||
try {
|
||||
Future<StorageMetrics> fx;
|
||||
if (locations.size() > 1) {
|
||||
fx = waitStorageMetricsMultipleLocations(tenantInfo, locations, min, max, permittedError);
|
||||
} else {
|
||||
// Single-location path: the request is built from min/max only; permittedError is not forwarded here.
WaitMetricsRequest req(tenantInfo, keys, min, max);
|
||||
fx = loadBalance(locations[0].locations->locations(),
|
||||
&StorageServerInterface::waitMetrics,
|
||||
req,
|
||||
TaskPriority::DataDistribution);
|
||||
}
|
||||
StorageMetrics x = wait(fx);
|
||||
return x;
|
||||
} catch (Error& e) {
|
||||
// wrong_shard_server and all_alternatives_failed are swallowed and reported to the caller as an empty
// Optional (see the return below); every other error is logged and propagated.
if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
|
||||
TraceEvent(SevError, "WaitStorageMetricsError").error(e);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
// Reached only after one of the two tolerated errors above.
return Optional<StorageMetrics>();
|
||||
}
|
||||
|
||||
ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
|
||||
Database cx,
|
||||
KeyRange keys,
|
||||
|
@ -7743,30 +7773,8 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
|
|||
}
|
||||
|
||||
// SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better
|
||||
// solution to this.
|
||||
if (locations.size() < shardLimit) {
|
||||
try {
|
||||
Future<StorageMetrics> fx;
|
||||
if (locations.size() > 1) {
|
||||
fx = waitStorageMetricsMultipleLocations(tenantInfo, locations, min, max, permittedError);
|
||||
} else {
|
||||
WaitMetricsRequest req(tenantInfo, keys, min, max);
|
||||
fx = loadBalance(locations[0].locations->locations(),
|
||||
&StorageServerInterface::waitMetrics,
|
||||
req,
|
||||
TaskPriority::DataDistribution);
|
||||
}
|
||||
StorageMetrics x = wait(fx);
|
||||
return std::make_pair(x, -1);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
|
||||
TraceEvent(SevError, "WaitStorageMetricsError").error(e);
|
||||
throw;
|
||||
}
|
||||
cx->invalidateCache(locations[0].tenantEntry.prefix, keys);
|
||||
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
|
||||
}
|
||||
} else {
|
||||
// solution to this. How could this happen?
|
||||
if (locations.size() >= shardLimit) {
|
||||
TraceEvent(SevWarn, "WaitStorageMetricsPenalty")
|
||||
.detail("Keys", keys)
|
||||
.detail("Limit", shardLimit)
|
||||
|
@ -7775,7 +7783,16 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
|
|||
wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskPriority::DataDistribution));
|
||||
// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
|
||||
cx->invalidateCache(locations[0].tenantEntry.prefix, keys);
|
||||
continue;
|
||||
}
|
||||
|
||||
Optional<StorageMetrics> res =
|
||||
wait(waitStorageMetricsWithLocation(tenantInfo, keys, locations, min, max, permittedError));
|
||||
if (res.present()) {
|
||||
return std::make_pair(res, -1);
|
||||
}
|
||||
cx->invalidateCache(locations[0].tenantEntry.prefix, keys);
|
||||
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -591,6 +591,16 @@ int64_t getMaxWriteKeySize(KeyRef const& key, bool hasRawAccess);
|
|||
// Returns the maximum legal size of a key that can be cleared. Keys larger than this will be assumed not to exist.
|
||||
int64_t getMaxClearKeySize(KeyRef const& key);
|
||||
|
||||
struct KeyRangeLocationInfo;
|
||||
// Return the aggregated StorageMetrics of range keys to the caller. The locations tell which interface should
|
||||
// serve the request. The final result is within (min-permittedError/2, max + permittedError/2) if valid.
// The returned Optional is empty when the request failed with wrong_shard_server or all_alternatives_failed;
// any other error is traced and rethrown to the caller.
ACTOR Future<Optional<StorageMetrics>> waitStorageMetricsWithLocation(TenantInfo tenantInfo,
|
||||
KeyRange keys,
|
||||
std::vector<KeyRangeLocationInfo> locations,
|
||||
StorageMetrics min,
|
||||
StorageMetrics max,
|
||||
StorageMetrics permittedError);
|
||||
|
||||
namespace NativeAPI {
|
||||
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
|
||||
Transaction* tr);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* MockGlobalState.cpp
|
||||
* MockGlobalState.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -20,6 +20,42 @@
|
|||
|
||||
#include "fdbserver/MockGlobalState.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
// Holder for the ACTOR implementation that MockGlobalState::waitStorageMetrics forwards to.
class MockGlobalStateImpl {
|
||||
public:
|
||||
// Repeatedly resolves the locations of `keys` through `mgs` and asks ::waitStorageMetricsWithLocation() for
// the metrics until a result is present. The second element of the returned pair is always -1 here.
// Asserts that the number of resolved locations equals expectedShardCount on every iteration.
ACTOR static Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(MockGlobalState* mgs,
|
||||
KeyRange keys,
|
||||
StorageMetrics min,
|
||||
StorageMetrics max,
|
||||
StorageMetrics permittedError,
|
||||
int shardLimit,
|
||||
int expectedShardCount) {
|
||||
state TenantInfo tenantInfo;
|
||||
loop {
|
||||
// The future is read with .get() rather than waited on.
// NOTE(review): presumably the mock always returns an already-ready future - confirm.
auto locations = mgs->getKeyRangeLocations(tenantInfo,
|
||||
keys,
|
||||
shardLimit,
|
||||
Reverse::False,
|
||||
SpanContext(),
|
||||
Optional<UID>(),
|
||||
UseProvisionalProxies::False,
|
||||
0)
|
||||
.get();
|
||||
// NOTE(xwang): in native API, there's code handling the non-equal situation, but I think in mock world
|
||||
// there shouldn't be any delay in updating the locations.
|
||||
ASSERT_EQ(expectedShardCount, locations.size());
|
||||
|
||||
Optional<StorageMetrics> res =
|
||||
wait(::waitStorageMetricsWithLocation(tenantInfo, keys, locations, min, max, permittedError));
|
||||
|
||||
if (res.present()) {
|
||||
return std::make_pair(res, -1);
|
||||
}
|
||||
// No result: back off for WRONG_SHARD_SERVER_DELAY, then re-resolve the locations and retry.
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
bool MockStorageServer::allShardStatusEqual(KeyRangeRef range, MockShardStatus status) {
|
||||
auto ranges = serverKeys.intersectingRanges(range);
|
||||
|
@ -198,7 +234,8 @@ Future<std::pair<Optional<StorageMetrics>, int>> MockGlobalState::waitStorageMet
|
|||
const StorageMetrics& permittedError,
|
||||
int shardLimit,
|
||||
int expectedShardCount) {
|
||||
return Future<std::pair<Optional<StorageMetrics>, int>>();
|
||||
return MockGlobalStateImpl::waitStorageMetrics(
|
||||
this, keys, min, max, permittedError, shardLimit, expectedShardCount);
|
||||
}
|
||||
|
||||
Reference<LocationInfo> buildLocationInfo(const std::vector<StorageServerInterface>& interfaces) {
|
|
@ -113,8 +113,11 @@ protected:
|
|||
void twoWayShardSplitting(KeyRangeRef range, KeyRef splitPoint, uint64_t rangeSize, bool restrictSize);
|
||||
};
|
||||
|
||||
class MockGlobalStateImpl;
|
||||
|
||||
class MockGlobalState : public IKeyLocationService {
|
||||
friend struct MockGlobalStateTester;
|
||||
friend class MockGlobalStateImpl;
|
||||
|
||||
std::vector<StorageServerInterface> extractStorageServerInterfaces(const std::vector<UID>& ids) const;
|
||||
|
||||
|
@ -167,6 +170,7 @@ public:
|
|||
*/
|
||||
bool allShardRemovedFromServer(const UID& serverId);
|
||||
|
||||
// SOMEDAY: NativeAPI::waitStorageMetrics should share this code in the future; this is a simpler version of it.
|
||||
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
|
||||
StorageMetrics const& min,
|
||||
StorageMetrics const& max,
|
||||
|
@ -175,21 +179,21 @@ public:
|
|||
int expectedShardCount);
|
||||
|
||||
Future<KeyRangeLocationInfo> getKeyLocation(TenantInfo tenant,
|
||||
Key key,
|
||||
Key key,
|
||||
SpanContext spanContext,
|
||||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies,
|
||||
Reverse isBackward,
|
||||
Version version) override;
|
||||
|
||||
Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(TenantInfo tenant,
|
||||
KeyRange keys,
|
||||
int limit,
|
||||
Reverse reverse,
|
||||
SpanContext spanContext,
|
||||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies,
|
||||
Reverse isBackward,
|
||||
Version version) override;
|
||||
|
||||
Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(TenantInfo tenant,
|
||||
KeyRange keys,
|
||||
int limit,
|
||||
Reverse reverse,
|
||||
SpanContext spanContext,
|
||||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies,
|
||||
Version version) override;
|
||||
};
|
||||
|
||||
#endif // FOUNDATIONDB_MOCKGLOBALSTATE_H
|
||||
|
|
Loading…
Reference in New Issue