Merge pull request #7803 from sfc-gh-xwang/feature/main/ddvisibility
Add server selection counter in DDQueue
This commit is contained in:
commit 9133d4e16d

@@ -20,7 +20,7 @@ Data distribution manages the lifetime of storage servers, decides which storage
**RelocateShard (`struct RelocateShard`)**: A `RelocateShard` records a key range that needs to be moved among servers, together with the data movement's priority. DD always moves shards with higher priorities first.

-**Data distribution queue (`struct DDQueueData`)**: It receives shards to be relocated (i.e., RelocateShards), decides which shard should be moved to which server team, prioritizes data movements by the relocate shards' priorities, and controls the progress of data movement based on servers' workload.
+**Data distribution queue (`struct DDQueue`)**: It receives shards to be relocated (i.e., RelocateShards), decides which shard should be moved to which server team, prioritizes data movements by the relocate shards' priorities, and controls the progress of data movement based on servers' workload.

**Special keys in the system keyspace**: DD saves its state in the system keyspace so that it can recover from failure, and so that every process (e.g., commit proxies, tLogs, and storage servers) has a consistent view of which storage server is responsible for which key range.
@@ -153,3 +153,25 @@ CPU utilization. This metric is in a positive relationship with “FinishedQueries

* The typical movement size under a read-skew scenario is 100M ~ 600M under the default knob values `READ_REBALANCE_MAX_SHARD_FRAC = 0.2, READ_REBALANCE_SRC_PARALLELISM = 20`. Increasing those knobs may accelerate convergence, at the risk of data movement churn that overwhelms the destination and over-cools the source.

* The upper bound of `READ_REBALANCE_MAX_SHARD_FRAC` is 0.5. Any value larger than 0.5 can result in hot-server switching.

* For a deeper diagnosis of read-aware DD, the `BgDDMountainChopper_New` and `BgDDValleyFiller_New` trace events are the place to look.
## Data Distribution Diagnosis Q&A

* Why hasn't read-aware DD been triggered when there's a read imbalance?

  * Check the `SkipReason` field of the `BgDDMountainChopper_New` and `BgDDValleyFiller_New` events.

* Read-aware DD was triggered, and some data movement happened, but it didn't help the read balance. Why?

  * Figure out which servers were selected as the source and destination. That information is in the `DestTeam` and `SourceTeam` fields of the `BgDDMountainChopper*` and `BgDDValleyFiller*` events.

  * Also, the `DDQueueServerCounter` event reports how many times a server was chosen as a source or destination (as defined in
```c++
enum CountType : uint8_t { ProposedSource = 0, QueuedSource, LaunchedSource, LaunchedDest };
```
    ) for each relocation reason (`Other`, `RebalanceDisk`, and so on) in each phase within the last `DD_QUEUE_COUNTER_REFRESH_INTERVAL` (default 60) seconds. For example,
```xml
<Event Severity="10" Time="1659974950.984176" DateTime="2022-08-08T16:09:10Z" Type="DDQueueServerCounter" ID="0000000000000000" ServerId="0000000000000004" OtherPQSD="0 1 3 2" RebalanceDiskPQSD="0 0 1 4" RebalanceReadPQSD="2 0 0 5" MergeShardPQSD="0 0 1 0" SizeSplitPQSD="0 0 5 0" WriteSplitPQSD="1 0 0 0" ThreadID="9733255463206053180" Machine="0.0.0.0:0" LogGroup="default" Roles="TS" />
```

    `RebalanceReadPQSD="2 0 0 5"` means server `0000000000000004` was proposed as a source for read balancing twice, but those relocations were never queued or executed; it was also launched as a destination for read balancing 5 times in the past minute. Note that a field is skipped if all 4 of its numbers are 0. To avoid spammy traces, if the knob `DD_QUEUE_COUNTER_SUMMARIZE = true` is enabled, the `DDQueueServerCounterTooMany` event summarizes the unreported servers that were involved in launched relocations (i.e., whose `LaunchedSource` or `LaunchedDest` counts are non-zero):
```xml
<Event Severity="10" Time="1660095057.995837" DateTime="2022-08-10T01:30:57Z" Type="DDQueueServerCounterTooMany" ID="0000000000000000" RemainedLaunchedSources="000000000000007f,00000000000000d9,00000000000000e8,000000000000014c,0000000000000028,00000000000000d6,0000000000000067,000000000000003e,000000000000007d,000000000000000a,00000000000000cb,0000000000000106,00000000000000c1,000000000000003c,000000000000016e,00000000000000e4,000000000000013c,0000000000000016,0000000000000179,0000000000000061,00000000000000c2,000000000000005a,0000000000000001,00000000000000c9,000000000000012a,00000000000000fb,0000000000000146," RemainedLaunchedDestinations="0000000000000079,0000000000000115,000000000000018e,0000000000000167,0000000000000135,0000000000000139,0000000000000077,0000000000000118,00000000000000bb,0000000000000177,00000000000000c0,000000000000014d,000000000000017f,00000000000000c3,000000000000015c,00000000000000fb,0000000000000186,0000000000000157,00000000000000b6,0000000000000072,0000000000000144," ThreadID="1322639651557440362" Machine="0.0.0.0:0" LogGroup="default" Roles="TS" />
```
* How do I track the lifecycle of a relocation attempt for balancing?

  * First, find the `TraceId` field in `BgDDMountainChopper*` or `BgDDValleyFiller*`, which indicates that a relocation was triggered.

  * (Only when enabled) Find the `QueuedRelocation` event whose `BeginPair` and `EndPair` match the original `TraceId`. This means the relocation request was queued.

  * Find the `RelocateShard` event whose `BeginPair` and `EndPair` match the `TraceId`. This event means the relocation is ongoing.
@@ -128,6 +128,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( BG_REBALANCE_POLLING_INTERVAL, 10.0 );
    init( BG_REBALANCE_SWITCH_CHECK_INTERVAL, 5.0 ); if (randomize && BUGGIFY) BG_REBALANCE_SWITCH_CHECK_INTERVAL = 1.0;
    init( DD_QUEUE_LOGGING_INTERVAL, 5.0 );
    init( DD_QUEUE_COUNTER_REFRESH_INTERVAL, 60.0 );
    // 100 / 60 < 2 trace/sec ~ 2 * 200 = 400b/sec
    init( DD_QUEUE_COUNTER_MAX_LOG, 100 ); if( randomize && BUGGIFY ) DD_QUEUE_COUNTER_MAX_LOG = 1;
    init( DD_QUEUE_COUNTER_SUMMARIZE, true );
    init( RELOCATION_PARALLELISM_PER_SOURCE_SERVER, 2 ); if( randomize && BUGGIFY ) RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 1;
    init( RELOCATION_PARALLELISM_PER_DEST_SERVER, 10 ); if( randomize && BUGGIFY ) RELOCATION_PARALLELISM_PER_DEST_SERVER = 1; // Note: if this is smaller than FETCH_KEYS_PARALLELISM, this will artificially reduce performance. The current default of 10 is probably too high but is set conservatively for now.
    init( DD_QUEUE_MAX_KEY_SERVERS, 100 ); if( randomize && BUGGIFY ) DD_QUEUE_MAX_KEY_SERVERS = 1;
@@ -118,6 +118,11 @@ public:
    double BG_REBALANCE_POLLING_INTERVAL;
    double BG_REBALANCE_SWITCH_CHECK_INTERVAL;
    double DD_QUEUE_LOGGING_INTERVAL;
    double DD_QUEUE_COUNTER_REFRESH_INTERVAL;
    double DD_QUEUE_COUNTER_MAX_LOG; // max number of servers for which trace events will be generated in each round of
                                     // DD_QUEUE_COUNTER_REFRESH_INTERVAL duration
    bool DD_QUEUE_COUNTER_SUMMARIZE; // Enable summary of remaining servers when the number of servers with ongoing
                                     // relocations in the last minute exceeds DD_QUEUE_COUNTER_MAX_LOG
    double RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
    double RELOCATION_PARALLELISM_PER_DEST_SERVER;
    int DD_QUEUE_MAX_KEY_SERVERS;
@@ -949,11 +949,13 @@ public:
            }
        }

-        RelocateShard rs(shards[i], maxPriority, RelocateReason::OTHER);
+        RelocateShard rs(
+            shards[i], maxPriority, RelocateReason::OTHER, deterministicRandom()->randomUniqueID());

        self->output.send(rs);
        TraceEvent("SendRelocateToDDQueue", self->distributorId)
            .suppressFor(1.0)
+            .detail("TraceId", rs.traceId)
            .detail("ServerPrimary", self->primary)
            .detail("ServerTeam", team->getDesc())
            .detail("KeyBegin", rs.keys.begin)
@@ -34,6 +34,7 @@
#include "fdbserver/Knobs.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/DDTxnProcessor.h"
+#include "flow/DebugTrace.h"
#include "flow/actorcompiler.h" // This must be the last #include.

#define WORK_FULL_UTILIZATION 10000 // This is not a knob; it is a fixed point scaling factor!
@@ -124,7 +125,7 @@ struct RelocateData {
    RelocateReason reason;

    double startTime;
-    UID randomId;
+    UID randomId; // inherit from RelocateShard.traceId
    UID dataMoveId;
    int workFactor;
    std::vector<UID> src;
@@ -142,11 +143,12 @@
    explicit RelocateData(RelocateShard const& rs)
      : keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
        healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), reason(rs.reason), startTime(now()),
