Add default encode, decode methods. Add test for setclass special keys

This commit is contained in:
Chaoguang Lin 2020-08-20 13:50:35 -07:00
parent 5660de9c09
commit 707b88583a
3 changed files with 114 additions and 97 deletions

View File

@ -572,63 +572,11 @@ void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const
}
}
Key ManagementCommandsOptionsImpl::decode(const KeyRef& key) const {
// Should never be used
ASSERT(false);
return key;
}
Key ManagementCommandsOptionsImpl::encode(const KeyRef& key) const {
// Should never be used
ASSERT(false);
return key;
}
Future<Optional<std::string>> ManagementCommandsOptionsImpl::commit(ReadYourWritesTransaction* ryw) {
// Nothing to do, keys should be used by other impls' commit callback
return Optional<std::string>();
}
Standalone<RangeResultRef> rywModuleGetRange(ReadYourWritesTransaction* ryw, Standalone<RangeResultRef>* res,
KeyRangeRef kr) {
// res is read from database, if ryw enabled, we update it with writes in the transaction
if (ryw->readYourWritesDisabled()) {
return *res;
} else {
Standalone<RangeResultRef> result;
result.arena().dependsOn(res->arena());
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
int index = 0;
while (iter != ranges.end()) {
// add all previous entries into result
KeyRef rk = (*res)[index].key;
while (index < res->size() && rk < iter->begin()) {
result.push_back(result.arena(), KeyValueRef(rk, (*res)[index].value));
++index;
}
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
// add the writen entries if exists
if (entry.second.present()) {
result.push_back(result.arena(), KeyValueRef(iter->begin(), entry.second.get()));
}
// move index to skip all entries in the iter->range
while (index < res->size() && iter->range().contains((*res)[index].key)) ++index;
}
++iter;
}
// add all remaining entries into result
while (index < res->size()) {
const KeyValueRef& kv = (*res)[index];
result.push_back(result.arena(), KeyValueRef(kv.key, kv.value));
++index;
}
return result;
}
}
// read from rwModule
ACTOR Future<Standalone<RangeResultRef>> rwModuleGetRangeActor(ReadYourWritesTransaction* ryw,
const SpecialKeyRangeRWImpl* impl, KeyRangeRef kr) {
@ -1002,37 +950,60 @@ Future<Standalone<RangeResultRef>> ExclusionInProgressRangeImpl::getRange(ReadYo
return ExclusionInProgressActor(ryw, getKeyRange().begin, kr);
}
Standalone<RangeResultRef> rywModuleGetRange(ReadYourWritesTransaction* ryw, Standalone<RangeResultRef> res,
KeyRangeRef kr) {
// res is read from database, if ryw enabled, we update it with writes in the transaction
Standalone<RangeResultRef> result;
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
int index = 0;
while (iter != ranges.end()) {
// add all previous entries into result
while (index < res.size() && res[index].key < iter->begin()) {
result.push_back(result.arena(), KeyValueRef(res[index].key, res[index].value));
result.arena().dependsOn(res.arena());
++index;
}
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
// add the writen entries if exists
if (entry.second.present()) {
result.push_back_deep(result.arena(), KeyValueRef(iter->begin(), entry.second.get()));
}
// move index to skip all entries in the iter->range
while (index < res.size() && iter->range().contains(res[index].key)) ++index;
}
++iter;
}
// add all remaining entries into result
while (index < res.size()) {
const KeyValueRef& kv = res[index];
result.push_back(result.arena(), KeyValueRef(kv.key, kv.value));
result.arena().dependsOn(res.arena());
++index;
}
return result;
}
ACTOR Future<Standalone<RangeResultRef>> getProcessClassActor(ReadYourWritesTransaction* ryw, KeyRef prefix,
KeyRangeRef kr) {
state Future<Standalone<RangeResultRef>> processClasses = ryw->getRange(processClassKeys, CLIENT_KNOBS->TOO_MANY);
state Future<Standalone<RangeResultRef>> processData = ryw->getRange(workerListKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(processClasses) && success(processData));
ASSERT(!processClasses.get().more && processClasses.get().size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(!processData.get().more && processData.get().size() < CLIENT_KNOBS->TOO_MANY);
std::map<Optional<Standalone<StringRef>>, ProcessClass> id_class;
for (int i = 0; i < processClasses.get().size(); i++) {
id_class[decodeProcessClassKey(processClasses.get()[i].key)] =
decodeProcessClassValue(processClasses.get()[i].value);
}
vector<ProcessData> _workers = wait(getWorkers(&ryw->getTransaction()));
auto workers = _workers; // strip const
std::sort(workers.begin(), workers.end(), ProcessData::sort_by_address());
Standalone<RangeResultRef> result;
for (int i = 0; i < processData.get().size(); i++) {
ProcessData data = decodeWorkerListValue(processData.get()[i].value);
ProcessClass processClass = id_class[data.locality.processId()];
if (processClass.classSource() == ProcessClass::DBSource ||
data.processClass.classType() == ProcessClass::UnsetClass)
data.processClass = processClass;
if (data.processClass.classType() != ProcessClass::TesterClass) {
result.push_back_deep(result.arena(), KeyValueRef(prefix.withSuffix(data.address.toString()),
Value(data.processClass.toString())));
for (auto& w : workers) {
// exclude :tls in keys even the network addresss is TLS
Key k(prefix.withSuffix(formatIpPort(w.address.ip, w.address.port)));
if (kr.contains(k)) {
Value v(w.processClass.toString());
result.push_back(result.arena(), KeyValueRef(k, v));
result.arena().dependsOn(k.arena());
result.arena().dependsOn(v.arena());
}
}
return rywModuleGetRange(ryw, &result, kr);
if (ryw->readYourWritesDisabled()) return result;
return rywModuleGetRange(ryw, result, kr);
}
ACTOR Future<Optional<std::string>> processClassCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef range) {

View File

@ -79,9 +79,17 @@ public:
virtual Future<Optional<std::string>> commit(
ReadYourWritesTransaction* ryw) = 0; // all delayed async operations of writes in special-key-space
// Given the special key to write, return the real key that needs to be modified
virtual Key decode(const KeyRef& key) const = 0;
virtual Key decode(const KeyRef& key) const {
// Default implementation should never be used
ASSERT(false);
return key;
}
// Given the read key, return the corresponding special key
virtual Key encode(const KeyRef& key) const = 0;
virtual Key encode(const KeyRef& key) const {
// Default implementation should never be used
ASSERT(false);
return key;
};
explicit SpecialKeyRangeRWImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
@ -245,8 +253,6 @@ public:
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
Key decode(const KeyRef& key) const override;
Key encode(const KeyRef& key) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
@ -284,8 +290,6 @@ class ProcessClassRangeImpl : public SpecialKeyRangeRWImpl {
public:
explicit ProcessClassRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
Key decode(const KeyRef& key) const override { return Key(); }
Key encode(const KeyRef& key) const override { return Key(); }
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;

View File

@ -36,7 +36,7 @@ public:
// all keys are written to RYW, since GRV is set, the read should happen locally
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
ASSERT(!result.more);
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
// To make the test more complext, instead of simply returning the k-v pairs, we reverse all the value strings
auto kvs = resultFuture.getValue();
for (int i = 0; i < kvs.size(); ++i) {
@ -523,6 +523,19 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
return Void();
}
bool getRangeResultInOrder(const Standalone<RangeResultRef>& result) {
for (int i = 0; i < result.size() - 1; ++i) {
if (result[i].key >= result[i + 1].key) {
TraceEvent(SevDebug, "GetRangeResultNotInOrder")
.detail("Index", i)
.detail("Key1", result[i].key)
.detail("Key2", result[i + 1].key);
return false;
}
}
return true;
}
ACTOR Future<Void> managementApiCorrectnessActor(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
// All management api related tests
Database cx = cx_->clone();
@ -536,12 +549,13 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
.withSuffix(option),
ValueRef());
}
Standalone<RangeResultRef> res = wait(tx->getRange(
Standalone<RangeResultRef> result = wait(tx->getRange(
KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin),
CLIENT_KNOBS->TOO_MANY));
ASSERT(res.size() == SpecialKeySpace::getManagementApiOptionsSet().size());
for (int i = 0; i < res.size() - 1; ++i) ASSERT(res[i].key < res[i + 1].key);
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(result.size() == SpecialKeySpace::getManagementApiOptionsSet().size());
ASSERT(self->getRangeResultInOrder(result));
tx->reset();
}
// "exclude" error message shema check
@ -569,17 +583,45 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
{
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// test getRange
state Standalone<RangeResultRef> result = wait(tx->getRange(
KeyRangeRef(LiteralStringRef("class/"), LiteralStringRef("class0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
CLIENT_KNOBS->TOO_MANY));
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(self->getRangeResultInOrder(result));
// check correctness of classType of each process
vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
auto worker = deterministicRandom()->randomChoice(workers);
std::string addr = worker.address.toString();
std::string suffix = ":tls";
// remove :tls suffix if needed
if ((addr.size() >= suffix.size()) && (addr.rfind(suffix) == addr.size() - suffix.size()))
addr = addr.substr(0, addr.size() - suffix.size());
tx->set(Key("class/" + addr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
LiteralStringRef("InvalidProcessType"));
for (const auto& worker : workers ) {
// TODO : test here
// ASSERT(!worker.address.isTLS());
Key addr = Key("class/" + formatIpPort(worker.address.ip, worker.address.port))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
bool found = false;
for (const auto& kv : result) {
if (kv.key == addr) {
ASSERT(kv.value.toString() == worker.processClass.toString());
found = true;
break;
}
}
// Each process should find its corresponding element
ASSERT(found);
}
state ProcessData worker = deterministicRandom()->randomChoice(workers);
state Key addr = Key("class/" + formatIpPort(worker.address.ip, worker.address.port))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
tx->set(addr, LiteralStringRef("InvalidProcessType"));
// test ryw
Optional<Value> processType = wait(tx->get(addr));
ASSERT(processType.present() && processType.get() == LiteralStringRef("InvalidProcessType"));
// test ryw disabled
tx->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
Optional<Value> originalProcessType = wait(tx->get(addr));
ASSERT(originalProcessType.present() && originalProcessType.get() == worker.processClass.toString());
// test error handling (invalid value type)
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_api_failure);