ApiTester: Prepopulating database; commit-read operation for testing GRV correctness

This commit is contained in:
Vaidas Gasiunas 2022-03-01 18:32:50 +01:00
parent 328c5dad28
commit e3de8c6847
3 changed files with 90 additions and 13 deletions

View File

@ -75,6 +75,10 @@ void Transaction::reset() {
fdb_transaction_reset(tx_.get());
}
fdb_error_t Transaction::setOption(FDBTransactionOption option) {
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
}
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}

View File

@ -70,6 +70,7 @@ public:
Future commit();
Future onError(fdb_error_t err);
void reset();
fdb_error_t setOption(FDBTransactionOption option);
private:
std::shared_ptr<FDBTransaction> tx_;

View File

@ -30,7 +30,7 @@ namespace FdbApiTester {
class ApiCorrectnessWorkload : public WorkloadBase {
public:
enum OpType { OP_INSERT, OP_GET, OP_LAST = OP_GET };
enum OpType { OP_INSERT, OP_GET, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
// The minimum length of a key
int minKeyLength;
@ -47,6 +47,9 @@ public:
// Maximum number of keys to be accessed by a transaction
int maxKeysPerTransaction;
// Initial data size (number of key-value pairs)
int initialSize;
// The number of operations to be executed
int numOperations;
@ -59,9 +62,10 @@ public:
ApiCorrectnessWorkload(std::string_view prefix) {
minKeyLength = 1;
maxKeyLength = 64;
minValueLength = 1;
maxValueLength = 1000;
minValueLength = 5;
maxValueLength = 10;
maxKeysPerTransaction = 50;
initialSize = 100;
numOperations = 1000;
readExistingKeysRatio = 0.9;
keyPrefix = prefix;
@ -69,9 +73,16 @@ public:
}
void start() override {
schedule([this]() { nextTransaction(); });
schedule([this]() {
// 1. Populate initial data
populateData([this]() {
// 2. Generate random workload
randomOperations();
});
});
}
private:
std::string randomKeyName() { return keyPrefix + random.randomStringLowerCase(minKeyLength, maxKeyLength); }
std::string randomValue() { return random.randomStringLowerCase(minValueLength, maxValueLength); }
@ -90,9 +101,13 @@ public:
std::string key = store.getKey(genKey, true, 1);
if (key != store.endKey()) {
return key;
} else {
return store.getKey(genKey, true, 0);
}
key = store.getKey(genKey, true, 0);
if (key != store.startKey()) {
return key;
}
std::cout << "No existing key found, using a new random key." << std::endl;
return genKey;
}
std::string randomKey(double existingKeyRatio) {
@ -120,7 +135,54 @@ public:
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
cont();
schedule(cont);
});
}
void randomCommitReadOp(TTaskFct cont) {
int numKeys = random.randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomKey(readExistingKeysRatio), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
execTransaction(
[kvPairs, results](auto ctx) {
// TODO: Enable after merging with GRV caching
// ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
auto futures = std::make_shared<std::vector<Future>>();
for (const auto& kv : *kvPairs) {
futures->push_back(ctx->tx()->get(kv.key, false));
}
ctx->continueAfterAll(futures, [ctx, futures, results]() {
for (auto& f : *futures) {
results->push_back(((ValueFuture&)f).getValue());
}
ctx->done();
});
},
[this, kvPairs, results, cont]() {
ASSERT(results->size() == kvPairs->size());
for (int i = 0; i < kvPairs->size(); i++) {
auto expected = store.get((*kvPairs)[i].key);
if ((*results)[i] != expected) {
std::cout << "randomCommitReadOp mismatch. key: " << (*kvPairs)[i].key
<< " expected: " << expected << " actual: " << (*results)[i] << std::endl;
}
}
schedule(cont);
});
});
}
@ -149,11 +211,11 @@ public:
for (int i = 0; i < keys->size(); i++) {
auto expected = store.get((*keys)[i]);
if ((*results)[i] != expected) {
std::cout << "randomGetOp mismatch. expected: " << expected << " actual: " << (*results)[i]
<< std::endl;
std::cout << "randomGetOp mismatch. key :" << (*keys)[i] << " expected: " << expected
<< " actual: " << (*results)[i] << std::endl;
}
}
cont();
schedule(cont);
});
}
@ -166,11 +228,21 @@ public:
case OP_GET:
randomGetOp(cont);
break;
case OP_COMMIT_READ:
randomCommitReadOp(cont);
break;
}
}
private:
void nextTransaction() {
void populateData(TTaskFct cont) {
if (store.size() < initialSize) {
randomInsertOp([this, cont]() { populateData(cont); });
} else {
schedule(cont);
}
}
void randomOperations() {
if (numOpLeft % 100 == 0) {
std::cout << numOpLeft << " transactions left" << std::endl;
}
@ -178,7 +250,7 @@ private:
return;
numOpLeft--;
randomOperation([this]() { schedule([this]() { nextTransaction(); }); });
randomOperation([this]() { randomOperations(); });
}
int numOpLeft;