-        randomId(deterministicRandom()->randomUniqueID()), dataMoveId(rs.dataMoveId), workFactor(0),
-        wantsNewServers(
-            isDataMovementForMountainChopper(rs.moveReason) || isDataMovementForValleyFiller(rs.moveReason) ||
-            rs.moveReason == DataMovementReason::SPLIT_SHARD || rs.moveReason == DataMovementReason::TEAM_REDUNDANT),
-        cancellable(true), interval("QueuedRelocation"), dataMove(rs.dataMove) {
+        randomId(rs.traceId.isValid() ? rs.traceId : deterministicRandom()->randomUniqueID()),
+        dataMoveId(rs.dataMoveId), workFactor(0), wantsNewServers(isDataMovementForMountainChopper(rs.moveReason) ||
+                                                                  isDataMovementForValleyFiller(rs.moveReason) ||
+                                                                  rs.moveReason == DataMovementReason::SPLIT_SHARD ||
+                                                                  rs.moveReason == DataMovementReason::TEAM_REDUNDANT),
+        cancellable(true), interval("QueuedRelocation", randomId), dataMove(rs.dataMove) {
        if (dataMove != nullptr) {
            this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end());
        }
@@ -507,14 +509,14 @@ void complete(RelocateData const& relocation, std::map<UID, Busyness>& busymap,
}

// Cancels in-flight data moves intersecting with range.
-ACTOR Future<Void> cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState);
+ACTOR Future<Void> cancelDataMove(struct DDQueue* self, KeyRange range, const DDEnabledState* ddEnabledState);

