Reject Range Lock/Unlock Requests with Conflicting Range, User, or Lock Type (#12047)

* fix range lock

* make bulkload workload correct

* fix bugs and improve test coverage

* nits

* address comments

* nits

* address comments
This commit is contained in:
Zhe Wang 2025-03-21 16:59:52 -07:00 committed by GitHub
parent 432374a4df
commit ee0cd50c7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 504 additions and 323 deletions

View File

@ -39,16 +39,19 @@ Ideally, we would use the write lock to achieve this; however, we are currently
How to use?
-----------
Currently, FDB only provides ManagementAPI to lock a range.
Before a user can lock a range, the user must register its identity to the database.
A range can only be locked by a registered owner.
The user can use the following API to register an identity and lock a range.
Currently, FDB provides the ManagementAPI for range locking, intended as an interface for FDB feature development.
Before locking a range, a user must first register their identity with the database.
Only registered users are permitted to acquire range locks.
The following API can be used to register an identity and lock a range.
Put an exclusive read lock on a range. The range must be within the user key space, aka ``"" ~ \xff``.
The locking request is rejected with a range_lock_reject error if the range contains any existing lock with a different range, user, or lock type.
Currently, only the ExclusiveReadLock type is supported, but the design allows for future extension.
``ACTOR Future<Void> takeExclusiveReadLockOnRange(Database cx, KeyRange range, RangeLockOwnerName ownerUniqueID);``
Release an exclusive read lock on a range. The range must be within the user key space, aka ``"" ~ \xff``.
The release request is rejected with a range_lock_reject error if the range contains any existing lock with a different range, user, or lock type.
``ACTOR Future<Void> releaseExclusiveReadLockOnRange(Database cx, KeyRange range, RangeLockOwnerName ownerUniqueID);``
@ -58,7 +61,7 @@ If the execution is failed, no range is locked/unlocked.
Get exclusive read locks on the input range
``ACTOR Future<std::vector<KeyRange>> getExclusiveReadLockOnRange(Database cx, KeyRange range);``
``ACTOR Future<std::vector<std::pair<KeyRange, RangeLockState>>> findExclusiveReadLockOnRange(Database cx, KeyRange range);``
Register a range lock owner to database metadata.

View File

@ -291,6 +291,50 @@ ACTOR Future<UID> bulkLoadCommandActor(Database cx, std::vector<StringRef> token
printLongDesc(tokens[0]);
return UID();
} else if (tokencmp(tokens[1], "printlock")) {
// For debugging purposes and invisible to users.
if (tokens.size() != 2) {
fmt::println("{}", BULK_LOAD_STATUS_USAGE);
return UID();
}
std::vector<std::pair<KeyRange, RangeLockState>> lockedRanges =
wait(findExclusiveReadLockOnRange(cx, normalKeys));
fmt::println("Total {} locked ranges", lockedRanges.size());
if (lockedRanges.size() > 10) {
fmt::println("First 10 locks are:");
}
int count = 1;
for (const auto& lock : lockedRanges) {
if (count > 10) {
break;
}
fmt::println("Lock {} on {} for {}", count, lock.first.toString(), lock.second.toString());
count++;
}
return UID();
} else if (tokencmp(tokens[1], "printlockowner")) {
// For debugging purposes and invisible to users.
if (tokens.size() != 2) {
fmt::println("{}", BULK_LOAD_STATUS_USAGE);
return UID();
}
std::vector<RangeLockOwner> owners = wait(getAllRangeLockOwners(cx));
for (const auto owner : owners) {
fmt::println("{}", owner.toString());
}
return UID();
} else if (tokencmp(tokens[1], "clearlock")) {
// For debugging purposes and invisible to users.
if (tokens.size() != 3) {
fmt::println("{}", BULK_LOAD_STATUS_USAGE);
return UID();
}
std::string ownerUniqueID = tokens[2].toString();
wait(releaseExclusiveReadLockByUser(cx, ownerUniqueID));
return UID();
} else {
printUsage(tokens[0]);
printLongDesc(tokens[0]);

View File

@ -2833,61 +2833,9 @@ ACTOR Future<int> setBulkLoadMode(Database cx, int mode) {
}
}
ACTOR Future<std::vector<BulkLoadTaskState>> getBulkLoadTasksWithinRange(Database cx,
KeyRange rangeToRead,
size_t limit,
Optional<BulkLoadPhase> phase) {
state Transaction tr(cx);
state Key readBegin = rangeToRead.begin;
state Key readEnd = rangeToRead.end;
state RangeResult rangeResult;
state std::vector<BulkLoadTaskState> res;
while (readBegin < readEnd) {
state int retryCount = 0;
loop {
try {
rangeResult.clear();
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(store(rangeResult,
krmGetRanges(&tr,
bulkLoadTaskPrefix,
KeyRangeRef(readBegin, readEnd),
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
break;
} catch (Error& e) {
if (retryCount > 30) {
throw timed_out();
}
wait(tr.onError(e));
retryCount++;
}
}
for (int i = 0; i < rangeResult.size() - 1; ++i) {
if (rangeResult[i].value.empty()) {
continue;
}
BulkLoadTaskState bulkLoadTaskState = decodeBulkLoadTaskState(rangeResult[i].value);
KeyRange range = Standalone(KeyRangeRef(rangeResult[i].key, rangeResult[i + 1].key));
if (range != bulkLoadTaskState.getRange()) {
ASSERT(bulkLoadTaskState.getRange().contains(range));
continue;
}
if (!phase.present() || phase.get() == bulkLoadTaskState.phase) {
res.push_back(bulkLoadTaskState);
}
if (res.size() >= limit) {
return res;
}
}
readBegin = rangeResult.back().key;
}
return res;
}
ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr, BulkLoadTaskState bulkLoadTask) {
ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr,
BulkLoadTaskState bulkLoadTask,
bool checkTaskExclusive) {
ASSERT(normalKeys.contains(bulkLoadTask.getRange()) &&
(bulkLoadTask.phase == BulkLoadPhase::Submitted ||
(bulkLoadTask.phase == BulkLoadPhase::Complete && bulkLoadTask.hasEmptyData())));
@ -2896,7 +2844,9 @@ ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr, BulkLoadTas
try {
wait(takeExclusiveReadLockOnRange(tr, bulkLoadTask.getRange(), rangeLockNameForBulkLoad));
} catch (Error& e) {
ASSERT(e.code() != error_code_range_locked_by_different_user); // Currently, only bulkload uses the range lock.
ASSERT(!checkTaskExclusive || e.code() != error_code_range_lock_reject);
// CheckTaskExclusive is set for bulkload job.
// Currently, only bulkload job uses the range lock, and tasks have exclusive ranges.
throw e;
}
bulkLoadTask.submitTime = now();
@ -2904,21 +2854,6 @@ ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr, BulkLoadTas
return Void();
}
// Submit bulkload task and overwrite any existing task and lock range
ACTOR Future<Void> submitBulkLoadTask(Database cx, BulkLoadTaskState bulkLoadTask) {
state Transaction tr(cx);
loop {
try {
wait(setBulkLoadSubmissionTransaction(&tr, bulkLoadTask));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
// Get bulk load task metadata with range and taskId and phase selector
// Throw error if the task is outdated or the task is not in any input phase at the tr read version
// TODO: check jobId
@ -2991,7 +2926,10 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
return bulkLoadTaskState;
}
ACTOR Future<Void> setBulkLoadFinalizeTransaction(Transaction* tr, KeyRange range, UID taskId) {
ACTOR Future<Void> setBulkLoadFinalizeTransaction(Transaction* tr,
KeyRange range,
UID taskId,
bool checkTaskExclusive) {
state BulkLoadTaskState bulkLoadTaskState;
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -3010,29 +2948,14 @@ ACTOR Future<Void> setBulkLoadFinalizeTransaction(Transaction* tr, KeyRange rang
try {
wait(releaseExclusiveReadLockOnRange(tr, bulkLoadTaskState.getRange(), rangeLockNameForBulkLoad));
} catch (Error& e) {
ASSERT(e.code() != error_code_range_locked_by_different_user); // Currently, only bulkload uses the range lock.
ASSERT(!checkTaskExclusive || e.code() != error_code_range_unlock_reject);
// CheckTaskExclusive is set for bulkload job.
// Currently, only bulkload job uses the range lock, and tasks have exclusive ranges.
throw e;
}
return Void();
}
// We finalize a bulkload task when the bulkload job see the task completes.
// Update bulkload task to acknowledge state and unlock the range.
// A acknowledged bulkload task will be automatically erased by DD.
ACTOR Future<Void> finalizeBulkLoadTask(Database cx, KeyRange range, UID taskId) {
state Transaction tr(cx);
loop {
try {
wait(setBulkLoadFinalizeTransaction(&tr, range, taskId));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
// This is the only place to update job history map. So, we check the number of job history entries here is sufficient
// to maintain that the number of jobs in the history is no more than BULKLOAD_JOB_HISTORY_COUNT_MAX.
ACTOR Future<Void> addBulkLoadJobToHistory(Transaction* tr, BulkLoadJobState jobState) {
@ -3647,7 +3570,7 @@ ACTOR Future<std::vector<RangeLockOwner>> getAllRangeLockOwners(Database cx) {
state Key beginKey = rangeLockOwnerKeys.begin;
state Key endKey = rangeLockOwnerKeys.end;
state Transaction tr(cx);
loop {
while (beginKey < endKey) {
state KeyRange rangeToRead = Standalone(KeyRangeRef(beginKey, endKey));
try {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
@ -3656,32 +3579,30 @@ ACTOR Future<std::vector<RangeLockOwner>> getAllRangeLockOwners(Database cx) {
for (const auto& kv : result) {
RangeLockOwner owner = decodeRangeLockOwner(kv.value);
ASSERT(owner.isValid());
RangeLockOwnerName uidFromKey = decodeRangeLockOwnerKey(kv.key);
ASSERT(owner.getOwnerUniqueId() == uidFromKey);
res.push_back(owner);
beginKey = keyAfter(kv.key);
}
if (result[result.size() - 1].key == endKey) {
return res;
} else {
beginKey = result[result.size() - 1].key;
tr.reset();
if (!result.more) {
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return res;
}
// Not transactional
ACTOR Future<std::vector<KeyRange>> getExclusiveReadLockOnRange(Database cx, KeyRange range) {
ACTOR Future<std::vector<std::pair<KeyRange, RangeLockState>>>
findExclusiveReadLockOnRange(Database cx, KeyRange range, Optional<RangeLockOwnerName> ownerName) {
if (range.end > normalKeys.end) {
throw range_lock_failed();
}
state std::vector<KeyRange> lockedRanges;
state std::vector<std::pair<KeyRange, RangeLockState>> lockedRanges;
state Key beginKey = range.begin;
state Key endKey = range.end;
state Transaction tr(cx);
loop {
while (beginKey < endKey) {
state KeyRange rangeToRead = Standalone(KeyRangeRef(beginKey, endKey));
try {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
@ -3693,16 +3614,15 @@ ACTOR Future<std::vector<KeyRange>> getExclusiveReadLockOnRange(Database cx, Key
}
RangeLockStateSet rangeLockStateSet = decodeRangeLockStateSet(result[i].value);
ASSERT(rangeLockStateSet.isValid());
if (rangeLockStateSet.isLockedFor(RangeLockType::ExclusiveReadLock)) {
lockedRanges.push_back(Standalone(KeyRangeRef(result[i].key, result[i + 1].key)));
if (rangeLockStateSet.isLockedFor(RangeLockType::ExclusiveReadLock) &&
(!ownerName.present() ||
ownerName.get() == rangeLockStateSet.getAllLockStats()[0].getOwnerUniqueId())) {
// Exclusive lock can only have one lock in the set, so we just check the first lock in the set
lockedRanges.push_back(std::make_pair(Standalone(KeyRangeRef(result[i].key, result[i + 1].key)),
rangeLockStateSet.getAllLockStats()[0]));
}
}
if (result[result.size() - 1].key == range.end) {
break;
} else {
beginKey = result[result.size() - 1].key;
tr.reset();
}
beginKey = result.back().key;
} catch (Error& e) {
wait(tr.onError(e));
}
@ -3712,8 +3632,7 @@ ACTOR Future<std::vector<KeyRange>> getExclusiveReadLockOnRange(Database cx, Key
// Validate the input range and owner.
// If invalid, reject the request by throwing range_lock_failed error.
// Check if the range has been locked by a different user.
// If yes, reject the request by throwing range_locked_by_different_user error.
// If the range has been locked, reject the request by throwing range_lock_reject error.
ACTOR Future<Void> prepareExclusiveRangeLockOperation(Transaction* tr,
KeyRange range,
RangeLockOwnerName ownerUniqueID) {
@ -3736,7 +3655,6 @@ ACTOR Future<Void> prepareExclusiveRangeLockOperation(Transaction* tr,
state RangeLockOwner owner = decodeRangeLockOwner(ownerValue.get());
ASSERT(owner.isValid());
// Check lock state on the entire input range. Throw exception if the range has been locked by a different owner.
state RangeLockState newLock(RangeLockType::ExclusiveReadLock, ownerUniqueID);
state Key beginKey = range.begin;
state Key endKey = range.end;
state KeyRange rangeToRead;
@ -3750,13 +3668,68 @@ ACTOR Future<Void> prepareExclusiveRangeLockOperation(Transaction* tr,
RangeLockStateSet rangeLockStateSet = decodeRangeLockStateSet(res[i].value);
ASSERT(rangeLockStateSet.isValid());
auto lockSet = rangeLockStateSet.getLocks();
if (!lockSet.empty() && lockSet.find(newLock.getLockUniqueString()) == lockSet.end()) {
if (!lockSet.empty() && (!rangeLockStateSet.isLockedFor(RangeLockType::ExclusiveReadLock) ||
lockSet.find(RangeLockState(RangeLockType::ExclusiveReadLock, ownerUniqueID, range)
.getLockUniqueString()) == lockSet.end())) {
TraceEvent(SevDebug, "PrepareExclusiveRangeLockOperationFailed")
.detail("Reason", "Locked")
.detail("NewLock", newLock.toString())
.detail("ExistingLocks", rangeLockStateSet.toString())
.detail("Range", range);
throw range_locked_by_different_user(); // Has been locked by a different owner
.detail("NewLockType", RangeLockType::ExclusiveReadLock)
.detail("NewLockRange", range)
.detail("NewLockOwner", ownerUniqueID)
.detail("ExistingLocks", rangeLockStateSet.toString());
throw range_lock_reject(); // Has been locked
}
}
beginKey = res[res.size() - 1].key;
}
return Void();
}
ACTOR Future<Void> prepareExclusiveRangeUnlockOperation(Transaction* tr,
KeyRange range,
RangeLockOwnerName ownerUniqueID) {
// Check input range
if (range.end > normalKeys.end) {
TraceEvent(SevDebug, "PrepareExclusiveRangeUnlockOperationFailed")
.detail("Reason", "Range out of scope")
.detail("Range", range);
throw range_lock_failed();
}
// Check owner
state Optional<Value> ownerValue = wait(tr->get(rangeLockOwnerKeyFor(ownerUniqueID)));
if (!ownerValue.present()) {
TraceEvent(SevDebug, "PrepareExclusiveRangeUnlockOperationFailed")
.detail("Reason", "Owner not found")
.detail("Owner", ownerUniqueID)
.detail("Range", range);
throw range_lock_failed();
}
state RangeLockOwner owner = decodeRangeLockOwner(ownerValue.get());
ASSERT(owner.isValid());
// Check lock state on the entire input range. Throw exception if the range has been locked by a different owner.
state Key beginKey = range.begin;
state Key endKey = range.end;
state KeyRange rangeToRead;
while (beginKey < endKey) {
rangeToRead = KeyRangeRef(beginKey, endKey);
RangeResult res = wait(krmGetRanges(tr, rangeLockPrefix, rangeToRead));
for (int i = 0; i < res.size() - 1; i++) {
if (res[i].value.empty()) {
continue;
}
RangeLockStateSet rangeLockStateSet = decodeRangeLockStateSet(res[i].value);
ASSERT(rangeLockStateSet.isValid());
auto lockSet = rangeLockStateSet.getLocks();
if (!lockSet.empty() && (!rangeLockStateSet.isLockedFor(RangeLockType::ExclusiveReadLock) ||
lockSet.find(RangeLockState(RangeLockType::ExclusiveReadLock, ownerUniqueID, range)
.getLockUniqueString()) == lockSet.end())) {
TraceEvent(SevDebug, "PrepareExclusiveRangeUnlockOperationFailed")
.detail("Reason", "Has been locked by a different user or the same user with a different range")
.detail("UnLockOwner", ownerUniqueID)
.detail("UnLockRange", range)
.detail("ExistingLocks", rangeLockStateSet.toString());
throw range_unlock_reject();
}
}
beginKey = res[res.size() - 1].key;
@ -3775,9 +3748,9 @@ ACTOR Future<Void> takeExclusiveReadLockOnRange(Transaction* tr, KeyRange range,
// At this point, no lock presents on the range.
// Lock range by writting the range.
RangeLockStateSet rangeLockStateSet;
rangeLockStateSet.insertIfNotExist(RangeLockState(RangeLockType::ExclusiveReadLock, ownerUniqueID));
wait(krmSetRangeCoalescing(tr, rangeLockPrefix, range, normalKeys, rangeLockStateSetValue(rangeLockStateSet)));
TraceEvent(SevInfo, "TakeExclusiveReadLockOnRange").detail("Range", range);
rangeLockStateSet.insertIfNotExist(RangeLockState(RangeLockType::ExclusiveReadLock, ownerUniqueID, range));
wait(krmSetRange(tr, rangeLockPrefix, range, rangeLockStateSetValue(rangeLockStateSet)));
TraceEvent(SevInfo, "TakeExclusiveReadLockTransactionOnRange").detail("Range", range);
return Void();
}
@ -3786,11 +3759,57 @@ ACTOR Future<Void> takeExclusiveReadLockOnRange(Transaction* tr, KeyRange range,
ACTOR Future<Void> releaseExclusiveReadLockOnRange(Transaction* tr, KeyRange range, RangeLockOwnerName ownerUniqueID) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(prepareExclusiveRangeLockOperation(tr, range, ownerUniqueID));
wait(prepareExclusiveRangeUnlockOperation(tr, range, ownerUniqueID));
// At this point, no lock presents on the range.
// Unlock by overwiting the range.
wait(krmSetRangeCoalescing(tr, rangeLockPrefix, range, normalKeys, rangeLockStateSetValue(RangeLockStateSet())));
TraceEvent(SevInfo, "ReleaseExclusiveReadLockOnRange").detail("Range", range);
TraceEvent(SevInfo, "ReleaseExclusiveReadLockTransactionOnRange").detail("Range", range);
return Void();
}
ACTOR Future<Void> releaseExclusiveReadLockByUser(Database cx, RangeLockOwnerName ownerUniqueID) {
state Key beginKey = normalKeys.begin;
state Key endKey = normalKeys.end;
state Transaction tr(cx);
state int i = 0;
state RangeResult result;
state KeyRange rangeToRead;
state RangeLockStateSet currentRangeLockStateSet;
state KeyRange currentRange;
while (beginKey < endKey) {
rangeToRead = Standalone(KeyRangeRef(beginKey, endKey));
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
result.clear();
wait(store(result, krmGetRanges(&tr, rangeLockPrefix, rangeToRead)));
i = 0;
for (; i < result.size() - 1; i++) {
currentRange = KeyRangeRef(result[i].key, result[i + 1].key);
if (result[i].value.empty()) {
beginKey = currentRange.end;
continue;
}
currentRangeLockStateSet = decodeRangeLockStateSet(result[i].value);
ASSERT(currentRangeLockStateSet.isValid());
if (currentRangeLockStateSet.isLockedFor(RangeLockType::ExclusiveReadLock) &&
currentRangeLockStateSet.getAllLockStats()[0].getOwnerUniqueId() == ownerUniqueID) {
// TODO(BulkLoad): krmSetRangeCoalescing per small range is inefficient especially when the lock
// count is over 10K. Optimize this.
wait(krmSetRangeCoalescing(
&tr, rangeLockPrefix, currentRange, normalKeys, rangeLockStateSetValue(RangeLockStateSet())));
wait(tr.commit());
tr.reset();
beginKey = currentRange.end;
break;
} else {
beginKey = currentRange.end;
}
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
@ -3801,6 +3820,7 @@ ACTOR Future<Void> takeExclusiveReadLockOnRange(Database cx, KeyRange range, Ran
try {
wait(takeExclusiveReadLockOnRange(&tr, range, ownerUniqueID));
wait(tr.commit());
TraceEvent(SevInfo, "TakeExclusiveReadLockOnRange").detail("Range", range);
break;
} catch (Error& e) {
wait(tr.onError(e));
@ -3816,6 +3836,7 @@ ACTOR Future<Void> releaseExclusiveReadLockOnRange(Database cx, KeyRange range,
try {
wait(releaseExclusiveReadLockOnRange(&tr, range, ownerUniqueID));
wait(tr.commit());
TraceEvent(SevInfo, "ReleaseExclusiveReadLockOnRange").detail("Range", range);
break;
} catch (Error& e) {
wait(tr.onError(e));

View File

@ -1291,13 +1291,6 @@ const Key rangeLockOwnerKeyFor(const RangeLockOwnerName& ownerUniqueID) {
return wr.toValue();
}
const RangeLockOwnerName decodeRangeLockOwnerKey(const KeyRef& key) {
std::string ownerUniqueID;
BinaryReader rd(key.removePrefix(rangeLockOwnerPrefix), Unversioned());
rd >> ownerUniqueID;
return ownerUniqueID;
}
const Value rangeLockOwnerValue(const RangeLockOwner& rangeLockOwner) {
return ObjectWriter::toValue(rangeLockOwner, IncludeVersion());
}

View File

@ -173,27 +173,18 @@ ACTOR Future<UID> cancelAuditStorage(Reference<IClusterConnectionRecord> cluster
// When the mode is on, DD will periodically check if there is any bulkload task to do by scaning the metadata.
ACTOR Future<int> setBulkLoadMode(Database cx, int mode);
// Get bulk load tasks which range is fully contained by the input range.
// If phase is provided, then return the task with the input phase.
ACTOR Future<std::vector<BulkLoadTaskState>> getBulkLoadTasksWithinRange(
Database cx,
KeyRange rangeToRead,
size_t limit = 10,
Optional<BulkLoadPhase> phase = Optional<BulkLoadPhase>());
// Create a bulkload task submission transaction without commit
// Used by ManagementAPI and bulkdumpRestore at DD
ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr, BulkLoadTaskState bulkLoadTask);
// Submit a bulk load task
ACTOR Future<Void> submitBulkLoadTask(Database cx, BulkLoadTaskState bulkLoadTask);
ACTOR Future<Void> setBulkLoadSubmissionTransaction(Transaction* tr,
BulkLoadTaskState bulkLoadTask,
bool checkTaskExclusive = true);
// Create an bulkload task acknowledge transaction without commit
// Used by ManagementAPI and bulkdumpRestore at DD
ACTOR Future<Void> setBulkLoadFinalizeTransaction(Transaction* tr, KeyRange range, UID taskId);
// Finalize a bulk load task if it has been completed
ACTOR Future<Void> finalizeBulkLoadTask(Database cx, KeyRange range, UID taskId);
ACTOR Future<Void> setBulkLoadFinalizeTransaction(Transaction* tr,
KeyRange range,
UID taskId,
bool checkTaskExclusive = true);
// Get bulk load task for the input range and taskId
ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
@ -273,7 +264,13 @@ ACTOR Future<Void> releaseExclusiveReadLockOnRange(Transaction* tr, KeyRange ran
ACTOR Future<Void> releaseExclusiveReadLockOnRange(Database cx, KeyRange range, RangeLockOwnerName ownerUniqueID);
// Get locked ranges within the input range (the input range must be within normalKeys)
ACTOR Future<std::vector<KeyRange>> getExclusiveReadLockOnRange(Database cx, KeyRange range);
ACTOR Future<std::vector<std::pair<KeyRange, RangeLockState>>> findExclusiveReadLockOnRange(
Database cx,
KeyRange range,
Optional<RangeLockOwnerName> ownerName = Optional<RangeLockOwnerName>());
// Clear all exclusive read lock by the input user. Not transactional.
ACTOR Future<Void> releaseExclusiveReadLockByUser(Database cx, RangeLockOwnerName ownerUniqueID);
ACTOR Future<Void> printHealthyZone(Database cx);
ACTOR Future<bool> clearHealthyZone(Database cx, bool printWarning = false, bool clearSSFailureZoneString = false);

View File

@ -30,6 +30,7 @@
using RangeLockOwnerName = std::string;
using RangeLockUniqueString = std::string;
using RangeLockID = std::string;
enum class RangeLockType : uint8_t {
Invalid = 0,
@ -88,8 +89,8 @@ struct RangeLockState {
public:
RangeLockState() = default;
RangeLockState(RangeLockType type, const RangeLockOwnerName& ownerUniqueId)
: lockType(type), ownerUniqueId(ownerUniqueId) {
RangeLockState(RangeLockType type, const RangeLockOwnerName& ownerUniqueId, const KeyRange& range)
: lockType(type), ownerUniqueId(ownerUniqueId), range(range) {
ASSERT(isValid());
}
@ -105,26 +106,36 @@ public:
}
}
KeyRange getRange() const { return range; }
std::string toString() const {
return "RangeLockState: [LockType]: " + rangeLockTypeString(lockType) + " [Owner]: " + ownerUniqueId;
return "RangeLockState: [LockType]: " + rangeLockTypeString(lockType) + ", [Owner]: " + ownerUniqueId +
", [Range]: " + range.toString() + ", [RangeLockID]: " + lockId;
}
bool isLockedFor(RangeLockType inputLockType) const { return lockType == inputLockType; }
bool operator==(RangeLockState const& r) const {
return lockType == r.lockType && ownerUniqueId == r.ownerUniqueId;
return lockType == r.lockType && ownerUniqueId == r.ownerUniqueId && range == r.range;
}
RangeLockUniqueString getLockUniqueString() const { return ownerUniqueId + rangeLockTypeString(lockType); }
// TODO: use lockId
RangeLockUniqueString getLockUniqueString() const {
return ownerUniqueId + rangeLockTypeString(lockType) + range.toString();
}
RangeLockOwnerName getOwnerUniqueId() const { return ownerUniqueId; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ownerUniqueId, lockType);
serializer(ar, ownerUniqueId, lockType, range, lockId);
}
private:
RangeLockOwnerName ownerUniqueId; // The app/user that owns the lock.
RangeLockType lockType;
KeyRange range;
RangeLockID lockId; // Reserved for physical lock. Is not used now.
};
// Persisted state on a range. A range can have multiple locks distinguishing by owner and lockType.

View File

@ -560,7 +560,6 @@ RangeLockStateSet decodeRangeLockStateSet(const ValueRef& value);
extern const KeyRangeRef rangeLockOwnerKeys;
extern const KeyRef rangeLockOwnerPrefix;
const Key rangeLockOwnerKeyFor(const RangeLockOwnerName& ownerUniqueID);
const RangeLockOwnerName decodeRangeLockOwnerKey(const KeyRef& key);
const Value rangeLockOwnerValue(const RangeLockOwner& rangeLockOwner);
RangeLockOwner decodeRangeLockOwner(const ValueRef& value);

View File

@ -323,6 +323,9 @@ struct BulkDumping : TestWorkload {
// BulkLoad uses range lock
wait(registerRangeLockOwner(cx, rangeLockNameForBulkLoad, rangeLockNameForBulkLoad));
std::vector<RangeLockOwner> lockOwners = wait(getAllRangeLockOwners(cx));
ASSERT(lockOwners.size() == 1 && lockOwners[0].getOwnerUniqueId() == rangeLockNameForBulkLoad);
// Submit a bulk dump job
state int oldBulkDumpMode = 0;
wait(store(oldBulkDumpMode, setBulkDumpMode(cx, 1))); // Enable bulkDump
@ -385,8 +388,16 @@ struct BulkDumping : TestWorkload {
break;
}
// Make sure all ranges locked by the workload are unlocked
std::vector<std::pair<KeyRange, RangeLockState>> res =
wait(findExclusiveReadLockOnRange(cx, normalKeys, rangeLockNameForBulkLoad));
ASSERT(res.empty());
wait(removeRangeLockOwner(cx, rangeLockNameForBulkLoad));
std::vector<RangeLockOwner> lockOwnersAfterRemove = wait(getAllRangeLockOwners(cx));
ASSERT(lockOwnersAfterRemove.empty());
return Void();
}
};

View File

@ -21,6 +21,7 @@
#include "fdbclient/BulkLoading.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/BulkLoadUtil.actor.h"
#include "fdbserver/RocksDBCheckpointUtils.actor.h"
#include "fdbserver/StorageMetrics.actor.h"
@ -103,50 +104,49 @@ struct BulkLoading : TestWorkload {
return Void();
}
ACTOR Future<Void> submitBulkLoadTasks(BulkLoading* self, Database cx, std::vector<BulkLoadTaskState> tasks) {
state int i = 0;
for (; i < tasks.size(); i++) {
loop {
try {
wait(submitBulkLoadTask(cx, tasks[i]));
TraceEvent(SevDebug, "BulkLoadingSubmitBulkLoadTask")
.detail("BulkLoadTaskState", tasks[i].toString());
break;
} catch (Error& e) {
TraceEvent(SevWarn, "BulkLoadingSubmitBulkLoadTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.errorUnsuppressed(e)
.detail("BulkLoadTaskState", tasks[i].toString());
wait(delay(0.1));
// Submit task can be failed due to range lock reject
ACTOR Future<bool> submitBulkLoadTask(Database cx, BulkLoadTaskState bulkLoadTask) {
loop {
try {
state Transaction tr(cx);
wait(setBulkLoadSubmissionTransaction(&tr, bulkLoadTask, /*checkTaskExclusive=*/false));
wait(tr.commit());
TraceEvent(SevDebug, "BulkLoadingSubmitBulkLoadTask")
.detail("BulkLoadTaskState", bulkLoadTask.toString());
break;
} catch (Error& e) {
TraceEvent(SevWarn, "BulkLoadingSubmitBulkLoadTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.errorUnsuppressed(e)
.detail("BulkLoadTaskState", bulkLoadTask.toString());
if (e.code() == error_code_range_lock_reject) {
return false;
}
wait(delay(0.1));
}
}
return Void();
return true;
}
ACTOR Future<Void> finalizeBulkLoadTasks(BulkLoading* self, Database cx, std::vector<BulkLoadTaskState> tasks) {
state int i = 0;
for (; i < tasks.size(); i++) {
loop {
try {
wait(finalizeBulkLoadTask(cx, tasks[i].getRange(), tasks[i].getTaskId()));
TraceEvent(SevDebug, "BulkLoadingAcknowledgeBulkLoadTask")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("BulkLoadTaskState", tasks[i].toString());
break;
} catch (Error& e) {
TraceEvent(SevWarn, "BulkLoadingAcknowledgeBulkLoadTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.errorUnsuppressed(e)
.detail("BulkLoadTaskState", tasks[i].toString());
if (e.code() == error_code_bulkload_task_outdated) {
break; // has been erased or overwritten by other tasks
}
wait(delay(0.1));
}
// Finish task must always succeed
ACTOR Future<Void> finalizeBulkLoadTask(Database cx, KeyRange range, UID taskId) {
loop {
try {
state Transaction tr(cx);
wait(setBulkLoadFinalizeTransaction(&tr, range, taskId, /*checkTaskExclusive=*/false));
wait(tr.commit());
TraceEvent(SevDebug, "BulkLoadingAcknowledgeBulkLoadTask")
.detail("TaskID", taskId.toString())
.detail("TaskRange", range);
break;
} catch (Error& e) {
TraceEvent(SevWarn, "BulkLoadingAcknowledgeBulkLoadTaskError")
.errorUnsuppressed(e)
.detail("TaskID", taskId.toString())
.detail("TaskRange", range);
ASSERT(e.code() != error_code_bulkload_task_outdated && e.code() != error_code_range_lock_reject);
wait(delay(0.1));
}
}
return Void();
@ -508,46 +508,37 @@ struct BulkLoading : TestWorkload {
// Issue three non-overlapping tasks and check data consistency and correctness
ACTOR Future<Void> simpleTest(BulkLoading* self, Database cx) {
TraceEvent("BulkLoadingWorkLoadSimpleTestBegin");
state int counter = 0;
state int oldBulkLoadMode = 0;
state std::vector<BulkLoadTaskState> bulkLoadTaskStates;
state std::vector<KeyRange> taskRanges;
state std::vector<KeyRange> errorRanges;
state std::vector<BulkLoadTaskTestUnit> taskUnits;
loop { // New tasks overwrite old tasks on the same range
bulkLoadTaskStates.clear();
taskRanges.clear();
errorRanges.clear();
taskUnits.clear();
for (int i = 0; i < 2; i++) {
std::string indexStr = std::to_string(i);
std::string indexStrNext = std::to_string(i + 1);
Key beginKey = StringRef(indexStr);
Key endKey = StringRef(indexStrNext);
std::string folderPath = joinPath(simulationBulkLoadFolder, indexStr);
int dataSize = std::pow(10, deterministicRandom()->randomInt(0, 4));
BulkLoadTaskTestUnit taskUnit =
self->generateBulkLoadTaskUnit(self, folderPath, dataSize, KeyRangeRef(beginKey, endKey));
bulkLoadTaskStates.push_back(taskUnit.bulkLoadTask);
taskRanges.push_back(taskUnit.bulkLoadTask.getRange());
taskUnits.push_back(taskUnit);
}
// Issue above 2 tasks in the same transaction
wait(self->submitBulkLoadTasks(self, cx, bulkLoadTaskStates));
TraceEvent("BulkLoadingWorkLoadSimpleTestIssuedTasks");
wait(store(oldBulkLoadMode, setBulkLoadMode(cx, 1)));
TraceEvent("BulkLoadingWorkLoadSimpleTestSetMode").detail("OldMode", oldBulkLoadMode).detail("NewMode", 1);
std::vector<BulkLoadTaskState> errorTasks = wait(self->waitUntilAllTaskCompleteOrError(self, cx));
for (const auto& errorTask : errorTasks) {
errorRanges.push_back(errorTask.getRange());
}
TraceEvent("BulkLoadingWorkLoadSimpleTestAllComplete");
counter++;
if (counter > 1) {
break;
}
state int i = 0;
for (i = 0; i < 2; i++) {
std::string indexStr = std::to_string(i);
std::string indexStrNext = std::to_string(i + 1);
Key beginKey = StringRef(indexStr);
Key endKey = StringRef(indexStrNext);
std::string folderPath = joinPath(simulationBulkLoadFolder, indexStr);
int dataSize = std::pow(10, deterministicRandom()->randomInt(0, 4));
BulkLoadTaskTestUnit taskUnit =
self->generateBulkLoadTaskUnit(self, folderPath, dataSize, KeyRangeRef(beginKey, endKey));
bulkLoadTaskStates.push_back(taskUnit.bulkLoadTask);
taskRanges.push_back(taskUnit.bulkLoadTask.getRange());
taskUnits.push_back(taskUnit);
bool succeed = wait(self->submitBulkLoadTask(cx, taskUnit.bulkLoadTask));
ASSERT(succeed);
}
TraceEvent("BulkLoadingWorkLoadSimpleTestIssuedTasks");
wait(store(oldBulkLoadMode, setBulkLoadMode(cx, 1)));
TraceEvent("BulkLoadingWorkLoadSimpleTestSetMode").detail("OldMode", oldBulkLoadMode).detail("NewMode", 1);
std::vector<BulkLoadTaskState> errorTasks = wait(self->waitUntilAllTaskCompleteOrError(self, cx));
for (const auto& errorTask : errorTasks) {
errorRanges.push_back(errorTask.getRange());
}
TraceEvent("BulkLoadingWorkLoadSimpleTestAllComplete");
// Check data
wait(store(oldBulkLoadMode, setBulkLoadMode(cx, 0)));
TraceEvent("BulkLoadingWorkLoadSimpleTestSetMode").detail("OldMode", oldBulkLoadMode).detail("NewMode", 0);
@ -571,7 +562,9 @@ struct BulkLoading : TestWorkload {
// Check bulk load metadata
wait(store(oldBulkLoadMode, setBulkLoadMode(cx, 1)));
TraceEvent("BulkLoadingWorkLoadSimpleTestSetMode").detail("OldMode", oldBulkLoadMode).detail("NewMode", 1);
wait(self->finalizeBulkLoadTasks(self, cx, bulkLoadTaskStates));
for (i = 0; i < bulkLoadTaskStates.size(); i++) {
wait(self->finalizeBulkLoadTask(cx, bulkLoadTaskStates[i].getRange(), bulkLoadTaskStates[i].getTaskId()));
}
wait(acknowledgeAllErrorBulkLoadTasks(cx, self->jobId, normalKeys));
loop {
bool cleared = wait(self->checkBulkLoadMetadataCleared(self, cx));
@ -626,8 +619,10 @@ struct BulkLoading : TestWorkload {
int dataSize = std::pow(10, deterministicRandom()->randomInt(0, 4));
taskUnit = self->generateBulkLoadTaskUnit(self, folderPath, dataSize);
ASSERT(normalKeys.contains(taskUnit.bulkLoadTask.getRange()));
taskMap.insert(taskUnit.bulkLoadTask.getRange(), taskUnit);
wait(self->submitBulkLoadTasks(self, cx, { taskUnit.bulkLoadTask }));
bool succeed = wait(self->submitBulkLoadTask(cx, taskUnit.bulkLoadTask));
if (succeed) {
taskMap.insert(taskUnit.bulkLoadTask.getRange(), taskUnit);
}
if (deterministicRandom()->coinflip()) {
std::vector<BulkLoadTaskState> errorTasks = wait(self->waitUntilAllTaskCompleteOrError(self, cx));
}
@ -697,13 +692,12 @@ struct BulkLoading : TestWorkload {
std::vector<KeyValue> dbkvs = wait(self->getKvsFromDB(self, cx, ignoreRanges, completeTaskRanges));
ASSERT(self->checkSame(self, kvs, dbkvs));
// Clear all range lock
wait(releaseExclusiveReadLockOnRange(cx, normalKeys, "BulkLoad"));
// Clear metadata
wait(store(oldBulkLoadMode, setBulkLoadMode(cx, 1)));
TraceEvent("BulkLoadingWorkLoadComplexTestSetMode").detail("OldMode", oldBulkLoadMode).detail("NewMode", 1);
wait(self->finalizeBulkLoadTasks(self, cx, bulkLoadTaskStates));
for (i = 0; i < bulkLoadTaskStates.size(); i++) {
wait(self->finalizeBulkLoadTask(cx, bulkLoadTaskStates[i].getRange(), bulkLoadTaskStates[i].getTaskId()));
}
wait(acknowledgeAllErrorBulkLoadTasks(cx, self->jobId, normalKeys));
loop {
bool cleared = wait(self->checkBulkLoadMetadataCleared(self, cx));
@ -712,23 +706,15 @@ struct BulkLoading : TestWorkload {
}
wait(delay(1.0));
}
// Make sure all ranges locked by the workload are unlocked
std::vector<std::pair<KeyRange, RangeLockState>> res =
wait(findExclusiveReadLockOnRange(cx, normalKeys, rangeLockNameForBulkLoad));
ASSERT(res.empty());
TraceEvent("BulkLoadingWorkLoadComplexTestComplete");
return Void();
}
// For offline test
void produceLargeData(BulkLoading* self, Database cx) {
for (int i = 0; i < 3; i++) {
std::string folderName = std::to_string(i);
Key beginKey = StringRef(std::to_string(i));
Key endKey = StringRef(std::to_string(i + 1));
KeyRange range = KeyRangeRef(beginKey, endKey);
std::string folderPath = joinPath(simulationBulkLoadFolder, folderName);
self->generateBulkLoadTaskUnit(self, folderPath, 5000000, range);
}
return;
}
ACTOR Future<Void> _start(BulkLoading* self, Database cx) {
if (self->clientId != 0) {
return Void();
@ -756,6 +742,9 @@ struct BulkLoading : TestWorkload {
wait(registerRangeLockOwner(cx, rangeLockNameForBulkLoad, rangeLockNameForBulkLoad));
std::vector<RangeLockOwner> lockOwners = wait(getAllRangeLockOwners(cx));
ASSERT(lockOwners.size() == 1 && lockOwners[0].getOwnerUniqueId() == rangeLockNameForBulkLoad);
// Run test
if (deterministicRandom()->coinflip()) {
// Inject data to three non-overlapping ranges
@ -764,10 +753,12 @@ struct BulkLoading : TestWorkload {
// Inject data to many ranges and those ranges can be overlapping
wait(self->complexTest(self, cx));
}
// self->produceLargeData(self, cx); // Produce data set that is used in loop back cluster test
wait(removeRangeLockOwner(cx, rangeLockNameForBulkLoad));
std::vector<RangeLockOwner> lockOwnersAfterRemove = wait(getAllRangeLockOwners(cx));
ASSERT(lockOwnersAfterRemove.empty());
return Void();
}
};

View File

@ -30,7 +30,6 @@
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <string>
struct RandomRangeLockWorkload : FailureInjectionWorkload {
static constexpr auto NAME = "RandomRangeLock";
@ -84,11 +83,11 @@ struct RandomRangeLockWorkload : FailureInjectionWorkload {
}
}
ACTOR Future<Void> lockActor(Database cx, RandomRangeLockWorkload* self) {
ACTOR Future<Void> lockActor(Database cx, RandomRangeLockWorkload* self, std::string rangeLockOwnerNamePrefix) {
state double testDuration = deterministicRandom()->random01() * self->maxLockDuration;
state double testStartDelay = deterministicRandom()->random01() * self->maxStartDelay;
state std::string rangeLockOwnerName =
"Owner" + std::to_string(deterministicRandom()->randomInt(0, self->lockActorCount));
rangeLockOwnerNamePrefix + "-" + std::to_string(deterministicRandom()->randomInt(0, self->lockActorCount));
// Here we intentionally introduced duplicated owner name between different lockActor
std::string lockOwnerDescription = rangeLockOwnerName + ":" + self->getRandomStringRef().toString();
wait(registerRangeLockOwner(cx, rangeLockOwnerName, lockOwnerDescription));
@ -111,14 +110,33 @@ struct RandomRangeLockWorkload : FailureInjectionWorkload {
ASSERT(range.end <= normalKeys.end);
} catch (Error& e) {
if (e.code() == error_code_range_lock_failed) {
TraceEvent(SevWarnAlways, "InjectRangeLockFailed")
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
ASSERT(range.end > normalKeys.end);
} else if (e.code() == error_code_range_locked_by_different_user) {
} else if (e.code() == error_code_range_lock_reject) {
TraceEvent(SevWarnAlways, "InjectRangeLockRejected")
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
// pass
} else {
TraceEvent(SevError, "InjectRangeLockError")
.errorUnsuppressed(e)
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
throw e;
}
}
wait(delay(testDuration));
TraceEvent(SevWarnAlways, "InjectRangeUnlockSubmit")
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockStartDelayTime", testStartDelay)
.detail("LockTime", testDuration);
try {
wait(releaseExclusiveReadLockOnRange(cx, range, rangeLockOwnerName));
TraceEvent(SevWarnAlways, "InjectRangeUnlocked")
@ -127,13 +145,27 @@ struct RandomRangeLockWorkload : FailureInjectionWorkload {
ASSERT(range.end <= normalKeys.end);
} catch (Error& e) {
if (e.code() == error_code_range_lock_failed) {
TraceEvent(SevWarnAlways, "InjectRangeUnlockFailed")
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
ASSERT(range.end > normalKeys.end);
} else if (e.code() == error_code_range_locked_by_different_user) {
} else if (e.code() == error_code_range_unlock_reject) {
TraceEvent(SevWarnAlways, "InjectRangeUnlockRejected")
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
// pass
} else {
TraceEvent(SevError, "InjectRangeUnlockError")
.errorUnsuppressed(e)
.detail("RangeLockOwnerName", rangeLockOwnerName)
.detail("Range", range)
.detail("LockTime", testDuration);
throw e;
}
}
return Void();
}
@ -148,11 +180,22 @@ struct RandomRangeLockWorkload : FailureInjectionWorkload {
// The rangeLock mechanism should approperiately handled those conflict.
// When all actors complete, it is expected that all locks are removed,
// and this injected workload should not block other workloads.
state std::string rangeLockOwnerNamePrefix = "Owner" + std::to_string(self->clientId);
std::vector<Future<Void>> actors;
for (int i = 0; i < self->lockActorCount; i++) {
actors.push_back(self->lockActor(cx, self));
actors.push_back(self->lockActor(cx, self, rangeLockOwnerNamePrefix));
}
wait(waitForAll(actors));
// Make sure all ranges locked by the workload client are unlocked
state int j = 0;
for (; j < self->lockActorCount; j++) {
std::vector<std::pair<KeyRange, RangeLockState>> res = wait(
findExclusiveReadLockOnRange(cx, normalKeys, rangeLockOwnerNamePrefix + "-" + std::to_string(j)));
ASSERT(res.empty());
}
TraceEvent("RandomRangeLockWorkloadEnd").detail("OwnerPrefix", rangeLockOwnerNamePrefix);
}
return Void();
}

View File

@ -144,63 +144,17 @@ struct RangeLocking : TestWorkload {
}
}
ACTOR Future<Void> simpleTest(RangeLocking* self, Database cx) {
state Key keyUpdate = "11"_sr;
state KeyRange keyToClear = KeyRangeRef("1"_sr, "3"_sr);
state KeyRange rangeLock = KeyRangeRef("1"_sr, "2"_sr);
state Optional<Value> value;
state std::vector<KeyRange> lockedRanges;
wait(self->setKey(cx, keyUpdate, "1"_sr));
wait(store(value, self->getKey(cx, keyUpdate)));
ASSERT(value.present() && value.get() == "1"_sr);
wait(self->clearKey(cx, keyUpdate));
wait(store(value, self->getKey(cx, keyUpdate)));
ASSERT(!value.present());
wait(takeExclusiveReadLockOnRange(cx, rangeLock, self->rangeLockOwnerName));
TraceEvent("RangeLockWorkLoadLockRange").detail("Range", rangeLock);
wait(store(lockedRanges, getExclusiveReadLockOnRange(cx, normalKeys)));
TraceEvent("RangeLockWorkLoadGetLockedRange")
.detail("Range", rangeLock)
.detail("LockState", describe(lockedRanges));
try {
wait(self->setKey(cx, keyUpdate, "2"_sr));
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_transaction_rejected_range_locked);
std::string getLockRangesString(const std::vector<std::pair<KeyRange, RangeLockState>>& locks) {
std::string res = "";
int count = 0;
for (const auto& lock : locks) {
res = res + lock.first.toString();
if (count < locks.size()) {
res = res + ", ";
}
count = count + 1;
}
try {
wait(self->clearRange(cx, keyToClear));
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_transaction_rejected_range_locked);
}
wait(store(value, self->getKey(cx, keyUpdate)));
ASSERT(!value.present());
wait(releaseExclusiveReadLockOnRange(cx, rangeLock, self->rangeLockOwnerName));
TraceEvent("RangeLockWorkLoadUnlockRange").detail("Range", rangeLock);
lockedRanges.clear();
wait(store(lockedRanges, getExclusiveReadLockOnRange(cx, normalKeys)));
TraceEvent("RangeLockWorkLoadGetLockedRange")
.detail("Range", rangeLock)
.detail("LockState", describe(lockedRanges));
wait(self->setKey(cx, keyUpdate, "3"_sr));
wait(store(value, self->getKey(cx, keyUpdate)));
ASSERT(value.present() && value.get() == "3"_sr);
return Void();
return res;
}
KeyValue getRandomKeyValue() const {
@ -283,7 +237,7 @@ struct RangeLocking : TestWorkload {
.errorUnsuppressed(e)
.detail("Ops", "LockFailed")
.detail("Range", range);
ASSERT(e.code() == error_code_range_locked_by_different_user);
ASSERT(e.code() == error_code_range_lock_reject);
continue; // Do not add the operation to lockRangeOperations.
}
} else {
@ -300,7 +254,7 @@ struct RangeLocking : TestWorkload {
.errorUnsuppressed(e)
.detail("Ops", "UnlockFailed")
.detail("Range", range);
ASSERT(e.code() == error_code_range_locked_by_different_user);
ASSERT(e.code() == error_code_range_unlock_reject);
continue; // Do not add the operation to lockRangeOperations.
}
}
@ -382,9 +336,13 @@ struct RangeLocking : TestWorkload {
}
ACTOR Future<std::vector<KeyRange>> getLockedRangesFromDB(Database cx) {
state std::vector<KeyRange> res;
wait(store(res, getExclusiveReadLockOnRange(cx, normalKeys)));
return coalesceRangeList(res);
state std::vector<std::pair<KeyRange, RangeLockState>> res;
wait(store(res, findExclusiveReadLockOnRange(cx, normalKeys)));
std::vector<KeyRange> ranges;
for (const auto& lock : res) {
ranges.push_back(lock.first);
}
return coalesceRangeList(ranges);
}
std::vector<KeyRange> getLockedRangesFromMemory(RangeLocking* self) {
@ -490,8 +448,6 @@ struct RangeLocking : TestWorkload {
ACTOR Future<Void> complexTest(RangeLocking* self, Database cx) {
state int iterationCount = 100;
state int iteration = 0;
state std::string rangeLockOwnerName = "RangeLockingSimpleTest";
wait(registerRangeLockOwner(cx, rangeLockOwnerName, rangeLockOwnerName));
loop {
if (iteration > iterationCount || self->shouldExit) {
break;
@ -529,17 +485,121 @@ struct RangeLocking : TestWorkload {
.detail("Phase", "CheckDBCorrectness");
iteration++;
}
wait(releaseExclusiveReadLockOnRange(cx, normalKeys, self->rangeLockOwnerName));
wait(releaseExclusiveReadLockByUser(cx, self->rangeLockOwnerName));
std::vector<std::pair<KeyRange, RangeLockState>> locks = wait(findExclusiveReadLockOnRange(cx, normalKeys));
ASSERT(locks.empty());
TraceEvent("RangeLockWorkloadProgress").detail("Phase", "End");
return Void();
}
bool sameRangeList(const std::vector<KeyRange>& rangesA,
const std::vector<KeyRange>& rangesB,
const RangeLockOwnerName& owner) {
if (rangesA.size() != rangesB.size()) {
TraceEvent(SevError, "RangeLockWorkloadTestUnlockRangeByUserMismatch")
.detail("RangesA", describe(rangesA))
.detail("RangesB", describe(rangesB))
.detail("Owner", owner);
return false;
}
for (const auto& rangeA : rangesA) {
if (std::find(rangesB.begin(), rangesB.end(), rangeA) == rangesB.end()) {
TraceEvent(SevError, "RangeLockWorkloadTestUnlockRangeByUserMismatch")
.detail("RangesA", describe(rangesA))
.detail("RangesB", describe(rangesB))
.detail("Owner", owner);
return false;
}
}
for (const auto& rangeB : rangesB) {
if (std::find(rangesA.begin(), rangesA.end(), rangeB) == rangesA.end()) {
TraceEvent(SevError, "RangeLockWorkloadTestUnlockRangeByUserMismatch")
.detail("RangesA", describe(rangesA))
.detail("RangesB", describe(rangesB))
.detail("Owner", owner);
return false;
}
}
return true;
}
ACTOR Future<Void> testUnlockByUser(RangeLocking* self, Database cx) {
state int i = 0;
state int j = 0;
state std::unordered_map<RangeLockOwnerName, std::vector<KeyRange>> rangeLocks;
state RangeLockOwnerName rangeLockOwnerName;
state std::vector<RangeLockOwnerName> candidates;
state KeyRange rangeToLock;
state std::vector<KeyRange> lockedRanges;
state std::vector<RangeLockOwnerName> usersToUnlock; // can contain duplicated users
state std::vector<std::pair<KeyRange, RangeLockState>> locksPerUser;
for (; i < 100; i++) {
rangeLockOwnerName = "TestUnlockByUser" + std::to_string(i);
wait(registerRangeLockOwner(cx, rangeLockOwnerName, rangeLockOwnerName));
lockedRanges.clear();
for (; j < 2; j++) {
try {
rangeToLock = self->getRandomRange();
wait(takeExclusiveReadLockOnRange(cx, rangeToLock, rangeLockOwnerName));
lockedRanges.push_back(rangeToLock);
TraceEvent("RangeLockWorkloadTestUnlockRangeByUser")
.detail("Ops", "LockRange")
.detail("Range", rangeToLock)
.detail("User", rangeLockOwnerName);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
ASSERT(e.code() == error_code_range_lock_reject);
}
}
auto res = rangeLocks.insert({ rangeLockOwnerName, lockedRanges });
ASSERT(res.second);
candidates.push_back(rangeLockOwnerName);
}
for (i = 0; i < 10; i++) {
usersToUnlock.push_back(deterministicRandom()->randomChoice(candidates));
}
for (i = 0; i < usersToUnlock.size(); i++) {
wait(releaseExclusiveReadLockByUser(cx, usersToUnlock[i]));
TraceEvent("RangeLockWorkloadTestUnlockRangeByUser")
.detail("Ops", "Unlock by user")
.detail("User", usersToUnlock[i]);
}
for (i = 0; i < candidates.size(); i++) {
locksPerUser.clear();
wait(store(locksPerUser, findExclusiveReadLockOnRange(cx, normalKeys, candidates[i])));
if (std::find(usersToUnlock.begin(), usersToUnlock.end(), candidates[i]) != usersToUnlock.end()) {
TraceEvent("RangeLockWorkloadTestUnlockRangeByUser")
.detail("Ops", "Find unlocked user")
.detail("User", candidates[i])
.detail("LockCount", locksPerUser.size());
ASSERT(locksPerUser.empty());
} else {
std::vector<KeyRange> lockedRangeFromMetadata;
for (const auto& lock : locksPerUser) {
ASSERT(lock.first == lock.second.getRange());
lockedRangeFromMetadata.push_back(lock.first);
}
TraceEvent("RangeLockWorkloadTestUnlockRangeByUser")
.detail("Ops", "Find locked user")
.detail("User", candidates[i])
.detail("LockCount", locksPerUser.size());
ASSERT(self->sameRangeList(coalesceRangeList(lockedRangeFromMetadata),
coalesceRangeList(rangeLocks[candidates[i]]),
candidates[i]));
}
}
return Void();
}
ACTOR Future<Void> _start(RangeLocking* self, Database cx) {
if (self->clientId != 0) {
return Void();
}
// wait(self->simpleTest(self, cx));
wait(self->complexTest(self, cx));
wait(self->testUnlockByUser(self, cx));
return Void();
}
};

View File

@ -49,6 +49,13 @@ struct WatchesWorkload : TestWorkload {
tempRand.randomShuffle(nodeOrder);
}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
// We disable RandomRangeLock workload injection because watchesWorker() does not handle the
// transaction error, in particular, transaction_rejected_range_locked can happen when enabling RandomRangeLock.
// TODO: remove after the workload handles the transaction error.
out.insert("RandomRangeLock");
}
Future<Void> setup(Database const& cx) override {
// return _setup(cx, this);
std::vector<Future<Void>> setupActors;

View File

@ -163,7 +163,8 @@ ERROR( bulkdump_task_failed, 1243, "Bulk dumping task failed" )
ERROR( bulkdump_task_outdated, 1244, "Bulk dumping task outdated" )
ERROR( bulkload_fileset_invalid_filepath, 1245, "Bulkload fileset provides invalid filepath" )
ERROR( bulkload_manifest_decode_error, 1246, "Bulkload manifest string is failed to decode" )
ERROR( range_locked_by_different_user, 1247, "Range has been locked by a different user" )
ERROR( range_lock_reject, 1247, "Range lock is rejected" )
ERROR( range_unlock_reject, 1248, "Range unlock is rejected" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )