Support pagination for StorageServer splitMetrics API

This commit is contained in:
Hui Liu 2023-02-01 16:24:03 -08:00
parent f8b1da8bc6
commit 774446d3a0
5 changed files with 38 additions and 19 deletions

View File

@ -8661,24 +8661,36 @@ ACTOR Future<Optional<Standalone<VectorRef<KeyRef>>>> splitStorageMetricsWithLoc
try {
state int i = 0;
for (; i < locations.size(); i++) {
SplitMetricsRequest req(
locations[i].range, limit, used, estimated, i == locations.size() - 1, minSplitBytes);
SplitMetricsReply res = wait(loadBalance(locations[i].locations->locations(),
&StorageServerInterface::splitMetrics,
req,
TaskPriority::DataDistribution));
if (res.splits.size() && res.splits[0] <= results.back()) { // split points are out of order, possibly
// because of moving data, throw error to retry
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
throw all_alternatives_failed();
}
if (res.splits.size()) {
results.append(results.arena(), res.splits.begin(), res.splits.size());
results.arena().dependsOn(res.splits.arena());
}
used = res.used;
state Key beginKey = locations[i].range.begin;
loop {
KeyRangeRef range(beginKey, locations[i].range.end);
SplitMetricsRequest req(range, limit, used, estimated, i == locations.size() - 1, minSplitBytes);
SplitMetricsReply res = wait(loadBalance(locations[i].locations->locations(),
&StorageServerInterface::splitMetrics,
req,
TaskPriority::DataDistribution));
if (res.splits.size() &&
res.splits[0] <= results.back()) { // split points are out of order, possibly
// because of moving data, throw error to retry
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
throw all_alternatives_failed();
}
//TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
if (res.splits.size()) {
results.append(results.arena(), res.splits.begin(), res.splits.size());
results.arena().dependsOn(res.splits.arena());
}
used = res.used;
if (res.more && res.splits.size()) {
// Next request will return split points after this one
beginKey = KeyRef(beginKey.arena(), res.splits.back());
} else {
break;
}
//TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
}
}
if (used.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) && results.size() > 1) {

View File

@ -850,6 +850,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// This exists for flexibility but assigning each ReadType to its own unique priority number makes the most sense
// The enumeration is currently: eager, fetch, low, normal, high
init( STORAGESERVER_READTYPE_PRIORITY_MAP, "0,1,2,3,4" );
init( SPLIT_METRICS_MAX_ROWS, 10000 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -791,6 +791,7 @@ public:
std::string STORAGESERVER_READ_PRIORITIES;
int STORAGE_SERVER_READ_CONCURRENCY;
std::string STORAGESERVER_READTYPE_PRIORITY_MAP;
int SPLIT_METRICS_MAX_ROWS;
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -740,10 +740,11 @@ struct SplitMetricsReply {
constexpr static FileIdentifier file_identifier = 11530792;
Standalone<VectorRef<KeyRef>> splits;
StorageMetrics used;
bool more = false;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, splits, used);
serializer(ar, splits, used, more);
}
};

View File

@ -292,6 +292,10 @@ void StorageServerMetrics::splitMetrics(SplitMetricsRequest req) const {
if (key == req.keys.end)
break;
reply.splits.push_back_deep(reply.splits.arena(), key);
if (reply.splits.size() > SERVER_KNOBS->SPLIT_METRICS_MAX_ROWS) {
reply.more = true;
break;
}
StorageMetrics diff = (getMetrics(KeyRangeRef(lastKey, key)) + used);
remaining -= diff;
@ -301,7 +305,7 @@ void StorageServerMetrics::splitMetrics(SplitMetricsRequest req) const {
lastKey = key;
}
reply.used = getMetrics(KeyRangeRef(lastKey, req.keys.end)) + used;
reply.used = reply.more ? StorageMetrics() : getMetrics(KeyRangeRef(lastKey, req.keys.end)) + used;
req.reply.send(reply);
} catch (Error& e) {
req.reply.sendError(e);