-ACTOR Future<Void> dataDistributionRelocator(struct DDQueueData* self,
+ACTOR Future<Void> dataDistributionRelocator(struct DDQueue* self,
                                             RelocateData rd,
                                             Future<Void> prevCleanup,
                                             const DDEnabledState* ddEnabledState);

-struct DDQueueData {
+struct DDQueue {
    struct DDDataMove {
        DDDataMove() = default;
        explicit DDDataMove(UID id) : id(id) {}
@@ -525,6 +527,100 @@ struct DDQueueData {
        Future<Void> cancel;
    };

    struct ServerCounter {
        enum CountType : uint8_t { ProposedSource = 0, QueuedSource, LaunchedSource, LaunchedDest, __COUNT };

    private:
        typedef std::array<int, (int)__COUNT> Item; // one for each CountType
        typedef std::array<Item, RelocateReason::typeCount()> ReasonItem; // one for each RelocateReason

        std::unordered_map<UID, ReasonItem> counter;

        std::string toString(const Item& item) const {
            return format("%d %d %d %d", item[0], item[1], item[2], item[3]);
        }

        void traceReasonItem(TraceEvent* event, const ReasonItem& item) const {
            for (int i = 0; i < item.size(); ++i) {
                if (std::accumulate(item[i].cbegin(), item[i].cend(), 0) > 0) {
                    // "PQSD" corresponding to CounterType
                    event->detail(RelocateReason(i).toString() + "PQSD", toString(item[i]));
                }
            }
        }

        bool countNonZero(const ReasonItem& item, CountType type) const {
            return std::any_of(item.cbegin(), item.cend(), [type](const Item& item) { return item[(int)type] > 0; });
        }

        void increase(const UID& id, RelocateReason reason, CountType type) {
            int idx = (int)(reason);
            // if (idx < 0 || idx >= RelocateReason::typeCount()) {
            //     TraceEvent(SevWarnAlways, "ServerCounterDebug").detail("Reason", reason.toString());
            // }
            ASSERT(idx >= 0 && idx < RelocateReason::typeCount());
            counter[id][idx][(int)type] += 1;
        }

        void summarizeLaunchedServers(decltype(counter.cbegin()) begin,
                                      decltype(counter.cend()) end,
                                      TraceEvent* event) const {
            if (begin == end)
                return;

            std::string execSrc, execDest;
            for (; begin != end; ++begin) {
                if (countNonZero(begin->second, LaunchedSource)) {
                    execSrc += begin->first.shortString() + ",";
                }
                if (countNonZero(begin->second, LaunchedDest)) {
                    execDest += begin->first.shortString() + ",";
                }
            }
            event->detail("RemainedLaunchedSources", execSrc).detail("RemainedLaunchedDestinations", execDest);
        }

    public:
        void clear() { counter.clear(); }

        int get(const UID& id, RelocateReason reason, CountType type) const {
            return counter.at(id)[(int)reason][(int)type];
        }

        void increaseForTeam(const std::vector<UID>& ids, RelocateReason reason, CountType type) {
            for (auto& id : ids) {
                increase(id, reason, type);
            }
        }

        void traceAll(const UID& debugId = UID()) const {
            auto it = counter.cbegin();
            int count = 0;
            for (; count < SERVER_KNOBS->DD_QUEUE_COUNTER_MAX_LOG && it != counter.cend(); ++count, ++it) {
                TraceEvent event("DDQueueServerCounter", debugId);
                event.detail("ServerId", it->first);
                traceReasonItem(&event, it->second);
            }

            if (it != counter.cend()) {
                TraceEvent e(SevWarn, "DDQueueServerCounterTooMany", debugId);
                e.detail("Servers", size());
                if (SERVER_KNOBS->DD_QUEUE_COUNTER_SUMMARIZE) {
                    summarizeLaunchedServers(it, counter.cend(), &e);
                    return;
                }
            }
        }

        size_t size() const { return counter.size(); }

        // for random test
        static CountType randomCountType() {
            int i = deterministicRandom()->randomInt(0, (int)__COUNT);
            return (CountType)i;
        }
    };

    ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it.
    UID distributorId;
    MoveKeysLock lock;
@@ -558,6 +654,7 @@ struct DDQueueData {
    // The last time one server was selected as source team for read rebalance reason. We want to throttle read
    // rebalance on time bases because the read workload sample update has delay after the previous moving
    std::map<UID, double> lastAsSource;
+    ServerCounter serverCounter;

    KeyRangeMap<RelocateData> inFlight;
    // Track all actors that relocates specified keys to a good place; Key: keyRange; Value: actor
@@ -625,18 +722,18 @@ struct DDQueueData {
        }
    }

-    DDQueueData(UID mid,
-                MoveKeysLock lock,
-                Database cx,
-                std::vector<TeamCollectionInterface> teamCollections,
-                Reference<ShardsAffectedByTeamFailure> sABTF,
-                PromiseStream<Promise<int64_t>> getAverageShardBytes,
-                int teamSize,
-                int singleRegionTeamSize,
-                PromiseStream<RelocateShard> output,
-                FutureStream<RelocateShard> input,
-                PromiseStream<GetMetricsRequest> getShardMetrics,
-                PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
+    DDQueue(UID mid,
+            MoveKeysLock lock,
+            Database cx,
+            std::vector<TeamCollectionInterface> teamCollections,
+            Reference<ShardsAffectedByTeamFailure> sABTF,
+            PromiseStream<Promise<int64_t>> getAverageShardBytes,
+            int teamSize,
+            int singleRegionTeamSize,
+            PromiseStream<RelocateShard> output,
+            FutureStream<RelocateShard> input,
+            PromiseStream<GetMetricsRequest> getShardMetrics,
+            PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
      : distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)), teamCollections(teamCollections),
        shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
        startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
@@ -648,6 +745,7 @@ struct DDQueueData {
        suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar<bool>(false)),
        rawProcessingWiggle(new AsyncVar<bool>(false)), unhealthyRelocations(0),
        movedKeyServersEventHolder(makeReference<EventCacheHolder>("MovedKeyServers")) {}
+    DDQueue() = default;

    void validate() {
        if (EXPENSIVE_VALIDATION) {
@@ -793,7 +891,7 @@ struct DDQueueData {
        }
    }

-    ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
+    ACTOR static Future<Void> getSourceServersForRange(DDQueue* self,
                                                       RelocateData input,
                                                       PromiseStream<RelocateData> output,
                                                       Reference<FlowLock> fetchLock) {
@@ -895,11 +993,14 @@ struct DDQueueData {

                if (rrs.src.size() == 0 && (rrs.keys == rd.keys || fetchingSourcesQueue.erase(rrs) > 0)) {
                    rrs.keys = affectedQueuedItems[r];
-                    rrs.interval = TraceInterval("QueuedRelocation");
-                    /*TraceEvent(rrs.interval.begin(), distributorId);
-                        .detail("KeyBegin", rrs.keys.begin).detail("KeyEnd", rrs.keys.end)
-                        .detail("Priority", rrs.priority).detail("WantsNewServers", rrs.wantsNewServers);*/
+                    rrs.interval = TraceInterval("QueuedRelocation", rrs.randomId); // inherit the old randomId
+
+                    DebugRelocationTraceEvent(rrs.interval.begin(), distributorId)
+                        .detail("KeyBegin", rrs.keys.begin)
+                        .detail("KeyEnd", rrs.keys.end)
+                        .detail("Priority", rrs.priority)
+                        .detail("WantsNewServers", rrs.wantsNewServers);
+
                    queuedRelocations++;
                    TraceEvent(SevVerbose, "QueuedRelocationsChanged")
                        .detail("DataMoveID", rrs.dataMoveId)
@@ -921,11 +1022,15 @@ struct DDQueueData {

                    if (serverQueue.erase(rrs) > 0) {
                        if (!foundActiveRelocation) {
-                            newData.interval = TraceInterval("QueuedRelocation");
-                            /*TraceEvent(newData.interval.begin(), distributorId);
-                                .detail("KeyBegin", newData.keys.begin).detail("KeyEnd", newData.keys.end)
-                                .detail("Priority", newData.priority).detail("WantsNewServers",
-                                newData.wantsNewServers);*/
+                            newData.interval =
+                                TraceInterval("QueuedRelocation", rrs.randomId); // inherit the old randomId
+
+                            DebugRelocationTraceEvent(newData.interval.begin(), distributorId)
+                                .detail("KeyBegin", newData.keys.begin)
+                                .detail("KeyEnd", newData.keys.end)
+                                .detail("Priority", newData.priority)
+                                .detail("WantsNewServers", newData.wantsNewServers);
+
                            queuedRelocations++;
                            TraceEvent(SevVerbose, "QueuedRelocationsChanged")
                                .detail("DataMoveID", newData.dataMoveId)
@@ -946,11 +1051,11 @@ struct DDQueueData {
            }
        }

-        /*TraceEvent("ReceivedRelocateShard", distributorId)
+        DebugRelocationTraceEvent("ReceivedRelocateShard", distributorId)
            .detail("KeyBegin", rd.keys.begin)
            .detail("KeyEnd", rd.keys.end)
            .detail("Priority", rd.priority)
-            .detail("AffectedRanges", affectedQueuedItems.size()); */
+            .detail("AffectedRanges", affectedQueuedItems.size());
    }

    void completeSourceFetch(const RelocateData& results) {
@@ -964,6 +1069,7 @@ struct DDQueueData {
            queue[results.src[i]].insert(results);
        }
        updateLastAsSource(results.src);
+        serverCounter.increaseForTeam(results.src, results.reason, ServerCounter::CountType::QueuedSource);
    }

    void logRelocation(const RelocateData& rd, const char* title) {
@@ -1032,10 +1138,12 @@ struct DDQueueData {
            if (fetchKeysComplete.count(it->value()) && inFlightActors.liveActorAt(it->range().begin) &&
                !rd.keys.contains(it->range()) && it->value().priority >= rd.priority &&
                rd.healthPriority < SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
-                /*TraceEvent("OverlappingInFlight", distributorId)
+
+                DebugRelocationTraceEvent("OverlappingInFlight", distributorId)
                    .detail("KeyBegin", it->value().keys.begin)
                    .detail("KeyEnd", it->value().keys.end)
-                    .detail("Priority", it->value().priority);*/
+                    .detail("Priority", it->value().priority);
+
                overlappingInFlight = true;
                break;
            }
@@ -1070,8 +1178,8 @@ struct DDQueueData {
            // because they do not have too much inflight data movement.

            // logRelocation( rd, "LaunchingRelocation" );
-            //TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
+            DebugRelocationTraceEvent(rd.interval.end(), distributorId).detail("Result", "Success");
+
            if (!rd.isRestore()) {
                queuedRelocations--;
                TraceEvent(SevVerbose, "QueuedRelocationsChanged")
@@ -1188,7 +1296,7 @@ struct DDQueueData {
            }
        }

-        DDQueueData::DDDataMove dataMove(dataMoveId);
+        DDQueue::DDDataMove dataMove(dataMoveId);
        dataMove.cancel = cleanUpDataMove(
            this->cx, dataMoveId, this->lock, &this->cleanUpDataMoveParallelismLock, range, ddEnabledState);
        this->dataMoves.insert(range, dataMove);
@@ -1196,9 +1304,17 @@ struct DDQueueData {
            .detail("DataMoveID", dataMoveId)
            .detail("Range", range);
    }

+    Future<Void> periodicalRefreshCounter() {
+        auto f = [this]() {
+            serverCounter.traceAll(distributorId);
+            serverCounter.clear();
+        };
+        return recurring(f, SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL);
+    }
};

-ACTOR Future<Void> cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState) {
+ACTOR Future<Void> cancelDataMove(struct DDQueue* self, KeyRange range, const DDEnabledState* ddEnabledState) {
    std::vector<Future<Void>> cleanup;
    auto f = self->dataMoves.intersectingRanges(range);
    for (auto it = f.begin(); it != f.end(); ++it) {
@@ -1219,7 +1335,7 @@ ACTOR Future<Void> cancelDataMove(struct DDQueueData* self, KeyRange range, cons
    wait(waitForAll(cleanup));
    auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(range);
    if (!ranges.empty()) {
-        self->dataMoves.insert(KeyRangeRef(ranges.front().begin, ranges.back().end), DDQueueData::DDDataMove());
+        self->dataMoves.insert(KeyRangeRef(ranges.front().begin, ranges.back().end), DDQueue::DDDataMove());
    }
    return Void();
}
@@ -1238,12 +1354,12 @@ static std::string destServersString(std::vector<std::pair<Reference<IDataDistri

// This actor relocates the specified keys to a good place.
// The inFlightActor key range map stores the actor for each RelocateData
-ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
+ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
                                             RelocateData rd,
                                             Future<Void> prevCleanup,
                                             const DDEnabledState* ddEnabledState) {
    state Promise<Void> errorOut(self->error);
-    state TraceInterval relocateShardInterval("RelocateShard");
+    state TraceInterval relocateShardInterval("RelocateShard", rd.randomId);
    state PromiseStream<RelocateData> dataTransferComplete(self->dataTransferComplete);
    state PromiseStream<RelocateData> relocationComplete(self->relocationComplete);
    state bool signalledTransferComplete = false;
@@ -1270,7 +1386,6 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
        .detail("KeyBegin", rd.keys.begin)
        .detail("KeyEnd", rd.keys.end)
        .detail("Priority", rd.priority)
-        .detail("RelocationID", relocateShardInterval.pairID)
        .detail("SuppressedEventCount", self->suppressIntervals);

    if (relocateShardInterval.severity != SevDebug) {
@@ -1298,7 +1413,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
            .detail("Range", kr);
        }
    }
-    self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove(rd.dataMoveId));
+    self->dataMoves.insert(rd.keys, DDQueue::DDDataMove(rd.dataMoveId));
    }

    state StorageMetrics metrics =
@@ -1524,6 +1639,10 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
            .detail("ExtraIds", describe(extraIds));
    }

+    self->serverCounter.increaseForTeam(rd.src, rd.reason, DDQueue::ServerCounter::LaunchedSource);
+    self->serverCounter.increaseForTeam(destIds, rd.reason, DDQueue::ServerCounter::LaunchedDest);
+    self->serverCounter.increaseForTeam(extraIds, rd.reason, DDQueue::ServerCounter::LaunchedDest);
+
    state Error error = success();
    state Promise<Void> dataMovementComplete;
    // Move keys from source to destination by changing the serverKeyList and keyServerList system keys
@@ -1571,7 +1690,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
    auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(rd.keys);
    if (ranges.size() == 1 && static_cast<KeyRange>(ranges[0]) == rd.keys &&
        ranges[0].value.id == rd.dataMoveId && !ranges[0].value.cancel.isValid()) {
-        self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove());
+        self->dataMoves.insert(rd.keys, DDQueue::DDDataMove());
        TraceEvent(SevVerbose, "DequeueDataMoveOnSuccess", self->distributorId)
            .detail("DataMoveID", rd.dataMoveId)
            .detail("DataMoveRange", rd.keys);
@@ -1718,7 +1837,7 @@ inline double getWorstCpu(const HealthMetrics& metrics, const std::vector<UID>&

// Move the shard with the top K highest read density of sourceTeam's to destTeam if sourceTeam has much more read load
// than destTeam
-ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
+ACTOR Future<bool> rebalanceReadLoad(DDQueue* self,
                                     DataMovementReason moveReason,
                                     Reference<IDataDistributionTeam> sourceTeam,
                                     Reference<IDataDistributionTeam> destTeam,
@@ -1787,8 +1906,15 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
        ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
    for (int i = 0; i < shards.size(); i++) {
        if (shard == shards[i]) {
-            self->output.send(RelocateShard(shard, moveReason, RelocateReason::REBALANCE_READ));
-            self->updateLastAsSource(sourceTeam->getServerIDs());
+            UID traceId = deterministicRandom()->randomUniqueID();
+            self->output.send(RelocateShard(shard, moveReason, RelocateReason::REBALANCE_READ, traceId));
+            traceEvent->detail("TraceId", traceId);
+
+            auto serverIds = sourceTeam->getServerIDs();
+            self->updateLastAsSource(serverIds);
+
+            self->serverCounter.increaseForTeam(
+                serverIds, RelocateReason::REBALANCE_READ, DDQueue::ServerCounter::ProposedSource);
            return true;
        }
    }
@@ -1797,7 +1923,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
}

// Move a random shard from sourceTeam if sourceTeam has much more data than provided destTeam
-ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
+ACTOR static Future<bool> rebalanceTeams(DDQueue* self,
                                         DataMovementReason moveReason,
                                         Reference<IDataDistributionTeam const> sourceTeam,
                                         Reference<IDataDistributionTeam const> destTeam,
@@ -1859,7 +1985,12 @@ ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
        ShardsAffectedByTeamFailure::Team(sourceTeam->getServerIDs(), primary));
    for (int i = 0; i < shards.size(); i++) {
        if (moveShard == shards[i]) {
-            self->output.send(RelocateShard(moveShard, moveReason, RelocateReason::REBALANCE_DISK));
+            UID traceId = deterministicRandom()->randomUniqueID();
+            self->output.send(RelocateShard(moveShard, moveReason, RelocateReason::REBALANCE_DISK, traceId));
+            traceEvent->detail("TraceId", traceId);
+
+            self->serverCounter.increaseForTeam(
+                sourceTeam->getServerIDs(), RelocateReason::REBALANCE_DISK, DDQueue::ServerCounter::ProposedSource);
            return true;
        }
    }
@@ -1868,7 +1999,7 @@ ACTOR static Future<bool> rebalanceTeams(DDQueueData* self,
    return false;
}

-ACTOR Future<SrcDestTeamPair> getSrcDestTeams(DDQueueData* self,
+ACTOR Future<SrcDestTeamPair> getSrcDestTeams(DDQueue* self,
                                              int teamCollectionIndex,
                                              GetTeamRequest srcReq,
                                              GetTeamRequest destReq,
@@ -1895,7 +2026,7 @@ ACTOR Future<SrcDestTeamPair> getSrcDestTeams(DDQueueData* self,
    return {};
}

-ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex, DataMovementReason reason) {
+ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, DataMovementReason reason) {
    state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
    state Transaction tr(self->cx);
    state double lastRead = 0;
@@ -1996,7 +2127,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
    }
}

-ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionIndex) {
+ACTOR Future<Void> BgDDMountainChopper(DDQueue* self, int teamCollectionIndex) {
    state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
    state Transaction tr(self->cx);
    state double lastRead = 0;
@@ -2094,7 +2225,7 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
    }
}

-ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex) {
+ACTOR Future<Void> BgDDValleyFiller(DDQueue* self, int teamCollectionIndex) {
    state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
    state Transaction tr(self->cx);
    state double lastRead = 0;
@@ -2209,41 +2340,42 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
                                         int teamSize,
                                         int singleRegionTeamSize,
                                         const DDEnabledState* ddEnabledState) {
-    state DDQueueData self(distributorId,
-                           lock,
-                           cx,
-                           teamCollections,
-                           shardsAffectedByTeamFailure,
-                           getAverageShardBytes,
-                           teamSize,
-                           singleRegionTeamSize,
-                           output,
-                           input,
-                           getShardMetrics,
-                           getTopKMetrics);
+    state DDQueue self(distributorId,
+                       lock,
+                       cx,
+                       teamCollections,
+                       shardsAffectedByTeamFailure,
+                       getAverageShardBytes,
+                       teamSize,
+                       singleRegionTeamSize,
+                       output,
+                       input,
+                       getShardMetrics,
+                       getTopKMetrics);
    state std::set<UID> serversToLaunchFrom;
    state KeyRange keysToLaunchFrom;
    state RelocateData launchData;
    state Future<Void> recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL);

-    state std::vector<Future<Void>> balancingFutures;
+    state std::vector<Future<Void>> ddQueueFutures;

    state PromiseStream<KeyRange> rangesComplete;
    state Future<Void> launchQueuedWorkTimeout = Never();

    for (int i = 0; i < teamCollections.size(); i++) {
        // FIXME: Use BgDDLoadBalance for disk rebalance too after DD simulation test proof.
-        // balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
-        // balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
+        // ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
+        // ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
        if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
-            balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
-            balancingFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM));
+            ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
+            ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM));
        }
-        balancingFutures.push_back(BgDDMountainChopper(&self, i));
-        balancingFutures.push_back(BgDDValleyFiller(&self, i));
+        ddQueueFutures.push_back(BgDDMountainChopper(&self, i));
+        ddQueueFutures.push_back(BgDDValleyFiller(&self, i));
    }
-    balancingFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0));
-    balancingFutures.push_back(delayedAsyncVar(self.rawProcessingWiggle, processingWiggle, 0));
+    ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0));
+    ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingWiggle, processingWiggle, 0));
+    ddQueueFutures.push_back(self.periodicalRefreshCounter());

    try {
        loop {
|
||||
|
@@ -2356,7 +2488,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
 				// key we use here must match the key used in the holder.
 				}
 				when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
-				when(wait(waitForAll(balancingFutures))) {}
+				when(wait(waitForAll(ddQueueFutures))) {}
 				when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
 			}
 		}
