Merge pull request #3872 from sfc-gh-clin:add-lock-unlock-to-special-keys
Add lock/unlock to special keys
This commit is contained in:
commit
bbc6d2aa03
|
@ -946,6 +946,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
std::make_unique<ProcessClassSourceRangeImpl>(
|
||||
KeyRangeRef(LiteralStringRef("process/class_source/"), LiteralStringRef("process/class_source0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<LockDatabaseImpl>(singleKeyRange(LiteralStringRef("dbLocked"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
|
|
@ -45,7 +45,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
{ "exclude", KeyRangeRef(LiteralStringRef("excluded/"), LiteralStringRef("excluded0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "failed", KeyRangeRef(LiteralStringRef("failed/"), LiteralStringRef("failed0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "lock", singleKeyRange(LiteralStringRef("dbLocked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
|
||||
|
@ -1078,19 +1079,19 @@ Future<Optional<std::string>> ProcessClassRangeImpl::commit(ReadYourWritesTransa
|
|||
return processClassCommitActor(ryw, getKeyRange());
|
||||
}
|
||||
|
||||
void throwNotAllowedError(ReadYourWritesTransaction* ryw) {
|
||||
auto msg = ManagementAPIError::toJsonString(false, "setclass",
|
||||
"Clear operation is meaningless thus forbidden for setclass");
|
||||
void throwSpecialKeyApiFailure(ReadYourWritesTransaction* ryw, std::string command, std::string message) {
|
||||
auto msg = ManagementAPIError::toJsonString(false, command, message);
|
||||
ryw->setSpecialKeySpaceErrorMsg(msg);
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
void ProcessClassRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
|
||||
return throwNotAllowedError(ryw);
|
||||
return throwSpecialKeyApiFailure(ryw, "setclass", "Clear operation is meaningless thus forbidden for setclass");
|
||||
}
|
||||
|
||||
void ProcessClassRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
|
||||
return throwNotAllowedError(ryw);
|
||||
return throwSpecialKeyApiFailure(ryw, "setclass",
|
||||
"Clear range operation is meaningless thus forbidden for setclass");
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> getProcessClassSourceActor(ReadYourWritesTransaction* ryw, KeyRef prefix,
|
||||
|
@ -1120,4 +1121,74 @@ ProcessClassSourceRangeImpl::ProcessClassSourceRangeImpl(KeyRangeRef kr) : Speci
|
|||
Future<Standalone<RangeResultRef>> ProcessClassSourceRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
return getProcessClassSourceActor(ryw, getKeyRange().begin, kr);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> getLockedKeyActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) {
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(databaseLockedKey));
|
||||
Standalone<RangeResultRef> result;
|
||||
if (val.present()) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, val.get()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
LockDatabaseImpl::LockDatabaseImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> LockDatabaseImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
// sigle key range, the queried range should always be the same as the underlying range
|
||||
ASSERT(kr == getKeyRange());
|
||||
auto lockEntry = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("lock")];
|
||||
if (!ryw->readYourWritesDisabled() && lockEntry.first) {
|
||||
// ryw enabled and we have written to the special key
|
||||
Standalone<RangeResultRef> result;
|
||||
if (lockEntry.second.present()) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, lockEntry.second.get()));
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
return getLockedKeyActor(ryw, kr);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<std::string>> lockDatabaseCommitActor(ReadYourWritesTransaction* ryw) {
|
||||
state Optional<std::string> msg;
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(databaseLockedKey));
|
||||
UID uid = deterministicRandom()->randomUniqueID();
|
||||
|
||||
if (val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != uid) {
|
||||
// check database not locked
|
||||
// if locked already, throw error
|
||||
msg = ManagementAPIError::toJsonString(false, "lock", "Database has already been locked");
|
||||
} else if (!val.present()) {
|
||||
// lock database
|
||||
ryw->getTransaction().atomicOp(databaseLockedKey,
|
||||
BinaryWriter::toValue(uid, Unversioned())
|
||||
.withPrefix(LiteralStringRef("0123456789"))
|
||||
.withSuffix(LiteralStringRef("\x00\x00\x00\x00")),
|
||||
MutationRef::SetVersionstampedValue);
|
||||
ryw->getTransaction().addWriteConflictRange(normalKeys);
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<std::string>> unlockDatabaseCommitActor(ReadYourWritesTransaction* ryw) {
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(databaseLockedKey));
|
||||
if (val.present()) {
|
||||
ryw->getTransaction().clear(singleKeyRange(databaseLockedKey));
|
||||
}
|
||||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> LockDatabaseImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
auto lockId = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("lock")].second;
|
||||
if (lockId.present()) {
|
||||
return lockDatabaseCommitActor(ryw);
|
||||
} else {
|
||||
return unlockDatabaseCommitActor(ryw);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -305,5 +305,12 @@ public:
|
|||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
class LockDatabaseImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit LockDatabaseImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -104,8 +104,8 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
Future<Void> f;
|
||||
{
|
||||
ReadYourWritesTransaction ryw{ cx->clone() };
|
||||
if(!ryw.getDatabase()->apiVersionAtLeast(630)) {
|
||||
//This test is not valid for API versions smaller than 630
|
||||
if (!ryw.getDatabase()->apiVersionAtLeast(630)) {
|
||||
// This test is not valid for API versions smaller than 630
|
||||
return;
|
||||
}
|
||||
f = success(ryw.get(LiteralStringRef("\xff\xff/status/json")));
|
||||
|
@ -745,6 +745,68 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// test lock and unlock
|
||||
// maske sure we lock the database
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// lock the database
|
||||
tx->set(SpecialKeySpace::getManagementApiCommandPrefix("lock"), LiteralStringRef(""));
|
||||
// commit
|
||||
wait(tx->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "DatabaseLockFailure").error(e);
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
// In case commit_unknown_result is thrown by buggify, we may try to lock more than once
|
||||
// The second lock commit will throw special_keys_api_failure error
|
||||
if (e.code() == error_code_special_keys_api_failure) {
|
||||
Optional<Value> errorMsg =
|
||||
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
|
||||
ASSERT(errorMsg.present());
|
||||
std::string errorStr;
|
||||
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
|
||||
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
|
||||
// special_key_space_management_api_error_msg schema validation
|
||||
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
|
||||
ASSERT(valueObj["command"].get_str() == "lock" && !valueObj["retriable"].get_bool());
|
||||
break;
|
||||
} else {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
TraceEvent(SevDebug, "DatabaseLocked");
|
||||
// if database locked, fdb read should get database_locked error
|
||||
try {
|
||||
tx->reset();
|
||||
Standalone<RangeResultRef> res = wait(tx->getRange(normalKeys, 1));
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
ASSERT(e.code() == error_code_database_locked);
|
||||
}
|
||||
// make sure we unlock the database
|
||||
// unlock is idempotent, thus we can commit many times until successful
|
||||
loop {
|
||||
try {
|
||||
tx->reset();
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// unlock the database
|
||||
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("lock"));
|
||||
wait(tx->commit());
|
||||
TraceEvent(SevDebug, "DatabaseUnlocked");
|
||||
tx->reset();
|
||||
// read should be successful
|
||||
Standalone<RangeResultRef> res = wait(tx->getRange(normalKeys, 1));
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "DatabaseUnlockFailure").error(e);
|
||||
ASSERT(e.code() != error_code_database_locked);
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue