More bugs in streaming split
This commit is contained in:
parent
88b1593681
commit
9b3f6fe452
|
@ -6779,7 +6779,7 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
|
||||||
StorageMetrics estimated) {
|
StorageMetrics estimated) {
|
||||||
state Span span("NAPI:SplitStorageMetricsStream"_loc);
|
state Span span("NAPI:SplitStorageMetricsStream"_loc);
|
||||||
state Key beginKey = keys.begin;
|
state Key beginKey = keys.begin;
|
||||||
state Key lastKey = beginKey;
|
state Key globalLastKey = beginKey;
|
||||||
resultStream.send(beginKey);
|
resultStream.send(beginKey);
|
||||||
// track used across loops
|
// track used across loops
|
||||||
state StorageMetrics globalUsed;
|
state StorageMetrics globalUsed;
|
||||||
|
@ -6795,6 +6795,7 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
|
||||||
//TraceEvent("SplitStorageMetrics").detail("Locations", locations.size());
|
//TraceEvent("SplitStorageMetrics").detail("Locations", locations.size());
|
||||||
|
|
||||||
state StorageMetrics localUsed = globalUsed;
|
state StorageMetrics localUsed = globalUsed;
|
||||||
|
state Key localLastKey = globalLastKey;
|
||||||
state Standalone<VectorRef<KeyRef>> results;
|
state Standalone<VectorRef<KeyRef>> results;
|
||||||
state int i = 0;
|
state int i = 0;
|
||||||
for (; i < locations.size(); i++) {
|
for (; i < locations.size(); i++) {
|
||||||
|
@ -6807,7 +6808,8 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
|
||||||
&StorageServerInterface::splitMetrics,
|
&StorageServerInterface::splitMetrics,
|
||||||
req,
|
req,
|
||||||
TaskPriority::DataDistribution));
|
TaskPriority::DataDistribution));
|
||||||
if (res.splits.size() && res.splits[0] <= lastKey) { // split points are out of order, possibly because
|
if (res.splits.size() &&
|
||||||
|
res.splits[0] <= localLastKey) { // split points are out of order, possibly because
|
||||||
// of moving data, throw error to retry
|
// of moving data, throw error to retry
|
||||||
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
|
ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
|
||||||
throw all_alternatives_failed();
|
throw all_alternatives_failed();
|
||||||
|
@ -6816,7 +6818,7 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
|
||||||
if (res.splits.size()) {
|
if (res.splits.size()) {
|
||||||
results.append(results.arena(), res.splits.begin(), res.splits.size());
|
results.append(results.arena(), res.splits.begin(), res.splits.size());
|
||||||
results.arena().dependsOn(res.splits.arena());
|
results.arena().dependsOn(res.splits.arena());
|
||||||
lastKey = res.splits.back();
|
localLastKey = res.splits.back();
|
||||||
}
|
}
|
||||||
localUsed = res.used;
|
localUsed = res.used;
|
||||||
|
|
||||||
|
@ -6830,8 +6832,9 @@ ACTOR Future<Void> splitStorageMetricsStream(PromiseStream<Key> resultStream,
|
||||||
globalUsed.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) &&
|
globalUsed.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) &&
|
||||||
results.size() > 1) {
|
results.size() > 1) {
|
||||||
results.resize(results.arena(), results.size() - 1);
|
results.resize(results.arena(), results.size() - 1);
|
||||||
lastKey = results.back();
|
localLastKey = results.back();
|
||||||
}
|
}
|
||||||
|
globalLastKey = localLastKey;
|
||||||
|
|
||||||
for (auto& splitKey : results) {
|
for (auto& splitKey : results) {
|
||||||
resultStream.send(splitKey);
|
resultStream.send(splitKey);
|
||||||
|
|
|
@ -1718,9 +1718,9 @@ MutationsAndVersionRef filterMutations(Arena& arena,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO REMOVE!!! when BG is correctness clean
|
// TODO REMOVE!!! when BG is correctness clean
|
||||||
#define DEBUG_SS_ID "bdd3"_sr
|
#define DEBUG_SS_ID ""_sr
|
||||||
#define DEBUG_SS_CF_ID "b65dda"_sr
|
#define DEBUG_SS_CF_ID ""_sr
|
||||||
#define DEBUG_SS_CF_BEGIN_VERSION 244526755
|
#define DEBUG_SS_CF_BEGIN_VERSION invalidVersion
|
||||||
#define DEBUG_SS_CFM(ssId, cfId, v) \
|
#define DEBUG_SS_CFM(ssId, cfId, v) \
|
||||||
ssId.toString().substr(0, 4) == DEBUG_SS_ID&& cfId.printable().substr(0, 6) == DEBUG_SS_CF_ID && \
|
ssId.toString().substr(0, 4) == DEBUG_SS_ID&& cfId.printable().substr(0, 6) == DEBUG_SS_CF_ID && \
|
||||||
(v >= DEBUG_SS_CF_BEGIN_VERSION || latestVersion == DEBUG_SS_CF_BEGIN_VERSION)
|
(v >= DEBUG_SS_CF_BEGIN_VERSION || latestVersion == DEBUG_SS_CF_BEGIN_VERSION)
|
||||||
|
|
Loading…
Reference in New Issue