@@ -2369,3 +2501,27 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
 		throw e;
 	}
 }
+
+TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") {
+	state double duration = 2.5 * SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL;
+	state DDQueue self;
+	state Future<Void> counterFuture = self.periodicalRefreshCounter();
+	state Future<Void> finishFuture = delay(duration);
+	std::cout << "Start trace counter unit test for " << duration << "s ...\n";
+	loop choose {
+		when(wait(counterFuture)) {}
+		when(wait(finishFuture)) { break; }
+		when(wait(delayJittered(2.0))) {
+			std::vector<UID> team(3);
+			for (int i = 0; i < team.size(); ++i) {
+				team[i] = UID(deterministicRandom()->randomInt(1, 400), 0);
+			}
+			auto reason = RelocateReason(deterministicRandom()->randomInt(0, RelocateReason::typeCount()));
+			auto countType = DDQueue::ServerCounter::randomCountType();
+			self.serverCounter.increaseForTeam(team, reason, countType);
+			ASSERT(self.serverCounter.get(team[0], reason, countType));
+		}
+	}
+	std::cout << "Finished.";
+	return Void();
+}
@@ -477,7 +477,8 @@ private:
 ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
                                 KeyRange keys,
                                 Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
-                                ShardSizeBounds shardBounds) {
+                                ShardSizeBounds shardBounds,
+                                RelocateReason reason) {
 	state StorageMetrics metrics = shardSize->get().get().metrics;
 	state BandwidthStatus bandwidthStatus = getBandwidthStatus(metrics);
@@ -524,12 +525,12 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
 	for (int i = 0; i < skipRange; i++) {
 		KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
 		self->shardsAffectedByTeamFailure->defineShard(r);
-		self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, RelocateReason::OTHER));
+		self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
 	}
 	for (int i = numShards - 1; i > skipRange; i--) {
 		KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
 		self->shardsAffectedByTeamFailure->defineShard(r);
-		self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, RelocateReason::OTHER));
+		self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
 	}

 	self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
