Add consistencycheck command to special keys
This commit is contained in:
parent
7955442597
commit
b728fe473e
|
@ -950,6 +950,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<LockDatabaseImpl>(singleKeyRange(LiteralStringRef("dbLocked"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<ConsistencyCheckImpl>(singleKeyRange(LiteralStringRef("consistency_check_suspended"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
|
|
@ -46,7 +46,9 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "failed", KeyRangeRef(LiteralStringRef("failed/"), LiteralStringRef("failed0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "lock", singleKeyRange(LiteralStringRef("dbLocked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
{ "lock", singleKeyRange(LiteralStringRef("dbLocked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
|
||||
|
@ -1123,8 +1125,7 @@ Future<Standalone<RangeResultRef>> ProcessClassSourceRangeImpl::getRange(ReadYou
|
|||
return getProcessClassSourceActor(ryw, getKeyRange().begin, kr);
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> getLockedKeyActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) {
|
||||
ACTOR Future<Standalone<RangeResultRef>> getLockedKeyActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(databaseLockedKey));
|
||||
Standalone<RangeResultRef> result;
|
||||
|
@ -1192,3 +1193,44 @@ Future<Optional<std::string>> LockDatabaseImpl::commit(ReadYourWritesTransaction
|
|||
return unlockDatabaseCommitActor(ryw);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> getConsistencyCheckKeyActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(fdbShouldConsistencyCheckBeSuspended));
|
||||
bool ccSuspendSetting = val.present() ? BinaryReader::fromStringRef<bool>(val.get(), Unversioned()) : false;
|
||||
Standalone<RangeResultRef> result;
|
||||
if (ccSuspendSetting) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, ValueRef()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
ConsistencyCheckImpl::ConsistencyCheckImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> ConsistencyCheckImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
// sigle key range, the queried range should always be the same as the underlying range
|
||||
ASSERT(kr == getKeyRange());
|
||||
auto entry = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck")];
|
||||
if (!ryw->readYourWritesDisabled() && entry.first) {
|
||||
// ryw enabled and we have written to the special key
|
||||
Standalone<RangeResultRef> result;
|
||||
if (entry.second.present()) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, entry.second.get()));
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
return getConsistencyCheckKeyActor(ryw, kr);
|
||||
}
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
auto entry =
|
||||
ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck")].second;
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
ryw->getTransaction().set(fdbShouldConsistencyCheckBeSuspended,
|
||||
BinaryWriter::toValue(entry.present(), Unversioned()));
|
||||
return Optional<std::string>();
|
||||
}
|
||||
|
|
|
@ -312,5 +312,12 @@ public:
|
|||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class ConsistencyCheckImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit ConsistencyCheckImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -807,6 +807,50 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// test consistencycheck which only used by ConsistencyCheck Workload
|
||||
{
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
Optional<Value> val1 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
|
||||
state bool ccSuspendSetting =
|
||||
val1.present() ? BinaryReader::fromStringRef<bool>(val1.get(), Unversioned()) : false;
|
||||
Optional<Value> val2 =
|
||||
wait(tx->get(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck")));
|
||||
ASSERT(ccSuspendSetting ? val2.present() : !val2.present());
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
if (ccSuspendSetting)
|
||||
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"));
|
||||
else
|
||||
tx->set(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"), ValueRef());
|
||||
wait(tx->commit());
|
||||
// check the system key is set/clear
|
||||
tx->reset();
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
Optional<Value> val3 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
|
||||
bool ccSuspendSetting2 =
|
||||
val3.present() ? BinaryReader::fromStringRef<bool>(val3.get(), Unversioned()) : false;
|
||||
ASSERT(ccSuspendSetting == !ccSuspendSetting2);
|
||||
tx->reset();
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// make sure we enable consistencycheck by the end
|
||||
{
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"));
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue