Add ability to bulk load data into TxnStateStore
* Changes BulkLoad workload to support a specific volume of data to load * Changes BulkLoad and Cycle to correctly handle \xff in keyPrefix * Adds BulkLoad to TxnStateStoreCycleTest
This commit is contained in:
parent
690ffb91b0
commit
4ac1a0f557
|
@ -25,9 +25,11 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct BulkLoadWorkload : TestWorkload {
|
||||
int actorCount, writesPerTransaction, valueBytes;
|
||||
int clientCount, actorCount, writesPerTransaction, valueBytes;
|
||||
double testDuration;
|
||||
Value value;
|
||||
uint64_t targetBytes;
|
||||
Key keyPrefix;
|
||||
|
||||
vector<Future<Void>> clients;
|
||||
PerfIntCounter transactions, retries;
|
||||
|
@ -35,13 +37,16 @@ struct BulkLoadWorkload : TestWorkload {
|
|||
|
||||
BulkLoadWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx),
|
||||
transactions("Transactions"), retries("Retries"), latencies( 2000 )
|
||||
clientCount(wcx.clientCount), transactions("Transactions"), retries("Retries"), latencies( 2000 )
|
||||
{
|
||||
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
|
||||
actorCount = getOption( options, LiteralStringRef("actorCount"), 20 );
|
||||
writesPerTransaction = getOption( options, LiteralStringRef("writesPerTransaction"), 10 );
|
||||
valueBytes = std::max( getOption( options, LiteralStringRef("valueBytes"), 96 ), 16 );
|
||||
value = Value( std::string( valueBytes, '.' ) );
|
||||
targetBytes = getOption( options, LiteralStringRef("targetBytes"), std::numeric_limits<uint64_t>::max() );
|
||||
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef(""));
|
||||
keyPrefix = unprintable( keyPrefix.toString() );
|
||||
}
|
||||
|
||||
virtual std::string description() { return "BulkLoad"; }
|
||||
|
@ -74,17 +79,23 @@ struct BulkLoadWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> bulkLoadClient( Database cx, BulkLoadWorkload *self, int clientId, int actorId ) {
|
||||
state uint64_t totalBytes = 0;
|
||||
state int idx = 0;
|
||||
loop {
|
||||
state double tstart = now();
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
state uint64_t txnBytes = 0;
|
||||
try {
|
||||
for(int i = 0; i < self->writesPerTransaction; i++)
|
||||
tr.set( format( "/bulkload/%04x/%04x/%08x", self->clientId, actorId, idx + i ), self->value );
|
||||
for(int i = 0; i < self->writesPerTransaction; i++) {
|
||||
std::string key = format( "%s/bulkload/%04x/%04x/%08x", self->keyPrefix.toString().c_str(), self->clientId, actorId, idx + i );
|
||||
tr.set( key, self->value );
|
||||
txnBytes += key.size() + self->value.size();
|
||||
}
|
||||
tr.makeSelfConflicting();
|
||||
wait(success( tr.getReadVersion() ));
|
||||
wait( tr.commit() );
|
||||
totalBytes += txnBytes;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
|
@ -94,6 +105,7 @@ struct BulkLoadWorkload : TestWorkload {
|
|||
self->latencies.addSample( now() - tstart );
|
||||
++self->transactions;
|
||||
idx += self->writesPerTransaction;
|
||||
if (totalBytes > self->targetBytes / self->clientCount / self->actorCount) return Void();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -43,7 +43,7 @@ struct CycleWorkload : TestWorkload {
|
|||
transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount;
|
||||
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
|
||||
nodeCount = getOption(options, LiteralStringRef("nodeCount"), transactionsPerSecond * clientCount);
|
||||
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef(""));
|
||||
keyPrefix = unprintable( getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("")).toString() );
|
||||
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, LiteralStringRef("expectedRate"), 0.7);
|
||||
checkOnly = getOption(options, LiteralStringRef("checkOnly"), false);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,10 @@
|
|||
testTitle=PreLoad
|
||||
testName=BulkLoad
|
||||
testDuration=3000.0
|
||||
valueBytes=100
|
||||
targetBytes=1000000
|
||||
keyPrefix=\xff/TESTONLYtxnStateStore/notcycle/
|
||||
|
||||
testTitle=Clogged
|
||||
testName=Cycle
|
||||
transactionsPerSecond=2500.0
|
||||
|
@ -23,10 +30,3 @@ testTitle=Clogged
|
|||
machinesToLeave=3
|
||||
reboot=true
|
||||
testDuration=10.0
|
||||
|
||||
testTitle=Unclogged
|
||||
testName=Cycle
|
||||
transactionsPerSecond=250.0
|
||||
testDuration=10.0
|
||||
expectedRate=0.80
|
||||
keyPrefix=\xff/TESTONLYtxnStateStore/
|
||||
|
|
Loading…
Reference in New Issue