@@ -675,7 +676,7 @@ Future<Void> shardMerger(DataDistributionTracker* self,
 	}
 	restartShardTrackers(self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime, shardCount));
 	self->shardsAffectedByTeamFailure->defineShard(mergeRange);
-	self->output.send(RelocateShard(mergeRange, DataMovementReason::MERGE_SHARD, RelocateReason::OTHER));
+	self->output.send(RelocateShard(mergeRange, DataMovementReason::MERGE_SHARD, RelocateReason::MERGE_SHARD));

 	// We are about to be cancelled by the call to restartShardTrackers
 	return Void();
@@ -693,9 +694,9 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
 	ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
 	StorageMetrics const& stats = shardSize->get().get().metrics;
 	auto bandwidthStatus = getBandwidthStatus(stats);

-	bool shouldSplit = stats.bytes > shardBounds.max.bytes ||
-	                   (bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin);
+	bool sizeSplit = stats.bytes > shardBounds.max.bytes,
+	     writeSplit = bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin;
+	bool shouldSplit = sizeSplit || writeSplit;
 	bool shouldMerge = stats.bytes < shardBounds.min.bytes && bandwidthStatus == BandwidthStatusLow;

 	// Every invocation must set this or clear it
@@ -721,14 +722,14 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
 	// .detail("ShardBoundsMaxBytes", shardBounds.max.bytes)
 	// .detail("ShardBoundsMinBytes", shardBounds.min.bytes)
 	// .detail("WriteBandwitdhStatus", bandwidthStatus)
