Fixes for streaming split keys

This commit is contained in:
Josh Slocum 2022-02-07 10:36:22 -06:00
parent 809ec24184
commit c48ca9430d
1 changed files with 17 additions and 8 deletions

View File

@ -6781,8 +6781,8 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
state Key beginKey = keys.begin;
state Key lastKey = beginKey;
resultStream.send(beginKey);
// track used across loops in case one loop does not find any split points
state StorageMetrics used;
// track used across loops
state StorageMetrics globalUsed;
loop {
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
@ -6791,41 +6791,50 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
Reverse::False,
&StorageServerInterface::splitMetrics,
TransactionInfo(TaskPriority::DataDistribution, span.context)));
try {
//TraceEvent("SplitStorageMetrics").detail("Locations", locations.size());
state StorageMetrics localUsed = globalUsed;
state Standalone<VectorRef<KeyRef>> results;
state int i = 0;
for (; i < locations.size(); i++) {
SplitMetricsRequest req(locations[i].first, limit, used, estimated, i == locations.size() - 1);
SplitMetricsRequest req(locations[i].first,
limit,
localUsed,
estimated,
i == locations.size() - 1 && keys.end <= locations.back().first.end);
SplitMetricsReply res = wait(loadBalance(locations[i].second->locations(),
&StorageServerInterface::splitMetrics,
req,
TaskPriority::DataDistribution));
if (res.splits.size() && res.splits[0] <= lastKey) { // split points are out of order, possibly because
// of moving data, throw error to retry
// of moving data, throw error to retry
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
throw all_alternatives_failed();
}
if (res.splits.size()) {
results.append(results.arena(), res.splits.begin(), res.splits.size());
results.arena().dependsOn(res.splits.arena());
lastKey = res.splits.back();
}
used = res.used;
localUsed = res.used;
//TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
}
globalUsed = localUsed;
// only truncate split at end
if (keys.end <= locations.back().first.end &&
used.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) && results.size() > 1) {
globalUsed.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) &&
results.size() > 1) {
results.resize(results.arena(), results.size() - 1);
lastKey = results.back();
}
for (auto& splitKey : results) {
resultStream.send(splitKey);
lastKey = splitKey;
}
if (keys.end <= locations.back().first.end) {