Merge pull request #8549 from sfc-gh-tclinkenbeard/expose-txn-cost
Create `fdb_transaction_get_total_cost` function
This commit is contained in:
commit
39abc712b0
|
@ -21,7 +21,7 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include <cstdint>
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#define FDB_INCLUDE_LEGACY_TYPES
|
||||
|
||||
#include "fdbclient/MultiVersionTransaction.h"
|
||||
|
@ -905,6 +905,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_transaction_get_committed_version(FDBTransa
|
|||
CATCH_AND_RETURN(*out_version = TXN(tr)->getCommittedVersion(););
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr) {
|
||||
return (FDBFuture*)TXN(tr)->getTotalCost().extractPtr();
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
|
||||
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
|
||||
}
|
||||
|
|
|
@ -27,10 +27,10 @@
|
|||
#endif
|
||||
|
||||
#if !defined(FDB_API_VERSION)
|
||||
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 720)
|
||||
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 730)
|
||||
#elif FDB_API_VERSION < 13
|
||||
#error API version no longer supported (upgrade to 13)
|
||||
#elif FDB_API_VERSION > 720
|
||||
#elif FDB_API_VERSION > 730
|
||||
#error Requested API version requires a newer version of this header
|
||||
#endif
|
||||
|
||||
|
@ -514,12 +514,14 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_transaction_get_committed_version(F
|
|||
int64_t* out_version);
|
||||
|
||||
/*
|
||||
* This function intentionally returns an FDBFuture instead of an integer
|
||||
* directly, so that calling this API can see the effect of previous
|
||||
* These functions intentionally return an FDBFuture instead of an integer
|
||||
* directly, so that calling the API can see the effect of previous
|
||||
* mutations on the transaction. Specifically, mutations are applied
|
||||
* asynchronously by the main thread. In order to see them, this call has to
|
||||
* be serviced by the main thread too.
|
||||
*/
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr);
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ namespace FdbApiTester {
|
|||
|
||||
namespace {
|
||||
|
||||
#define API_VERSION_CLIENT_TMP_DIR 720
|
||||
#define API_VERSION_CLIENT_TMP_DIR 730
|
||||
|
||||
enum TesterOptionId {
|
||||
OPT_CONNFILE,
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include <foundationdb/fdb_c.h>
|
||||
|
||||
#include "unit/fdb_api.hpp"
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
#pragma once
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#endif
|
||||
|
||||
#include <cassert>
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#define MAKO_HPP
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#endif
|
||||
|
||||
#include <array>
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include <inttypes.h>
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#endif
|
||||
|
||||
#include <foundationdb/fdb_c.h>
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
// Unit tests that test the timeouts for a disconnected cluster
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include <foundationdb/fdb_c.h>
|
||||
|
||||
#include <chrono>
|
||||
|
|
|
@ -231,6 +231,10 @@ Int64Future Transaction::get_approximate_size() {
|
|||
return Int64Future(fdb_transaction_get_approximate_size(tr_));
|
||||
}
|
||||
|
||||
Int64Future Transaction::get_total_cost() {
|
||||
return Int64Future(fdb_transaction_get_total_cost(tr_));
|
||||
}
|
||||
|
||||
KeyFuture Transaction::get_versionstamp() {
|
||||
return KeyFuture(fdb_transaction_get_versionstamp(tr_));
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include <foundationdb/fdb_c.h>
|
||||
|
||||
#include <string>
|
||||
|
@ -276,6 +276,9 @@ public:
|
|||
// Returns a future which will be set to the approximate transaction size so far.
|
||||
Int64Future get_approximate_size();
|
||||
|
||||
// Returns a future which will be set tot the transaction's total cost so far.
|
||||
Int64Future get_total_cost();
|
||||
|
||||
// Returns a future which will be set to the versionstamp which was used by
|
||||
// any versionstamp operations in the transaction.
|
||||
KeyFuture get_versionstamp();
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
// Unit tests for API setup, network initialization functions from the FDB C API.
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include <foundationdb/fdb_c.h>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
// Unit tests for the FoundationDB C API.
|
||||
|
||||
#include "fdb_c_options.g.h"
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include <foundationdb/fdb_c.h>
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
|
@ -1945,6 +1945,30 @@ TEST_CASE("fdb_transaction_get_committed_version") {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_CASE("fdb_transaction_get_total_cost") {
|
||||
fdb::Transaction tr(db);
|
||||
while (1) {
|
||||
fdb::ValueFuture f1 = tr.get("foo", /*snapshot*/ false);
|
||||
fdb_error_t err = wait_future(f1);
|
||||
if (err) {
|
||||
fdb::EmptyFuture fOnError = tr.on_error(err);
|
||||
fdb_check(wait_future(fOnError));
|
||||
continue;
|
||||
}
|
||||
fdb::Int64Future f2 = tr.get_total_cost();
|
||||
err = wait_future(f2);
|
||||
if (err) {
|
||||
fdb::EmptyFuture fOnError = tr.on_error(err);
|
||||
fdb_check(wait_future(fOnError));
|
||||
continue;
|
||||
}
|
||||
int64_t cost;
|
||||
fdb_check(f2.get(&cost));
|
||||
CHECK(cost > 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("fdb_transaction_get_approximate_size") {
|
||||
fdb::Transaction tr(db);
|
||||
while (1) {
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#define FDB_API_VERSION 730
|
||||
#include "foundationdb/fdb_c.h"
|
||||
#undef DLLEXPORT
|
||||
#include "workloads.h"
|
||||
|
|
|
@ -11,16 +11,16 @@ The global tag throttler bases throttling decisions on "quotas" provided by clie
|
|||
The global tag throttler cannot throttle tags to a throughput below the reserved quota, and it cannot allow throughput to exceed the total quota.
|
||||
|
||||
### Cost
|
||||
Internally, the units for these quotas are "page costs", computed as follows. The "page cost" of a read operation is computed as:
|
||||
Internally, the units for these quotas are bytes. The cost of an operation is rounded up to the nearest page size. The cost of a read operation is computed as:
|
||||
|
||||
```
|
||||
readCost = ceiling(bytesRead / CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
||||
readCost = ceiling(bytesRead / CLIENT_KNOBS->READ_COST_BYTE_FACTOR) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||
```
|
||||
|
||||
The "page cost" of a write operation is computed as:
|
||||
The cost of a write operation is computed as:
|
||||
|
||||
```
|
||||
writeCost = SERVER_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR);
|
||||
writeCost = CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||
```
|
||||
|
||||
Here `bytesWritten` includes cleared bytes. The size of range clears is estimated at commit time.
|
||||
|
@ -41,12 +41,6 @@ To set the quota through `fdbcli`, run:
|
|||
fdbcli> quota set <tag> [reserved_throughput|total_throughput] <bytes_per_second>
|
||||
```
|
||||
|
||||
Note that the quotas are specified in terms of bytes/second, and internally converted to page costs:
|
||||
|
||||
```
|
||||
page_cost_quota = ceiling(byte_quota / CLIENT_KNOBS->READ_COST_BYTE_FACTOR)
|
||||
```
|
||||
|
||||
To clear a both reserved and total throughput quotas for a tag, run:
|
||||
|
||||
```
|
||||
|
|
|
@ -43,9 +43,9 @@ Optional<LimitType> parseLimitType(StringRef token) {
|
|||
}
|
||||
}
|
||||
|
||||
Optional<double> parseLimitValue(StringRef token) {
|
||||
Optional<int64_t> parseLimitValue(StringRef token) {
|
||||
try {
|
||||
return std::stod(token.toString());
|
||||
return std::stol(token.toString());
|
||||
} catch (...) {
|
||||
return {};
|
||||
}
|
||||
|
@ -63,9 +63,9 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
|||
} else {
|
||||
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
||||
if (limitType == LimitType::TOTAL) {
|
||||
fmt::print("{}\n", quota.totalQuota * CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
||||
fmt::print("{}\n", quota.totalQuota);
|
||||
} else if (limitType == LimitType::RESERVED) {
|
||||
fmt::print("{}\n", quota.reservedQuota * CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
||||
fmt::print("{}\n", quota.reservedQuota);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
@ -75,7 +75,7 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, double value) {
|
||||
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, int64_t value) {
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
@ -89,9 +89,13 @@ ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
|||
// Internally, costs are stored in terms of pages, but in the API,
|
||||
// costs are specified in terms of bytes
|
||||
if (limitType == LimitType::TOTAL) {
|
||||
quota.totalQuota = (value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1;
|
||||
// Round up to nearest page size
|
||||
quota.totalQuota =
|
||||
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||
} else if (limitType == LimitType::RESERVED) {
|
||||
quota.reservedQuota = (value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1;
|
||||
// Round up to nearest page size
|
||||
quota.reservedQuota =
|
||||
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||
}
|
||||
if (!quota.isValid()) {
|
||||
throw invalid_throttle_quota_value();
|
||||
|
|
|
@ -272,6 +272,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
|
||||
init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
|
||||
init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
|
||||
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
||||
init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
|
||||
|
||||
// busyness reporting
|
||||
|
|
|
@ -414,6 +414,20 @@ Version DLTransaction::getCommittedVersion() {
|
|||
return version;
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> DLTransaction::getTotalCost() {
|
||||
if (!api->transactionGetTotalCost) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
FdbCApi::FDBFuture* f = api->transactionGetTotalCost(tr);
|
||||
return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
int64_t size = 0;
|
||||
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &size);
|
||||
ASSERT(!error);
|
||||
return size;
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
|
||||
if (!api->transactionGetApproximateSize) {
|
||||
return unsupported_operation();
|
||||
|
@ -950,6 +964,11 @@ void DLApi::init() {
|
|||
fdbCPath,
|
||||
"fdb_transaction_get_committed_version",
|
||||
headerVersion >= 0);
|
||||
loadClientFunction(&api->transactionGetTotalCost,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_transaction_get_total_cost",
|
||||
headerVersion >= ApiVersion::withGetTotalCost().version());
|
||||
loadClientFunction(&api->transactionGetApproximateSize,
|
||||
lib,
|
||||
fdbCPath,
|
||||
|
@ -1486,6 +1505,12 @@ ThreadFuture<SpanContext> MultiVersionTransaction::getSpanContext() {
|
|||
return SpanContext();
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> MultiVersionTransaction::getTotalCost() {
|
||||
auto tr = getTransaction();
|
||||
auto f = tr.transaction ? tr.transaction->getTotalCost() : makeTimeout<int64_t>();
|
||||
return abortableFuture(f, tr.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
|
||||
auto tr = getTransaction();
|
||||
auto f = tr.transaction ? tr.transaction->getApproximateSize() : makeTimeout<int64_t>();
|
||||
|
|
|
@ -3456,6 +3456,8 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
|
|||
}
|
||||
trState->cx->getValueCompleted->latency = timer_int() - startTime;
|
||||
trState->cx->getValueCompleted->log();
|
||||
trState->totalCost +=
|
||||
getReadOperationCost(key.size() + (reply.value.present() ? reply.value.get().size() : 0));
|
||||
|
||||
if (getValueID.present()) {
|
||||
g_traceBatch.addEvent("GetValueDebug",
|
||||
|
@ -4285,6 +4287,7 @@ void getRangeFinished(Reference<TransactionState> trState,
|
|||
RangeResultFamily result) {
|
||||
int64_t bytes = getRangeResultFamilyBytes(result);
|
||||
|
||||
trState->totalCost += getReadOperationCost(bytes);
|
||||
trState->cx->transactionBytesRead += bytes;
|
||||
trState->cx->transactionKeysRead += result.size();
|
||||
|
||||
|
@ -5767,6 +5770,7 @@ void Transaction::set(const KeyRef& key, const ValueRef& value, AddConflictRange
|
|||
auto r = singleKeyRange(key, req.arena);
|
||||
auto v = ValueRef(req.arena, value);
|
||||
t.mutations.emplace_back(req.arena, MutationRef::SetValue, r.begin, v);
|
||||
trState->totalCost += getWriteOperationCost(key.expectedSize() + value.expectedSize());
|
||||
|
||||
if (addConflictRange) {
|
||||
t.write_conflict_ranges.push_back(req.arena, r);
|
||||
|
@ -5796,6 +5800,7 @@ void Transaction::atomicOp(const KeyRef& key,
|
|||
auto v = ValueRef(req.arena, operand);
|
||||
|
||||
t.mutations.emplace_back(req.arena, operationType, r.begin, v);
|
||||
trState->totalCost += getWriteOperationCost(key.expectedSize());
|
||||
|
||||
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
|
||||
t.write_conflict_ranges.push_back(req.arena, r);
|
||||
|
@ -5827,7 +5832,10 @@ void Transaction::clear(const KeyRangeRef& range, AddConflictRange addConflictRa
|
|||
return;
|
||||
|
||||
t.mutations.emplace_back(req.arena, MutationRef::ClearRange, r.begin, r.end);
|
||||
|
||||
// NOTE: The throttling cost of each clear is assumed to be one page.
|
||||
// This makes compuation fast, but can be inaccurate and may
|
||||
// underestimate the cost of large clears.
|
||||
trState->totalCost += CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||
if (addConflictRange)
|
||||
t.write_conflict_ranges.push_back(req.arena, r);
|
||||
}
|
||||
|
@ -6240,14 +6248,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
|
|||
state int i = 0;
|
||||
|
||||
for (; i < transaction->mutations.size(); ++i) {
|
||||
auto* it = &transaction->mutations[i];
|
||||
auto const& mutation = transaction->mutations[i];
|
||||
|
||||
if (it->type == MutationRef::Type::SetValue || it->isAtomicOp()) {
|
||||
if (mutation.type == MutationRef::Type::SetValue || mutation.isAtomicOp()) {
|
||||
trCommitCosts.opsCount++;
|
||||
trCommitCosts.writeCosts += getWriteOperationCost(it->expectedSize());
|
||||
} else if (it->type == MutationRef::Type::ClearRange) {
|
||||
trCommitCosts.writeCosts += getWriteOperationCost(mutation.expectedSize());
|
||||
} else if (mutation.type == MutationRef::Type::ClearRange) {
|
||||
trCommitCosts.opsCount++;
|
||||
keyRange = KeyRangeRef(it->param1, it->param2);
|
||||
keyRange = KeyRangeRef(mutation.param1, mutation.param2);
|
||||
if (trState->options.expensiveClearCostEstimation) {
|
||||
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY, trState));
|
||||
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
|
||||
|
|
|
@ -564,6 +564,10 @@ Version PaxosConfigTransaction::getCommittedVersion() const {
|
|||
return impl->getCommittedVersion();
|
||||
}
|
||||
|
||||
int64_t PaxosConfigTransaction::getTotalCost() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t PaxosConfigTransaction::getApproximateSize() const {
|
||||
return impl->getApproximateSize();
|
||||
}
|
||||
|
|
|
@ -732,7 +732,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
|
||||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 );
|
||||
|
|
|
@ -296,6 +296,10 @@ Version SimpleConfigTransaction::getCommittedVersion() const {
|
|||
return impl->getCommittedVersion();
|
||||
}
|
||||
|
||||
int64_t SimpleConfigTransaction::getTotalCost() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t SimpleConfigTransaction::getApproximateSize() const {
|
||||
return impl->getApproximateSize();
|
||||
}
|
||||
|
|
|
@ -626,6 +626,14 @@ ThreadFuture<SpanContext> ThreadSafeTransaction::getSpanContext() {
|
|||
});
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> ThreadSafeTransaction::getTotalCost() {
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
return onMainThread([tr]() -> Future<int64_t> {
|
||||
tr->checkDeferredError();
|
||||
return tr->getTotalCost();
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
return onMainThread([tr]() -> Future<int64_t> {
|
||||
|
|
|
@ -262,6 +262,8 @@ public:
|
|||
double TAG_THROTTLE_EXPIRATION_INTERVAL;
|
||||
int64_t WRITE_COST_BYTE_FACTOR; // Used to round up the cost of write operations
|
||||
int64_t READ_COST_BYTE_FACTOR; // Used to round up the cost of read operations
|
||||
// Cost multiplier for writes (because write operations are more expensive than reads):
|
||||
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
||||
double PROXY_MAX_TAG_THROTTLE_DURATION; // Maximum duration that a transaction can be tag throttled by proxy before
|
||||
// being rejected
|
||||
|
||||
|
|
|
@ -120,6 +120,7 @@ public:
|
|||
// later if they are not really needed.
|
||||
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
|
||||
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
|
||||
virtual ThreadFuture<int64_t> getTotalCost() = 0;
|
||||
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
|
||||
|
||||
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||
|
|
|
@ -101,6 +101,7 @@ public:
|
|||
virtual Version getCommittedVersion() const = 0;
|
||||
virtual VersionVector getVersionVector() const = 0;
|
||||
virtual SpanContext getSpanContext() const = 0;
|
||||
virtual int64_t getTotalCost() const = 0;
|
||||
virtual int64_t getApproximateSize() const = 0;
|
||||
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
|
||||
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||
|
|
|
@ -377,6 +377,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
|
||||
FDBFuture* (*transactionCommit)(FDBTransaction* tr);
|
||||
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction* tr, int64_t* outVersion);
|
||||
FDBFuture* (*transactionGetTotalCost)(FDBTransaction* tr);
|
||||
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction* tr);
|
||||
FDBFuture* (*transactionWatch)(FDBTransaction* tr, uint8_t const* keyName, int keyNameLength);
|
||||
FDBFuture* (*transactionOnError)(FDBTransaction* tr, fdb_error_t error);
|
||||
|
@ -505,6 +506,7 @@ public:
|
|||
Version getCommittedVersion() override;
|
||||
ThreadFuture<VersionVector> getVersionVector() override;
|
||||
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
|
||||
ThreadFuture<int64_t> getTotalCost() override;
|
||||
ThreadFuture<int64_t> getApproximateSize() override;
|
||||
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
@ -732,6 +734,7 @@ public:
|
|||
Version getCommittedVersion() override;
|
||||
ThreadFuture<VersionVector> getVersionVector() override;
|
||||
ThreadFuture<SpanContext> getSpanContext() override;
|
||||
ThreadFuture<int64_t> getTotalCost() override;
|
||||
ThreadFuture<int64_t> getApproximateSize() override;
|
||||
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
|
|
@ -249,6 +249,9 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
|||
SpanContext spanContext;
|
||||
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
|
||||
bool readVersionObtainedFromGrvProxy;
|
||||
// Measured by summing the bytes accessed by each read and write operation
|
||||
// after rounding up to the nearest page size and applying a write penalty
|
||||
int64_t totalCost = 0;
|
||||
|
||||
// Special flag to skip prepending tenant prefix to mutations and conflict ranges
|
||||
// when a dummy, internal transaction gets commited. The sole purpose of commitDummyTransaction() is to
|
||||
|
@ -447,6 +450,8 @@ public:
|
|||
// May be called only after commit() returns success
|
||||
Version getCommittedVersion() const { return trState->committedVersion; }
|
||||
|
||||
int64_t getTotalCost() const { return trState->totalCost; }
|
||||
|
||||
// Will be fulfilled only after commit() returns success
|
||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();
|
||||
|
||||
|
@ -566,9 +571,16 @@ ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
|
|||
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
|
||||
ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions);
|
||||
|
||||
// Round up to the nearest page size
|
||||
// Measured in bytes, rounded up to the nearest page size. Multiply by fungibility ratio
|
||||
// because writes are more expensive than reads.
|
||||
inline uint64_t getWriteOperationCost(uint64_t bytes) {
|
||||
return (bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1;
|
||||
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR *
|
||||
((bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1);
|
||||
}
|
||||
|
||||
// Measured in bytes, rounded up to the nearest page size.
|
||||
inline uint64_t getReadOperationCost(uint64_t bytes) {
|
||||
return ((bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||
}
|
||||
|
||||
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
|
||||
|
|
|
@ -64,6 +64,7 @@ public:
|
|||
void clear(KeyRef const&) override;
|
||||
Future<Void> commit() override;
|
||||
Version getCommittedVersion() const override;
|
||||
int64_t getTotalCost() const override;
|
||||
int64_t getApproximateSize() const override;
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
Future<Void> onError(Error const& e) override;
|
||||
|
|
|
@ -149,6 +149,7 @@ public:
|
|||
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
|
||||
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
|
||||
|
||||
int64_t getTotalCost() const override { return tr.getTotalCost(); }
|
||||
int64_t getApproximateSize() const override { return approximateSize; }
|
||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;
|
||||
|
||||
|
|
|
@ -629,8 +629,6 @@ public:
|
|||
double GLOBAL_TAG_THROTTLING_MIN_RATE;
|
||||
// Used by global tag throttling counters
|
||||
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
||||
// Cost multiplier for writes (because write operations are more expensive than reads)
|
||||
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
||||
// Maximum number of tags tracked by global tag throttler. Additional tags will be ignored
|
||||
// until some existing tags expire
|
||||
int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED;
|
||||
|
@ -742,7 +740,6 @@ public:
|
|||
int64_t MIN_TAG_READ_PAGES_RATE;
|
||||
int64_t MIN_TAG_WRITE_PAGES_RATE;
|
||||
double TAG_MEASUREMENT_INTERVAL;
|
||||
int64_t READ_COST_BYTE_FACTOR;
|
||||
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
|
||||
bool REPORT_DD_METRICS;
|
||||
double DD_METRICS_REPORT_INTERVAL;
|
||||
|
|
|
@ -76,6 +76,7 @@ public:
|
|||
void reset() override;
|
||||
void debugTransaction(UID dID) override;
|
||||
void checkDeferredError() const override;
|
||||
int64_t getTotalCost() const override;
|
||||
int64_t getApproximateSize() const override;
|
||||
void set(KeyRef const&, ValueRef const&) override;
|
||||
void clear(KeyRangeRef const&) override { throw client_invalid_operation(); }
|
||||
|
|
|
@ -205,6 +205,7 @@ public:
|
|||
Version getCommittedVersion() override;
|
||||
ThreadFuture<VersionVector> getVersionVector() override;
|
||||
ThreadFuture<SpanContext> getSpanContext() override;
|
||||
ThreadFuture<int64_t> getTotalCost() override;
|
||||
ThreadFuture<int64_t> getApproximateSize() override;
|
||||
|
||||
ThreadFuture<uint64_t> getProtocolVersion();
|
||||
|
|
|
@ -107,7 +107,7 @@ class GlobalTagThrottlerImpl {
|
|||
if (opType == OpType::READ) {
|
||||
readCost.setTotal(newCost);
|
||||
} else {
|
||||
writeCost.setTotal(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * newCost);
|
||||
writeCost.setTotal(CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * newCost);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/TransactionTagCounter.h"
|
||||
#include "flow/Trace.h"
|
||||
|
@ -90,9 +91,6 @@ class TransactionTagCounterImpl {
|
|||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||
|
||||
// Round up to the nearest page size
|
||||
static int64_t costFunction(int64_t bytes) { return (bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||
|
||||
public:
|
||||
TransactionTagCounterImpl(UID thisServerID)
|
||||
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
||||
|
@ -101,7 +99,7 @@ public:
|
|||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if (tags.present()) {
|
||||
CODE_PROBE(true, "Tracking transaction tag in counter");
|
||||
double cost = costFunction(bytes);
|
||||
auto const cost = getReadOperationCost(bytes);
|
||||
for (auto& tag : tags.get()) {
|
||||
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
||||
topTags.incrementCount(tag, count, cost);
|
||||
|
|
|
@ -2001,7 +2001,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
|
||||
data->transactionTagCounter.addRequest(req.tags, resultSize);
|
||||
// Key size is not included in "BytesQueried", but still contributes to cost,
|
||||
// so it must be accounted for here.
|
||||
data->transactionTagCounter.addRequest(req.tags, req.key.size() + resultSize);
|
||||
|
||||
++data->counters.finishedQueries;
|
||||
|
||||
|
|
|
@ -0,0 +1,323 @@
|
|||
/*
|
||||
* TransactionCost.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
class TransactionCostWorkload : public TestWorkload {
|
||||
int iterations{ 1000 };
|
||||
Key prefix;
|
||||
bool debugTransactions{ false };
|
||||
|
||||
Key getKey(uint64_t testNumber, uint64_t index = 0) const {
|
||||
BinaryWriter bw(Unversioned());
|
||||
bw << bigEndian64(testNumber);
|
||||
bw << bigEndian64(index);
|
||||
return bw.toValue().withPrefix(prefix);
|
||||
}
|
||||
|
||||
static Value getValue(uint32_t size) { return makeString(size); }
|
||||
|
||||
static UID getDebugID(uint64_t testNumber) { return UID(testNumber << 32, testNumber << 32); }
|
||||
|
||||
class ITest {
|
||||
protected:
|
||||
uint64_t testNumber;
|
||||
explicit ITest(uint64_t testNumber) : testNumber(testNumber) {}
|
||||
|
||||
public:
|
||||
void debugTransaction(ReadYourWritesTransaction& tr) { tr.debugTransaction(getDebugID(testNumber)); }
|
||||
virtual Future<Void> setup(TransactionCostWorkload const& workload, Database const&) { return Void(); }
|
||||
virtual Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction>) = 0;
|
||||
virtual int64_t expectedFinalCost() const = 0;
|
||||
virtual ~ITest() = default;
|
||||
};
|
||||
|
||||
class ReadEmptyTest : public ITest {
|
||||
public:
|
||||
explicit ReadEmptyTest(uint64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
return success(tr->get(workload.getKey(testNumber)));
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
class ReadLargeValueTest : public ITest {
|
||||
ACTOR static Future<Void> setup(TransactionCostWorkload const* workload,
|
||||
ReadLargeValueTest* self,
|
||||
Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.set(workload->getKey(self->testNumber), getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit ReadLargeValueTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||
return setup(&workload, this, cx);
|
||||
}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
return success(tr->get(workload.getKey(testNumber)));
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return 2 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
class WriteTest : public ITest {
|
||||
public:
|
||||
explicit WriteTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
tr->set(workload.getKey(testNumber), getValue(20));
|
||||
return Void();
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override {
|
||||
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||
}
|
||||
};
|
||||
|
||||
class WriteLargeValueTest : public ITest {
|
||||
public:
|
||||
explicit WriteLargeValueTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
tr->set(workload.getKey(testNumber), getValue(CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR));
|
||||
return Void();
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override {
|
||||
return 2 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||
}
|
||||
};
|
||||
|
||||
class WriteMultipleValuesTest : public ITest {
|
||||
public:
|
||||
explicit WriteMultipleValuesTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
tr->set(workload.getKey(testNumber, i), getValue(20));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override {
|
||||
return 10 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||
}
|
||||
};
|
||||
|
||||
class ClearTest : public ITest {
|
||||
public:
|
||||
explicit ClearTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
tr->clear(singleKeyRange(workload.getKey(testNumber)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
class ReadRangeTest : public ITest {
|
||||
ACTOR static Future<Void> setup(ReadRangeTest* self, TransactionCostWorkload const* workload, Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
tr.set(workload->getKey(self->testNumber, i), workload->getValue(20));
|
||||
}
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit ReadRangeTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||
return setup(this, &workload, cx);
|
||||
}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
KeyRange const keys = KeyRangeRef(workload.getKey(testNumber, 0), workload.getKey(testNumber, 10));
|
||||
return success(tr->getRange(keys, 10));
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
class ReadMultipleValuesTest : public ITest {
|
||||
ACTOR static Future<Void> setup(ReadMultipleValuesTest* self,
|
||||
TransactionCostWorkload const* workload,
|
||||
Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
tr.set(workload->getKey(self->testNumber, i), workload->getValue(20));
|
||||
}
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit ReadMultipleValuesTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||
return setup(this, &workload, cx);
|
||||
}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
std::vector<Future<Void>> futures;
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
futures.push_back(success(tr->get(workload.getKey(testNumber, i))));
|
||||
}
|
||||
return waitForAll(futures);
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return 10 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
class LargeReadRangeTest : public ITest {
|
||||
ACTOR static Future<Void> setup(LargeReadRangeTest* self,
|
||||
TransactionCostWorkload const* workload,
|
||||
Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
tr.set(workload->getKey(self->testNumber, i),
|
||||
workload->getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
|
||||
}
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit LargeReadRangeTest(int64_t testNumber) : ITest(testNumber) {}
|
||||
|
||||
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||
return setup(this, &workload, cx);
|
||||
}
|
||||
|
||||
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||
KeyRange const keys = KeyRangeRef(workload.getKey(testNumber, 0), workload.getKey(testNumber, 10));
|
||||
return success(tr->getRange(keys, 10));
|
||||
}
|
||||
|
||||
int64_t expectedFinalCost() const override { return 11 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||
};
|
||||
|
||||
static std::unique_ptr<ITest> createRandomTest(int64_t testNumber) {
|
||||
auto const rand = deterministicRandom()->randomInt(0, 9);
|
||||
if (rand == 0) {
|
||||
return std::make_unique<ReadEmptyTest>(testNumber);
|
||||
} else if (rand == 1) {
|
||||
return std::make_unique<ReadLargeValueTest>(testNumber);
|
||||
} else if (rand == 2) {
|
||||
return std::make_unique<ReadMultipleValuesTest>(testNumber);
|
||||
} else if (rand == 3) {
|
||||
return std::make_unique<WriteTest>(testNumber);
|
||||
} else if (rand == 4) {
|
||||
return std::make_unique<WriteLargeValueTest>(testNumber);
|
||||
} else if (rand == 5) {
|
||||
return std::make_unique<WriteMultipleValuesTest>(testNumber);
|
||||
} else if (rand == 6) {
|
||||
return std::make_unique<ClearTest>(testNumber);
|
||||
} else if (rand == 7) {
|
||||
return std::make_unique<ReadRangeTest>(testNumber);
|
||||
} else {
|
||||
return std::make_unique<LargeReadRangeTest>(testNumber);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> runTest(TransactionCostWorkload* self, Database cx, ITest* test) {
|
||||
wait(test->setup(*self, cx));
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
if (self->debugTransactions) {
|
||||
test->debugTransaction(*tr);
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
wait(test->exec(*self, tr));
|
||||
wait(tr->commit());
|
||||
ASSERT_EQ(tr->getTotalCost(), test->expectedFinalCost());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> start(TransactionCostWorkload* self, Database cx) {
|
||||
state uint64_t testNumber = 0;
|
||||
state Future<Void> f;
|
||||
// Must use shared_ptr because Flow doesn't support perfect forwarding into actors
|
||||
state std::shared_ptr<ITest> test;
|
||||
for (; testNumber < self->iterations; ++testNumber) {
|
||||
test = createRandomTest(testNumber);
|
||||
wait(runTest(self, cx, test.get()));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
TransactionCostWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
iterations = getOption(options, "iterations"_sr, 1000);
|
||||
prefix = getOption(options, "prefix"_sr, "transactionCost/"_sr);
|
||||
debugTransactions = getOption(options, "debug"_sr, false);
|
||||
}
|
||||
|
||||
static constexpr auto NAME = "TransactionCost";
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
||||
Future<Void> start(Database const& cx) override { return clientId ? Void() : start(this, cx); }
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<TransactionCostWorkload> TransactionCostWorkloadFactory;
|
|
@ -71,6 +71,7 @@ public: // introduced features
|
|||
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
|
||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
|
||||
API_VERSION_FEATURE(@FDB_AV_GET_TOTAL_COST@, GetTotalCost);
|
||||
};
|
||||
|
||||
#endif // FLOW_CODE_API_VERSION_H
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# API Versions
|
||||
set(FDB_AV_LATEST_VERSION "720")
|
||||
set(FDB_AV_LATEST_VERSION "730")
|
||||
|
||||
# Features
|
||||
set(FDB_AV_SNAPSHOT_RYW "300")
|
||||
|
@ -11,4 +11,5 @@ set(FDB_AV_BLOB_RANGE_API "720")
|
|||
set(FDB_AV_CREATE_DB_FROM_CONN_STRING "720")
|
||||
set(FDB_AV_FUTURE_GET_BOOL "720")
|
||||
set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
|
||||
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
|
||||
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
|
||||
set(FDB_AV_GET_TOTAL_COST "730")
|
||||
|
|
|
@ -240,6 +240,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
|
||||
add_fdb_test(TEST_FILES rare/Throttling.toml)
|
||||
add_fdb_test(TEST_FILES rare/ThroughputQuota.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionCost.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES rare/WriteTagThrottling.toml)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
[[test]]
|
||||
testTitle = 'TransactionCostTest'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'TransactionCost'
|
||||
iterations = 1000
|
Loading…
Reference in New Issue