-	// .detail("SplitBecauseHighWriteBandWidth",
-	//         (bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin) ? "Yes" : "No");
+	// .detail("SplitBecauseHighWriteBandWidth", writeSplit ? "Yes" : "No");

 	if (!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
 		onChange = onChange || shardMerger(self, keys, shardSize);
 	}
 	if (shouldSplit) {
-		onChange = onChange || shardSplitter(self, keys, shardSize, shardBounds);
+		RelocateReason reason = writeSplit ? RelocateReason::WRITE_SPLIT : RelocateReason::SIZE_SPLIT;
+		onChange = onChange || shardSplitter(self, keys, shardSize, shardBounds, reason);
 	}

 	wait(onChange);
@@ -36,7 +36,38 @@

 #include "flow/actorcompiler.h" // This must be the last #include.

-enum class RelocateReason { OTHER = 0, REBALANCE_DISK, REBALANCE_READ };
+// SOMEDAY: whether it's possible to combine RelocateReason and DataMovementReason together?
+// RelocateReason to DataMovementReason is one-to-N mapping
+class RelocateReason {
+public:
+	enum Value : int8_t { OTHER = 0, REBALANCE_DISK, REBALANCE_READ, MERGE_SHARD, SIZE_SPLIT, WRITE_SPLIT, __COUNT };
+	RelocateReason(Value v) : value(v) { ASSERT(value != __COUNT); }
+	explicit RelocateReason(int v) : value((Value)v) { ASSERT(value != __COUNT); }
+	std::string toString() const {
+		switch (value) {
+		case OTHER:
+			return "Other";
+		case REBALANCE_DISK:
+			return "RebalanceDisk";
+		case REBALANCE_READ:
+			return "RebalanceRead";
+		case MERGE_SHARD:
+			return "MergeShard";
+		case SIZE_SPLIT:
+			return "SizeSplit";
+		case WRITE_SPLIT:
+			return "WriteSplit";
+		case __COUNT:
+			ASSERT(false);
+		}
+		return "";
+	}
+	operator int() const { return (int)value; }
+	constexpr static int8_t typeCount() { return (int)__COUNT; }
+
+private:
+	Value value;
+};
+
 // One-to-one relationship to the priority knobs
 enum class DataMovementReason {
@@ -95,16 +126,18 @@ struct RelocateShard {
 	RelocateReason reason;
 	DataMovementReason moveReason;

+	UID traceId; // track the lifetime of this relocate shard
+
 	// Initialization when define is a better practice. We should avoid assignment of member after definition.
 	// static RelocateShard emptyRelocateShard() { return {}; }

-	RelocateShard(KeyRange const& keys, DataMovementReason moveReason, RelocateReason reason)
+	RelocateShard(KeyRange const& keys, DataMovementReason moveReason, RelocateReason reason, UID traceId = UID())
 	  : keys(keys), priority(dataMovementPriority(moveReason)), cancelled(false), dataMoveId(anonymousShardId),
-	    reason(reason), moveReason(moveReason) {}
+	    reason(reason), moveReason(moveReason), traceId(traceId) {}

-	RelocateShard(KeyRange const& keys, int priority, RelocateReason reason)
+	RelocateShard(KeyRange const& keys, int priority, RelocateReason reason, UID traceId = UID())
 	  : keys(keys), priority(priority), cancelled(false), dataMoveId(anonymousShardId), reason(reason),
-	    moveReason(priorityToDataMovementReason(priority)) {}
+	    moveReason(priorityToDataMovementReason(priority)), traceId(traceId) {}

 	bool isRestore() const { return this->dataMove != nullptr; }
@@ -1337,7 +1337,9 @@ std::string BaseTraceEvent::printRealTime(double time) {
 }

 TraceInterval& TraceInterval::begin() {
-	pairID = nondeterministicRandom()->randomUniqueID();
+	if (!pairID.isValid()) {
+		pairID = nondeterministicRandom()->randomUniqueID();
+	}
 	count = 0;
 	return *this;
 }
@@ -26,4 +26,7 @@
 constexpr bool debugLogTraces = false;
 #define DebugLogTraceEvent(...) DebugTraceEvent(debugLogTraces, __VA_ARGS__)

+constexpr bool debugRelocationTraces = false;
+#define DebugRelocationTraceEvent(...) DebugTraceEvent(debugRelocationTraces, __VA_ARGS__)
+
 #endif // FOUNDATIONDB_DEBUGTRACE_H
@@ -583,7 +583,7 @@ private:
 class StringRef;

 struct TraceInterval {
-	TraceInterval(const char* type) : type(type), count(-1), severity(SevInfo) {}
+	TraceInterval(const char* type, UID id = UID()) : type(type), pairID(id), count(-1), severity(SevInfo) {}

 	TraceInterval& begin();
 	TraceInterval& end() { return *this; }