fixes for split metrics stream

This commit is contained in:
Josh Slocum 2022-02-04 18:24:47 -06:00
parent c695958823
commit 1736eabd33
2 changed files with 19 additions and 11 deletions

View File

@ -6772,13 +6772,14 @@ ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware
return Void();
}
ACTOR Future<Void> splitStorageMetricsStream(Database cx,
ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
Database cx,
KeyRange keys,
StorageMetrics limit,
StorageMetrics estimated,
PromiseStream<Key> resultStream) {
StorageMetrics estimated) {
state Span span("NAPI:SplitStorageMetricsStream"_loc);
Key beginKey = keys.begin;
state Key beginKey = keys.begin;
state Key lastKey = beginKey;
resultStream.send(beginKey);
// track used across loops in case one loop does not find any split points
state StorageMetrics used;
@ -6802,9 +6803,8 @@ ACTOR Future<Void> splitStorageMetricsStream(Database cx,
&StorageServerInterface::splitMetrics,
req,
TaskPriority::DataDistribution));
if (res.splits.size() &&
res.splits[0] <= results.back()) { // split points are out of order, possibly because of
// moving data, throw error to retry
if (res.splits.size() && res.splits[0] <= lastKey) { // split points are out of order, possibly because
// of moving data, throw error to retry
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
throw all_alternatives_failed();
}
@ -6825,6 +6825,7 @@ ACTOR Future<Void> splitStorageMetricsStream(Database cx,
for (auto& splitKey : results) {
resultStream.send(splitKey);
lastKey = splitKey;
}
if (keys.end <= locations.back().first.end) {
@ -6848,6 +6849,14 @@ ACTOR Future<Void> splitStorageMetricsStream(Database cx,
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
}
}
return Void();
}
Future<Void> Transaction::splitStorageMetricsStream(const PromiseStream<Key>& resultStream,
KeyRange const& keys,
StorageMetrics const& limit,
StorageMetrics const& estimated) {
return ::splitStorageMetricsStream(resultStream, cx, keys, limit, estimated);
}
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
@ -6922,9 +6931,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
Future<Standalone<VectorRef<KeyRef>>> Transaction::splitStorageMetrics(KeyRange const& keys,
StorageMetrics const& limit,
StorageMetrics const& estimated,
bool allowPartial) {
return ::splitStorageMetrics(cx, keys, limit, estimated, allowPartial);
StorageMetrics const& estimated) {
return ::splitStorageMetrics(cx, keys, limit, estimated);
}
void Transaction::checkDeferredError() const {

View File

@ -364,7 +364,7 @@ public:
int expectedShardCount);
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
Future<StorageMetrics> getStorageMetrics(KeyRange const& keys, int shardLimit);
Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultsStream,
Future<Void> splitStorageMetricsStream(PromiseStream<Key> const& resultsStream,
KeyRange const& keys,
StorageMetrics const& limit,
StorageMetrics const& estimated);