parent
11b8be6f3d
commit
c21d8830a2
|
@ -2096,9 +2096,7 @@ struct KeyValueGen {
|
||||||
return StringRef(ar, value);
|
return StringRef(ar, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
KeyRef randomUsedKey() const {
|
KeyRef randomUsedKey() const { return usedKeysList[deterministicRandom()->randomInt(0, usedKeysList.size())]; }
|
||||||
return usedKeysList[deterministicRandom()->randomInt(0, usedKeysList.size())];
|
|
||||||
}
|
|
||||||
|
|
||||||
KeyRange randomKeyRange() const {
|
KeyRange randomKeyRange() const {
|
||||||
ASSERT(!usedKeysList.empty());
|
ASSERT(!usedKeysList.empty());
|
||||||
|
|
|
@ -1423,9 +1423,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
|
||||||
// wait for manager stream to become ready, and send a message
|
// wait for manager stream to become ready, and send a message
|
||||||
loop {
|
loop {
|
||||||
choose {
|
choose {
|
||||||
when(wait(bwData->currentManagerStatusStream.get().onReady())) {
|
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
|
||||||
break;
|
|
||||||
}
|
|
||||||
when(wait(bwData->currentManagerStatusStream.onChange())) {}
|
when(wait(bwData->currentManagerStatusStream.onChange())) {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2582,9 +2582,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
|
||||||
debug_setCheckRelocationDuration(false);
|
debug_setCheckRelocationDuration(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
when(KeyRange done = waitNext(rangesComplete.getFuture())) {
|
when(KeyRange done = waitNext(rangesComplete.getFuture())) { keysToLaunchFrom = done; }
|
||||||
keysToLaunchFrom = done;
|
|
||||||
}
|
|
||||||
when(wait(recordMetrics)) {
|
when(wait(recordMetrics)) {
|
||||||
Promise<int64_t> req;
|
Promise<int64_t> req;
|
||||||
getAverageShardBytes.send(req);
|
getAverageShardBytes.send(req);
|
||||||
|
@ -2656,9 +2654,7 @@ TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") {
|
||||||
std::cout << "Start trace counter unit test for " << duration << "s ...\n";
|
std::cout << "Start trace counter unit test for " << duration << "s ...\n";
|
||||||
loop choose {
|
loop choose {
|
||||||
when(wait(counterFuture)) {}
|
when(wait(counterFuture)) {}
|
||||||
when(wait(finishFuture)) {
|
when(wait(finishFuture)) { break; }
|
||||||
break;
|
|
||||||
}
|
|
||||||
when(wait(delayJittered(2.0))) {
|
when(wait(delayJittered(2.0))) {
|
||||||
std::vector<UID> team(3);
|
std::vector<UID> team(3);
|
||||||
for (int i = 0; i < team.size(); ++i) {
|
for (int i = 0; i < team.size(); ++i) {
|
||||||
|
|
|
@ -1023,9 +1023,7 @@ ACTOR Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, Get
|
||||||
ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
|
ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
|
||||||
choose {
|
choose {
|
||||||
when(wait(fetchShardMetricsList_impl(self, req))) {}
|
when(wait(fetchShardMetricsList_impl(self, req))) {}
|
||||||
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
|
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { req.reply.sendError(timed_out()); }
|
||||||
req.reply.sendError(timed_out());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
@ -1061,9 +1059,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
||||||
initData = Reference<InitialDataDistribution>();
|
initData = Reference<InitialDataDistribution>();
|
||||||
|
|
||||||
loop choose {
|
loop choose {
|
||||||
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) {
|
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
|
||||||
req.send(self.getAverageShardBytes());
|
|
||||||
}
|
|
||||||
when(wait(loggingTrigger)) {
|
when(wait(loggingTrigger)) {
|
||||||
TraceEvent("DDTrackerStats", self.distributorId)
|
TraceEvent("DDTrackerStats", self.distributorId)
|
||||||
.detail("Shards", self.shards->size())
|
.detail("Shards", self.shards->size())
|
||||||
|
|
|
@ -423,9 +423,7 @@ ACTOR Future<Void> globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProx
|
||||||
Void()) &&
|
Void()) &&
|
||||||
delay(SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_INTERVAL);
|
delay(SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_INTERVAL);
|
||||||
}
|
}
|
||||||
when(wait(actors.getResult())) {
|
when(wait(actors.getResult())) { ASSERT(false); }
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,12 +50,8 @@ constexpr uint64_t futureProtocolVersionValue = 0x0FDB00B073000000LL;
|
||||||
struct x { \
|
struct x { \
|
||||||
static constexpr uint64_t protocolVersion = v; \
|
static constexpr uint64_t protocolVersion = v; \
|
||||||
}; \
|
}; \
|
||||||
constexpr bool has##x() const { \
|
constexpr bool has##x() const { return this->version() >= x ::protocolVersion; } \
|
||||||
return this->version() >= x ::protocolVersion; \
|
static constexpr ProtocolVersion with##x() { return ProtocolVersion(x ::protocolVersion); }
|
||||||
} \
|
|
||||||
static constexpr ProtocolVersion with##x() { \
|
|
||||||
return ProtocolVersion(x ::protocolVersion); \
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProtocolVersion wraps a uint64_t to make it type safe. It will know about the current versions.
|
// ProtocolVersion wraps a uint64_t to make it type safe. It will know about the current versions.
|
||||||
// The default constructor will initialize the version to 0 (which is an invalid
|
// The default constructor will initialize the version to 0 (which is an invalid
|
||||||
|
|
Loading…
Reference in New Issue