fixed bugs in small splits
This commit is contained in:
parent
ba53045fcd
commit
bab8756e17
|
@ -87,7 +87,7 @@ struct BlobGranuleChunkRef {
|
|||
}
|
||||
};
|
||||
|
||||
enum BlobGranuleSplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 };
|
||||
// State recorded for a granule taking part in a split.
// - Unknown: never expected in a decoded split value (callers ASSERT against it).
// - Initialized: the state written when the split is first persisted.
// - Assigned: set once the granule moves on from Initialized.
// - Done: counted separately when tallying split progress; presumably the terminal state - TODO confirm.
// The numeric values are ordered and compared with <=, so do not renumber them.
enum BlobGranuleSplitState { Unknown = 0, Initialized = 1, Assigned = 2, Done = 3 };
|
||||
|
||||
struct BlobGranuleHistoryValue {
|
||||
constexpr static FileIdentifier file_identifier = 991434;
|
||||
|
|
|
@ -256,7 +256,6 @@ std::string printable(const VectorRef<StringRef>& val) {
|
|||
// Returns the printable representation of `val` by delegating to StringRef::printable().
std::string printable(const StringRef& val) {
|
||||
return val.printable();
|
||||
}
|
||||
|
||||
// Overload for std::string: views `str` as a StringRef and returns that StringRef's printable() form.
std::string printable(const std::string& str) {
|
||||
return StringRef(str).printable();
|
||||
}
|
||||
|
@ -6432,7 +6431,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
|
|||
//TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
|
||||
}
|
||||
|
||||
if (used.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT)) {
|
||||
if (used.allLessOrEqual(limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT) &&
|
||||
results.size() > 1) {
|
||||
results.resize(results.arena(), results.size() - 1);
|
||||
}
|
||||
|
||||
|
|
|
@ -255,6 +255,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesT
|
|||
|
||||
Standalone<VectorRef<KeyRef>> keys =
|
||||
wait(tr->getTransaction().splitStorageMetrics(range, splitMetrics, estimated));
|
||||
ASSERT(keys.size() >= 2);
|
||||
return keys;
|
||||
} else {
|
||||
// printf(" Not splitting range\n");
|
||||
|
@ -613,10 +614,8 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
|||
state int64_t newLockSeqno = -1;
|
||||
|
||||
// first get ranges to split
|
||||
if (newRanges.empty()) {
|
||||
Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, granuleRange));
|
||||
newRanges = _newRanges;
|
||||
}
|
||||
Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, granuleRange));
|
||||
newRanges = _newRanges;
|
||||
|
||||
if (newRanges.size() == 2) {
|
||||
// not large enough to split, just reassign back to worker
|
||||
|
@ -635,13 +634,23 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
|||
return Void();
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
printf("Splitting range [%s - %s) into (%d):\n",
|
||||
granuleRange.begin.printable().c_str(),
|
||||
granuleRange.end.printable().c_str(),
|
||||
newRanges.size() - 1);
|
||||
for (int i = 0; i < newRanges.size(); i++) {
|
||||
printf(" %s\n", newRanges[i].printable().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
// Need to split range. Persist intent to split and split metadata to DB BEFORE sending split requests
|
||||
loop {
|
||||
try {
|
||||
tr->reset();
|
||||
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
|
||||
ASSERT(newRanges.size() >= 2);
|
||||
ASSERT(newRanges.size() > 2);
|
||||
|
||||
// make sure we're still manager when this transaction gets committed
|
||||
wait(checkManagerLock(tr, bmData));
|
||||
|
@ -685,7 +694,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
|||
Key splitKey = blobGranuleSplitKeyFor(granuleID, newGranuleID);
|
||||
|
||||
tr->atomicOp(splitKey,
|
||||
blobGranuleSplitValueFor(BlobGranuleSplitState::Started),
|
||||
blobGranuleSplitValueFor(BlobGranuleSplitState::Initialized),
|
||||
MutationRef::SetVersionstampedValue);
|
||||
|
||||
Key historyKey = blobGranuleHistoryKeyFor(KeyRangeRef(newRanges[i], newRanges[i + 1]), latestVersion);
|
||||
|
@ -716,16 +725,6 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
|||
}
|
||||
}
|
||||
|
||||
if (BM_DEBUG) {
|
||||
printf("Splitting range [%s - %s) into (%d):\n",
|
||||
granuleRange.begin.printable().c_str(),
|
||||
granuleRange.end.printable().c_str(),
|
||||
newRanges.size() - 1);
|
||||
for (int i = 0; i < newRanges.size() - 1; i++) {
|
||||
printf(" [%s - %s)\n", newRanges[i].printable().c_str(), newRanges[i + 1].printable().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
// transaction committed, send range assignments
|
||||
// revoke from current worker
|
||||
RangeAssignment raRevoke;
|
||||
|
@ -1084,7 +1083,7 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
|
|||
std::tie(parentGranuleID, granuleID) = decodeBlobGranuleSplitKey(split.key);
|
||||
std::tie(splitState, version) = decodeBlobGranuleSplitValue(split.value);
|
||||
const KeyRange range = blobGranuleSplitKeyRangeFor(parentGranuleID);
|
||||
if (splitState <= BlobGranuleSplitState::Started) {
|
||||
if (splitState <= BlobGranuleSplitState::Initialized) {
|
||||
bmData->workerAssignments.insert(range, UID());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -405,7 +405,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
|
|||
|
||||
BlobGranuleSplitState st = decodeBlobGranuleSplitValue(it.value).first;
|
||||
ASSERT(st != BlobGranuleSplitState::Unknown);
|
||||
if (st == BlobGranuleSplitState::Started) {
|
||||
if (st == BlobGranuleSplitState::Initialized) {
|
||||
totalStarted++;
|
||||
} else if (st == BlobGranuleSplitState::Done) {
|
||||
totalDone++;
|
||||
|
@ -445,7 +445,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
|
|||
tr->clear(currentRange);
|
||||
} else {
|
||||
tr->atomicOp(myStateKey, blobGranuleSplitValueFor(newState), MutationRef::SetVersionstampedValue);
|
||||
if (newState == BlobGranuleSplitState::Assigned && currentState == BlobGranuleSplitState::Started &&
|
||||
if (newState == BlobGranuleSplitState::Assigned && currentState == BlobGranuleSplitState::Initialized &&
|
||||
totalStarted == 1) {
|
||||
// We are the last one to change from Initialized -> Assigned, so we can stop the parent change feed.
|
||||
if (BW_DEBUG) {
|
||||
|
@ -2221,7 +2221,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
info.parentGranule = std::pair(parentGranuleRange, parentGranuleID);
|
||||
|
||||
state std::pair<BlobGranuleSplitState, Version> granuleSplitState =
|
||||
std::pair(BlobGranuleSplitState::Started, invalidVersion);
|
||||
std::pair(BlobGranuleSplitState::Initialized, invalidVersion);
|
||||
if (hasPrevOwner) {
|
||||
std::pair<BlobGranuleSplitState, Version> _gss =
|
||||
wait(getGranuleSplitState(&tr, parentGranuleID, info.granuleID));
|
||||
|
@ -2232,7 +2232,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
// was already assigned, use change feed start version
|
||||
ASSERT(granuleSplitState.second > 0);
|
||||
info.changeFeedStartVersion = granuleSplitState.second;
|
||||
} else if (granuleSplitState.first == BlobGranuleSplitState::Started) {
|
||||
} else if (granuleSplitState.first == BlobGranuleSplitState::Initialized) {
|
||||
wait(updateGranuleSplitState(&tr,
|
||||
info.parentGranule.get().first,
|
||||
info.parentGranule.get().second,
|
||||
|
|
Loading…
Reference in New Issue