Split shards along tenant keyspace boundaries (disabled by default) (#7432)

* Boilerplate code for tenant-aware DD

* Add a DD tenant-cache-assembly actor

* Add basic tenant list monitoring for tenant cache. Tenant cache is not yet a DD-scoped structure

* Create a DDTenantCache class

* Move DDTenantCache to new files

* Tweak some traces; fix placement of build and monitor actor invocations

* Update DD tenant cache refresh to be more efficient and unit-testable

* Fix incorrect use of prefixToID in the DDTenantCache code; it is not correct when the tenant is within a tenant subspace

* Fix a faulty assertion in a DDTenantCache unit test, and add a server knob to switch off DDTenantCache code for the moment

* Fix a DDTenantCache UT bug where the count of tenants could be 0, causing divide-by-zero exceptions in the test code; fix accidental movement of a trace statement; rename some DD tenant-cache-related variables

* Make some DDTenantCache member functions const, and change the ddtc abbreviation to ddTenantCache, since ddtc could also refer to DDTeamCollection

* Remove unnecessary assertion that erasing an element from a Map really removes it

* Remove the DD prefix from the tenant cache class name (and the associated impl and UT class names); nothing in it is DD-specific: DD uses it today, and other modules may use it in the future

* Disable DD tenant awareness by default

* Split shards along tenant keyspace boundaries

* Update some naming, switch to using prefixRange and fix a bug in tenantShardSplit

* Reorganize tenant shard splitter

* Move the new header file to the new include dir under fdbserver

* Make shardEvaluator trigger the shard merger only if adjacent shards belong to the same tenant, when tenant-keyspace boundaries are being honored

* Fix issues in the way shard merge decisions are made based on tenant boundaries

* Minor reorg and bug fix in shard merger decision-making

* Resolve merge issues

* Fix a bug in shardEvaluator that made it run in a loop rather than waiting for metrics changes

* Fix tenant splitter for when start and end are in different shards and only one of them needs to be split

* Remove some uses of printable

* Remove uses of printable

* Fix code formatting

* Add a TENANT_SPLIT RelocateReason and fix errors from incorrect merge of upstream code

* Move tenantCreationSignal ownership to TenantCache

* Update fdbserver/DataDistributionTracker.actor.cpp

Co-authored-by: Xiaoxi Wang <xiaoxi.wang@snowflake.com>

* Fix a couple of bugs and spelling errors in tenantShardSplitter

* Update fdbserver/DataDistributionTracker.actor.cpp

Co-authored-by: Xiaoxi Wang <xiaoxi.wang@snowflake.com>

* Refactor split-merge feasibility check methods

* Merge upstream changes

* Break down tenantShardSplitter to remove dependency on DataDistributionTracker and make the splitter unit-testable

* Add unit tests for tenant shard splitting

* Factor out some repeated code when finding tenant boundaries within a shard

* Extract fault line verification steps used in UTs into a separate function

Co-authored-by: Xiaoxi Wang <xiaoxi.wang@snowflake.com>
Bharadwaj V.R 2022-09-06 10:53:02 -07:00 committed by GitHub
parent fda4d6c8ea
commit beb00b4b7c
7 changed files with 689 additions and 176 deletions
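
Conceptually, the split side of this change reduces to computing "fault lines": when a shard straddles a tenant's keyspace, the tenant's begin and/or end key become additional split points, so that no resulting shard crosses a tenant boundary. As orientation before the diff, here is a minimal standalone sketch of that computation; plain std::string keys stand in for the patch's KeyRef/Arena types, and the helper is an illustration of the idea rather than the patch code itself.

#include <cassert>
#include <string>
#include <vector>

// Given one shard [shardBegin, shardEnd) and one tenant keyspace
// [tenantBegin, tenantEnd), return the split points ("fault lines") that
// align the shard with the tenant boundary. The shard is assumed to
// straddle at least one of the tenant's two boundary keys.
std::vector<std::string> findShardFaultLinesSketch(const std::string& shardBegin,
                                                   const std::string& shardEnd,
                                                   const std::string& tenantBegin,
                                                   const std::string& tenantEnd) {
	assert((shardBegin < tenantBegin && tenantBegin < shardEnd) ||
	       (shardBegin < tenantEnd && tenantEnd < shardEnd));

	std::vector<std::string> faultLines;
	faultLines.push_back(shardBegin);
	if (shardBegin < tenantBegin && tenantBegin < shardEnd) {
		faultLines.push_back(tenantBegin); // tenant starts inside this shard
	}
	if (shardBegin < tenantEnd && tenantEnd < shardEnd) {
		faultLines.push_back(tenantEnd); // tenant ends inside this shard
	}
	faultLines.push_back(shardEnd);
	return faultLines; // e.g. shard [a,f) + tenant [b,c) -> {a, b, c, f}
}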

View File

@ -18,13 +18,17 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDSharedContext.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/DatabaseContext.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -121,6 +125,8 @@ struct DataDistributionTracker : public IDDShardTracker {
}
};
Optional<Reference<TenantCache>> ddTenantCache;
DataDistributionTracker(Database cx,
UID distributorId,
Promise<Void> const& readyToStart,
@ -129,12 +135,13 @@ struct DataDistributionTracker : public IDDShardTracker {
Reference<PhysicalShardCollection> physicalShardCollection,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
KeyRangeMap<ShardTrackedData>* shards,
bool* trackerCancelled)
bool* trackerCancelled,
Optional<Reference<TenantCache>> ddTenantCache)
: IDDShardTracker(), cx(cx), distributorId(distributorId), shards(shards), sizeChanges(false),
systemSizeEstimate(0), dbSizeEstimate(new AsyncVar<int64_t>()), maxShardSize(new AsyncVar<Optional<int64_t>>()),
output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
physicalShardCollection(physicalShardCollection), readyToStart(readyToStart),
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled) {}
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled), ddTenantCache(ddTenantCache) {}
~DataDistributionTracker() override {
*trackerCancelled = true;
@ -501,6 +508,375 @@ private:
Promise<Void> cleared;
};
std::string describeSplit(KeyRange keys, Standalone<VectorRef<KeyRef>>& splitKeys) {
std::string s;
s += "[" + keys.begin.toString() + ", " + keys.end.toString() + ") -> ";
for (auto& sk : splitKeys) {
s += sk.printable() + " ";
}
return s;
}
void traceSplit(KeyRange keys, Standalone<VectorRef<KeyRef>>& splitKeys) {
auto s = describeSplit(keys, splitKeys);
TraceEvent(SevInfo, "ExecutingShardSplit").detail("AtKeys", s);
}
void executeShardSplit(DataDistributionTracker* self,
KeyRange keys,
Standalone<VectorRef<KeyRef>> splitKeys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
bool relocate,
RelocateReason reason) {
int numShards = splitKeys.size() - 1;
ASSERT(numShards > 1);
int skipRange = deterministicRandom()->randomInt(0, numShards);
auto s = describeSplit(keys, splitKeys);
TraceEvent(SevInfo, "ExecutingShardSplit").suppressFor(0.5).detail("Splitting", s).detail("NumShards", numShards);
// The queue can't deal with RelocateShard requests which split an existing shard into three pieces, so
// we have to send the unskipped ranges in this order (nibbling in from the edges of the old range)
for (int i = 0; i < skipRange; i++)
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
restartShardTrackers(self, KeyRangeRef(splitKeys[skipRange], splitKeys[skipRange + 1]));
for (int i = numShards - 1; i > skipRange; i--)
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
for (int i = 0; i < skipRange; i++) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
if (relocate) {
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
}
}
for (int i = numShards - 1; i > skipRange; i--) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
if (relocate) {
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
}
}
self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
}
struct RangeToSplit {
RangeMap<Standalone<StringRef>, ShardTrackedData, KeyRangeRef>::iterator shard;
Standalone<VectorRef<KeyRef>> faultLines;
RangeToSplit(RangeMap<Standalone<StringRef>, ShardTrackedData, KeyRangeRef>::iterator shard,
Standalone<VectorRef<KeyRef>> faultLines)
: shard(shard), faultLines(faultLines) {}
};
Standalone<VectorRef<KeyRef>> findShardFaultLines(KeyRef shardBegin,
KeyRef shardEnd,
KeyRef tenantBegin,
KeyRef tenantEnd) {
Standalone<VectorRef<KeyRef>> faultLines;
ASSERT((shardBegin < tenantBegin && shardEnd > tenantBegin) || (shardBegin < tenantEnd && shardEnd > tenantEnd));
faultLines.push_back_deep(faultLines.arena(), shardBegin);
if (shardBegin < tenantBegin && shardEnd > tenantBegin) {
faultLines.push_back_deep(faultLines.arena(), tenantBegin);
}
if (shardBegin < tenantEnd && shardEnd > tenantEnd) {
faultLines.push_back_deep(faultLines.arena(), tenantEnd);
}
faultLines.push_back_deep(faultLines.arena(), shardEnd);
return faultLines;
}
std::vector<RangeToSplit> findTenantShardBoundaries(KeyRangeMap<ShardTrackedData>* shards, KeyRange tenantKeys) {
std::vector<RangeToSplit> result;
auto shardContainingTenantStart = shards->rangeContaining(tenantKeys.begin);
auto shardContainingTenantEnd = shards->rangeContainingKeyBefore(tenantKeys.end);
// same shard
if (shardContainingTenantStart == shardContainingTenantEnd) {
// If shard boundaries are not aligned with tenantKeys
if (shardContainingTenantStart.begin() != tenantKeys.begin ||
shardContainingTenantStart.end() != tenantKeys.end) {
auto startShardSize = shardContainingTenantStart->value().stats;
if (startShardSize->get().present()) {
auto faultLines = findShardFaultLines(shardContainingTenantStart->begin(),
shardContainingTenantStart->end(),
tenantKeys.begin,
tenantKeys.end);
result.emplace_back(shardContainingTenantStart, faultLines);
}
}
} else {
auto startShardSize = shardContainingTenantStart->value().stats;
auto endShardSize = shardContainingTenantEnd->value().stats;
if (startShardSize->get().present() && endShardSize->get().present()) {
if (shardContainingTenantStart->begin() != tenantKeys.begin) {
auto faultLines = findShardFaultLines(shardContainingTenantStart->begin(),
shardContainingTenantStart->end(),
tenantKeys.begin,
tenantKeys.end);
result.emplace_back(shardContainingTenantStart, faultLines);
}
if (shardContainingTenantEnd->end() != tenantKeys.end) {
auto faultLines = findShardFaultLines(shardContainingTenantEnd->begin(),
shardContainingTenantEnd->end(),
tenantKeys.begin,
tenantKeys.end);
result.emplace_back(shardContainingTenantEnd, faultLines);
}
}
}
return result;
}
bool faultLinesMatch(std::vector<RangeToSplit>& ranges, std::vector<std::vector<KeyRef>>& expectedFaultLines) {
if (ranges.size() != expectedFaultLines.size()) {
return false;
}
for (auto& range : ranges) {
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
traceSplit(keys, range.faultLines);
}
for (int r = 0; r < ranges.size(); r++) {
if (ranges[r].faultLines.size() != expectedFaultLines[r].size()) {
return false;
}
for (int fl = 0; fl < ranges[r].faultLines.size(); fl++) {
if (ranges[r].faultLines[fl] != expectedFaultLines[r][fl]) {
return false;
}
}
}
return true;
}
TEST_CASE("/DataDistribution/Tenant/SingleShardSplit") {
wait(Future<Void>(Void()));
ShardTrackedData data;
ShardMetrics sm(StorageMetrics(), now(), 1);
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin = "a"_sr, end = "f"_sr;
KeyRangeRef k(begin, end);
shards.insert(k, data);
KeyRangeRef tenantKeys("b"_sr, "c"_sr);
data.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr, "f"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAligned") {
wait(Future<Void>(Void()));
ShardTrackedData data;
ShardMetrics sm(StorageMetrics(), now(), 1);
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin = "a"_sr, end = "f"_sr;
KeyRangeRef k(begin, end);
shards.insert(k, data);
KeyRangeRef tenantKeys("a"_sr, "f"_sr);
data.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = {};
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAlignedAtStart") {
wait(Future<Void>(Void()));
ShardTrackedData data;
ShardMetrics sm(StorageMetrics(), now(), 1);
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin = "a"_sr, end = "f"_sr;
KeyRangeRef k(begin, end);
shards.insert(k, data);
KeyRangeRef tenantKeys("a"_sr, "d"_sr);
data.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "d"_sr, "f"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAlignedAtEnd") {
wait(Future<Void>(Void()));
ShardTrackedData data;
ShardMetrics sm(StorageMetrics(), now(), 1);
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin = "a"_sr, end = "f"_sr;
KeyRangeRef k(begin, end);
shards.insert(k, data);
KeyRangeRef tenantKeys("b"_sr, "f"_sr);
data.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "f"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/DoubleShardSplit") {
wait(Future<Void>(Void()));
ShardTrackedData data1, data2;
ShardMetrics sm(StorageMetrics(), now(), 1);
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
KeyRangeRef k1(begin1, end1);
KeyRangeRef k2(begin2, end2);
shards.insert(k1, data1);
shards.insert(k2, data2);
KeyRangeRef tenantKeys("b"_sr, "e"_sr);
data1.stats->set(sm);
data2.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
for (auto& range : result) {
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
traceSplit(keys, range.faultLines);
}
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr }, { "d"_sr, "e"_sr, "f"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/DoubleShardTenantAlignedAtStart") {
wait(Future<Void>(Void()));
ShardTrackedData data1, data2;
ShardMetrics sm(StorageMetrics(), now(), 1);
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
KeyRangeRef k1(begin1, end1);
KeyRangeRef k2(begin2, end2);
shards.insert(k1, data1);
shards.insert(k2, data2);
KeyRangeRef tenantKeys("a"_sr, "e"_sr);
data1.stats->set(sm);
data2.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "d"_sr, "e"_sr, "f"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
TEST_CASE("/DataDistribution/Tenant/DoubleShardTenantAlignedAtEnd") {
wait(Future<Void>(Void()));
ShardTrackedData data1, data2;
ShardMetrics sm(StorageMetrics(), now(), 1);
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
KeyRangeMap<ShardTrackedData> shards;
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
KeyRangeRef k1(begin1, end1);
KeyRangeRef k2(begin2, end2);
shards.insert(k1, data1);
shards.insert(k2, data2);
KeyRangeRef tenantKeys("b"_sr, "f"_sr);
data1.stats->set(sm);
data2.stats->set(sm);
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr } };
ASSERT(faultLinesMatch(result, expectedFaultLines));
return Void();
}
ACTOR Future<Void> tenantShardSplitter(DataDistributionTracker* self, KeyRange tenantKeys) {
wait(Future<Void>(Void()));
std::vector<RangeToSplit> rangesToSplit = findTenantShardBoundaries(self->shards, tenantKeys);
for (auto& range : rangesToSplit) {
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
traceSplit(keys, range.faultLines);
executeShardSplit(self, keys, range.faultLines, range.shard->value().stats, true, RelocateReason::TENANT_SPLIT);
}
return Void();
}
ACTOR Future<Void> tenantCreationHandling(DataDistributionTracker* self, TenantCacheTenantCreated req) {
TraceEvent(SevInfo, "TenantCacheTenantCreated").detail("Begin", req.keys.begin).detail("End", req.keys.end);
wait(tenantShardSplitter(self, req.keys));
req.reply.send(true);
return Void();
}
ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
@ -540,27 +916,7 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
.detail("NumShards", numShards);
if (numShards > 1) {
int skipRange = deterministicRandom()->randomInt(0, numShards);
// The queue can't deal with RelocateShard requests which split an existing shard into three pieces, so
// we have to send the unskipped ranges in this order (nibbling in from the edges of the old range)
for (int i = 0; i < skipRange; i++)
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
restartShardTrackers(self, KeyRangeRef(splitKeys[skipRange], splitKeys[skipRange + 1]));
for (int i = numShards - 1; i > skipRange; i--)
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
for (int i = 0; i < skipRange; i++) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
}
for (int i = numShards - 1; i > skipRange; i--) {
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
self->shardsAffectedByTeamFailure->defineShard(r);
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
}
self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
executeShardSplit(self, keys, splitKeys, shardSize, true, reason);
} else {
wait(delay(1.0, TaskPriority::DataDistribution)); // In case the reason the split point was off was due to a
// discrepancy between storage servers
@ -579,6 +935,43 @@ ACTOR Future<Void> brokenPromiseToReady(Future<Void> f) {
return Void();
}
static bool shardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef adjRange) {
bool honorTenantKeyspaceBoundaries = self->ddTenantCache.present();
if (!honorTenantKeyspaceBoundaries) {
return true;
}
Optional<Reference<TCTenantInfo>> tenantOwningRange = {};
Optional<Reference<TCTenantInfo>> tenantOwningAdjRange = {};
tenantOwningRange = self->ddTenantCache.get()->tenantOwning(keys.begin);
tenantOwningAdjRange = self->ddTenantCache.get()->tenantOwning(adjRange.begin);
if ((tenantOwningRange.present() != tenantOwningAdjRange.present()) ||
(tenantOwningRange.present() && (tenantOwningRange != tenantOwningAdjRange))) {
return false;
}
return true;
}
static bool shardForwardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef nextRange) {
if (keys.end == allKeys.end) {
return false;
}
return shardMergeFeasible(self, keys, nextRange);
}
static bool shardBackwardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef prevRange) {
if (keys.begin == allKeys.begin) {
return false;
}
return shardMergeFeasible(self, keys, prevRange);
}
Future<Void> shardMerger(DataDistributionTracker* self,
KeyRange const& keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize) {
@ -594,6 +987,7 @@ Future<Void> shardMerger(DataDistributionTracker* self,
int shardsMerged = 1;
bool forwardComplete = false;
KeyRangeRef merged;
StorageMetrics endingStats = shardSize->get().get().metrics;
int shardCount = shardSize->get().get().shardCount;
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
@ -614,7 +1008,14 @@ Future<Void> shardMerger(DataDistributionTracker* self,
forwardComplete = true;
continue;
}
++nextIter;
if (!shardForwardMergeFeasible(self, keys, nextIter->range())) {
--nextIter;
forwardComplete = true;
continue;
}
newMetrics = nextIter->value().stats->get();
// If going forward, give up when the next shard's stats are not yet present, or if the
@ -629,6 +1030,11 @@ Future<Void> shardMerger(DataDistributionTracker* self,
--prevIter;
newMetrics = prevIter->value().stats->get();
if (!shardBackwardMergeFeasible(self, keys, prevIter->range())) {
++prevIter;
break;
}
// If going backward, stop when the stats are not present or if the shard is already over the merge
// bounds. If this check triggers right away (if we have not merged anything) then return a trigger
// on the previous shard changing "size".
@ -654,8 +1060,8 @@ Future<Void> shardMerger(DataDistributionTracker* self,
shardsMerged++;
auto shardBounds = getShardSizeBounds(merged, maxShardSize);
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it means
// the shard's metric may not be stable yet. So we cannot continue merging in this direction.
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it
// means the shard's metric may not be stable yet. So we cannot continue merging in this direction.
if (endingStats.bytes >= shardBounds.min.bytes || getBandwidthStatus(endingStats) != BandwidthStatusLow ||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT) {
@ -682,6 +1088,10 @@ Future<Void> shardMerger(DataDistributionTracker* self,
}
}
if (shardsMerged == 1) {
return brokenPromiseToReady(nextIter->value().stats->onChange());
}
// restarting the shard tracker will dereference values in the shard map, so make a copy
KeyRange mergeRange = merged;
@ -689,9 +1099,11 @@ Future<Void> shardMerger(DataDistributionTracker* self,
// NewKeys: New key range after shards are merged;
// EndingSize: The new merged shard size in bytes;
// BatchedMerges: The number of shards merged. Each shard is defined in self->shards;
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's status
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's
// status
// becomes BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at the shard;
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may have
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may
// have
// more than 1 shards.
TraceEvent("RelocateShardMergeMetrics", self->distributorId)
.detail("OldKeys", keys)
@ -724,10 +1136,22 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
StorageMetrics const& stats = shardSize->get().get().metrics;
auto bandwidthStatus = getBandwidthStatus(stats);
bool sizeSplit = stats.bytes > shardBounds.max.bytes,
writeSplit = bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin;
bool shouldSplit = sizeSplit || writeSplit;
bool shouldMerge = stats.bytes < shardBounds.min.bytes && bandwidthStatus == BandwidthStatusLow;
auto prevIter = self->shards->rangeContaining(keys.begin);
if (keys.begin > allKeys.begin)
--prevIter;
auto nextIter = self->shards->rangeContaining(keys.begin);
if (keys.end < allKeys.end)
++nextIter;
bool shouldMerge = stats.bytes < shardBounds.min.bytes && bandwidthStatus == BandwidthStatusLow &&
(shardForwardMergeFeasible(self, keys, nextIter.range()) ||
shardBackwardMergeFeasible(self, keys, prevIter.range()));
// Every invocation must set this or clear it
if (shouldMerge && !self->anyZeroHealthyTeams->get()) {
@ -796,8 +1220,8 @@ ACTOR Future<Void> shardTracker(DataDistributionTracker::SafeAccessor self,
// Use the current known size to check for (and start) splits and merges.
wait(shardEvaluator(self(), keys, shardSize, wantsToMerge));
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
// delay(0) mitigates the resulting SlowTask
// We could have a lot of actors being released from the previous wait at the same time. Immediately
// calling delay(0) mitigates the resulting SlowTask
wait(delay(0, TaskPriority::DataDistribution));
}
} catch (Error& e) {
@ -1045,7 +1469,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId,
KeyRangeMap<ShardTrackedData>* shards,
bool* trackerCancelled) {
bool* trackerCancelled,
Optional<Reference<TenantCache>> ddTenantCache) {
state DataDistributionTracker self(cx,
distributorId,
readyToStart,
@ -1054,7 +1479,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
physicalShardCollection,
anyZeroHealthyTeams,
shards,
trackerCancelled);
trackerCancelled,
ddTenantCache);
state Future<Void> loggingTrigger = Void();
state Future<Void> readHotDetect = readHotDetector(&self);
state Reference<EventCacheHolder> ddTrackerStatsEventHolder = makeReference<EventCacheHolder>("DDTrackerStats");
@ -1062,6 +1488,11 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
wait(trackInitialShards(&self, initData));
initData = Reference<InitialDataDistribution>();
state PromiseStream<TenantCacheTenantCreated> tenantCreationSignal;
if (self.ddTenantCache.present()) {
tenantCreationSignal = self.ddTenantCache.get()->tenantCreationSignal;
}
loop choose {
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
when(wait(loggingTrigger)) {
@ -1083,6 +1514,11 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
self.sizeChanges.add(fetchShardMetricsList(&self, req));
}
when(wait(self.sizeChanges.getResult())) {}
when(TenantCacheTenantCreated newTenant = waitNext(tenantCreationSignal.getFuture())) {
self.sizeChanges.add(tenantCreationHandling(&self, newTenant));
}
when(KeyRange req = waitNext(self.shardsAffectedByTeamFailure->restartShardTracker.getFuture())) {
restartShardTrackers(&self, req);
}
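
Two details of the hunk above may be easier to see in isolation. First, the "nibbling" dispatch order in executeShardSplit: because the relocation queue cannot handle a request that carves a third piece out of the middle of a live shard, sub-ranges are sent from the left edge inward, then from the right edge inward, and the one randomly chosen skipRange is tracked but never relocated. A standalone sketch of that ordering follows; the helper name and plain index pairs are illustrative, not part of the patch.

#include <utility>
#include <vector>

// Dispatch order for the sub-ranges of one split, mirroring the relocation
// loops in executeShardSplit: peel ranges off the left edge up to (but not
// including) the randomly kept skipRange, then peel off the right edge back
// down to it, so the queue never splits a live shard into three pieces.
std::vector<std::pair<int, int>> splitDispatchOrder(int numShards, int skipRange) {
	std::vector<std::pair<int, int>> order; // pairs of (begin, end) indices into splitKeys
	for (int i = 0; i < skipRange; i++)
		order.emplace_back(i, i + 1); // nibble in from the left edge
	for (int i = numShards - 1; i > skipRange; i--)
		order.emplace_back(i, i + 1); // nibble in from the right edge
	return order; // the skipped range keeps its data and is only re-tracked
}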
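
Second, the tenant-aware merge check: shardMergeFeasible above allows two adjacent shards to merge only when tenant awareness is disabled, when neither shard is owned by a tenant, or when both are owned by the same tenant. The same decision in a minimal standalone form, with std::optional<std::string> standing in for the patch's Optional<Reference<TCTenantInfo>>:

#include <optional>
#include <string>

// Sketch of the tenant-aware merge decision: adjacent shards may merge only
// if both lie outside any tenant, or both belong to the same tenant.
bool mergeFeasibleSketch(bool honorTenantBoundaries,
                         const std::optional<std::string>& tenantOwningRange,
                         const std::optional<std::string>& tenantOwningAdjRange) {
	if (!honorTenantBoundaries) {
		return true; // tenant awareness disabled: behave as before
	}
	if (tenantOwningRange.has_value() != tenantOwningAdjRange.has_value()) {
		return false; // one side is tenant-owned, the other is not
	}
	if (tenantOwningRange.has_value() && *tenantOwningRange != *tenantOwningAdjRange) {
		return false; // owned by two different tenants
	}
	return true;
}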

View File

@ -584,12 +584,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
try {
wait(DataDistributor::init(self));
state Reference<TenantCache> ddTenantCache;
if (ddIsTenantAware) {
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
wait(ddTenantCache->build(cx));
}
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(self->configuration.storageTeamSize > 0);
@ -601,6 +595,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
state Optional<Reference<TenantCache>> ddTenantCache;
if (ddIsTenantAware) {
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
wait(ddTenantCache.get()->build());
}
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
self->physicalShardCollection = makeReference<PhysicalShardCollection>();
wait(self->resumeRelocations());
@ -624,10 +624,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
} else {
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
if (ddIsTenantAware) {
actors.push_back(reportErrorsExcept(
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
}
actors.push_back(self->pollMoveKeysLock());
actors.push_back(reportErrorsExcept(dataDistributionTracker(self->initData,
@ -643,7 +639,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
anyZeroHealthyTeams,
self->ddId,
&shards,
&trackerCancelled),
&trackerCancelled,
ddTenantCache),
"DDTracker",
self->ddId,
&normalDDQueueErrors()));
@ -673,6 +670,13 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->ddId,
&normalDDQueueErrors()));
if (ddIsTenantAware) {
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorTenantMap(),
"DDTenantCacheMonitor",
self->ddId,
&normalDDQueueErrors()));
}
std::vector<DDTeamCollection*> teamCollectionsPtrs;
primaryTeamCollection = makeReference<DDTeamCollection>(
cx,

View File

@ -18,8 +18,11 @@
* limitations under the License.
*/
#include "fdbclient/SystemData.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TenantCache.h"
#include "flow/flow.h"
#include <limits>
#include <string>
#include "flow/actorcompiler.h"
@ -87,6 +90,8 @@ public:
for (int i = 0; i < tenantList.size(); i++) {
if (tenantCache->update(tenantList[i].first, tenantList[i].second)) {
tenantListUpdated = true;
TenantCacheTenantCreated req(tenantList[i].second.prefix);
tenantCache->tenantCreationSignal.send(req);
}
}
@ -174,7 +179,7 @@ std::string TenantCache::desc() const {
s += ", ";
}
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.printable();
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.toString();
count++;
}
@ -194,10 +199,23 @@ bool TenantCache::isTenantKey(KeyRef key) const {
return true;
}
Future<Void> TenantCache::build(Database cx) {
Future<Void> TenantCache::build() {
return TenantCacheImpl::build(this);
}
Optional<Reference<TCTenantInfo>> TenantCache::tenantOwning(KeyRef key) const {
auto it = tenantCache.lastLessOrEqual(key);
if (it == tenantCache.end()) {
return {};
}
if (!key.startsWith(it->key)) {
return {};
}
return it->value;
}
Future<Void> TenantCache::monitorTenantMap() {
return TenantCacheImpl::monitorTenantMap(this);
}
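
The tenantOwning lookup added above relies on the prefix map's lastLessOrEqual: take the greatest registered tenant prefix that is less than or equal to the key, and accept it only if the key actually starts with that prefix. A standalone sketch of the same lookup over a std::map, used here as an illustrative stand-in for the IndexedSet-backed TenantMapByPrefix:

#include <map>
#include <optional>
#include <string>

// Find the tenant (if any) whose prefix owns the given key.
std::optional<std::string> tenantOwningSketch(const std::map<std::string, std::string>& tenantsByPrefix,
                                              const std::string& key) {
	auto it = tenantsByPrefix.upper_bound(key); // first prefix strictly greater than key
	if (it == tenantsByPrefix.begin()) {
		return std::nullopt; // no prefix <= key, i.e. lastLessOrEqual is "end"
	}
	--it; // last prefix <= key
	if (key.compare(0, it->first.size(), it->first) != 0) {
		return std::nullopt; // key does not start with that prefix
	}
	return it->second; // tenant owning this key
}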

View File

@ -25,6 +25,9 @@
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/TCInfo.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/DDTxnProcessor.h"
#include "fdbserver/Knobs.h"
@ -45,7 +48,16 @@
// RelocateReason to DataMovementReason is one-to-N mapping
class RelocateReason {
public:
enum Value : int8_t { OTHER = 0, REBALANCE_DISK, REBALANCE_READ, MERGE_SHARD, SIZE_SPLIT, WRITE_SPLIT, __COUNT };
enum Value : int8_t {
OTHER = 0,
REBALANCE_DISK,
REBALANCE_READ,
MERGE_SHARD,
SIZE_SPLIT,
WRITE_SPLIT,
TENANT_SPLIT,
__COUNT
};
RelocateReason(Value v) : value(v) { ASSERT(value != __COUNT); }
explicit RelocateReason(int v) : value((Value)v) { ASSERT(value != __COUNT); }
std::string toString() const {
@ -62,6 +74,8 @@ public:
return "SizeSplit";
case WRITE_SPLIT:
return "WriteSplit";
case TENANT_SPLIT:
return "TenantSplit";
case __COUNT:
ASSERT(false);
}
@ -153,130 +167,6 @@ private:
moveReason(DataMovementReason::INVALID) {}
};
struct IDataDistributionTeam {
virtual std::vector<StorageServerInterface> getLastKnownServerInterfaces() const = 0;
virtual int size() const = 0;
virtual std::vector<UID> const& getServerIDs() const = 0;
virtual void addDataInFlightToTeam(int64_t delta) = 0;
virtual void addReadInFlightToTeam(int64_t delta) = 0;
virtual int64_t getDataInFlightToTeam() const = 0;
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
virtual int64_t getReadInFlightToTeam() const = 0;
virtual double getLoadReadBandwidth(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
virtual Future<Void> updateStorageMetrics() = 0;
virtual void addref() const = 0;
virtual void delref() const = 0;
virtual bool isHealthy() const = 0;
virtual void setHealthy(bool) = 0;
virtual int getPriority() const = 0;
virtual void setPriority(int) = 0;
virtual bool isOptimal() const = 0;
virtual bool isWrongConfiguration() const = 0;
virtual void setWrongConfiguration(bool) = 0;
virtual void addServers(const std::vector<UID>& servers) = 0;
virtual std::string getTeamID() const = 0;
std::string getDesc() const {
const auto& servers = getLastKnownServerInterfaces();
std::string s = format("TeamID %s; ", getTeamID().c_str());
s += format("Size %d; ", servers.size());
for (int i = 0; i < servers.size(); i++) {
if (i)
s += ", ";
s += servers[i].address().toString() + " " + servers[i].id().shortString();
}
return s;
}
};
FDB_DECLARE_BOOLEAN_PARAM(WantNewServers);
FDB_DECLARE_BOOLEAN_PARAM(WantTrueBest);
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil);
FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards);
FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance);
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil);
FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers);
struct GetTeamRequest {
bool wantsNewServers; // In addition to servers in completeSources, try to find teams with new servers
bool wantsTrueBest;
bool preferLowerDiskUtil; // if true, lower utilized team has higher score
bool teamMustHaveShards;
bool forReadBalance;
bool preferLowerReadUtil; // only make sense when forReadBalance is true
double inflightPenalty;
bool findTeamByServers;
std::vector<UID> completeSources;
std::vector<UID> src;
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
typedef Reference<IDataDistributionTeam> TeamRef;
GetTeamRequest() {}
GetTeamRequest(WantNewServers wantsNewServers,
WantTrueBest wantsTrueBest,
PreferLowerDiskUtil preferLowerDiskUtil,
TeamMustHaveShards teamMustHaveShards,
ForReadBalance forReadBalance = ForReadBalance::False,
PreferLowerReadUtil preferLowerReadUtil = PreferLowerReadUtil::False,
double inflightPenalty = 1.0)
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil),
teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance),
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty),
findTeamByServers(FindTeamByServers::False) {}
GetTeamRequest(std::vector<UID> servers)
: wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False),
preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False),
forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0),
findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {}
// return true if a.score < b.score
[[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const {
int res = 0;
if (forReadBalance) {
res = preferLowerReadUtil ? greaterReadLoad(a, b) : lessReadLoad(a, b);
}
return res == 0 ? lessCompareByLoad(aLoadBytes, bLoadBytes) : res < 0;
}
std::string getDesc() const {
std::stringstream ss;
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
<< " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty
<< " findTeamByServers:" << findTeamByServers << ";";
ss << "CompleteSources:";
for (const auto& cs : completeSources) {
ss << cs.toString() << ",";
}
return std::move(ss).str();
}
private:
// return true if preferHigherUtil && aLoadBytes <= bLoadBytes (higher load bytes has larger score)
// or preferLowerUtil && aLoadBytes > bLoadBytes
bool lessCompareByLoad(int64_t aLoadBytes, int64_t bLoadBytes) const {
bool lessLoad = aLoadBytes <= bLoadBytes;
return preferLowerDiskUtil ? !lessLoad : lessLoad;
}
// return -1 if a.readload > b.readload
static int greaterReadLoad(TeamRef a, TeamRef b) {
auto r1 = a->getLoadReadBandwidth(true), r2 = b->getLoadReadBandwidth(true);
return r1 == r2 ? 0 : (r1 > r2 ? -1 : 1);
}
// return -1 if a.readload < b.readload
static int lessReadLoad(TeamRef a, TeamRef b) {
auto r1 = a->getLoadReadBandwidth(false), r2 = b->getLoadReadBandwidth(false);
return r1 == r2 ? 0 : (r1 < r2 ? -1 : 1);
}
};
struct GetMetricsRequest {
KeyRange keys;
Promise<StorageMetrics> reply;
@ -621,7 +511,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
Reference<AsyncVar<bool>> zeroHealthyTeams,
UID distributorId,
KeyRangeMap<ShardTrackedData>* shards,
bool* trackerCancelled);
bool* trackerCancelled,
Optional<Reference<TenantCache>> ddTenantCache);
ACTOR Future<Void> dataDistributionQueue(Database cx,
PromiseStream<RelocateShard> output,

View File

@ -0,0 +1,147 @@
/*
* DataDistributionTeam.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/StorageServerInterface.h"
struct IDataDistributionTeam {
virtual std::vector<StorageServerInterface> getLastKnownServerInterfaces() const = 0;
virtual int size() const = 0;
virtual std::vector<UID> const& getServerIDs() const = 0;
virtual void addDataInFlightToTeam(int64_t delta) = 0;
virtual void addReadInFlightToTeam(int64_t delta) = 0;
virtual int64_t getDataInFlightToTeam() const = 0;
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
virtual int64_t getReadInFlightToTeam() const = 0;
virtual double getLoadReadBandwidth(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
virtual Future<Void> updateStorageMetrics() = 0;
virtual void addref() const = 0;
virtual void delref() const = 0;
virtual bool isHealthy() const = 0;
virtual void setHealthy(bool) = 0;
virtual int getPriority() const = 0;
virtual void setPriority(int) = 0;
virtual bool isOptimal() const = 0;
virtual bool isWrongConfiguration() const = 0;
virtual void setWrongConfiguration(bool) = 0;
virtual void addServers(const std::vector<UID>& servers) = 0;
virtual std::string getTeamID() const = 0;
std::string getDesc() const {
const auto& servers = getLastKnownServerInterfaces();
std::string s = format("TeamID %s; ", getTeamID().c_str());
s += format("Size %d; ", servers.size());
for (int i = 0; i < servers.size(); i++) {
if (i)
s += ", ";
s += servers[i].address().toString() + " " + servers[i].id().shortString();
}
return s;
}
};
FDB_DECLARE_BOOLEAN_PARAM(WantNewServers);
FDB_DECLARE_BOOLEAN_PARAM(WantTrueBest);
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil);
FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards);
FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance);
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil);
FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers);
struct GetTeamRequest {
bool wantsNewServers; // In addition to servers in completeSources, try to find teams with new servers
bool wantsTrueBest;
bool preferLowerDiskUtil; // if true, lower utilized team has higher score
bool teamMustHaveShards;
bool forReadBalance;
bool preferLowerReadUtil; // only make sense when forReadBalance is true
double inflightPenalty;
bool findTeamByServers;
std::vector<UID> completeSources;
std::vector<UID> src;
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
typedef Reference<IDataDistributionTeam> TeamRef;
GetTeamRequest() {}
GetTeamRequest(WantNewServers wantsNewServers,
WantTrueBest wantsTrueBest,
PreferLowerDiskUtil preferLowerDiskUtil,
TeamMustHaveShards teamMustHaveShards,
ForReadBalance forReadBalance = ForReadBalance::False,
PreferLowerReadUtil preferLowerReadUtil = PreferLowerReadUtil::False,
double inflightPenalty = 1.0)
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil),
teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance),
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty),
findTeamByServers(FindTeamByServers::False) {}
GetTeamRequest(std::vector<UID> servers)
: wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False),
preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False),
forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0),
findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {}
// return true if a.score < b.score
[[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const {
int res = 0;
if (forReadBalance) {
res = preferLowerReadUtil ? greaterReadLoad(a, b) : lessReadLoad(a, b);
}
return res == 0 ? lessCompareByLoad(aLoadBytes, bLoadBytes) : res < 0;
}
std::string getDesc() const {
std::stringstream ss;
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
<< " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty
<< " findTeamByServers:" << findTeamByServers << ";";
ss << "CompleteSources:";
for (const auto& cs : completeSources) {
ss << cs.toString() << ",";
}
return std::move(ss).str();
}
private:
// return true if preferHigherUtil && aLoadBytes <= bLoadBytes (higher load bytes has larger score)
// or preferLowerUtil && aLoadBytes > bLoadBytes
bool lessCompareByLoad(int64_t aLoadBytes, int64_t bLoadBytes) const {
bool lessLoad = aLoadBytes <= bLoadBytes;
return preferLowerDiskUtil ? !lessLoad : lessLoad;
}
// return -1 if a.readload > b.readload
static int greaterReadLoad(TeamRef a, TeamRef b) {
auto r1 = a->getLoadReadBandwidth(true), r2 = b->getLoadReadBandwidth(true);
return r1 == r2 ? 0 : (r1 > r2 ? -1 : 1);
}
// return -1 if a.readload < b.readload
static int lessReadLoad(TeamRef a, TeamRef b) {
auto r1 = a->getLoadReadBandwidth(false), r2 = b->getLoadReadBandwidth(false);
return r1 == r2 ? 0 : (r1 < r2 ? -1 : 1);
}
};

View File

@ -20,8 +20,11 @@
#pragma once
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbrpc/ReplicationTypes.h"
#include "fdbserver/DataDistributionTeam.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
@ -29,6 +32,7 @@ class TCTeamInfo;
class TCTenantInfo;
class TCMachineInfo;
class TCMachineTeamInfo;
class DDTeamCollection;
class TCServerInfo : public ReferenceCounted<TCServerInfo> {
friend class TCServerInfoImpl;
@ -257,8 +261,8 @@ public:
TCTenantInfo(TenantInfo tinfo, Key prefix) : m_tenantInfo(tinfo), m_prefix(prefix) {}
std::vector<Reference<TCTeamInfo>>& teams() { return m_tenantTeams; }
TenantName name() { return m_tenantInfo.name.get(); }
std::string prefixDesc() { return m_prefix.printable(); }
TenantName name() const { return m_tenantInfo.name.get(); }
std::string prefixDesc() const { return m_prefix.printable(); }
void addTeam(TCTeamInfo team);
void removeTeam(TCTeamInfo team);

View File

@ -18,17 +18,26 @@
* limitations under the License.
*/
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TCInfo.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include "flow/flow.h"
#include <limits>
#include <string>
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
struct TenantCacheTenantCreated {
KeyRange keys;
Promise<bool> reply;
TenantCacheTenantCreated(Key prefix) { keys = prefixRange(prefix); }
};
class TenantCache : public ReferenceCounted<TenantCache> {
friend class TenantCacheImpl;
friend class TenantCacheUnitTest;
@ -62,11 +71,15 @@ public:
generation = deterministicRandom()->randomUInt32();
}
Future<Void> build(Database cx);
PromiseStream<TenantCacheTenantCreated> tenantCreationSignal;
Future<Void> build();
Future<Void> monitorTenantMap();
std::string desc() const;
bool isTenantKey(KeyRef key) const;
Optional<Reference<TCTenantInfo>> tenantOwning(KeyRef key) const;
};
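
Finally, each TenantCacheTenantCreated signal carries keys = prefixRange(prefix), i.e. the whole keyspace under the tenant's prefix: [prefix, strinc(prefix)), where strinc drops trailing 0xff bytes and increments the last remaining byte. A standalone sketch of that arithmetic on std::string keys, illustrative rather than the flow implementation:

#include <cassert>
#include <string>
#include <utility>

// The key range covered by a prefix: [prefix, strinc(prefix)).
std::pair<std::string, std::string> prefixRangeSketch(std::string prefix) {
	std::string end = prefix;
	while (!end.empty() && static_cast<unsigned char>(end.back()) == 0xff) {
		end.pop_back(); // 0xff cannot be incremented, so drop it
	}
	assert(!end.empty()); // a prefix of all 0xff bytes has no strict upper bound
	end.back() = static_cast<char>(static_cast<unsigned char>(end.back()) + 1);
	return { std::move(prefix), std::move(end) }; // begin inclusive, end exclusive
}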